at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1#!/usr/bin/env nu
2use common.nu *
3
# Configure hydrant's signal filter, publish a record in the signal collection
# on the PDS, and check that hydrant indexed it.
#
# Returns true when the record can be fetched back from hydrant, false otherwise.
# The record created on the PDS is deleted again on a best-effort basis.
def run-signal-filter-check [
    url         # base url of the local hydrant instance
    pds_url     # base url of the PDS that hosts the test repo
    did         # DID of the test repo
    jwt         # access token used for the PDS write calls
    collection  # collection NSID registered as the filter signal
] {
    # configure filter mode: index repos that emit records in the test collection
    print "configuring filter mode..."
    http patch -t application/json $"($url)/filter" {
        mode: "filter",
        signals: [$collection]
    }

    # verify filter state
    let filter = (http get $"($url)/filter")
    print $"filter state: ($filter | to json)"

    if $filter.mode != "filter" {
        print "FAILED: mode was not set to filter"
        false
    } else if not ($filter.signals | any { |s| $s == $collection }) {
        print $"FAILED: ($collection) not in signals"
        false
    } else {
        print "filter configured correctly"

        # wait a moment for the firehose to connect and the filter to take effect
        sleep 3sec

        let timestamp = (date now | format date "%Y-%m-%dT%H:%M:%SZ")
        let record_data = {
            "$type": $collection,
            text: $"hydrant signal filter test ($timestamp) - bsky.network relay",
            createdAt: $timestamp
        }

        print "creating post..."
        let create_res = (http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.createRecord" {
            repo: $did,
            collection: $collection,
            validate: false,
            record: $record_data
        })
        # the record key is the last path segment of the returned at:// uri
        let rkey = ($create_res.uri | split row "/" | last)
        print $"created: ($create_res.uri)"

        # give hydrant time to receive and process the firehose event and backfill
        sleep 10sec

        # verify the record was indexed; a failed lookup counts as "not indexed"
        print "checking indexed record..."
        let result = (try {
            http get $"($url)/xrpc/com.atproto.repo.getRecord?repo=($did)&collection=($collection)&rkey=($rkey)"
        } catch {
            null
        })

        let indexed = (not ($result | is-empty))
        if $indexed {
            print $"indexed record cid: ($result.cid)"
            print "test PASSED: signal filter correctly indexed the post"
        } else {
            print "FAILED: record not found in hydrant index"
        }

        # cleanup: delete the test post (best effort, errors are ignored)
        print "cleaning up test post..."
        try {
            http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.deleteRecord" {
                repo: $did,
                collection: $collection,
                rkey: $rkey
            }
        }

        $indexed
    }
}

# End-to-end test for hydrant's signal filter.
#
# Reads TEST_REPO and TEST_PASSWORD from .env, starts a local hydrant instance
# against the bsky.network relay, runs the signal filter check and always stops
# the instance afterwards. Exits 0 when the check passed, 1 otherwise.
def main [] {
    let env_vars = load-env-file
    let did = ($env_vars | get --optional TEST_REPO)
    let password = ($env_vars | get --optional TEST_PASSWORD)

    if ($did | is-empty) or ($password | is-empty) {
        print "error: TEST_REPO and TEST_PASSWORD must be set in .env"
        exit 1
    }

    let port = 3011
    let url = $"http://localhost:($port)"
    let db_path = (mktemp -d -t hydrant_signal_test.XXXXXX)

    # a random collection suffix keeps runs from interfering with each other
    let random_str = (random chars -l 6)
    let collection = $"systems.hydrant.test.($random_str)"

    print $"database path: ($db_path)"

    let pds_url = resolve-pds $did
    print $"resolved pds: ($pds_url)"

    let session = authenticate $pds_url $did $password
    let jwt = $session.accessJwt
    print "authenticated"

    let binary = build-hydrant
    $env.HYDRANT_RELAY_HOST = "wss://bsky.network/"
    let instance = start-hydrant $binary $db_path $port

    # Everything between start and stop runs inside try/catch so that a failing
    # http call cannot abort the script and leave the hydrant process running.
    let test_passed = (try {
        if (wait-for-api $url) {
            run-signal-filter-check $url $pds_url $did $jwt $collection
        } else {
            print "hydrant failed to start"
            false
        }
    } catch { |err|
        print $"FAILED: unexpected error: ($err.msg)"
        false
    })

    print "stopping hydrant..."
    try { kill -9 $instance.pid }

    if $test_passed {
        exit 0
    } else {
        exit 1
    }
}