AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
# compile hydrant with cargo in release mode
#
# prints a progress message, runs a quiet release build in the current
# directory, and returns the relative path "target/release/hydrant".
export def build-hydrant [] {
    let binary_path = "target/release/hydrant"

    print "building hydrant..."
    cargo build --release --quiet

    return $binary_path
}
7
# start hydrant in the background
#
# the process is configured through HYDRANT_* environment variables: the
# database path and api port come from the arguments, HYDRANT_FULL_NETWORK is
# set to "false", the debug endpoint is enabled on `port + 1`, and the log
# level is set to "debug". stdout and stderr are both redirected to
# `<db_path>/hydrant.log`.
#
# returns a record `{ pid: <int>, log: <string> }` holding the pid reported by
# the shell for the background job and the path of the log file.
export def start-hydrant [
    binary: string   # path of the hydrant executable to run
    db_path: string  # passed as HYDRANT_DATABASE_PATH; the log file is written inside it
    port: int        # api port; the debug port is port + 1
] {
    let log_file = $"($db_path)/hydrant.log"
    print $"starting hydrant - logs at ($log_file)..."

    # the binary and log paths are handed to sh as positional parameters
    # ($0 and $1) rather than spliced into the script text, so paths that
    # contain spaces or shell metacharacters reach the shell as single literal
    # words. `$!` expands to the pid of the job sh just put in the background.
    let launch_script = '"$0" >"$1" 2>&1 & echo $!'

    let pid = (
        with-env {
            HYDRANT_DATABASE_PATH: ($db_path),
            HYDRANT_FULL_NETWORK: "false",
            HYDRANT_API_PORT: ($port | into string),
            HYDRANT_ENABLE_DEBUG: "true",
            HYDRANT_DEBUG_PORT: ($port + 1 | into string),
            HYDRANT_LOG_LEVEL: "debug"
        } {
            sh -c $launch_script $binary $log_file | str trim | into int
        }
    )

    print $"hydrant started with pid: ($pid)"
    { pid: $pid, log: $log_file }
}
29
# wait until the api answers a request, giving up after 30 attempts
#
# each attempt issues a GET to `<url>/stats`. any error raised by the request
# is treated as "not ready yet" and followed by a one second pause.
# returns true on the first successful request, false if every attempt fails.
export def wait-for-api [
    url: string  # base url of the api; `/stats` is appended to it
] {
    print "waiting for api to be ready..."

    let stats_url = $"($url)/stats"
    for attempt in 1..30 {
        try {
            # only success matters here; the response body is not used
            http get $stats_url
            return true
        } catch {
            sleep 1sec
        }
    }

    return false
}
43
# poll the stats endpoint until the backfill finishes, fails, or times out
#
# every iteration fetches `<url>/stats?accurate=true`, reads the `counts`
# record and prints a progress line. the loop runs at most 120 times with a
# two second pause between polls.
#
# returns:
#   true  - pending is 0 while both repos and records are greater than 0
#   false - resync is greater than 0, or 120 polls passed without completion
export def wait-for-backfill [
    url: string  # base url of the api; `/stats?accurate=true` is appended to it
] {
    print "waiting for backfill to complete..."

    let stats_url = $"($url)/stats?accurate=true"
    for attempt in 1..120 {
        let counts = (http get $stats_url | get counts)
        let pending = ($counts.pending | into int)
        let records = ($counts.records | into int)
        let repos = ($counts.repos | into int)
        let resync = ($counts.resync | into int)

        print $"[($attempt)/120] pending: ($pending), records: ($records), repos: ($repos), resync: ($resync)"

        # a non-zero resync count is reported as a failure straight away
        if $resync > 0 {
            print "resync state detected (failure or gone)!"
            print ($counts | table)
            return false
        }

        # an empty queue only counts as done once something was indexed
        let finished = ($pending == 0) and ($repos > 0) and ($records > 0)
        if $finished {
            print "backfill complete."
            return true
        }

        sleep 2sec
    }

    return false
}