this repo has no description
1package db 2 3import ( 4 "context" 5 "log" 6 "maps" 7 "slices" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/appview/db" 11 "tangled.org/core/appview/models" 12 "tangled.org/core/appview/notify" 13 "tangled.org/core/idresolver" 14) 15 16type databaseNotifier struct { 17 db *db.DB 18 res *idresolver.Resolver 19} 20 21func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 22 return &databaseNotifier{ 23 db: database, 24 res: resolver, 25 } 26} 27 28var _ notify.Notifier = &databaseNotifier{} 29 30func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 31 // no-op for now 32} 33 34func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 35 var err error 36 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(star.RepoAt))) 37 if err != nil { 38 log.Printf("NewStar: failed to get repos: %v", err) 39 return 40 } 41 42 actorDid := syntax.DID(star.StarredByDid) 43 recipients := []syntax.DID{syntax.DID(repo.Did)} 44 eventType := models.NotificationTypeRepoStarred 45 entityType := "repo" 46 entityId := star.RepoAt.String() 47 repoId := &repo.Id 48 var issueId *int64 49 var pullId *int64 50 51 n.notifyEvent( 52 actorDid, 53 recipients, 54 eventType, 55 entityType, 56 entityId, 57 repoId, 58 issueId, 59 pullId, 60 ) 61} 62 63func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 64 // no-op 65} 66 67func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 68 69 // build the recipients list 70 // - owner of the repo 71 // - collaborators in the repo 72 var recipients []syntax.DID 73 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 74 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 75 if err != nil { 76 log.Printf("failed to fetch collaborators: %v", err) 77 return 78 } 79 for _, c := range collaborators { 80 recipients = append(recipients, c.SubjectDid) 81 } 82 83 actorDid := syntax.DID(issue.Did) 84 entityType := "issue" 85 entityId := issue.AtUri().String() 86 repoId := &issue.Repo.Id 87 issueId := &issue.Id 88 var pullId *int64 89 90 n.notifyEvent( 91 actorDid, 92 recipients, 93 models.NotificationTypeIssueCreated, 94 entityType, 95 entityId, 96 repoId, 97 issueId, 98 pullId, 99 ) 100 n.notifyEvent( 101 actorDid, 102 mentions, 103 models.NotificationTypeUserMentioned, 104 entityType, 105 entityId, 106 repoId, 107 issueId, 108 pullId, 109 ) 110} 111 112func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) { 113 issues, err := db.GetIssues(n.db, db.FilterEq("at_uri", comment.IssueAt)) 114 if err != nil { 115 log.Printf("NewIssueComment: failed to get issues: %v", err) 116 return 117 } 118 if len(issues) == 0 { 119 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt) 120 return 121 } 122 issue := issues[0] 123 124 var recipients []syntax.DID 125 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 126 127 if comment.IsReply() { 128 // if this comment is a reply, then notify everybody in that thread 129 parentAtUri := *comment.ReplyTo 130 allThreads := issue.CommentList() 131 132 // find the parent thread, and add all DIDs from here to the recipient list 133 for _, t := range allThreads { 134 if t.Self.AtUri().String() == parentAtUri { 135 recipients = append(recipients, t.Participants()...) 136 } 137 } 138 } else { 139 // not a reply, notify just the issue author 140 recipients = append(recipients, syntax.DID(issue.Did)) 141 } 142 143 actorDid := syntax.DID(comment.Did) 144 entityType := "issue" 145 entityId := issue.AtUri().String() 146 repoId := &issue.Repo.Id 147 issueId := &issue.Id 148 var pullId *int64 149 150 n.notifyEvent( 151 actorDid, 152 recipients, 153 models.NotificationTypeIssueCommented, 154 entityType, 155 entityId, 156 repoId, 157 issueId, 158 pullId, 159 ) 160 n.notifyEvent( 161 actorDid, 162 mentions, 163 models.NotificationTypeUserMentioned, 164 entityType, 165 entityId, 166 repoId, 167 issueId, 168 pullId, 169 ) 170} 171 172func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 173 // no-op for now 174} 175 176func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 177 actorDid := syntax.DID(follow.UserDid) 178 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)} 179 eventType := models.NotificationTypeFollowed 180 entityType := "follow" 181 entityId := follow.UserDid 182 var repoId, issueId, pullId *int64 183 184 n.notifyEvent( 185 actorDid, 186 recipients, 187 eventType, 188 entityType, 189 entityId, 190 repoId, 191 issueId, 192 pullId, 193 ) 194} 195 196func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 197 // no-op 198} 199 200func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 201 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 202 if err != nil { 203 log.Printf("NewPull: failed to get repos: %v", err) 204 return 205 } 206 207 // build the recipients list 208 // - owner of the repo 209 // - collaborators in the repo 210 var recipients []syntax.DID 211 recipients = append(recipients, syntax.DID(repo.Did)) 212 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 213 if err != nil { 214 log.Printf("failed to fetch collaborators: %v", err) 215 return 216 } 217 for _, c := range collaborators { 218 recipients = append(recipients, c.SubjectDid) 219 } 220 221 actorDid := syntax.DID(pull.OwnerDid) 222 eventType := models.NotificationTypePullCreated 223 entityType := "pull" 224 entityId := pull.AtUri().String() 225 repoId := &repo.Id 226 var issueId *int64 227 p := int64(pull.ID) 228 pullId := &p 229 230 n.notifyEvent( 231 actorDid, 232 recipients, 233 eventType, 234 entityType, 235 entityId, 236 repoId, 237 issueId, 238 pullId, 239 ) 240} 241 242func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment) { 243 pull, err := db.GetPull(n.db, 244 syntax.ATURI(comment.RepoAt), 245 comment.PullId, 246 ) 247 if err != nil { 248 log.Printf("NewPullComment: failed to get pulls: %v", err) 249 return 250 } 251 252 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", comment.RepoAt)) 253 if err != nil { 254 log.Printf("NewPullComment: failed to get repos: %v", err) 255 return 256 } 257 258 // build up the recipients list: 259 // - repo owner 260 // - all pull participants 261 var recipients []syntax.DID 262 recipients = append(recipients, syntax.DID(repo.Did)) 263 for _, p := range pull.Participants() { 264 recipients = append(recipients, syntax.DID(p)) 265 } 266 267 actorDid := syntax.DID(comment.OwnerDid) 268 eventType := models.NotificationTypePullCommented 269 entityType := "pull" 270 entityId := pull.AtUri().String() 271 repoId := &repo.Id 272 var issueId *int64 273 p := int64(pull.ID) 274 pullId := &p 275 276 n.notifyEvent( 277 actorDid, 278 recipients, 279 eventType, 280 entityType, 281 entityId, 282 repoId, 283 issueId, 284 pullId, 285 ) 286} 287 288func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 289 // no-op 290} 291 292func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 293 // no-op 294} 295 296func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 297 // no-op 298} 299 300func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 301 // no-op 302} 303 304func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 305 // build up the recipients list: 306 // - repo owner 307 // - repo collaborators 308 // - all issue participants 309 var recipients []syntax.DID 310 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 311 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", issue.Repo.RepoAt())) 312 if err != nil { 313 log.Printf("failed to fetch collaborators: %v", err) 314 return 315 } 316 for _, c := range collaborators { 317 recipients = append(recipients, c.SubjectDid) 318 } 319 for _, p := range issue.Participants() { 320 recipients = append(recipients, syntax.DID(p)) 321 } 322 323 entityType := "pull" 324 entityId := issue.AtUri().String() 325 repoId := &issue.Repo.Id 326 issueId := &issue.Id 327 var pullId *int64 328 var eventType models.NotificationType 329 330 if issue.Open { 331 eventType = models.NotificationTypeIssueReopen 332 } else { 333 eventType = models.NotificationTypeIssueClosed 334 } 335 336 n.notifyEvent( 337 actor, 338 recipients, 339 eventType, 340 entityType, 341 entityId, 342 repoId, 343 issueId, 344 pullId, 345 ) 346} 347 348func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 349 // Get repo details 350 repo, err := db.GetRepo(n.db, db.FilterEq("at_uri", string(pull.RepoAt))) 351 if err != nil { 352 log.Printf("NewPullState: failed to get repos: %v", err) 353 return 354 } 355 356 // build up the recipients list: 357 // - repo owner 358 // - all pull participants 359 var recipients []syntax.DID 360 recipients = append(recipients, syntax.DID(repo.Did)) 361 collaborators, err := db.GetCollaborators(n.db, db.FilterEq("repo_at", repo.RepoAt())) 362 if err != nil { 363 log.Printf("failed to fetch collaborators: %v", err) 364 return 365 } 366 for _, c := range collaborators { 367 recipients = append(recipients, c.SubjectDid) 368 } 369 for _, p := range pull.Participants() { 370 recipients = append(recipients, syntax.DID(p)) 371 } 372 373 entityType := "pull" 374 entityId := pull.AtUri().String() 375 repoId := &repo.Id 376 var issueId *int64 377 var eventType models.NotificationType 378 switch pull.State { 379 case models.PullClosed: 380 eventType = models.NotificationTypePullClosed 381 case models.PullOpen: 382 eventType = models.NotificationTypePullReopen 383 case models.PullMerged: 384 eventType = models.NotificationTypePullMerged 385 default: 386 log.Println("NewPullState: unexpected new PR state:", pull.State) 387 return 388 } 389 p := int64(pull.ID) 390 pullId := &p 391 392 n.notifyEvent( 393 actor, 394 recipients, 395 eventType, 396 entityType, 397 entityId, 398 repoId, 399 issueId, 400 pullId, 401 ) 402} 403 404func (n *databaseNotifier) notifyEvent( 405 actorDid syntax.DID, 406 recipients []syntax.DID, 407 eventType models.NotificationType, 408 entityType string, 409 entityId string, 410 repoId *int64, 411 issueId *int64, 412 pullId *int64, 413) { 414 recipientSet := make(map[syntax.DID]struct{}) 415 for _, did := range recipients { 416 // everybody except actor themselves 417 if did != actorDid { 418 recipientSet[did] = struct{}{} 419 } 420 } 421 422 prefMap, err := db.GetNotificationPreferences( 423 n.db, 424 db.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))), 425 ) 426 if err != nil { 427 // failed to get prefs for users 428 return 429 } 430 431 // create a transaction for bulk notification storage 432 tx, err := n.db.Begin() 433 if err != nil { 434 // failed to start tx 435 return 436 } 437 defer tx.Rollback() 438 439 // filter based on preferences 440 for recipientDid := range recipientSet { 441 prefs, ok := prefMap[recipientDid] 442 if !ok { 443 prefs = models.DefaultNotificationPreferences(recipientDid) 444 } 445 446 // skip users who don’t want this type 447 if !prefs.ShouldNotify(eventType) { 448 continue 449 } 450 451 // create notification 452 notif := &models.Notification{ 453 RecipientDid: recipientDid.String(), 454 ActorDid: actorDid.String(), 455 Type: eventType, 456 EntityType: entityType, 457 EntityId: entityId, 458 RepoId: repoId, 459 IssueId: issueId, 460 PullId: pullId, 461 } 462 463 if err := db.CreateNotification(tx, notif); err != nil { 464 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err) 465 } 466 } 467 468 if err := tx.Commit(); err != nil { 469 // failed to commit 470 return 471 } 472}