Monorepo for Tangled
1package db
2
3import (
4 "context"
5 "slices"
6
7 "github.com/bluesky-social/indigo/atproto/syntax"
8 "tangled.org/core/api/tangled"
9 "tangled.org/core/appview/db"
10 "tangled.org/core/appview/models"
11 "tangled.org/core/appview/notify"
12 "tangled.org/core/idresolver"
13 "tangled.org/core/log"
14 "tangled.org/core/orm"
15 "tangled.org/core/sets"
16)
17
// maxMentions is the largest number of users a single event may mention.
// notifyEvent drops mention notifications whose recipient set exceeds it.
const maxMentions = 8
21
22type databaseNotifier struct {
23 db *db.DB
24 res *idresolver.Resolver
25}
26
27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
28 return &databaseNotifier{
29 db: database,
30 res: resolver,
31 }
32}
33
34var _ notify.Notifier = &databaseNotifier{}
35
36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
37 // no-op for now
38}
39
40func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
41 l := log.FromContext(ctx)
42
43 if star.RepoAt.Collection().String() != tangled.RepoNSID {
44 // skip string stars for now
45 return
46 }
47 var err error
48 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt)))
49 if err != nil {
50 l.Error("failed to get repos", "err", err)
51 return
52 }
53
54 actorDid := syntax.DID(star.Did)
55 recipients := sets.Singleton(syntax.DID(repo.Did))
56 eventType := models.NotificationTypeRepoStarred
57 entityType := "repo"
58 entityId := star.RepoAt.String()
59 repoId := &repo.Id
60 var issueId *int64
61 var pullId *int64
62
63 n.notifyEvent(
64 ctx,
65 actorDid,
66 recipients,
67 eventType,
68 entityType,
69 entityId,
70 repoId,
71 issueId,
72 pullId,
73 )
74}
75
76func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
77 // no-op
78}
79
80func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
81 l := log.FromContext(ctx)
82
83 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
84 if err != nil {
85 l.Error("failed to fetch collaborators", "err", err)
86 return
87 }
88
89 // build the recipients list
90 // - owner of the repo
91 // - collaborators in the repo
92 // - remove users already mentioned
93 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
94 for _, c := range collaborators {
95 recipients.Insert(c.SubjectDid)
96 }
97 for _, m := range mentions {
98 recipients.Remove(m)
99 }
100
101 actorDid := syntax.DID(issue.Did)
102 entityType := "issue"
103 entityId := issue.AtUri().String()
104 repoId := &issue.Repo.Id
105 issueId := &issue.Id
106 var pullId *int64
107
108 n.notifyEvent(
109 ctx,
110 actorDid,
111 recipients,
112 models.NotificationTypeIssueCreated,
113 entityType,
114 entityId,
115 repoId,
116 issueId,
117 pullId,
118 )
119 n.notifyEvent(
120 ctx,
121 actorDid,
122 sets.Collect(slices.Values(mentions)),
123 models.NotificationTypeUserMentioned,
124 entityType,
125 entityId,
126 repoId,
127 issueId,
128 pullId,
129 )
130}
131
132func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
133 l := log.FromContext(ctx)
134
135 issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.IssueAt))
136 if err != nil {
137 l.Error("failed to get issues", "err", err)
138 return
139 }
140 if len(issues) == 0 {
141 l.Error("no issue found for", "err", comment.IssueAt)
142 return
143 }
144 issue := issues[0]
145
146 // built the recipients list:
147 // - the owner of the repo
148 // - | if the comment is a reply -> everybody on that thread
149 // | if the comment is a top level -> just the issue owner
150 // - remove mentioned users from the recipients list
151 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
152
153 if comment.IsReply() {
154 // if this comment is a reply, then notify everybody in that thread
155 parentAtUri := *comment.ReplyTo
156
157 // find the parent thread, and add all DIDs from here to the recipient list
158 for _, t := range issue.CommentList() {
159 if t.Self.AtUri().String() == parentAtUri {
160 for _, p := range t.Participants() {
161 recipients.Insert(p)
162 }
163 }
164 }
165 } else {
166 // not a reply, notify just the issue author
167 recipients.Insert(syntax.DID(issue.Did))
168 }
169
170 for _, m := range mentions {
171 recipients.Remove(m)
172 }
173
174 actorDid := syntax.DID(comment.Did)
175 entityType := "issue"
176 entityId := issue.AtUri().String()
177 repoId := &issue.Repo.Id
178 issueId := &issue.Id
179 var pullId *int64
180
181 n.notifyEvent(
182 ctx,
183 actorDid,
184 recipients,
185 models.NotificationTypeIssueCommented,
186 entityType,
187 entityId,
188 repoId,
189 issueId,
190 pullId,
191 )
192 n.notifyEvent(
193 ctx,
194 actorDid,
195 sets.Collect(slices.Values(mentions)),
196 models.NotificationTypeUserMentioned,
197 entityType,
198 entityId,
199 repoId,
200 issueId,
201 pullId,
202 )
203}
204
205func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
206 // no-op for now
207}
208
209func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
210 actorDid := syntax.DID(follow.UserDid)
211 recipients := sets.Singleton(syntax.DID(follow.SubjectDid))
212 eventType := models.NotificationTypeFollowed
213 entityType := "follow"
214 entityId := follow.UserDid
215 var repoId, issueId, pullId *int64
216
217 n.notifyEvent(
218 ctx,
219 actorDid,
220 recipients,
221 eventType,
222 entityType,
223 entityId,
224 repoId,
225 issueId,
226 pullId,
227 )
228}
229
230func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
231 // no-op
232}
233
234func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
235 l := log.FromContext(ctx)
236
237 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
238 if err != nil {
239 l.Error("failed to get repos", "err", err)
240 return
241 }
242 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
243 if err != nil {
244 l.Error("failed to fetch collaborators", "err", err)
245 return
246 }
247
248 // build the recipients list
249 // - owner of the repo
250 // - collaborators in the repo
251 recipients := sets.Singleton(syntax.DID(repo.Did))
252 for _, c := range collaborators {
253 recipients.Insert(c.SubjectDid)
254 }
255
256 actorDid := syntax.DID(pull.OwnerDid)
257 eventType := models.NotificationTypePullCreated
258 entityType := "pull"
259 entityId := pull.AtUri().String()
260 repoId := &repo.Id
261 var issueId *int64
262 p := int64(pull.ID)
263 pullId := &p
264
265 n.notifyEvent(
266 ctx,
267 actorDid,
268 recipients,
269 eventType,
270 entityType,
271 entityId,
272 repoId,
273 issueId,
274 pullId,
275 )
276}
277
278func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
279 l := log.FromContext(ctx)
280
281 pull, err := db.GetPull(n.db,
282 syntax.ATURI(comment.RepoAt),
283 comment.PullId,
284 )
285 if err != nil {
286 l.Error("failed to get pulls", "err", err)
287 return
288 }
289
290 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", comment.RepoAt))
291 if err != nil {
292 l.Error("failed to get repos", "err", err)
293 return
294 }
295
296 // build up the recipients list:
297 // - repo owner
298 // - all pull participants
299 // - remove those already mentioned
300 recipients := sets.Singleton(syntax.DID(repo.Did))
301 for _, p := range pull.Participants() {
302 recipients.Insert(syntax.DID(p))
303 }
304 for _, m := range mentions {
305 recipients.Remove(m)
306 }
307
308 actorDid := syntax.DID(comment.OwnerDid)
309 eventType := models.NotificationTypePullCommented
310 entityType := "pull"
311 entityId := pull.AtUri().String()
312 repoId := &repo.Id
313 var issueId *int64
314 p := int64(pull.ID)
315 pullId := &p
316
317 n.notifyEvent(
318 ctx,
319 actorDid,
320 recipients,
321 eventType,
322 entityType,
323 entityId,
324 repoId,
325 issueId,
326 pullId,
327 )
328 n.notifyEvent(
329 ctx,
330 actorDid,
331 sets.Collect(slices.Values(mentions)),
332 models.NotificationTypeUserMentioned,
333 entityType,
334 entityId,
335 repoId,
336 issueId,
337 pullId,
338 )
339}
340
341func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
342 // no-op
343}
344
345func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
346 // no-op
347}
348
349func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
350 // no-op
351}
352
353func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
354 // no-op
355}
356
357func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) {
358 // no-op for now; webhooks are handled by the webhook notifier
359}
360
361func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
362 l := log.FromContext(ctx)
363
364 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
365 if err != nil {
366 l.Error("failed to fetch collaborators", "err", err)
367 return
368 }
369
370 // build up the recipients list:
371 // - repo owner
372 // - repo collaborators
373 // - all issue participants
374 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
375 for _, c := range collaborators {
376 recipients.Insert(c.SubjectDid)
377 }
378 for _, p := range issue.Participants() {
379 recipients.Insert(syntax.DID(p))
380 }
381
382 entityType := "issue"
383 entityId := issue.AtUri().String()
384 repoId := &issue.Repo.Id
385 issueId := &issue.Id
386 var pullId *int64
387 var eventType models.NotificationType
388
389 if issue.Open {
390 eventType = models.NotificationTypeIssueReopen
391 } else {
392 eventType = models.NotificationTypeIssueClosed
393 }
394
395 n.notifyEvent(
396 ctx,
397 actor,
398 recipients,
399 eventType,
400 entityType,
401 entityId,
402 repoId,
403 issueId,
404 pullId,
405 )
406}
407
408func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
409 l := log.FromContext(ctx)
410
411 // Get repo details
412 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
413 if err != nil {
414 l.Error("failed to get repos", "err", err)
415 return
416 }
417
418 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
419 if err != nil {
420 l.Error("failed to fetch collaborators", "err", err)
421 return
422 }
423
424 // build up the recipients list:
425 // - repo owner
426 // - all pull participants
427 recipients := sets.Singleton(syntax.DID(repo.Did))
428 for _, c := range collaborators {
429 recipients.Insert(c.SubjectDid)
430 }
431 for _, p := range pull.Participants() {
432 recipients.Insert(syntax.DID(p))
433 }
434
435 entityType := "pull"
436 entityId := pull.AtUri().String()
437 repoId := &repo.Id
438 var issueId *int64
439 var eventType models.NotificationType
440 switch pull.State {
441 case models.PullClosed:
442 eventType = models.NotificationTypePullClosed
443 case models.PullOpen:
444 eventType = models.NotificationTypePullReopen
445 case models.PullMerged:
446 eventType = models.NotificationTypePullMerged
447 default:
448 l.Error("unexpected new PR state", "state", pull.State)
449 return
450 }
451 p := int64(pull.ID)
452 pullId := &p
453
454 n.notifyEvent(
455 ctx,
456 actor,
457 recipients,
458 eventType,
459 entityType,
460 entityId,
461 repoId,
462 issueId,
463 pullId,
464 )
465}
466
467func (n *databaseNotifier) notifyEvent(
468 ctx context.Context,
469 actorDid syntax.DID,
470 recipients sets.Set[syntax.DID],
471 eventType models.NotificationType,
472 entityType string,
473 entityId string,
474 repoId *int64,
475 issueId *int64,
476 pullId *int64,
477) {
478 l := log.FromContext(ctx)
479
480 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody
481 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions {
482 return
483 }
484
485 recipients.Remove(actorDid)
486
487 prefMap, err := db.GetNotificationPreferences(
488 n.db,
489 orm.FilterIn("user_did", slices.Collect(recipients.All())),
490 )
491 if err != nil {
492 // failed to get prefs for users
493 return
494 }
495
496 // create a transaction for bulk notification storage
497 tx, err := n.db.Begin()
498 if err != nil {
499 // failed to start tx
500 return
501 }
502 defer tx.Rollback()
503
504 // filter based on preferences
505 for recipientDid := range recipients.All() {
506 prefs, ok := prefMap[recipientDid]
507 if !ok {
508 prefs = models.DefaultNotificationPreferences(recipientDid)
509 }
510
511 // skip users who don’t want this type
512 if !prefs.ShouldNotify(eventType) {
513 continue
514 }
515
516 // create notification
517 notif := &models.Notification{
518 RecipientDid: recipientDid.String(),
519 ActorDid: actorDid.String(),
520 Type: eventType,
521 EntityType: entityType,
522 EntityId: entityId,
523 RepoId: repoId,
524 IssueId: issueId,
525 PullId: pullId,
526 }
527
528 if err := db.CreateNotification(tx, notif); err != nil {
529 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err)
530 }
531 }
532
533 if err := tx.Commit(); err != nil {
534 // failed to commit
535 return
536 }
537}