at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 203 lines 7.3 kB view raw
1#!/usr/bin/env nu 2use common.nu * 3 4def run-auth-test [did: string, password: string, pds_url: string, relays: string, port: int] { 5 let url = $"http://localhost:($port)" 6 let ws_url = $"ws://localhost:($port)/stream" 7 let db_path = (mktemp -d -t hydrant_auth_test.XXXXXX) 8 9 # 1. authenticate 10 print $"authenticating with ($pds_url)..." 11 let session = authenticate $pds_url $did $password 12 let jwt = $session.accessJwt 13 print "authentication successful" 14 15 # 2. start hydrant 16 print $"starting hydrant on port ($port) with relays: ($relays)..." 17 let binary = "target/debug/hydrant" # already built in main 18 let instance = (with-env { HYDRANT_RELAY_HOSTS: $relays } { 19 start-hydrant $binary $db_path $port 20 }) 21 22 mut test_passed = false 23 24 if (wait-for-api $url) { 25 # 3. start listener (live stream) 26 let output_file = $"($db_path)/stream_output.txt" 27 print $"starting stream listener -> ($output_file)" 28 # use websocat to capture output. 29 let stream_pid = (bash -c $"websocat '($ws_url)' > '($output_file)' & echo $!" | str trim | into int) 30 print $"listener pid: ($stream_pid)" 31 32 # 4. add repo to hydrant (backfill trigger) 33 print $"adding repo ($did) to tracking..." 34 try { 35 http put -t application/json $"($url)/repos" [ { did: ($did) } ] 36 } catch { 37 print "warning: failed to add repo (might already be tracked), continuing..." 38 } 39 40 sleep 5sec 41 42 # 5. perform actions 43 let collection = "app.bsky.feed.post" 44 let timestamp = (date now | format date "%Y-%m-%dT%H:%M:%SZ") 45 let record_data = { 46 "$type": "app.bsky.feed.post", 47 text: $"hydrant integration test ($timestamp)", 48 createdAt: $timestamp 49 } 50 51 print "--- action: create ---" 52 let create_res = create-record $pds_url $jwt $did $collection $record_data 53 print $"created uri: ($create_res.uri)" 54 print $"created cid: ($create_res.cid)" 55 let rkey = ($create_res.uri | split row "/" | last) 56 57 print "--- action: update ---" 58 let update_data = ($record_data | update text $"updated text ($timestamp)") 59 60 try { 61 http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.putRecord" { 62 repo: $did, 63 collection: $collection, 64 rkey: $rkey, 65 record: $update_data, 66 } 67 print "updated record" 68 } catch { |err| 69 print $"update failed: ($err)" 70 # try to continue to delete 71 } 72 73 print "--- action: delete ---" 74 delete-record $pds_url $jwt $did $collection $rkey 75 print "deleted record" 76 77 print "--- action: deactivate ---" 78 deactivate-account $pds_url $jwt 79 80 sleep 1sec 81 82 # we might need to re-auth if session was killed by deactivation 83 print "re-authenticating..." 84 let session = authenticate $pds_url $did $password 85 let jwt = $session.accessJwt 86 87 sleep 1sec 88 89 print "--- action: activate ---" 90 activate-account $pds_url $jwt 91 92 # 6. verify 93 sleep 3sec 94 print "stopping listener..." 95 try { kill -9 $stream_pid } 96 97 if ($output_file | path exists) { 98 let content = (open $output_file | str trim) 99 if ($content | is-empty) { 100 print "failed: no events captured" 101 } else { 102 # parse json lines 103 let events = ($content | lines | each { |it| $it | from json }) 104 let display_events = ($events | each { |e| 105 let value = if $e.type == "record" { $e | get -o record } else if $e.type == "account" { $e | get -o account } else { $e | get -o identity } 106 $e | select id type | insert value $value 107 }) 108 print $"captured ($events | length) events" 109 $display_events | table -e | print 110 111 # filter live events for the relevant entities 112 let relevant_events = ($events | where { |it| 113 if $it.type == "record" { 114 if ($it.record | get -o live) == false { 115 return false 116 } 117 } 118 true 119 }) 120 121 let checks = [ 122 { |e| $e.type == "account" and $e.account.active == true }, 123 { |e| $e.type == "record" and $e.record.action == "create" }, 124 { |e| $e.type == "record" and $e.record.action == "update" }, 125 { |e| $e.type == "record" and $e.record.action == "delete" }, 126 { |e| $e.type == "account" and $e.account.active == false }, 127 { |e| $e.type == "account" and $e.account.active == true } 128 ] 129 130 if ($relevant_events | length) != ($checks | length) { 131 print $"verification failed: expected ($checks | length) events, got ($relevant_events | length)" 132 $test_passed = false 133 } else { 134 135 mut failed = false 136 for i in 0..(($relevant_events | length) - 1) { 137 let event = ($relevant_events | get $i) 138 let check = ($checks | get $i) 139 if not (do $check $event) { 140 print $"verification failed at event #($i + 1)" 141 print $"event: ($event)" 142 $failed = true 143 break 144 } 145 } 146 147 if not $failed { 148 print "test success!" 149 $test_passed = true 150 } else { 151 $test_passed = false 152 } 153 } 154 } 155 } else { 156 print "failed: output file missing" 157 } 158 159 } else { 160 print "hydrant failed to start" 161 } 162 163 # cleanup 164 print "cleaning up..." 165 try { kill -9 $instance.pid } 166 167 $test_passed 168} 169 170def main [] { 171 let env_vars = load-env-file 172 let did = ($env_vars | get --optional TEST_REPO) 173 let password = ($env_vars | get --optional TEST_PASSWORD) 174 175 if ($did | is-empty) or ($password | is-empty) { 176 print "error: TEST_REPO and TEST_PASSWORD must be set in .env" 177 exit 1 178 } 179 180 let pds_url = resolve-pds $did 181 182 # ensure build 183 build-hydrant | ignore 184 185 print "=== running single-relay test ===" 186 let relay1 = "wss://relay.fire.hose.cam" 187 let success1 = run-auth-test $did $password $pds_url $relay1 3005 188 189 print "" 190 print "=== running multi-relay test ===" 191 let relay_multi = "wss://relay.fire.hose.cam,wss://relay3.fr.hose.cam,wss://relay1.us-west.bsky.network,wss://relay1.us-east.bsky.network" 192 let success2 = run-auth-test $did $password $pds_url $relay_multi 3015 193 194 if $success1 and $success2 { 195 print "" 196 print "ALL AUTHENTICATED STREAM TESTS PASSED" 197 exit 0 198 } else { 199 print "" 200 print $"TESTS FAILED: single=($success1), multi=($success2)" 201 exit 1 202 } 203}