this repo has no description
1package db 2 3import ( 4 "context" 5 "log" 6 "maps" 7 "slices" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/appview/db" 12 "tangled.org/core/appview/models" 13 "tangled.org/core/appview/notify" 14 "tangled.org/core/idresolver" 15 "tangled.org/core/orm" 16) 17 18const ( 19 maxMentions = 5 20) 21 22type databaseNotifier struct { 23 db *db.DB 24 res *idresolver.Resolver 25} 26 27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 28 return &databaseNotifier{ 29 db: database, 30 res: resolver, 31 } 32} 33 34var _ notify.Notifier = &databaseNotifier{} 35 36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 37 // no-op for now 38} 39 40func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 41 if star.RepoAt.Collection().String() != tangled.RepoNSID { 42 // skip string stars for now 43 return 44 } 45 var err error 46 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt))) 47 if err != nil { 48 log.Printf("NewStar: failed to get repos: %v", err) 49 return 50 } 51 52 actorDid := syntax.DID(star.Did) 53 recipients := []syntax.DID{syntax.DID(repo.Did)} 54 eventType := models.NotificationTypeRepoStarred 55 entityType := "repo" 56 entityId := star.RepoAt.String() 57 repoId := &repo.Id 58 var issueId *int64 59 var pullId *int64 60 61 n.notifyEvent( 62 actorDid, 63 recipients, 64 eventType, 65 entityType, 66 entityId, 67 repoId, 68 issueId, 69 pullId, 70 ) 71} 72 73func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 74 // no-op 75} 76 77func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 78 79 // build the recipients list 80 // - owner of the repo 81 // - collaborators in the repo 82 var recipients []syntax.DID 83 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 84 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 85 if err != nil { 86 log.Printf("failed to fetch collaborators: %v", err) 87 return 88 } 89 for _, c := range collaborators { 90 recipients = append(recipients, c.SubjectDid) 91 } 92 93 actorDid := syntax.DID(issue.Did) 94 entityType := "issue" 95 entityId := issue.AtUri().String() 96 repoId := &issue.Repo.Id 97 issueId := &issue.Id 98 var pullId *int64 99 100 n.notifyEvent( 101 actorDid, 102 recipients, 103 models.NotificationTypeIssueCreated, 104 entityType, 105 entityId, 106 repoId, 107 issueId, 108 pullId, 109 ) 110 n.notifyEvent( 111 actorDid, 112 mentions, 113 models.NotificationTypeUserMentioned, 114 entityType, 115 entityId, 116 repoId, 117 issueId, 118 pullId, 119 ) 120} 121 122func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) { 123 issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.IssueAt)) 124 if err != nil { 125 log.Printf("NewIssueComment: failed to get issues: %v", err) 126 return 127 } 128 if len(issues) == 0 { 129 log.Printf("NewIssueComment: no issue found for %s", comment.IssueAt) 130 return 131 } 132 issue := issues[0] 133 134 var recipients []syntax.DID 135 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 136 137 if comment.IsReply() { 138 // if this comment is a reply, then notify everybody in that thread 139 parentAtUri := *comment.ReplyTo 140 allThreads := issue.CommentList() 141 142 // find the parent thread, and add all DIDs from here to the recipient list 143 for _, t := range allThreads { 144 if t.Self.AtUri().String() == parentAtUri { 145 recipients = append(recipients, t.Participants()...) 146 } 147 } 148 } else { 149 // not a reply, notify just the issue author 150 recipients = append(recipients, syntax.DID(issue.Did)) 151 } 152 153 actorDid := syntax.DID(comment.Did) 154 entityType := "issue" 155 entityId := issue.AtUri().String() 156 repoId := &issue.Repo.Id 157 issueId := &issue.Id 158 var pullId *int64 159 160 n.notifyEvent( 161 actorDid, 162 recipients, 163 models.NotificationTypeIssueCommented, 164 entityType, 165 entityId, 166 repoId, 167 issueId, 168 pullId, 169 ) 170 n.notifyEvent( 171 actorDid, 172 mentions, 173 models.NotificationTypeUserMentioned, 174 entityType, 175 entityId, 176 repoId, 177 issueId, 178 pullId, 179 ) 180} 181 182func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 183 // no-op for now 184} 185 186func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 187 actorDid := syntax.DID(follow.UserDid) 188 recipients := []syntax.DID{syntax.DID(follow.SubjectDid)} 189 eventType := models.NotificationTypeFollowed 190 entityType := "follow" 191 entityId := follow.UserDid 192 var repoId, issueId, pullId *int64 193 194 n.notifyEvent( 195 actorDid, 196 recipients, 197 eventType, 198 entityType, 199 entityId, 200 repoId, 201 issueId, 202 pullId, 203 ) 204} 205 206func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 207 // no-op 208} 209 210func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 211 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 212 if err != nil { 213 log.Printf("NewPull: failed to get repos: %v", err) 214 return 215 } 216 217 // build the recipients list 218 // - owner of the repo 219 // - collaborators in the repo 220 var recipients []syntax.DID 221 recipients = append(recipients, syntax.DID(repo.Did)) 222 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 223 if err != nil { 224 log.Printf("failed to fetch collaborators: %v", err) 225 return 226 } 227 for _, c := range collaborators { 228 recipients = append(recipients, c.SubjectDid) 229 } 230 231 actorDid := syntax.DID(pull.OwnerDid) 232 eventType := models.NotificationTypePullCreated 233 entityType := "pull" 234 entityId := pull.AtUri().String() 235 repoId := &repo.Id 236 var issueId *int64 237 p := int64(pull.ID) 238 pullId := &p 239 240 n.notifyEvent( 241 actorDid, 242 recipients, 243 eventType, 244 entityType, 245 entityId, 246 repoId, 247 issueId, 248 pullId, 249 ) 250} 251 252func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) { 253 pull, err := db.GetPull(n.db, 254 syntax.ATURI(comment.RepoAt), 255 comment.PullId, 256 ) 257 if err != nil { 258 log.Printf("NewPullComment: failed to get pulls: %v", err) 259 return 260 } 261 262 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", comment.RepoAt)) 263 if err != nil { 264 log.Printf("NewPullComment: failed to get repos: %v", err) 265 return 266 } 267 268 // build up the recipients list: 269 // - repo owner 270 // - all pull participants 271 var recipients []syntax.DID 272 recipients = append(recipients, syntax.DID(repo.Did)) 273 for _, p := range pull.Participants() { 274 recipients = append(recipients, syntax.DID(p)) 275 } 276 277 actorDid := syntax.DID(comment.OwnerDid) 278 eventType := models.NotificationTypePullCommented 279 entityType := "pull" 280 entityId := pull.AtUri().String() 281 repoId := &repo.Id 282 var issueId *int64 283 p := int64(pull.ID) 284 pullId := &p 285 286 n.notifyEvent( 287 actorDid, 288 recipients, 289 eventType, 290 entityType, 291 entityId, 292 repoId, 293 issueId, 294 pullId, 295 ) 296 n.notifyEvent( 297 actorDid, 298 mentions, 299 models.NotificationTypeUserMentioned, 300 entityType, 301 entityId, 302 repoId, 303 issueId, 304 pullId, 305 ) 306} 307 308func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 309 // no-op 310} 311 312func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 313 // no-op 314} 315 316func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 317 // no-op 318} 319 320func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 321 // no-op 322} 323 324func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 325 // build up the recipients list: 326 // - repo owner 327 // - repo collaborators 328 // - all issue participants 329 var recipients []syntax.DID 330 recipients = append(recipients, syntax.DID(issue.Repo.Did)) 331 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 332 if err != nil { 333 log.Printf("failed to fetch collaborators: %v", err) 334 return 335 } 336 for _, c := range collaborators { 337 recipients = append(recipients, c.SubjectDid) 338 } 339 for _, p := range issue.Participants() { 340 recipients = append(recipients, syntax.DID(p)) 341 } 342 343 entityType := "pull" 344 entityId := issue.AtUri().String() 345 repoId := &issue.Repo.Id 346 issueId := &issue.Id 347 var pullId *int64 348 var eventType models.NotificationType 349 350 if issue.Open { 351 eventType = models.NotificationTypeIssueReopen 352 } else { 353 eventType = models.NotificationTypeIssueClosed 354 } 355 356 n.notifyEvent( 357 actor, 358 recipients, 359 eventType, 360 entityType, 361 entityId, 362 repoId, 363 issueId, 364 pullId, 365 ) 366} 367 368func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 369 // Get repo details 370 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 371 if err != nil { 372 log.Printf("NewPullState: failed to get repos: %v", err) 373 return 374 } 375 376 // build up the recipients list: 377 // - repo owner 378 // - all pull participants 379 var recipients []syntax.DID 380 recipients = append(recipients, syntax.DID(repo.Did)) 381 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 382 if err != nil { 383 log.Printf("failed to fetch collaborators: %v", err) 384 return 385 } 386 for _, c := range collaborators { 387 recipients = append(recipients, c.SubjectDid) 388 } 389 for _, p := range pull.Participants() { 390 recipients = append(recipients, syntax.DID(p)) 391 } 392 393 entityType := "pull" 394 entityId := pull.AtUri().String() 395 repoId := &repo.Id 396 var issueId *int64 397 var eventType models.NotificationType 398 switch pull.State { 399 case models.PullClosed: 400 eventType = models.NotificationTypePullClosed 401 case models.PullOpen: 402 eventType = models.NotificationTypePullReopen 403 case models.PullMerged: 404 eventType = models.NotificationTypePullMerged 405 default: 406 log.Println("NewPullState: unexpected new PR state:", pull.State) 407 return 408 } 409 p := int64(pull.ID) 410 pullId := &p 411 412 n.notifyEvent( 413 actor, 414 recipients, 415 eventType, 416 entityType, 417 entityId, 418 repoId, 419 issueId, 420 pullId, 421 ) 422} 423 424func (n *databaseNotifier) notifyEvent( 425 actorDid syntax.DID, 426 recipients []syntax.DID, 427 eventType models.NotificationType, 428 entityType string, 429 entityId string, 430 repoId *int64, 431 issueId *int64, 432 pullId *int64, 433) { 434 if eventType == models.NotificationTypeUserMentioned && len(recipients) > maxMentions { 435 recipients = recipients[:maxMentions] 436 } 437 recipientSet := make(map[syntax.DID]struct{}) 438 for _, did := range recipients { 439 // everybody except actor themselves 440 if did != actorDid { 441 recipientSet[did] = struct{}{} 442 } 443 } 444 445 prefMap, err := db.GetNotificationPreferences( 446 n.db, 447 orm.FilterIn("user_did", slices.Collect(maps.Keys(recipientSet))), 448 ) 449 if err != nil { 450 // failed to get prefs for users 451 return 452 } 453 454 // create a transaction for bulk notification storage 455 tx, err := n.db.Begin() 456 if err != nil { 457 // failed to start tx 458 return 459 } 460 defer tx.Rollback() 461 462 // filter based on preferences 463 for recipientDid := range recipientSet { 464 prefs, ok := prefMap[recipientDid] 465 if !ok { 466 prefs = models.DefaultNotificationPreferences(recipientDid) 467 } 468 469 // skip users who don’t want this type 470 if !prefs.ShouldNotify(eventType) { 471 continue 472 } 473 474 // create notification 475 notif := &models.Notification{ 476 RecipientDid: recipientDid.String(), 477 ActorDid: actorDid.String(), 478 Type: eventType, 479 EntityType: entityType, 480 EntityId: entityId, 481 RepoId: repoId, 482 IssueId: issueId, 483 PullId: pullId, 484 } 485 486 if err := db.CreateNotification(tx, notif); err != nil { 487 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err) 488 } 489 } 490 491 if err := tx.Commit(); err != nil { 492 // failed to commit 493 return 494 } 495}