this repo has no description
1package db 2 3import ( 4 "context" 5 "log" 6 "maps" 7 "slices" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/appview/db" 11 "tangled.org/core/appview/models" 12 "tangled.org/core/appview/notify" 13 "tangled.org/core/idresolver" 14) 15 16type databaseNotifier struct { 17 db *db.DB 18 res *idresolver.Resolver 19} 20 21func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 22 return &databaseNotifier{ 23 db: database, 24 res: resolver, 25 } 26} 27 28var _ notify.Notifier = &databaseNotifier{} 29 30func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 31 // no-op for now 32} 33 34func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 35 var err error 36 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt))) 37 if err != nil { 38 log.Printf("NewStar: failed to get repos: %v", err) 39 return 40 } 41 42 actorDid := syntax.DID(star.StarredByDid) 43 recipients := []syntax.DID{syntax.DID(repo.Did)} 44 eventType := models.NotificationTypeRepoStarred 45 entityType := "repo" 46 entityId := star.RepoAt.String() 47 repoId := &repo.Id 48 var issueId *int64 49 var pullId *int64 50 51 n.notifyEvent( 52 actorDid, 53 recipients, 54 eventType, 55 entityType, 56 entityId, 57 repoId, 58 issueId, 59 pullId, 60 ) 61} 62 63func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 64 // no-op 65} 66 67func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 68 69 // build the recipients list 70 // - owner of the repo 71 // - collaborators in the repo 72 var recipients []syntax.DID 73 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 74 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 75 if err != nil { 76 log.Printf("failed to fetch collaborators: %v", err) 77 return 78 } 79 for _, c := range collaborators { 80 recipients = append(recipients, c.SubjectDid) 81 } 82 83 actorDid := syntax.DID(issue.Did) 84 entityType := "issue" 85 entityId := issue.AtUri().String() 86 repoId := &issue.Repo.Id 87 issueId := &issue.Id 88 var pullId *int64 89 90 n.notifyEvent( 91 actorDid, 92 recipients, 93 models.NotificationTypeIssueCreated, 94 entityType, 95 entityId, 96 repoId, 97 issueId, 98 pullId, 99 ) 100 n.notifyEvent( 101 actorDid, 102 mentions, 103 models.NotificationTypeUserMentioned, 104 entityType, 105 entityId, 106 repoId, 107 issueId, 108 pullId, 109 ) 110} 111 112func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) { 113 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt)) 114 if err != nil { 115 log.Printf("NewIssueComment: failed to get issues: %v", err) 116 return 117 } 118 if len(issues) == 0 { 119 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt) 120 return 121 } 122 issue := issues[0] 123 124 var recipients []syntax.DID 125 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 126 127 if comment.IsReply() { 128 // if this comment is a reply, then notify everybody in that thread 129 parentAtUri := *comment.ReplyTo 130 allThreads := issue.CommentList() 131 132 // find the parent thread, and add all DIDs from here to the recipient list 133 for _, t := range allThreads { 134 if t.Self.AtUri().String() == parentAtUri { 135 recipients = append(recipients, t.Participants()...) 136 } 137 } 138 } else { 139 // not a reply, notify just the issue author 140 recipients = append(recipients, syntax.DID(issue.Did)) 141 } 142 143 actorDid := syntax.DID(comment.Did) 144 entityType := "issue" 145 entityId := issue.AtUri().String() 146 repoId := &issue.Repo.Id 147 issueId := &issue.Id 148 var pullId *int64 149 150 n.notifyEvent( 151 actorDid, 152 recipients, 153 models.NotificationTypeIssueCommented, 154 entityType, 155 entityId, 156 repoId, 157 issueId, 158 pullId, 159 ) 160 n.notifyEvent( 161 actorDid, 162 mentions, 163 models.NotificationTypeUserMentioned, 164 entityType, 165 entityId, 166 repoId, 167 issueId, 168 pullId, 169 ) 170} 171 172func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 173 // no-op for now 174} 175 176func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 177 actorDid := syntax.DID(follow.UserDid) 178 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)} 179 eventType := models.NotificationTypeFollowed 180 entityType := "follow" 181 entityId := follow.UserDid 182 var repoId, issueId, pullId *int64 183 184 n.notifyEvent( 185 actorDid, 186 recipients, 187 eventType, 188 entityType, 189 entityId, 190 repoId, 191 issueId, 192 pullId, 193 ) 194} 195 196func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 197 // no-op 198} 199 200func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 201 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 202 if err != nil { 203 log.Printf("NewPull: failed to get repos: %v", err) 204 return 205 } 206 207 // build the recipients list 208 // - owner of the repo 209 // - collaborators in the repo 210 var recipients []syntax.DID 211 recipients = append(recipients, syntax.DID(repo.Did)) 212 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 213 if err != nil { 214 log.Printf("failed to fetch collaborators: %v", err) 215 return 216 } 217 for _, c := range collaborators { 218 recipients = append(recipients, c.SubjectDid) 219 } 220 221 actorDid := syntax.DID(pull.OwnerDid) 222 eventType := models.NotificationTypePullCreated 223 entityType := "pull" 224 entityId := pull.AtUri().String() 225 repoId := &repo.Id 226 var issueId *int64 227 p := int64(pull.ID) 228 pullId := &p 229 230 n.notifyEvent( 231 actorDid, 232 recipients, 233 eventType, 234 entityType, 235 entityId, 236 repoId, 237 issueId, 238 pullId, 239 ) 240} 241 242func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) { 243 pull, err := db.GetPull(n.db, 244 syntax.ATURI(comment.RepoAt), 245 comment.PullId, 246 ) 247 if err != nil { 248 log.Printf("NewPullComment: failed to get pulls: %v", err) 249 return 250 } 251 252 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt)) 253 if err != nil { 254 log.Printf("NewPullComment: failed to get repos: %v", err) 255 return 256 } 257 258 // build up the recipients list: 259 // - repo owner 260 // - all pull participants 261 var recipients []syntax.DID 262 recipients = append(recipients, syntax.DID(repo.Did)) 263 for _, p := range pull.Participants() { 264 recipients = append(recipients, syntax.DID(p)) 265 } 266 267 actorDid := syntax.DID(comment.OwnerDid) 268 eventType := models.NotificationTypePullCommented 269 entityType := "pull" 270 entityId := pull.AtUri().String() 271 repoId := &repo.Id 272 var issueId *int64 273 p := int64(pull.ID) 274 pullId := &p 275 276 n.notifyEvent( 277 actorDid, 278 recipients, 279 eventType, 280 entityType, 281 entityId, 282 repoId, 283 issueId, 284 pullId, 285 ) 286 n.notifyEvent( 287 actorDid, 288 mentions, 289 models.NotificationTypeUserMentioned, 290 entityType, 291 entityId, 292 repoId, 293 issueId, 294 pullId, 295 ) 296} 297 298func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 299 // no-op 300} 301 302func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 303 // no-op 304} 305 306func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 307 // no-op 308} 309 310func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 311 // no-op 312} 313 314func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 315 // build up the recipients list: 316 // - repo owner 317 // - repo collaborators 318 // - all issue participants 319 var recipients []syntax.DID 320 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 321 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 322 if err != nil { 323 log.Printf("failed to fetch collaborators: %v", err) 324 return 325 } 326 for _, c := range collaborators { 327 recipients = append(recipients, c.SubjectDid) 328 } 329 for _, p := range issue.Participants() { 330 recipients = append(recipients, syntax.DID(p)) 331 } 332 333 entityType := "pull" 334 entityId := issue.AtUri().String() 335 repoId := &issue.Repo.Id 336 issueId := &issue.Id 337 var pullId *int64 338 var eventType models.NotificationType 339 340 if issue.Open { 341 eventType = models.NotificationTypeIssueReopen 342 } else { 343 eventType = models.NotificationTypeIssueClosed 344 } 345 346 n.notifyEvent( 347 actor, 348 recipients, 349 eventType, 350 entityType, 351 entityId, 352 repoId, 353 issueId, 354 pullId, 355 ) 356} 357 358func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 359 // Get repo details 360 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 361 if err != nil { 362 log.Printf("NewPullState: failed to get repos: %v", err) 363 return 364 } 365 366 // build up the recipients list: 367 // - repo owner 368 // - all pull participants 369 var recipients []syntax.DID 370 recipients = append(recipients, syntax.DID(repo.Did)) 371 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 372 if err != nil { 373 log.Printf("failed to fetch collaborators: %v", err) 374 return 375 } 376 for _, c := range collaborators { 377 recipients = append(recipients, c.SubjectDid) 378 } 379 for _, p := range pull.Participants() { 380 recipients = append(recipients, syntax.DID(p)) 381 } 382 383 entityType := "pull" 384 entityId := pull.AtUri().String() 385 repoId := &repo.Id 386 var issueId *int64 387 var eventType models.NotificationType 388 switch pull.State { 389 case models.PullClosed: 390 eventType = models.NotificationTypePullClosed 391 case models.PullOpen: 392 eventType = models.NotificationTypePullReopen 393 case models.PullMerged: 394 eventType = models.NotificationTypePullMerged 395 default: 396 log.Println("NewPullState: unexpected new PR state:", pull.State) 397 return 398 } 399 p := int64(pull.ID) 400 pullId := &p 401 402 n.notifyEvent( 403 actor, 404 recipients, 405 eventType, 406 entityType, 407 entityId, 408 repoId, 409 issueId, 410 pullId, 411 ) 412} 413 414func (n *databaseNotifier) notifyEvent( 415 actorDid syntax.DID, 416 recipients []syntax.DID, 417 eventType models.NotificationType, 418 entityType string, 419 entityId string, 420 repoId *int64, 421 issueId *int64, 422 pullId *int64, 423) { 424 recipientSet := make(map[syntax.DID]struct{}) 425 for _, did := range recipients { 426 // everybody except actor themselves 427 if did != actorDid { 428 recipientSet[did] = struct{}{} 429 } 430 } 431 432 prefMap, err := db.GetNotificationPreferences( 433 n.db, 434 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))), 435 ) 436 if err != nil { 437 // failed to get prefs for users 438 return 439 } 440 441 // create a transaction for bulk notification storage 442 tx, err := n.db.Begin() 443 if err != nil { 444 // failed to start tx 445 return 446 } 447 defer tx.Rollback() 448 449 // filter based on preferences 450 for recipientDid := range recipientSet { 451 prefs, ok := prefMap[recipientDid] 452 if !ok { 453 prefs = models.DefaultNotificationPreferences(recipientDid) 454 } 455 456 // skip users who don’t want this type 457 if !prefs.ShouldNotify(eventType) { 458 continue 459 } 460 461 // create notification 462 notif := &models.Notification{ 463 RecipientDid: recipientDid.String(), 464 ActorDid: actorDid.String(), 465 Type: eventType, 466 EntityType: entityType, 467 EntityId: entityId, 468 RepoId: repoId, 469 IssueId: issueId, 470 PullId: pullId, 471 } 472 473 if err := db.CreateNotification(tx, notif); err != nil { 474 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err) 475 } 476 } 477 478 if err := tx.Commit(); err != nil { 479 // failed to commit 480 return 481 } 482}