at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at ffa704fdf0884cad263253b2cc44d87c8fb5f4fe 183 lines 6.6 kB view raw
#!/usr/bin/env nu
use common.nu *

# Checks the events captured from the stream against the expected sequence.
#
# Prints a summary table of every captured event, drops record events that are
# explicitly marked `live: false`, and then requires the remaining events to
# match the expected checks one-for-one, in order.
#
# events: parsed stream events (one record per JSON line of listener output)
# did:    the repo DID the identity event is expected to carry
#
# Returns true when every check matches, false otherwise.
def verify-stream-events [events, did] {
    # each event carries its payload under a key named after its type
    let display_events = ($events | each { |e|
        let value = if $e.type == "record" {
            $e | get -o record
        } else if $e.type == "account" {
            $e | get -o account
        } else {
            $e | get -o identity
        }
        $e | select id type | insert value $value
    })
    print $"captured ($events | length) events"
    $display_events | table -e | print

    # filter live events for the relevant entities: only record events with an
    # explicit `live: false` are dropped; everything else is kept
    let relevant_events = ($events | where { |it|
        if $it.type == "record" {
            ($it.record | get -o live) != false
        } else {
            true
        }
    })

    # expected order of events; position in this list matters
    let checks = [
        { |e| $e.type == "account" and $e.account.active == true },
        { |e| $e.type == "record" and $e.record.action == "create" },
        { |e| $e.type == "record" and $e.record.action == "update" },
        { |e| $e.type == "record" and $e.record.action == "delete" },
        { |e| $e.type == "account" and $e.account.active == false },
        { |e| $e.type == "account" and $e.account.active == true },
        { |e| $e.type == "identity" and $e.identity.did == $did }
    ]

    if ($relevant_events | length) != ($checks | length) {
        print $"verification failed: expected ($checks | length) events, got ($relevant_events | length)"
        return false
    }

    mut failed = false
    for row in ($relevant_events | enumerate) {
        let check = ($checks | get $row.index)
        if not (do $check $row.item) {
            print $"verification failed at event #($row.index + 1)"
            print $"event: ($row.item)"
            $failed = true
            break
        }
    }

    if not $failed {
        print "test success!"
    }
    not $failed
}

# Runs the scenario against an already-running hydrant instance: starts a
# stream listener, tracks the repo, performs create/update/delete and
# deactivate/activate against the PDS, then verifies the captured events.
#
# The listener process is always stopped before this command returns, even if
# one of the actions raises. Returns true when verification succeeds.
def run-auth-scenario [url, ws_url, db_path, pds_url, did, password, jwt] {
    # 3. start listener (live stream)
    let output_file = $"($db_path)/stream_output.txt"
    print $"starting stream listener -> ($output_file)"
    # use websocat to capture output; bash backgrounds it and echoes its pid
    let stream_pid = (bash -c $"websocat '($ws_url)' > '($output_file)' & echo $!" | str trim | into int)
    print $"listener pid: ($stream_pid)"

    # Run every action under `try` so a failing step cannot leave the
    # listener running. The block yields null on success, the message on error.
    let failure = try {
        # 4. add repo to hydrant (backfill trigger)
        print $"adding repo ($did) to tracking..."
        try {
            http put -t application/json $"($url)/repos" [ { did: ($did) } ]
        } catch {
            print "warning: failed to add repo (might already be tracked), continuing..."
        }

        # presumably gives hydrant time to pick the repo up before the live
        # actions start -- TODO confirm the required delay
        sleep 5sec

        # 5. perform actions
        let collection = "app.bsky.feed.post"
        let timestamp = (date now | format date "%Y-%m-%dT%H:%M:%SZ")
        let record_data = {
            "$type": "app.bsky.feed.post",
            text: $"hydrant integration test ($timestamp)",
            createdAt: $timestamp
        }

        print "--- action: create ---"
        let create_res = create-record $pds_url $jwt $did $collection $record_data
        print $"created uri: ($create_res.uri)"
        print $"created cid: ($create_res.cid)"
        # the record key is the last path segment of the created uri
        let rkey = ($create_res.uri | split row "/" | last)

        print "--- action: update ---"
        let update_data = ($record_data | update text $"updated text ($timestamp)")

        try {
            http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.putRecord" {
                repo: $did,
                collection: $collection,
                rkey: $rkey,
                record: $update_data,
            }
            print "updated record"
        } catch { |err|
            print $"update failed: ($err)"
            # try to continue to delete
        }

        print "--- action: delete ---"
        delete-record $pds_url $jwt $did $collection $rkey
        print "deleted record"

        print "--- action: deactivate ---"
        deactivate-account $pds_url $jwt

        sleep 1sec

        # we might need to re-auth if session was killed by deactivation
        print "re-authenticating..."
        let fresh_session = authenticate $pds_url $did $password
        let fresh_jwt = $fresh_session.accessJwt

        sleep 1sec

        print "--- action: activate ---"
        activate-account $pds_url $fresh_jwt

        # 6. verify -- wait before the listener is stopped
        sleep 3sec
        null
    } catch { |err|
        $err.msg
    }

    print "stopping listener..."
    try { kill -9 $stream_pid }

    if $failure != null {
        # NOTE(review): if the failure happened after deactivation, the test
        # account may still be deactivated and need manual reactivation.
        print $"failed: scenario aborted: ($failure)"
        return false
    }

    if not ($output_file | path exists) {
        print "failed: output file missing"
        return false
    }

    let content = (open $output_file | str trim)
    if ($content | is-empty) {
        print "failed: no events captured"
        return false
    }

    # parse json lines
    let events = ($content | lines | each { |it| $it | from json })
    verify-stream-events $events $did
}

# Integration test entry point.
#
# Reads TEST_REPO and TEST_PASSWORD from the env file, authenticates against
# the repo's PDS, starts a hydrant instance on a temporary database, runs the
# scenario and exits 0 on success or 1 on any failure.
#
# The hydrant process is stopped on every path once it has been started.
def main [] {
    let env_vars = load-env-file
    let did = ($env_vars | get --optional TEST_REPO)
    let password = ($env_vars | get --optional TEST_PASSWORD)

    if ($did | is-empty) or ($password | is-empty) {
        print "error: TEST_REPO and TEST_PASSWORD must be set in .env"
        exit 1
    }

    let pds_url = resolve-pds $did

    let port = 3005
    let url = $"http://localhost:($port)"
    let ws_url = $"ws://localhost:($port)/stream"
    let db_path = (mktemp -d -t hydrant_auth_test.XXXXXX)

    # 1. authenticate
    print $"authenticating with ($pds_url)..."
    let session = authenticate $pds_url $did $password
    let jwt = $session.accessJwt
    print "authentication successful"

    # 2. start hydrant
    print $"starting hydrant on port ($port)..."
    let binary = build-hydrant
    let instance = start-hydrant $binary $db_path $port

    # Anything that raises from here on is turned into a failed run, so the
    # cleanup below always executes and hydrant never stays bound to the port.
    let test_passed = try {
        if (wait-for-api $url) {
            run-auth-scenario $url $ws_url $db_path $pds_url $did $password $jwt
        } else {
            print "hydrant failed to start"
            false
        }
    } catch { |err|
        print $"failed: unexpected error: ($err.msg)"
        false
    }

    # cleanup
    print "cleaning up..."
    try { kill -9 $instance.pid }

    if $test_passed {
        # the temporary database is only useful for debugging a failure
        try { rm --recursive --force $db_path }
        exit 0
    } else {
        print $"test data kept for inspection: ($db_path)"
        exit 1
    }
}