Monorepo for Tangled
tangled.org
1package db
2
3import (
4 "context"
5 "log"
6 "slices"
7
8 "github.com/bluesky-social/indigo/atproto/syntax"
9 "tangled.org/core/api/tangled"
10 "tangled.org/core/appview/db"
11 "tangled.org/core/appview/models"
12 "tangled.org/core/appview/notify"
13 "tangled.org/core/idresolver"
14 "tangled.org/core/orm"
15 "tangled.org/core/sets"
16)
17
18const (
19 maxMentions = 8
20)
21
22type databaseNotifier struct {
23 db *db.DB
24 res *idresolver.Resolver
25}
26
27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
28 return &databaseNotifier{
29 db: database,
30 res: resolver,
31 }
32}
33
34var _ notify.Notifier = &databaseNotifier{}
35
36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
37 // no-op for now
38}
39
40func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
41 if star.RepoAt.Collection().String() != tangled.RepoNSID {
42 // skip string stars for now
43 return
44 }
45 var err error
46 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt)))
47 if err != nil {
48 log.Printf("NewStar: failed to get repos: %v", err)
49 return
50 }
51
52 actorDid := syntax.DID(star.Did)
53 recipients := sets.Singleton(syntax.DID(repo.Did))
54 eventType := models.NotificationTypeRepoStarred
55 entityType := "repo"
56 entityId := star.RepoAt.String()
57 repoId := &repo.Id
58 var issueId *int64
59 var pullId *int64
60
61 n.notifyEvent(
62 actorDid,
63 recipients,
64 eventType,
65 entityType,
66 entityId,
67 repoId,
68 issueId,
69 pullId,
70 )
71}
72
73func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
74 // no-op
75}
76
77func (n *databaseNotifier) NewComment(ctx context.Context, comment *models.Comment) {
78 var (
79 // built the recipients list:
80 // - the owner of the repo
81 // - | if the comment is a reply -> everybody on that thread
82 // | if the comment is a top level -> just the issue owner
83 // - remove mentioned users from the recipients list
84 recipients = sets.New[syntax.DID]()
85 entityType string
86 entityId string
87 repoId *int64
88 issueId *int64
89 pullId *int64
90 )
91
92 subjectDid, err := comment.Subject.Authority().AsDID()
93 if err != nil {
94 log.Printf("NewComment: expected did based at-uri for comment.subject")
95 return
96 }
97 switch comment.Subject.Collection() {
98 case tangled.RepoIssueNSID:
99 issues, err := db.GetIssues(
100 n.db,
101 orm.FilterEq("did", subjectDid),
102 orm.FilterEq("rkey", comment.Subject.RecordKey()),
103 )
104 if err != nil {
105 log.Printf("NewComment: failed to get issues: %v", err)
106 return
107 }
108 if len(issues) == 0 {
109 log.Printf("NewComment: no issue found for %s", comment.Subject)
110 return
111 }
112 issue := issues[0]
113
114 recipients.Insert(syntax.DID(issue.Repo.Did))
115 if comment.IsReply() {
116 // if this comment is a reply, then notify everybody in that thread
117 parentAtUri := *comment.ReplyTo
118
119 // find the parent thread, and add all DIDs from here to the recipient list
120 for _, t := range issue.CommentList() {
121 if t.Self.AtUri() == parentAtUri {
122 for _, p := range t.Participants() {
123 recipients.Insert(p)
124 }
125 }
126 }
127 } else {
128 // not a reply, notify just the issue author
129 recipients.Insert(syntax.DID(issue.Did))
130 }
131
132 entityType = "issue"
133 entityId = issue.AtUri().String()
134 repoId = &issue.Repo.Id
135 issueId = &issue.Id
136 case tangled.RepoPullNSID:
137 pulls, err := db.GetPulls(
138 n.db,
139 orm.FilterEq("owner_did", subjectDid),
140 orm.FilterEq("rkey", comment.Subject.RecordKey()),
141 )
142 if err != nil {
143 log.Printf("NewComment: failed to get pulls: %v", err)
144 return
145 }
146 if len(pulls) == 0 {
147 log.Printf("NewComment: no pull found for %s", comment.Subject)
148 return
149 }
150 pull := pulls[0]
151
152 pull.Repo, err = db.GetRepo(n.db, orm.FilterEq("at_uri", pull.RepoAt))
153 if err != nil {
154 log.Printf("NewComment: failed to get repos: %v", err)
155 return
156 }
157
158 recipients.Insert(syntax.DID(pull.Repo.Did))
159 for _, p := range pull.Participants() {
160 recipients.Insert(syntax.DID(p))
161 }
162
163 entityType = "pull"
164 entityId = pull.AtUri().String()
165 repoId = &pull.Repo.Id
166 p := int64(pull.ID)
167 pullId = &p
168 default:
169 return // no-op
170 }
171
172 for _, m := range comment.Mentions {
173 recipients.Remove(m)
174 }
175
176 n.notifyEvent(
177 comment.Did,
178 recipients,
179 models.NotificationTypeIssueCommented,
180 entityType,
181 entityId,
182 repoId,
183 issueId,
184 pullId,
185 )
186 n.notifyEvent(
187 comment.Did,
188 sets.Collect(slices.Values(comment.Mentions)),
189 models.NotificationTypeUserMentioned,
190 entityType,
191 entityId,
192 repoId,
193 issueId,
194 pullId,
195 )
196}
197
198func (n *databaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) {
199 // no-op
200}
201
202func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
203 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
204 if err != nil {
205 log.Printf("failed to fetch collaborators: %v", err)
206 return
207 }
208
209 // build the recipients list
210 // - owner of the repo
211 // - collaborators in the repo
212 // - remove users already mentioned
213 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
214 for _, c := range collaborators {
215 recipients.Insert(c.SubjectDid)
216 }
217 for _, m := range mentions {
218 recipients.Remove(m)
219 }
220
221 actorDid := syntax.DID(issue.Did)
222 entityType := "issue"
223 entityId := issue.AtUri().String()
224 repoId := &issue.Repo.Id
225 issueId := &issue.Id
226 var pullId *int64
227
228 n.notifyEvent(
229 actorDid,
230 recipients,
231 models.NotificationTypeIssueCreated,
232 entityType,
233 entityId,
234 repoId,
235 issueId,
236 pullId,
237 )
238 n.notifyEvent(
239 actorDid,
240 sets.Collect(slices.Values(mentions)),
241 models.NotificationTypeUserMentioned,
242 entityType,
243 entityId,
244 repoId,
245 issueId,
246 pullId,
247 )
248}
249
250func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
251 // no-op for now
252}
253
254func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
255 actorDid := syntax.DID(follow.UserDid)
256 recipients := sets.Singleton(syntax.DID(follow.SubjectDid))
257 eventType := models.NotificationTypeFollowed
258 entityType := "follow"
259 entityId := follow.UserDid
260 var repoId, issueId, pullId *int64
261
262 n.notifyEvent(
263 actorDid,
264 recipients,
265 eventType,
266 entityType,
267 entityId,
268 repoId,
269 issueId,
270 pullId,
271 )
272}
273
274func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
275 // no-op
276}
277
278func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
279 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
280 if err != nil {
281 log.Printf("NewPull: failed to get repos: %v", err)
282 return
283 }
284 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
285 if err != nil {
286 log.Printf("failed to fetch collaborators: %v", err)
287 return
288 }
289
290 // build the recipients list
291 // - owner of the repo
292 // - collaborators in the repo
293 recipients := sets.Singleton(syntax.DID(repo.Did))
294 for _, c := range collaborators {
295 recipients.Insert(c.SubjectDid)
296 }
297
298 actorDid := syntax.DID(pull.OwnerDid)
299 eventType := models.NotificationTypePullCreated
300 entityType := "pull"
301 entityId := pull.AtUri().String()
302 repoId := &repo.Id
303 var issueId *int64
304 p := int64(pull.ID)
305 pullId := &p
306
307 n.notifyEvent(
308 actorDid,
309 recipients,
310 eventType,
311 entityType,
312 entityId,
313 repoId,
314 issueId,
315 pullId,
316 )
317}
318
319func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
320 // no-op
321}
322
323func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
324 // no-op
325}
326
327func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
328 // no-op
329}
330
331func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
332 // no-op
333}
334
335func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
336 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
337 if err != nil {
338 log.Printf("failed to fetch collaborators: %v", err)
339 return
340 }
341
342 // build up the recipients list:
343 // - repo owner
344 // - repo collaborators
345 // - all issue participants
346 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
347 for _, c := range collaborators {
348 recipients.Insert(c.SubjectDid)
349 }
350 for _, p := range issue.Participants() {
351 recipients.Insert(syntax.DID(p))
352 }
353
354 entityType := "pull"
355 entityId := issue.AtUri().String()
356 repoId := &issue.Repo.Id
357 issueId := &issue.Id
358 var pullId *int64
359 var eventType models.NotificationType
360
361 if issue.Open {
362 eventType = models.NotificationTypeIssueReopen
363 } else {
364 eventType = models.NotificationTypeIssueClosed
365 }
366
367 n.notifyEvent(
368 actor,
369 recipients,
370 eventType,
371 entityType,
372 entityId,
373 repoId,
374 issueId,
375 pullId,
376 )
377}
378
379func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
380 // Get repo details
381 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
382 if err != nil {
383 log.Printf("NewPullState: failed to get repos: %v", err)
384 return
385 }
386
387 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
388 if err != nil {
389 log.Printf("failed to fetch collaborators: %v", err)
390 return
391 }
392
393 // build up the recipients list:
394 // - repo owner
395 // - all pull participants
396 recipients := sets.Singleton(syntax.DID(repo.Did))
397 for _, c := range collaborators {
398 recipients.Insert(c.SubjectDid)
399 }
400 for _, p := range pull.Participants() {
401 recipients.Insert(syntax.DID(p))
402 }
403
404 entityType := "pull"
405 entityId := pull.AtUri().String()
406 repoId := &repo.Id
407 var issueId *int64
408 var eventType models.NotificationType
409 switch pull.State {
410 case models.PullClosed:
411 eventType = models.NotificationTypePullClosed
412 case models.PullOpen:
413 eventType = models.NotificationTypePullReopen
414 case models.PullMerged:
415 eventType = models.NotificationTypePullMerged
416 default:
417 log.Println("NewPullState: unexpected new PR state:", pull.State)
418 return
419 }
420 p := int64(pull.ID)
421 pullId := &p
422
423 n.notifyEvent(
424 actor,
425 recipients,
426 eventType,
427 entityType,
428 entityId,
429 repoId,
430 issueId,
431 pullId,
432 )
433}
434
435func (n *databaseNotifier) notifyEvent(
436 actorDid syntax.DID,
437 recipients sets.Set[syntax.DID],
438 eventType models.NotificationType,
439 entityType string,
440 entityId string,
441 repoId *int64,
442 issueId *int64,
443 pullId *int64,
444) {
445 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody
446 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions {
447 return
448 }
449
450 recipients.Remove(actorDid)
451
452 prefMap, err := db.GetNotificationPreferences(
453 n.db,
454 orm.FilterIn("user_did", slices.Collect(recipients.All())),
455 )
456 if err != nil {
457 // failed to get prefs for users
458 return
459 }
460
461 // create a transaction for bulk notification storage
462 tx, err := n.db.Begin()
463 if err != nil {
464 // failed to start tx
465 return
466 }
467 defer tx.Rollback()
468
469 // filter based on preferences
470 for recipientDid := range recipients.All() {
471 prefs, ok := prefMap[recipientDid]
472 if !ok {
473 prefs = models.DefaultNotificationPreferences(recipientDid)
474 }
475
476 // skip users who don’t want this type
477 if !prefs.ShouldNotify(eventType) {
478 continue
479 }
480
481 // create notification
482 notif := &models.Notification{
483 RecipientDid: recipientDid.String(),
484 ActorDid: actorDid.String(),
485 Type: eventType,
486 EntityType: entityType,
487 EntityId: entityId,
488 RepoId: repoId,
489 IssueId: issueId,
490 PullId: pullId,
491 }
492
493 if err := db.CreateNotification(tx, notif); err != nil {
494 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err)
495 }
496 }
497
498 if err := tx.Commit(); err != nil {
499 // failed to commit
500 return
501 }
502}