Monorepo for Tangled
at 67c0e9bc7d93ce141284b0554feacb3c1d57f6d0 537 lines 12 kB view raw
1package db 2 3import ( 4 "context" 5 "slices" 6 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 "tangled.org/core/api/tangled" 9 "tangled.org/core/appview/db" 10 "tangled.org/core/appview/models" 11 "tangled.org/core/appview/notify" 12 "tangled.org/core/idresolver" 13 "tangled.org/core/log" 14 "tangled.org/core/orm" 15 "tangled.org/core/sets" 16) 17 18const ( 19 maxMentions = 8 20) 21 22type databaseNotifier struct { 23 db *db.DB 24 res *idresolver.Resolver 25} 26 27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 28 return &databaseNotifier{ 29 db: database, 30 res: resolver, 31 } 32} 33 34var _ notify.Notifier = &databaseNotifier{} 35 36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 37 // no-op for now 38} 39 40func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 41 l := log.FromContext(ctx) 42 43 if star.RepoAt.Collection().String() != tangled.RepoNSID { 44 // skip string stars for now 45 return 46 } 47 var err error 48 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt))) 49 if err != nil { 50 l.Error("failed to get repos", "err", err) 51 return 52 } 53 54 actorDid := syntax.DID(star.Did) 55 recipients := sets.Singleton(syntax.DID(repo.Did)) 56 eventType := models.NotificationTypeRepoStarred 57 entityType := "repo" 58 entityId := star.RepoAt.String() 59 repoId := &repo.Id 60 var issueId *int64 61 var pullId *int64 62 63 n.notifyEvent( 64 ctx, 65 actorDid, 66 recipients, 67 eventType, 68 entityType, 69 entityId, 70 repoId, 71 issueId, 72 pullId, 73 ) 74} 75 76func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 77 // no-op 78} 79 80func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 81 l := log.FromContext(ctx) 82 83 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 84 if err != nil { 85 l.Error("failed to fetch collaborators", "err", err) 86 return 87 } 88 89 // build the recipients list 90 // - owner of the repo 91 // - collaborators in the repo 92 // - remove users already mentioned 93 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 94 for _, c := range collaborators { 95 recipients.Insert(c.SubjectDid) 96 } 97 for _, m := range mentions { 98 recipients.Remove(m) 99 } 100 101 actorDid := syntax.DID(issue.Did) 102 entityType := "issue" 103 entityId := issue.AtUri().String() 104 repoId := &issue.Repo.Id 105 issueId := &issue.Id 106 var pullId *int64 107 108 n.notifyEvent( 109 ctx, 110 actorDid, 111 recipients, 112 models.NotificationTypeIssueCreated, 113 entityType, 114 entityId, 115 repoId, 116 issueId, 117 pullId, 118 ) 119 n.notifyEvent( 120 ctx, 121 actorDid, 122 sets.Collect(slices.Values(mentions)), 123 models.NotificationTypeUserMentioned, 124 entityType, 125 entityId, 126 repoId, 127 issueId, 128 pullId, 129 ) 130} 131 132func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.IssueComment, mentions []syntax.DID) { 133 l := log.FromContext(ctx) 134 135 issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.IssueAt)) 136 if err != nil { 137 l.Error("failed to get issues", "err", err) 138 return 139 } 140 if len(issues) == 0 { 141 l.Error("no issue found for", "err", comment.IssueAt) 142 return 143 } 144 issue := issues[0] 145 146 // built the recipients list: 147 // - the owner of the repo 148 // - | if the comment is a reply -> everybody on that thread 149 // | if the comment is a top level -> just the issue owner 150 // - remove mentioned users from the recipients list 151 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 152 153 if comment.IsReply() { 154 // if this comment is a reply, then notify everybody in that thread 155 parentAtUri := *comment.ReplyTo 156 157 // find the parent thread, and add all DIDs from here to the recipient list 158 for _, t := range issue.CommentList() { 159 if t.Self.AtUri().String() == parentAtUri { 160 for _, p := range t.Participants() { 161 recipients.Insert(p) 162 } 163 } 164 } 165 } else { 166 // not a reply, notify just the issue author 167 recipients.Insert(syntax.DID(issue.Did)) 168 } 169 170 for _, m := range mentions { 171 recipients.Remove(m) 172 } 173 174 actorDid := syntax.DID(comment.Did) 175 entityType := "issue" 176 entityId := issue.AtUri().String() 177 repoId := &issue.Repo.Id 178 issueId := &issue.Id 179 var pullId *int64 180 181 n.notifyEvent( 182 ctx, 183 actorDid, 184 recipients, 185 models.NotificationTypeIssueCommented, 186 entityType, 187 entityId, 188 repoId, 189 issueId, 190 pullId, 191 ) 192 n.notifyEvent( 193 ctx, 194 actorDid, 195 sets.Collect(slices.Values(mentions)), 196 models.NotificationTypeUserMentioned, 197 entityType, 198 entityId, 199 repoId, 200 issueId, 201 pullId, 202 ) 203} 204 205func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 206 // no-op for now 207} 208 209func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 210 actorDid := syntax.DID(follow.UserDid) 211 recipients := sets.Singleton(syntax.DID(follow.SubjectDid)) 212 eventType := models.NotificationTypeFollowed 213 entityType := "follow" 214 entityId := follow.UserDid 215 var repoId, issueId, pullId *int64 216 217 n.notifyEvent( 218 ctx, 219 actorDid, 220 recipients, 221 eventType, 222 entityType, 223 entityId, 224 repoId, 225 issueId, 226 pullId, 227 ) 228} 229 230func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 231 // no-op 232} 233 234func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 235 l := log.FromContext(ctx) 236 237 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 238 if err != nil { 239 l.Error("failed to get repos", "err", err) 240 return 241 } 242 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 243 if err != nil { 244 l.Error("failed to fetch collaborators", "err", err) 245 return 246 } 247 248 // build the recipients list 249 // - owner of the repo 250 // - collaborators in the repo 251 recipients := sets.Singleton(syntax.DID(repo.Did)) 252 for _, c := range collaborators { 253 recipients.Insert(c.SubjectDid) 254 } 255 256 actorDid := syntax.DID(pull.OwnerDid) 257 eventType := models.NotificationTypePullCreated 258 entityType := "pull" 259 entityId := pull.AtUri().String() 260 repoId := &repo.Id 261 var issueId *int64 262 p := int64(pull.ID) 263 pullId := &p 264 265 n.notifyEvent( 266 ctx, 267 actorDid, 268 recipients, 269 eventType, 270 entityType, 271 entityId, 272 repoId, 273 issueId, 274 pullId, 275 ) 276} 277 278func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) { 279 l := log.FromContext(ctx) 280 281 pull, err := db.GetPull(n.db, 282 syntax.ATURI(comment.RepoAt), 283 comment.PullId, 284 ) 285 if err != nil { 286 l.Error("failed to get pulls", "err", err) 287 return 288 } 289 290 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", comment.RepoAt)) 291 if err != nil { 292 l.Error("failed to get repos", "err", err) 293 return 294 } 295 296 // build up the recipients list: 297 // - repo owner 298 // - all pull participants 299 // - remove those already mentioned 300 recipients := sets.Singleton(syntax.DID(repo.Did)) 301 for _, p := range pull.Participants() { 302 recipients.Insert(syntax.DID(p)) 303 } 304 for _, m := range mentions { 305 recipients.Remove(m) 306 } 307 308 actorDid := syntax.DID(comment.OwnerDid) 309 eventType := models.NotificationTypePullCommented 310 entityType := "pull" 311 entityId := pull.AtUri().String() 312 repoId := &repo.Id 313 var issueId *int64 314 p := int64(pull.ID) 315 pullId := &p 316 317 n.notifyEvent( 318 ctx, 319 actorDid, 320 recipients, 321 eventType, 322 entityType, 323 entityId, 324 repoId, 325 issueId, 326 pullId, 327 ) 328 n.notifyEvent( 329 ctx, 330 actorDid, 331 sets.Collect(slices.Values(mentions)), 332 models.NotificationTypeUserMentioned, 333 entityType, 334 entityId, 335 repoId, 336 issueId, 337 pullId, 338 ) 339} 340 341func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 342 // no-op 343} 344 345func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 346 // no-op 347} 348 349func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 350 // no-op 351} 352 353func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 354 // no-op 355} 356 357func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 358 // no-op for now; webhooks are handled by the webhook notifier 359} 360 361func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 362 l := log.FromContext(ctx) 363 364 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 365 if err != nil { 366 l.Error("failed to fetch collaborators", "err", err) 367 return 368 } 369 370 // build up the recipients list: 371 // - repo owner 372 // - repo collaborators 373 // - all issue participants 374 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 375 for _, c := range collaborators { 376 recipients.Insert(c.SubjectDid) 377 } 378 for _, p := range issue.Participants() { 379 recipients.Insert(syntax.DID(p)) 380 } 381 382 entityType := "issue" 383 entityId := issue.AtUri().String() 384 repoId := &issue.Repo.Id 385 issueId := &issue.Id 386 var pullId *int64 387 var eventType models.NotificationType 388 389 if issue.Open { 390 eventType = models.NotificationTypeIssueReopen 391 } else { 392 eventType = models.NotificationTypeIssueClosed 393 } 394 395 n.notifyEvent( 396 ctx, 397 actor, 398 recipients, 399 eventType, 400 entityType, 401 entityId, 402 repoId, 403 issueId, 404 pullId, 405 ) 406} 407 408func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 409 l := log.FromContext(ctx) 410 411 // Get repo details 412 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 413 if err != nil { 414 l.Error("failed to get repos", "err", err) 415 return 416 } 417 418 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 419 if err != nil { 420 l.Error("failed to fetch collaborators", "err", err) 421 return 422 } 423 424 // build up the recipients list: 425 // - repo owner 426 // - all pull participants 427 recipients := sets.Singleton(syntax.DID(repo.Did)) 428 for _, c := range collaborators { 429 recipients.Insert(c.SubjectDid) 430 } 431 for _, p := range pull.Participants() { 432 recipients.Insert(syntax.DID(p)) 433 } 434 435 entityType := "pull" 436 entityId := pull.AtUri().String() 437 repoId := &repo.Id 438 var issueId *int64 439 var eventType models.NotificationType 440 switch pull.State { 441 case models.PullClosed: 442 eventType = models.NotificationTypePullClosed 443 case models.PullOpen: 444 eventType = models.NotificationTypePullReopen 445 case models.PullMerged: 446 eventType = models.NotificationTypePullMerged 447 default: 448 l.Error("unexpected new PR state", "state", pull.State) 449 return 450 } 451 p := int64(pull.ID) 452 pullId := &p 453 454 n.notifyEvent( 455 ctx, 456 actor, 457 recipients, 458 eventType, 459 entityType, 460 entityId, 461 repoId, 462 issueId, 463 pullId, 464 ) 465} 466 467func (n *databaseNotifier) notifyEvent( 468 ctx context.Context, 469 actorDid syntax.DID, 470 recipients sets.Set[syntax.DID], 471 eventType models.NotificationType, 472 entityType string, 473 entityId string, 474 repoId *int64, 475 issueId *int64, 476 pullId *int64, 477) { 478 l := log.FromContext(ctx) 479 480 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody 481 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions { 482 return 483 } 484 485 recipients.Remove(actorDid) 486 487 prefMap, err := db.GetNotificationPreferences( 488 n.db, 489 orm.FilterIn("user_did", slices.Collect(recipients.All())), 490 ) 491 if err != nil { 492 // failed to get prefs for users 493 return 494 } 495 496 // create a transaction for bulk notification storage 497 tx, err := n.db.Begin() 498 if err != nil { 499 // failed to start tx 500 return 501 } 502 defer tx.Rollback() 503 504 // filter based on preferences 505 for recipientDid := range recipients.All() { 506 prefs, ok := prefMap[recipientDid] 507 if !ok { 508 prefs = models.DefaultNotificationPreferences(recipientDid) 509 } 510 511 // skip users who don’t want this type 512 if !prefs.ShouldNotify(eventType) { 513 continue 514 } 515 516 // create notification 517 notif := &models.Notification{ 518 RecipientDid: recipientDid.String(), 519 ActorDid: actorDid.String(), 520 Type: eventType, 521 EntityType: entityType, 522 EntityId: entityId, 523 RepoId: repoId, 524 IssueId: issueId, 525 PullId: pullId, 526 } 527 528 if err := db.CreateNotification(tx, notif); err != nil { 529 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err) 530 } 531 } 532 533 if err := tx.Commit(); err != nil { 534 // failed to commit 535 return 536 } 537}