this repo has no description
1package db
2
3import (
4 "context"
5 "log"
6 "maps"
7 "slices"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/appview/db"
11 "tangled.org/core/appview/models"
12 "tangled.org/core/appview/notify"
13 "tangled.org/core/idresolver"
14)
15
16type databaseNotifier struct {
17 db *db.DB
18 res *idresolver.Resolver
19}
20
21func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
22 return &databaseNotifier{
23 db: database,
24 res: resolver,
25 }
26}
27
28var _ notify.Notifier = &databaseNotifier{}
29
30func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
31 // no-op for now
32}
33
34func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
35 var err error
36 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt)))
37 if err != nil {
38 log.Printf("NewStar: failed to get repos: %v", err)
39 return
40 }
41
42 actorDid := syntax.DID(star.StarredByDid)
43 recipients := []syntax.DID{syntax.DID(repo.Did)}
44 eventType := models.NotificationTypeRepoStarred
45 entityType := "repo"
46 entityId := star.RepoAt.String()
47 repoId := &repo.Id
48 var issueId *int64
49 var pullId *int64
50
51 n.notifyEvent(
52 actorDid,
53 recipients,
54 eventType,
55 entityType,
56 entityId,
57 repoId,
58 issueId,
59 pullId,
60 )
61}
62
63func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
64 // no-op
65}
66
67func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue) {
68
69 // build the recipients list
70 // - owner of the repo
71 // - collaborators in the repo
72 var recipients []syntax.DID
73 recipients = append(recipients, syntax.DID(issue.Repo.Did))
74 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
75 if err != nil {
76 log.Printf("failed to fetch collaborators: %v", err)
77 return
78 }
79 for _, c := range collaborators {
80 recipients = append(recipients, c.SubjectDid)
81 }
82
83 actorDid := syntax.DID(issue.Did)
84 eventType := models.NotificationTypeIssueCreated
85 entityType := "issue"
86 entityId := issue.AtUri().String()
87 repoId := &issue.Repo.Id
88 issueId := &issue.Id
89 var pullId *int64
90
91 n.notifyEvent(
92 actorDid,
93 recipients,
94 eventType,
95 entityType,
96 entityId,
97 repoId,
98 issueId,
99 pullId,
100 )
101}
102
103func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment) {
104 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt))
105 if err != nil {
106 log.Printf("NewIssueComment: failed to get issues: %v", err)
107 return
108 }
109 if len(issues) == 0 {
110 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt)
111 return
112 }
113 issue := issues[0]
114
115 var recipients []syntax.DID
116 recipients = append(recipients, syntax.DID(issue.Repo.Did))
117
118 if comment.IsReply() {
119 // if this comment is a reply, then notify everybody in that thread
120 parentAtUri := *comment.ReplyTo
121 allThreads := issue.CommentList()
122
123 // find the parent thread, and add all DIDs from here to the recipient list
124 for _, t := range allThreads {
125 if t.Self.AtUri().String() == parentAtUri {
126 recipients = append(recipients, t.Participants()...)
127 }
128 }
129 } else {
130 // not a reply, notify just the issue author
131 recipients = append(recipients, syntax.DID(issue.Did))
132 }
133
134 actorDid := syntax.DID(comment.Did)
135 eventType := models.NotificationTypeIssueCommented
136 entityType := "issue"
137 entityId := issue.AtUri().String()
138 repoId := &issue.Repo.Id
139 issueId := &issue.Id
140 var pullId *int64
141
142 n.notifyEvent(
143 actorDid,
144 recipients,
145 eventType,
146 entityType,
147 entityId,
148 repoId,
149 issueId,
150 pullId,
151 )
152}
153
154func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
155 // no-op for now
156}
157
158func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
159 actorDid := syntax.DID(follow.UserDid)
160 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)}
161 eventType := models.NotificationTypeFollowed
162 entityType := "follow"
163 entityId := follow.UserDid
164 var repoId, issueId, pullId *int64
165
166 n.notifyEvent(
167 actorDid,
168 recipients,
169 eventType,
170 entityType,
171 entityId,
172 repoId,
173 issueId,
174 pullId,
175 )
176}
177
178func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
179 // no-op
180}
181
182func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
183 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
184 if err != nil {
185 log.Printf("NewPull: failed to get repos: %v", err)
186 return
187 }
188
189 // build the recipients list
190 // - owner of the repo
191 // - collaborators in the repo
192 var recipients []syntax.DID
193 recipients = append(recipients, syntax.DID(repo.Did))
194 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
195 if err != nil {
196 log.Printf("failed to fetch collaborators: %v", err)
197 return
198 }
199 for _, c := range collaborators {
200 recipients = append(recipients, c.SubjectDid)
201 }
202
203 actorDid := syntax.DID(pull.OwnerDid)
204 eventType := models.NotificationTypePullCreated
205 entityType := "pull"
206 entityId := pull.PullAt().String()
207 repoId := &repo.Id
208 var issueId *int64
209 p := int64(pull.ID)
210 pullId := &p
211
212 n.notifyEvent(
213 actorDid,
214 recipients,
215 eventType,
216 entityType,
217 entityId,
218 repoId,
219 issueId,
220 pullId,
221 )
222}
223
224func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment) {
225 pull, err := db.GetPull(n.db,
226 syntax.ATURI(comment.RepoAt),
227 comment.PullId,
228 )
229 if err != nil {
230 log.Printf("NewPullComment: failed to get pulls: %v", err)
231 return
232 }
233
234 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt))
235 if err != nil {
236 log.Printf("NewPullComment: failed to get repos: %v", err)
237 return
238 }
239
240 // build up the recipients list:
241 // - repo owner
242 // - all pull participants
243 var recipients []syntax.DID
244 recipients = append(recipients, syntax.DID(repo.Did))
245 for _, p := range pull.Participants() {
246 recipients = append(recipients, syntax.DID(p))
247 }
248
249 actorDid := syntax.DID(comment.OwnerDid)
250 eventType := models.NotificationTypePullCommented
251 entityType := "pull"
252 entityId := pull.PullAt().String()
253 repoId := &repo.Id
254 var issueId *int64
255 p := int64(pull.ID)
256 pullId := &p
257
258 n.notifyEvent(
259 actorDid,
260 recipients,
261 eventType,
262 entityType,
263 entityId,
264 repoId,
265 issueId,
266 pullId,
267 )
268}
269
270func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
271 // no-op
272}
273
274func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
275 // no-op
276}
277
278func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
279 // no-op
280}
281
282func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
283 // no-op
284}
285
286func (n *databaseNotifier) NewIssueState(ctx context.Context, issue *models.Issue) {
287 // build up the recipients list:
288 // - repo owner
289 // - repo collaborators
290 // - all issue participants
291 var recipients []syntax.DID
292 recipients = append(recipients, syntax.DID(issue.Repo.Did))
293 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
294 if err != nil {
295 log.Printf("failed to fetch collaborators: %v", err)
296 return
297 }
298 for _, c := range collaborators {
299 recipients = append(recipients, c.SubjectDid)
300 }
301 for _, p := range issue.Participants() {
302 recipients = append(recipients, syntax.DID(p))
303 }
304
305 actorDid := syntax.DID(issue.Repo.Did)
306 entityType := "pull"
307 entityId := issue.AtUri().String()
308 repoId := &issue.Repo.Id
309 issueId := &issue.Id
310 var pullId *int64
311 var eventType models.NotificationType
312
313 if issue.Open {
314 eventType = models.NotificationTypeIssueReopen
315 } else {
316 eventType = models.NotificationTypeIssueClosed
317 }
318
319 n.notifyEvent(
320 actorDid,
321 recipients,
322 eventType,
323 entityType,
324 entityId,
325 repoId,
326 issueId,
327 pullId,
328 )
329}
330
331func (n *databaseNotifier) NewPullMerged(ctx context.Context, pull *models.Pull) {
332 // Get repo details
333 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
334 if err != nil {
335 log.Printf("NewPullMerged: failed to get repos: %v", err)
336 return
337 }
338
339 // build up the recipients list:
340 // - repo owner
341 // - all pull participants
342 var recipients []syntax.DID
343 recipients = append(recipients, syntax.DID(repo.Did))
344 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
345 if err != nil {
346 log.Printf("failed to fetch collaborators: %v", err)
347 return
348 }
349 for _, c := range collaborators {
350 recipients = append(recipients, c.SubjectDid)
351 }
352 for _, p := range pull.Participants() {
353 recipients = append(recipients, syntax.DID(p))
354 }
355
356 actorDid := syntax.DID(repo.Did)
357 eventType := models.NotificationTypePullMerged
358 entityType := "pull"
359 entityId := pull.PullAt().String()
360 repoId := &repo.Id
361 var issueId *int64
362 p := int64(pull.ID)
363 pullId := &p
364
365 n.notifyEvent(
366 actorDid,
367 recipients,
368 eventType,
369 entityType,
370 entityId,
371 repoId,
372 issueId,
373 pullId,
374 )
375}
376
377func (n *databaseNotifier) NewPullClosed(ctx context.Context, pull *models.Pull) {
378 // Get repo details
379 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
380 if err != nil {
381 log.Printf("NewPullMerged: failed to get repos: %v", err)
382 return
383 }
384
385 // build up the recipients list:
386 // - repo owner
387 // - all pull participants
388 var recipients []syntax.DID
389 recipients = append(recipients, syntax.DID(repo.Did))
390 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
391 if err != nil {
392 log.Printf("failed to fetch collaborators: %v", err)
393 return
394 }
395 for _, c := range collaborators {
396 recipients = append(recipients, c.SubjectDid)
397 }
398 for _, p := range pull.Participants() {
399 recipients = append(recipients, syntax.DID(p))
400 }
401
402 actorDid := syntax.DID(repo.Did)
403 eventType := models.NotificationTypePullClosed
404 entityType := "pull"
405 entityId := pull.PullAt().String()
406 repoId := &repo.Id
407 var issueId *int64
408 p := int64(pull.ID)
409 pullId := &p
410
411 n.notifyEvent(
412 actorDid,
413 recipients,
414 eventType,
415 entityType,
416 entityId,
417 repoId,
418 issueId,
419 pullId,
420 )
421}
422
423func (n *databaseNotifier) NewPullReopen(ctx context.Context, pull *models.Pull) {
424 // Get repo details
425 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
426 if err != nil {
427 log.Printf("NewPullMerged: failed to get repos: %v", err)
428 return
429 }
430
431 // build up the recipients list:
432 // - repo owner
433 // - all pull participants
434 var recipients []syntax.DID
435 recipients = append(recipients, syntax.DID(repo.Did))
436 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
437 if err != nil {
438 log.Printf("failed to fetch collaborators: %v", err)
439 return
440 }
441 for _, c := range collaborators {
442 recipients = append(recipients, c.SubjectDid)
443 }
444 for _, p := range pull.Participants() {
445 recipients = append(recipients, syntax.DID(p))
446 }
447
448 actorDid := syntax.DID(repo.Did)
449 eventType := models.NotificationTypePullReopen
450 entityType := "pull"
451 entityId := pull.PullAt().String()
452 repoId := &repo.Id
453 var issueId *int64
454 p := int64(pull.ID)
455 pullId := &p
456
457 n.notifyEvent(
458 actorDid,
459 recipients,
460 eventType,
461 entityType,
462 entityId,
463 repoId,
464 issueId,
465 pullId,
466 )
467}
468
469func (n *databaseNotifier) notifyEvent(
470 actorDid syntax.DID,
471 recipients []syntax.DID,
472 eventType models.NotificationType,
473 entityType string,
474 entityId string,
475 repoId *int64,
476 issueId *int64,
477 pullId *int64,
478) {
479 recipientSet := make(map[syntax.DID]struct{})
480 for _, did := range recipients {
481 // everybody except actor themselves
482 if did != actorDid {
483 recipientSet[did] = struct{}{}
484 }
485 }
486
487 prefMap, err := db.GetNotificationPreferences(
488 n.db,
489 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))),
490 )
491 if err != nil {
492 // failed to get prefs for users
493 return
494 }
495
496 // create a transaction for bulk notification storage
497 tx, err := n.db.Begin()
498 if err != nil {
499 // failed to start tx
500 return
501 }
502 defer tx.Rollback()
503
504 // filter based on preferences
505 for recipientDid := range recipientSet {
506 prefs, ok := prefMap[recipientDid]
507 if !ok {
508 prefs = models.DefaultNotificationPreferences(recipientDid)
509 }
510
511 // skip users who don’t want this type
512 if !prefs.ShouldNotify(eventType) {
513 continue
514 }
515
516 // create notification
517 notif := &models.Notification{
518 RecipientDid: recipientDid.String(),
519 ActorDid: actorDid.String(),
520 Type: eventType,
521 EntityType: entityType,
522 EntityId: entityId,
523 RepoId: repoId,
524 IssueId: issueId,
525 PullId: pullId,
526 }
527
528 if err := db.CreateNotification(tx, notif); err != nil {
529 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err)
530 }
531 }
532
533 if err := tx.Commit(); err != nil {
534 // failed to commit
535 return
536 }
537}