AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 8990a2ff5651c71cb6fa29aa3c44fa4212b6a4f6 403 lines 13 kB view raw
1use crate::api::AppState; 2use crate::db::keys; 3use crate::types::{RepoState, ResyncState, StoredEvent}; 4use axum::routing::{get, post}; 5use axum::{ 6 Json, 7 extract::{Query, State}, 8 http::StatusCode, 9}; 10use jacquard_common::types::cid::Cid; 11use jacquard_common::types::ident::AtIdentifier; 12use serde::{Deserialize, Serialize}; 13use serde_json::Value; 14use std::str::FromStr; 15use std::sync::Arc; 16 17#[derive(Deserialize)] 18pub struct DebugCountRequest { 19 pub did: String, 20 pub collection: String, 21} 22 23#[derive(Serialize)] 24pub struct DebugCountResponse { 25 pub count: usize, 26} 27 28pub fn router() -> axum::Router<Arc<AppState>> { 29 axum::Router::new() 30 .route("/debug/count", get(handle_debug_count)) 31 .route("/debug/get", get(handle_debug_get)) 32 .route("/debug/iter", get(handle_debug_iter)) 33 .route("/debug/refcount", get(handle_debug_refcount)) 34 .route("/debug/refcount", post(handle_set_debug_refcount)) 35 .route("/debug/repo_refcounts", get(handle_debug_repo_refcounts)) 36 .route("/debug/compact", post(handle_debug_compact)) 37} 38 39pub async fn handle_debug_count( 40 State(state): State<Arc<AppState>>, 41 Query(req): Query<DebugCountRequest>, 42) -> Result<Json<DebugCountResponse>, StatusCode> { 43 let did = state 44 .resolver 45 .resolve_did(&AtIdentifier::new(req.did.as_str()).map_err(|_| StatusCode::BAD_REQUEST)?) 
46 .await 47 .map_err(|_| StatusCode::BAD_REQUEST)?; 48 49 let db = &state.db; 50 let ks = db.records.clone(); 51 52 // {TrimmedDid}|{collection}| 53 let prefix = keys::record_prefix_collection(&did, &req.collection); 54 55 let count = tokio::task::spawn_blocking(move || { 56 let start_key = prefix.clone(); 57 let mut end_key = prefix.clone(); 58 if let Some(msg) = end_key.last_mut() { 59 *msg += 1; 60 } 61 62 ks.range(start_key..end_key).count() 63 }) 64 .await 65 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 66 67 Ok(Json(DebugCountResponse { count })) 68} 69 70#[derive(Deserialize)] 71pub struct DebugGetRequest { 72 pub partition: String, 73 pub key: String, 74} 75 76#[derive(Serialize)] 77pub struct DebugGetResponse { 78 pub value: Option<Value>, 79} 80 81fn deserialize_value(partition: &str, value: &[u8]) -> Value { 82 match partition { 83 "repos" => { 84 if let Ok(state) = rmp_serde::from_slice::<RepoState>(value) { 85 return serde_json::to_value(state).unwrap_or(Value::Null); 86 } 87 } 88 "resync" => { 89 if let Ok(state) = rmp_serde::from_slice::<ResyncState>(value) { 90 return serde_json::to_value(state).unwrap_or(Value::Null); 91 } 92 } 93 "events" => { 94 if let Ok(event) = rmp_serde::from_slice::<StoredEvent>(value) { 95 return serde_json::to_value(event).unwrap_or(Value::Null); 96 } 97 } 98 "records" => { 99 if let Ok(s) = String::from_utf8(value.to_vec()) { 100 match Cid::from_str(&s) { 101 Ok(cid) => return serde_json::to_value(cid).unwrap_or(Value::String(s)), 102 Err(_) => return Value::String(s), 103 } 104 } 105 } 106 "counts" | "cursors" => { 107 if let Ok(arr) = value.try_into() { 108 return Value::Number(u64::from_be_bytes(arr).into()); 109 } 110 if let Ok(s) = String::from_utf8(value.to_vec()) { 111 return Value::String(s); 112 } 113 } 114 "blocks" => { 115 if let Ok(val) = serde_ipld_dagcbor::from_slice::<Value>(value) { 116 return val; 117 } 118 } 119 "pending" => return Value::Null, 120 _ => {} 121 } 122 
Value::String(hex::encode(value)) 123} 124 125pub async fn handle_debug_get( 126 State(state): State<Arc<AppState>>, 127 Query(req): Query<DebugGetRequest>, 128) -> Result<Json<DebugGetResponse>, StatusCode> { 129 let ks = get_keyspace_by_name(&state.db, &req.partition)?; 130 131 let key = if req.partition == "events" { 132 let id = req 133 .key 134 .parse::<u64>() 135 .map_err(|_| StatusCode::BAD_REQUEST)?; 136 id.to_be_bytes().to_vec() 137 } else { 138 req.key.into_bytes() 139 }; 140 141 let partition = req.partition.clone(); 142 let value = crate::db::Db::get(ks, key) 143 .await 144 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 145 .map(|v| deserialize_value(&partition, &v)); 146 147 Ok(Json(DebugGetResponse { value })) 148} 149 150#[derive(Deserialize)] 151pub struct DebugIterRequest { 152 pub partition: String, 153 pub start: Option<String>, 154 pub end: Option<String>, 155 pub limit: Option<usize>, 156 pub reverse: Option<bool>, 157} 158 159#[derive(Serialize)] 160pub struct DebugIterResponse { 161 pub items: Vec<(String, Value)>, 162} 163 164pub async fn handle_debug_iter( 165 State(state): State<Arc<AppState>>, 166 Query(req): Query<DebugIterRequest>, 167) -> Result<Json<DebugIterResponse>, StatusCode> { 168 let ks = get_keyspace_by_name(&state.db, &req.partition)?; 169 let is_events = req.partition == "events"; 170 let partition = req.partition.clone(); 171 172 let parse_bound = |s: Option<String>| -> Result<Option<Vec<u8>>, StatusCode> { 173 match s { 174 Some(s) => { 175 if is_events { 176 let id = s.parse::<u64>().map_err(|_| StatusCode::BAD_REQUEST)?; 177 Ok(Some(id.to_be_bytes().to_vec())) 178 } else { 179 Ok(Some(s.into_bytes())) 180 } 181 } 182 None => Ok(None), 183 } 184 }; 185 186 let start = parse_bound(req.start)?; 187 let end = parse_bound(req.end)?; 188 189 let items = tokio::task::spawn_blocking(move || { 190 let limit = req.limit.unwrap_or(50); 191 192 let collect = |iter: &mut dyn Iterator<Item = fjall::Guard>| { 193 let mut items = 
Vec::new(); 194 for guard in iter.take(limit) { 195 let (k, v) = guard 196 .into_inner() 197 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 198 199 let key_str = if is_events { 200 if let Ok(arr) = k.as_ref().try_into() { 201 u64::from_be_bytes(arr).to_string() 202 } else { 203 "invalid_u64".to_string() 204 } 205 } else if partition == "blocks" { 206 match cid::Cid::read_bytes(k.as_ref()) { 207 Ok(cid) => cid.to_string(), 208 Err(_) => String::from_utf8_lossy(&k).into_owned(), 209 } 210 } else { 211 String::from_utf8_lossy(&k).into_owned() 212 }; 213 214 items.push((key_str, deserialize_value(&partition, &v))); 215 } 216 Ok::<_, StatusCode>(items) 217 }; 218 219 let start_bound = if let Some(ref s) = start { 220 std::ops::Bound::Included(s.as_slice()) 221 } else { 222 std::ops::Bound::Unbounded 223 }; 224 225 let end_bound = if let Some(ref e) = end { 226 std::ops::Bound::Included(e.as_slice()) 227 } else { 228 std::ops::Bound::Unbounded 229 }; 230 231 if req.reverse == Some(true) { 232 collect( 233 &mut ks 234 .range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>(( 235 start_bound, 236 end_bound, 237 )) 238 .rev(), 239 ) 240 } else { 241 collect( 242 &mut ks.range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>(( 243 start_bound, 244 end_bound, 245 )), 246 ) 247 } 248 }) 249 .await 250 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??; 251 252 Ok(Json(DebugIterResponse { items })) 253} 254 255fn get_keyspace_by_name(db: &crate::db::Db, name: &str) -> Result<fjall::Keyspace, StatusCode> { 256 match name { 257 "repos" => Ok(db.repos.clone()), 258 "blocks" => Ok(db.blocks.clone()), 259 "cursors" => Ok(db.cursors.clone()), 260 "pending" => Ok(db.pending.clone()), 261 "resync" => Ok(db.resync.clone()), 262 "events" => Ok(db.events.clone()), 263 "counts" => Ok(db.counts.clone()), 264 "records" => Ok(db.records.clone()), 265 _ => Err(StatusCode::BAD_REQUEST), 266 } 267} 268 269#[derive(Deserialize)] 270pub struct DebugCompactRequest { 271 pub 
partition: String, 272} 273 274pub async fn handle_debug_compact( 275 State(state): State<Arc<AppState>>, 276 Query(req): Query<DebugCompactRequest>, 277) -> Result<StatusCode, StatusCode> { 278 let ks = get_keyspace_by_name(&state.db, &req.partition)?; 279 let state_clone = state.clone(); 280 281 tokio::task::spawn_blocking(move || { 282 let _ = ks.remove(b"dummy_tombstone123"); 283 let _ = state_clone.db.persist(); 284 let _ = ks.rotate_memtable_and_wait(); 285 ks.major_compact() 286 }) 287 .await 288 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 289 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 290 291 Ok(StatusCode::OK) 292} 293 294#[derive(Deserialize)] 295pub struct DebugRefcountRequest { 296 pub cid: String, 297} 298 299#[derive(Serialize)] 300pub struct DebugRefcountResponse { 301 pub count: Option<i64>, 302} 303 304pub async fn handle_debug_refcount( 305 State(state): State<Arc<AppState>>, 306 Query(req): Query<DebugRefcountRequest>, 307) -> Result<Json<DebugRefcountResponse>, StatusCode> { 308 let cid = cid::Cid::from_str(&req.cid).map_err(|_| StatusCode::BAD_REQUEST)?; 309 let cid_bytes = fjall::Slice::from(cid.to_bytes()); 310 311 let count = state 312 .db 313 .block_refcounts 314 .read_sync(cid_bytes.as_ref(), |_, v| *v); 315 316 Ok(Json(DebugRefcountResponse { count })) 317} 318 319#[derive(Deserialize)] 320pub struct DebugSetRefcountRequest { 321 pub cid: String, 322 pub count: i64, 323} 324 325pub async fn handle_set_debug_refcount( 326 State(state): State<Arc<AppState>>, 327 axum::extract::Json(req): axum::extract::Json<DebugSetRefcountRequest>, 328) -> Result<StatusCode, StatusCode> { 329 let cid = cid::Cid::from_str(&req.cid).map_err(|_| StatusCode::BAD_REQUEST)?; 330 let cid_bytes = fjall::Slice::from(cid.to_bytes()); 331 332 let _ = state.db.block_refcounts.insert_sync(cid_bytes, req.count); 333 334 Ok(StatusCode::OK) 335} 336 337#[derive(Deserialize)] 338pub struct DebugRepoRefcountsRequest { 339 pub did: String, 340} 341 
342#[derive(Serialize)] 343pub struct DebugRepoRefcountsResponse { 344 pub cids: std::collections::HashMap<String, i64>, 345} 346 347pub async fn handle_debug_repo_refcounts( 348 State(state): State<Arc<AppState>>, 349 Query(req): Query<DebugRepoRefcountsRequest>, 350) -> Result<Json<DebugRepoRefcountsResponse>, StatusCode> { 351 let raw_did = jacquard_common::types::ident::AtIdentifier::new(req.did.as_str()) 352 .map_err(|_| StatusCode::BAD_REQUEST)?; 353 let did = state 354 .resolver 355 .resolve_did(&raw_did) 356 .await 357 .map_err(|_| StatusCode::BAD_REQUEST)?; 358 359 let state_clone = state.clone(); 360 361 let cids = tokio::task::spawn_blocking(move || { 362 let mut unique_cids: std::collections::HashSet<String> = std::collections::HashSet::new(); 363 let db = &state_clone.db; 364 365 // 1. Scan records 366 let records_prefix = crate::db::keys::record_prefix_did(&did); 367 for guard in db.records.prefix(&records_prefix) { 368 if let Ok((_k, v)) = guard.into_inner() { 369 if let Ok(cid) = cid::Cid::read_bytes(v.as_ref()) { 370 unique_cids.insert(cid.to_string()); 371 } 372 } 373 } 374 375 // 2. 
Scan events 376 let trimmed_did = crate::db::types::TrimmedDid::from(&did); 377 for guard in db.events.iter() { 378 if let Ok((_k, v)) = guard.into_inner() { 379 if let Ok(evt) = rmp_serde::from_slice::<crate::types::StoredEvent>(v.as_ref()) { 380 if evt.did == trimmed_did { 381 if let Some(cid) = evt.cid { 382 unique_cids.insert(cid.to_string()); 383 } 384 } 385 } 386 } 387 } 388 389 let mut counts: std::collections::HashMap<String, i64> = std::collections::HashMap::new(); 390 for cid_str in unique_cids { 391 if let Ok(cid) = cid::Cid::from_str(&cid_str) { 392 let cid_bytes = fjall::Slice::from(cid.to_bytes()); 393 let count = db.block_refcounts.read_sync(cid_bytes.as_ref(), |_, v| *v).unwrap_or(0); 394 counts.insert(cid_str, count); 395 } 396 } 397 counts 398 }) 399 .await 400 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 401 402 Ok(Json(DebugRepoRefcountsResponse { cids })) 403}