Monorepo for Tangled
at push-pkuzytwlwptp 1110 lines 27 kB view raw
1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "maps" 11 "slices" 12 13 "time" 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 jmodels "github.com/bluesky-social/jetstream/pkg/models" 17 "github.com/go-git/go-git/v5/plumbing" 18 "github.com/ipfs/go-cid" 19 "tangled.org/core/api/tangled" 20 "tangled.org/core/appview/config" 21 "tangled.org/core/appview/db" 22 "tangled.org/core/appview/models" 23 "tangled.org/core/appview/serververify" 24 "tangled.org/core/appview/validator" 25 "tangled.org/core/idresolver" 26 "tangled.org/core/orm" 27 "tangled.org/core/rbac" 28) 29 30type Ingester struct { 31 Db db.DbWrapper 32 Enforcer *rbac.Enforcer 33 IdResolver *idresolver.Resolver 34 Config *config.Config 35 Logger *slog.Logger 36 Validator *validator.Validator 37} 38 39type processFunc func(ctx context.Context, e *jmodels.Event) error 40 41func (i *Ingester) Ingest() processFunc { 42 return func(ctx context.Context, e *jmodels.Event) error { 43 var err error 44 defer func() { 45 eventTime := e.TimeUS 46 lastTimeUs := eventTime + 1 47 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 48 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 49 } 50 }() 51 52 l := i.Logger.With("kind", e.Kind) 53 switch e.Kind { 54 case jmodels.EventKindAccount: 55 if !e.Account.Active && *e.Account.Status == "deactivated" { 56 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 57 } 58 case jmodels.EventKindIdentity: 59 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 60 case jmodels.EventKindCommit: 61 switch e.Commit.Collection { 62 case tangled.GraphFollowNSID: 63 err = i.ingestFollow(e) 64 case tangled.FeedStarNSID: 65 err = i.ingestStar(e) 66 case tangled.PublicKeyNSID: 67 err = i.ingestPublicKey(e) 68 case tangled.RepoArtifactNSID: 69 err = i.ingestArtifact(e) 70 case tangled.ActorProfileNSID: 71 err = i.ingestProfile(e) 72 case tangled.SpindleMemberNSID: 73 err = i.ingestSpindleMember(ctx, e) 74 case tangled.SpindleNSID: 75 err = i.ingestSpindle(ctx, e) 76 case tangled.KnotMemberNSID: 77 err = i.ingestKnotMember(e) 78 case tangled.KnotNSID: 79 err = i.ingestKnot(e) 80 case tangled.StringNSID: 81 err = i.ingestString(e) 82 case tangled.RepoIssueNSID: 83 err = i.ingestIssue(ctx, e) 84 case tangled.RepoIssueCommentNSID: 85 err = i.ingestIssueComment(e) 86 case tangled.LabelDefinitionNSID: 87 err = i.ingestLabelDefinition(e) 88 case tangled.LabelOpNSID: 89 err = i.ingestLabelOp(e) 90 } 91 l = i.Logger.With("nsid", e.Commit.Collection) 92 } 93 94 if err != nil { 95 l.Warn("refused to ingest record", "err", err) 96 } 97 98 return nil 99 } 100} 101 102func (i *Ingester) ingestStar(e *jmodels.Event) error { 103 var err error 104 did := e.Did 105 106 l := i.Logger.With("handler", "ingestStar") 107 l = l.With("nsid", e.Commit.Collection) 108 109 switch e.Commit.Operation { 110 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 111 var subjectUri syntax.ATURI 112 113 raw := json.RawMessage(e.Commit.Record) 114 record := tangled.FeedStar{} 115 err := json.Unmarshal(raw, &record) 116 if err != nil { 117 l.Error("invalid record", "err", err) 118 return err 119 } 120 121 subjectUri, err = syntax.ParseATURI(record.Subject) 122 if err != nil { 123 l.Error("invalid record", "err", err) 124 return err 125 } 126 star := &models.Star{ 127 Did: did, 128 RepoAt: subjectUri, 129 Rkey: e.Commit.RKey, 130 } 131 if record.SubjectDid != nil { 132 star.SubjectDid = *record.SubjectDid 133 } 134 if star.SubjectDid == "" { 135 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 136 if repoErr == nil && repo.RepoDid != "" { 137 star.SubjectDid = repo.RepoDid 138 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, record.Subject); enqErr != nil { 139 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 140 } 141 } 142 } 143 err = db.AddStar(i.Db, star) 144 case jmodels.CommitOperationDelete: 145 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 146 } 147 148 if err != nil { 149 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 150 } 151 152 return nil 153} 154 155func (i *Ingester) ingestFollow(e *jmodels.Event) error { 156 var err error 157 did := e.Did 158 159 l := i.Logger.With("handler", "ingestFollow") 160 l = l.With("nsid", e.Commit.Collection) 161 162 switch e.Commit.Operation { 163 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 164 raw := json.RawMessage(e.Commit.Record) 165 record := tangled.GraphFollow{} 166 err = json.Unmarshal(raw, &record) 167 if err != nil { 168 l.Error("invalid record", "err", err) 169 return err 170 } 171 172 err = db.AddFollow(i.Db, &models.Follow{ 173 UserDid: did, 174 SubjectDid: record.Subject, 175 Rkey: e.Commit.RKey, 176 }) 177 case jmodels.CommitOperationDelete: 178 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 179 } 180 181 if err != nil { 182 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 183 } 184 185 return nil 186} 187 188func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 189 did := e.Did 190 var err error 191 192 l := i.Logger.With("handler", "ingestPublicKey") 193 l = l.With("nsid", e.Commit.Collection) 194 195 switch e.Commit.Operation { 196 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 197 l.Debug("processing add of pubkey") 198 raw := json.RawMessage(e.Commit.Record) 199 record := tangled.PublicKey{} 200 err = json.Unmarshal(raw, &record) 201 if err != nil { 202 l.Error("invalid record", "err", err) 203 return err 204 } 205 206 name := record.Name 207 key := record.Key 208 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 209 case jmodels.CommitOperationDelete: 210 l.Debug("processing delete of pubkey") 211 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 212 } 213 214 if err != nil { 215 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 216 } 217 218 return nil 219} 220 221func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 222 did := e.Did 223 var err error 224 225 l := i.Logger.With("handler", "ingestArtifact") 226 l = l.With("nsid", e.Commit.Collection) 227 228 switch e.Commit.Operation { 229 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 230 raw := json.RawMessage(e.Commit.Record) 231 record := tangled.RepoArtifact{} 232 err = json.Unmarshal(raw, &record) 233 if err != nil { 234 l.Error("invalid record", "err", err) 235 return err 236 } 237 238 repoAt, err := syntax.ParseATURI(record.Repo) 239 if err != nil { 240 return err 241 } 242 243 var repo *models.Repo 244 if record.RepoDid != nil && *record.RepoDid != "" { 245 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 246 if err != nil && !errors.Is(err, sql.ErrNoRows) { 247 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 248 } 249 } 250 if repo == nil { 251 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 252 if err != nil { 253 return err 254 } 255 } 256 257 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 258 if err != nil || !ok { 259 return err 260 } 261 262 repoDid := repo.RepoDid 263 if repoDid == "" && record.RepoDid != nil { 264 repoDid = *record.RepoDid 265 } 266 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") { 267 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, record.Repo); enqErr != nil { 268 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 269 } 270 } 271 272 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 273 if err != nil { 274 createdAt = time.Now() 275 } 276 277 artifact := models.Artifact{ 278 Did: did, 279 Rkey: e.Commit.RKey, 280 RepoAt: repoAt, 281 RepoDid: repoDid, 282 Tag: plumbing.Hash(record.Tag), 283 CreatedAt: createdAt, 284 BlobCid: cid.Cid(record.Artifact.Ref), 285 Name: record.Name, 286 Size: uint64(record.Artifact.Size), 287 MimeType: record.Artifact.MimeType, 288 } 289 290 err = db.AddArtifact(i.Db, artifact) 291 case jmodels.CommitOperationDelete: 292 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 293 } 294 295 if err != nil { 296 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 297 } 298 299 return nil 300} 301 302func (i *Ingester) ingestProfile(e *jmodels.Event) error { 303 did := e.Did 304 var err error 305 306 l := i.Logger.With("handler", "ingestProfile") 307 l = l.With("nsid", e.Commit.Collection) 308 309 if e.Commit.RKey != "self" { 310 return fmt.Errorf("ingestProfile only ingests `self` record") 311 } 312 313 switch e.Commit.Operation { 314 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 315 raw := json.RawMessage(e.Commit.Record) 316 record := tangled.ActorProfile{} 317 err = json.Unmarshal(raw, &record) 318 if err != nil { 319 l.Error("invalid record", "err", err) 320 return err 321 } 322 323 avatar := "" 324 if record.Avatar != nil { 325 avatar = record.Avatar.Ref.String() 326 } 327 328 description := "" 329 if record.Description != nil { 330 description = *record.Description 331 } 332 333 includeBluesky := record.Bluesky 334 335 pronouns := "" 336 if record.Pronouns != nil { 337 pronouns = *record.Pronouns 338 } 339 340 location := "" 341 if record.Location != nil { 342 location = *record.Location 343 } 344 345 var links [5]string 346 for i, l := range record.Links { 347 if i < 5 { 348 links[i] = l 349 } 350 } 351 352 var stats [2]models.VanityStat 353 for i, s := range record.Stats { 354 if i < 2 { 355 stats[i].Kind = models.ParseVanityStatKind(s) 356 } 357 } 358 359 var pinned [6]syntax.ATURI 360 for i, r := range record.PinnedRepositories { 361 if i < 6 { 362 pinned[i] = syntax.ATURI(r) 363 } 364 } 365 366 profile := models.Profile{ 367 Did: did, 368 Avatar: avatar, 369 Description: description, 370 IncludeBluesky: includeBluesky, 371 Location: location, 372 Links: links, 373 Stats: stats, 374 PinnedRepos: pinned, 375 Pronouns: pronouns, 376 } 377 378 ddb, ok := i.Db.Execer.(*db.DB) 379 if !ok { 380 return fmt.Errorf("failed to index profile record, invalid db cast") 381 } 382 383 tx, err := ddb.Begin() 384 if err != nil { 385 return fmt.Errorf("failed to start transaction") 386 } 387 388 err = db.ValidateProfile(tx, &profile) 389 if err != nil { 390 return fmt.Errorf("invalid profile record") 391 } 392 393 err = db.UpsertProfile(tx, &profile) 394 case jmodels.CommitOperationDelete: 395 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 396 } 397 398 if err != nil { 399 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 400 } 401 402 return nil 403} 404 405func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 406 did := e.Did 407 var err error 408 409 l := i.Logger.With("handler", "ingestSpindleMember") 410 l = l.With("nsid", e.Commit.Collection) 411 412 switch e.Commit.Operation { 413 case jmodels.CommitOperationCreate: 414 raw := json.RawMessage(e.Commit.Record) 415 record := tangled.SpindleMember{} 416 err = json.Unmarshal(raw, &record) 417 if err != nil { 418 l.Error("invalid record", "err", err) 419 return err 420 } 421 422 // only spindle owner can invite to spindles 423 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 424 if err != nil || !ok { 425 return fmt.Errorf("failed to enforce permissions: %w", err) 426 } 427 428 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 429 if err != nil { 430 return err 431 } 432 433 if memberId.Handle.IsInvalidHandle() { 434 return err 435 } 436 437 ddb, ok := i.Db.Execer.(*db.DB) 438 if !ok { 439 return fmt.Errorf("invalid db cast") 440 } 441 442 err = db.AddSpindleMember(ddb, models.SpindleMember{ 443 Did: syntax.DID(did), 444 Rkey: e.Commit.RKey, 445 Instance: record.Instance, 446 Subject: memberId.DID, 447 }) 448 if !ok { 449 return fmt.Errorf("failed to add to db: %w", err) 450 } 451 452 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 453 if err != nil { 454 return fmt.Errorf("failed to update ACLs: %w", err) 455 } 456 457 l.Info("added spindle member") 458 case jmodels.CommitOperationDelete: 459 rkey := e.Commit.RKey 460 461 ddb, ok := i.Db.Execer.(*db.DB) 462 if !ok { 463 return fmt.Errorf("failed to index profile record, invalid db cast") 464 } 465 466 // get record from db first 467 members, err := db.GetSpindleMembers( 468 ddb, 469 orm.FilterEq("did", did), 470 orm.FilterEq("rkey", rkey), 471 ) 472 if err != nil || len(members) != 1 { 473 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 474 } 475 member := members[0] 476 477 tx, err := ddb.Begin() 478 if err != nil { 479 return fmt.Errorf("failed to start txn: %w", err) 480 } 481 482 // remove record by rkey && update enforcer 483 if err = db.RemoveSpindleMember( 484 tx, 485 orm.FilterEq("did", did), 486 orm.FilterEq("rkey", rkey), 487 ); err != nil { 488 return fmt.Errorf("failed to remove from db: %w", err) 489 } 490 491 // update enforcer 492 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 493 if err != nil { 494 return fmt.Errorf("failed to update ACLs: %w", err) 495 } 496 497 if err = tx.Commit(); err != nil { 498 return fmt.Errorf("failed to commit txn: %w", err) 499 } 500 501 if err = i.Enforcer.E.SavePolicy(); err != nil { 502 return fmt.Errorf("failed to save ACLs: %w", err) 503 } 504 505 l.Info("removed spindle member") 506 } 507 508 return nil 509} 510 511func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 512 did := e.Did 513 var err error 514 515 l := i.Logger.With("handler", "ingestSpindle") 516 l = l.With("nsid", e.Commit.Collection) 517 518 switch e.Commit.Operation { 519 case jmodels.CommitOperationCreate: 520 raw := json.RawMessage(e.Commit.Record) 521 record := tangled.Spindle{} 522 err = json.Unmarshal(raw, &record) 523 if err != nil { 524 l.Error("invalid record", "err", err) 525 return err 526 } 527 528 instance := e.Commit.RKey 529 530 ddb, ok := i.Db.Execer.(*db.DB) 531 if !ok { 532 return fmt.Errorf("failed to index profile record, invalid db cast") 533 } 534 535 err := db.AddSpindle(ddb, models.Spindle{ 536 Owner: syntax.DID(did), 537 Instance: instance, 538 }) 539 if err != nil { 540 l.Error("failed to add spindle to db", "err", err, "instance", instance) 541 return err 542 } 543 544 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 545 if err != nil { 546 l.Error("failed to add spindle to db", "err", err, "instance", instance) 547 return err 548 } 549 550 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 551 if err != nil { 552 return fmt.Errorf("failed to mark verified: %w", err) 553 } 554 555 return nil 556 557 case jmodels.CommitOperationDelete: 558 instance := e.Commit.RKey 559 560 ddb, ok := i.Db.Execer.(*db.DB) 561 if !ok { 562 return fmt.Errorf("failed to index profile record, invalid db cast") 563 } 564 565 // get record from db first 566 spindles, err := db.GetSpindles( 567 ddb, 568 orm.FilterEq("owner", did), 569 orm.FilterEq("instance", instance), 570 ) 571 if err != nil || len(spindles) != 1 { 572 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 573 } 574 spindle := spindles[0] 575 576 tx, err := ddb.Begin() 577 if err != nil { 578 return err 579 } 580 defer func() { 581 tx.Rollback() 582 i.Enforcer.E.LoadPolicy() 583 }() 584 585 // remove spindle members first 586 err = db.RemoveSpindleMember( 587 tx, 588 orm.FilterEq("owner", did), 589 orm.FilterEq("instance", instance), 590 ) 591 if err != nil { 592 return err 593 } 594 595 err = db.DeleteSpindle( 596 tx, 597 orm.FilterEq("owner", did), 598 orm.FilterEq("instance", instance), 599 ) 600 if err != nil { 601 return err 602 } 603 604 if spindle.Verified != nil { 605 err = i.Enforcer.RemoveSpindle(instance) 606 if err != nil { 607 return err 608 } 609 } 610 611 err = tx.Commit() 612 if err != nil { 613 return err 614 } 615 616 err = i.Enforcer.E.SavePolicy() 617 if err != nil { 618 return err 619 } 620 } 621 622 return nil 623} 624 625func (i *Ingester) ingestString(e *jmodels.Event) error { 626 did := e.Did 627 rkey := e.Commit.RKey 628 629 var err error 630 631 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 632 l.Info("ingesting record") 633 634 ddb, ok := i.Db.Execer.(*db.DB) 635 if !ok { 636 return fmt.Errorf("failed to index string record, invalid db cast") 637 } 638 639 switch e.Commit.Operation { 640 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 641 raw := json.RawMessage(e.Commit.Record) 642 record := tangled.String{} 643 err = json.Unmarshal(raw, &record) 644 if err != nil { 645 l.Error("invalid record", "err", err) 646 return err 647 } 648 649 string := models.StringFromRecord(did, rkey, record) 650 651 if err = i.Validator.ValidateString(&string); err != nil { 652 l.Error("invalid record", "err", err) 653 return err 654 } 655 656 if err = db.AddString(ddb, string); err != nil { 657 l.Error("failed to add string", "err", err) 658 return err 659 } 660 661 return nil 662 663 case jmodels.CommitOperationDelete: 664 if err := db.DeleteString( 665 ddb, 666 orm.FilterEq("did", did), 667 orm.FilterEq("rkey", rkey), 668 ); err != nil { 669 l.Error("failed to delete", "err", err) 670 return fmt.Errorf("failed to delete string record: %w", err) 671 } 672 673 return nil 674 } 675 676 return nil 677} 678 679func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 680 did := e.Did 681 var err error 682 683 l := i.Logger.With("handler", "ingestKnotMember") 684 l = l.With("nsid", e.Commit.Collection) 685 686 switch e.Commit.Operation { 687 case jmodels.CommitOperationCreate: 688 raw := json.RawMessage(e.Commit.Record) 689 record := tangled.KnotMember{} 690 err = json.Unmarshal(raw, &record) 691 if err != nil { 692 l.Error("invalid record", "err", err) 693 return err 694 } 695 696 // only knot owner can invite to knots 697 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 698 if err != nil || !ok { 699 return fmt.Errorf("failed to enforce permissions: %w", err) 700 } 701 702 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 703 if err != nil { 704 return err 705 } 706 707 if memberId.Handle.IsInvalidHandle() { 708 return err 709 } 710 711 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 712 if err != nil { 713 return fmt.Errorf("failed to update ACLs: %w", err) 714 } 715 716 l.Info("added knot member") 717 case jmodels.CommitOperationDelete: 718 // we don't store knot members in a table (like we do for spindle) 719 // and we can't remove this just yet. possibly fixed if we switch 720 // to either: 721 // 1. a knot_members table like with spindle and store the rkey 722 // 2. use the knot host as the rkey 723 // 724 // TODO: implement member deletion 725 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 726 } 727 728 return nil 729} 730 731func (i *Ingester) ingestKnot(e *jmodels.Event) error { 732 did := e.Did 733 var err error 734 735 l := i.Logger.With("handler", "ingestKnot") 736 l = l.With("nsid", e.Commit.Collection) 737 738 switch e.Commit.Operation { 739 case jmodels.CommitOperationCreate: 740 raw := json.RawMessage(e.Commit.Record) 741 record := tangled.Knot{} 742 err = json.Unmarshal(raw, &record) 743 if err != nil { 744 l.Error("invalid record", "err", err) 745 return err 746 } 747 748 domain := e.Commit.RKey 749 750 ddb, ok := i.Db.Execer.(*db.DB) 751 if !ok { 752 return fmt.Errorf("failed to index profile record, invalid db cast") 753 } 754 755 err := db.AddKnot(ddb, domain, did) 756 if err != nil { 757 l.Error("failed to add knot to db", "err", err, "domain", domain) 758 return err 759 } 760 761 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 762 if err != nil { 763 l.Error("failed to verify knot", "err", err, "domain", domain) 764 return err 765 } 766 767 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 768 if err != nil { 769 return fmt.Errorf("failed to mark verified: %w", err) 770 } 771 772 return nil 773 774 case jmodels.CommitOperationDelete: 775 domain := e.Commit.RKey 776 777 ddb, ok := i.Db.Execer.(*db.DB) 778 if !ok { 779 return fmt.Errorf("failed to index knot record, invalid db cast") 780 } 781 782 // get record from db first 783 registrations, err := db.GetRegistrations( 784 ddb, 785 orm.FilterEq("domain", domain), 786 orm.FilterEq("did", did), 787 ) 788 if err != nil { 789 return fmt.Errorf("failed to get registration: %w", err) 790 } 791 if len(registrations) != 1 { 792 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 793 } 794 registration := registrations[0] 795 796 tx, err := ddb.Begin() 797 if err != nil { 798 return err 799 } 800 defer func() { 801 tx.Rollback() 802 i.Enforcer.E.LoadPolicy() 803 }() 804 805 err = db.DeleteKnot( 806 tx, 807 orm.FilterEq("did", did), 808 orm.FilterEq("domain", domain), 809 ) 810 if err != nil { 811 return err 812 } 813 814 if registration.Registered != nil { 815 err = i.Enforcer.RemoveKnot(domain) 816 if err != nil { 817 return err 818 } 819 } 820 821 err = tx.Commit() 822 if err != nil { 823 return err 824 } 825 826 err = i.Enforcer.E.SavePolicy() 827 if err != nil { 828 return err 829 } 830 } 831 832 return nil 833} 834func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 835 did := e.Did 836 rkey := e.Commit.RKey 837 838 var err error 839 840 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 841 l.Info("ingesting record") 842 843 ddb, ok := i.Db.Execer.(*db.DB) 844 if !ok { 845 return fmt.Errorf("failed to index issue record, invalid db cast") 846 } 847 848 switch e.Commit.Operation { 849 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 850 raw := json.RawMessage(e.Commit.Record) 851 record := tangled.RepoIssue{} 852 err = json.Unmarshal(raw, &record) 853 if err != nil { 854 l.Error("invalid record", "err", err) 855 return err 856 } 857 858 issue := models.IssueFromRecord(did, rkey, record) 859 860 if err := i.Validator.ValidateIssue(&issue); err != nil { 861 return fmt.Errorf("failed to validate issue: %w", err) 862 } 863 864 if issue.RepoDid == "" { 865 repo, repoErr := db.GetRepoByAtUri(i.Db, record.Repo) 866 if repoErr == nil && repo.RepoDid != "" { 867 issue.RepoDid = repo.RepoDid 868 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, record.Repo); enqErr != nil { 869 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 870 } 871 } 872 } 873 874 tx, err := ddb.BeginTx(ctx, nil) 875 if err != nil { 876 l.Error("failed to begin transaction", "err", err) 877 return err 878 } 879 defer tx.Rollback() 880 881 err = db.PutIssue(tx, &issue) 882 if err != nil { 883 l.Error("failed to create issue", "err", err) 884 return err 885 } 886 887 err = tx.Commit() 888 if err != nil { 889 l.Error("failed to commit txn", "err", err) 890 return err 891 } 892 893 return nil 894 895 case jmodels.CommitOperationDelete: 896 tx, err := ddb.BeginTx(ctx, nil) 897 if err != nil { 898 l.Error("failed to begin transaction", "err", err) 899 return err 900 } 901 defer tx.Rollback() 902 903 if err := db.DeleteIssues( 904 tx, 905 did, 906 rkey, 907 ); err != nil { 908 l.Error("failed to delete", "err", err) 909 return fmt.Errorf("failed to delete issue record: %w", err) 910 } 911 if err := tx.Commit(); err != nil { 912 l.Error("failed to commit txn", "err", err) 913 return err 914 } 915 916 return nil 917 } 918 919 return nil 920} 921 922func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 923 did := e.Did 924 rkey := e.Commit.RKey 925 926 var err error 927 928 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 929 l.Info("ingesting record") 930 931 ddb, ok := i.Db.Execer.(*db.DB) 932 if !ok { 933 return fmt.Errorf("failed to index issue comment record, invalid db cast") 934 } 935 936 switch e.Commit.Operation { 937 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 938 raw := json.RawMessage(e.Commit.Record) 939 record := tangled.RepoIssueComment{} 940 err = json.Unmarshal(raw, &record) 941 if err != nil { 942 return fmt.Errorf("invalid record: %w", err) 943 } 944 945 comment, err := models.IssueCommentFromRecord(did, rkey, record) 946 if err != nil { 947 return fmt.Errorf("failed to parse comment from record: %w", err) 948 } 949 950 if err := i.Validator.ValidateIssueComment(comment); err != nil { 951 return fmt.Errorf("failed to validate comment: %w", err) 952 } 953 954 tx, err := ddb.Begin() 955 if err != nil { 956 return fmt.Errorf("failed to start transaction: %w", err) 957 } 958 defer tx.Rollback() 959 960 _, err = db.AddIssueComment(tx, *comment) 961 if err != nil { 962 return fmt.Errorf("failed to create issue comment: %w", err) 963 } 964 965 return tx.Commit() 966 967 case jmodels.CommitOperationDelete: 968 if err := db.DeleteIssueComments( 969 ddb, 970 orm.FilterEq("did", did), 971 orm.FilterEq("rkey", rkey), 972 ); err != nil { 973 return fmt.Errorf("failed to delete issue comment record: %w", err) 974 } 975 976 return nil 977 } 978 979 return nil 980} 981 982func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 983 did := e.Did 984 rkey := e.Commit.RKey 985 986 var err error 987 988 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 989 l.Info("ingesting record") 990 991 ddb, ok := i.Db.Execer.(*db.DB) 992 if !ok { 993 return fmt.Errorf("failed to index label definition, invalid db cast") 994 } 995 996 switch e.Commit.Operation { 997 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 998 raw := json.RawMessage(e.Commit.Record) 999 record := tangled.LabelDefinition{} 1000 err = json.Unmarshal(raw, &record) 1001 if err != nil { 1002 return fmt.Errorf("invalid record: %w", err) 1003 } 1004 1005 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1006 if err != nil { 1007 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1008 } 1009 1010 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1011 return fmt.Errorf("failed to validate labeldef: %w", err) 1012 } 1013 1014 _, err = db.AddLabelDefinition(ddb, def) 1015 if err != nil { 1016 return fmt.Errorf("failed to create labeldef: %w", err) 1017 } 1018 1019 return nil 1020 1021 case jmodels.CommitOperationDelete: 1022 if err := db.DeleteLabelDefinition( 1023 ddb, 1024 orm.FilterEq("did", did), 1025 orm.FilterEq("rkey", rkey), 1026 ); err != nil { 1027 return fmt.Errorf("failed to delete labeldef record: %w", err) 1028 } 1029 1030 return nil 1031 } 1032 1033 return nil 1034} 1035 1036func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1037 did := e.Did 1038 rkey := e.Commit.RKey 1039 1040 var err error 1041 1042 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1043 l.Info("ingesting record") 1044 1045 ddb, ok := i.Db.Execer.(*db.DB) 1046 if !ok { 1047 return fmt.Errorf("failed to index label op, invalid db cast") 1048 } 1049 1050 switch e.Commit.Operation { 1051 case jmodels.CommitOperationCreate: 1052 raw := json.RawMessage(e.Commit.Record) 1053 record := tangled.LabelOp{} 1054 err = json.Unmarshal(raw, &record) 1055 if err != nil { 1056 return fmt.Errorf("invalid record: %w", err) 1057 } 1058 1059 subject := syntax.ATURI(record.Subject) 1060 collection := subject.Collection() 1061 1062 var repo *models.Repo 1063 switch collection { 1064 case tangled.RepoIssueNSID: 1065 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1066 if err != nil || len(i) != 1 { 1067 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1068 } 1069 repo = i[0].Repo 1070 default: 1071 return fmt.Errorf("unsupport label subject: %s", collection) 1072 } 1073 1074 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1075 if err != nil { 1076 return fmt.Errorf("failed to build label application ctx: %w", err) 1077 } 1078 1079 ops := models.LabelOpsFromRecord(did, rkey, record) 1080 1081 for _, o := range ops { 1082 def, ok := actx.Defs[o.OperandKey] 1083 if !ok { 1084 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1085 } 1086 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1087 return fmt.Errorf("failed to validate labelop: %w", err) 1088 } 1089 } 1090 1091 tx, err := ddb.Begin() 1092 if err != nil { 1093 return err 1094 } 1095 defer tx.Rollback() 1096 1097 for _, o := range ops { 1098 _, err = db.AddLabelOp(tx, &o) 1099 if err != nil { 1100 return fmt.Errorf("failed to add labelop: %w", err) 1101 } 1102 } 1103 1104 if err = tx.Commit(); err != nil { 1105 return err 1106 } 1107 } 1108 1109 return nil 1110}