at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1#!/usr/bin/env nu
2use common.nu *
3
# Adds the repo to hydrant's tracking list, then drives the PDS through the
# create -> update -> delete -> deactivate -> activate sequence whose events
# the stream listener is expected to capture.
#
# Any error raised by the PDS helpers (create-record, delete-record,
# deactivate-account, authenticate, activate-account) propagates to the caller;
# only the "add repo" and "update" steps are best-effort.
def perform-repo-actions [
    did: string       # repo DID to track and to write records into
    password: string  # account password, needed to re-authenticate after deactivation
    pds_url: string   # base url of the account's PDS
    jwt: string       # access token from the initial authentication
    url: string       # base url of the local hydrant api
] {
    # 4. add repo to hydrant (backfill trigger)
    print $"adding repo ($did) to tracking..."
    try {
        http put -t application/json $"($url)/repos" [ { did: ($did) } ]
    } catch {
        print "warning: failed to add repo (might already be tracked), continuing..."
    }

    sleep 5sec

    # 5. perform actions
    let collection = "app.bsky.feed.post"
    # convert to UTC before formatting: the pattern ends in a literal "Z", so
    # formatting the local time would produce a wrong instant on non-UTC hosts
    let timestamp = (date now | date to-timezone UTC | format date "%Y-%m-%dT%H:%M:%SZ")
    let record_data = {
        "$type": "app.bsky.feed.post",
        text: $"hydrant integration test ($timestamp)",
        createdAt: $timestamp
    }

    print "--- action: create ---"
    let create_res = create-record $pds_url $jwt $did $collection $record_data
    print $"created uri: ($create_res.uri)"
    print $"created cid: ($create_res.cid)"
    # the record key is the last path segment of the returned uri
    let rkey = ($create_res.uri | split row "/" | last)

    print "--- action: update ---"
    let update_data = ($record_data | update text $"updated text ($timestamp)")

    try {
        http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.putRecord" {
            repo: $did,
            collection: $collection,
            rkey: $rkey,
            record: $update_data,
        }
        print "updated record"
    } catch { |err|
        print $"update failed: ($err)"
        # try to continue to delete
    }

    print "--- action: delete ---"
    delete-record $pds_url $jwt $did $collection $rkey
    print "deleted record"

    print "--- action: deactivate ---"
    deactivate-account $pds_url $jwt

    sleep 1sec

    # we might need to re-auth if session was killed by deactivation
    print "re-authenticating..."
    let fresh_session = authenticate $pds_url $did $password
    let fresh_jwt = $fresh_session.accessJwt

    sleep 1sec

    print "--- action: activate ---"
    activate-account $pds_url $fresh_jwt
}

# Parses the json-lines file written by the stream listener and checks that the
# live events match the expected sequence exactly:
# account(active) -> create -> update -> delete -> account(inactive) -> account(active).
#
# Returns true when every event matches its check, false otherwise.
def verify-captured-events [
    output_file: string  # path of the file the listener wrote to
] {
    if not ($output_file | path exists) {
        print "failed: output file missing"
        return false
    }

    let content = (open $output_file | str trim)
    if ($content | is-empty) {
        print "failed: no events captured"
        return false
    }

    # parse json lines
    let events = ($content | lines | each { |it| $it | from json })
    let display_events = ($events | each { |e|
        let value = if $e.type == "record" { $e | get -o record } else if $e.type == "account" { $e | get -o account } else { $e | get -o identity }
        $e | select id type | insert value $value
    })
    print $"captured ($events | length) events"
    $display_events | table -e | print

    # keep everything except record events explicitly marked live == false;
    # a record without a `live` field is kept
    let relevant_events = ($events | where { |it|
        if $it.type == "record" {
            ($it.record | get -o live) != false
        } else {
            true
        }
    })

    # one predicate per expected event, in the order they must arrive
    let checks = [
        { |e| $e.type == "account" and $e.account.active == true },
        { |e| $e.type == "record" and $e.record.action == "create" },
        { |e| $e.type == "record" and $e.record.action == "update" },
        { |e| $e.type == "record" and $e.record.action == "delete" },
        { |e| $e.type == "account" and $e.account.active == false },
        { |e| $e.type == "account" and $e.account.active == true }
    ]

    if ($relevant_events | length) != ($checks | length) {
        print $"verification failed: expected ($checks | length) events, got ($relevant_events | length)"
        return false
    }

    mut failed = false
    for i in 0..(($relevant_events | length) - 1) {
        let event = ($relevant_events | get $i)
        let check = ($checks | get $i)
        if not (do $check $event) {
            print $"verification failed at event #($i + 1)"
            print $"event: ($event)"
            $failed = true
            break
        }
    }

    if $failed {
        false
    } else {
        print "test success!"
        true
    }
}

# Starts the websocat listener, runs the repo actions, stops the listener and
# verifies what was captured. The listener is stopped even when an action
# raises, so no background websocat process is left behind.
#
# Returns true when the captured events verify, false otherwise.
def run-stream-scenario [
    did: string
    password: string
    pds_url: string
    jwt: string
    url: string       # base url of the local hydrant api
    ws_url: string    # websocket url of the hydrant event stream
    db_path: string   # temp directory that also receives the listener output
] {
    # 3. start listener (live stream)
    let output_file = $"($db_path)/stream_output.txt"
    print $"starting stream listener -> ($output_file)"
    # use websocat to capture output; bash backgrounds it and echoes its pid.
    let stream_pid = (bash -c $"websocat '($ws_url)' > '($output_file)' & echo $!" | str trim | into int)
    print $"listener pid: ($stream_pid)"

    let actions_ok = try {
        perform-repo-actions $did $password $pds_url $jwt $url
        # 6. give the stream time to deliver the last events before stopping
        sleep 3sec
        true
    } catch { |err|
        print $"failed: repo actions aborted: ($err.msg)"
        false
    }

    print "stopping listener..."
    try { kill -9 $stream_pid }

    if $actions_ok {
        verify-captured-events $output_file
    } else {
        false
    }
}

# Runs one authenticated end-to-end stream test: authenticates against the PDS,
# starts a hydrant instance on `port` with HYDRANT_RELAY_HOSTS set to `relays`,
# performs record/account actions and verifies the events seen on the stream.
#
# Returns true when the test passed, false otherwise. Errors raised after the
# hydrant instance has been started are reported and turned into `false`, so
# the instance is always stopped before returning. An authentication failure
# before the instance is started still propagates.
def run-auth-test [did: string, password: string, pds_url: string, relays: string, port: int] {
    let url = $"http://localhost:($port)"
    let ws_url = $"ws://localhost:($port)/stream"
    let db_path = (mktemp -d -t hydrant_auth_test.XXXXXX)

    # 1. authenticate
    print $"authenticating with ($pds_url)..."
    let session = authenticate $pds_url $did $password
    let jwt = $session.accessJwt
    print "authentication successful"

    # 2. start hydrant
    print $"starting hydrant on port ($port) with relays: ($relays)..."
    let binary = "target/debug/hydrant" # already built in main
    let instance = (with-env { HYDRANT_RELAY_HOSTS: $relays } {
        start-hydrant $binary $db_path $port
    })

    let test_passed = try {
        if (wait-for-api $url) {
            run-stream-scenario $did $password $pds_url $jwt $url $ws_url $db_path
        } else {
            print "hydrant failed to start"
            false
        }
    } catch { |err|
        print $"failed: test aborted by error: ($err.msg)"
        false
    }

    # cleanup
    print "cleaning up..."
    try { kill -9 $instance.pid }

    $test_passed
}
169
# Script entry point: reads TEST_REPO / TEST_PASSWORD from the env file,
# resolves the repo's PDS, builds hydrant, then runs the authenticated stream
# test once against a single relay and once against several relays.
# Exits 0 only when both runs pass, 1 otherwise.
def main [] {
    let dotenv = load-env-file
    let repo_did = ($dotenv | get --optional TEST_REPO)
    let repo_password = ($dotenv | get --optional TEST_PASSWORD)

    # both credentials are required; bail out early if either is absent
    let credentials_missing = ([$repo_did $repo_password] | any { |v| $v | is-empty })
    if $credentials_missing {
        print "error: TEST_REPO and TEST_PASSWORD must be set in .env"
        exit 1
    }

    let pds = resolve-pds $repo_did

    # ensure build
    build-hydrant | ignore

    print "=== running single-relay test ==="
    let single_relay = "wss://relay.fire.hose.cam"
    let single_ok = run-auth-test $repo_did $repo_password $pds $single_relay 3005

    print ""
    print "=== running multi-relay test ==="
    let relay_list = "wss://relay.fire.hose.cam,wss://relay3.fr.hose.cam,wss://relay1.us-west.bsky.network,wss://relay1.us-east.bsky.network"
    let multi_ok = run-auth-test $repo_did $repo_password $pds $relay_list 3015

    # blank separator line precedes the verdict in both outcomes
    print ""
    if ($single_ok and $multi_ok) {
        print "ALL AUTHENTICATED STREAM TESTS PASSED"
        exit 0
    }

    print $"TESTS FAILED: single=($single_ok), multi=($multi_ok)"
    exit 1
}