at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[api] add more debug endpoints for db inspection

ptr.pet 90e4116e 34489a48

verified
+295 -6
+1
Cargo.lock
··· 1623 1623 "chrono", 1624 1624 "fjall", 1625 1625 "futures", 1626 + "hex", 1626 1627 "humantime", 1627 1628 "jacquard", 1628 1629 "jacquard-api",
+1
Cargo.toml
··· 38 38 humantime = "2.3.0" 39 39 40 40 mimalloc = { version = "0.1", features = ["v3"] } 41 + hex = "0.4" 41 42 scc = "3"
+201 -2
src/api/debug.rs
··· 1 1 use crate::db::keys; 2 + use crate::types::{RepoState, ResyncState, StoredEvent}; 2 3 use crate::{api::AppState, db::types::TrimmedDid}; 3 4 use axum::{ 5 + Json, 4 6 extract::{Query, State}, 5 7 http::StatusCode, 6 - Json, 7 8 }; 9 + use jacquard::types::cid::Cid; 8 10 use jacquard::types::ident::AtIdentifier; 9 11 use serde::{Deserialize, Serialize}; 12 + use serde_json::Value; 13 + use std::str::FromStr; 10 14 use std::sync::Arc; 11 15 12 16 #[derive(Deserialize)] ··· 21 25 } 22 26 23 27 pub fn router() -> axum::Router<Arc<AppState>> { 24 - axum::Router::new().route("/debug/count", axum::routing::get(handle_debug_count)) 28 + axum::Router::new() 29 + .route("/debug/count", axum::routing::get(handle_debug_count)) 30 + .route("/debug/get", axum::routing::get(handle_debug_get)) 31 + .route("/debug/iter", axum::routing::get(handle_debug_iter)) 25 32 } 26 33 27 34 pub async fn handle_debug_count( ··· 58 65 59 66 Ok(Json(DebugCountResponse { count })) 60 67 } 68 + 69 + #[derive(Deserialize)] 70 + pub struct DebugGetRequest { 71 + pub partition: String, 72 + pub key: String, 73 + } 74 + 75 + #[derive(Serialize)] 76 + pub struct DebugGetResponse { 77 + pub value: Option<Value>, 78 + } 79 + 80 + fn deserialize_value(partition: &str, value: &[u8]) -> Value { 81 + match partition { 82 + "repos" => { 83 + if let Ok(state) = rmp_serde::from_slice::<RepoState>(value) { 84 + return serde_json::to_value(state).unwrap_or(Value::Null); 85 + } 86 + } 87 + "resync" => { 88 + if let Ok(state) = rmp_serde::from_slice::<ResyncState>(value) { 89 + return serde_json::to_value(state).unwrap_or(Value::Null); 90 + } 91 + } 92 + "events" => { 93 + if let Ok(event) = rmp_serde::from_slice::<StoredEvent>(value) { 94 + return serde_json::to_value(event).unwrap_or(Value::Null); 95 + } 96 + } 97 + "records" => { 98 + if let Ok(s) = String::from_utf8(value.to_vec()) { 99 + match Cid::from_str(&s) { 100 + Ok(cid) => return serde_json::to_value(cid).unwrap_or(Value::String(s)), 101 + Err(_) => return Value::String(s), 102 + } 103 + } 104 + } 105 + "counts" | "cursors" => { 106 + if let Ok(arr) = value.try_into() { 107 + return Value::Number(u64::from_be_bytes(arr).into()); 108 + } 109 + } 110 + "blocks" => { 111 + if let Ok(val) = serde_ipld_dagcbor::from_slice::<Value>(value) { 112 + return val; 113 + } 114 + } 115 + "pending" => return Value::Null, 116 + _ => {} 117 + } 118 + Value::String(hex::encode(value)) 119 + } 120 + 121 + pub async fn handle_debug_get( 122 + State(state): State<Arc<AppState>>, 123 + Query(req): Query<DebugGetRequest>, 124 + ) -> Result<Json<DebugGetResponse>, StatusCode> { 125 + let ks = get_keyspace_by_name(&state.db, &req.partition)?; 126 + 127 + let key = if req.partition == "events" { 128 + let id = req 129 + .key 130 + .parse::<u64>() 131 + .map_err(|_| StatusCode::BAD_REQUEST)?; 132 + id.to_be_bytes().to_vec() 133 + } else { 134 + req.key.into_bytes() 135 + }; 136 + 137 + let partition = req.partition.clone(); 138 + let value = crate::db::Db::get(ks, key) 139 + .await 140 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 141 + .map(|v| deserialize_value(&partition, &v)); 142 + 143 + Ok(Json(DebugGetResponse { value })) 144 + } 145 + 146 + #[derive(Deserialize)] 147 + pub struct DebugIterRequest { 148 + pub partition: String, 149 + pub start: Option<String>, 150 + pub end: Option<String>, 151 + pub limit: Option<usize>, 152 + pub reverse: Option<bool>, 153 + } 154 + 155 + #[derive(Serialize)] 156 + pub struct DebugIterResponse { 157 + pub items: Vec<(String, Value)>, 158 + } 159 + 160 + pub async fn handle_debug_iter( 161 + State(state): State<Arc<AppState>>, 162 + Query(req): Query<DebugIterRequest>, 163 + ) -> Result<Json<DebugIterResponse>, StatusCode> { 164 + let ks = get_keyspace_by_name(&state.db, &req.partition)?; 165 + let is_events = req.partition == "events"; 166 + let partition = req.partition.clone(); 167 + 168 + let parse_bound = |s: Option<String>| -> Result<Option<Vec<u8>>, StatusCode> { 169 + match s { 170 + Some(s) => { 171 + if is_events { 172 + let id = s.parse::<u64>().map_err(|_| StatusCode::BAD_REQUEST)?; 173 + Ok(Some(id.to_be_bytes().to_vec())) 174 + } else { 175 + Ok(Some(s.into_bytes())) 176 + } 177 + } 178 + None => Ok(None), 179 + } 180 + }; 181 + 182 + let start = parse_bound(req.start)?; 183 + let end = parse_bound(req.end)?; 184 + 185 + let items = tokio::task::spawn_blocking(move || { 186 + let limit = req.limit.unwrap_or(50).min(1000); 187 + 188 + // Helper closure to avoid generic type complexity 189 + let collect = |iter: &mut dyn Iterator<Item = fjall::Guard>| { 190 + let mut items = Vec::new(); 191 + for guard in iter.take(limit) { 192 + let (k, v) = guard 193 + .into_inner() 194 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 195 + 196 + let key_str = if is_events { 197 + if let Ok(arr) = k.as_ref().try_into() { 198 + u64::from_be_bytes(arr).to_string() 199 + } else { 200 + "invalid_u64".to_string() 201 + } 202 + } else { 203 + String::from_utf8_lossy(&k).into_owned() 204 + }; 205 + 206 + items.push((key_str, deserialize_value(&partition, &v))); 207 + } 208 + Ok::<_, StatusCode>(items) 209 + }; 210 + 211 + let start_bound = if let Some(ref s) = start { 212 + std::ops::Bound::Included(s.as_slice()) 213 + } else { 214 + std::ops::Bound::Unbounded 215 + }; 216 + 217 + let end_bound = if let Some(ref e) = end { 218 + std::ops::Bound::Included(e.as_slice()) 219 + } else { 220 + std::ops::Bound::Unbounded 221 + }; 222 + 223 + if req.reverse == Some(true) { 224 + collect( 225 + &mut ks 226 + .range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>(( 227 + start_bound, 228 + end_bound, 229 + )) 230 + .rev(), 231 + ) 232 + } else { 233 + collect( 234 + &mut ks.range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>(( 235 + start_bound, 236 + end_bound, 237 + )), 238 + ) 239 + } 240 + }) 241 + .await 242 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??; 243 + 244 + Ok(Json(DebugIterResponse { items })) 245 + } 246 + 247 + fn get_keyspace_by_name(db: &crate::db::Db, name: &str) -> Result<fjall::Keyspace, StatusCode> { 248 + match name { 249 + "repos" => Ok(db.repos.clone()), 250 + "records" => Ok(db.records.clone()), 251 + "blocks" => Ok(db.blocks.clone()), 252 + "cursors" => Ok(db.cursors.clone()), 253 + "pending" => Ok(db.pending.clone()), 254 + "resync" => Ok(db.resync.clone()), 255 + "events" => Ok(db.events.clone()), 256 + "counts" => Ok(db.counts.clone()), 257 + _ => Err(StatusCode::BAD_REQUEST), 258 + } 259 + }
+3 -3
src/bin/mst_dump.rs
··· 3 3 use std::time::Duration; 4 4 5 5 use hydrant::resolver::Resolver; 6 - use jacquard::IntoStatic; // Corrected from jacquard_identity::IntoStatic 6 + use jacquard::IntoStatic; 7 7 use jacquard::api::com_atproto::sync::get_repo::GetRepo; 8 8 use jacquard::prelude::XrpcExt; 9 9 use jacquard::types::did::Did; ··· 12 12 use jacquard_repo::mst::Mst; 13 13 use miette::{IntoDiagnostic, Result}; 14 14 use tracing::{Level, info}; 15 - use tracing_subscriber::FmtSubscriber; // Restored 16 - use url::Url; // Restored 15 + use tracing_subscriber::FmtSubscriber; 16 + use url::Url; 17 17 18 18 #[tokio::main] 19 19 async fn main() -> Result<()> {
+1 -1
src/db/keys.rs
··· 3 3 use crate::db::types::TrimmedDid; 4 4 5 5 /// separator used for composite keys 6 - pub const SEP: u8 = 0x00; 6 + pub const SEP: u8 = b'|'; 7 7 8 8 pub const CURSOR_KEY: &[u8] = b"firehose_cursor"; 9 9
+88
tests/debug_endpoints.nu
··· 1 + #!/usr/bin/env nu 2 + use common.nu * 3 + 4 + def main [] { 5 + let did = "did:web:guestbook.gaze.systems" 6 + let port = 3003 7 + let debug_port = $port + 1 8 + let url = $"http://localhost:($port)" 9 + let debug_url = $"http://localhost:($debug_port)" 10 + let db_path = (mktemp -d -t hydrant_debug_test.XXXXXX) 11 + 12 + print $"testing debug endpoints..." 13 + print $"database path: ($db_path)" 14 + 15 + let binary = build-hydrant 16 + let instance = start-hydrant $binary $db_path $port 17 + 18 + if (wait-for-api $url) { 19 + # Trigger backfill to populate some data 20 + print $"adding repo ($did) to tracking..." 21 + http post -t application/json $"($url)/repo/add" { dids: [($did)] } 22 + 23 + if (wait-for-backfill $url) { 24 + print "backfill complete, testing debug endpoints" 25 + 26 + # 1. Test /debug/iter to find a key 27 + print "testing /debug/iter on records partition" 28 + let records = http get $"($debug_url)/debug/iter?partition=records&limit=1" 29 + 30 + if ($records.items | is-empty) { 31 + print "FAILED: /debug/iter returned empty items" 32 + exit 1 33 + } 34 + 35 + let first_item = ($records.items | first) 36 + let key_str = $first_item.0 37 + let value_cid = $first_item.1 38 + 39 + print $"found key: ($key_str)" 40 + print $"found value [cid]: ($value_cid)" 41 + 42 + if not ($key_str | str contains "|") { 43 + print "FAILED: key does not contain pipe separator" 44 + exit 1 45 + } 46 + 47 + # 2. Test /debug/get with that key (sent as string) 48 + print "testing /debug/get" 49 + let get_res = http get $"($debug_url)/debug/get?partition=records&key=($key_str)" 50 + 51 + if $get_res.value != $value_cid { 52 + print $"FAILED: /debug/get returned different value. expected: ($value_cid), got: ($get_res.value)" 53 + exit 1 54 + } 55 + 56 + print "PASSED: /debug/iter and /debug/get works with string keys and JSON values" 57 + 58 + # 3. Test /debug/iter on events partition (should be JSON objects) 59 + print "testing /debug/iter on events partition" 60 + let events = http get $"($debug_url)/debug/iter?partition=events&limit=1" 61 + 62 + if ($events.items | is-empty) { 63 + # might be empty if no events yet (backfill only fills records?) 64 + # Backfill should generate events? ops.rs makes events. 65 + print "WARNING: /debug/iter returned empty items for events (expected if async?)" 66 + } else { 67 + let first_evt = ($events.items | first) 68 + let val = $first_evt.1 69 + let type = ($val | describe) 70 + print $"found event value type: ($type)" 71 + if not ($type | str starts-with "record") { 72 + print $"FAILED: events value is not a record/object. got: ($type)" 73 + exit 1 74 + } 75 + print "PASSED: /debug/iter on events returns JSON objects" 76 + } 77 + 78 + } else { 79 + print "backfill failed" 80 + exit 1 81 + } 82 + } else { 83 + print "api failed to start" 84 + exit 1 85 + } 86 + 87 + try { kill $instance.pid } 88 + }