at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 116 lines 3.8 kB view raw
#!/usr/bin/env nu
use common.nu *

# Runs the signal-filter scenario against an already-started hydrant instance.
#
# Steps: wait for the API, PATCH the filter into "filter" mode with
# `collection` as the only signal, create a record in that collection on the
# PDS, poll hydrant until the record shows up (10 s budget), then delete the
# record from the PDS again.
#
# Parameters:
#   url        - base URL of the local hydrant API
#   pds_url    - base URL of the PDS hosting the test repo
#   did        - DID of the test repo
#   jwt        - access token used for the PDS write calls
#   collection - collection NSID used both as the signal and for the record
#
# Returns true when the record was found in hydrant's index, false otherwise.
# Errors from the HTTP calls (other than the lookup and the final delete)
# propagate to the caller.
def run-signal-filter-test [url, pds_url, did, jwt, collection] {
    if not (wait-for-api $url) {
        print "hydrant failed to start"
        return false
    }

    # configure filter mode: the freshly generated test collection is the only signal
    print "configuring filter mode..."
    http patch -t application/json $"($url)/filter" {
        mode: "filter",
        signals: [$collection]
    } | ignore

    # verify filter state
    let filter = (http get $"($url)/filter")
    print $"filter state: ($filter | to json)"

    if $filter.mode != "filter" {
        print "FAILED: mode was not set to filter"
        return false
    }
    if not ($filter.signals | any { |s| $s == $collection }) {
        print $"FAILED: ($collection) not in signals"
        return false
    }
    print "filter configured correctly"

    # wait a moment for the firehose to connect and the filter to take effect
    sleep 3sec

    # The format string hard-codes a "Z" suffix, so the time must really be UTC.
    let timestamp = (date now | date to-timezone UTC | format date "%Y-%m-%dT%H:%M:%SZ")
    let record_data = {
        "$type": $collection,
        text: $"hydrant signal filter test ($timestamp) - bsky.network relay",
        createdAt: $timestamp
    }

    print "creating post..."
    let create_res = (http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.createRecord" {
        repo: $did,
        collection: $collection,
        validate: false,
        record: $record_data
    })
    # the record key is the last path segment of the returned at:// URI
    let rkey = ($create_res.uri | split row "/" | last)
    print $"created: ($create_res.uri)"

    # give hydrant time to receive and process the firehose event and backfill:
    # poll once per second for up to 10 seconds instead of one fixed wait
    print "checking indexed record..."
    let record_url = $"($url)/xrpc/com.atproto.repo.getRecord?repo=($did)&collection=($collection)&rkey=($rkey)"
    mut result = null
    for attempt in 1..10 {
        sleep 1sec
        # a failed lookup (e.g. record not indexed yet) is treated as "not there"
        $result = (try { http get $record_url } catch { null })
        if ($result | is-not-empty) {
            break
        }
    }

    let passed = ($result | is-not-empty)
    if $passed {
        print $"indexed record cid: ($result.cid)"
        print "test PASSED: signal filter correctly indexed the post"
    } else {
        print "FAILED: record not found in hydrant index"
    }

    # cleanup: delete the test post (best effort, failures are ignored)
    print "cleaning up test post..."
    try {
        http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.deleteRecord" {
            repo: $did,
            collection: $collection,
            rkey: $rkey
        }
    }

    $passed
}

# End-to-end test of hydrant's signal filter.
#
# Reads TEST_REPO and TEST_PASSWORD from the env file, starts a hydrant
# instance on port 3011 with a fresh temporary database directory and the
# bsky.network relay, runs the scenario, and always stops the instance before
# exiting. Exit code is 0 when the record was indexed and 1 otherwise.
#
# load-env-file, resolve-pds, authenticate, build-hydrant, start-hydrant and
# wait-for-api are not defined here; they are expected to come from common.nu.
def main [] {
    let env_vars = load-env-file
    let did = ($env_vars | get --optional TEST_REPO)
    let password = ($env_vars | get --optional TEST_PASSWORD)

    if ($did | is-empty) or ($password | is-empty) {
        print "error: TEST_REPO and TEST_PASSWORD must be set in .env"
        exit 1
    }

    let port = 3011
    let url = $"http://localhost:($port)"
    let db_path = (mktemp -d -t hydrant_signal_test.XXXXXX)

    # random suffix so every run uses a collection nobody else writes to
    let random_str = (random chars -l 6)
    let collection = $"systems.hydrant.test.($random_str)"

    print $"database path: ($db_path)"

    let pds_url = resolve-pds $did
    print $"resolved pds: ($pds_url)"

    let session = authenticate $pds_url $did $password
    let jwt = $session.accessJwt
    print "authenticated"

    let binary = build-hydrant
    $env.HYDRANT_RELAY_HOST = "wss://bsky.network/"
    let instance = start-hydrant $binary $db_path $port

    # Any error inside the scenario is turned into a failed result so that the
    # hydrant process below is stopped no matter what happened.
    let test_passed = (try {
        run-signal-filter-test $url $pds_url $did $jwt $collection
    } catch { |err|
        print $"FAILED: unexpected error: ($err.msg)"
        false
    })

    print "stopping hydrant..."
    try { kill -9 $instance.pid }

    if $test_passed {
        exit 0
    } else {
        exit 1
    }
}