this repo has no description
1package db
2
3import (
4 "context"
5 "log"
6 "maps"
7 "slices"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/appview/db"
11 "tangled.org/core/appview/models"
12 "tangled.org/core/appview/notify"
13 "tangled.org/core/idresolver"
14)
15
16type databaseNotifier struct {
17 db *db.DB
18 res *idresolver.Resolver
19}
20
21func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
22 return &databaseNotifier{
23 db: database,
24 res: resolver,
25 }
26}
27
28var _ notify.Notifier = &databaseNotifier{}
29
30func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
31 // no-op for now
32}
33
34func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
35 var err error
36 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt)))
37 if err != nil {
38 log.Printf("NewStar: failed to get repos: %v", err)
39 return
40 }
41
42 actorDid := syntax.DID(star.StarredByDid)
43 recipients := []syntax.DID{syntax.DID(repo.Did)}
44 eventType := models.NotificationTypeRepoStarred
45 entityType := "repo"
46 entityId := star.RepoAt.String()
47 repoId := &repo.Id
48 var issueId *int64
49 var pullId *int64
50
51 n.notifyEvent(
52 actorDid,
53 recipients,
54 eventType,
55 entityType,
56 entityId,
57 repoId,
58 issueId,
59 pullId,
60 )
61}
62
63func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
64 // no-op
65}
66
67func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
68
69 // build the recipients list
70 // - owner of the repo
71 // - collaborators in the repo
72 var recipients []syntax.DID
73 recipients = append(recipients, syntax.DID(issue.Repo.Did))
74 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
75 if err != nil {
76 log.Printf("failed to fetch collaborators: %v", err)
77 return
78 }
79 for _, c := range collaborators {
80 recipients = append(recipients, c.SubjectDid)
81 }
82
83 actorDid := syntax.DID(issue.Did)
84 entityType := "issue"
85 entityId := issue.AtUri().String()
86 repoId := &issue.Repo.Id
87 issueId := &issue.Id
88 var pullId *int64
89
90 n.notifyEvent(
91 actorDid,
92 recipients,
93 models.NotificationTypeIssueCreated,
94 entityType,
95 entityId,
96 repoId,
97 issueId,
98 pullId,
99 )
100 n.notifyEvent(
101 actorDid,
102 mentions,
103 models.NotificationTypeUserMentioned,
104 entityType,
105 entityId,
106 repoId,
107 issueId,
108 pullId,
109 )
110}
111
112func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) {
113 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt))
114 if err != nil {
115 log.Printf("NewIssueComment: failed to get issues: %v", err)
116 return
117 }
118 if len(issues) == 0 {
119 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt)
120 return
121 }
122 issue := issues[0]
123
124 var recipients []syntax.DID
125 recipients = append(recipients, syntax.DID(issue.Repo.Did))
126
127 if comment.IsReply() {
128 // if this comment is a reply, then notify everybody in that thread
129 parentAtUri := *comment.ReplyTo
130 allThreads := issue.CommentList()
131
132 // find the parent thread, and add all DIDs from here to the recipient list
133 for _, t := range allThreads {
134 if t.Self.AtUri().String() == parentAtUri {
135 recipients = append(recipients, t.Participants()...)
136 }
137 }
138 } else {
139 // not a reply, notify just the issue author
140 recipients = append(recipients, syntax.DID(issue.Did))
141 }
142
143 actorDid := syntax.DID(comment.Did)
144 entityType := "issue"
145 entityId := issue.AtUri().String()
146 repoId := &issue.Repo.Id
147 issueId := &issue.Id
148 var pullId *int64
149
150 n.notifyEvent(
151 actorDid,
152 recipients,
153 models.NotificationTypeIssueCommented,
154 entityType,
155 entityId,
156 repoId,
157 issueId,
158 pullId,
159 )
160 n.notifyEvent(
161 actorDid,
162 mentions,
163 models.NotificationTypeUserMentioned,
164 entityType,
165 entityId,
166 repoId,
167 issueId,
168 pullId,
169 )
170}
171
172func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
173 // no-op for now
174}
175
176func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
177 actorDid := syntax.DID(follow.UserDid)
178 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)}
179 eventType := models.NotificationTypeFollowed
180 entityType := "follow"
181 entityId := follow.UserDid
182 var repoId, issueId, pullId *int64
183
184 n.notifyEvent(
185 actorDid,
186 recipients,
187 eventType,
188 entityType,
189 entityId,
190 repoId,
191 issueId,
192 pullId,
193 )
194}
195
196func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
197 // no-op
198}
199
200func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
201 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
202 if err != nil {
203 log.Printf("NewPull: failed to get repos: %v", err)
204 return
205 }
206
207 // build the recipients list
208 // - owner of the repo
209 // - collaborators in the repo
210 var recipients []syntax.DID
211 recipients = append(recipients, syntax.DID(repo.Did))
212 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
213 if err != nil {
214 log.Printf("failed to fetch collaborators: %v", err)
215 return
216 }
217 for _, c := range collaborators {
218 recipients = append(recipients, c.SubjectDid)
219 }
220
221 actorDid := syntax.DID(pull.OwnerDid)
222 eventType := models.NotificationTypePullCreated
223 entityType := "pull"
224 entityId := pull.AtUri().String()
225 repoId := &repo.Id
226 var issueId *int64
227 p := int64(pull.ID)
228 pullId := &p
229
230 n.notifyEvent(
231 actorDid,
232 recipients,
233 eventType,
234 entityType,
235 entityId,
236 repoId,
237 issueId,
238 pullId,
239 )
240}
241
242func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment) {
243 pull, err := db.GetPull(n.db,
244 syntax.ATURI(comment.RepoAt),
245 comment.PullId,
246 )
247 if err != nil {
248 log.Printf("NewPullComment: failed to get pulls: %v", err)
249 return
250 }
251
252 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt))
253 if err != nil {
254 log.Printf("NewPullComment: failed to get repos: %v", err)
255 return
256 }
257
258 // build up the recipients list:
259 // - repo owner
260 // - all pull participants
261 var recipients []syntax.DID
262 recipients = append(recipients, syntax.DID(repo.Did))
263 for _, p := range pull.Participants() {
264 recipients = append(recipients, syntax.DID(p))
265 }
266
267 actorDid := syntax.DID(comment.OwnerDid)
268 eventType := models.NotificationTypePullCommented
269 entityType := "pull"
270 entityId := pull.AtUri().String()
271 repoId := &repo.Id
272 var issueId *int64
273 p := int64(pull.ID)
274 pullId := &p
275
276 n.notifyEvent(
277 actorDid,
278 recipients,
279 eventType,
280 entityType,
281 entityId,
282 repoId,
283 issueId,
284 pullId,
285 )
286}
287
288func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
289 // no-op
290}
291
292func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
293 // no-op
294}
295
296func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
297 // no-op
298}
299
300func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
301 // no-op
302}
303
304func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
305 // build up the recipients list:
306 // - repo owner
307 // - repo collaborators
308 // - all issue participants
309 var recipients []syntax.DID
310 recipients = append(recipients, syntax.DID(issue.Repo.Did))
311 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt()))
312 if err != nil {
313 log.Printf("failed to fetch collaborators: %v", err)
314 return
315 }
316 for _, c := range collaborators {
317 recipients = append(recipients, c.SubjectDid)
318 }
319 for _, p := range issue.Participants() {
320 recipients = append(recipients, syntax.DID(p))
321 }
322
323 entityType := "pull"
324 entityId := issue.AtUri().String()
325 repoId := &issue.Repo.Id
326 issueId := &issue.Id
327 var pullId *int64
328 var eventType models.NotificationType
329
330 if issue.Open {
331 eventType = models.NotificationTypeIssueReopen
332 } else {
333 eventType = models.NotificationTypeIssueClosed
334 }
335
336 n.notifyEvent(
337 actor,
338 recipients,
339 eventType,
340 entityType,
341 entityId,
342 repoId,
343 issueId,
344 pullId,
345 )
346}
347
348func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
349 // Get repo details
350 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt)))
351 if err != nil {
352 log.Printf("NewPullState: failed to get repos: %v", err)
353 return
354 }
355
356 // build up the recipients list:
357 // - repo owner
358 // - all pull participants
359 var recipients []syntax.DID
360 recipients = append(recipients, syntax.DID(repo.Did))
361 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt()))
362 if err != nil {
363 log.Printf("failed to fetch collaborators: %v", err)
364 return
365 }
366 for _, c := range collaborators {
367 recipients = append(recipients, c.SubjectDid)
368 }
369 for _, p := range pull.Participants() {
370 recipients = append(recipients, syntax.DID(p))
371 }
372
373 entityType := "pull"
374 entityId := pull.AtUri().String()
375 repoId := &repo.Id
376 var issueId *int64
377 var eventType models.NotificationType
378 switch pull.State {
379 case models.PullClosed:
380 eventType = models.NotificationTypePullClosed
381 case models.PullOpen:
382 eventType = models.NotificationTypePullReopen
383 case models.PullMerged:
384 eventType = models.NotificationTypePullMerged
385 default:
386 log.Println("NewPullState: unexpected new PR state:", pull.State)
387 return
388 }
389 p := int64(pull.ID)
390 pullId := &p
391
392 n.notifyEvent(
393 actor,
394 recipients,
395 eventType,
396 entityType,
397 entityId,
398 repoId,
399 issueId,
400 pullId,
401 )
402}
403
404func (n *databaseNotifier) notifyEvent(
405 actorDid syntax.DID,
406 recipients []syntax.DID,
407 eventType models.NotificationType,
408 entityType string,
409 entityId string,
410 repoId *int64,
411 issueId *int64,
412 pullId *int64,
413) {
414 recipientSet := make(map[syntax.DID]struct{})
415 for _, did := range recipients {
416 // everybody except actor themselves
417 if did != actorDid {
418 recipientSet[did] = struct{}{}
419 }
420 }
421
422 prefMap, err := db.GetNotificationPreferences(
423 n.db,
424 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))),
425 )
426 if err != nil {
427 // failed to get prefs for users
428 return
429 }
430
431 // create a transaction for bulk notification storage
432 tx, err := n.db.Begin()
433 if err != nil {
434 // failed to start tx
435 return
436 }
437 defer tx.Rollback()
438
439 // filter based on preferences
440 for recipientDid := range recipientSet {
441 prefs, ok := prefMap[recipientDid]
442 if !ok {
443 prefs = models.DefaultNotificationPreferences(recipientDid)
444 }
445
446 // skip users who don’t want this type
447 if !prefs.ShouldNotify(eventType) {
448 continue
449 }
450
451 // create notification
452 notif := &models.Notification{
453 RecipientDid: recipientDid.String(),
454 ActorDid: actorDid.String(),
455 Type: eventType,
456 EntityType: entityType,
457 EntityId: entityId,
458 RepoId: repoId,
459 IssueId: issueId,
460 PullId: pullId,
461 }
462
463 if err := db.CreateNotification(tx, notif); err != nil {
464 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err)
465 }
466 }
467
468 if err := tx.Commit(); err != nil {
469 // failed to commit
470 return
471 }
472}