Monorepo for Tangled
at master 1125 lines 28 kB view raw
1package appview 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "maps" 11 "slices" 12 13 "time" 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 jmodels "github.com/bluesky-social/jetstream/pkg/models" 17 "github.com/go-git/go-git/v5/plumbing" 18 "github.com/ipfs/go-cid" 19 "tangled.org/core/api/tangled" 20 "tangled.org/core/appview/config" 21 "tangled.org/core/appview/db" 22 "tangled.org/core/appview/models" 23 "tangled.org/core/appview/serververify" 24 "tangled.org/core/appview/validator" 25 "tangled.org/core/idresolver" 26 "tangled.org/core/orm" 27 "tangled.org/core/rbac" 28) 29 30type Ingester struct { 31 Db db.DbWrapper 32 Enforcer *rbac.Enforcer 33 IdResolver *idresolver.Resolver 34 Config *config.Config 35 Logger *slog.Logger 36 Validator *validator.Validator 37} 38 39type processFunc func(ctx context.Context, e *jmodels.Event) error 40 41func (i *Ingester) Ingest() processFunc { 42 return func(ctx context.Context, e *jmodels.Event) error { 43 var err error 44 defer func() { 45 eventTime := e.TimeUS 46 lastTimeUs := eventTime + 1 47 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 48 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 49 } 50 }() 51 52 l := i.Logger.With("kind", e.Kind) 53 switch e.Kind { 54 case jmodels.EventKindAccount: 55 if !e.Account.Active && *e.Account.Status == "deactivated" { 56 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 57 } 58 case jmodels.EventKindIdentity: 59 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 60 case jmodels.EventKindCommit: 61 switch e.Commit.Collection { 62 case tangled.GraphFollowNSID: 63 err = i.ingestFollow(e) 64 case tangled.FeedStarNSID: 65 err = i.ingestStar(e) 66 case tangled.PublicKeyNSID: 67 err = i.ingestPublicKey(e) 68 case tangled.RepoArtifactNSID: 69 err = i.ingestArtifact(e) 70 case tangled.ActorProfileNSID: 71 err = i.ingestProfile(e) 72 case tangled.SpindleMemberNSID: 73 err = i.ingestSpindleMember(ctx, e) 74 case tangled.SpindleNSID: 75 err = i.ingestSpindle(ctx, e) 76 case tangled.KnotMemberNSID: 77 err = i.ingestKnotMember(e) 78 case tangled.KnotNSID: 79 err = i.ingestKnot(e) 80 case tangled.StringNSID: 81 err = i.ingestString(e) 82 case tangled.RepoIssueNSID: 83 err = i.ingestIssue(ctx, e) 84 case tangled.RepoIssueCommentNSID: 85 err = i.ingestIssueComment(e) 86 case tangled.LabelDefinitionNSID: 87 err = i.ingestLabelDefinition(e) 88 case tangled.LabelOpNSID: 89 err = i.ingestLabelOp(e) 90 } 91 l = i.Logger.With("nsid", e.Commit.Collection) 92 } 93 94 if err != nil { 95 l.Warn("refused to ingest record", "err", err) 96 } 97 98 return nil 99 } 100} 101 102func (i *Ingester) ingestStar(e *jmodels.Event) error { 103 var err error 104 did := e.Did 105 106 l := i.Logger.With("handler", "ingestStar") 107 l = l.With("nsid", e.Commit.Collection) 108 109 switch e.Commit.Operation { 110 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 111 var subjectUri syntax.ATURI 112 113 raw := json.RawMessage(e.Commit.Record) 114 record := tangled.FeedStar{} 115 err := json.Unmarshal(raw, &record) 116 if err != nil { 117 l.Error("invalid record", "err", err) 118 return err 119 } 120 121 star := &models.Star{ 122 Did: did, 123 Rkey: e.Commit.RKey, 124 } 125 126 switch { 127 case record.SubjectDid != nil: 128 star.SubjectDid = *record.SubjectDid 129 repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 130 if repoErr == nil { 131 subjectUri = repo.RepoAt() 132 star.RepoAt = subjectUri 133 } 134 case record.Subject != nil: 135 subjectUri, err = syntax.ParseATURI(*record.Subject) 136 if err != nil { 137 l.Error("invalid record", "err", err) 138 return err 139 } 140 star.RepoAt = subjectUri 141 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 142 if repoErr == nil && repo.RepoDid != "" { 143 star.SubjectDid = repo.RepoDid 144 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 145 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 146 } 147 } 148 default: 149 l.Error("star record has neither subject nor subjectDid") 150 return fmt.Errorf("star record has neither subject nor subjectDid") 151 } 152 err = db.AddStar(i.Db, star) 153 case jmodels.CommitOperationDelete: 154 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 155 } 156 157 if err != nil { 158 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 159 } 160 161 return nil 162} 163 164func (i *Ingester) ingestFollow(e *jmodels.Event) error { 165 var err error 166 did := e.Did 167 168 l := i.Logger.With("handler", "ingestFollow") 169 l = l.With("nsid", e.Commit.Collection) 170 171 switch e.Commit.Operation { 172 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 173 raw := json.RawMessage(e.Commit.Record) 174 record := tangled.GraphFollow{} 175 err = json.Unmarshal(raw, &record) 176 if err != nil { 177 l.Error("invalid record", "err", err) 178 return err 179 } 180 181 err = db.AddFollow(i.Db, &models.Follow{ 182 UserDid: did, 183 SubjectDid: record.Subject, 184 Rkey: e.Commit.RKey, 185 }) 186 case jmodels.CommitOperationDelete: 187 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 188 } 189 190 if err != nil { 191 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 192 } 193 194 return nil 195} 196 197func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 198 did := e.Did 199 var err error 200 201 l := i.Logger.With("handler", "ingestPublicKey") 202 l = l.With("nsid", e.Commit.Collection) 203 204 switch e.Commit.Operation { 205 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 206 l.Debug("processing add of pubkey") 207 raw := json.RawMessage(e.Commit.Record) 208 record := tangled.PublicKey{} 209 err = json.Unmarshal(raw, &record) 210 if err != nil { 211 l.Error("invalid record", "err", err) 212 return err 213 } 214 215 name := record.Name 216 key := record.Key 217 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 218 case jmodels.CommitOperationDelete: 219 l.Debug("processing delete of pubkey") 220 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 221 } 222 223 if err != nil { 224 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 225 } 226 227 return nil 228} 229 230func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 231 did := e.Did 232 var err error 233 234 l := i.Logger.With("handler", "ingestArtifact") 235 l = l.With("nsid", e.Commit.Collection) 236 237 switch e.Commit.Operation { 238 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 239 raw := json.RawMessage(e.Commit.Record) 240 record := tangled.RepoArtifact{} 241 err = json.Unmarshal(raw, &record) 242 if err != nil { 243 l.Error("invalid record", "err", err) 244 return err 245 } 246 247 var repo *models.Repo 248 if record.RepoDid != nil && *record.RepoDid != "" { 249 repo, err = db.GetRepoByDid(i.Db, *record.RepoDid) 250 if err != nil && !errors.Is(err, sql.ErrNoRows) { 251 return fmt.Errorf("failed to look up repo by DID %s: %w", *record.RepoDid, err) 252 } 253 } 254 if repo == nil && record.Repo != nil { 255 repoAt, parseErr := syntax.ParseATURI(*record.Repo) 256 if parseErr != nil { 257 return parseErr 258 } 259 repo, err = db.GetRepoByAtUri(i.Db, repoAt.String()) 260 if err != nil { 261 return err 262 } 263 } 264 if repo == nil { 265 return fmt.Errorf("artifact record has neither valid repoDid nor repo field") 266 } 267 268 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.RepoIdentifier(), "repo:push") 269 if err != nil || !ok { 270 return err 271 } 272 273 repoDid := repo.RepoDid 274 if repoDid == "" && record.RepoDid != nil { 275 repoDid = *record.RepoDid 276 } 277 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 278 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 279 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 280 } 281 } 282 283 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 284 if err != nil { 285 createdAt = time.Now() 286 } 287 288 artifact := models.Artifact{ 289 Did: did, 290 Rkey: e.Commit.RKey, 291 RepoAt: repo.RepoAt(), 292 RepoDid: repoDid, 293 Tag: plumbing.Hash(record.Tag), 294 CreatedAt: createdAt, 295 BlobCid: cid.Cid(record.Artifact.Ref), 296 Name: record.Name, 297 Size: uint64(record.Artifact.Size), 298 MimeType: record.Artifact.MimeType, 299 } 300 301 err = db.AddArtifact(i.Db, artifact) 302 case jmodels.CommitOperationDelete: 303 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 304 } 305 306 if err != nil { 307 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 308 } 309 310 return nil 311} 312 313func (i *Ingester) ingestProfile(e *jmodels.Event) error { 314 did := e.Did 315 var err error 316 317 l := i.Logger.With("handler", "ingestProfile") 318 l = l.With("nsid", e.Commit.Collection) 319 320 if e.Commit.RKey != "self" { 321 return fmt.Errorf("ingestProfile only ingests `self` record") 322 } 323 324 switch e.Commit.Operation { 325 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 326 raw := json.RawMessage(e.Commit.Record) 327 record := tangled.ActorProfile{} 328 err = json.Unmarshal(raw, &record) 329 if err != nil { 330 l.Error("invalid record", "err", err) 331 return err 332 } 333 334 avatar := "" 335 if record.Avatar != nil { 336 avatar = record.Avatar.Ref.String() 337 } 338 339 description := "" 340 if record.Description != nil { 341 description = *record.Description 342 } 343 344 includeBluesky := record.Bluesky 345 346 pronouns := "" 347 if record.Pronouns != nil { 348 pronouns = *record.Pronouns 349 } 350 351 location := "" 352 if record.Location != nil { 353 location = *record.Location 354 } 355 356 var links [5]string 357 for i, l := range record.Links { 358 if i < 5 { 359 links[i] = l 360 } 361 } 362 363 var stats [2]models.VanityStat 364 for i, s := range record.Stats { 365 if i < 2 { 366 stats[i].Kind = models.ParseVanityStatKind(s) 367 } 368 } 369 370 var pinned [6]syntax.ATURI 371 for i, r := range record.PinnedRepositories { 372 if i < 6 { 373 pinned[i] = syntax.ATURI(r) 374 } 375 } 376 377 profile := models.Profile{ 378 Did: did, 379 Avatar: avatar, 380 Description: description, 381 IncludeBluesky: includeBluesky, 382 Location: location, 383 Links: links, 384 Stats: stats, 385 PinnedRepos: pinned, 386 Pronouns: pronouns, 387 } 388 389 ddb, ok := i.Db.Execer.(*db.DB) 390 if !ok { 391 return fmt.Errorf("failed to index profile record, invalid db cast") 392 } 393 394 tx, err := ddb.Begin() 395 if err != nil { 396 return fmt.Errorf("failed to start transaction") 397 } 398 399 err = db.ValidateProfile(tx, &profile) 400 if err != nil { 401 return fmt.Errorf("invalid profile record") 402 } 403 404 err = db.UpsertProfile(tx, &profile) 405 case jmodels.CommitOperationDelete: 406 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 407 } 408 409 if err != nil { 410 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 411 } 412 413 return nil 414} 415 416func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 417 did := e.Did 418 var err error 419 420 l := i.Logger.With("handler", "ingestSpindleMember") 421 l = l.With("nsid", e.Commit.Collection) 422 423 switch e.Commit.Operation { 424 case jmodels.CommitOperationCreate: 425 raw := json.RawMessage(e.Commit.Record) 426 record := tangled.SpindleMember{} 427 err = json.Unmarshal(raw, &record) 428 if err != nil { 429 l.Error("invalid record", "err", err) 430 return err 431 } 432 433 // only spindle owner can invite to spindles 434 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 435 if err != nil || !ok { 436 return fmt.Errorf("failed to enforce permissions: %w", err) 437 } 438 439 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 440 if err != nil { 441 return err 442 } 443 444 if memberId.Handle.IsInvalidHandle() { 445 return err 446 } 447 448 ddb, ok := i.Db.Execer.(*db.DB) 449 if !ok { 450 return fmt.Errorf("invalid db cast") 451 } 452 453 err = db.AddSpindleMember(ddb, models.SpindleMember{ 454 Did: syntax.DID(did), 455 Rkey: e.Commit.RKey, 456 Instance: record.Instance, 457 Subject: memberId.DID, 458 }) 459 if !ok { 460 return fmt.Errorf("failed to add to db: %w", err) 461 } 462 463 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 464 if err != nil { 465 return fmt.Errorf("failed to update ACLs: %w", err) 466 } 467 468 l.Info("added spindle member") 469 case jmodels.CommitOperationDelete: 470 rkey := e.Commit.RKey 471 472 ddb, ok := i.Db.Execer.(*db.DB) 473 if !ok { 474 return fmt.Errorf("failed to index profile record, invalid db cast") 475 } 476 477 // get record from db first 478 members, err := db.GetSpindleMembers( 479 ddb, 480 orm.FilterEq("did", did), 481 orm.FilterEq("rkey", rkey), 482 ) 483 if err != nil || len(members) != 1 { 484 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 485 } 486 member := members[0] 487 488 tx, err := ddb.Begin() 489 if err != nil { 490 return fmt.Errorf("failed to start txn: %w", err) 491 } 492 493 // remove record by rkey && update enforcer 494 if err = db.RemoveSpindleMember( 495 tx, 496 orm.FilterEq("did", did), 497 orm.FilterEq("rkey", rkey), 498 ); err != nil { 499 return fmt.Errorf("failed to remove from db: %w", err) 500 } 501 502 // update enforcer 503 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 504 if err != nil { 505 return fmt.Errorf("failed to update ACLs: %w", err) 506 } 507 508 if err = tx.Commit(); err != nil { 509 return fmt.Errorf("failed to commit txn: %w", err) 510 } 511 512 if err = i.Enforcer.E.SavePolicy(); err != nil { 513 return fmt.Errorf("failed to save ACLs: %w", err) 514 } 515 516 l.Info("removed spindle member") 517 } 518 519 return nil 520} 521 522func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 523 did := e.Did 524 var err error 525 526 l := i.Logger.With("handler", "ingestSpindle") 527 l = l.With("nsid", e.Commit.Collection) 528 529 switch e.Commit.Operation { 530 case jmodels.CommitOperationCreate: 531 raw := json.RawMessage(e.Commit.Record) 532 record := tangled.Spindle{} 533 err = json.Unmarshal(raw, &record) 534 if err != nil { 535 l.Error("invalid record", "err", err) 536 return err 537 } 538 539 instance := e.Commit.RKey 540 541 ddb, ok := i.Db.Execer.(*db.DB) 542 if !ok { 543 return fmt.Errorf("failed to index profile record, invalid db cast") 544 } 545 546 err := db.AddSpindle(ddb, models.Spindle{ 547 Owner: syntax.DID(did), 548 Instance: instance, 549 }) 550 if err != nil { 551 l.Error("failed to add spindle to db", "err", err, "instance", instance) 552 return err 553 } 554 555 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 556 if err != nil { 557 l.Error("failed to add spindle to db", "err", err, "instance", instance) 558 return err 559 } 560 561 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 562 if err != nil { 563 return fmt.Errorf("failed to mark verified: %w", err) 564 } 565 566 return nil 567 568 case jmodels.CommitOperationDelete: 569 instance := e.Commit.RKey 570 571 ddb, ok := i.Db.Execer.(*db.DB) 572 if !ok { 573 return fmt.Errorf("failed to index profile record, invalid db cast") 574 } 575 576 // get record from db first 577 spindles, err := db.GetSpindles( 578 ddb, 579 orm.FilterEq("owner", did), 580 orm.FilterEq("instance", instance), 581 ) 582 if err != nil || len(spindles) != 1 { 583 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 584 } 585 spindle := spindles[0] 586 587 tx, err := ddb.Begin() 588 if err != nil { 589 return err 590 } 591 defer func() { 592 tx.Rollback() 593 i.Enforcer.E.LoadPolicy() 594 }() 595 596 // remove spindle members first 597 err = db.RemoveSpindleMember( 598 tx, 599 orm.FilterEq("owner", did), 600 orm.FilterEq("instance", instance), 601 ) 602 if err != nil { 603 return err 604 } 605 606 err = db.DeleteSpindle( 607 tx, 608 orm.FilterEq("owner", did), 609 orm.FilterEq("instance", instance), 610 ) 611 if err != nil { 612 return err 613 } 614 615 if spindle.Verified != nil { 616 err = i.Enforcer.RemoveSpindle(instance) 617 if err != nil { 618 return err 619 } 620 } 621 622 err = tx.Commit() 623 if err != nil { 624 return err 625 } 626 627 err = i.Enforcer.E.SavePolicy() 628 if err != nil { 629 return err 630 } 631 } 632 633 return nil 634} 635 636func (i *Ingester) ingestString(e *jmodels.Event) error { 637 did := e.Did 638 rkey := e.Commit.RKey 639 640 var err error 641 642 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 643 l.Info("ingesting record") 644 645 ddb, ok := i.Db.Execer.(*db.DB) 646 if !ok { 647 return fmt.Errorf("failed to index string record, invalid db cast") 648 } 649 650 switch e.Commit.Operation { 651 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 652 raw := json.RawMessage(e.Commit.Record) 653 record := tangled.String{} 654 err = json.Unmarshal(raw, &record) 655 if err != nil { 656 l.Error("invalid record", "err", err) 657 return err 658 } 659 660 string := models.StringFromRecord(did, rkey, record) 661 662 if err = i.Validator.ValidateString(&string); err != nil { 663 l.Error("invalid record", "err", err) 664 return err 665 } 666 667 if err = db.AddString(ddb, string); err != nil { 668 l.Error("failed to add string", "err", err) 669 return err 670 } 671 672 return nil 673 674 case jmodels.CommitOperationDelete: 675 if err := db.DeleteString( 676 ddb, 677 orm.FilterEq("did", did), 678 orm.FilterEq("rkey", rkey), 679 ); err != nil { 680 l.Error("failed to delete", "err", err) 681 return fmt.Errorf("failed to delete string record: %w", err) 682 } 683 684 return nil 685 } 686 687 return nil 688} 689 690func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 691 did := e.Did 692 var err error 693 694 l := i.Logger.With("handler", "ingestKnotMember") 695 l = l.With("nsid", e.Commit.Collection) 696 697 switch e.Commit.Operation { 698 case jmodels.CommitOperationCreate: 699 raw := json.RawMessage(e.Commit.Record) 700 record := tangled.KnotMember{} 701 err = json.Unmarshal(raw, &record) 702 if err != nil { 703 l.Error("invalid record", "err", err) 704 return err 705 } 706 707 // only knot owner can invite to knots 708 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 709 if err != nil || !ok { 710 return fmt.Errorf("failed to enforce permissions: %w", err) 711 } 712 713 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 714 if err != nil { 715 return err 716 } 717 718 if memberId.Handle.IsInvalidHandle() { 719 return err 720 } 721 722 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 723 if err != nil { 724 return fmt.Errorf("failed to update ACLs: %w", err) 725 } 726 727 l.Info("added knot member") 728 case jmodels.CommitOperationDelete: 729 // we don't store knot members in a table (like we do for spindle) 730 // and we can't remove this just yet. possibly fixed if we switch 731 // to either: 732 // 1. a knot_members table like with spindle and store the rkey 733 // 2. use the knot host as the rkey 734 // 735 // TODO: implement member deletion 736 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 737 } 738 739 return nil 740} 741 742func (i *Ingester) ingestKnot(e *jmodels.Event) error { 743 did := e.Did 744 var err error 745 746 l := i.Logger.With("handler", "ingestKnot") 747 l = l.With("nsid", e.Commit.Collection) 748 749 switch e.Commit.Operation { 750 case jmodels.CommitOperationCreate: 751 raw := json.RawMessage(e.Commit.Record) 752 record := tangled.Knot{} 753 err = json.Unmarshal(raw, &record) 754 if err != nil { 755 l.Error("invalid record", "err", err) 756 return err 757 } 758 759 domain := e.Commit.RKey 760 761 ddb, ok := i.Db.Execer.(*db.DB) 762 if !ok { 763 return fmt.Errorf("failed to index profile record, invalid db cast") 764 } 765 766 err := db.AddKnot(ddb, domain, did) 767 if err != nil { 768 l.Error("failed to add knot to db", "err", err, "domain", domain) 769 return err 770 } 771 772 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 773 if err != nil { 774 l.Error("failed to verify knot", "err", err, "domain", domain) 775 return err 776 } 777 778 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 779 if err != nil { 780 return fmt.Errorf("failed to mark verified: %w", err) 781 } 782 783 return nil 784 785 case jmodels.CommitOperationDelete: 786 domain := e.Commit.RKey 787 788 ddb, ok := i.Db.Execer.(*db.DB) 789 if !ok { 790 return fmt.Errorf("failed to index knot record, invalid db cast") 791 } 792 793 // get record from db first 794 registrations, err := db.GetRegistrations( 795 ddb, 796 orm.FilterEq("domain", domain), 797 orm.FilterEq("did", did), 798 ) 799 if err != nil { 800 return fmt.Errorf("failed to get registration: %w", err) 801 } 802 if len(registrations) != 1 { 803 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 804 } 805 registration := registrations[0] 806 807 tx, err := ddb.Begin() 808 if err != nil { 809 return err 810 } 811 defer func() { 812 tx.Rollback() 813 i.Enforcer.E.LoadPolicy() 814 }() 815 816 err = db.DeleteKnot( 817 tx, 818 orm.FilterEq("did", did), 819 orm.FilterEq("domain", domain), 820 ) 821 if err != nil { 822 return err 823 } 824 825 if registration.Registered != nil { 826 err = i.Enforcer.RemoveKnot(domain) 827 if err != nil { 828 return err 829 } 830 } 831 832 err = tx.Commit() 833 if err != nil { 834 return err 835 } 836 837 err = i.Enforcer.E.SavePolicy() 838 if err != nil { 839 return err 840 } 841 } 842 843 return nil 844} 845func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 846 did := e.Did 847 rkey := e.Commit.RKey 848 849 var err error 850 851 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 852 l.Info("ingesting record") 853 854 ddb, ok := i.Db.Execer.(*db.DB) 855 if !ok { 856 return fmt.Errorf("failed to index issue record, invalid db cast") 857 } 858 859 switch e.Commit.Operation { 860 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 861 raw := json.RawMessage(e.Commit.Record) 862 record := tangled.RepoIssue{} 863 err = json.Unmarshal(raw, &record) 864 if err != nil { 865 l.Error("invalid record", "err", err) 866 return err 867 } 868 869 issue := models.IssueFromRecord(did, rkey, record) 870 871 if issue.RepoDid == "" && issue.RepoAt == "" { 872 return fmt.Errorf("issue record has neither repo nor repoDid") 873 } 874 875 if err := i.Validator.ValidateIssue(&issue); err != nil { 876 return fmt.Errorf("failed to validate issue: %w", err) 877 } 878 879 if issue.RepoDid == "" && record.Repo != nil { 880 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 881 if repoErr == nil && repo.RepoDid != "" { 882 issue.RepoDid = repo.RepoDid 883 if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 884 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 885 } 886 } 887 } 888 889 tx, err := ddb.BeginTx(ctx, nil) 890 if err != nil { 891 l.Error("failed to begin transaction", "err", err) 892 return err 893 } 894 defer tx.Rollback() 895 896 err = db.PutIssue(tx, &issue) 897 if err != nil { 898 l.Error("failed to create issue", "err", err) 899 return err 900 } 901 902 err = tx.Commit() 903 if err != nil { 904 l.Error("failed to commit txn", "err", err) 905 return err 906 } 907 908 return nil 909 910 case jmodels.CommitOperationDelete: 911 tx, err := ddb.BeginTx(ctx, nil) 912 if err != nil { 913 l.Error("failed to begin transaction", "err", err) 914 return err 915 } 916 defer tx.Rollback() 917 918 if err := db.DeleteIssues( 919 tx, 920 did, 921 rkey, 922 ); err != nil { 923 l.Error("failed to delete", "err", err) 924 return fmt.Errorf("failed to delete issue record: %w", err) 925 } 926 if err := tx.Commit(); err != nil { 927 l.Error("failed to commit txn", "err", err) 928 return err 929 } 930 931 return nil 932 } 933 934 return nil 935} 936 937func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 938 did := e.Did 939 rkey := e.Commit.RKey 940 941 var err error 942 943 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 944 l.Info("ingesting record") 945 946 ddb, ok := i.Db.Execer.(*db.DB) 947 if !ok { 948 return fmt.Errorf("failed to index issue comment record, invalid db cast") 949 } 950 951 switch e.Commit.Operation { 952 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 953 raw := json.RawMessage(e.Commit.Record) 954 record := tangled.RepoIssueComment{} 955 err = json.Unmarshal(raw, &record) 956 if err != nil { 957 return fmt.Errorf("invalid record: %w", err) 958 } 959 960 comment, err := models.IssueCommentFromRecord(did, rkey, record) 961 if err != nil { 962 return fmt.Errorf("failed to parse comment from record: %w", err) 963 } 964 965 if err := i.Validator.ValidateIssueComment(comment); err != nil { 966 return fmt.Errorf("failed to validate comment: %w", err) 967 } 968 969 tx, err := ddb.Begin() 970 if err != nil { 971 return fmt.Errorf("failed to start transaction: %w", err) 972 } 973 defer tx.Rollback() 974 975 _, err = db.AddIssueComment(tx, *comment) 976 if err != nil { 977 return fmt.Errorf("failed to create issue comment: %w", err) 978 } 979 980 return tx.Commit() 981 982 case jmodels.CommitOperationDelete: 983 if err := db.DeleteIssueComments( 984 ddb, 985 orm.FilterEq("did", did), 986 orm.FilterEq("rkey", rkey), 987 ); err != nil { 988 return fmt.Errorf("failed to delete issue comment record: %w", err) 989 } 990 991 return nil 992 } 993 994 return nil 995} 996 997func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 998 did := e.Did 999 rkey := e.Commit.RKey 1000 1001 var err error 1002 1003 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1004 l.Info("ingesting record") 1005 1006 ddb, ok := i.Db.Execer.(*db.DB) 1007 if !ok { 1008 return fmt.Errorf("failed to index label definition, invalid db cast") 1009 } 1010 1011 switch e.Commit.Operation { 1012 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1013 raw := json.RawMessage(e.Commit.Record) 1014 record := tangled.LabelDefinition{} 1015 err = json.Unmarshal(raw, &record) 1016 if err != nil { 1017 return fmt.Errorf("invalid record: %w", err) 1018 } 1019 1020 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1021 if err != nil { 1022 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1023 } 1024 1025 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1026 return fmt.Errorf("failed to validate labeldef: %w", err) 1027 } 1028 1029 _, err = db.AddLabelDefinition(ddb, def) 1030 if err != nil { 1031 return fmt.Errorf("failed to create labeldef: %w", err) 1032 } 1033 1034 return nil 1035 1036 case jmodels.CommitOperationDelete: 1037 if err := db.DeleteLabelDefinition( 1038 ddb, 1039 orm.FilterEq("did", did), 1040 orm.FilterEq("rkey", rkey), 1041 ); err != nil { 1042 return fmt.Errorf("failed to delete labeldef record: %w", err) 1043 } 1044 1045 return nil 1046 } 1047 1048 return nil 1049} 1050 1051func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1052 did := e.Did 1053 rkey := e.Commit.RKey 1054 1055 var err error 1056 1057 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1058 l.Info("ingesting record") 1059 1060 ddb, ok := i.Db.Execer.(*db.DB) 1061 if !ok { 1062 return fmt.Errorf("failed to index label op, invalid db cast") 1063 } 1064 1065 switch e.Commit.Operation { 1066 case jmodels.CommitOperationCreate: 1067 raw := json.RawMessage(e.Commit.Record) 1068 record := tangled.LabelOp{} 1069 err = json.Unmarshal(raw, &record) 1070 if err != nil { 1071 return fmt.Errorf("invalid record: %w", err) 1072 } 1073 1074 subject := syntax.ATURI(record.Subject) 1075 collection := subject.Collection() 1076 1077 var repo *models.Repo 1078 switch collection { 1079 case tangled.RepoIssueNSID: 1080 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1081 if err != nil || len(i) != 1 { 1082 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1083 } 1084 repo = i[0].Repo 1085 default: 1086 return fmt.Errorf("unsupport label subject: %s", collection) 1087 } 1088 1089 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1090 if err != nil { 1091 return fmt.Errorf("failed to build label application ctx: %w", err) 1092 } 1093 1094 ops := models.LabelOpsFromRecord(did, rkey, record) 1095 1096 for _, o := range ops { 1097 def, ok := actx.Defs[o.OperandKey] 1098 if !ok { 1099 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1100 } 1101 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1102 return fmt.Errorf("failed to validate labelop: %w", err) 1103 } 1104 } 1105 1106 tx, err := ddb.Begin() 1107 if err != nil { 1108 return err 1109 } 1110 defer tx.Rollback() 1111 1112 for _, o := range ops { 1113 _, err = db.AddLabelOp(tx, &o) 1114 if err != nil { 1115 return fmt.Errorf("failed to add labelop: %w", err) 1116 } 1117 } 1118 1119 if err = tx.Commit(); err != nil { 1120 return err 1121 } 1122 } 1123 1124 return nil 1125}