Monorepo for Tangled
1package db
2
3import (
4 "context"
5 "slices"
6
7 "github.com/bluesky-social/indigo/atproto/syntax"
8 "tangled.org/core/api/tangled"
9 "tangled.org/core/appview/db"
10 "tangled.org/core/appview/models"
11 "tangled.org/core/appview/notify"
12 "tangled.org/core/idresolver"
13 "tangled.org/core/log"
14 "tangled.org/core/orm"
15 "tangled.org/core/sets"
16)
17
18const (
19 maxMentions = 8
20)
21
22type databaseNotifier struct {
23 db *db.DB
24 res *idresolver.Resolver
25}
26
27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
28 return &databaseNotifier{
29 db: database,
30 res: resolver,
31 }
32}
33
34var _ notify.Notifier = &databaseNotifier{}
35
36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
37 // no-op for now
38}
39
40func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
41 l := log.FromContext(ctx)
42
43 if star.RepoAt.Collection().String() != tangled.RepoNSID {
44 // skip string stars for now
45 return
46 }
47 var err error
48 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt)))
49 if err != nil {
50 l.Error("failed to get repos", "err", err)
51 return
52 }
53
54 actorDid := syntax.DID(star.Did)
55 recipients := sets.Singleton(syntax.DID(repo.Did))
56 eventType := models.NotificationTypeRepoStarred
57 entityType := "repo"
58 entityId := star.RepoAt.String()
59 repoId := &repo.Id
60 var issueId *int64
61 var pullId *int64
62
63 n.notifyEvent(
64 ctx,
65 actorDid,
66 recipients,
67 eventType,
68 entityType,
69 entityId,
70 repoId,
71 issueId,
72 pullId,
73 )
74}
75
76func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
77 // no-op
78}
79
80func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
81 l := log.FromContext(ctx)
82
83 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
84 if err != nil {
85 l.Error("failed to fetch collaborators", "err", err)
86 return
87 }
88
89 // build the recipients list
90 // - owner of the repo
91 // - collaborators in the repo
92 // - remove users already mentioned
93 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
94 for _, c := range collaborators {
95 recipients.Insert(c.SubjectDid)
96 }
97 for _, m := range mentions {
98 recipients.Remove(m)
99 }
100
101 actorDid := syntax.DID(issue.Did)
102 entityType := "issue"
103 entityId := issue.AtUri().String()
104 repoId := &issue.Repo.Id
105 issueId := &issue.Id
106 var pullId *int64
107
108 n.notifyEvent(
109 ctx,
110 actorDid,
111 recipients,
112 models.NotificationTypeIssueCreated,
113 entityType,
114 entityId,
115 repoId,
116 issueId,
117 pullId,
118 )
119 n.notifyEvent(
120 ctx,
121 actorDid,
122 sets.Collect(slices.Values(mentions)),
123 models.NotificationTypeUserMentioned,
124 entityType,
125 entityId,
126 repoId,
127 issueId,
128 pullId,
129 )
130}
131
132func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
133 l := log.FromContext(ctx)
134
135 issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.IssueAt))
136 if err != nil {
137 l.Error("failed to get issues", "err", err)
138 return
139 }
140 if len(issues) == 0 {
141 l.Error("no issue found for", "err", comment.IssueAt)
142 return
143 }
144 issue := issues[0]
145
146 // built the recipients list:
147 // - the owner of the repo
148 // - | if the comment is a reply -> everybody on that thread
149 // | if the comment is a top level -> just the issue owner
150 // - remove mentioned users from the recipients list
151 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
152
153 if comment.IsReply() {
154 // if this comment is a reply, then notify everybody in that thread
155 parentAtUri := *comment.ReplyTo
156
157 // find the parent thread, and add all DIDs from here to the recipient list
158 for _, t := range issue.CommentList() {
159 if t.Self.AtUri().String() == parentAtUri {
160 for _, p := range t.Participants() {
161 recipients.Insert(p)
162 }
163 }
164 }
165 } else {
166 // not a reply, notify just the issue author
167 recipients.Insert(syntax.DID(issue.Did))
168 }
169
170 for _, m := range mentions {
171 recipients.Remove(m)
172 }
173
174 actorDid := syntax.DID(comment.Did)
175 entityType := "issue"
176 entityId := issue.AtUri().String()
177 repoId := &issue.Repo.Id
178 issueId := &issue.Id
179 var pullId *int64
180
181 n.notifyEvent(
182 ctx,
183 actorDid,
184 recipients,
185 models.NotificationTypeIssueCommented,
186 entityType,
187 entityId,
188 repoId,
189 issueId,
190 pullId,
191 )
192 n.notifyEvent(
193 ctx,
194 actorDid,
195 sets.Collect(slices.Values(mentions)),
196 models.NotificationTypeUserMentioned,
197 entityType,
198 entityId,
199 repoId,
200 issueId,
201 pullId,
202 )
203}
204
205func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
206 // no-op for now
207}
208
209func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
210 actorDid := syntax.DID(follow.UserDid)
211 recipients := sets.Singleton(syntax.DID(follow.SubjectDid))
212 eventType := models.NotificationTypeFollowed
213 entityType := "follow"
214 entityId := follow.UserDid
215 var repoId, issueId, pullId *int64
216
217 n.notifyEvent(
218 ctx,
219 actorDid,
220 recipients,
221 eventType,
222 entityType,
223 entityId,
224 repoId,
225 issueId,
226 pullId,
227 )
228}
229
230func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
231 // no-op
232}
233
234func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
235 l := log.FromContext(ctx)
236
237 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
238 if err != nil {
239 l.Error("failed to get repos", "err", err)
240 return
241 }
242 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
243 if err != nil {
244 l.Error("failed to fetch collaborators", "err", err)
245 return
246 }
247
248 // build the recipients list
249 // - owner of the repo
250 // - collaborators in the repo
251 recipients := sets.Singleton(syntax.DID(repo.Did))
252 for _, c := range collaborators {
253 recipients.Insert(c.SubjectDid)
254 }
255
256 actorDid := syntax.DID(pull.OwnerDid)
257 eventType := models.NotificationTypePullCreated
258 entityType := "pull"
259 entityId := pull.AtUri().String()
260 repoId := &repo.Id
261 var issueId *int64
262 p := int64(pull.ID)
263 pullId := &p
264
265 n.notifyEvent(
266 ctx,
267 actorDid,
268 recipients,
269 eventType,
270 entityType,
271 entityId,
272 repoId,
273 issueId,
274 pullId,
275 )
276}
277
278func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
279 l := log.FromContext(ctx)
280
281 pull, err := db.GetPull(n.db,
282 syntax.ATURI(comment.RepoAt),
283 comment.PullId,
284 )
285 if err != nil {
286 l.Error("failed to get pulls", "err", err)
287 return
288 }
289
290 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", comment.RepoAt))
291 if err != nil {
292 l.Error("failed to get repos", "err", err)
293 return
294 }
295
296 // build up the recipients list:
297 // - repo owner
298 // - all pull participants
299 // - remove those already mentioned
300 recipients := sets.Singleton(syntax.DID(repo.Did))
301 for _, p := range pull.Participants() {
302 recipients.Insert(syntax.DID(p))
303 }
304 for _, m := range mentions {
305 recipients.Remove(m)
306 }
307
308 actorDid := syntax.DID(comment.OwnerDid)
309 eventType := models.NotificationTypePullCommented
310 entityType := "pull"
311 entityId := pull.AtUri().String()
312 repoId := &repo.Id
313 var issueId *int64
314 p := int64(pull.ID)
315 pullId := &p
316
317 n.notifyEvent(
318 ctx,
319 actorDid,
320 recipients,
321 eventType,
322 entityType,
323 entityId,
324 repoId,
325 issueId,
326 pullId,
327 )
328 n.notifyEvent(
329 ctx,
330 actorDid,
331 sets.Collect(slices.Values(mentions)),
332 models.NotificationTypeUserMentioned,
333 entityType,
334 entityId,
335 repoId,
336 issueId,
337 pullId,
338 )
339}
340
341func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
342 // no-op
343}
344
345func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
346 // no-op
347}
348
349func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
350 // no-op
351}
352
353func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
354 // no-op
355}
356
357func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
358 l := log.FromContext(ctx)
359
360 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
361 if err != nil {
362 l.Error("failed to fetch collaborators", "err", err)
363 return
364 }
365
366 // build up the recipients list:
367 // - repo owner
368 // - repo collaborators
369 // - all issue participants
370 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
371 for _, c := range collaborators {
372 recipients.Insert(c.SubjectDid)
373 }
374 for _, p := range issue.Participants() {
375 recipients.Insert(syntax.DID(p))
376 }
377
378 entityType := "pull"
379 entityId := issue.AtUri().String()
380 repoId := &issue.Repo.Id
381 issueId := &issue.Id
382 var pullId *int64
383 var eventType models.NotificationType
384
385 if issue.Open {
386 eventType = models.NotificationTypeIssueReopen
387 } else {
388 eventType = models.NotificationTypeIssueClosed
389 }
390
391 n.notifyEvent(
392 ctx,
393 actor,
394 recipients,
395 eventType,
396 entityType,
397 entityId,
398 repoId,
399 issueId,
400 pullId,
401 )
402}
403
404func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
405 l := log.FromContext(ctx)
406
407 // Get repo details
408 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
409 if err != nil {
410 l.Error("failed to get repos", "err", err)
411 return
412 }
413
414 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
415 if err != nil {
416 l.Error("failed to fetch collaborators", "err", err)
417 return
418 }
419
420 // build up the recipients list:
421 // - repo owner
422 // - all pull participants
423 recipients := sets.Singleton(syntax.DID(repo.Did))
424 for _, c := range collaborators {
425 recipients.Insert(c.SubjectDid)
426 }
427 for _, p := range pull.Participants() {
428 recipients.Insert(syntax.DID(p))
429 }
430
431 entityType := "pull"
432 entityId := pull.AtUri().String()
433 repoId := &repo.Id
434 var issueId *int64
435 var eventType models.NotificationType
436 switch pull.State {
437 case models.PullClosed:
438 eventType = models.NotificationTypePullClosed
439 case models.PullOpen:
440 eventType = models.NotificationTypePullReopen
441 case models.PullMerged:
442 eventType = models.NotificationTypePullMerged
443 default:
444 l.Error("unexpected new PR state", "state", pull.State)
445 return
446 }
447 p := int64(pull.ID)
448 pullId := &p
449
450 n.notifyEvent(
451 ctx,
452 actor,
453 recipients,
454 eventType,
455 entityType,
456 entityId,
457 repoId,
458 issueId,
459 pullId,
460 )
461}
462
463func (n *databaseNotifier) notifyEvent(
464 ctx context.Context,
465 actorDid syntax.DID,
466 recipients sets.Set[syntax.DID],
467 eventType models.NotificationType,
468 entityType string,
469 entityId string,
470 repoId *int64,
471 issueId *int64,
472 pullId *int64,
473) {
474 l := log.FromContext(ctx)
475
476 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody
477 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions {
478 return
479 }
480
481 recipients.Remove(actorDid)
482
483 prefMap, err := db.GetNotificationPreferences(
484 n.db,
485 orm.FilterIn("user_did", slices.Collect(recipients.All())),
486 )
487 if err != nil {
488 // failed to get prefs for users
489 return
490 }
491
492 // create a transaction for bulk notification storage
493 tx, err := n.db.Begin()
494 if err != nil {
495 // failed to start tx
496 return
497 }
498 defer tx.Rollback()
499
500 // filter based on preferences
501 for recipientDid := range recipients.All() {
502 prefs, ok := prefMap[recipientDid]
503 if !ok {
504 prefs = models.DefaultNotificationPreferences(recipientDid)
505 }
506
507 // skip users who don’t want this type
508 if !prefs.ShouldNotify(eventType) {
509 continue
510 }
511
512 // create notification
513 notif := &models.Notification{
514 RecipientDid: recipientDid.String(),
515 ActorDid: actorDid.String(),
516 Type: eventType,
517 EntityType: entityType,
518 EntityId: entityId,
519 RepoId: repoId,
520 IssueId: issueId,
521 PullId: pullId,
522 }
523
524 if err := db.CreateNotification(tx, notif); err != nil {
525 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err)
526 }
527 }
528
529 if err := tx.Commit(); err != nil {
530 // failed to commit
531 return
532 }
533}