at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
# Load key/value pairs from a `.env` file in the current directory.
#
# Returns a record mapping each key to its value, or an empty record when
# `.env` does not exist. Blank lines, lines starting with `#`, lines without
# an `=`, and lines with an empty key are skipped. Each line is split on the
# first `=` only, so values may themselves contain `=`. Surrounding double or
# single quotes are stripped from values. When a key appears more than once,
# the last occurrence wins.
export def load-env-file [] {
    if not (".env" | path exists) {
        return {}
    }

    open --raw .env
    | lines
    | str trim
    | where { |line|
        # keep only real assignments: non-empty, not a comment, and containing "="
        (not ($line | is-empty)) and (not ($line | str starts-with "#")) and ($line | str contains "=")
    }
    | each { |line|
        let parts = ($line | split row "=" -n 2)
        {
            key: ($parts.0 | str trim),
            value: ($parts.1 | str trim | str trim -c '"' | str trim -c "'")
        }
    }
    | where { |entry| not ($entry.key | is-empty) }
    # upsert (not insert) so a repeated key overrides instead of raising an error
    | reduce -f {} { |it, acc| $acc | upsert $it.key $it.value }
}
15
# Look up the PDS endpoint for a DID.
#
# Fetches the DID document for `did` from plc.wtf, parses the response as
# JSON, and returns the `serviceEndpoint` of the first `service` entry whose
# `type` is "AtprotoPersonalDataServer". Errors if no such entry exists.
export def resolve-pds [did: string] {
    let did_doc = (http get $"https://plc.wtf/($did)" | from json)
    let pds_services = ($did_doc.service | where type == "AtprotoPersonalDataServer")
    $pds_services | first | get serviceEndpoint
}
20
# Create a session on a PDS.
#
# POSTs `identifier` and `password` as a JSON body to
# `com.atproto.server.createSession` under `pds_url` and returns the response.
export def authenticate [pds_url: string, identifier: string, password: string] {
    let endpoint = $"($pds_url)/xrpc/com.atproto.server.createSession"
    let credentials = { identifier: $identifier, password: $password }
    http post --content-type application/json $endpoint $credentials
}
27
# Create a record in a repo.
#
# POSTs `repo`, `collection` and `record` as a JSON body to
# `com.atproto.repo.createRecord` under `pds_url`, sending `jwt` as a Bearer
# token in the Authorization header. Returns the response.
export def create-record [pds_url: string, jwt: string, repo: string, collection: string, record: any] {
    let endpoint = $"($pds_url)/xrpc/com.atproto.repo.createRecord"
    let auth_header = ["Authorization" $"Bearer ($jwt)"]
    let payload = { repo: $repo, collection: $collection, record: $record }
    http post --content-type application/json --headers $auth_header $endpoint $payload
}
35
# Delete a record from a repo.
#
# POSTs `repo`, `collection` and `rkey` as a JSON body to
# `com.atproto.repo.deleteRecord` under `pds_url`, sending `jwt` as a Bearer
# token in the Authorization header. Returns the response.
export def delete-record [pds_url: string, jwt: string, repo: string, collection: string, rkey: string] {
    let endpoint = $"($pds_url)/xrpc/com.atproto.repo.deleteRecord"
    let auth_header = ["Authorization" $"Bearer ($jwt)"]
    let payload = { repo: $repo, collection: $collection, rkey: $rkey }
    http post --content-type application/json --headers $auth_header $endpoint $payload
}
43
# Deactivate the account that `jwt` belongs to.
#
# POSTs an empty JSON object to `com.atproto.server.deactivateAccount` under
# `pds_url`, sending `jwt` as a Bearer token in the Authorization header.
export def deactivate-account [pds_url: string, jwt: string] {
    let endpoint = $"($pds_url)/xrpc/com.atproto.server.deactivateAccount"
    let auth_header = ["Authorization" $"Bearer ($jwt)"]
    http post --content-type application/json --headers $auth_header $endpoint {}
}
47
# Activate the account that `jwt` belongs to.
#
# Issues a POST to `com.atproto.server.activateAccount` under `pds_url` via the
# external `curl` command, with a JSON Content-Type header and `jwt` as a
# Bearer token. No request data is passed to curl.
# NOTE(review): presumably curl is used (rather than `http post`) so that the
# request carries no body at all - confirm before unifying with the siblings.
export def activate-account [pds_url: string, jwt: string] {
    let endpoint = $"($pds_url)/xrpc/com.atproto.server.activateAccount"
    let auth = $"Authorization: Bearer ($jwt)"
    curl --request POST --header "Content-Type: application/json" --header $auth $endpoint
}
51
# Build hydrant in release mode with cargo.
#
# Returns the relative path of the release binary, `target/release/hydrant`.
export def build-hydrant [] {
    let binary_path = "target/release/hydrant"
    print "building hydrant..."
    cargo build --release
    $binary_path
}
58
# Start hydrant in the background.
#
# Parameters:
#   binary  - path (or command name) of the hydrant executable
#   db_path - database directory; also holds the log file `hydrant.log`
#   port    - API port; the debug port is set to `port + 1`
#
# The process is launched through `sh` with stdout and stderr redirected to
# the log file. Default HYDRANT_* settings are derived from the arguments, and
# any HYDRANT_* variable already present in the environment overrides them.
#
# Returns a record `{ pid, log }` with the background process id and the log
# file path.
export def start-hydrant [binary: string, db_path: string, port: int] {
    let log_file = $"($db_path)/hydrant.log"
    print $"starting hydrant - logs at ($log_file)..."

    # the shell opens the log file before hydrant runs, so the directory must
    # already exist or the redirect (and therefore the launch) fails
    mkdir $db_path

    # HYDRANT_* variables from the caller's environment (prefix match only)
    let hydrant_vars = (
        $env
        | transpose k v
        | where k =~ "^HYDRANT_"
        | reduce -f {} { |it, acc| $acc | upsert $it.k $it.v }
    )
    let env_vars = ({
        HYDRANT_DATABASE_PATH: ($db_path),
        HYDRANT_FULL_NETWORK: "false",
        HYDRANT_API_PORT: ($port | into string),
        HYDRANT_ENABLE_DEBUG: "true",
        HYDRANT_DEBUG_PORT: ($port + 1 | into string),
        HYDRANT_LOG_LEVEL: "debug"
    } | merge $hydrant_vars)

    # binary and log path are passed as positional parameters ($0 and $1)
    # instead of being spliced into the command string, so paths containing
    # spaces or shell metacharacters are handled correctly
    let pid = (with-env $env_vars {
        sh -c '"$0" >"$1" 2>&1 & echo $!' $binary $log_file | str trim | into int
    })

    print $"hydrant started with pid: ($pid)"
    { pid: $pid, log: $log_file }
}
81
# Wait for the api to become responsive.
#
# Requests `<url>/stats` up to 30 times, sleeping one second after each failed
# attempt. Returns true as soon as a request succeeds, false if every attempt
# fails.
export def wait-for-api [url: string] {
    print "waiting for api to be ready..."
    for _attempt in 1..30 {
        let reachable = (try {
            http get $"($url)/stats"
            true
        } catch {
            false
        })
        if $reachable {
            return true
        }
        sleep 1sec
    }
    false
}
95
# Poll stats until backfill is complete or fails.
#
# Reads the `counts` field of `<url>/stats?accurate=true` up to 120 times,
# two seconds apart, printing a progress line on each poll.
# Returns false as soon as `resync` is above zero, true once `pending` is zero
# while both `repos` and `records` are above zero, and false if neither
# happens within 120 polls.
export def wait-for-backfill [url: string] {
    print "waiting for backfill to complete..."
    for attempt in 1..120 {
        let counts = (http get $"($url)/stats?accurate=true" | get counts)
        let pending = ($counts | get pending | into int)
        let records = ($counts | get records | into int)
        let repos = ($counts | get repos | into int)
        let resync = ($counts | get resync | into int)

        print $"[($attempt)/120] pending: ($pending), records: ($records), repos: ($repos), resync: ($resync)"

        # any repo in resync state counts as a failed backfill
        if $resync > 0 {
            print "resync state detected (failure or gone)!"
            print ($counts | table)
            return false
        }

        # done only when nothing is pending and something was actually indexed
        let drained = ($pending == 0)
        let has_data = (($repos > 0) and ($records > 0))
        if $drained and $has_data {
            print "backfill complete."
            return true
        }

        sleep 2sec
    }
    false
}