at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1#!/usr/bin/env nu
2use common.nu *
3
# End-to-end test of hydrant's event stream using an authenticated repo.
#
# Steps:
#   1. read TEST_REPO / TEST_PASSWORD via `load-env-file` and authenticate
#      against the URL returned by `resolve-pds`
#   2. build and start a hydrant instance on a fresh temporary database
#   3. record the `/stream` websocket to a file with websocat
#   4. ask hydrant to track the repo
#   5. create, update and delete a post, then deactivate and re-activate
#      the account
#   6. compare the captured events, in order, against the expected sequence
#
# Exits 0 when verification succeeds and 1 otherwise. The hydrant instance
# and the websocat listener are stopped even when a step raises an error.
# The temporary database directory is removed on success and kept on failure
# so the captured stream output can be inspected.
def main [] {
    let env_vars = load-env-file
    let did = ($env_vars | get --optional TEST_REPO)
    let password = ($env_vars | get --optional TEST_PASSWORD)

    if ($did | is-empty) or ($password | is-empty) {
        print "error: TEST_REPO and TEST_PASSWORD must be set in .env"
        exit 1
    }

    let pds_url = resolve-pds $did

    let port = 3005
    let url = $"http://localhost:($port)"
    let ws_url = $"ws://localhost:($port)/stream"
    let db_path = (mktemp -d -t hydrant_auth_test.XXXXXX)

    # 1. authenticate
    print $"authenticating with ($pds_url)..."
    let session = authenticate $pds_url $did $password
    let jwt = $session.accessJwt
    print "authentication successful"

    # 2. start hydrant
    print $"starting hydrant on port ($port)..."
    let binary = build-hydrant
    let instance = start-hydrant $binary $db_path $port

    # Everything that can fail after hydrant is running lives inside this
    # `try`, so the cleanup section below is always reached. The block
    # evaluates to the test verdict (true = passed).
    let test_passed = try {
        if (wait-for-api $url) {
            # 3. start listener (live stream)
            let output_file = $"($db_path)/stream_output.txt"
            print $"starting stream listener -> ($output_file)"
            # use websocat to capture output.
            # bash backgrounds websocat and echoes its pid so it can be stopped later.
            let stream_pid = (bash -c $"websocat '($ws_url)' > '($output_file)' & echo $!" | str trim | into int)
            print $"listener pid: ($stream_pid)"

            # Run the PDS actions in their own `try` so the background
            # listener is stopped even if one of them fails.
            let actions_ok = try {
                # 4. add repo to hydrant (backfill trigger)
                print $"adding repo ($did) to tracking..."
                try {
                    http put -t application/json $"($url)/repos" [ { did: ($did) } ]
                } catch {
                    print "warning: failed to add repo (might already be tracked), continuing..."
                }

                sleep 5sec

                # 5. perform actions
                let collection = "app.bsky.feed.post"
                let timestamp = (date now | format date "%Y-%m-%dT%H:%M:%SZ")
                let record_data = {
                    "$type": "app.bsky.feed.post",
                    text: $"hydrant integration test ($timestamp)",
                    createdAt: $timestamp
                }

                print "--- action: create ---"
                let create_res = create-record $pds_url $jwt $did $collection $record_data
                print $"created uri: ($create_res.uri)"
                print $"created cid: ($create_res.cid)"
                # the record key is the last path segment of the returned uri
                let rkey = ($create_res.uri | split row "/" | last)

                print "--- action: update ---"
                let update_data = ($record_data | update text $"updated text ($timestamp)")

                try {
                    http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.putRecord" {
                        repo: $did,
                        collection: $collection,
                        rkey: $rkey,
                        record: $update_data,
                    }
                    print "updated record"
                } catch { |err|
                    print $"update failed: ($err)"
                    # try to continue to delete
                }

                print "--- action: delete ---"
                delete-record $pds_url $jwt $did $collection $rkey
                print "deleted record"

                print "--- action: deactivate ---"
                deactivate-account $pds_url $jwt

                sleep 1sec

                # we might need to re-auth if session was killed by deactivation
                print "re-authenticating..."
                let session = authenticate $pds_url $did $password
                let jwt = $session.accessJwt

                sleep 1sec

                print "--- action: activate ---"
                activate-account $pds_url $jwt

                # give the stream a moment to deliver the last events
                sleep 3sec
                true
            } catch { |err|
                print $"failed: test actions aborted: ($err.msg)"
                false
            }

            # 6. verify
            print "stopping listener..."
            try { kill -9 $stream_pid }

            if not $actions_ok {
                false
            } else if not ($output_file | path exists) {
                print "failed: output file missing"
                false
            } else {
                let content = (open $output_file | str trim)
                if ($content | is-empty) {
                    print "failed: no events captured"
                    false
                } else {
                    # parse json lines
                    let events = ($content | lines | each { |it| $it | from json })
                    let display_events = ($events | each { |e|
                        let value = if $e.type == "record" { $e | get -o record } else if $e.type == "account" { $e | get -o account } else { $e | get -o identity }
                        $e | select id type | insert value $value
                    })
                    print $"captured ($events | length) events"
                    $display_events | table -e | print

                    # filter live events for the relevant entities:
                    # record events explicitly flagged `live == false` are dropped
                    # (presumably backfilled records - confirm against hydrant's stream format)
                    let relevant_events = ($events | where { |it|
                        not ($it.type == "record" and ($it.record | get -o live) == false)
                    })

                    # expected events, in order; entry N is checked against event N
                    let checks = [
                        { |e| $e.type == "account" and $e.account.active == true },
                        { |e| $e.type == "record" and $e.record.action == "create" },
                        { |e| $e.type == "record" and $e.record.action == "update" },
                        { |e| $e.type == "record" and $e.record.action == "delete" },
                        { |e| $e.type == "account" and $e.account.active == false },
                        { |e| $e.type == "account" and $e.account.active == true },
                        { |e| $e.type == "identity" and $e.identity.did == $did }
                    ]

                    if ($relevant_events | length) != ($checks | length) {
                        print $"verification failed: expected ($checks | length) events, got ($relevant_events | length)"
                        false
                    } else {
                        mut failed = false
                        for pair in ($relevant_events | enumerate) {
                            let check = ($checks | get $pair.index)
                            if not (do $check $pair.item) {
                                print $"verification failed at event #($pair.index + 1)"
                                print $"event: ($pair.item)"
                                $failed = true
                                break
                            }
                        }

                        if not $failed {
                            print "test success!"
                        }
                        not $failed
                    }
                }
            }
        } else {
            print "hydrant failed to start"
            false
        }
    } catch { |err|
        print $"failed: test aborted by error: ($err.msg)"
        false
    }

    # cleanup
    print "cleaning up..."
    try { kill -9 $instance.pid }

    if $test_passed {
        # nothing left to inspect; drop the temporary database directory
        try { rm -rf $db_path }
        exit 0
    } else {
        print $"keeping ($db_path) for inspection"
        exit 1
    }
}