AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 140 lines 4.9 kB view raw
#!/usr/bin/env nu
use common.nu *

# extract the record key (the last "/"-separated segment) from a record uri
def rkey-of [uri: string] {
    $uri | split row "/" | last
}

# compare records between hydrant and upstream pds
#
# for each collection, lists records from both services in forward and
# reverse order, then checks that the counts match and that every position
# holds the same rkey and cid. prints every mismatch it finds and returns
# true only when nothing mismatched.
#
# NOTE(review): no limit/cursor is passed to listRecords, so only the first
# page each service returns is compared — confirm that is intended.
export def check-consistency [hydrant_url: string, pds_url: string, did: string] {
    print "comparing records with pds..."
    let collections = [
        "app.bsky.feed.post"
        "app.bsky.actor.profile"
    ]

    mut success = true

    for coll in $collections {
        for is_rev in [false, true] {
            print $"checking collection: ($coll) reverse:($is_rev)"

            # identical query string sent to both services
            let params = $"repo=($did)&collection=($coll)&reverse=($is_rev)"
            let hydrant_records = (http get $"($hydrant_url)/xrpc/com.atproto.repo.listRecords?($params)").records
            let pds_records = (http get $"($pds_url)/xrpc/com.atproto.repo.listRecords?($params)").records

            let hydrant_count = ($hydrant_records | length)
            let pds_count = ($pds_records | length)

            print $" hydrant count: ($hydrant_count), pds count: ($pds_count)"

            if $hydrant_count != $pds_count {
                print $" mismatch in count for ($coll) rev:($is_rev)!"
                $success = false
                # positional comparison is meaningless when lengths differ
                continue
            }

            if $hydrant_count > 0 {
                let h_first = (rkey-of ($hydrant_records | first).uri)
                let p_first = (rkey-of ($pds_records | first).uri)
                print $" first rkey - hydrant: ($h_first), pds: ($p_first)"
            }

            # compare rkeys and cids position by position. zip + enumerate
            # yields nothing for empty collections, whereas the range
            # `0..($count - 1)` would be the descending [0, -1] and fail on
            # `get 0`. rkeys are checked too because records with identical
            # content share a cid.
            for pair in ($hydrant_records | zip $pds_records | enumerate) {
                let i = $pair.index
                let h_record = $pair.item.0
                let p_record = $pair.item.1
                let h_rkey = (rkey-of $h_record.uri)
                let p_rkey = (rkey-of $p_record.uri)

                if $h_rkey != $p_rkey or $h_record.cid != $p_record.cid {
                    print $" mismatch at index ($i) for ($coll) rev:($is_rev):"
                    print $" hydrant: ($h_rkey) -> ($h_record.cid)"
                    print $" pds: ($p_rkey) -> ($p_record.cid)"
                    $success = false
                }
            }
        }
    }
    $success
}

# verify countRecords API against debug endpoint
#
# for each collection, compares the count returned by
# systems.gaze.hydrant.countRecords with the count returned by the debug
# endpoint's /debug/count. returns false immediately if either endpoint
# cannot be queried; otherwise returns true only if every pair of counts
# matched.
def check-count [hydrant_url: string, debug_url: string, did: string] {
    print "verifying countRecords API..."
    let collections = [
        "app.bsky.feed.post"
        "app.bsky.actor.profile"
    ]

    mut success = true

    for coll in $collections {
        print $" checking count for ($coll)..."

        # `catch` takes a closure, so a `return` inside it would only leave
        # the closure (and become the count). yield null instead and return
        # from the command explicitly.

        # 1. get cached count from API
        let api_count = try {
            (http get $"($hydrant_url)/xrpc/systems.gaze.hydrant.countRecords?identifier=($did)&collection=($coll)").count
        } catch {
            null
        }
        if $api_count == null {
            print $" error calling countRecords API for ($coll)"
            return false
        }

        # 2. get actual scan count from debug endpoint
        let debug_count = try {
            (http get $"($debug_url)/debug/count?did=($did)&collection=($coll)").count
        } catch {
            null
        }
        if $debug_count == null {
            print $" error calling debug count for ($coll)"
            return false
        }

        print $" api: ($api_count), debug scan: ($debug_count)"

        if $api_count != $debug_count {
            print $" COUNT MISMATCH for ($coll)! api: ($api_count) vs scan: ($debug_count)"
            $success = false
        }
    }
    $success
}

# register the repo with a running hydrant, wait for backfill, then run both
# consistency checks. returns true only if every step succeeded.
def run-checks [url: string, debug_url: string, pds: string, did: string] {
    # track the repo via API
    print $"adding repo ($did) to tracking..."
    http put -t application/json $"($url)/repos" [ { did: ($did) } ]

    if not (wait-for-backfill $url) {
        print "backfill failed or timed out."
        return false
    }

    # run both consistency checks (both always run, so both results are reported)
    let integrity_passed = (check-consistency $url $pds $did)
    let count_passed = (check-count $url $debug_url $did)

    if $integrity_passed and $count_passed {
        print "all integrity checks passed!"
        true
    } else {
        print $"integrity checks failed. consistency: ($integrity_passed), count: ($count_passed)"
        false
    }
}

# build and start hydrant on a fresh temp database, check the backfill of one
# repo against its pds, then stop hydrant. exits with status 1 on any failure.
def main [] {
    let did = "did:plc:dfl62fgb7wtjj3fcbb72naae"
    let pds = "https://zwsp.xyz"
    let port = 3001
    let url = $"http://localhost:($port)"
    let debug_url = $"http://127.0.0.1:($port + 1)"
    let db_path = (mktemp -d -t hydrant_test.XXXXXX)

    print $"testing backfill integrity for ($did)..."
    print $"database path: ($db_path)"

    let binary = build-hydrant
    let instance = start-hydrant $binary $db_path $port

    # any error raised while talking to hydrant (e.g. a failing http request)
    # is caught here, so the spawned instance below is always stopped instead
    # of being left running when the script aborts.
    let success = try {
        if (wait-for-api $url) {
            run-checks $url $debug_url $pds $did
        } else {
            print "api failed to start."
            false
        }
    } catch { |err|
        print $"test run aborted: ($err.msg)"
        false
    }

    let hydrant_pid = $instance.pid
    print $"stopping hydrant - pid: ($hydrant_pid)..."
    try { kill $hydrant_pid }

    if not $success { exit 1 }
}