at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1#!/usr/bin/env nu
2use common.nu *
3
# End-to-end check of the hydrant websocket event stream.
#
# Builds and starts a hydrant instance on a throwaway database directory, then
# runs two checks against it:
#   1. a listener attached BEFORE the backfill must capture at least one live event;
#   2. a listener attached AFTER the backfill with `cursor=0` must replay stored events.
#
# The hydrant process is stopped at the end, and the script exits with status 1
# unless both checks passed.
#
# Requires `bash` and `websocat` on PATH. The helpers `build-hydrant`,
# `start-hydrant`, `wait-for-api` and `wait-for-backfill` are not defined in this
# file (presumably they come from common.nu).
def main [] {
    let did = "did:web:guestbook.gaze.systems"
    let port = 3002
    let url = $"http://localhost:($port)"
    let ws_url = $"ws://localhost:($port)/stream"
    let db_path = (mktemp -d -t hydrant_stream_test.XXXXXX)

    print $"testing streaming for ($did)..."
    print $"database path: ($db_path)"

    let binary = build-hydrant
    let instance = start-hydrant $binary $db_path $port

    mut test1_passed = false
    mut test2_passed = false

    if (wait-for-api $url) {
        # test 1: connect to stream BEFORE backfill to catch live events
        print "=== test 1: live streaming during backfill ==="

        let live_output = $"($db_path)/stream_live.txt"
        print $"starting stream listener -> ($live_output)"

        # start websocat in background to capture live events (no cursor = live only).
        # bash backgrounds the listener and echoes its pid so it can be killed later.
        # NOTE(review): `2>&1` also writes websocat's stderr into the capture file, so a
        # connection error would be counted as an event line - confirm this is intended.
        let stream_pid = (bash -c $"websocat '($ws_url)' > '($live_output)' 2>&1 & echo $!" | str trim | into int)
        print $"stream listener pid: ($stream_pid)"
        # give the listener a moment to connect before events start flowing
        sleep 1sec

        # trigger backfill
        print $"adding repo ($did) to tracking..."
        http post -t application/json $"($url)/repo/add" { dids: [($did)] }

        if (wait-for-backfill $url) {
            # let trailing events reach the listener before stopping it
            sleep 2sec

            # stop the stream listener (best effort: it may already have exited)
            try { kill $stream_pid }
            sleep 1sec

            if ($live_output | path exists) {
                let live_content = (open $live_output | str trim)

                if ($live_content | is-empty) {
                    print "test 1 FAILED: no live events received during backfill"
                } else {
                    # one captured line per event
                    let live_messages = ($live_content | lines)
                    let live_count = ($live_messages | length)
                    print $"test 1: received ($live_count) live events during backfill"

                    if $live_count > 0 {
                        let first = ($live_messages | first | from json)
                        print $" first event: id=($first.id), type=($first.type)"
                        print "test 1 PASSED: live streaming works"
                        $test1_passed = true
                    }
                }
            } else {
                print "test 1 FAILED: output file not created"
            }

            # test 2: connect AFTER backfill with cursor=0 to replay all events
            print ""
            print "=== test 2: historical replay with cursor=0 ==="

            sleep 2sec

            # the stats endpoint's event count is the reference for the replay size
            let stats = (http get $"($url)/stats?accurate=true").counts
            let events_count = ($stats.events | into int)
            print $"total events in db: ($events_count)"

            if $events_count > 0 {
                let history_output = $"($db_path)/stream_history.txt"

                # use same approach as test 1: background process with file output
                # cursor=0 replays from the beginning (no cursor = live-tail only)
                print "starting historical stream listener..."
                let history_pid = (bash -c $"websocat '($ws_url)?cursor=0' > '($history_output)' 2>&1 & echo $!" | str trim | into int)
                print $"history listener pid: ($history_pid)"

                # wait for events to be streamed (should be fast for historical replay)
                sleep 5sec

                # kill the listener (best effort: it may already have exited)
                try { kill $history_pid }
                sleep 500ms

                if ($history_output | path exists) {
                    let history_content = (open $history_output | str trim)

                    if ($history_content | is-empty) {
                        print "test 2 FAILED: no historical events received"
                    } else {
                        let history_messages = ($history_content | lines)
                        let history_count = ($history_messages | length)
                        print $"test 2: received ($history_count) historical events"

                        if $history_count > 0 {
                            let first = ($history_messages | first | from json)
                            let last = ($history_messages | last | from json)
                            print $" first event: id=($first.id), type=($first.type)"
                            print $" last event: id=($last.id), type=($last.type)"

                            if $history_count >= $events_count {
                                print $"test 2 PASSED: replayed all ($history_count) events"
                                $test2_passed = true
                            } else {
                                # NOTE(review): a partial replay still counts as a pass,
                                # presumably because the fixed 5-second capture window can
                                # cut the replay short - confirm this leniency is intended.
                                print $"test 2 PARTIAL: got ($history_count)/($events_count) events"
                                $test2_passed = true
                            }
                        }
                    }
                } else {
                    print "test 2 FAILED: output file not created"
                }
            } else {
                # without this branch the test failed with no explanation at all
                print "test 2 FAILED: no events in db to replay"
            }
        } else {
            print "backfill failed or timed out."
            try { kill $stream_pid }
        }
    } else {
        print "api failed to start."
    }

    # always stop the hydrant instance, whichever path was taken above
    let hydrant_pid = $instance.pid
    print $"stopping hydrant - pid: ($hydrant_pid)..."
    try { kill $hydrant_pid }

    print ""
    if $test1_passed and $test2_passed {
        print "=== ALL TESTS PASSED ==="
    } else {
        print $"=== TESTS FAILED === test1: ($test1_passed), test2: ($test2_passed)"
        exit 1
    }
}