this repo has no description
1package db
2
3import (
4 "context"
5 "log"
6 "maps"
7 "slices"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/appview/db"
11 "tangled.org/core/appview/models"
12 "tangled.org/core/appview/notify"
13 "tangled.org/core/idresolver"
14)
15
16type databaseNotifier struct {
17 db *db.DB
18 res *idresolver.Resolver
19}
20
21func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
22 return &databaseNotifier{
23 db: database,
24 res: resolver,
25 }
26}
27
28var _ notify.Notifier = &databaseNotifier{}
29
30func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
31 // no-op for now
32}
33
34func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
35 var err error
36 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt)))
37 if err != nil {
38 log.Printf("NewStar: failed to get repos: %v", err)
39 return
40 }
41
42 actorDid := syntax.DID(star.StarredByDid)
43 recipients := []syntax.DID{syntax.DID(repo.Did)}
44 eventType := models.NotificationTypeRepoStarred
45 entityType := "repo"
46 entityId := star.RepoAt.String()
47 repoId := &repo.Id
48 var issueId *int64
49 var pullId *int64
50
51 n.notifyEvent(
52 actorDid,
53 recipients,
54 eventType,
55 entityType,
56 entityId,
57 repoId,
58 issueId,
59 pullId,
60 )
61}
62
63func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
64 // no-op
65}
66
67func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
68
69 // build the recipients list
70 // - owner of the repo
71 // - collaborators in the repo
72 var recipients []syntax.DID
73 recipients = append(recipients, syntax.DID(issue.Repo.Did))
74 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
75 if err != nil {
76 log.Printf("failed to fetch collaborators: %v", err)
77 return
78 }
79 for _, c := range collaborators {
80 recipients = append(recipients, c.SubjectDid)
81 }
82
83 actorDid := syntax.DID(issue.Did)
84 entityType := "issue"
85 entityId := issue.AtUri().String()
86 repoId := &issue.Repo.Id
87 issueId := &issue.Id
88 var pullId *int64
89
90 n.notifyEvent(
91 actorDid,
92 recipients,
93 models.NotificationTypeIssueCreated,
94 entityType,
95 entityId,
96 repoId,
97 issueId,
98 pullId,
99 )
100 n.notifyEvent(
101 actorDid,
102 mentions,
103 models.NotificationTypeUserMentioned,
104 entityType,
105 entityId,
106 repoId,
107 issueId,
108 pullId,
109 )
110}
111
112func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
113 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt))
114 if err != nil {
115 log.Printf("NewIssueComment: failed to get issues: %v", err)
116 return
117 }
118 if len(issues) == 0 {
119 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt)
120 return
121 }
122 issue := issues[0]
123
124 var recipients []syntax.DID
125 recipients = append(recipients, syntax.DID(issue.Repo.Did))
126
127 if comment.IsReply() {
128 // if this comment is a reply, then notify everybody in that thread
129 parentAtUri := *comment.ReplyTo
130 allThreads := issue.CommentList()
131
132 // find the parent thread, and add all DIDs from here to the recipient list
133 for _, t := range allThreads {
134 if t.Self.AtUri().String() == parentAtUri {
135 recipients = append(recipients, t.Participants()...)
136 }
137 }
138 } else {
139 // not a reply, notify just the issue author
140 recipients = append(recipients, syntax.DID(issue.Did))
141 }
142
143 actorDid := syntax.DID(comment.Did)
144 entityType := "issue"
145 entityId := issue.AtUri().String()
146 repoId := &issue.Repo.Id
147 issueId := &issue.Id
148 var pullId *int64
149
150 n.notifyEvent(
151 actorDid,
152 recipients,
153 models.NotificationTypeIssueCommented,
154 entityType,
155 entityId,
156 repoId,
157 issueId,
158 pullId,
159 )
160 n.notifyEvent(
161 actorDid,
162 mentions,
163 models.NotificationTypeUserMentioned,
164 entityType,
165 entityId,
166 repoId,
167 issueId,
168 pullId,
169 )
170}
171
172func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
173 // no-op for now
174}
175
176func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
177 actorDid := syntax.DID(follow.UserDid)
178 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)}
179 eventType := models.NotificationTypeFollowed
180 entityType := "follow"
181 entityId := follow.UserDid
182 var repoId, issueId, pullId *int64
183
184 n.notifyEvent(
185 actorDid,
186 recipients,
187 eventType,
188 entityType,
189 entityId,
190 repoId,
191 issueId,
192 pullId,
193 )
194}
195
196func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
197 // no-op
198}
199
200func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
201 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
202 if err != nil {
203 log.Printf("NewPull: failed to get repos: %v", err)
204 return
205 }
206
207 // build the recipients list
208 // - owner of the repo
209 // - collaborators in the repo
210 var recipients []syntax.DID
211 recipients = append(recipients, syntax.DID(repo.Did))
212 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
213 if err != nil {
214 log.Printf("failed to fetch collaborators: %v", err)
215 return
216 }
217 for _, c := range collaborators {
218 recipients = append(recipients, c.SubjectDid)
219 }
220
221 actorDid := syntax.DID(pull.OwnerDid)
222 eventType := models.NotificationTypePullCreated
223 entityType := "pull"
224 entityId := pull.AtUri().String()
225 repoId := &repo.Id
226 var issueId *int64
227 p := int64(pull.ID)
228 pullId := &p
229
230 n.notifyEvent(
231 actorDid,
232 recipients,
233 eventType,
234 entityType,
235 entityId,
236 repoId,
237 issueId,
238 pullId,
239 )
240}
241
242func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) {
243 pull, err := db.GetPull(n.db,
244 syntax.ATURI(comment.RepoAt),
245 comment.PullId,
246 )
247 if err != nil {
248 log.Printf("NewPullComment: failed to get pulls: %v", err)
249 return
250 }
251
252 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt))
253 if err != nil {
254 log.Printf("NewPullComment: failed to get repos: %v", err)
255 return
256 }
257
258 // build up the recipients list:
259 // - repo owner
260 // - all pull participants
261 var recipients []syntax.DID
262 recipients = append(recipients, syntax.DID(repo.Did))
263 for _, p := range pull.Participants() {
264 recipients = append(recipients, syntax.DID(p))
265 }
266
267 actorDid := syntax.DID(comment.OwnerDid)
268 eventType := models.NotificationTypePullCommented
269 entityType := "pull"
270 entityId := pull.AtUri().String()
271 repoId := &repo.Id
272 var issueId *int64
273 p := int64(pull.ID)
274 pullId := &p
275
276 n.notifyEvent(
277 actorDid,
278 recipients,
279 eventType,
280 entityType,
281 entityId,
282 repoId,
283 issueId,
284 pullId,
285 )
286 n.notifyEvent(
287 actorDid,
288 mentions,
289 models.NotificationTypeUserMentioned,
290 entityType,
291 entityId,
292 repoId,
293 issueId,
294 pullId,
295 )
296}
297
298func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
299 // no-op
300}
301
302func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
303 // no-op
304}
305
306func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
307 // no-op
308}
309
310func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
311 // no-op
312}
313
314func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
315 // build up the recipients list:
316 // - repo owner
317 // - repo collaborators
318 // - all issue participants
319 var recipients []syntax.DID
320 recipients = append(recipients, syntax.DID(issue.Repo.Did))
321 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
322 if err != nil {
323 log.Printf("failed to fetch collaborators: %v", err)
324 return
325 }
326 for _, c := range collaborators {
327 recipients = append(recipients, c.SubjectDid)
328 }
329 for _, p := range issue.Participants() {
330 recipients = append(recipients, syntax.DID(p))
331 }
332
333 entityType := "pull"
334 entityId := issue.AtUri().String()
335 repoId := &issue.Repo.Id
336 issueId := &issue.Id
337 var pullId *int64
338 var eventType models.NotificationType
339
340 if issue.Open {
341 eventType = models.NotificationTypeIssueReopen
342 } else {
343 eventType = models.NotificationTypeIssueClosed
344 }
345
346 n.notifyEvent(
347 actor,
348 recipients,
349 eventType,
350 entityType,
351 entityId,
352 repoId,
353 issueId,
354 pullId,
355 )
356}
357
358func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
359 // Get repo details
360 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
361 if err != nil {
362 log.Printf("NewPullState: failed to get repos: %v", err)
363 return
364 }
365
366 // build up the recipients list:
367 // - repo owner
368 // - all pull participants
369 var recipients []syntax.DID
370 recipients = append(recipients, syntax.DID(repo.Did))
371 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
372 if err != nil {
373 log.Printf("failed to fetch collaborators: %v", err)
374 return
375 }
376 for _, c := range collaborators {
377 recipients = append(recipients, c.SubjectDid)
378 }
379 for _, p := range pull.Participants() {
380 recipients = append(recipients, syntax.DID(p))
381 }
382
383 entityType := "pull"
384 entityId := pull.AtUri().String()
385 repoId := &repo.Id
386 var issueId *int64
387 var eventType models.NotificationType
388 switch pull.State {
389 case models.PullClosed:
390 eventType = models.NotificationTypePullClosed
391 case models.PullOpen:
392 eventType = models.NotificationTypePullReopen
393 case models.PullMerged:
394 eventType = models.NotificationTypePullMerged
395 default:
396 log.Println("NewPullState: unexpected new PR state:", pull.State)
397 return
398 }
399 p := int64(pull.ID)
400 pullId := &p
401
402 n.notifyEvent(
403 actor,
404 recipients,
405 eventType,
406 entityType,
407 entityId,
408 repoId,
409 issueId,
410 pullId,
411 )
412}
413
414func (n *databaseNotifier) notifyEvent(
415 actorDid syntax.DID,
416 recipients []syntax.DID,
417 eventType models.NotificationType,
418 entityType string,
419 entityId string,
420 repoId *int64,
421 issueId *int64,
422 pullId *int64,
423) {
424 recipientSet := make(map[syntax.DID]struct{})
425 for _, did := range recipients {
426 // everybody except actor themselves
427 if did != actorDid {
428 recipientSet[did] = struct{}{}
429 }
430 }
431
432 prefMap, err := db.GetNotificationPreferences(
433 n.db,
434 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))),
435 )
436 if err != nil {
437 // failed to get prefs for users
438 return
439 }
440
441 // create a transaction for bulk notification storage
442 tx, err := n.db.Begin()
443 if err != nil {
444 // failed to start tx
445 return
446 }
447 defer tx.Rollback()
448
449 // filter based on preferences
450 for recipientDid := range recipientSet {
451 prefs, ok := prefMap[recipientDid]
452 if !ok {
453 prefs = models.DefaultNotificationPreferences(recipientDid)
454 }
455
456 // skip users who don’t want this type
457 if !prefs.ShouldNotify(eventType) {
458 continue
459 }
460
461 // create notification
462 notif := &models.Notification{
463 RecipientDid: recipientDid.String(),
464 ActorDid: actorDid.String(),
465 Type: eventType,
466 EntityType: entityType,
467 EntityId: entityId,
468 RepoId: repoId,
469 IssueId: issueId,
470 PullId: pullId,
471 }
472
473 if err := db.CreateNotification(tx, notif); err != nil {
474 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err)
475 }
476 }
477
478 if err := tx.Commit(); err != nil {
479 // failed to commit
480 return
481 }
482}