Monorepo for Tangled
at sl/shared-stacks 502 lines 12 kB view raw
1package db 2 3import ( 4 "context" 5 "log" 6 "slices" 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 9 "tangled.org/core/api/tangled" 10 "tangled.org/core/appview/db" 11 "tangled.org/core/appview/models" 12 "tangled.org/core/appview/notify" 13 "tangled.org/core/idresolver" 14 "tangled.org/core/orm" 15 "tangled.org/core/sets" 16) 17 18const ( 19 maxMentions = 8 20) 21 22type databaseNotifier struct { 23 db *db.DB 24 res *idresolver.Resolver 25} 26 27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 28 return &databaseNotifier{ 29 db: database, 30 res: resolver, 31 } 32} 33 34var _ notify.Notifier = &databaseNotifier{} 35 36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 37 // no-op for now 38} 39 40func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 41 if star.RepoAt.Collection().String() != tangled.RepoNSID { 42 // skip string stars for now 43 return 44 } 45 var err error 46 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt))) 47 if err != nil { 48 log.Printf("NewStar: failed to get repos: %v", err) 49 return 50 } 51 52 actorDid := syntax.DID(star.Did) 53 recipients := sets.Singleton(syntax.DID(repo.Did)) 54 eventType := models.NotificationTypeRepoStarred 55 entityType := "repo" 56 entityId := star.RepoAt.String() 57 repoId := &repo.Id 58 var issueId *int64 59 var pullId *int64 60 61 n.notifyEvent( 62 actorDid, 63 recipients, 64 eventType, 65 entityType, 66 entityId, 67 repoId, 68 issueId, 69 pullId, 70 ) 71} 72 73func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 74 // no-op 75} 76 77func (n *databaseNotifier) NewComment(ctx context.Context, comment *models.Comment) { 78 var ( 79 // built the recipients list: 80 // - the owner of the repo 81 // - | if the comment is a reply -> everybody on that thread 82 // | if the comment is a top level -> just the issue owner 83 // - remove mentioned users from the recipients list 84 recipients = sets.New[syntax.DID]() 85 entityType string 86 entityId string 87 repoId *int64 88 issueId *int64 89 pullId *int64 90 ) 91 92 subjectDid, err := comment.Subject.Authority().AsDID() 93 if err != nil { 94 log.Printf("NewComment: expected did based at-uri for comment.subject") 95 return 96 } 97 switch comment.Subject.Collection() { 98 case tangled.RepoIssueNSID: 99 issues, err := db.GetIssues( 100 n.db, 101 orm.FilterEq("did", subjectDid), 102 orm.FilterEq("rkey", comment.Subject.RecordKey()), 103 ) 104 if err != nil { 105 log.Printf("NewComment: failed to get issues: %v", err) 106 return 107 } 108 if len(issues) == 0 { 109 log.Printf("NewComment: no issue found for %s", comment.Subject) 110 return 111 } 112 issue := issues[0] 113 114 recipients.Insert(syntax.DID(issue.Repo.Did)) 115 if comment.IsReply() { 116 // if this comment is a reply, then notify everybody in that thread 117 parentAtUri := *comment.ReplyTo 118 119 // find the parent thread, and add all DIDs from here to the recipient list 120 for _, t := range issue.CommentList() { 121 if t.Self.AtUri() == parentAtUri { 122 for _, p := range t.Participants() { 123 recipients.Insert(p) 124 } 125 } 126 } 127 } else { 128 // not a reply, notify just the issue author 129 recipients.Insert(syntax.DID(issue.Did)) 130 } 131 132 entityType = "issue" 133 entityId = issue.AtUri().String() 134 repoId = &issue.Repo.Id 135 issueId = &issue.Id 136 case tangled.RepoPullNSID: 137 pulls, err := db.GetPulls( 138 n.db, 139 orm.FilterEq("owner_did", subjectDid), 140 orm.FilterEq("rkey", comment.Subject.RecordKey()), 141 ) 142 if err != nil { 143 log.Printf("NewComment: failed to get pulls: %v", err) 144 return 145 } 146 if len(pulls) == 0 { 147 log.Printf("NewComment: no pull found for %s", comment.Subject) 148 return 149 } 150 pull := pulls[0] 151 152 pull.Repo, err = db.GetRepo(n.db, orm.FilterEq("at_uri", pull.RepoAt)) 153 if err != nil { 154 log.Printf("NewComment: failed to get repos: %v", err) 155 return 156 } 157 158 recipients.Insert(syntax.DID(pull.Repo.Did)) 159 for _, p := range pull.Participants() { 160 recipients.Insert(syntax.DID(p)) 161 } 162 163 entityType = "pull" 164 entityId = pull.AtUri().String() 165 repoId = &pull.Repo.Id 166 p := int64(pull.ID) 167 pullId = &p 168 default: 169 return // no-op 170 } 171 172 for _, m := range comment.Mentions { 173 recipients.Remove(m) 174 } 175 176 n.notifyEvent( 177 comment.Did, 178 recipients, 179 models.NotificationTypeIssueCommented, 180 entityType, 181 entityId, 182 repoId, 183 issueId, 184 pullId, 185 ) 186 n.notifyEvent( 187 comment.Did, 188 sets.Collect(slices.Values(comment.Mentions)), 189 models.NotificationTypeUserMentioned, 190 entityType, 191 entityId, 192 repoId, 193 issueId, 194 pullId, 195 ) 196} 197 198func (n *databaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) { 199 // no-op 200} 201 202func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 203 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 204 if err != nil { 205 log.Printf("failed to fetch collaborators: %v", err) 206 return 207 } 208 209 // build the recipients list 210 // - owner of the repo 211 // - collaborators in the repo 212 // - remove users already mentioned 213 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 214 for _, c := range collaborators { 215 recipients.Insert(c.SubjectDid) 216 } 217 for _, m := range mentions { 218 recipients.Remove(m) 219 } 220 221 actorDid := syntax.DID(issue.Did) 222 entityType := "issue" 223 entityId := issue.AtUri().String() 224 repoId := &issue.Repo.Id 225 issueId := &issue.Id 226 var pullId *int64 227 228 n.notifyEvent( 229 actorDid, 230 recipients, 231 models.NotificationTypeIssueCreated, 232 entityType, 233 entityId, 234 repoId, 235 issueId, 236 pullId, 237 ) 238 n.notifyEvent( 239 actorDid, 240 sets.Collect(slices.Values(mentions)), 241 models.NotificationTypeUserMentioned, 242 entityType, 243 entityId, 244 repoId, 245 issueId, 246 pullId, 247 ) 248} 249 250func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 251 // no-op for now 252} 253 254func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 255 actorDid := syntax.DID(follow.UserDid) 256 recipients := sets.Singleton(syntax.DID(follow.SubjectDid)) 257 eventType := models.NotificationTypeFollowed 258 entityType := "follow" 259 entityId := follow.UserDid 260 var repoId, issueId, pullId *int64 261 262 n.notifyEvent( 263 actorDid, 264 recipients, 265 eventType, 266 entityType, 267 entityId, 268 repoId, 269 issueId, 270 pullId, 271 ) 272} 273 274func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 275 // no-op 276} 277 278func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 279 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 280 if err != nil { 281 log.Printf("NewPull: failed to get repos: %v", err) 282 return 283 } 284 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 285 if err != nil { 286 log.Printf("failed to fetch collaborators: %v", err) 287 return 288 } 289 290 // build the recipients list 291 // - owner of the repo 292 // - collaborators in the repo 293 recipients := sets.Singleton(syntax.DID(repo.Did)) 294 for _, c := range collaborators { 295 recipients.Insert(c.SubjectDid) 296 } 297 298 actorDid := syntax.DID(pull.OwnerDid) 299 eventType := models.NotificationTypePullCreated 300 entityType := "pull" 301 entityId := pull.AtUri().String() 302 repoId := &repo.Id 303 var issueId *int64 304 p := int64(pull.ID) 305 pullId := &p 306 307 n.notifyEvent( 308 actorDid, 309 recipients, 310 eventType, 311 entityType, 312 entityId, 313 repoId, 314 issueId, 315 pullId, 316 ) 317} 318 319func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 320 // no-op 321} 322 323func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 324 // no-op 325} 326 327func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 328 // no-op 329} 330 331func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 332 // no-op 333} 334 335func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 336 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 337 if err != nil { 338 log.Printf("failed to fetch collaborators: %v", err) 339 return 340 } 341 342 // build up the recipients list: 343 // - repo owner 344 // - repo collaborators 345 // - all issue participants 346 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 347 for _, c := range collaborators { 348 recipients.Insert(c.SubjectDid) 349 } 350 for _, p := range issue.Participants() { 351 recipients.Insert(syntax.DID(p)) 352 } 353 354 entityType := "pull" 355 entityId := issue.AtUri().String() 356 repoId := &issue.Repo.Id 357 issueId := &issue.Id 358 var pullId *int64 359 var eventType models.NotificationType 360 361 if issue.Open { 362 eventType = models.NotificationTypeIssueReopen 363 } else { 364 eventType = models.NotificationTypeIssueClosed 365 } 366 367 n.notifyEvent( 368 actor, 369 recipients, 370 eventType, 371 entityType, 372 entityId, 373 repoId, 374 issueId, 375 pullId, 376 ) 377} 378 379func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 380 // Get repo details 381 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 382 if err != nil { 383 log.Printf("NewPullState: failed to get repos: %v", err) 384 return 385 } 386 387 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 388 if err != nil { 389 log.Printf("failed to fetch collaborators: %v", err) 390 return 391 } 392 393 // build up the recipients list: 394 // - repo owner 395 // - all pull participants 396 recipients := sets.Singleton(syntax.DID(repo.Did)) 397 for _, c := range collaborators { 398 recipients.Insert(c.SubjectDid) 399 } 400 for _, p := range pull.Participants() { 401 recipients.Insert(syntax.DID(p)) 402 } 403 404 entityType := "pull" 405 entityId := pull.AtUri().String() 406 repoId := &repo.Id 407 var issueId *int64 408 var eventType models.NotificationType 409 switch pull.State { 410 case models.PullClosed: 411 eventType = models.NotificationTypePullClosed 412 case models.PullOpen: 413 eventType = models.NotificationTypePullReopen 414 case models.PullMerged: 415 eventType = models.NotificationTypePullMerged 416 default: 417 log.Println("NewPullState: unexpected new PR state:", pull.State) 418 return 419 } 420 p := int64(pull.ID) 421 pullId := &p 422 423 n.notifyEvent( 424 actor, 425 recipients, 426 eventType, 427 entityType, 428 entityId, 429 repoId, 430 issueId, 431 pullId, 432 ) 433} 434 435func (n *databaseNotifier) notifyEvent( 436 actorDid syntax.DID, 437 recipients sets.Set[syntax.DID], 438 eventType models.NotificationType, 439 entityType string, 440 entityId string, 441 repoId *int64, 442 issueId *int64, 443 pullId *int64, 444) { 445 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody 446 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions { 447 return 448 } 449 450 recipients.Remove(actorDid) 451 452 prefMap, err := db.GetNotificationPreferences( 453 n.db, 454 orm.FilterIn("user_did", slices.Collect(recipients.All())), 455 ) 456 if err != nil { 457 // failed to get prefs for users 458 return 459 } 460 461 // create a transaction for bulk notification storage 462 tx, err := n.db.Begin() 463 if err != nil { 464 // failed to start tx 465 return 466 } 467 defer tx.Rollback() 468 469 // filter based on preferences 470 for recipientDid := range recipients.All() { 471 prefs, ok := prefMap[recipientDid] 472 if !ok { 473 prefs = models.DefaultNotificationPreferences(recipientDid) 474 } 475 476 // skip users who don’t want this type 477 if !prefs.ShouldNotify(eventType) { 478 continue 479 } 480 481 // create notification 482 notif := &models.Notification{ 483 RecipientDid: recipientDid.String(), 484 ActorDid: actorDid.String(), 485 Type: eventType, 486 EntityType: entityType, 487 EntityId: entityId, 488 RepoId: repoId, 489 IssueId: issueId, 490 PullId: pullId, 491 } 492 493 if err := db.CreateNotification(tx, notif); err != nil { 494 log.Printf("notifyEvent: failed to create notification for %s: %v", recipientDid, err) 495 } 496 } 497 498 if err := tx.Commit(); err != nil { 499 // failed to commit 500 return 501 } 502}