Forked from slices.network/quickslice
Auto-indexing service and GraphQL API for AT Protocol Records
1import atproto_car
2import database/executor.{type Executor}
3import database/repositories/actors
4import database/repositories/config as config_repo
5import database/repositories/lexicons
6import database/repositories/records
7import database/types.{type Record, Record}
8import envoy
9import gleam/bit_array
10import gleam/bytes_tree
11import gleam/dict
12import gleam/dynamic.{type Dynamic}
13import gleam/dynamic/decode
14import gleam/erlang/process.{type Subject}
15import gleam/hackney
16import gleam/http/request
17import gleam/http/response as gleam_http_response
18import gleam/int
19import gleam/json
20import gleam/list
21import gleam/option.{type Option, None, Some}
22import gleam/result
23import gleam/string
24import gleam/time/duration
25import gleam/time/timestamp
26import honk
27import honk/errors
28import lib/http_client
29import lib/oauth/did_cache
30import logging
31
/// Opaque type for monotonic time
/// Values are only meaningful when passed back to elapsed_ms; the concrete
/// representation lives in the Erlang FFI module (backfill_ffi).
pub type MonotonicTime

/// Get current monotonic time for timing measurements
/// Monotonic clocks are unaffected by wall-clock adjustments, so durations
/// computed from these values are reliable.
@external(erlang, "backfill_ffi", "monotonic_now")
fn monotonic_now() -> MonotonicTime

/// Get elapsed milliseconds since a start time
/// `start` must have been produced by monotonic_now.
@external(erlang, "backfill_ffi", "elapsed_ms")
fn elapsed_ms(start: MonotonicTime) -> Int
42
/// ATP data resolved from DID
/// Bundles the identity (did), its human-readable handle, and the base URL
/// of the PDS hosting the repo. Produced by resolve_did / parse_atproto_data.
pub type AtprotoData {
  AtprotoData(did: String, handle: String, pds: String)
}
47
/// Configuration for backfill operations
pub type BackfillConfig {
  BackfillConfig(
    // Base URL of the PLC directory used for did:plc resolution
    plc_directory_url: String,
    // When True, resolved actors are also upserted into the actors table
    index_actors: Bool,
    // Max concurrent repo fetches per PDS (sliding window size per PDS)
    max_concurrent_per_pds: Int,
    // Max PDS endpoints processed concurrently
    max_pds_workers: Int,
    // Global HTTP request concurrency (semaphore size, see configure_hackney_pool)
    max_http_concurrent: Int,
    // Timeout waiting for a single repo CAR fetch worker, in milliseconds
    repo_fetch_timeout_ms: Int,
    // Optional DID resolution cache actor; None disables caching
    did_cache: Option(Subject(did_cache.Message)),
  )
}
60
/// Read an integer from an environment variable, falling back to `default`
/// when the variable is unset or does not parse as an integer.
fn env_int_or(name: String, default: Int) -> Int {
  case envoy.get(name) {
    Ok(val) ->
      case int.parse(val) {
        Ok(n) -> n
        Error(_) -> default
      }
    Error(_) -> default
  }
}

/// Creates a default backfill configuration
///
/// Reads the PLC directory URL from the database config and the concurrency
/// limits from environment variables (BACKFILL_PDS_CONCURRENCY,
/// BACKFILL_MAX_PDS_WORKERS, BACKFILL_MAX_HTTP_CONCURRENT,
/// BACKFILL_REPO_TIMEOUT), with sensible defaults when unset or malformed.
/// Also initializes the global HTTP semaphore as a side effect.
pub fn default_config(db: Executor) -> BackfillConfig {
  // Get PLC directory URL from database config
  let plc_url = config_repo.get_plc_directory_url(db)

  // Concurrency limits, overridable via environment variables
  let max_pds_concurrent = env_int_or("BACKFILL_PDS_CONCURRENCY", 4)
  let max_pds_workers = env_int_or("BACKFILL_MAX_PDS_WORKERS", 10)
  let max_http = env_int_or("BACKFILL_MAX_HTTP_CONCURRENT", 50)

  // BACKFILL_REPO_TIMEOUT is given in seconds; stored as milliseconds.
  // Default 60s (a malformed value also yields 60 * 1000 = 60_000, matching
  // the previous behavior).
  let repo_timeout = env_int_or("BACKFILL_REPO_TIMEOUT", 60) * 1000

  // Configure hackney pool with the configured HTTP limit
  configure_hackney_pool(max_http)

  BackfillConfig(
    plc_directory_url: plc_url,
    index_actors: True,
    max_concurrent_per_pds: max_pds_concurrent,
    max_pds_workers: max_pds_workers,
    max_http_concurrent: max_http,
    repo_fetch_timeout_ms: repo_timeout,
    did_cache: None,
  )
}
123
/// Creates a backfill configuration with a DID cache
/// Identical to default_config except that DID resolution goes through the
/// supplied cache actor.
pub fn config_with_cache(
  cache: Subject(did_cache.Message),
  db: Executor,
) -> BackfillConfig {
  // Start from the defaults, then attach the resolver cache.
  BackfillConfig(..default_config(db), did_cache: Some(cache))
}
132
/// Configure hackney connection pool with specified limits
/// This initializes the HTTP semaphore for rate limiting
/// (implemented on the Erlang side in backfill_ffi).
@external(erlang, "backfill_ffi", "configure_pool")
pub fn configure_hackney_pool(max_concurrent: Int) -> Nil

/// Acquire a permit from the global HTTP semaphore
/// Blocks if at the concurrent request limit (150)
/// NOTE(review): the "(150)" figure predates the configurable limit passed to
/// configure_hackney_pool (default 50) — confirm which value the FFI enforces.
@external(erlang, "backfill_ffi", "acquire_permit")
fn acquire_permit() -> Nil

/// Release a permit back to the global HTTP semaphore
/// Must be called exactly once per acquire_permit.
@external(erlang, "backfill_ffi", "release_permit")
fn release_permit() -> Nil
146
/// Execute an HTTP request with semaphore-based rate limiting
/// Acquires a permit before sending and releases it after the send returns.
/// NOTE(review): if hackney_send panics (rather than returning Error) the
/// permit is never released — confirm the FFI tolerates leaked permits.
fn send_with_permit(
  req: request.Request(String),
) -> Result(gleam_http_response.Response(String), hackney.Error) {
  acquire_permit()
  let response = http_client.hackney_send(req)
  release_permit()
  response
}
157
/// Execute an HTTP request with permit, returning raw binary response
/// Used for binary data like CAR files. Same permit discipline as
/// send_with_permit: acquire before sending, release after the send returns.
fn send_bits_with_permit(
  req: request.Request(bytes_tree.BytesTree),
) -> Result(gleam_http_response.Response(BitArray), hackney.Error) {
  acquire_permit()
  let response = http_client.hackney_send_bits(req)
  release_permit()
  response
}
168
/// Fetch a repo as a CAR file using com.atproto.sync.getRepo
/// Returns raw CAR bytes on HTTP 200, otherwise a descriptive error string.
fn fetch_repo_car(did: String, pds_url: String) -> Result(BitArray, String) {
  let url = pds_url <> "/xrpc/com.atproto.sync.getRepo?did=" <> did

  use req <- result.try(
    request.to(url)
    |> result.replace_error("Failed to create request for getRepo: " <> did),
  )

  // GET request, so the BytesTree body is empty; set_body is only needed to
  // switch the request type for the binary-response send.
  use resp <- result.try(
    send_bits_with_permit(request.set_body(req, bytes_tree.new()))
    |> result.map_error(fn(err) {
      "Failed to fetch repo CAR for " <> did <> ": " <> string.inspect(err)
    }),
  )

  case resp.status {
    // Response body is already BitArray
    200 -> Ok(resp.body)
    status ->
      Error(
        "getRepo failed for "
        <> did
        <> " (status: "
        <> string.inspect(status)
        <> ")",
      )
  }
}
210
/// Convert a CAR record (with proper MST path) to a database Record type
/// The indexed_at timestamp is the moment of conversion, rendered RFC 3339.
fn car_record_with_path_to_db_record(
  car_record: atproto_car.RecordWithPath,
  did: String,
) -> Record {
  let indexed_at =
    timestamp.to_rfc3339(timestamp.system_time(), duration.seconds(0))

  // AT URI shape: at://<did>/<collection>/<rkey>
  let uri =
    "at://" <> did <> "/" <> car_record.collection <> "/" <> car_record.rkey

  Record(
    uri: uri,
    cid: atproto_car.cid_to_string(car_record.cid),
    did: did,
    collection: car_record.collection,
    json: atproto_car.record_to_json(car_record),
    indexed_at: indexed_at,
    rkey: car_record.rkey,
  )
}
235
/// Result of record validation
/// Three-way outcome: only Invalid causes a record to be dropped; ParseError
/// deliberately lets the record through (see validate_record).
type ValidationResult {
  /// Record passed validation
  Valid
  /// Record failed schema validation - should be skipped
  Invalid(String)
  /// Could not parse JSON for validation - insert anyway (CBOR edge cases)
  ParseError(String)
}
245
/// Validate a record against lexicon schemas using pre-built context
/// Returns Valid / Invalid(reason) / ParseError(reason); callers treat
/// ParseError like Valid (CBOR edge cases should not block indexing).
fn validate_record(
  ctx: honk.ValidationContext,
  collection: String,
  json_string: String,
) -> ValidationResult {
  case honk.parse_json_string(json_string) {
    Ok(record_json) ->
      case honk.validate_record_with_context(ctx, collection, record_json) {
        Error(e) -> Invalid(errors.to_string(e))
        Ok(_) -> Valid
      }
    // JSON parse failure - likely CBOR edge case, allow record through
    Error(e) ->
      ParseError("Failed to parse record JSON: " <> errors.to_string(e))
  }
}
264
/// Load lexicons and build validation context
/// Returns None on error (validation will be skipped)
fn load_validation_context(conn: Executor) -> Option(honk.ValidationContext) {
  case lexicons.get_all(conn) {
    Error(_) -> {
      logging.log(
        logging.Warning,
        "[backfill] Failed to load lexicons, skipping validation",
      )
      None
    }
    Ok(lexs) -> {
      // Keep only the lexicon documents whose JSON parses cleanly
      let lexicon_jsons =
        list.filter_map(lexs, fn(lex) {
          honk.parse_json_string(lex.json)
          |> result.replace_error(Nil)
        })

      // Build context once from all lexicons
      case honk.build_validation_context(lexicon_jsons) {
        Ok(ctx) -> Some(ctx)
        Error(_) -> {
          logging.log(
            logging.Warning,
            "[backfill] Failed to build validation context, skipping validation",
          )
          None
        }
      }
    }
  }
}
299
/// Backfill a single repo using CAR file approach
/// Fetches entire repo, parses CAR with MST walking, filters by collections, indexes records
fn backfill_repo_car(
  did: String,
  pds_url: String,
  collections: List(String),
  conn: Executor,
) -> Int {
  // Single-repo entry point (e.g. new-actor sync): build the validation
  // context on demand and delegate to the shared implementation.
  backfill_repo_car_with_context(
    did,
    pds_url,
    collections,
    conn,
    load_validation_context(conn),
  )
}
318
/// Convert CAR records for `did` into database records, dropping those that
/// fail lexicon validation. Returns #(records, invalid_count). Records whose
/// JSON cannot be parsed are kept (CBOR edge cases), matching ParseError
/// semantics in validate_record. With no validation context, everything is
/// kept.
fn convert_and_validate_records(
  car_records: List(atproto_car.RecordWithPath),
  did: String,
  validation_ctx: Option(honk.ValidationContext),
) -> #(List(Record), Int) {
  list.fold(car_records, #([], 0), fn(acc, r) {
    let #(records, invalids) = acc
    let db_record = car_record_with_path_to_db_record(r, did)
    case validation_ctx {
      None -> #([db_record, ..records], invalids)
      Some(ctx) ->
        case validate_record(ctx, r.collection, db_record.json) {
          Valid -> #([db_record, ..records], invalids)
          // Unparseable JSON is inserted anyway (CBOR edge cases)
          ParseError(_) -> #([db_record, ..records], invalids)
          Invalid(msg) -> {
            logging.log(
              logging.Debug,
              "[backfill] Invalid record "
                <> r.collection
                <> "/"
                <> r.rkey
                <> ": "
                <> msg,
            )
            #(records, invalids + 1)
          }
        }
    }
  })
}

/// Backfill a single repo with pre-built validation context
/// Four phases, each timed individually: fetch CAR -> parse CAR / walk MST ->
/// convert + validate -> batch insert. Returns the number of records
/// inserted; any fetch or parse failure is logged and yields 0.
fn backfill_repo_car_with_context(
  did: String,
  pds_url: String,
  collections: List(String),
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
) -> Int {
  let total_start = monotonic_now()

  // Phase 1: Fetch
  let fetch_start = monotonic_now()
  case fetch_repo_car(did, pds_url) {
    Error(err) -> {
      logging.log(logging.Warning, "[backfill] " <> err)
      0
    }
    Ok(car_bytes) -> {
      let fetch_ms = elapsed_ms(fetch_start)
      let car_size = bit_array.byte_size(car_bytes)

      // Phase 2: Parse CAR and walk MST
      let parse_start = monotonic_now()
      case atproto_car.extract_records_with_paths(car_bytes, collections) {
        Error(err) -> {
          logging.log(
            logging.Warning,
            "[backfill] CAR parse error for "
              <> did
              <> ": "
              <> string.inspect(err),
          )
          0
        }
        Ok(car_records) -> {
          let parse_ms = elapsed_ms(parse_start)

          // Phase 3: Convert and validate
          let validate_start = monotonic_now()
          let #(db_records, invalid_count) =
            convert_and_validate_records(car_records, did, validation_ctx)
          let validate_ms = elapsed_ms(validate_start)

          // Phase 4: Insert into database
          let insert_start = monotonic_now()
          index_records(db_records, conn)
          let insert_ms = elapsed_ms(insert_start)

          let count = list.length(db_records)
          let total_ms = elapsed_ms(total_start)

          // Log summary at debug level (detailed per-repo timing)
          logging.log(
            logging.Debug,
            "[backfill] "
              <> did
              <> " fetch="
              <> int.to_string(fetch_ms)
              <> "ms parse="
              <> int.to_string(parse_ms)
              <> "ms validate="
              <> int.to_string(validate_ms)
              <> "ms insert="
              <> int.to_string(insert_ms)
              <> "ms total="
              <> int.to_string(total_ms)
              <> "ms records="
              <> int.to_string(count)
              <> " invalid="
              <> int.to_string(invalid_count)
              <> " size="
              <> int.to_string(car_size),
          )
          count
        }
      }
    }
  }
}
423
/// Check if an NSID matches the configured domain authority
/// NSID format is like "com.example.post" where "com.example" is the authority
pub fn nsid_matches_domain_authority(
  nsid: String,
  domain_authority: String,
) -> Bool {
  // The trailing dot prevents prefix collisions such as "com.example" vs
  // "com.examples": the NSID must begin with the full authority segment.
  let authority_prefix = domain_authority <> "."
  string.starts_with(nsid, authority_prefix)
}
434
/// Get local and external collection IDs from configured lexicons
/// Returns #(local_collection_ids, external_collection_ids)
pub fn get_collection_ids(conn: Executor) -> #(List(String), List(String)) {
  // Missing config yields an empty authority; partition then classifies
  // everything as external.
  let domain_authority =
    config_repo.get(conn, "domain_authority")
    |> result.unwrap("")

  case lexicons.get_record_types(conn) {
    Error(_) -> #([], [])
    Ok(lexicon_list) -> {
      let #(local, external) =
        list.partition(lexicon_list, fn(lex) {
          nsid_matches_domain_authority(lex.id, domain_authority)
        })
      #(
        list.map(local, fn(lex) { lex.id }),
        list.map(external, fn(lex) { lex.id }),
      )
    }
  }
}
457
/// Resolve a DID to get ATP data (PDS endpoint and handle)
/// did:web DIDs resolve via their well-known HTTPS document; everything else
/// goes through the PLC directory.
pub fn resolve_did(did: String, plc_url: String) -> Result(AtprotoData, String) {
  case string.starts_with(did, "did:web:") {
    False -> resolve_did_plc(did, plc_url)
    True -> resolve_did_web(did)
  }
}
466
/// Resolve a DID with caching support
/// Uses the did_cache if provided, otherwise falls back to direct resolution
pub fn resolve_did_cached(
  did: String,
  plc_url: String,
  cache: Option(Subject(did_cache.Message)),
) -> Result(AtprotoData, String) {
  case cache {
    None -> resolve_did(did, plc_url)
    Some(c) -> {
      // A cache miss and a cache hit with unparseable data are treated the
      // same: resolve fresh and repopulate the cache.
      let cached =
        did_cache.get(c, did)
        |> result.replace_error(Nil)
        |> result.try(fn(cached_json) {
          parse_cached_atp_data(cached_json)
          |> result.replace_error(Nil)
        })
      case cached {
        Ok(data) -> Ok(data)
        Error(Nil) -> resolve_and_cache(did, plc_url, c)
      }
    }
  }
}
497
/// Resolve DID and store in cache
/// On success the resolved data is cached for one hour; resolution errors
/// pass through unchanged and nothing is cached.
fn resolve_and_cache(
  did: String,
  plc_url: String,
  cache: Subject(did_cache.Message),
) -> Result(AtprotoData, String) {
  resolve_did(did, plc_url)
  |> result.map(fn(data) {
    // Cache for 1 hour (3600 seconds)
    did_cache.put(cache, did, encode_atp_data(data), 3600)
    data
  })
}
514
/// Encode AtprotoData to JSON for caching
/// Field order mirrors the decoder in parse_cached_atp_data.
fn encode_atp_data(data: AtprotoData) -> String {
  let AtprotoData(did: did, handle: handle, pds: pds) = data
  json.to_string(
    json.object([
      #("did", json.string(did)),
      #("handle", json.string(handle)),
      #("pds", json.string(pds)),
    ]),
  )
}
524
/// Parse AtprotoData from cached JSON
/// Inverse of encode_atp_data; any decode failure collapses to a single
/// error string so callers can fall back to fresh resolution.
fn parse_cached_atp_data(json_str: String) -> Result(AtprotoData, String) {
  let atp_decoder = {
    use did <- decode.field("did", decode.string)
    use handle <- decode.field("handle", decode.string)
    use pds <- decode.field("pds", decode.string)
    decode.success(AtprotoData(did: did, handle: handle, pds: pds))
  }

  case json.parse(json_str, atp_decoder) {
    Ok(data) -> Ok(data)
    Error(_) -> Error("Failed to parse cached ATP data")
  }
}
537
/// Resolve a did:web DID by fetching the DID document from HTTPS
/// did:web format: did:web:<domain>[:<port>][:<path>] — extra colon-separated
/// segments become URL path components before /.well-known/did.json.
fn resolve_did_web(did: String) -> Result(AtprotoData, String) {
  case string.split(did, ":") {
    ["did", "web", domain, ..rest] -> {
      let base_domain = case rest {
        [] -> domain
        path_parts -> domain <> "/" <> string.join(path_parts, "/")
      }
      let url = "https://" <> base_domain <> "/.well-known/did.json"

      use req <- result.try(
        request.to(url)
        |> result.replace_error(
          "Failed to create request for did:web DID: " <> did,
        ),
      )
      use resp <- result.try(
        send_with_permit(req)
        |> result.replace_error("Failed to fetch did:web DID data for: " <> did),
      )
      case resp.status {
        200 -> parse_atproto_data(resp.body, did)
        _ ->
          Error(
            "Failed to resolve DID "
            <> did
            <> " (status: "
            <> string.inspect(resp.status)
            <> ")",
          )
      }
    }
    _ -> Error("Invalid did:web format: " <> did)
  }
}
578
/// Resolve a did:plc DID through the PLC directory
/// The directory serves the DID document at <plc_url>/<did>.
fn resolve_did_plc(did: String, plc_url: String) -> Result(AtprotoData, String) {
  let url = plc_url <> "/" <> did

  use req <- result.try(
    request.to(url)
    |> result.replace_error("Failed to create request for DID: " <> did),
  )
  use resp <- result.try(
    send_with_permit(req)
    |> result.replace_error("Failed to fetch DID data for: " <> did),
  )
  case resp.status {
    200 -> parse_atproto_data(resp.body, did)
    _ ->
      Error(
        "Failed to resolve DID "
        <> did
        <> " (status: "
        <> string.inspect(resp.status)
        <> ")",
      )
  }
}
605
/// Extract the PDS endpoint from a DID document's `service` array.
/// Picks the first entry typed "AtprotoPersonalDataServer"; falls back to
/// https://bsky.social when the array is absent or has no matching entry.
fn extract_pds_endpoint(service_list_opt: Option(List(Dynamic))) -> String {
  case service_list_opt {
    None -> "https://bsky.social"
    Some(service_list) ->
      service_list
      |> list.find_map(fn(service_dyn) {
        // Try to extract the service endpoint
        let service_decoder = {
          use service_type <- decode.field("type", decode.string)
          use endpoint <- decode.field("serviceEndpoint", decode.string)
          decode.success(#(service_type, endpoint))
        }

        case decode.run(service_dyn, service_decoder) {
          Ok(#("AtprotoPersonalDataServer", endpoint)) -> Ok(endpoint)
          _ -> Error(Nil)
        }
      })
      |> result.unwrap("https://bsky.social")
  }
}

/// Extract the handle from the `alsoKnownAs` aliases: the first `at://`
/// alias with its scheme stripped. Falls back to the DID itself.
fn extract_handle(handle_list_opt: Option(List(String)), did: String) -> String {
  case handle_list_opt {
    None -> did
    Some(handle_list) ->
      handle_list
      |> list.find(fn(h) { string.starts_with(h, "at://") })
      |> result.map(fn(h) { string.replace(h, "at://", "") })
      |> result.unwrap(did)
  }
}

/// Parse ATP data from PLC directory response
/// Decodes the DID document's optional `service` and `alsoKnownAs` arrays,
/// then derives the PDS endpoint and handle (each with defaults) from them.
fn parse_atproto_data(body: String, did: String) -> Result(AtprotoData, String) {
  // Simple decoder that extracts service and alsoKnownAs arrays
  let decoder = {
    use service_list <- decode.field(
      "service",
      decode.optional(decode.list(decode.dynamic)),
    )
    use handle_list <- decode.field(
      "alsoKnownAs",
      decode.optional(decode.list(decode.string)),
    )
    decode.success(#(service_list, handle_list))
  }

  case json.parse(body, decoder) {
    Error(_) -> Error("Failed to parse ATP data for DID: " <> did)
    Ok(#(service_list_opt, handle_list_opt)) ->
      Ok(AtprotoData(
        did: did,
        handle: extract_handle(handle_list_opt, did),
        pds: extract_pds_endpoint(service_list_opt),
      ))
  }
}
659
/// Worker function that resolves a DID and sends result back
/// Resolution failures are logged here and collapsed to Error(Nil) so the
/// collector only counts successes.
fn resolve_did_worker(
  did: String,
  plc_url: String,
  cache: Option(Subject(did_cache.Message)),
  reply_to: Subject(Result(AtprotoData, Nil)),
) -> Nil {
  let outcome =
    resolve_did_cached(did, plc_url, cache)
    |> result.map_error(fn(err) {
      logging.log(
        logging.Error,
        "[backfill] Error resolving DID " <> did <> ": " <> err,
      )
      Nil
    })
  process.send(reply_to, outcome)
}
679
/// Get ATP data for a list of repos (DIDs) - fully concurrent version
/// Spawns one unlinked resolver process per DID and collects one reply per
/// spawn (30s receive timeout each); failed or timed-out resolutions are
/// dropped from the result.
pub fn get_atp_data_for_repos(
  repos: List(String),
  config: BackfillConfig,
) -> List(AtprotoData) {
  // Spawn all workers at once - Erlang VM can handle it
  let subject = process.new_subject()

  // Spawn all workers concurrently
  list.each(repos, fn(repo) {
    process.spawn_unlinked(fn() {
      resolve_did_worker(
        repo,
        config.plc_directory_url,
        config.did_cache,
        subject,
      )
    })
  })

  // Collect exactly one reply per spawned worker.
  // BUG FIX: the previous `list.range(1, repo_count)` yields [1, 0] when
  // repos is empty (gleam ranges are inclusive and count downward), which
  // caused two spurious 30s receive timeouts; iterating the repos list
  // itself performs zero receives for zero repos.
  list.filter_map(repos, fn(_) {
    case process.receive(subject, 30_000) {
      Ok(result) -> result
      Error(_) -> Error(Nil)
    }
  })
}
712
/// CAR-based worker that fetches entire repo and filters by collections
/// Runs the backfill inside a rescue wrapper (crashes become a 0 count) and
/// reports the record count to the coordinating PDS worker.
fn fetch_repo_car_worker(
  repo: String,
  pds: String,
  collections: List(String),
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
  reply_to: Subject(Int),
) -> Nil {
  let record_count =
    rescue_car_backfill(repo, pds, collections, conn, validation_ctx)
  process.send(reply_to, record_count)
}
726
/// Wrapper to catch crashes in CAR backfill
/// A crashed backfill is logged and counted as zero records so one bad repo
/// cannot take down its PDS worker.
fn rescue_car_backfill(
  repo: String,
  pds: String,
  collections: List(String),
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
) -> Int {
  let outcome =
    rescue(fn() {
      backfill_repo_car_with_context(
        repo,
        pds,
        collections,
        conn,
        validation_ctx,
      )
    })

  case outcome {
    Ok(count) -> count
    Error(err) -> {
      logging.log(
        logging.Error,
        "[backfill] CAR worker crashed for "
          <> repo
          <> ": "
          <> string.inspect(err),
      )
      0
    }
  }
}
759
/// Rescue wrapper - catches exceptions
/// Runs `f` inside an Erlang try/catch (backfill_ffi); a thrown value comes
/// back as Error(Dynamic) instead of crashing the calling process.
@external(erlang, "backfill_ffi", "rescue")
pub fn rescue(f: fn() -> a) -> Result(a, Dynamic)
763
/// CAR-based PDS worker - fetches each repo as CAR and filters locally
///
/// Spawns up to `max_concurrent` unlinked per-repo fetch workers, then keeps
/// that window full via sliding_window_car until every repo in `repos` has
/// been processed (or timed out). Sends the total record count for this PDS
/// to `reply_to` when done.
fn pds_worker_car(
  pds_url: String,
  repos: List(String),
  collections: List(String),
  max_concurrent: Int,
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
  timeout_ms: Int,
  reply_to: Subject(Int),
) -> Nil {
  logging.log(
    logging.Debug,
    "[backfill] PDS worker starting for "
      <> pds_url
      <> " with "
      <> int.to_string(list.length(repos))
      <> " repos",
  )
  // All per-repo workers report their counts to this subject
  let subject = process.new_subject()

  // Start initial batch of workers
  let #(initial_repos, remaining_repos) = list.split(repos, max_concurrent)
  let initial_count = list.length(initial_repos)

  // Spawn initial workers
  list.each(initial_repos, fn(repo) {
    process.spawn_unlinked(fn() {
      fetch_repo_car_worker(
        repo,
        pds_url,
        collections,
        conn,
        validation_ctx,
        subject,
      )
    })
  })

  // Process with sliding window
  let total_count =
    sliding_window_car(
      remaining_repos,
      subject,
      initial_count,
      pds_url,
      collections,
      conn,
      validation_ctx,
      timeout_ms,
      0,
    )

  logging.log(
    logging.Debug,
    "[backfill] PDS worker finished for "
      <> pds_url
      <> " with "
      <> int.to_string(total_count)
      <> " total records",
  )
  process.send(reply_to, total_count)
}
827
/// Sliding window for CAR-based processing
///
/// Keeps `in_flight` repo-fetch workers running against one PDS. Each time a
/// worker reports its record count on `subject`, the next repo from
/// `remaining` is started in its place (or, when none remain, the window
/// shrinks). Terminates when `in_flight` reaches zero and returns the
/// accumulated `total`.
fn sliding_window_car(
  remaining: List(String),
  subject: Subject(Int),
  in_flight: Int,
  pds_url: String,
  collections: List(String),
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
  timeout_ms: Int,
  total: Int,
) -> Int {
  case in_flight {
    // Every spawned worker has reported or timed out
    0 -> total
    _ -> {
      case process.receive(subject, timeout_ms) {
        Ok(count) -> {
          let new_total = total + count
          case remaining {
            [next, ..rest] -> {
              // Refill the window: one worker finished, start the next repo
              process.spawn_unlinked(fn() {
                fetch_repo_car_worker(
                  next,
                  pds_url,
                  collections,
                  conn,
                  validation_ctx,
                  subject,
                )
              })
              sliding_window_car(
                rest,
                subject,
                in_flight,
                pds_url,
                collections,
                conn,
                validation_ctx,
                timeout_ms,
                new_total,
              )
            }
            // No repos left to start: just drain the remaining workers
            [] ->
              sliding_window_car(
                [],
                subject,
                in_flight - 1,
                pds_url,
                collections,
                conn,
                validation_ctx,
                timeout_ms,
                new_total,
              )
          }
        }
        Error(_) -> {
          // Receive timed out: abandon one in-flight slot but keep
          // `remaining` untouched so pending repos still get attempted.
          // NOTE(review): a worker that finishes after this timeout still
          // sends to `subject`, where it may later be consumed as another
          // worker's completion — confirm this skew is acceptable.
          logging.log(
            logging.Warning,
            "[backfill] Timeout waiting for CAR worker on "
              <> pds_url
              <> " (in_flight: "
              <> int.to_string(in_flight)
              <> ", remaining: "
              <> int.to_string(list.length(remaining))
              <> ")",
          )
          sliding_window_car(
            remaining,
            subject,
            in_flight - 1,
            pds_url,
            collections,
            conn,
            validation_ctx,
            timeout_ms,
            total,
          )
        }
      }
    }
  }
}
911
/// Sliding window for PDS worker processing
/// Limits how many PDS endpoints are processed concurrently
///
/// Same windowing scheme as sliding_window_car, one level up: each completed
/// PDS worker's count is added to `total` and the next PDS from `remaining`
/// is started. `pds_count` / `completed` only feed the progress log line.
/// Note: the receive timeout here is fixed at 300_000 ms; the `timeout_ms`
/// parameter is only forwarded to the per-repo fetches inside pds_worker_car.
fn sliding_window_pds(
  remaining: List(#(String, List(#(String, String)))),
  subject: Subject(Int),
  in_flight: Int,
  collections: List(String),
  max_concurrent_per_pds: Int,
  conn: Executor,
  validation_ctx: Option(honk.ValidationContext),
  timeout_ms: Int,
  total: Int,
  pds_count: Int,
  completed: Int,
) -> Int {
  case in_flight {
    // All PDS workers have reported or timed out
    0 -> total
    _ -> {
      // 5 minute timeout per PDS worker
      case process.receive(subject, 300_000) {
        Ok(count) -> {
          let new_total = total + count
          let new_completed = completed + 1
          logging.log(
            logging.Info,
            "[backfill] PDS worker "
              <> int.to_string(new_completed)
              <> "/"
              <> int.to_string(pds_count)
              <> " done ("
              <> int.to_string(count)
              <> " records)",
          )
          case remaining {
            [#(pds_url, repo_pairs), ..rest] -> {
              // Each entry pairs the PDS url with #(pds, repo) tuples;
              // only the repo DIDs are passed to the worker
              let pds_repos =
                repo_pairs
                |> list.map(fn(pair) {
                  let #(_pds, repo) = pair
                  repo
                })
              process.spawn_unlinked(fn() {
                pds_worker_car(
                  pds_url,
                  pds_repos,
                  collections,
                  max_concurrent_per_pds,
                  conn,
                  validation_ctx,
                  timeout_ms,
                  subject,
                )
              })
              sliding_window_pds(
                rest,
                subject,
                in_flight,
                collections,
                max_concurrent_per_pds,
                conn,
                validation_ctx,
                timeout_ms,
                new_total,
                pds_count,
                new_completed,
              )
            }
            [] ->
              sliding_window_pds(
                [],
                subject,
                in_flight - 1,
                collections,
                max_concurrent_per_pds,
                conn,
                validation_ctx,
                timeout_ms,
                new_total,
                pds_count,
                new_completed,
              )
          }
        }
        Error(_) -> {
          // Timed-out PDS worker: shrink the window; `remaining` and
          // `completed` are left as-is (the timed-out worker is not counted
          // as completed in the progress log)
          logging.log(
            logging.Warning,
            "[backfill] PDS worker timed out (in_flight: "
              <> int.to_string(in_flight)
              <> ", remaining: "
              <> int.to_string(list.length(remaining))
              <> ")",
          )
          sliding_window_pds(
            remaining,
            subject,
            in_flight - 1,
            collections,
            max_concurrent_per_pds,
            conn,
            validation_ctx,
            timeout_ms,
            total,
            pds_count,
            completed,
          )
        }
      }
    }
  }
}
1022
/// CAR-based streaming: fetch repos as CAR files and filter locally
/// One request per repo instead of one per (repo, collection)
///
/// Groups repos by their PDS (via `atp_data`; repos without ATP data are
/// logged and skipped), then runs up to `config.max_pds_workers` PDS workers
/// concurrently via the sliding window. Returns the total number of records
/// indexed across all repos.
pub fn get_records_for_repos_car(
  repos: List(String),
  collections: List(String),
  atp_data: List(AtprotoData),
  config: BackfillConfig,
  conn: Executor,
) -> Int {
  // Build validation context ONCE for all repos
  let validation_ctx = load_validation_context(conn)

  // Group repos by PDS
  let repos_by_pds =
    repos
    |> list.filter_map(fn(repo) {
      case list.find(atp_data, fn(data) { data.did == repo }) {
        Error(_) -> {
          logging.log(
            logging.Error,
            "[backfill] No ATP data found for repo: " <> repo,
          )
          Error(Nil)
        }
        Ok(data) -> Ok(#(data.pds, repo))
      }
    })
    |> list.group(fn(pair) {
      let #(pds, _repo) = pair
      pds
    })

  let pds_entries = dict.to_list(repos_by_pds)
  let pds_count = list.length(pds_entries)

  logging.log(
    logging.Info,
    "[backfill] Processing "
      <> int.to_string(pds_count)
      <> " PDS endpoints (max "
      <> int.to_string(config.max_pds_workers)
      <> " concurrent)...",
  )

  // Use sliding window to limit concurrent PDS workers
  let subject = process.new_subject()
  let #(initial_pds, remaining_pds) =
    list.split(pds_entries, config.max_pds_workers)
  let initial_count = list.length(initial_pds)

  // Spawn initial batch of PDS workers
  list.each(initial_pds, fn(pds_entry) {
    let #(pds_url, repo_pairs) = pds_entry
    // Drop the pds half of each #(pds, repo) pair; the worker only needs DIDs
    let pds_repos =
      repo_pairs
      |> list.map(fn(pair) {
        let #(_pds, repo) = pair
        repo
      })

    process.spawn_unlinked(fn() {
      pds_worker_car(
        pds_url,
        pds_repos,
        collections,
        config.max_concurrent_per_pds,
        conn,
        validation_ctx,
        config.repo_fetch_timeout_ms,
        subject,
      )
    })
  })

  // Process remaining with sliding window
  let result =
    sliding_window_pds(
      remaining_pds,
      subject,
      initial_count,
      collections,
      config.max_concurrent_per_pds,
      conn,
      validation_ctx,
      config.repo_fetch_timeout_ms,
      0,
      pds_count,
      0,
    )

  logging.log(
    logging.Info,
    "[backfill] All PDS workers complete, total: "
      <> int.to_string(result)
      <> " records",
  )
  result
}
1121
/// Index records into the database using batch inserts
/// Insert failures are logged but not propagated; the backfill keeps going.
pub fn index_records(records: List(Record), conn: Executor) -> Nil {
  case records.batch_insert(conn, records) {
    Error(err) ->
      logging.log(
        logging.Error,
        "[backfill] Failed to batch insert records: " <> string.inspect(err),
      )
    Ok(_) -> Nil
  }
}
1134
/// Index actors into the database using batch upsert for efficiency
/// Upsert failures are logged but not propagated.
pub fn index_actors(atp_data: List(AtprotoData), conn: Executor) -> Nil {
  // Only the (did, handle) pair is persisted per actor
  let actor_tuples = list.map(atp_data, fn(data) { #(data.did, data.handle) })

  case actors.batch_upsert(conn, actor_tuples) {
    Error(err) ->
      logging.log(
        logging.Error,
        "[backfill] Failed to batch upsert actors: " <> string.inspect(err),
      )
    Ok(_) -> Nil
  }
}
1152
/// Backfill all collections (primary and external) for a newly discovered actor
/// This is called when a new actor is created via Jetstream or GraphQL mutations
/// Resolution or backfill failures are logged; nothing is returned.
pub fn backfill_collections_for_actor(
  db: Executor,
  did: String,
  collection_ids: List(String),
  external_collection_ids: List(String),
  plc_url: String,
) -> Nil {
  // Ensure HTTP semaphore is initialized (may not be if called outside normal backfill flow)
  configure_hackney_pool(150)

  let all_collections = list.append(collection_ids, external_collection_ids)
  let total_count = list.length(all_collections)
  let primary_count = list.length(collection_ids)
  let external_count = list.length(external_collection_ids)

  logging.log(
    logging.Info,
    "[backfill] Starting background sync for new actor: "
      <> did
      <> " ("
      <> string.inspect(total_count)
      <> " collections: "
      <> string.inspect(primary_count)
      <> " primary + "
      <> string.inspect(external_count)
      <> " external)",
  )

  // Resolve DID to get PDS endpoint
  case resolve_did(did, plc_url) {
    Error(err) ->
      logging.log(
        logging.Error,
        "[backfill] Failed to resolve DID for backfill: " <> did <> " - " <> err,
      )
    Ok(atp_data) -> {
      // Use CAR-based approach - single request gets all collections
      let total_records =
        backfill_repo_car(did, atp_data.pds, all_collections, db)

      logging.log(
        logging.Info,
        "[backfill] Completed sync for "
          <> did
          <> " ("
          <> string.inspect(total_records)
          <> " total records)",
      )
    }
  }
}
1205
/// Fetch repos that have records for a specific collection from the relay with pagination
/// Thin entry point: starts the paginated walk with no cursor and an empty
/// accumulator. Returns all repo DIDs, or an error string on any failed page.
fn fetch_repos_for_collection(
  collection: String,
  db: Executor,
) -> Result(List(String), String) {
  fetch_repos_paginated(collection, None, [], db)
}
1213
/// Helper function for paginated repo fetching
///
/// Walks com.atproto.sync.listReposByCollection pages (1000 per page) until
/// the relay stops returning a cursor. Pages are accumulated by prepending
/// (O(page) per page) and reversed once at the end, so callers still receive
/// repos in relay order; the previous `list.append(acc, repos)` was
/// O(total) per page — O(total^2) overall. Callers start with `acc = []`.
fn fetch_repos_paginated(
  collection: String,
  cursor: Option(String),
  acc: List(String),
  db: Executor,
) -> Result(List(String), String) {
  // Get relay URL from database config
  // (re-read each page — presumably cheap; TODO confirm, could be hoisted)
  let relay_url = config_repo.get_relay_url(db)

  // Build URL with large limit and cursor
  let base_url =
    relay_url
    <> "/xrpc/com.atproto.sync.listReposByCollection?collection="
    <> collection
    <> "&limit=1000"

  let url = case cursor {
    Some(c) -> base_url <> "&cursor=" <> c
    None -> base_url
  }

  case request.to(url) {
    Error(_) -> Error("Failed to create request for collection: " <> collection)
    Ok(req) -> {
      case send_with_permit(req) {
        Error(_) ->
          Error("Failed to fetch repos for collection: " <> collection)
        Ok(resp) -> {
          case resp.status {
            200 -> {
              case parse_repos_response(resp.body) {
                Ok(#(repos, next_cursor)) -> {
                  // Prepend this page; overall order restored by the final
                  // reverse below
                  let new_acc =
                    list.fold(repos, acc, fn(a, repo) { [repo, ..a] })
                  case next_cursor {
                    Some(c) ->
                      fetch_repos_paginated(collection, Some(c), new_acc, db)
                    None -> {
                      let all_repos = list.reverse(new_acc)
                      logging.log(
                        logging.Info,
                        "[backfill] Found "
                          <> string.inspect(list.length(all_repos))
                          <> " total repositories for collection \""
                          <> collection
                          <> "\"",
                      )
                      Ok(all_repos)
                    }
                  }
                }
                Error(err) -> Error(err)
              }
            }
            _ ->
              Error(
                "Failed to fetch repos for collection "
                <> collection
                <> " (status: "
                <> string.inspect(resp.status)
                <> ")",
              )
          }
        }
      }
    }
  }
}
1281
1282/// Parse the response from com.atproto.sync.listReposByCollection
fn parse_repos_response(
  body: String,
) -> Result(#(List(String), Option(String)), String) {
  // A single decoder pulls out both the repo DIDs and the optional pagination
  // cursor, so the JSON body is parsed once instead of twice as before.
  let decoder = {
    use repos <- decode.field(
      "repos",
      decode.list({
        use did <- decode.field("did", decode.string)
        decode.success(did)
      }),
    )
    // "cursor" may be absent entirely (last page) or explicitly null; both
    // decode to None. A cursor of any other non-string type now fails the
    // whole parse instead of being silently treated as None.
    use cursor <- decode.optional_field(
      "cursor",
      None,
      decode.optional(decode.string),
    )
    decode.success(#(repos, cursor))
  }

  case json.parse(body, decoder) {
    Ok(parsed) -> Ok(parsed)
    Error(_) -> Error("Failed to parse repos response")
  }
}
1316
1317/// Main backfill function - backfill collections for specified repos
pub fn backfill_collections(
  repos: List(String),
  collections: List(String),
  external_collections: List(String),
  config: BackfillConfig,
  conn: Executor,
) -> Nil {
  // Blank log line visually separates this run from earlier output.
  logging.log(logging.Info, "")
  logging.log(logging.Info, "[backfill] Starting backfill operation")

  case list.is_empty(collections) {
    // Nothing to do without at least one primary collection.
    True -> {
      logging.log(
        logging.Error,
        "[backfill] No collections specified for backfill",
      )
      Nil
    }
    False -> {
      logging.log(
        logging.Info,
        "[backfill] Processing "
          <> string.inspect(list.length(collections))
          <> " collections: "
          <> string.join(collections, ", "),
      )

      run_backfill(repos, collections, external_collections, config, conn)
    }
  }
}
1349
fn run_backfill(
  repos: List(String),
  collections: List(String),
  external_collections: List(String),
  config: BackfillConfig,
  conn: Executor,
) -> Nil {
  // Announce external collections only when any were requested.
  case external_collections {
    [] -> Nil
    _ ->
      logging.log(
        logging.Info,
        "[backfill] Including "
          <> string.inspect(list.length(external_collections))
          <> " external collections: "
          <> string.join(external_collections, ", "),
      )
  }

  // Use the caller-supplied repo list when given; otherwise discover repos
  // for each collection via the relay.
  let target_repos = case repos {
    [] -> discover_repos(collections, conn)
    supplied -> {
      logging.log(
        logging.Info,
        "[backfill] Using "
          <> string.inspect(list.length(supplied))
          <> " provided repositories",
      )
      supplied
    }
  }

  // Resolve each DID to its PDS endpoint before any fetching happens.
  logging.log(logging.Info, "[backfill] Resolving ATP data for repositories...")
  let resolved = get_atp_data_for_repos(target_repos, config)
  logging.log(
    logging.Info,
    "[backfill] Resolved ATP data for "
      <> string.inspect(list.length(resolved))
      <> "/"
      <> string.inspect(list.length(target_repos))
      <> " repositories",
  )

  // Optionally index actor records first, per configuration.
  case config.index_actors {
    True -> {
      logging.log(logging.Info, "[backfill] Indexing actors...")
      index_actors(resolved, conn)
      logging.log(
        logging.Info,
        "[backfill] Indexed "
          <> string.inspect(list.length(resolved))
          <> " actors",
      )
    }
    False ->
      logging.log(
        logging.Info,
        "[backfill] Skipping actor indexing (disabled in config)",
      )
  }

  // Primary and external collections are filtered from the same CAR pass.
  let every_collection = list.append(collections, external_collections)

  // CAR-based approach: one getRepo request per repo, filtered locally.
  logging.log(
    logging.Info,
    "[backfill] Fetching repos as CAR files and filtering locally...",
  )
  let indexed_count =
    get_records_for_repos_car(
      target_repos,
      every_collection,
      resolved,
      config,
      conn,
    )
  logging.log(
    logging.Info,
    "[backfill] Backfill complete! Indexed "
      <> string.inspect(indexed_count)
      <> " total records via CAR",
  )
}

/// Ask the relay for every repo holding each collection, logging (and
/// skipping) any collection whose lookup fails, then flatten and dedupe.
fn discover_repos(collections: List(String), conn: Executor) -> List(String) {
  logging.log(
    logging.Info,
    "[backfill] Fetching repositories for collections...",
  )
  let discovered =
    collections
    |> list.filter_map(fn(collection) {
      case fetch_repos_for_collection(collection, conn) {
        Ok(repos) -> Ok(repos)
        Error(err) -> {
          logging.log(logging.Error, "[backfill] " <> err)
          Error(Nil)
        }
      }
    })
    |> list.flatten
    |> list.unique

  logging.log(
    logging.Info,
    "[backfill] Processing "
      <> string.inspect(list.length(discovered))
      <> " unique repositories",
  )
  discovered
}