AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1#!/usr/bin/env nu
2use common.nu *
3
# Compare records between hydrant and the upstream PDS for `did`.
#
# For each collection, listRecords is fetched from both servers in forward
# and reverse order, and the listings are compared position by position on
# rkey and CID. Mismatches are printed; returns true only if every listing
# matches.
# NOTE(review): no `limit`/`cursor` is passed, so only the first page each
# server returns by default is compared — confirm the defaults agree.
export def check-consistency [hydrant_url: string, pds_url: string, did: string] {
    print "comparing records with pds..."
    let collections = [
        "app.bsky.feed.post"
        "app.bsky.actor.profile"
    ]

    mut success = true

    for coll in $collections {
        for is_rev in [false, true] {
            print $"checking collection: ($coll) reverse:($is_rev)"

            let hydrant_records = (http get $"($hydrant_url)/xrpc/com.atproto.repo.listRecords?repo=($did)&collection=($coll)&reverse=($is_rev)").records
            let pds_records = (http get $"($pds_url)/xrpc/com.atproto.repo.listRecords?repo=($did)&collection=($coll)&reverse=($is_rev)").records

            let hydrant_count = ($hydrant_records | length)
            let pds_count = ($pds_records | length)

            print $" hydrant count: ($hydrant_count), pds count: ($pds_count)"

            if $hydrant_count != $pds_count {
                print $" mismatch in count for ($coll) rev:($is_rev)!"
                $success = false
                continue
            }

            if $hydrant_count > 0 {
                let h_first = (($hydrant_records | first).uri | split row "/" | last)
                let p_first = (($pds_records | first).uri | split row "/" | last)
                print $" first rkey - hydrant: ($h_first), pds: ($p_first)"
            }

            # Compare rkeys and cids pairwise. `zip | enumerate` yields nothing
            # for empty listings, whereas `0..($count - 1)` would count down
            # to [0, -1] and fail on `get 0`.
            for pair in ($hydrant_records | zip $pds_records | enumerate) {
                let i = $pair.index
                let h_record = ($pair.item | get 0)
                let p_record = ($pair.item | get 1)
                let h_rkey = ($h_record.uri | split row "/" | last)
                let p_rkey = ($p_record.uri | split row "/" | last)

                if $h_rkey != $p_rkey or $h_record.cid != $p_record.cid {
                    print $" mismatch at index ($i) for ($coll) rev:($is_rev):"
                    print $" hydrant: ($h_rkey) -> ($h_record.cid)"
                    print $" pds: ($p_rkey) -> ($p_record.cid)"
                    $success = false
                }
            }
        }
    }
    $success
}
56
# Verify hydrant's countRecords XRPC against a full scan from the debug API.
#
# For each collection, the count from systems.gaze.hydrant.countRecords is
# compared with /debug/count on `debug_url`. Returns false immediately if
# either endpoint cannot be queried, false at the end if any counts differ,
# and true otherwise.
def check-count [hydrant_url: string, debug_url: string, did: string] {
    print "verifying countRecords API..."
    let collections = [
        "app.bsky.feed.post"
        "app.bsky.actor.profile"
    ]

    mut success = true

    for coll in $collections {
        print $" checking count for ($coll)..."

        # 1. get cached count from API.
        # `catch` takes a closure, so a `return` inside it would only leave the
        # closure (yielding `false` as the count). Map failures to null instead
        # and return from the command body below.
        let api_count = try {
            (http get $"($hydrant_url)/xrpc/systems.gaze.hydrant.countRecords?identifier=($did)&collection=($coll)").count
        } catch {
            null
        }
        if $api_count == null {
            print $" error calling countRecords API for ($coll)"
            return false
        }

        # 2. get actual scan count from debug endpoint
        let debug_count = try {
            (http get $"($debug_url)/debug/count?did=($did)&collection=($coll)").count
        } catch {
            null
        }
        if $debug_count == null {
            print $" error calling debug count for ($coll)"
            return false
        }

        print $" api: ($api_count), debug scan: ($debug_count)"

        if $api_count != $debug_count {
            print $" COUNT MISMATCH for ($coll)! api: ($api_count) vs scan: ($debug_count)"
            $success = false
        }
    }
    $success
}
95
# Wait for the hydrant API, track `did`, wait for its backfill, then run the
# record-consistency and countRecords checks. Prints the reason for any
# failure and returns true only if every stage succeeds.
def run-integrity-checks [url: string, debug_url: string, pds: string, did: string] {
    if not (wait-for-api $url) {
        print "api failed to start."
        return false
    }

    # track the repo via API
    print $"adding repo ($did) to tracking..."
    http put -t application/json $"($url)/repos" [ { did: ($did) } ] | ignore

    if not (wait-for-backfill $url) {
        print "backfill failed or timed out."
        return false
    }

    # Run both consistency checks; both run even if the first one fails.
    let integrity_passed = (check-consistency $url $pds $did)
    let count_passed = (check-count $url $debug_url $did)

    if $integrity_passed and $count_passed {
        print "all integrity checks passed!"
        true
    } else {
        print $"integrity checks failed. consistency: ($integrity_passed), count: ($count_passed)"
        false
    }
}

# Build and start hydrant on a fresh temp database, run the backfill
# integrity checks against the upstream PDS, stop hydrant, and exit with
# status 1 if anything failed.
def main [] {
    let did = "did:plc:dfl62fgb7wtjj3fcbb72naae"
    let pds = "https://zwsp.xyz"
    let port = 3001
    let url = $"http://localhost:($port)"
    # NOTE(review): assumes hydrant exposes its debug API on port + 1 — confirm
    # against start-hydrant in common.nu.
    let debug_url = $"http://127.0.0.1:($port + 1)"
    let db_path = (mktemp -d -t hydrant_test.XXXXXX)

    print $"testing backfill integrity for ($did)..."
    print $"database path: ($db_path)"

    let binary = build-hydrant
    let instance = start-hydrant $binary $db_path $port

    # An uncaught error (e.g. a failed http request) would otherwise abort the
    # script before the kill below and leave hydrant running.
    let success = try {
        run-integrity-checks $url $debug_url $pds $did
    } catch {|err|
        print $"integrity test aborted: ($err.msg)"
        false
    }

    let hydrant_pid = $instance.pid
    print $"stopping hydrant - pid: ($hydrant_pid)..."
    try { kill $hydrant_pid }

    if not $success { exit 1 }
}