Auto-indexing service and GraphQL API for AT Protocol Records — backfill module.
import atproto_car
import database/executor.{type Executor}
import database/repositories/actors
import database/repositories/config as config_repo
import database/repositories/lexicons
import database/repositories/records
import database/types.{type Record, Record}
import envoy
import gleam/bit_array
import gleam/bytes_tree
import gleam/dict
import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode
import gleam/erlang/process.{type Subject}
import gleam/hackney
import gleam/http/request
import gleam/http/response as gleam_http_response
import gleam/int
import gleam/json
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/result
import gleam/string
import gleam/time/duration
import gleam/time/timestamp
import honk
import honk/errors
import lib/http_client
import lib/oauth/did_cache
import logging

/// Opaque type for monotonic time
pub type MonotonicTime

/// Get current monotonic time for timing measurements
@external(erlang, "backfill_ffi", "monotonic_now")
fn monotonic_now() -> MonotonicTime

/// Get elapsed milliseconds since a start time
@external(erlang, "backfill_ffi", "elapsed_ms")
fn elapsed_ms(start: MonotonicTime) -> Int

/// ATP data resolved from DID
pub type AtprotoData {
  AtprotoData(did: String, handle: String, pds: String)
}

/// Configuration for backfill operations
pub type BackfillConfig {
  BackfillConfig(
    plc_directory_url: String,
    index_actors: Bool,
    max_concurrent_per_pds: Int,
    max_pds_workers: Int,
    max_http_concurrent: Int,
    repo_fetch_timeout_ms: Int,
    did_cache: Option(Subject(did_cache.Message)),
  )
}

/// Read a positive integer from an environment variable, falling back to
/// `default` when the variable is unset or does not parse as an integer.
fn env_int(name: String, default: Int) -> Int {
  case envoy.get(name) {
    Ok(val) ->
      case int.parse(val) {
        Ok(n) -> n
        Error(_) -> default
      }
    Error(_) -> default
  }
}

/// Creates a default backfill configuration
///
/// Reads tunables from the environment (falling back to defaults) and
/// initializes the global HTTP semaphore as a side effect.
pub fn default_config(db: Executor) -> BackfillConfig {
  // Get PLC directory URL from database config
  let plc_url = config_repo.get_plc_directory_url(db)

  // Per-PDS concurrency (default 4), number of concurrent PDS workers
  // (default 10), and the global HTTP request ceiling (default 50).
  let max_pds_concurrent = env_int("BACKFILL_PDS_CONCURRENCY", 4)
  let max_pds_workers = env_int("BACKFILL_MAX_PDS_WORKERS", 10)
  let max_http = env_int("BACKFILL_MAX_HTTP_CONCURRENT", 50)

  // Repo fetch timeout: configured in seconds, stored in milliseconds
  // (default 60s). Not expressible via env_int because of the unit scaling.
  let repo_timeout = case envoy.get("BACKFILL_REPO_TIMEOUT") {
    Ok(val) ->
      case int.parse(val) {
        Ok(n) -> n * 1000
        Error(_) -> 60_000
      }
    Error(_) -> 60_000
  }

  // Configure hackney pool with the configured HTTP limit
  configure_hackney_pool(max_http)

  BackfillConfig(
    plc_directory_url: plc_url,
    index_actors: True,
    max_concurrent_per_pds: max_pds_concurrent,
    max_pds_workers: max_pds_workers,
    max_http_concurrent: max_http,
    repo_fetch_timeout_ms: repo_timeout,
    did_cache: None,
  )
}

/// Creates a backfill configuration with a DID cache
pub fn config_with_cache(
  cache: Subject(did_cache.Message),
  db: Executor,
) -> BackfillConfig {
  let config = default_config(db)
  BackfillConfig(..config, did_cache: Some(cache))
}

/// Configure hackney connection pool with specified limits
/// This initializes the HTTP semaphore for rate limiting
@external(erlang, "backfill_ffi", "configure_pool")
pub fn configure_hackney_pool(max_concurrent: Int) -> Nil

/// Acquire a permit from the global HTTP semaphore
/// Blocks when at the configured concurrent request limit
/// (set via configure_hackney_pool)
@external(erlang, "backfill_ffi", "acquire_permit")
fn acquire_permit() -> Nil
/// Release a permit back to the global HTTP semaphore
@external(erlang, "backfill_ffi", "release_permit")
fn release_permit() -> Nil

/// Execute an HTTP request with semaphore-based rate limiting
/// Acquires a permit before sending, releases after completion
fn send_with_permit(
  req: request.Request(String),
) -> Result(gleam_http_response.Response(String), hackney.Error) {
  acquire_permit()
  let result = http_client.hackney_send(req)
  // Release even on error - hackney_send returns a Result, it does not raise
  release_permit()
  result
}

/// Execute an HTTP request with permit, returning raw binary response
/// Used for binary data like CAR files
fn send_bits_with_permit(
  req: request.Request(bytes_tree.BytesTree),
) -> Result(gleam_http_response.Response(BitArray), hackney.Error) {
  acquire_permit()
  let result = http_client.hackney_send_bits(req)
  release_permit()
  result
}

/// Fetch a repo as a CAR file using com.atproto.sync.getRepo
/// Returns raw CAR bytes, or a human-readable error string.
fn fetch_repo_car(did: String, pds_url: String) -> Result(BitArray, String) {
  let url = pds_url <> "/xrpc/com.atproto.sync.getRepo?did=" <> did

  case request.to(url) {
    Error(_) -> Error("Failed to create request for getRepo: " <> did)
    Ok(req) -> {
      // Convert to BytesTree body for send_bits (empty for GET request)
      let bits_req = request.set_body(req, bytes_tree.new())

      case send_bits_with_permit(bits_req) {
        Error(err) ->
          Error(
            "Failed to fetch repo CAR for "
            <> did
            <> ": "
            <> string.inspect(err),
          )
        Ok(resp) ->
          case resp.status {
            // Response body is already BitArray
            200 -> Ok(resp.body)
            status ->
              Error(
                "getRepo failed for "
                <> did
                <> " (status: "
                <> string.inspect(status)
                <> ")",
              )
          }
      }
    }
  }
}

/// Convert a CAR record (with proper MST path) to a database Record type
/// `indexed_at` is stamped with the current wall-clock time in RFC 3339.
fn car_record_with_path_to_db_record(
  car_record: atproto_car.RecordWithPath,
  did: String,
) -> Record {
  let now =
    timestamp.system_time()
    |> timestamp.to_rfc3339(duration.seconds(0))

  Record(
    uri: "at://"
      <> did
      <> "/"
      <> car_record.collection
      <> "/"
      <> car_record.rkey,
    cid: atproto_car.cid_to_string(car_record.cid),
    did: did,
    collection: car_record.collection,
    json: atproto_car.record_to_json(car_record),
    indexed_at: now,
    rkey: car_record.rkey,
  )
}

/// Result of record validation
type ValidationResult {
  /// Record passed validation
  Valid
  /// Record failed schema validation - should be skipped
  Invalid(String)
  /// Could not parse JSON for validation - insert anyway (CBOR edge cases)
  ParseError(String)
}

/// Validate a record against lexicon schemas using pre-built context
fn validate_record(
  ctx: honk.ValidationContext,
  collection: String,
  json_string: String,
) -> ValidationResult {
  case honk.parse_json_string(json_string) {
    // JSON parse failure - likely CBOR edge case, allow record through
    Error(e) ->
      ParseError("Failed to parse record JSON: " <> errors.to_string(e))
    Ok(record_json) ->
      case honk.validate_record_with_context(ctx, collection, record_json) {
        Ok(_) -> Valid
        Error(e) -> Invalid(errors.to_string(e))
      }
  }
}

/// Load lexicons and build validation context
/// Returns None on error (validation will be skipped)
fn load_validation_context(conn: Executor) -> Option(honk.ValidationContext) {
  case lexicons.get_all(conn) {
    Error(_) -> {
      logging.log(
        logging.Warning,
        "[backfill] Failed to load lexicons, skipping validation",
      )
      None
    }
    Ok(lexs) -> {
      // Keep only lexicon documents whose JSON parses; bad ones are dropped
      let lexicon_jsons =
        lexs
        |> list.filter_map(fn(lex) {
          case honk.parse_json_string(lex.json) {
            Ok(json_val) -> Ok(json_val)
            Error(_) -> Error(Nil)
          }
        })
      // Build context once from all lexicons
      case honk.build_validation_context(lexicon_jsons) {
        Ok(ctx) -> Some(ctx)
        Error(_) -> {
          logging.log(
            logging.Warning,
            "[backfill] Failed to build validation context, skipping validation",
          )
          None
        }
      }
    }
  }
}
/// Backfill a single repo using CAR file approach
/// Fetches entire repo, parses CAR with MST walking, filters by collections, indexes records
fn backfill_repo_car(
  did: String,
  pds_url: String,
  collections: List(String),
  conn: Executor,
) -> Int {
  // Load validation context - this path is used for single-repo backfills (e.g., new actor sync)
  let validation_ctx = load_validation_context(conn)
  backfill_repo_car_with_context(
    did,
    pds_url,
    collections,
    conn,
    validation_ctx,
  )
}

/// Backfill a single repo with pre-built validation context
///
/// Returns the number of records indexed (0 on any fetch/parse failure).
/// Phases: fetch CAR -> parse/walk MST -> convert+validate -> batch insert,
/// with per-phase timing logged at Debug level.
fn backfill_repo_car_with_context(
  did: String,
  pds_url: String,
  collections: List(String),
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
) -> Int {
  let total_start = monotonic_now()

  // Phase 1: Fetch
  let fetch_start = monotonic_now()
  case fetch_repo_car(did, pds_url) {
    Ok(car_bytes) -> {
      let fetch_ms = elapsed_ms(fetch_start)
      let car_size = bit_array.byte_size(car_bytes)

      // Phase 2: Parse CAR and walk MST
      let parse_start = monotonic_now()
      case atproto_car.extract_records_with_paths(car_bytes, collections) {
        Ok(car_records) -> {
          let parse_ms = elapsed_ms(parse_start)

          // Phase 3: Convert and validate.
          // Invalid records are dropped and counted; unparseable JSON is
          // inserted anyway (CBOR edge cases - see ValidationResult).
          let validate_start = monotonic_now()
          let #(db_records, invalid_count) =
            car_records
            |> list.fold(#([], 0), fn(acc, r) {
              let #(records, invalids) = acc
              let db_record = car_record_with_path_to_db_record(r, did)
              case validation_ctx {
                None -> #([db_record, ..records], invalids)
                Some(ctx) ->
                  case validate_record(ctx, r.collection, db_record.json) {
                    Valid -> #([db_record, ..records], invalids)
                    ParseError(_) -> #([db_record, ..records], invalids)
                    Invalid(msg) -> {
                      logging.log(
                        logging.Debug,
                        "[backfill] Invalid record "
                          <> r.collection
                          <> "/"
                          <> r.rkey
                          <> ": "
                          <> msg,
                      )
                      #(records, invalids + 1)
                    }
                  }
              }
            })
          let validate_ms = elapsed_ms(validate_start)

          // Phase 4: Insert into database
          let insert_start = monotonic_now()
          index_records(db_records, conn)
          let insert_ms = elapsed_ms(insert_start)

          let count = list.length(db_records)
          let total_ms = elapsed_ms(total_start)

          // Log summary at debug level (detailed per-repo timing)
          logging.log(
            logging.Debug,
            "[backfill] "
              <> did
              <> " fetch="
              <> int.to_string(fetch_ms)
              <> "ms parse="
              <> int.to_string(parse_ms)
              <> "ms validate="
              <> int.to_string(validate_ms)
              <> "ms insert="
              <> int.to_string(insert_ms)
              <> "ms total="
              <> int.to_string(total_ms)
              <> "ms records="
              <> int.to_string(count)
              <> " invalid="
              <> int.to_string(invalid_count)
              <> " size="
              <> int.to_string(car_size),
          )
          count
        }
        Error(err) -> {
          logging.log(
            logging.Warning,
            "[backfill] CAR parse error for "
              <> did
              <> ": "
              <> string.inspect(err),
          )
          0
        }
      }
    }
    Error(err) -> {
      logging.log(logging.Warning, "[backfill] " <> err)
      0
    }
  }
}

/// Check if an NSID matches the configured domain authority
/// NSID format is like "com.example.post" where "com.example" is the authority
pub fn nsid_matches_domain_authority(
  nsid: String,
  domain_authority: String,
) -> Bool {
  // NSID format: authority.name (e.g., "com.example.post")
  // The trailing "." prevents prefix false-positives
  // (e.g. authority "com.ex" must not match "com.example.post").
  string.starts_with(nsid, domain_authority <> ".")
}
/// Get local and external collection IDs from configured lexicons
/// Returns #(local_collection_ids, external_collection_ids)
pub fn get_collection_ids(conn: Executor) -> #(List(String), List(String)) {
  // Missing domain_authority config degrades to "": nothing matches "." so
  // every lexicon is classified as external.
  let domain_authority = case config_repo.get(conn, "domain_authority") {
    Ok(authority) -> authority
    Error(_) -> ""
  }

  case lexicons.get_record_types(conn) {
    Ok(lexicon_list) -> {
      let #(local, external) =
        list.partition(lexicon_list, fn(lex) {
          nsid_matches_domain_authority(lex.id, domain_authority)
        })
      #(
        list.map(local, fn(lex) { lex.id }),
        list.map(external, fn(lex) { lex.id }),
      )
    }
    Error(_) -> #([], [])
  }
}

/// Resolve a DID to get ATP data (PDS endpoint and handle)
pub fn resolve_did(did: String, plc_url: String) -> Result(AtprotoData, String) {
  // did:web resolves via HTTPS .well-known; everything else via PLC directory
  case string.starts_with(did, "did:web:") {
    True -> resolve_did_web(did)
    False -> resolve_did_plc(did, plc_url)
  }
}

/// Resolve a DID with caching support
/// Uses the did_cache if provided, otherwise falls back to direct resolution
pub fn resolve_did_cached(
  did: String,
  plc_url: String,
  cache: Option(Subject(did_cache.Message)),
) -> Result(AtprotoData, String) {
  case cache {
    Some(c) ->
      // Try to get from cache first
      case did_cache.get(c, did) {
        Ok(cached_json) ->
          // Parse cached ATP data
          case parse_cached_atp_data(cached_json) {
            Ok(data) -> Ok(data)
            // Cache had invalid data, resolve fresh
            Error(_) -> resolve_and_cache(did, plc_url, c)
          }
        // Not in cache, resolve and cache
        Error(_) -> resolve_and_cache(did, plc_url, c)
      }
    None -> resolve_did(did, plc_url)
  }
}

/// Resolve DID and store in cache
fn resolve_and_cache(
  did: String,
  plc_url: String,
  cache: Subject(did_cache.Message),
) -> Result(AtprotoData, String) {
  case resolve_did(did, plc_url) {
    Ok(data) -> {
      // Cache for 1 hour (3600 seconds)
      let cached_json = encode_atp_data(data)
      did_cache.put(cache, did, cached_json, 3600)
      Ok(data)
    }
    // Resolution errors are not cached
    err -> err
  }
}

/// Encode AtprotoData to JSON for caching
fn encode_atp_data(data: AtprotoData) -> String {
  json.object([
    #("did", json.string(data.did)),
    #("handle", json.string(data.handle)),
    #("pds", json.string(data.pds)),
  ])
  |> json.to_string
}

/// Parse AtprotoData from cached JSON (inverse of encode_atp_data)
fn parse_cached_atp_data(json_str: String) -> Result(AtprotoData, String) {
  let decoder = {
    use did <- decode.field("did", decode.string)
    use handle <- decode.field("handle", decode.string)
    use pds <- decode.field("pds", decode.string)
    decode.success(AtprotoData(did: did, handle: handle, pds: pds))
  }

  json.parse(json_str, decoder)
  |> result.map_error(fn(_) { "Failed to parse cached ATP data" })
}

/// Resolve a did:web DID by fetching the DID document from HTTPS
fn resolve_did_web(did: String) -> Result(AtprotoData, String) {
  // Extract the domain from did:web:example.com
  // did:web format: did:web:<domain>[:<port>][:<path>]
  let parts = string.split(did, ":")
  case parts {
    ["did", "web", domain, ..rest] -> {
      // Build the URL: https://<domain>/.well-known/did.json
      // If there are additional path components, they go before /.well-known/did.json
      let base_domain = case rest {
        [] -> domain
        path_parts -> domain <> "/" <> string.join(path_parts, "/")
      }
      let url = "https://" <> base_domain <> "/.well-known/did.json"

      case request.to(url) {
        Error(_) -> Error("Failed to create request for did:web DID: " <> did)
        Ok(req) ->
          case send_with_permit(req) {
            Error(_) -> Error("Failed to fetch did:web DID data for: " <> did)
            Ok(resp) ->
              case resp.status {
                200 -> parse_atproto_data(resp.body, did)
                _ ->
                  Error(
                    "Failed to resolve DID "
                    <> did
                    <> " (status: "
                    <> string.inspect(resp.status)
                    <> ")",
                  )
              }
          }
      }
    }
    _ -> Error("Invalid did:web format: " <> did)
  }
}
/// Resolve a did:plc DID through the PLC directory
fn resolve_did_plc(did: String, plc_url: String) -> Result(AtprotoData, String) {
  let url = plc_url <> "/" <> did

  case request.to(url) {
    Error(_) -> Error("Failed to create request for DID: " <> did)
    Ok(req) ->
      case send_with_permit(req) {
        Error(_) -> Error("Failed to fetch DID data for: " <> did)
        Ok(resp) ->
          case resp.status {
            200 -> parse_atproto_data(resp.body, did)
            _ ->
              Error(
                "Failed to resolve DID "
                <> did
                <> " (status: "
                <> string.inspect(resp.status)
                <> ")",
              )
          }
      }
  }
}

/// Parse ATP data from PLC directory response
///
/// Falls back to "https://bsky.social" when no AtprotoPersonalDataServer
/// service entry is found, and to the DID itself when no at:// handle
/// is present in alsoKnownAs.
fn parse_atproto_data(body: String, did: String) -> Result(AtprotoData, String) {
  // Simple decoder that extracts service and alsoKnownAs arrays
  let decoder = {
    use service_list <- decode.field(
      "service",
      decode.optional(decode.list(decode.dynamic)),
    )
    use handle_list <- decode.field(
      "alsoKnownAs",
      decode.optional(decode.list(decode.string)),
    )
    decode.success(#(service_list, handle_list))
  }

  case json.parse(body, decoder) {
    Error(_) -> Error("Failed to parse ATP data for DID: " <> did)
    Ok(#(service_list_opt, handle_list_opt)) -> {
      // Extract PDS endpoint from service list
      let pds = case service_list_opt {
        Some(service_list) ->
          service_list
          |> list.find_map(fn(service_dyn) {
            // Try to extract the service endpoint
            let service_decoder = {
              use service_type <- decode.field("type", decode.string)
              use endpoint <- decode.field("serviceEndpoint", decode.string)
              decode.success(#(service_type, endpoint))
            }

            case decode.run(service_dyn, service_decoder) {
              Ok(#("AtprotoPersonalDataServer", endpoint)) -> Ok(endpoint)
              _ -> Error(Nil)
            }
          })
          |> result.unwrap("https://bsky.social")
        None -> "https://bsky.social"
      }

      // Extract handle from alsoKnownAs (first "at://handle" entry)
      let handle = case handle_list_opt {
        Some(handle_list) ->
          handle_list
          |> list.find(fn(h) { string.starts_with(h, "at://") })
          |> result.map(fn(h) { string.replace(h, "at://", "") })
          |> result.unwrap(did)
        None -> did
      }

      Ok(AtprotoData(did: did, handle: handle, pds: pds))
    }
  }
}

/// Worker function that resolves a DID and sends result back
/// Errors are logged here and collapsed to Error(Nil) for the collector.
fn resolve_did_worker(
  did: String,
  plc_url: String,
  cache: Option(Subject(did_cache.Message)),
  reply_to: Subject(Result(AtprotoData, Nil)),
) -> Nil {
  let result = case resolve_did_cached(did, plc_url, cache) {
    Ok(atp_data) -> Ok(atp_data)
    Error(err) -> {
      logging.log(
        logging.Error,
        "[backfill] Error resolving DID " <> did <> ": " <> err,
      )
      Error(Nil)
    }
  }
  process.send(reply_to, result)
}

/// Get ATP data for a list of repos (DIDs) - fully concurrent version
///
/// DIDs that fail to resolve (or time out after 30s) are silently dropped
/// from the returned list.
pub fn get_atp_data_for_repos(
  repos: List(String),
  config: BackfillConfig,
) -> List(AtprotoData) {
  // Spawn all workers at once - Erlang VM can handle it
  let subject = process.new_subject()
  let repo_count = list.length(repos)

  // Spawn all workers concurrently
  let _workers =
    repos
    |> list.map(fn(repo) {
      process.spawn_unlinked(fn() {
        resolve_did_worker(
          repo,
          config.plc_directory_url,
          config.did_cache,
          subject,
        )
      })
    })

  // Collect results from all workers (one message expected per worker)
  list.range(1, repo_count)
  |> list.filter_map(fn(_) {
    case process.receive(subject, 30_000) {
      Ok(result) -> result
      Error(_) -> Error(Nil)
    }
  })
}

/// CAR-based worker that fetches entire repo and filters by collections
fn fetch_repo_car_worker(
  repo: String,
  pds: String,
  collections: List(String),
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
  reply_to: Subject(Int),
) -> Nil {
  // Wrap in rescue to catch any crashes
  let count = rescue_car_backfill(repo, pds, collections, conn, validation_ctx)
  process.send(reply_to, count)
}
/// Wrapper to catch crashes in CAR backfill
/// A crashed worker is logged and counted as 0 records so the sliding
/// window still receives a reply and does not stall.
fn rescue_car_backfill(
  repo: String,
  pds: String,
  collections: List(String),
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
) -> Int {
  case
    rescue(fn() {
      backfill_repo_car_with_context(
        repo,
        pds,
        collections,
        conn,
        validation_ctx,
      )
    })
  {
    Ok(count) -> count
    Error(err) -> {
      logging.log(
        logging.Error,
        "[backfill] CAR worker crashed for "
          <> repo
          <> ": "
          <> string.inspect(err),
      )
      0
    }
  }
}

/// Rescue wrapper - catches exceptions
@external(erlang, "backfill_ffi", "rescue")
pub fn rescue(f: fn() -> a) -> Result(a, Dynamic)

/// CAR-based PDS worker - fetches each repo as CAR and filters locally
///
/// Processes up to `max_concurrent` repos at a time against one PDS and
/// reports the total indexed-record count to `reply_to`.
fn pds_worker_car(
  pds_url: String,
  repos: List(String),
  collections: List(String),
  max_concurrent: Int,
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
  timeout_ms: Int,
  reply_to: Subject(Int),
) -> Nil {
  logging.log(
    logging.Debug,
    "[backfill] PDS worker starting for "
      <> pds_url
      <> " with "
      <> int.to_string(list.length(repos))
      <> " repos",
  )
  let subject = process.new_subject()

  // Start initial batch of workers
  let #(initial_repos, remaining_repos) = list.split(repos, max_concurrent)
  let initial_count = list.length(initial_repos)

  // Spawn initial workers
  list.each(initial_repos, fn(repo) {
    process.spawn_unlinked(fn() {
      fetch_repo_car_worker(
        repo,
        pds_url,
        collections,
        conn,
        validation_ctx,
        subject,
      )
    })
  })

  // Process with sliding window
  let total_count =
    sliding_window_car(
      remaining_repos,
      subject,
      initial_count,
      pds_url,
      collections,
      conn,
      validation_ctx,
      timeout_ms,
      0,
    )

  logging.log(
    logging.Debug,
    "[backfill] PDS worker finished for "
      <> pds_url
      <> " with "
      <> int.to_string(total_count)
      <> " total records",
  )
  process.send(reply_to, total_count)
}

/// Sliding window for CAR-based processing
///
/// Waits for one worker reply, spawns the next pending repo (keeping
/// `in_flight` constant until `remaining` is empty), and accumulates the
/// record total. On a receive timeout the slot is abandoned (in_flight
/// decremented) and pending repos are retried on later replies.
fn sliding_window_car(
  remaining: List(String),
  subject: Subject(Int),
  in_flight: Int,
  pds_url: String,
  collections: List(String),
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
  timeout_ms: Int,
  total: Int,
) -> Int {
  case in_flight {
    // All workers accounted for - done
    0 -> total
    _ ->
      case process.receive(subject, timeout_ms) {
        Ok(count) -> {
          let new_total = total + count
          case remaining {
            [next, ..rest] -> {
              // Refill the freed slot with the next repo
              process.spawn_unlinked(fn() {
                fetch_repo_car_worker(
                  next,
                  pds_url,
                  collections,
                  conn,
                  validation_ctx,
                  subject,
                )
              })
              sliding_window_car(
                rest,
                subject,
                in_flight,
                pds_url,
                collections,
                conn,
                validation_ctx,
                timeout_ms,
                new_total,
              )
            }
            [] ->
              // Nothing left to spawn - drain outstanding workers
              sliding_window_car(
                [],
                subject,
                in_flight - 1,
                pds_url,
                collections,
                conn,
                validation_ctx,
                timeout_ms,
                new_total,
              )
          }
        }
        Error(_) -> {
          logging.log(
            logging.Warning,
            "[backfill] Timeout waiting for CAR worker on "
              <> pds_url
              <> " (in_flight: "
              <> int.to_string(in_flight)
              <> ", remaining: "
              <> int.to_string(list.length(remaining))
              <> ")",
          )
          sliding_window_car(
            remaining,
            subject,
            in_flight - 1,
            pds_url,
            collections,
            conn,
            validation_ctx,
            timeout_ms,
            total,
          )
        }
      }
  }
}
/// Sliding window for PDS worker processing
/// Limits how many PDS endpoints are processed concurrently
///
/// `timeout_ms` is the per-repo timeout forwarded to new PDS workers; the
/// wait for a whole PDS worker to finish uses a fixed 5-minute timeout.
fn sliding_window_pds(
  remaining: List(#(String, List(#(String, String)))),
  subject: Subject(Int),
  in_flight: Int,
  collections: List(String),
  max_concurrent_per_pds: Int,
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
  timeout_ms: Int,
  total: Int,
  pds_count: Int,
  completed: Int,
) -> Int {
  case in_flight {
    0 -> total
    _ ->
      // 5 minute timeout per PDS worker
      case process.receive(subject, 300_000) {
        Ok(count) -> {
          let new_total = total + count
          let new_completed = completed + 1
          logging.log(
            logging.Info,
            "[backfill] PDS worker "
              <> int.to_string(new_completed)
              <> "/"
              <> int.to_string(pds_count)
              <> " done ("
              <> int.to_string(count)
              <> " records)",
          )
          case remaining {
            [#(pds_url, repo_pairs), ..rest] -> {
              // Refill the freed slot with the next PDS endpoint
              let pds_repos =
                repo_pairs
                |> list.map(fn(pair) {
                  let #(_pds, repo) = pair
                  repo
                })
              process.spawn_unlinked(fn() {
                pds_worker_car(
                  pds_url,
                  pds_repos,
                  collections,
                  max_concurrent_per_pds,
                  conn,
                  validation_ctx,
                  timeout_ms,
                  subject,
                )
              })
              sliding_window_pds(
                rest,
                subject,
                in_flight,
                collections,
                max_concurrent_per_pds,
                conn,
                validation_ctx,
                timeout_ms,
                new_total,
                pds_count,
                new_completed,
              )
            }
            [] ->
              // No more PDS endpoints - drain outstanding workers
              sliding_window_pds(
                [],
                subject,
                in_flight - 1,
                collections,
                max_concurrent_per_pds,
                conn,
                validation_ctx,
                timeout_ms,
                new_total,
                pds_count,
                new_completed,
              )
          }
        }
        Error(_) -> {
          logging.log(
            logging.Warning,
            "[backfill] PDS worker timed out (in_flight: "
              <> int.to_string(in_flight)
              <> ", remaining: "
              <> int.to_string(list.length(remaining))
              <> ")",
          )
          sliding_window_pds(
            remaining,
            subject,
            in_flight - 1,
            collections,
            max_concurrent_per_pds,
            conn,
            validation_ctx,
            timeout_ms,
            total,
            pds_count,
            completed,
          )
        }
      }
  }
}

/// CAR-based streaming: fetch repos as CAR files and filter locally
/// One request per repo instead of one per (repo, collection)
///
/// Returns the total number of records indexed across all PDS endpoints.
pub fn get_records_for_repos_car(
  repos: List(String),
  collections: List(String),
  atp_data: List(AtprotoData),
  config: BackfillConfig,
  conn: Executor,
) -> Int {
  // Build validation context ONCE for all repos
  let validation_ctx = load_validation_context(conn)

  // Group repos by PDS; repos without resolved ATP data are logged and skipped
  let repos_by_pds =
    repos
    |> list.filter_map(fn(repo) {
      case list.find(atp_data, fn(data) { data.did == repo }) {
        Error(_) -> {
          logging.log(
            logging.Error,
            "[backfill] No ATP data found for repo: " <> repo,
          )
          Error(Nil)
        }
        Ok(data) -> Ok(#(data.pds, repo))
      }
    })
    |> list.group(fn(pair) {
      let #(pds, _repo) = pair
      pds
    })

  let pds_entries = dict.to_list(repos_by_pds)
  let pds_count = list.length(pds_entries)

  logging.log(
    logging.Info,
    "[backfill] Processing "
      <> int.to_string(pds_count)
      <> " PDS endpoints (max "
      <> int.to_string(config.max_pds_workers)
      <> " concurrent)...",
  )

  // Use sliding window to limit concurrent PDS workers
  let subject = process.new_subject()
  let #(initial_pds, remaining_pds) =
    list.split(pds_entries, config.max_pds_workers)
  let initial_count = list.length(initial_pds)

  // Spawn initial batch of PDS workers
  list.each(initial_pds, fn(pds_entry) {
    let #(pds_url, repo_pairs) = pds_entry
    let pds_repos =
      repo_pairs
      |> list.map(fn(pair) {
        let #(_pds, repo) = pair
        repo
      })

    process.spawn_unlinked(fn() {
      pds_worker_car(
        pds_url,
        pds_repos,
        collections,
        config.max_concurrent_per_pds,
        conn,
        validation_ctx,
        config.repo_fetch_timeout_ms,
        subject,
      )
    })
  })

  // Process remaining with sliding window
  let result =
    sliding_window_pds(
      remaining_pds,
      subject,
      initial_count,
      collections,
      config.max_concurrent_per_pds,
      conn,
      validation_ctx,
      config.repo_fetch_timeout_ms,
      0,
      pds_count,
      0,
    )

  logging.log(
    logging.Info,
    "[backfill] All PDS workers complete, total: "
      <> int.to_string(result)
      <> " records",
  )
  result
}
/// Index records into the database using batch inserts
/// Insert failures are logged, not propagated (best-effort indexing).
pub fn index_records(records: List(Record), conn: Executor) -> Nil {
  case records.batch_insert(conn, records) {
    Ok(_) -> Nil
    Error(err) -> {
      logging.log(
        logging.Error,
        "[backfill] Failed to batch insert records: " <> string.inspect(err),
      )
    }
  }
}

/// Index actors into the database using batch upsert for efficiency
pub fn index_actors(atp_data: List(AtprotoData), conn: Executor) -> Nil {
  // Convert AtprotoData to tuples for batch upsert
  let actor_tuples =
    atp_data
    |> list.map(fn(data) { #(data.did, data.handle) })

  case actors.batch_upsert(conn, actor_tuples) {
    Ok(_) -> Nil
    Error(err) -> {
      logging.log(
        logging.Error,
        "[backfill] Failed to batch upsert actors: " <> string.inspect(err),
      )
    }
  }
}

/// Backfill all collections (primary and external) for a newly discovered actor
/// This is called when a new actor is created via Jetstream or GraphQL mutations
pub fn backfill_collections_for_actor(
  db: Executor,
  did: String,
  collection_ids: List(String),
  external_collection_ids: List(String),
  plc_url: String,
) -> Nil {
  // Ensure HTTP semaphore is initialized (may not be if called outside normal backfill flow)
  // NOTE(review): hard-coded 150 here vs the env-configured limit used by
  // default_config - confirm this is intentional for the standalone path.
  configure_hackney_pool(150)

  let all_collections = list.append(collection_ids, external_collection_ids)
  let total_count = list.length(all_collections)

  logging.log(
    logging.Info,
    "[backfill] Starting background sync for new actor: "
      <> did
      <> " ("
      <> string.inspect(total_count)
      <> " collections: "
      <> string.inspect(list.length(collection_ids))
      <> " primary + "
      <> string.inspect(list.length(external_collection_ids))
      <> " external)",
  )

  // Resolve DID to get PDS endpoint
  case resolve_did(did, plc_url) {
    Ok(atp_data) -> {
      // Use CAR-based approach - single request gets all collections
      let total_records =
        backfill_repo_car(did, atp_data.pds, all_collections, db)

      logging.log(
        logging.Info,
        "[backfill] Completed sync for "
          <> did
          <> " ("
          <> string.inspect(total_records)
          <> " total records)",
      )
    }
    Error(err) -> {
      logging.log(
        logging.Error,
        "[backfill] Failed to resolve DID for backfill: " <> did <> " - " <> err,
      )
    }
  }
}

/// Fetch repos that have records for a specific collection from the relay with pagination
fn fetch_repos_for_collection(
  collection: String,
  db: Executor,
) -> Result(List(String), String) {
  fetch_repos_paginated(collection, None, [], db)
}

/// Helper function for paginated repo fetching
/// Recurses while the relay returns a cursor, accumulating DIDs in `acc`.
fn fetch_repos_paginated(
  collection: String,
  cursor: Option(String),
  acc: List(String),
  db: Executor,
) -> Result(List(String), String) {
  // Get relay URL from database config
  let relay_url = config_repo.get_relay_url(db)

  // Build URL with large limit and cursor
  let base_url =
    relay_url
    <> "/xrpc/com.atproto.sync.listReposByCollection?collection="
    <> collection
    <> "&limit=1000"

  let url = case cursor {
    Some(c) -> base_url <> "&cursor=" <> c
    None -> base_url
  }

  case request.to(url) {
    Error(_) -> Error("Failed to create request for collection: " <> collection)
    Ok(req) ->
      case send_with_permit(req) {
        Error(_) ->
          Error("Failed to fetch repos for collection: " <> collection)
        Ok(resp) ->
          case resp.status {
            200 ->
              case parse_repos_response(resp.body) {
                Ok(#(repos, next_cursor)) -> {
                  let new_acc = list.append(acc, repos)
                  case next_cursor {
                    // More pages - follow the cursor
                    Some(c) ->
                      fetch_repos_paginated(collection, Some(c), new_acc, db)
                    None -> {
                      logging.log(
                        logging.Info,
                        "[backfill] Found "
                          <> string.inspect(list.length(new_acc))
                          <> " total repositories for collection \""
                          <> collection
                          <> "\"",
                      )
                      Ok(new_acc)
                    }
                  }
                }
                Error(err) -> Error(err)
              }
            _ ->
              Error(
                "Failed to fetch repos for collection "
                <> collection
                <> " (status: "
                <> string.inspect(resp.status)
                <> ")",
              )
          }
      }
  }
}
    Error(_) -> Error("Failed to create request for collection: " <> collection)
    Ok(req) -> {
      // send_with_permit: presumably acquires a slot from the HTTP
      // concurrency semaphore before sending — defined outside this chunk.
      case send_with_permit(req) {
        Error(_) ->
          Error("Failed to fetch repos for collection: " <> collection)
        Ok(resp) -> {
          case resp.status {
            200 -> {
              case parse_repos_response(resp.body) {
                Ok(#(repos, next_cursor)) -> {
                  let new_acc = list.append(acc, repos)
                  case next_cursor {
                    // Relay returned a cursor: recurse for the next page.
                    Some(c) ->
                      fetch_repos_paginated(collection, Some(c), new_acc, db)
                    // No cursor: final page reached, return everything.
                    None -> {
                      logging.log(
                        logging.Info,
                        "[backfill] Found "
                          <> string.inspect(list.length(new_acc))
                          <> " total repositories for collection \""
                          <> collection
                          <> "\"",
                      )
                      Ok(new_acc)
                    }
                  }
                }
                Error(err) -> Error(err)
              }
            }
            // Any non-200 status aborts pagination with a descriptive error.
            _ ->
              Error(
                "Failed to fetch repos for collection "
                <> collection
                <> " (status: "
                <> string.inspect(resp.status)
                <> ")",
              )
          }
        }
      }
    }
  }
}

/// Parse the response from com.atproto.sync.listReposByCollection.
/// Returns the list of repo DIDs plus the pagination cursor, if any.
/// Parsed in two passes: "repos" is required (parse failure is an error),
/// while "cursor" is decoded separately and any failure is treated as None
/// so a missing cursor field never fails the whole response.
fn parse_repos_response(
  body: String,
) -> Result(#(List(String), Option(String)), String) {
  // Decoder for the required "repos" array; only the "did" field of each
  // entry is extracted.
  let decoder = {
    use repos <- decode.field(
      "repos",
      decode.list({
        use did <- decode.field("did", decode.string)
        decode.success(did)
      }),
    )
    decode.success(repos)
  }

  // Parse repos first
  case json.parse(body, decoder) {
    Error(_) -> Error("Failed to parse repos response")
    Ok(repos) -> {
      // Try to extract cursor separately
      let cursor_decoder = {
        use cursor <- decode.field("cursor", decode.optional(decode.string))
        decode.success(cursor)
      }

      // Absent or malformed cursor degrades to None (stop paginating).
      let cursor = case json.parse(body, cursor_decoder) {
        Ok(c) -> c
        Error(_) -> None
      }

      Ok(#(repos, cursor))
    }
  }
}

/// Main backfill function - backfill collections for specified repos
pub fn backfill_collections(
  repos:
  List(String),
  collections: List(String),
  external_collections: List(String),
  config: BackfillConfig,
  conn: Executor,
) -> Nil {
  logging.log(logging.Info, "")
  logging.log(logging.Info, "[backfill] Starting backfill operation")

  case collections {
    // No primary collections: nothing to do — log an error and bail out.
    // Note external_collections alone do not trigger a run.
    [] -> {
      logging.log(
        logging.Error,
        "[backfill] No collections specified for backfill",
      )
      Nil
    }
    _ -> {
      logging.log(
        logging.Info,
        "[backfill] Processing "
          <> string.inspect(list.length(collections))
          <> " collections: "
          <> string.join(collections, ", "),
      )

      run_backfill(repos, collections, external_collections, config, conn)
    }
  }
}

/// Core backfill pipeline: choose the repo set (provided vs fetched from the
/// relay), resolve ATP data, optionally index actors, then fetch each repo as
/// a CAR file and index matching records.
fn run_backfill(
  repos: List(String),
  collections: List(String),
  external_collections: List(String),
  config: BackfillConfig,
  conn: Executor,
) -> Nil {
  case external_collections {
    [] -> Nil
    _ ->
      logging.log(
        logging.Info,
        "[backfill] Including "
          <> string.inspect(list.length(external_collections))
          <> " external collections: "
          <> string.join(external_collections, ", "),
      )
  }

  // Determine which repos to use
  let all_repos = case repos {
    [] -> {
      // Fetch repos for all collections from the relay
      logging.log(
        logging.Info,
        "[backfill] Fetching repositories for collections...",
      )
      // A collection whose fetch fails is logged and skipped (filter_map),
      // not fatal to the run; duplicates across collections are removed.
      let fetched_repos =
        collections
        |> list.filter_map(fn(collection) {
          case fetch_repos_for_collection(collection, conn) {
            Ok(repos) -> Ok(repos)
            Error(err) -> {
              logging.log(logging.Error, "[backfill] " <> err)
              Error(Nil)
            }
          }
        })
        |> list.flatten
        |> list.unique

      logging.log(
        logging.Info,
        "[backfill] Processing "
          <> string.inspect(list.length(fetched_repos))
          <> " unique repositories",
      )
      fetched_repos
    }
    // Caller supplied an explicit repo list — use it as-is.
    provided_repos -> {
      logging.log(
        logging.Info,
        "[backfill] Using "
          <> string.inspect(list.length(provided_repos))
          <> " provided repositories",
      )
      provided_repos
    }
  }

  // Get ATP data for all repos
  // Repos whose DID resolution fails are dropped here, hence the
  // "resolved/total" ratio in the log line below.
  logging.log(logging.Info, "[backfill] Resolving ATP data for repositories...")
  let atp_data = get_atp_data_for_repos(all_repos, config)
  logging.log(
    logging.Info,
    "[backfill] Resolved ATP data for "
      <> string.inspect(list.length(atp_data))
      <> "/"
      <> string.inspect(list.length(all_repos))
      <> " repositories",
  )

  // Index actors first (if enabled in config)
  case config.index_actors {
    True -> {
      logging.log(logging.Info, "[backfill] Indexing actors...")
      index_actors(atp_data, conn)
      logging.log(
        logging.Info,
        "[backfill] Indexed "
          <> string.inspect(list.length(atp_data))
          <> " actors",
      )
    }
    False ->
      logging.log(
        logging.Info,
        "[backfill] Skipping actor indexing (disabled in config)",
      )
  }

  // Combine main and external collections for concurrent processing
  let all_collections = list.append(collections, external_collections)

  // Use CAR-based approach: one getRepo request per repo, filter locally
  logging.log(
    logging.Info,
    "[backfill] Fetching repos as CAR files and filtering locally...",
  )
  let total_count =
    get_records_for_repos_car(
      all_repos,
      all_collections,
      atp_data,
      config,
      conn,
    )
  logging.log(
    logging.Info,
    "[backfill] Backfill complete! Indexed "
      <> string.inspect(total_count)
      <> " total records via CAR",
  )
}