···50 plc_directory_url: String,
51 index_actors: Bool,
52 max_concurrent_per_pds: Int,
00053 did_cache: Option(Subject(did_cache.Message)),
54 )
55}
···62 Error(_) -> "https://plc.directory"
63 }
6465- // Get max concurrent per PDS from environment or use default of 6
66 let max_pds_concurrent = case envoy.get("BACKFILL_PDS_CONCURRENCY") {
67 Ok(val) -> {
68 case int.parse(val) {
69 Ok(n) -> n
70- Error(_) -> 6
000000000000000000000071 }
72 }
73- Error(_) -> 6
0000000000074 }
7576- // Configure hackney pool for better connection reuse
77- // We'll call directly into Erlang to set up the pool
78- configure_hackney_pool()
7980 BackfillConfig(
81 plc_directory_url: plc_url,
82 index_actors: True,
83 max_concurrent_per_pds: max_pds_concurrent,
00084 did_cache: None,
85 )
86}
···91 BackfillConfig(..config, did_cache: Some(cache))
92}
9394-/// Configure hackney connection pool with higher limits
95-/// Called via Erlang FFI to avoid atom conversion issues
96@external(erlang, "backfill_ffi", "configure_pool")
97-fn configure_hackney_pool() -> Nil
9899/// Acquire a permit from the global HTTP semaphore
100/// Blocks if at the concurrent request limit (150)
···697 max_concurrent: Int,
698 conn: sqlight.Connection,
699 validation_ctx: Option(honk.ValidationContext),
0700 reply_to: Subject(Int),
701) -> Nil {
702 logging.log(
···704 "[backfill] PDS worker starting for "
705 <> pds_url
706 <> " with "
707- <> string.inspect(list.length(repos))
708 <> " repos",
709 )
710 let subject = process.new_subject()
···737 collections,
738 conn,
739 validation_ctx,
0740 0,
741 )
742···745 "[backfill] PDS worker finished for "
746 <> pds_url
747 <> " with "
748- <> string.inspect(total_count)
749 <> " total records",
750 )
751 process.send(reply_to, total_count)
···760 collections: List(String),
761 conn: sqlight.Connection,
762 validation_ctx: Option(honk.ValidationContext),
0763 total: Int,
764) -> Int {
765 case in_flight {
766 0 -> total
767 _ -> {
768- // 5 minute timeout per CAR worker (validation adds processing time for large repos)
769- case process.receive(subject, 300_000) {
770 Ok(count) -> {
771 let new_total = total + count
772 case remaining {
···789 collections,
790 conn,
791 validation_ctx,
0792 new_total,
793 )
794 }
···801 collections,
802 conn,
803 validation_ctx,
0804 new_total,
805 )
806 }
···811 "[backfill] Timeout waiting for CAR worker on "
812 <> pds_url
813 <> " (in_flight: "
814- <> string.inspect(in_flight)
815 <> ", remaining: "
816- <> string.inspect(list.length(remaining))
817 <> ")",
818 )
819 sliding_window_car(
···824 collections,
825 conn,
826 validation_ctx,
0827 total,
828 )
829 }
···832 }
833}
834000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000835/// CAR-based streaming: fetch repos as CAR files and filter locally
836/// One request per repo instead of one per (repo, collection)
837pub fn get_records_for_repos_car(
···864 pds
865 })
866867- // Spawn one worker per PDS
868- let subject = process.new_subject()
869 let pds_entries = dict.to_list(repos_by_pds)
870 let pds_count = list.length(pds_entries)
871872- let _pds_workers =
873- pds_entries
874- |> list.map(fn(pds_entry) {
875- let #(pds_url, repo_pairs) = pds_entry
876- let pds_repos =
877- repo_pairs
878- |> list.map(fn(pair) {
879- let #(_pds, repo) = pair
880- repo
881- })
0000882883- process.spawn_unlinked(fn() {
884- pds_worker_car(
885- pds_url,
886- pds_repos,
887- collections,
888- config.max_concurrent_per_pds,
889- conn,
890- validation_ctx,
891- subject,
892- )
893 })
000000000000894 })
0895896- // Collect counts from all PDS workers
897- logging.log(
898- logging.Info,
899- "[backfill] Waiting for " <> string.inspect(pds_count) <> " PDS workers...",
900- )
901 let result =
902- list.range(1, pds_count)
903- |> list.fold(0, fn(acc, i) {
904- case process.receive(subject, 300_000) {
905- Ok(count) -> {
906- logging.log(
907- logging.Info,
908- "[backfill] PDS worker "
909- <> string.inspect(i)
910- <> "/"
911- <> string.inspect(pds_count)
912- <> " done ("
913- <> string.inspect(count)
914- <> " records)",
915- )
916- acc + count
917- }
918- Error(_) -> {
919- logging.log(
920- logging.Warning,
921- "[backfill] PDS worker "
922- <> string.inspect(i)
923- <> "/"
924- <> string.inspect(pds_count)
925- <> " timed out",
926- )
927- acc
928- }
929- }
930- })
931 logging.log(
932 logging.Info,
933 "[backfill] All PDS workers complete, total: "
934- <> string.inspect(result)
935 <> " records",
936 )
937 result
···50 plc_directory_url: String,
51 index_actors: Bool,
52 max_concurrent_per_pds: Int,
53+ max_pds_workers: Int,
54+ max_http_concurrent: Int,
55+ repo_fetch_timeout_ms: Int,
56 did_cache: Option(Subject(did_cache.Message)),
57 )
58}
···65 Error(_) -> "https://plc.directory"
66 }
6768+ // Get max concurrent per PDS from environment or use default of 4
69 let max_pds_concurrent = case envoy.get("BACKFILL_PDS_CONCURRENCY") {
70 Ok(val) -> {
71 case int.parse(val) {
72 Ok(n) -> n
73+ Error(_) -> 4
74+ }
75+ }
76+ Error(_) -> 4
77+ }
78+79+ // Get max PDS workers from environment or use default of 10
80+ let max_pds_workers = case envoy.get("BACKFILL_MAX_PDS_WORKERS") {
81+ Ok(val) -> {
82+ case int.parse(val) {
83+ Ok(n) -> n
84+ Error(_) -> 10
85+ }
86+ }
87+ Error(_) -> 10
88+ }
89+90+ // Get max HTTP concurrent from environment or use default of 50
91+ let max_http = case envoy.get("BACKFILL_MAX_HTTP_CONCURRENT") {
92+ Ok(val) -> {
93+ case int.parse(val) {
94+ Ok(n) -> n
95+ Error(_) -> 50
96 }
97 }
98+ Error(_) -> 50
99+ }
100+101+ // Get repo fetch timeout from environment or use default of 60s
102+ let repo_timeout = case envoy.get("BACKFILL_REPO_TIMEOUT") {
103+ Ok(val) -> {
104+ case int.parse(val) {
105+ Ok(n) -> n * 1000
106+ Error(_) -> 60_000
107+ }
108+ }
109+ Error(_) -> 60_000
110 }
111112+ // Configure hackney pool with the configured HTTP limit
113+ configure_hackney_pool(max_http)
0114115 BackfillConfig(
116 plc_directory_url: plc_url,
117 index_actors: True,
118 max_concurrent_per_pds: max_pds_concurrent,
119+ max_pds_workers: max_pds_workers,
120+ max_http_concurrent: max_http,
121+ repo_fetch_timeout_ms: repo_timeout,
122 did_cache: None,
123 )
124}
···129 BackfillConfig(..config, did_cache: Some(cache))
130}
131132+/// Configure hackney connection pool with specified limits
/// Side effect: the Erlang FFI (backfill_ffi:configure_pool/1) also
/// (re)initializes the global HTTP semaphore to `max_concurrent` permits.
0133@external(erlang, "backfill_ffi", "configure_pool")
134+fn configure_hackney_pool(max_concurrent: Int) -> Nil
135136/// Acquire a permit from the global HTTP semaphore
137/// Blocks when at the configured concurrent request limit
/// (BACKFILL_MAX_HTTP_CONCURRENT, default 50)
···734 max_concurrent: Int,
735 conn: sqlight.Connection,
736 validation_ctx: Option(honk.ValidationContext),
737+ timeout_ms: Int,
738 reply_to: Subject(Int),
739) -> Nil {
740 logging.log(
···742 "[backfill] PDS worker starting for "
743 <> pds_url
744 <> " with "
745+ <> int.to_string(list.length(repos))
746 <> " repos",
747 )
748 let subject = process.new_subject()
···775 collections,
776 conn,
777 validation_ctx,
778+ timeout_ms,
779 0,
780 )
781···784 "[backfill] PDS worker finished for "
785 <> pds_url
786 <> " with "
787+ <> int.to_string(total_count)
788 <> " total records",
789 )
790 process.send(reply_to, total_count)
···799 collections: List(String),
800 conn: sqlight.Connection,
801 validation_ctx: Option(honk.ValidationContext),
802+ timeout_ms: Int,
803 total: Int,
804) -> Int {
805 case in_flight {
806 0 -> total
807 _ -> {
808+ case process.receive(subject, timeout_ms) {
0809 Ok(count) -> {
810 let new_total = total + count
811 case remaining {
···828 collections,
829 conn,
830 validation_ctx,
831+ timeout_ms,
832 new_total,
833 )
834 }
···841 collections,
842 conn,
843 validation_ctx,
844+ timeout_ms,
845 new_total,
846 )
847 }
···852 "[backfill] Timeout waiting for CAR worker on "
853 <> pds_url
854 <> " (in_flight: "
855+ <> int.to_string(in_flight)
856 <> ", remaining: "
857+ <> int.to_string(list.length(remaining))
858 <> ")",
859 )
860 sliding_window_car(
···865 collections,
866 conn,
867 validation_ctx,
868+ timeout_ms,
869 total,
870 )
871 }
···874 }
875}
/// Sliding window for PDS worker processing.
/// Limits how many PDS endpoints are processed concurrently: waits for each
/// in-flight worker to report its record count on `subject`, starting the
/// next pending endpoint from `remaining` as slots free up. Returns the
/// grand total of records once every started worker has been accounted for.
fn sliding_window_pds(
  remaining: List(#(String, List(#(String, String)))),
  subject: Subject(Int),
  in_flight: Int,
  collections: List(String),
  max_concurrent_per_pds: Int,
  conn: sqlight.Connection,
  validation_ctx: Option(honk.ValidationContext),
  timeout_ms: Int,
  total: Int,
  pds_count: Int,
  completed: Int,
) -> Int {
  case in_flight {
    // Nothing outstanding: every endpoint was processed (or given up on).
    0 -> total
    _ -> {
      // 5 minute timeout per PDS worker
      case process.receive(subject, 300_000) {
        Ok(count) -> {
          let new_total = total + count
          let new_completed = completed + 1
          logging.log(
            logging.Info,
            "[backfill] PDS worker "
              <> int.to_string(new_completed)
              <> "/"
              <> int.to_string(pds_count)
              <> " done ("
              <> int.to_string(count)
              <> " records)",
          )
          case remaining {
            [next, ..rest] -> {
              // A slot freed up: start the next pending PDS endpoint.
              spawn_pds_worker(
                next,
                subject,
                collections,
                max_concurrent_per_pds,
                conn,
                validation_ctx,
                timeout_ms,
              )
              sliding_window_pds(
                rest,
                subject,
                in_flight,
                collections,
                max_concurrent_per_pds,
                conn,
                validation_ctx,
                timeout_ms,
                new_total,
                pds_count,
                new_completed,
              )
            }
            [] ->
              sliding_window_pds(
                [],
                subject,
                in_flight - 1,
                collections,
                max_concurrent_per_pds,
                conn,
                validation_ctx,
                timeout_ms,
                new_total,
                pds_count,
                new_completed,
              )
          }
        }
        Error(_) -> {
          logging.log(
            logging.Warning,
            "[backfill] PDS worker timed out (in_flight: "
              <> int.to_string(in_flight)
              <> ", remaining: "
              <> int.to_string(list.length(remaining))
              <> ")",
          )
          // Bug fix: the previous version only decremented in_flight on a
          // timeout and never started pending endpoints, so if every active
          // worker timed out, the rest of `remaining` was silently dropped
          // (the in_flight == 0 base case returns without draining it).
          // Replace the presumed-dead worker with the next pending endpoint.
          // NOTE(review): if a timed-out worker replies later, its stale
          // message is still counted by a subsequent receive — the same
          // hazard the original code had.
          case remaining {
            [next, ..rest] -> {
              spawn_pds_worker(
                next,
                subject,
                collections,
                max_concurrent_per_pds,
                conn,
                validation_ctx,
                timeout_ms,
              )
              sliding_window_pds(
                rest,
                subject,
                in_flight,
                collections,
                max_concurrent_per_pds,
                conn,
                validation_ctx,
                timeout_ms,
                total,
                pds_count,
                completed,
              )
            }
            [] ->
              sliding_window_pds(
                [],
                subject,
                in_flight - 1,
                collections,
                max_concurrent_per_pds,
                conn,
                validation_ctx,
                timeout_ms,
                total,
                pds_count,
                completed,
              )
          }
        }
      }
    }
  }
}

/// Start one unlinked worker for a PDS endpoint; it reports its record
/// count back on `reply_to` when finished.
fn spawn_pds_worker(
  pds_entry: #(String, List(#(String, String))),
  reply_to: Subject(Int),
  collections: List(String),
  max_concurrent_per_pds: Int,
  conn: sqlight.Connection,
  validation_ctx: Option(honk.ValidationContext),
  timeout_ms: Int,
) -> Nil {
  let #(pds_url, repo_pairs) = pds_entry
  // Drop the PDS half of each pair; workers only need the repo DIDs.
  let pds_repos =
    repo_pairs
    |> list.map(fn(pair) {
      let #(_pds, repo) = pair
      repo
    })
  process.spawn_unlinked(fn() {
    pds_worker_car(
      pds_url,
      pds_repos,
      collections,
      max_concurrent_per_pds,
      conn,
      validation_ctx,
      timeout_ms,
      reply_to,
    )
  })
  Nil
}
987+988/// CAR-based streaming: fetch repos as CAR files and filter locally
989/// One request per repo instead of one per (repo, collection)
990pub fn get_records_for_repos_car(
···1017 pds
1018 })
1019001020 let pds_entries = dict.to_list(repos_by_pds)
1021 let pds_count = list.length(pds_entries)
10221023+ logging.log(
1024+ logging.Info,
1025+ "[backfill] Processing "
1026+ <> int.to_string(pds_count)
1027+ <> " PDS endpoints (max "
1028+ <> int.to_string(config.max_pds_workers)
1029+ <> " concurrent)...",
1030+ )
1031+1032+ // Use sliding window to limit concurrent PDS workers
1033+ let subject = process.new_subject()
1034+ let #(initial_pds, remaining_pds) =
1035+ list.split(pds_entries, config.max_pds_workers)
1036+ let initial_count = list.length(initial_pds)
10371038+ // Spawn initial batch of PDS workers
1039+ list.each(initial_pds, fn(pds_entry) {
1040+ let #(pds_url, repo_pairs) = pds_entry
1041+ let pds_repos =
1042+ repo_pairs
1043+ |> list.map(fn(pair) {
1044+ let #(_pds, repo) = pair
1045+ repo
001046 })
1047+1048+ process.spawn_unlinked(fn() {
1049+ pds_worker_car(
1050+ pds_url,
1051+ pds_repos,
1052+ collections,
1053+ config.max_concurrent_per_pds,
1054+ conn,
1055+ validation_ctx,
1056+ config.repo_fetch_timeout_ms,
1057+ subject,
1058+ )
1059 })
1060+ })
10611062+ // Process remaining with sliding window
00001063 let result =
1064+ sliding_window_pds(
1065+ remaining_pds,
1066+ subject,
1067+ initial_count,
1068+ collections,
1069+ config.max_concurrent_per_pds,
1070+ conn,
1071+ validation_ctx,
1072+ config.repo_fetch_timeout_ms,
1073+ 0,
1074+ pds_count,
1075+ 0,
1076+ )
1077+0000000000000001078 logging.log(
1079 logging.Info,
1080 "[backfill] All PDS workers complete, total: "
1081+ <> int.to_string(result)
1082 <> " records",
1083 )
1084 result
+12-27
server/src/backfill_ffi.erl
···1-module(backfill_ffi).
2--export([configure_pool/0, init_semaphore/0, acquire_permit/0, release_permit/0, rescue/1, monotonic_now/0, elapsed_ms/1]).
3-4-%% Maximum concurrent HTTP requests for backfill
5--define(MAX_CONCURRENT, 150).
67-%% Configure hackney connection pool with higher limits
8-configure_pool() ->
9 %% Suppress SSL handshake error notices (TLS alerts from bad certificates)
10- %% These clutter the logs when connecting to self-hosted PDS with bad certs
11- %% Set both the ssl application log level and logger level
12 application:set_env(ssl, log_level, error),
13 logger:set_application_level(ssl, error),
1415 %% Stop the default pool if it exists (ignore errors)
16 _ = hackney_pool:stop_pool(default),
1718- %% Start pool with increased connection limits and timeouts
19- %% timeout: how long to keep connections alive in the pool (ms)
20- %% max_connections: maximum number of connections in the pool
21- %% recv_timeout: how long to wait for response data (ms)
22 Options = [
23 {timeout, 150000},
24- {max_connections, 300},
25 {recv_timeout, 30000}
26 ],
2728- %% Start the pool (this will create it if it doesn't exist)
29 case hackney_pool:start_pool(default, Options) of
30 ok -> ok;
31 {error, {already_started, _}} -> ok;
···33 end,
3435 %% Initialize the semaphore for rate limiting
36- init_semaphore(),
3738- %% Return nil (atom 'nil' in Gleam)
39 nil.
4041%% Initialize the global semaphore using atomics
42-%% Uses persistent_term for fast global access
43-init_semaphore() ->
44- case persistent_term:get(backfill_semaphore, undefined) of
45- undefined ->
46- Ref = atomics:new(1, [{signed, true}]),
47- atomics:put(Ref, 1, ?MAX_CONCURRENT),
48- persistent_term:put(backfill_semaphore, Ref);
49- _ ->
50- %% Already initialized
51- ok
52- end.
5354%% Acquire a permit from the semaphore
55%% Blocks (with sleep) if no permits available
···1-module(backfill_ffi).
2+-export([configure_pool/1, init_semaphore/1, acquire_permit/0, release_permit/0, rescue/1, monotonic_now/0, elapsed_ms/1]).
00034+%% Configure hackney connection pool with specified limits
5+configure_pool(MaxConcurrent) ->
6 %% Suppress SSL handshake error notices (TLS alerts from bad certificates)
007 application:set_env(ssl, log_level, error),
8 logger:set_application_level(ssl, error),
910 %% Stop the default pool if it exists (ignore errors)
11 _ = hackney_pool:stop_pool(default),
1213+ %% Start pool with configured connection limits
00014 Options = [
15 {timeout, 150000},
16+ {max_connections, MaxConcurrent * 2},
17 {recv_timeout, 30000}
18 ],
19020 case hackney_pool:start_pool(default, Options) of
21 ok -> ok;
22 {error, {already_started, _}} -> ok;
···24 end,
2526 %% Initialize the semaphore for rate limiting
27+ init_semaphore(MaxConcurrent),
28029 nil.
%% Initialize the global rate-limiting semaphore backing acquire/release.
%% The atomics counter is rebuilt unconditionally so that a freshly
%% configured limit always takes effect, then published through
%% persistent_term for fast lock-free lookups.
init_semaphore(MaxConcurrent) ->
    Counter = atomics:new(1, [{signed, true}]),
    ok = atomics:put(Counter, 1, MaxConcurrent),
    persistent_term:put(backfill_semaphore, Counter),
    ok.
000003839%% Acquire a permit from the semaphore
40%% Blocks (with sleep) if no permits available