Monorepo for Tangled tangled.org

appview/notify: merge new comment events into one #867

open opened by boltless.me targeting master from sl/wnrvrwyvrlzo
Labels

None yet.

assignee

None yet.

Participants 3
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3m7iohv2yba22
+126 -145
Diff #1
+1 -1
appview/issues/issues.go
··· 489 // reset atUri to make rollback a no-op 490 atUri = "" 491 492 - rp.notifier.NewIssueComment(r.Context(), &comment, mentions) 493 494 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, f) 495 rp.pages.HxLocation(w, fmt.Sprintf("/%s/issues/%d#comment-%d", ownerSlashRepo, issue.IssueId, comment.Id))
··· 489 // reset atUri to make rollback a no-op 490 atUri = "" 491 492 + rp.notifier.NewComment(r.Context(), &comment) 493 494 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, f) 495 rp.pages.HxLocation(w, fmt.Sprintf("/%s/issues/%d#comment-%d", ownerSlashRepo, issue.IssueId, comment.Id))
+105 -110
appview/notify/db/db.go
··· 74 // no-op 75 } 76 77 - func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 78 79 - // build the recipients list 80 - // - owner of the repo 81 - // - collaborators in the repo 82 - var recipients []syntax.DID 83 - recipients = append(recipients, syntax.DID(issue.Repo.Did)) 84 - collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 85 if err != nil { 86 - log.Printf("failed to fetch collaborators: %v", err) 87 return 88 } 89 - for _, c := range collaborators { 90 - recipients = append(recipients, c.SubjectDid) 91 - } 92 93 - actorDid := syntax.DID(issue.Did) 94 - entityType := "issue" 95 - entityId := issue.AtUri().String() 96 - repoId := &issue.Repo.Id 97 - issueId := &issue.Id 98 - var pullId *int64 99 100 n.notifyEvent( 101 - actorDid, 102 recipients, 103 - models.NotificationTypeIssueCreated, 104 entityType, 105 entityId, 106 repoId, ··· 108 pullId, 109 ) 110 n.notifyEvent( 111 - actorDid, 112 - mentions, 113 models.NotificationTypeUserMentioned, 114 entityType, 115 entityId, ··· 119 ) 120 } 121 122 - func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 123 - issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.Subject)) 124 - if err != nil { 125 - log.Printf("NewIssueComment: failed to get issues: %v", err) 126 - return 127 - } 128 - if len(issues) == 0 { 129 - log.Printf("NewIssueComment: no issue found for %s", comment.Subject) 130 - return 131 - } 132 - issue := issues[0] 133 134 var recipients []syntax.DID 135 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 136 - 137 - if comment.IsReply() { 138 - // if this comment is a reply, then notify everybody in that thread 139 - parentAtUri := *comment.ReplyTo 140 - allThreads := issue.CommentList() 141 - 142 - // find the parent thread, and add all DIDs from here to the recipient list 143 - for _, t := range allThreads { 144 - if t.Self.AtUri() == parentAtUri { 145 - recipients = append(recipients, t.Participants()...) 146 - } 147 - } 148 - } else { 149 - // not a reply, notify just the issue author 150 - recipients = append(recipients, syntax.DID(issue.Did)) 151 } 152 153 - actorDid := syntax.DID(comment.Did) 154 entityType := "issue" 155 entityId := issue.AtUri().String() 156 repoId := &issue.Repo.Id ··· 160 n.notifyEvent( 161 actorDid, 162 recipients, 163 - models.NotificationTypeIssueCommented, 164 entityType, 165 entityId, 166 repoId, ··· 249 ) 250 } 251 252 - func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 253 - pulls, err := db.GetPulls(n.db, 254 - orm.FilterEq("owner_did", comment.Subject.Authority()), 255 - orm.FilterEq("rkey", comment.Subject.RecordKey()), 256 - ) 257 - if err != nil { 258 - log.Printf("NewPullComment: failed to get pulls: %v", err) 259 - return 260 - } 261 - if len(pulls) == 0 { 262 - log.Printf("NewPullComment: no pull found for %s", comment.Subject) 263 - return 264 - } 265 - pull := pulls[0] 266 - 267 - repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", pull.RepoAt)) 268 - if err != nil { 269 - log.Printf("NewPullComment: failed to get repos: %v", err) 270 - return 271 - } 272 - 273 - // build up the recipients list: 274 - // - repo owner 275 - // - all pull participants 276 - var recipients []syntax.DID 277 - recipients = append(recipients, syntax.DID(repo.Did)) 278 - for _, p := range pull.Participants() { 279 - recipients = append(recipients, syntax.DID(p)) 280 - } 281 - 282 - actorDid := comment.Did 283 - eventType := models.NotificationTypePullCommented 284 - entityType := "pull" 285 - entityId := pull.AtUri().String() 286 - repoId := &repo.Id 287 - var issueId *int64 288 - p := int64(pull.ID) 289 - pullId := &p 290 - 291 - n.notifyEvent( 292 - actorDid, 293 - recipients, 294 - eventType, 295 - entityType, 296 - entityId, 297 - repoId, 298 - issueId, 299 - pullId, 300 - ) 301 - n.notifyEvent( 302 - actorDid, 303 - mentions, 304 - models.NotificationTypeUserMentioned, 305 - entityType, 306 - entityId, 307 - repoId, 308 - issueId, 309 - pullId, 310 - ) 311 - } 312 - 313 func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 314 // no-op 315 }
··· 74 // no-op 75 } 76 77 + func (n *databaseNotifier) NewComment(ctx context.Context, comment *models.Comment) { 78 + var ( 79 + recipients []syntax.DID 80 + entityType string 81 + entityId string 82 + repoId *int64 83 + issueId *int64 84 + pullId *int64 85 + ) 86 87 + subjectDid, err := comment.Subject.Authority().AsDID() 88 if err != nil { 89 + log.Printf("NewComment: expected did based at-uri for comment.subject") 90 return 91 } 92 + switch comment.Subject.Collection() { 93 + case tangled.RepoIssueNSID: 94 + issues, err := db.GetIssues( 95 + n.db, 96 + orm.FilterEq("did", subjectDid), 97 + orm.FilterEq("rkey", comment.Subject.RecordKey()), 98 + ) 99 + if err != nil { 100 + log.Printf("NewComment: failed to get issues: %v", err) 101 + return 102 + } 103 + if len(issues) == 0 { 104 + log.Printf("NewComment: no issue found for %s", comment.Subject) 105 + return 106 + } 107 + issue := issues[0] 108 + 109 + recipients = append(recipients, syntax.DID(issue.Repo.Did)) 110 + if comment.IsReply() { 111 + // if this comment is a reply, then notify everybody in that thread 112 + parentAtUri := *comment.ReplyTo 113 + allThreads := issue.CommentList() 114 + 115 + // find the parent thread, and add all DIDs from here to the recipient list 116 + for _, t := range allThreads { 117 + if t.Self.AtUri() == parentAtUri { 118 + recipients = append(recipients, t.Participants()...) 119 + } 120 + } 121 + } else { 122 + // not a reply, notify just the issue author 123 + recipients = append(recipients, syntax.DID(issue.Did)) 124 + } 125 126 + entityType = "issue" 127 + entityId = issue.AtUri().String() 128 + repoId = &issue.Repo.Id 129 + issueId = &issue.Id 130 + case tangled.RepoPullNSID: 131 + pulls, err := db.GetPullsWithLimit( 132 + n.db, 133 + 1, 134 + orm.FilterEq("owner_did", subjectDid), 135 + orm.FilterEq("rkey", comment.Subject.RecordKey()), 136 + ) 137 + if err != nil { 138 + log.Printf("NewComment: failed to get pulls: %v", err) 139 + return 140 + } 141 + if len(pulls) == 0 { 142 + log.Printf("NewComment: no pull found for %s", comment.Subject) 143 + return 144 + } 145 + pull := pulls[0] 146 + 147 + pull.Repo, err = db.GetRepo(n.db, orm.FilterEq("at_uri", pull.RepoAt)) 148 + if err != nil { 149 + log.Printf("NewComment: failed to get repos: %v", err) 150 + return 151 + } 152 + 153 + recipients = append(recipients, syntax.DID(pull.Repo.Did)) 154 + for _, p := range pull.Participants() { 155 + recipients = append(recipients, syntax.DID(p)) 156 + } 157 + 158 + entityType = "pull" 159 + entityId = pull.AtUri().String() 160 + repoId = &pull.Repo.Id 161 + p := int64(pull.ID) 162 + pullId = &p 163 + default: 164 + return // no-op 165 + } 166 167 n.notifyEvent( 168 + comment.Did, 169 recipients, 170 + models.NotificationTypeIssueCommented, 171 entityType, 172 entityId, 173 repoId, ··· 175 pullId, 176 ) 177 n.notifyEvent( 178 + comment.Did, 179 + comment.Mentions, 180 models.NotificationTypeUserMentioned, 181 entityType, 182 entityId, ··· 186 ) 187 } 188 189 + func (n *databaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) { 190 + // no-op 191 + } 192 + 193 + func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 194 195 + // build the recipients list 196 + // - owner of the repo 197 + // - collaborators in the repo 198 var recipients []syntax.DID 199 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 200 + collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 201 + if err != nil { 202 + log.Printf("failed to fetch collaborators: %v", err) 203 + return 204 + } 205 + for _, c := range collaborators { 206 + recipients = append(recipients, c.SubjectDid) 207 } 208 209 + actorDid := syntax.DID(issue.Did) 210 entityType := "issue" 211 entityId := issue.AtUri().String() 212 repoId := &issue.Repo.Id ··· 216 n.notifyEvent( 217 actorDid, 218 recipients, 219 + models.NotificationTypeIssueCreated, 220 entityType, 221 entityId, 222 repoId, ··· 305 ) 306 } 307 308 func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 309 // no-op 310 }
+8 -8
appview/notify/merged_notifier.go
··· 54 m.fanout("DeleteStar", ctx, star) 55 } 56 57 - func (m *mergedNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 58 - m.fanout("NewIssue", ctx, issue, mentions) 59 } 60 61 - func (m *mergedNotifier) NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 62 - m.fanout("NewIssueComment", ctx, comment, mentions) 63 } 64 65 func (m *mergedNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { ··· 82 m.fanout("NewPull", ctx, pull) 83 } 84 85 - func (m *mergedNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 86 - m.fanout("NewPullComment", ctx, comment, mentions) 87 - } 88 - 89 func (m *mergedNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 90 m.fanout("NewPullState", ctx, actor, pull) 91 }
··· 54 m.fanout("DeleteStar", ctx, star) 55 } 56 57 + func (m *mergedNotifier) NewComment(ctx context.Context, comment *models.Comment) { 58 + m.fanout("NewComment", ctx, comment) 59 + } 60 + 61 + func (m *mergedNotifier) DeleteComment(ctx context.Context, comment *models.Comment) { 62 + m.fanout("DeleteComment", ctx, comment) 63 } 64 65 + func (m *mergedNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 66 + m.fanout("NewIssue", ctx, issue, mentions) 67 } 68 69 func (m *mergedNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { ··· 86 m.fanout("NewPull", ctx, pull) 87 } 88 89 func (m *mergedNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 90 m.fanout("NewPullState", ctx, actor, pull) 91 }
+7 -7
appview/notify/notifier.go
··· 13 NewStar(ctx context.Context, star *models.Star) 14 DeleteStar(ctx context.Context, star *models.Star) 15 16 NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) 17 - NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) 18 NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) 19 DeleteIssue(ctx context.Context, issue *models.Issue) 20 ··· 22 DeleteFollow(ctx context.Context, follow *models.Follow) 23 24 NewPull(ctx context.Context, pull *models.Pull) 25 - NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) 26 NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) 27 28 UpdateProfile(ctx context.Context, profile *models.Profile) ··· 42 func (m *BaseNotifier) NewStar(ctx context.Context, star *models.Star) {} 43 func (m *BaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {} 44 45 func (m *BaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {} 46 - func (m *BaseNotifier) NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 47 - } 48 func (m *BaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {} 49 func (m *BaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {} 50 51 func (m *BaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {} 52 func (m *BaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {} 53 54 - func (m *BaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {} 55 - func (m *BaseNotifier) NewPullComment(ctx context.Context, models *models.Comment, mentions []syntax.DID) { 56 - } 57 func (m *BaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {} 58 59 func (m *BaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {}
··· 13 NewStar(ctx context.Context, star *models.Star) 14 DeleteStar(ctx context.Context, star *models.Star) 15 16 + NewComment(ctx context.Context, comment *models.Comment) 17 + DeleteComment(ctx context.Context, comment *models.Comment) 18 + 19 NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) 20 NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) 21 DeleteIssue(ctx context.Context, issue *models.Issue) 22 ··· 24 DeleteFollow(ctx context.Context, follow *models.Follow) 25 26 NewPull(ctx context.Context, pull *models.Pull) 27 NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) 28 29 UpdateProfile(ctx context.Context, profile *models.Profile) ··· 43 func (m *BaseNotifier) NewStar(ctx context.Context, star *models.Star) {} 44 func (m *BaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {} 45 46 + func (m *BaseNotifier) NewComment(ctx context.Context, comment *models.Comment) {} 47 + func (m *BaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) {} 48 + 49 func (m *BaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {} 50 func (m *BaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {} 51 func (m *BaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {} 52 53 func (m *BaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {} 54 func (m *BaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {} 55 56 + func (m *BaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {} 57 func (m *BaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {} 58 59 func (m *BaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {}
+4 -18
appview/notify/posthog/notifier.go
··· 86 } 87 } 88 89 - func (n *posthogNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 90 - err := n.client.Enqueue(posthog.Capture{ 91 - DistinctId: comment.Did.String(), 92 - Event: "new_pull_comment", 93 - Properties: posthog.Properties{ 94 - "pull_at": comment.Subject, 95 - "mentions": mentions, 96 - }, 97 - }) 98 - if err != nil { 99 - log.Println("failed to enqueue posthog event:", err) 100 - } 101 - } 102 - 103 func (n *posthogNotifier) NewPullClosed(ctx context.Context, pull *models.Pull) { 104 err := n.client.Enqueue(posthog.Capture{ 105 DistinctId: pull.OwnerDid, ··· 179 } 180 } 181 182 - func (n *posthogNotifier) NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 183 err := n.client.Enqueue(posthog.Capture{ 184 DistinctId: comment.Did.String(), 185 - Event: "new_issue_comment", 186 Properties: posthog.Properties{ 187 - "issue_at": comment.Subject, 188 - "mentions": mentions, 189 }, 190 }) 191 if err != nil {
··· 86 } 87 } 88 89 func (n *posthogNotifier) NewPullClosed(ctx context.Context, pull *models.Pull) { 90 err := n.client.Enqueue(posthog.Capture{ 91 DistinctId: pull.OwnerDid, ··· 165 } 166 } 167 168 + func (n *posthogNotifier) NewComment(ctx context.Context, comment *models.Comment) { 169 err := n.client.Enqueue(posthog.Capture{ 170 DistinctId: comment.Did.String(), 171 + Event: "new_comment", 172 Properties: posthog.Properties{ 173 + "subject_at": comment.Subject, 174 + "mentions": comment.Mentions, 175 }, 176 }) 177 if err != nil {
+1 -1
appview/pulls/pulls.go
··· 794 return 795 } 796 797 - s.notifier.NewPullComment(r.Context(), &comment, mentions) 798 799 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, f) 800 s.pages.HxLocation(w, fmt.Sprintf("/%s/pulls/%d#comment-%d", ownerSlashRepo, pull.PullId, comment.Id))
··· 794 return 795 } 796 797 + s.notifier.NewComment(r.Context(), &comment) 798 799 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, f) 800 s.pages.HxLocation(w, fmt.Sprintf("/%s/pulls/%d#comment-%d", ownerSlashRepo, pull.PullId, comment.Id))

History

8 rounds 6 comments
sign up or login to add to the discussion
1 commit
expand
appview/notify: merge new comment events into one
2/3 failed, 1/3 success
expand
merge conflicts detected
expand
  • appview/notify/db/db.go:260
  • appview/notify/merged_notifier.go:81
  • appview/notify/notifier.go:22
  • appview/pulls/opengraph.go:277
expand 0 comments
1 commit
expand
appview/notify: merge new comment events into one
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
appview/notify: merge new comment events into one
3/3 success
expand
expand 0 comments
1 commit
expand
appview/notify: merge new comment events into one
3/3 success
expand
expand 2 comments

on the whole this changeset lgtm. some questions i have regarding backwards compatibility:

  • it makes sense that records created with sh.tangled.issue.comment will not be ingested anymore
  • however, when we intend to perform backfill of records, we should support ingesting the old records. i think the code itself is quite an easy undertaking, but the value add will be huge (if we can backfill existing issue comments!), this need not be added into the present PR, it can come down the line (with your work on backfilling with tap perhaps)
  • we should also allow users to delete/update old records via firehose (so deletions/updates of sh.tangled.issue.comment should reflect correctly). as above, the ingester logic for this can come down the line.
  • we can stop maintaining the old record for much longer, although i suspect that the new comment record will be quite stable

@oppi.li thank you for the review!

when we intend to perform backfill of records, we should support ingesting the old records.

Yeah I think it would be better to leave ingester logic for issue.comment update/delete operations. Users should be able to modify existing records.

Backfilling issue.comment will be quite tricky as we cannot distinguish new comments since certain timestamp. Users can modify both created field and rkey. So there are only two options:

  • ingest legacy comment records until we completely drop them (we should silently migrate user records as much as possible before that)
  • have "legacy records" list in tangled appview and only ingest for those records. This way, other appviews won't be able to support legacy records.

we can stop maintaining the old record for much longer, although i suspect that the new comment record will be quite stable

There are few issues that can threat the comment lexicon stability:

I think we are roughly fine after changing body field type to sh.tangled.markup#markdown. Though we might we need more discussion for #328 just in case.

1 commit
expand
appview/notify: merge new comment events into one
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
appview/notify: merge new comment events into one
3/3 success
expand
expand 0 comments
1 commit
expand
appview/notify: merge new comment events into one
3/3 success
expand
expand 1 comment
1 commit
expand
appview/notify: merge new comment events into one
1/3 failed, 2/3 timeout
expand
expand 3 comments

note: I'm choosing sh.tangled.comment NSID here. Tell me if we would prefer something else like sh.tangled.feed.comment

I think sh.tangled.comment is fine, but more importantly, are we still sticking to sh.tangled or does it make sense to move this to the org.tangled namespace? Assuming we're moving everything eventually. cc @oppi.li

I think we should stick on sh.tangled for now as there can be large amounts of changes from PR refactor / did-for-repo. We can consider moving entire collections to org.tangled once tangled lexicons become more stable.