at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at ffa704fdf0884cad263253b2cc44d87c8fb5f4fe 122 lines 4.1 kB view raw
# load KEY=VALUE pairs from a `.env` file in the current directory.
#
# blank lines, lines starting with `#`, and lines without an `=` are skipped.
# keys and values are whitespace-trimmed, and surrounding double or single
# quotes are stripped from values. when a key appears more than once the last
# occurrence wins. returns an empty record when `.env` does not exist.
export def load-env-file [] {
    if (".env" | path exists) {
        open .env
        | lines
        | each { |line| $line | str trim }
        # a line without `=` has no value part; skip it instead of failing on `$parts.1`
        | where { |line| ($line | is-empty) == false and ($line | str starts-with "#") == false and ($line | str contains "=") }
        | each { |line|
            # split on the first `=` only, so values may themselves contain `=`
            let parts = ($line | split row "=" -n 2)
            { key: ($parts.0 | str trim), value: ($parts.1 | str trim | str trim -c '"' | str trim -c "'") }
        }
        # upsert (not insert) so a repeated key overrides instead of raising an error
        | reduce -f {} { |it, acc| $acc | upsert $it.key $it.value }
    } else {
        {}
    }
}

# resolve a did to its pds endpoint.
#
# fetches the did document from plc.wtf and returns the `serviceEndpoint` of
# the first service entry whose type is "AtprotoPersonalDataServer".
export def resolve-pds [did: string] {
    let doc = (http get $"https://plc.wtf/($did)" | from json)
    ($doc.service | where type == "AtprotoPersonalDataServer" | first).serviceEndpoint
}

# create a session on the pds via com.atproto.server.createSession.
#
# returns the response of the http call.
export def authenticate [pds_url: string, identifier: string, password: string] {
    http post -t application/json $"($pds_url)/xrpc/com.atproto.server.createSession" {
        identifier: $identifier,
        password: $password
    }
}

# create a record in `collection` of `repo` via com.atproto.repo.createRecord,
# authorized with the bearer token `jwt`.
export def create-record [pds_url: string, jwt: string, repo: string, collection: string, record: any] {
    http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.createRecord" {
        repo: $repo,
        collection: $collection,
        record: $record
    }
}

# delete the record `rkey` from `collection` of `repo` via
# com.atproto.repo.deleteRecord, authorized with the bearer token `jwt`.
export def delete-record [pds_url: string, jwt: string, repo: string, collection: string, rkey: string] {
    http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.deleteRecord" {
        repo: $repo,
        collection: $collection,
        rkey: $rkey
    }
}

# deactivate the account owning `jwt` via com.atproto.server.deactivateAccount.
export def deactivate-account [pds_url: string, jwt: string] {
    http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.server.deactivateAccount" {}
}

# activate the account owning `jwt` via com.atproto.server.activateAccount.
#
# NOTE(review): unlike the sibling helpers this shells out to external `curl`
# and sends no request body; presumably deliberate - confirm before unifying.
export def activate-account [pds_url: string, jwt: string] {
    curl -X POST -H "Content-Type: application/json" -H $"Authorization: Bearer ($jwt)" $"($pds_url)/xrpc/com.atproto.server.activateAccount"
}

# build the hydrant binary
#
# runs a release build with cargo and returns the relative path of the binary.
export def build-hydrant [] {
    print "building hydrant..."
    cargo build --release
    "target/release/hydrant"
}

# start hydrant in the background
#
# the process is launched through `sh` with stdout and stderr redirected to
# `<db_path>/hydrant.log`. any HYDRANT_* variable already present in the
# environment overrides the defaults set here. returns a record with the
# background process `pid` and the `log` file path.
export def start-hydrant [binary: string, db_path: string, port: int] {
    let log_file = $"($db_path)/hydrant.log"
    print $"starting hydrant - logs at ($log_file)..."

    # anchor the match so only variables that start with HYDRANT_ are picked up
    let hydrant_vars = ($env | transpose k v | where k =~ "^HYDRANT_" | reduce -f {} { |it, acc| $acc | upsert $it.k $it.v })
    let env_vars = {
        HYDRANT_DATABASE_PATH: ($db_path),
        HYDRANT_FULL_NETWORK: "false",
        HYDRANT_API_PORT: ($port | into string),
        HYDRANT_ENABLE_DEBUG: "true",
        HYDRANT_DEBUG_PORT: ($port + 1 | into string),
        HYDRANT_LOG_LEVEL: "debug"
    } | merge $hydrant_vars

    let pid = (with-env $env_vars {
        # `$!` is expanded by sh (not nushell) to the pid of the backgrounded job
        sh -c $"($binary) >($log_file) 2>&1 & echo $!" | str trim | into int
    })

    print $"hydrant started with pid: ($pid)"
    { pid: $pid, log: $log_file }
}

# wait for the api to become responsive
#
# requests `<url>/stats` up to 30 times, sleeping one second after each
# failure. returns true on the first successful request, false otherwise.
export def wait-for-api [url: string] {
    print "waiting for api to be ready..."
    for attempt in 1..30 {
        try {
            http get $"($url)/stats"
            return true
        } catch {
            sleep 1sec
        }
    }
    false
}

# poll stats until backfill is complete or fails
#
# polls `<url>/stats?accurate=true` every two seconds, up to 120 times.
# returns false as soon as the `resync` count is non-zero, true once `pending`
# is zero while `repos` and `records` are both positive, and false if the
# attempts run out.
export def wait-for-backfill [url: string] {
    print "waiting for backfill to complete..."
    let max_attempts = 120
    for i in 1..$max_attempts {
        let stats = (http get $"($url)/stats?accurate=true").counts
        let pending = ($stats.pending | into int)
        let records = ($stats.records | into int)
        let repos = ($stats.repos | into int)
        let resync = ($stats.resync | into int)

        print $"[($i)/($max_attempts)] pending: ($pending), records: ($records), repos: ($repos), resync: ($resync)"

        if $resync > 0 {
            print "resync state detected (failure or gone)!"
            print ($stats | table)
            return false
        }

        if ($pending == 0) and ($repos > 0) and ($records > 0) {
            print "backfill complete."
            return true
        }

        sleep 2sec
    }
    false
}