at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 9f8e65a00c77d2644e6d8ee145f2e58cc3f195ed 138 lines 5.7 kB view raw
#!/usr/bin/env nu
use common.nu *

# Render one captured stream line as "id=..., type=...".
# Returns null when the line is not JSON or lacks an `id`/`type` field, which
# happens when the listener's stderr (merged into the capture file via 2>&1)
# ends up in the output instead of an event.
def describe-event [line: string] {
    try {
        let event = ($line | from json)
        $"id=($event.id), type=($event.type)"
    } catch {
        null
    }
}

# End-to-end check of the websocket event stream.
#
# Builds and starts a hydrant instance on a temporary database, then runs:
#   test 1: a listener attached with no cursor before the repo is added must
#           capture events while the backfill runs.
#   test 2: a listener attached with cursor=0 after the backfill must replay
#           the stored events.
# The hydrant process is stopped at the end; the script exits with status 1
# unless both tests passed.
def main [] {
    let did = "did:web:guestbook.gaze.systems"
    let port = 3002
    let url = $"http://localhost:($port)"
    let ws_url = $"ws://localhost:($port)/stream"
    let db_path = (mktemp -d -t hydrant_stream_test.XXXXXX)

    print $"testing streaming for ($did)..."
    print $"database path: ($db_path)"

    let binary = build-hydrant
    let instance = start-hydrant $binary $db_path $port

    mut test1_passed = false
    mut test2_passed = false

    if (wait-for-api $url) {
        # test 1: connect to stream BEFORE backfill to catch live events
        print "=== test 1: live streaming during backfill ==="

        let live_output = $"($db_path)/stream_live.txt"
        print $"starting stream listener -> ($live_output)"

        # start websocat in background to capture live events (no cursor = live only)
        let stream_pid = (bash -c $"websocat '($ws_url)' > '($live_output)' 2>&1 & echo $!" | str trim | into int)
        print $"stream listener pid: ($stream_pid)"
        sleep 1sec

        # trigger backfill
        print $"adding repo ($did) to tracking..."
        http put -t application/json $"($url)/repos" [ { did: ($did) } ]

        if (wait-for-backfill $url) {
            sleep 2sec

            # stop the stream listener
            try { kill $stream_pid }
            sleep 1sec

            if ($live_output | path exists) {
                let live_content = (open $live_output | str trim)

                if ($live_content | is-empty) {
                    print "test 1 FAILED: no live events received during backfill"
                } else {
                    let live_messages = ($live_content | lines)
                    let live_count = ($live_messages | length)
                    print $"test 1: received ($live_count) live events during backfill"

                    # Parse defensively: an unparsable line must fail the test,
                    # not abort the script before hydrant is stopped.
                    let first_desc = (describe-event ($live_messages | first))
                    if $first_desc == null {
                        print "test 1 FAILED: stream output is not a valid event"
                    } else {
                        print $" first event: ($first_desc)"
                        print "test 1 PASSED: live streaming works"
                        $test1_passed = true
                    }
                }
            } else {
                print "test 1 FAILED: output file not created"
            }

            # test 2: connect AFTER backfill with cursor=0 to replay all events
            print ""
            print "=== test 2: historical replay with cursor=0 ==="

            sleep 2sec

            let stats = (http get $"($url)/stats?accurate=true").counts
            let events_count = ($stats.events | into int)
            print $"total events in db: ($events_count)"

            if $events_count > 0 {
                let history_output = $"($db_path)/stream_history.txt"

                # use same approach as test 1: background process with file output
                # cursor=0 replays from the beginning (no cursor = live-tail only)
                print "starting historical stream listener..."
                let history_pid = (bash -c $"websocat '($ws_url)?cursor=0' > '($history_output)' 2>&1 & echo $!" | str trim | into int)
                print $"history listener pid: ($history_pid)"

                # wait for events to be streamed (should be fast for historical replay)
                sleep 5sec

                # kill the listener
                try { kill $history_pid }
                sleep 500ms

                if ($history_output | path exists) {
                    let history_content = (open $history_output | str trim)

                    if ($history_content | is-empty) {
                        print "test 2 FAILED: no historical events received"
                    } else {
                        let history_messages = ($history_content | lines)
                        let history_count = ($history_messages | length)
                        print $"test 2: received ($history_count) historical events"

                        let first_desc = (describe-event ($history_messages | first))
                        let last_desc = (describe-event ($history_messages | last))

                        if ($first_desc == null) or ($last_desc == null) {
                            print "test 2 FAILED: stream output is not a valid event"
                        } else {
                            print $" first event: ($first_desc)"
                            print $" last event: ($last_desc)"

                            if $history_count >= $events_count {
                                print $"test 2 PASSED: replayed all ($history_count) events"
                                $test2_passed = true
                            } else {
                                # NOTE(review): a partial replay still counts as a pass,
                                # as in the original script - confirm this is intended.
                                print $"test 2 PARTIAL: got ($history_count)/($events_count) events"
                                $test2_passed = true
                            }
                        }
                    }
                } else {
                    print "test 2 FAILED: output file not created"
                }
            } else {
                # Previously this case failed silently with no explanation.
                print "test 2 FAILED: no events in db to replay"
            }
        } else {
            print "backfill failed or timed out."
            try { kill $stream_pid }
        }
    } else {
        print "api failed to start."
    }

    let hydrant_pid = $instance.pid
    print $"stopping hydrant - pid: ($hydrant_pid)..."
    try { kill $hydrant_pid }

    print ""
    if $test1_passed and $test2_passed {
        print "=== ALL TESTS PASSED ==="
    } else {
        print $"=== TESTS FAILED === test1: ($test1_passed), test2: ($test2_passed)"
        exit 1
    }
}