Monorepo for Tangled tangled.org

appview/notify: merge new comment events into one #867

open opened by boltless.me targeting master from sl/wnrvrwyvrlzo
Labels

None yet.

assignee

None yet.

Participants 3
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3m7iohv2yba22
+131 -153
Diff #7
+1 -1
appview/issues/issues.go
··· 488 // reset atUri to make rollback a no-op 489 atUri = "" 490 491 - rp.notifier.NewIssueComment(r.Context(), &comment, mentions) 492 493 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, f) 494 rp.pages.HxLocation(w, fmt.Sprintf("/%s/issues/%d#comment-%d", ownerSlashRepo, issue.IssueId, comment.Id))
··· 488 // reset atUri to make rollback a no-op 489 atUri = "" 490 491 + rp.notifier.NewComment(r.Context(), &comment) 492 493 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, f) 494 rp.pages.HxLocation(w, fmt.Sprintf("/%s/issues/%d#comment-%d", ownerSlashRepo, issue.IssueId, comment.Id))
+110 -118
appview/notify/db/db.go
··· 74 // no-op 75 } 76 77 - func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 78 - collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 79 if err != nil { 80 - log.Printf("failed to fetch collaborators: %v", err) 81 return 82 } 83 84 - // build the recipients list 85 - // - owner of the repo 86 - // - collaborators in the repo 87 - // - remove users already mentioned 88 - recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 89 - for _, c := range collaborators { 90 - recipients.Insert(c.SubjectDid) 91 } 92 - for _, m := range mentions { 93 recipients.Remove(m) 94 } 95 96 - actorDid := syntax.DID(issue.Did) 97 - entityType := "issue" 98 - entityId := issue.AtUri().String() 99 - repoId := &issue.Repo.Id 100 - issueId := &issue.Id 101 - var pullId *int64 102 - 103 n.notifyEvent( 104 - actorDid, 105 recipients, 106 - models.NotificationTypeIssueCreated, 107 entityType, 108 entityId, 109 repoId, ··· 111 pullId, 112 ) 113 n.notifyEvent( 114 - actorDid, 115 - sets.Collect(slices.Values(mentions)), 116 models.NotificationTypeUserMentioned, 117 entityType, 118 entityId, ··· 122 ) 123 } 124 125 - func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 126 - issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.Subject)) 127 if err != nil { 128 - log.Printf("NewIssueComment: failed to get issues: %v", err) 129 - return 130 - } 131 - if len(issues) == 0 { 132 - log.Printf("NewIssueComment: no issue found for %s", comment.Subject) 133 return 134 } 135 - issue := issues[0] 136 137 - // built the recipients list: 138 - // - the owner of the repo 139 - // - | if the comment is a reply -> everybody on that thread 140 - // | if the comment is a top level -> just the issue owner 141 - // - remove mentioned users from the recipients list 142 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 143 - 144 - if comment.IsReply() { 145 - // if this comment is a reply, then notify everybody in that thread 146 - parentAtUri := *comment.ReplyTo 147 - 148 - // find the parent thread, and add all DIDs from here to the recipient list 149 - for _, t := range issue.CommentList() { 150 - if t.Self.AtUri() == parentAtUri { 151 - for _, p := range t.Participants() { 152 - recipients.Insert(p) 153 - } 154 - } 155 - } 156 - } else { 157 - // not a reply, notify just the issue author 158 - recipients.Insert(syntax.DID(issue.Did)) 159 } 160 - 161 for _, m := range mentions { 162 recipients.Remove(m) 163 } 164 165 - actorDid := syntax.DID(comment.Did) 166 entityType := "issue" 167 entityId := issue.AtUri().String() 168 repoId := &issue.Repo.Id ··· 172 n.notifyEvent( 173 actorDid, 174 recipients, 175 - models.NotificationTypeIssueCommented, 176 entityType, 177 entityId, 178 repoId, ··· 260 ) 261 } 262 263 - func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 264 - pulls, err := db.GetPulls(n.db, 265 - orm.FilterEq("owner_did", comment.Subject.Authority()), 266 - orm.FilterEq("rkey", comment.Subject.RecordKey()), 267 - ) 268 - if err != nil { 269 - log.Printf("NewPullComment: failed to get pulls: %v", err) 270 - return 271 - } 272 - if len(pulls) == 0 { 273 - log.Printf("NewPullComment: no pull found for %s", comment.Subject) 274 - return 275 - } 276 - pull := pulls[0] 277 - 278 - repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", pull.RepoAt)) 279 - if err != nil { 280 - log.Printf("NewPullComment: failed to get repos: %v", err) 281 - return 282 - } 283 - 284 - // build up the recipients list: 285 - // - repo owner 286 - // - all pull participants 287 - // - remove those already mentioned 288 - recipients := sets.Singleton(syntax.DID(repo.Did)) 289 - for _, p := range pull.Participants() { 290 - recipients.Insert(syntax.DID(p)) 291 - } 292 - for _, m := range mentions { 293 - recipients.Remove(m) 294 - } 295 - 296 - actorDid := comment.Did 297 - eventType := models.NotificationTypePullCommented 298 - entityType := "pull" 299 - entityId := pull.AtUri().String() 300 - repoId := &repo.Id 301 - var issueId *int64 302 - p := int64(pull.ID) 303 - pullId := &p 304 - 305 - n.notifyEvent( 306 - actorDid, 307 - recipients, 308 - eventType, 309 - entityType, 310 - entityId, 311 - repoId, 312 - issueId, 313 - pullId, 314 - ) 315 - n.notifyEvent( 316 - actorDid, 317 - sets.Collect(slices.Values(mentions)), 318 - models.NotificationTypeUserMentioned, 319 - entityType, 320 - entityId, 321 - repoId, 322 - issueId, 323 - pullId, 324 - ) 325 - } 326 - 327 func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 328 // no-op 329 }
··· 74 // no-op 75 } 76 77 + func (n *databaseNotifier) NewComment(ctx context.Context, comment *models.Comment) { 78 + var ( 79 + // built the recipients list: 80 + // - the owner of the repo 81 + // - | if the comment is a reply -> everybody on that thread 82 + // | if the comment is a top level -> just the issue owner 83 + // - remove mentioned users from the recipients list 84 + recipients = sets.New[syntax.DID]() 85 + entityType string 86 + entityId string 87 + repoId *int64 88 + issueId *int64 89 + pullId *int64 90 + ) 91 + 92 + subjectDid, err := comment.Subject.Authority().AsDID() 93 if err != nil { 94 + log.Printf("NewComment: expected did based at-uri for comment.subject") 95 return 96 } 97 + switch comment.Subject.Collection() { 98 + case tangled.RepoIssueNSID: 99 + issues, err := db.GetIssues( 100 + n.db, 101 + orm.FilterEq("did", subjectDid), 102 + orm.FilterEq("rkey", comment.Subject.RecordKey()), 103 + ) 104 + if err != nil { 105 + log.Printf("NewComment: failed to get issues: %v", err) 106 + return 107 + } 108 + if len(issues) == 0 { 109 + log.Printf("NewComment: no issue found for %s", comment.Subject) 110 + return 111 + } 112 + issue := issues[0] 113 + 114 + recipients.Insert(syntax.DID(issue.Repo.Did)) 115 + if comment.IsReply() { 116 + // if this comment is a reply, then notify everybody in that thread 117 + parentAtUri := *comment.ReplyTo 118 + 119 + // find the parent thread, and add all DIDs from here to the recipient list 120 + for _, t := range issue.CommentList() { 121 + if t.Self.AtUri() == parentAtUri { 122 + for _, p := range t.Participants() { 123 + recipients.Insert(p) 124 + } 125 + } 126 + } 127 + } else { 128 + // not a reply, notify just the issue author 129 + recipients.Insert(syntax.DID(issue.Did)) 130 + } 131 132 + entityType = "issue" 133 + entityId = issue.AtUri().String() 134 + repoId = &issue.Repo.Id 135 + issueId = &issue.Id 136 + case tangled.RepoPullNSID: 137 + pulls, err := db.GetPulls( 138 + n.db, 139 + orm.FilterEq("owner_did", subjectDid), 140 + orm.FilterEq("rkey", comment.Subject.RecordKey()), 141 + ) 142 + if err != nil { 143 + log.Printf("NewComment: failed to get pulls: %v", err) 144 + return 145 + } 146 + if len(pulls) == 0 { 147 + log.Printf("NewComment: no pull found for %s", comment.Subject) 148 + return 149 + } 150 + pull := pulls[0] 151 + 152 + pull.Repo, err = db.GetRepo(n.db, orm.FilterEq("at_uri", pull.RepoAt)) 153 + if err != nil { 154 + log.Printf("NewComment: failed to get repos: %v", err) 155 + return 156 + } 157 + 158 + recipients.Insert(syntax.DID(pull.Repo.Did)) 159 + for _, p := range pull.Participants() { 160 + recipients.Insert(syntax.DID(p)) 161 + } 162 + 163 + entityType = "pull" 164 + entityId = pull.AtUri().String() 165 + repoId = &pull.Repo.Id 166 + p := int64(pull.ID) 167 + pullId = &p 168 + default: 169 + return // no-op 170 } 171 + 172 + for _, m := range comment.Mentions { 173 recipients.Remove(m) 174 } 175 176 n.notifyEvent( 177 + comment.Did, 178 recipients, 179 + models.NotificationTypeIssueCommented, 180 entityType, 181 entityId, 182 repoId, ··· 184 pullId, 185 ) 186 n.notifyEvent( 187 + comment.Did, 188 + sets.Collect(slices.Values(comment.Mentions)), 189 models.NotificationTypeUserMentioned, 190 entityType, 191 entityId, ··· 195 ) 196 } 197 198 + func (n *databaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) { 199 + // no-op 200 + } 201 + 202 + func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 203 + collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 204 if err != nil { 205 + log.Printf("failed to fetch collaborators: %v", err) 206 return 207 } 208 209 + // build the recipients list 210 + // - owner of the repo 211 + // - collaborators in the repo 212 + // - remove users already mentioned 213 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 214 + for _, c := range collaborators { 215 + recipients.Insert(c.SubjectDid) 216 } 217 for _, m := range mentions { 218 recipients.Remove(m) 219 } 220 221 + actorDid := syntax.DID(issue.Did) 222 entityType := "issue" 223 entityId := issue.AtUri().String() 224 repoId := &issue.Repo.Id ··· 228 n.notifyEvent( 229 actorDid, 230 recipients, 231 + models.NotificationTypeIssueCreated, 232 entityType, 233 entityId, 234 repoId, ··· 316 ) 317 } 318 319 func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 320 // no-op 321 }
+8 -8
appview/notify/merged_notifier.go
··· 53 m.fanout("DeleteStar", ctx, star) 54 } 55 56 - func (m *mergedNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 57 - m.fanout("NewIssue", ctx, issue, mentions) 58 } 59 60 - func (m *mergedNotifier) NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 61 - m.fanout("NewIssueComment", ctx, comment, mentions) 62 } 63 64 func (m *mergedNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { ··· 81 m.fanout("NewPull", ctx, pull) 82 } 83 84 - func (m *mergedNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 85 - m.fanout("NewPullComment", ctx, comment, mentions) 86 - } 87 - 88 func (m *mergedNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 89 m.fanout("NewPullState", ctx, actor, pull) 90 }
··· 53 m.fanout("DeleteStar", ctx, star) 54 } 55 56 + func (m *mergedNotifier) NewComment(ctx context.Context, comment *models.Comment) { 57 + m.fanout("NewComment", ctx, comment) 58 + } 59 + 60 + func (m *mergedNotifier) DeleteComment(ctx context.Context, comment *models.Comment) { 61 + m.fanout("DeleteComment", ctx, comment) 62 } 63 64 + func (m *mergedNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 65 + m.fanout("NewIssue", ctx, issue, mentions) 66 } 67 68 func (m *mergedNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { ··· 85 m.fanout("NewPull", ctx, pull) 86 } 87 88 func (m *mergedNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 89 m.fanout("NewPullState", ctx, actor, pull) 90 }
+7 -7
appview/notify/notifier.go
··· 13 NewStar(ctx context.Context, star *models.Star) 14 DeleteStar(ctx context.Context, star *models.Star) 15 16 NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) 17 - NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) 18 NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) 19 DeleteIssue(ctx context.Context, issue *models.Issue) 20 ··· 22 DeleteFollow(ctx context.Context, follow *models.Follow) 23 24 NewPull(ctx context.Context, pull *models.Pull) 25 - NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) 26 NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) 27 28 UpdateProfile(ctx context.Context, profile *models.Profile) ··· 42 func (m *BaseNotifier) NewStar(ctx context.Context, star *models.Star) {} 43 func (m *BaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {} 44 45 func (m *BaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {} 46 - func (m *BaseNotifier) NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 47 - } 48 func (m *BaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {} 49 func (m *BaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {} 50 51 func (m *BaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {} 52 func (m *BaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {} 53 54 - func (m *BaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {} 55 - func (m *BaseNotifier) NewPullComment(ctx context.Context, models *models.Comment, mentions []syntax.DID) { 56 - } 57 func (m *BaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {} 58 59 func (m *BaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {}
··· 13 NewStar(ctx context.Context, star *models.Star) 14 DeleteStar(ctx context.Context, star *models.Star) 15 16 + NewComment(ctx context.Context, comment *models.Comment) 17 + DeleteComment(ctx context.Context, comment *models.Comment) 18 + 19 NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) 20 NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) 21 DeleteIssue(ctx context.Context, issue *models.Issue) 22 ··· 24 DeleteFollow(ctx context.Context, follow *models.Follow) 25 26 NewPull(ctx context.Context, pull *models.Pull) 27 NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) 28 29 UpdateProfile(ctx context.Context, profile *models.Profile) ··· 43 func (m *BaseNotifier) NewStar(ctx context.Context, star *models.Star) {} 44 func (m *BaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {} 45 46 + func (m *BaseNotifier) NewComment(ctx context.Context, comment *models.Comment) {} 47 + func (m *BaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) {} 48 + 49 func (m *BaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {} 50 func (m *BaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {} 51 func (m *BaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {} 52 53 func (m *BaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {} 54 func (m *BaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {} 55 56 + func (m *BaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {} 57 func (m *BaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {} 58 59 func (m *BaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {}
+4 -18
appview/notify/posthog/notifier.go
··· 86 } 87 } 88 89 - func (n *posthogNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 90 - err := n.client.Enqueue(posthog.Capture{ 91 - DistinctId: comment.Did.String(), 92 - Event: "new_pull_comment", 93 - Properties: posthog.Properties{ 94 - "pull_at": comment.Subject, 95 - "mentions": mentions, 96 - }, 97 - }) 98 - if err != nil { 99 - log.Println("failed to enqueue posthog event:", err) 100 - } 101 - } 102 - 103 func (n *posthogNotifier) NewPullClosed(ctx context.Context, pull *models.Pull) { 104 err := n.client.Enqueue(posthog.Capture{ 105 DistinctId: pull.OwnerDid, ··· 179 } 180 } 181 182 - func (n *posthogNotifier) NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 183 err := n.client.Enqueue(posthog.Capture{ 184 DistinctId: comment.Did.String(), 185 - Event: "new_issue_comment", 186 Properties: posthog.Properties{ 187 - "issue_at": comment.Subject, 188 - "mentions": mentions, 189 }, 190 }) 191 if err != nil {
··· 86 } 87 } 88 89 func (n *posthogNotifier) NewPullClosed(ctx context.Context, pull *models.Pull) { 90 err := n.client.Enqueue(posthog.Capture{ 91 DistinctId: pull.OwnerDid, ··· 165 } 166 } 167 168 + func (n *posthogNotifier) NewComment(ctx context.Context, comment *models.Comment) { 169 err := n.client.Enqueue(posthog.Capture{ 170 DistinctId: comment.Did.String(), 171 + Event: "new_comment", 172 Properties: posthog.Properties{ 173 + "subject_at": comment.Subject, 174 + "mentions": comment.Mentions, 175 }, 176 }) 177 if err != nil {
+1 -1
appview/pulls/pulls.go
··· 781 return 782 } 783 784 - s.notifier.NewPullComment(r.Context(), &comment, mentions) 785 786 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, f) 787 s.pages.HxLocation(w, fmt.Sprintf("/%s/pulls/%d#comment-%d", ownerSlashRepo, pull.PullId, comment.Id))
··· 781 return 782 } 783 784 + s.notifier.NewComment(r.Context(), &comment) 785 786 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, f) 787 s.pages.HxLocation(w, fmt.Sprintf("/%s/pulls/%d#comment-%d", ownerSlashRepo, pull.PullId, comment.Id))

History

8 rounds 6 comments
sign up or login to add to the discussion
1 commit
expand
appview/notify: merge new comment events into one
2/3 failed, 1/3 success
expand
merge conflicts detected
expand
  • appview/notify/db/db.go:260
  • appview/notify/merged_notifier.go:81
  • appview/notify/notifier.go:22
  • appview/pulls/opengraph.go:277
expand 0 comments
1 commit
expand
appview/notify: merge new comment events into one
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
appview/notify: merge new comment events into one
3/3 success
expand
expand 0 comments
1 commit
expand
appview/notify: merge new comment events into one
3/3 success
expand
expand 2 comments

on the whole this changeset lgtm. some questions i have regarding backwards compatibility:

  • it makes sense that records created with sh.tangled.issue.comment will not be ingested anymore
  • however, when we intend to perform backfill of records, we should support ingesting the old records. i think the code itself is quite an easy undertaking, but the value add will be huge (if we can backfill existing issue comments!), this need not be added into the present PR, it can come down the line (with your work on backfilling with tap perhaps)
  • we should also allow users to delete/update old records via firehose (so deletions/updates of sh.tangled.issue.comment should reflect correctly). as above, the ingester logic for this can come down the line.
  • we can stop maintaining the old record for much longer, although i suspect that the new comment record will be quite stable

@oppi.li thank you for the review!

when we intend to perform backfill of records, we should support ingesting the old records.

Yeah I think it would be better to leave ingester logic for issue.comment update/delete operations. Users should be able to modify existing records.

Backfilling issue.comment will be quite tricky as we cannot distinguish new comments since certain timestamp. Users can modify both created field and rkey. So there are only two options:

  • ingest legacy comment records until we completely drop them (we should silently migrate user records as much as possible before that)
  • have "legacy records" list in tangled appview and only ingest for those records. This way, other appviews won't be able to support legacy records.

we can stop maintaining the old record for much longer, although i suspect that the new comment record will be quite stable

There are few issues that can threat the comment lexicon stability:

I think we are roughly fine after changing body field type to sh.tangled.markup#markdown. Though we might we need more discussion for #328 just in case.

1 commit
expand
appview/notify: merge new comment events into one
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
appview/notify: merge new comment events into one
3/3 success
expand
expand 0 comments
1 commit
expand
appview/notify: merge new comment events into one
3/3 success
expand
expand 1 comment
1 commit
expand
appview/notify: merge new comment events into one
1/3 failed, 2/3 timeout
expand
expand 3 comments

note: I'm choosing sh.tangled.comment NSID here. Tell me if we would prefer something else like sh.tangled.feed.comment

I think sh.tangled.comment is fine, but more importantly, are we still sticking to sh.tangled or does it make sense to move this to the org.tangled namespace? Assuming we're moving everything eventually. cc @oppi.li

I think we should stick on sh.tangled for now as there can be large amounts of changes from PR refactor / did-for-repo. We can consider moving entire collections to org.tangled once tangled lexicons become more stable.