AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 3eb43b6191a55f71f8327cf6d64290a168d60cff 266 lines 8.1 kB view raw
1use crate::db::keys; 2use crate::types::{RepoState, ResyncState, StoredEvent}; 3use crate::{api::AppState, db::types::TrimmedDid}; 4use axum::{ 5 Json, 6 extract::{Query, State}, 7 http::StatusCode, 8}; 9use jacquard::types::cid::Cid; 10use jacquard::types::ident::AtIdentifier; 11use serde::{Deserialize, Serialize}; 12use serde_json::Value; 13use std::str::FromStr; 14use std::sync::Arc; 15 16#[derive(Deserialize)] 17pub struct DebugCountRequest { 18 pub did: String, 19 pub collection: String, 20} 21 22#[derive(Serialize)] 23pub struct DebugCountResponse { 24 pub count: usize, 25} 26 27pub fn router() -> axum::Router<Arc<AppState>> { 28 axum::Router::new() 29 .route("/debug/count", axum::routing::get(handle_debug_count)) 30 .route("/debug/get", axum::routing::get(handle_debug_get)) 31 .route("/debug/iter", axum::routing::get(handle_debug_iter)) 32} 33 34pub async fn handle_debug_count( 35 State(state): State<Arc<AppState>>, 36 Query(req): Query<DebugCountRequest>, 37) -> Result<Json<DebugCountResponse>, StatusCode> { 38 let did = state 39 .resolver 40 .resolve_did(&AtIdentifier::new(req.did.as_str()).map_err(|_| StatusCode::BAD_REQUEST)?) 
41 .await 42 .map_err(|_| StatusCode::BAD_REQUEST)?; 43 44 let db = &state.db; 45 let ks = db 46 .record_partition(&req.collection) 47 .map_err(|_| StatusCode::NOT_FOUND)?; 48 49 // {TrimmedDid}\x00 50 let mut prefix = Vec::new(); 51 TrimmedDid::from(&did).write_to_vec(&mut prefix); 52 prefix.push(keys::SEP); 53 54 let count = tokio::task::spawn_blocking(move || { 55 let start_key = prefix.clone(); 56 let mut end_key = prefix.clone(); 57 if let Some(msg) = end_key.last_mut() { 58 *msg += 1; 59 } 60 61 ks.range(start_key..end_key).count() 62 }) 63 .await 64 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 65 66 Ok(Json(DebugCountResponse { count })) 67} 68 69#[derive(Deserialize)] 70pub struct DebugGetRequest { 71 pub partition: String, 72 pub key: String, 73} 74 75#[derive(Serialize)] 76pub struct DebugGetResponse { 77 pub value: Option<Value>, 78} 79 80fn deserialize_value(partition: &str, value: &[u8]) -> Value { 81 match partition { 82 "repos" => { 83 if let Ok(state) = rmp_serde::from_slice::<RepoState>(value) { 84 return serde_json::to_value(state).unwrap_or(Value::Null); 85 } 86 } 87 "resync" => { 88 if let Ok(state) = rmp_serde::from_slice::<ResyncState>(value) { 89 return serde_json::to_value(state).unwrap_or(Value::Null); 90 } 91 } 92 "events" => { 93 if let Ok(event) = rmp_serde::from_slice::<StoredEvent>(value) { 94 return serde_json::to_value(event).unwrap_or(Value::Null); 95 } 96 } 97 "records" => { 98 if let Ok(s) = String::from_utf8(value.to_vec()) { 99 match Cid::from_str(&s) { 100 Ok(cid) => return serde_json::to_value(cid).unwrap_or(Value::String(s)), 101 Err(_) => return Value::String(s), 102 } 103 } 104 } 105 "counts" | "cursors" => { 106 if let Ok(arr) = value.try_into() { 107 return Value::Number(u64::from_be_bytes(arr).into()); 108 } 109 if let Ok(s) = String::from_utf8(value.to_vec()) { 110 return Value::String(s); 111 } 112 } 113 "blocks" => { 114 if let Ok(val) = serde_ipld_dagcbor::from_slice::<Value>(value) { 115 return val; 116 } 117 } 
118 "pending" => return Value::Null, 119 _ => {} 120 } 121 Value::String(hex::encode(value)) 122} 123 124pub async fn handle_debug_get( 125 State(state): State<Arc<AppState>>, 126 Query(req): Query<DebugGetRequest>, 127) -> Result<Json<DebugGetResponse>, StatusCode> { 128 let ks = get_keyspace_by_name(&state.db, &req.partition)?; 129 130 let key = if req.partition == "events" { 131 let id = req 132 .key 133 .parse::<u64>() 134 .map_err(|_| StatusCode::BAD_REQUEST)?; 135 id.to_be_bytes().to_vec() 136 } else { 137 req.key.into_bytes() 138 }; 139 140 let partition = req.partition.clone(); 141 let value = crate::db::Db::get(ks, key) 142 .await 143 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 144 .map(|v| deserialize_value(&partition, &v)); 145 146 Ok(Json(DebugGetResponse { value })) 147} 148 149#[derive(Deserialize)] 150pub struct DebugIterRequest { 151 pub partition: String, 152 pub start: Option<String>, 153 pub end: Option<String>, 154 pub limit: Option<usize>, 155 pub reverse: Option<bool>, 156} 157 158#[derive(Serialize)] 159pub struct DebugIterResponse { 160 pub items: Vec<(String, Value)>, 161} 162 163pub async fn handle_debug_iter( 164 State(state): State<Arc<AppState>>, 165 Query(req): Query<DebugIterRequest>, 166) -> Result<Json<DebugIterResponse>, StatusCode> { 167 let ks = get_keyspace_by_name(&state.db, &req.partition)?; 168 let is_events = req.partition == "events"; 169 let partition = req.partition.clone(); 170 171 let parse_bound = |s: Option<String>| -> Result<Option<Vec<u8>>, StatusCode> { 172 match s { 173 Some(s) => { 174 if is_events { 175 let id = s.parse::<u64>().map_err(|_| StatusCode::BAD_REQUEST)?; 176 Ok(Some(id.to_be_bytes().to_vec())) 177 } else { 178 Ok(Some(s.into_bytes())) 179 } 180 } 181 None => Ok(None), 182 } 183 }; 184 185 let start = parse_bound(req.start)?; 186 let end = parse_bound(req.end)?; 187 188 let items = tokio::task::spawn_blocking(move || { 189 let limit = req.limit.unwrap_or(50).min(1000); 190 191 let collect = 
|iter: &mut dyn Iterator<Item = fjall::Guard>| { 192 let mut items = Vec::new(); 193 for guard in iter.take(limit) { 194 let (k, v) = guard 195 .into_inner() 196 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 197 198 let key_str = if is_events { 199 if let Ok(arr) = k.as_ref().try_into() { 200 u64::from_be_bytes(arr).to_string() 201 } else { 202 "invalid_u64".to_string() 203 } 204 } else { 205 String::from_utf8_lossy(&k).into_owned() 206 }; 207 208 items.push((key_str, deserialize_value(&partition, &v))); 209 } 210 Ok::<_, StatusCode>(items) 211 }; 212 213 let start_bound = if let Some(ref s) = start { 214 std::ops::Bound::Included(s.as_slice()) 215 } else { 216 std::ops::Bound::Unbounded 217 }; 218 219 let end_bound = if let Some(ref e) = end { 220 std::ops::Bound::Included(e.as_slice()) 221 } else { 222 std::ops::Bound::Unbounded 223 }; 224 225 if req.reverse == Some(true) { 226 collect( 227 &mut ks 228 .range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>(( 229 start_bound, 230 end_bound, 231 )) 232 .rev(), 233 ) 234 } else { 235 collect( 236 &mut ks.range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>(( 237 start_bound, 238 end_bound, 239 )), 240 ) 241 } 242 }) 243 .await 244 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??; 245 246 Ok(Json(DebugIterResponse { items })) 247} 248 249fn get_keyspace_by_name(db: &crate::db::Db, name: &str) -> Result<fjall::Keyspace, StatusCode> { 250 match name { 251 "repos" => Ok(db.repos.clone()), 252 "blocks" => Ok(db.blocks.clone()), 253 "cursors" => Ok(db.cursors.clone()), 254 "pending" => Ok(db.pending.clone()), 255 "resync" => Ok(db.resync.clone()), 256 "events" => Ok(db.events.clone()), 257 "counts" => Ok(db.counts.clone()), 258 _ => { 259 if let Some(col) = name.strip_prefix(crate::db::RECORDS_PARTITION_PREFIX) { 260 db.record_partition(col).map_err(|_| StatusCode::NOT_FOUND) 261 } else { 262 Err(StatusCode::BAD_REQUEST) 263 } 264 } 265 } 266}