at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[tests] move shared helpers to common.nu; add signal filter test

ptr.pet 771fa253 e7961aee

verified
+163 -59
-59
tests/authenticated_stream_test.nu
··· 1 1 #!/usr/bin/env nu 2 2 use common.nu * 3 3 4 - # simplistic dotenv parser 5 - def load-env-file [] { 6 - if (".env" | path exists) { 7 - let content = (open .env) 8 - $content | lines 9 - | where { |x| ($x | str trim | is-empty) == false and ($x | str trim | str starts-with "#") == false } 10 - | each { |x| 11 - let parts = ($x | split row "=" -n 2) 12 - { key: ($parts.0 | str trim), value: ($parts.1 | str trim | str trim -c '"' | str trim -c "'") } 13 - } 14 - | reduce -f {} { |it, acc| $acc | insert $it.key $it.value } 15 - } else { 16 - {} 17 - } 18 - } 19 - 20 - def authenticate [pds_url: string, identifier: string, password: string] { 21 - print $"authenticating with ($pds_url) for ($identifier)..." 22 - let resp = (http post -t application/json $"($pds_url)/xrpc/com.atproto.server.createSession" { 23 - identifier: $identifier, 24 - password: $password 25 - }) 26 - return $resp 27 - } 28 - 29 - def create-record [pds_url: string, jwt: string, repo: string, collection: string, record: any] { 30 - http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.createRecord" { 31 - repo: $repo, 32 - collection: $collection, 33 - record: $record 34 - } 35 - } 36 - 37 - def delete-record [pds_url: string, jwt: string, repo: string, collection: string, rkey: string] { 38 - http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.deleteRecord" { 39 - repo: $repo, 40 - collection: $collection, 41 - rkey: $rkey 42 - } 43 - } 44 - 45 - def deactivate-account [pds_url: string, jwt: string] { 46 - print "deactivating account..." 47 - http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.server.deactivateAccount" {} 48 - } 49 - 50 - def activate-account [pds_url: string, jwt: string] { 51 - print "activating account..." 52 - curl -X POST -H "Content-Type: application/json" -H $"Authorization: Bearer ($jwt)" $"($pds_url)/xrpc/com.atproto.server.activateAccount" 53 - } 54 - 55 - def resolve-pds [did: string] { 56 - print $"resolving pds for ($did)..." 57 - let doc = (http get $"https://plc.wtf/($did)" | from json) 58 - let pds = ($doc.service | where type == "AtprotoPersonalDataServer" | first).serviceEndpoint 59 - print $"resolved pds: ($pds)" 60 - return $pds 61 - } 62 - 63 4 def main [] { 64 5 let env_vars = load-env-file 65 6 let did = ($env_vars | get --optional TEST_REPO)
+51
tests/common.nu
··· 1 + export def load-env-file [] { 2 + if (".env" | path exists) { 3 + let content = (open .env) 4 + $content | lines 5 + | where { |x| ($x | str trim | is-empty) == false and ($x | str trim | str starts-with "#") == false } 6 + | each { |x| 7 + let parts = ($x | split row "=" -n 2) 8 + { key: ($parts.0 | str trim), value: ($parts.1 | str trim | str trim -c '"' | str trim -c "'") } 9 + } 10 + | reduce -f {} { |it, acc| $acc | insert $it.key $it.value } 11 + } else { 12 + {} 13 + } 14 + } 15 + 16 + export def resolve-pds [did: string] { 17 + let doc = (http get $"https://plc.wtf/($did)" | from json) 18 + ($doc.service | where type == "AtprotoPersonalDataServer" | first).serviceEndpoint 19 + } 20 + 21 + export def authenticate [pds_url: string, identifier: string, password: string] { 22 + http post -t application/json $"($pds_url)/xrpc/com.atproto.server.createSession" { 23 + identifier: $identifier, 24 + password: $password 25 + } 26 + } 27 + 28 + export def create-record [pds_url: string, jwt: string, repo: string, collection: string, record: any] { 29 + http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.createRecord" { 30 + repo: $repo, 31 + collection: $collection, 32 + record: $record 33 + } 34 + } 35 + 36 + export def delete-record [pds_url: string, jwt: string, repo: string, collection: string, rkey: string] { 37 + http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.deleteRecord" { 38 + repo: $repo, 39 + collection: $collection, 40 + rkey: $rkey 41 + } 42 + } 43 + 44 + export def deactivate-account [pds_url: string, jwt: string] { 45 + http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.server.deactivateAccount" {} 46 + } 47 + 48 + export def activate-account [pds_url: string, jwt: string] { 49 + curl -X POST -H "Content-Type: application/json" -H $"Authorization: Bearer ($jwt)" $"($pds_url)/xrpc/com.atproto.server.activateAccount" 50 + } 51 + 1 52 # build the hydrant binary 2 53 export def build-hydrant [] { 3 54 print "building hydrant..."
+112
tests/signal_filter_test.nu
··· 1 + #!/usr/bin/env nu 2 + use common.nu * 3 + 4 + def main [] { 5 + let env_vars = load-env-file 6 + let did = ($env_vars | get --optional TEST_REPO) 7 + let password = ($env_vars | get --optional TEST_PASSWORD) 8 + 9 + if ($did | is-empty) or ($password | is-empty) { 10 + print "error: TEST_REPO and TEST_PASSWORD must be set in .env" 11 + exit 1 12 + } 13 + 14 + let port = 3007 15 + let url = $"http://localhost:($port)" 16 + let db_path = (mktemp -d -t hydrant_signal_test.XXXXXX) 17 + let collection = "app.bsky.feed.post" 18 + 19 + print $"database path: ($db_path)" 20 + 21 + let pds_url = resolve-pds $did 22 + print $"resolved pds: ($pds_url)" 23 + 24 + let session = authenticate $pds_url $did $password 25 + let jwt = $session.accessJwt 26 + print "authenticated" 27 + 28 + let binary = build-hydrant 29 + let instance = start-hydrant $binary $db_path $port 30 + 31 + mut test_passed = false 32 + 33 + if (wait-for-api $url) { 34 + # configure signal mode: index app.bsky.feed.post from anyone on the network 35 + print "configuring signal mode..." 36 + http patch -t application/json $"($url)/filter" { 37 + mode: "signal", 38 + signals: [$collection] 39 + } 40 + 41 + # verify filter state 42 + let filter = (http get $"($url)/filter") 43 + print $"filter state: ($filter | to json)" 44 + 45 + if $filter.mode != "signal" { 46 + print "FAILED: mode was not set to signal" 47 + } else if not ($filter.signals | any { |s| $s == $collection }) { 48 + print $"FAILED: ($collection) not in signals" 49 + } else { 50 + print "filter configured correctly" 51 + 52 + # wait a moment for the firehose to connect and the filter to take effect 53 + sleep 3sec 54 + 55 + let timestamp = (date now | format date "%Y-%m-%dT%H:%M:%SZ") 56 + let record_data = { 57 + "$type": $collection, 58 + text: $"hydrant signal filter test ($timestamp)", 59 + createdAt: $timestamp 60 + } 61 + 62 + print "creating post..." 63 + let create_res = (http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.createRecord" { 64 + repo: $did, 65 + collection: $collection, 66 + record: $record_data 67 + }) 68 + let rkey = ($create_res.uri | split row "/" | last) 69 + print $"created: ($create_res.uri)" 70 + 71 + # give hydrant time to receive and process the firehose event 72 + sleep 5sec 73 + 74 + # verify the record was indexed 75 + print "checking indexed record..." 76 + let result = (try { 77 + http get $"($url)/xrpc/com.atproto.repo.getRecord?repo=($did)&collection=($collection)&rkey=($rkey)" 78 + } catch { 79 + null 80 + }) 81 + 82 + if ($result | is-empty) { 83 + print "FAILED: record not found in hydrant index" 84 + } else { 85 + print $"indexed record cid: ($result.cid)" 86 + print "test PASSED: signal filter correctly indexed the post" 87 + $test_passed = true 88 + } 89 + 90 + # cleanup: delete the test post 91 + print "cleaning up test post..." 92 + try { 93 + http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.deleteRecord" { 94 + repo: $did, 95 + collection: $collection, 96 + rkey: $rkey 97 + } 98 + } 99 + } 100 + } else { 101 + print "hydrant failed to start" 102 + } 103 + 104 + print "stopping hydrant..." 105 + try { kill -9 $instance.pid } 106 + 107 + if $test_passed { 108 + exit 0 109 + } else { 110 + exit 1 111 + } 112 + }