at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 1e32f8a655364cbf5bd0bac76eb2283713f13f3d 257 lines 7.8 kB view raw
1use crate::api::AppState; 2use crate::db::keys; 3use crate::types::{RepoState, ResyncState, StoredEvent}; 4use axum::{ 5 Json, 6 extract::{Query, State}, 7 http::StatusCode, 8}; 9use jacquard_common::types::cid::Cid; 10use jacquard_common::types::ident::AtIdentifier; 11use serde::{Deserialize, Serialize}; 12use serde_json::Value; 13use std::str::FromStr; 14use std::sync::Arc; 15 16#[derive(Deserialize)] 17pub struct DebugCountRequest { 18 pub did: String, 19 pub collection: String, 20} 21 22#[derive(Serialize)] 23pub struct DebugCountResponse { 24 pub count: usize, 25} 26 27pub fn router() -> axum::Router<Arc<AppState>> { 28 axum::Router::new() 29 .route("/debug/count", axum::routing::get(handle_debug_count)) 30 .route("/debug/get", axum::routing::get(handle_debug_get)) 31 .route("/debug/iter", axum::routing::get(handle_debug_iter)) 32} 33 34pub async fn handle_debug_count( 35 State(state): State<Arc<AppState>>, 36 Query(req): Query<DebugCountRequest>, 37) -> Result<Json<DebugCountResponse>, StatusCode> { 38 let did = state 39 .resolver 40 .resolve_did(&AtIdentifier::new(req.did.as_str()).map_err(|_| StatusCode::BAD_REQUEST)?) 41 .await 42 .map_err(|_| StatusCode::BAD_REQUEST)?; 43 44 let db = &state.db; 45 let ks = db.records.clone(); 46 47 // {TrimmedDid}|{collection}| 48 let prefix = keys::record_prefix_collection(&did, &req.collection); 49 50 let count = tokio::task::spawn_blocking(move || { 51 let start_key = prefix.clone(); 52 let mut end_key = prefix.clone(); 53 if let Some(msg) = end_key.last_mut() { 54 *msg += 1; 55 } 56 57 ks.range(start_key..end_key).count() 58 }) 59 .await 60 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 61 62 Ok(Json(DebugCountResponse { count })) 63} 64 65#[derive(Deserialize)] 66pub struct DebugGetRequest { 67 pub partition: String, 68 pub key: String, 69} 70 71#[derive(Serialize)] 72pub struct DebugGetResponse { 73 pub value: Option<Value>, 74} 75 76fn deserialize_value(partition: &str, value: &[u8]) -> Value { 77 match partition { 78 "repos" => { 79 if let Ok(state) = rmp_serde::from_slice::<RepoState>(value) { 80 return serde_json::to_value(state).unwrap_or(Value::Null); 81 } 82 } 83 "resync" => { 84 if let Ok(state) = rmp_serde::from_slice::<ResyncState>(value) { 85 return serde_json::to_value(state).unwrap_or(Value::Null); 86 } 87 } 88 "events" => { 89 if let Ok(event) = rmp_serde::from_slice::<StoredEvent>(value) { 90 return serde_json::to_value(event).unwrap_or(Value::Null); 91 } 92 } 93 "records" => { 94 if let Ok(s) = String::from_utf8(value.to_vec()) { 95 match Cid::from_str(&s) { 96 Ok(cid) => return serde_json::to_value(cid).unwrap_or(Value::String(s)), 97 Err(_) => return Value::String(s), 98 } 99 } 100 } 101 "counts" | "cursors" => { 102 if let Ok(arr) = value.try_into() { 103 return Value::Number(u64::from_be_bytes(arr).into()); 104 } 105 if let Ok(s) = String::from_utf8(value.to_vec()) { 106 return Value::String(s); 107 } 108 } 109 "blocks" => { 110 if let Ok(val) = serde_ipld_dagcbor::from_slice::<Value>(value) { 111 return val; 112 } 113 } 114 "pending" => return Value::Null, 115 _ => {} 116 } 117 Value::String(hex::encode(value)) 118} 119 120pub async fn handle_debug_get( 121 State(state): State<Arc<AppState>>, 122 Query(req): Query<DebugGetRequest>, 123) -> Result<Json<DebugGetResponse>, StatusCode> { 124 let ks = get_keyspace_by_name(&state.db, &req.partition)?; 125 126 let key = if req.partition == "events" { 127 let id = req 128 .key 129 .parse::<u64>() 130 .map_err(|_| StatusCode::BAD_REQUEST)?; 131 id.to_be_bytes().to_vec() 132 } else { 133 req.key.into_bytes() 134 }; 135 136 let partition = req.partition.clone(); 137 let value = crate::db::Db::get(ks, key) 138 .await 139 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 140 .map(|v| deserialize_value(&partition, &v)); 141 142 Ok(Json(DebugGetResponse { value })) 143} 144 145#[derive(Deserialize)] 146pub struct DebugIterRequest { 147 pub partition: String, 148 pub start: Option<String>, 149 pub end: Option<String>, 150 pub limit: Option<usize>, 151 pub reverse: Option<bool>, 152} 153 154#[derive(Serialize)] 155pub struct DebugIterResponse { 156 pub items: Vec<(String, Value)>, 157} 158 159pub async fn handle_debug_iter( 160 State(state): State<Arc<AppState>>, 161 Query(req): Query<DebugIterRequest>, 162) -> Result<Json<DebugIterResponse>, StatusCode> { 163 let ks = get_keyspace_by_name(&state.db, &req.partition)?; 164 let is_events = req.partition == "events"; 165 let partition = req.partition.clone(); 166 167 let parse_bound = |s: Option<String>| -> Result<Option<Vec<u8>>, StatusCode> { 168 match s { 169 Some(s) => { 170 if is_events { 171 let id = s.parse::<u64>().map_err(|_| StatusCode::BAD_REQUEST)?; 172 Ok(Some(id.to_be_bytes().to_vec())) 173 } else { 174 Ok(Some(s.into_bytes())) 175 } 176 } 177 None => Ok(None), 178 } 179 }; 180 181 let start = parse_bound(req.start)?; 182 let end = parse_bound(req.end)?; 183 184 let items = tokio::task::spawn_blocking(move || { 185 let limit = req.limit.unwrap_or(50).min(1000); 186 187 let collect = |iter: &mut dyn Iterator<Item = fjall::Guard>| { 188 let mut items = Vec::new(); 189 for guard in iter.take(limit) { 190 let (k, v) = guard 191 .into_inner() 192 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 193 194 let key_str = if is_events { 195 if let Ok(arr) = k.as_ref().try_into() { 196 u64::from_be_bytes(arr).to_string() 197 } else { 198 "invalid_u64".to_string() 199 } 200 } else { 201 String::from_utf8_lossy(&k).into_owned() 202 }; 203 204 items.push((key_str, deserialize_value(&partition, &v))); 205 } 206 Ok::<_, StatusCode>(items) 207 }; 208 209 let start_bound = if let Some(ref s) = start { 210 std::ops::Bound::Included(s.as_slice()) 211 } else { 212 std::ops::Bound::Unbounded 213 }; 214 215 let end_bound = if let Some(ref e) = end { 216 std::ops::Bound::Included(e.as_slice()) 217 } else { 218 std::ops::Bound::Unbounded 219 }; 220 221 if req.reverse == Some(true) { 222 collect( 223 &mut ks 224 .range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>(( 225 start_bound, 226 end_bound, 227 )) 228 .rev(), 229 ) 230 } else { 231 collect( 232 &mut ks.range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>(( 233 start_bound, 234 end_bound, 235 )), 236 ) 237 } 238 }) 239 .await 240 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??; 241 242 Ok(Json(DebugIterResponse { items })) 243} 244 245fn get_keyspace_by_name(db: &crate::db::Db, name: &str) -> Result<fjall::Keyspace, StatusCode> { 246 match name { 247 "repos" => Ok(db.repos.clone()), 248 "blocks" => Ok(db.blocks.clone()), 249 "cursors" => Ok(db.cursors.clone()), 250 "pending" => Ok(db.pending.clone()), 251 "resync" => Ok(db.resync.clone()), 252 "events" => Ok(db.events.clone()), 253 "counts" => Ok(db.counts.clone()), 254 "records" => Ok(db.records.clone()), 255 _ => Err(StatusCode::BAD_REQUEST), 256 } 257}