AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
at-protocol
atproto
indexer
rust
fjall
1use crate::api::AppState;
2use crate::db::keys;
3use crate::types::{RepoState, ResyncState, StoredEvent};
4use axum::routing::{get, post};
5use axum::{
6 Json,
7 extract::{Query, State},
8 http::StatusCode,
9};
10use jacquard_common::types::cid::Cid;
11use jacquard_common::types::ident::AtIdentifier;
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use std::str::FromStr;
15use std::sync::Arc;
16
17#[derive(Deserialize)]
18pub struct DebugCountRequest {
19 pub did: String,
20 pub collection: String,
21}
22
23#[derive(Serialize)]
24pub struct DebugCountResponse {
25 pub count: usize,
26}
27
28pub fn router() -> axum::Router<Arc<AppState>> {
29 axum::Router::new()
30 .route("/debug/count", get(handle_debug_count))
31 .route("/debug/get", get(handle_debug_get))
32 .route("/debug/iter", get(handle_debug_iter))
33 .route("/debug/refcount", get(handle_debug_refcount))
34 .route("/debug/refcount", post(handle_set_debug_refcount))
35 .route("/debug/repo_refcounts", get(handle_debug_repo_refcounts))
36 .route("/debug/compact", post(handle_debug_compact))
37}
38
39pub async fn handle_debug_count(
40 State(state): State<Arc<AppState>>,
41 Query(req): Query<DebugCountRequest>,
42) -> Result<Json<DebugCountResponse>, StatusCode> {
43 let did = state
44 .resolver
45 .resolve_did(&AtIdentifier::new(req.did.as_str()).map_err(|_| StatusCode::BAD_REQUEST)?)
46 .await
47 .map_err(|_| StatusCode::BAD_REQUEST)?;
48
49 let db = &state.db;
50 let ks = db.records.clone();
51
52 // {TrimmedDid}|{collection}|
53 let prefix = keys::record_prefix_collection(&did, &req.collection);
54
55 let count = tokio::task::spawn_blocking(move || {
56 let start_key = prefix.clone();
57 let mut end_key = prefix.clone();
58 if let Some(msg) = end_key.last_mut() {
59 *msg += 1;
60 }
61
62 ks.range(start_key..end_key).count()
63 })
64 .await
65 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
66
67 Ok(Json(DebugCountResponse { count }))
68}
69
70#[derive(Deserialize)]
71pub struct DebugGetRequest {
72 pub partition: String,
73 pub key: String,
74}
75
76#[derive(Serialize)]
77pub struct DebugGetResponse {
78 pub value: Option<Value>,
79}
80
81fn deserialize_value(partition: &str, value: &[u8]) -> Value {
82 match partition {
83 "repos" => {
84 if let Ok(state) = rmp_serde::from_slice::<RepoState>(value) {
85 return serde_json::to_value(state).unwrap_or(Value::Null);
86 }
87 }
88 "resync" => {
89 if let Ok(state) = rmp_serde::from_slice::<ResyncState>(value) {
90 return serde_json::to_value(state).unwrap_or(Value::Null);
91 }
92 }
93 "events" => {
94 if let Ok(event) = rmp_serde::from_slice::<StoredEvent>(value) {
95 return serde_json::to_value(event).unwrap_or(Value::Null);
96 }
97 }
98 "records" => {
99 if let Ok(s) = String::from_utf8(value.to_vec()) {
100 match Cid::from_str(&s) {
101 Ok(cid) => return serde_json::to_value(cid).unwrap_or(Value::String(s)),
102 Err(_) => return Value::String(s),
103 }
104 }
105 }
106 "counts" | "cursors" => {
107 if let Ok(arr) = value.try_into() {
108 return Value::Number(u64::from_be_bytes(arr).into());
109 }
110 if let Ok(s) = String::from_utf8(value.to_vec()) {
111 return Value::String(s);
112 }
113 }
114 "blocks" => {
115 if let Ok(val) = serde_ipld_dagcbor::from_slice::<Value>(value) {
116 return val;
117 }
118 }
119 "pending" => return Value::Null,
120 _ => {}
121 }
122 Value::String(hex::encode(value))
123}
124
125pub async fn handle_debug_get(
126 State(state): State<Arc<AppState>>,
127 Query(req): Query<DebugGetRequest>,
128) -> Result<Json<DebugGetResponse>, StatusCode> {
129 let ks = get_keyspace_by_name(&state.db, &req.partition)?;
130
131 let key = if req.partition == "events" {
132 let id = req
133 .key
134 .parse::<u64>()
135 .map_err(|_| StatusCode::BAD_REQUEST)?;
136 id.to_be_bytes().to_vec()
137 } else {
138 req.key.into_bytes()
139 };
140
141 let partition = req.partition.clone();
142 let value = crate::db::Db::get(ks, key)
143 .await
144 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
145 .map(|v| deserialize_value(&partition, &v));
146
147 Ok(Json(DebugGetResponse { value }))
148}
149
150#[derive(Deserialize)]
151pub struct DebugIterRequest {
152 pub partition: String,
153 pub start: Option<String>,
154 pub end: Option<String>,
155 pub limit: Option<usize>,
156 pub reverse: Option<bool>,
157}
158
159#[derive(Serialize)]
160pub struct DebugIterResponse {
161 pub items: Vec<(String, Value)>,
162}
163
164pub async fn handle_debug_iter(
165 State(state): State<Arc<AppState>>,
166 Query(req): Query<DebugIterRequest>,
167) -> Result<Json<DebugIterResponse>, StatusCode> {
168 let ks = get_keyspace_by_name(&state.db, &req.partition)?;
169 let is_events = req.partition == "events";
170 let partition = req.partition.clone();
171
172 let parse_bound = |s: Option<String>| -> Result<Option<Vec<u8>>, StatusCode> {
173 match s {
174 Some(s) => {
175 if is_events {
176 let id = s.parse::<u64>().map_err(|_| StatusCode::BAD_REQUEST)?;
177 Ok(Some(id.to_be_bytes().to_vec()))
178 } else {
179 Ok(Some(s.into_bytes()))
180 }
181 }
182 None => Ok(None),
183 }
184 };
185
186 let start = parse_bound(req.start)?;
187 let end = parse_bound(req.end)?;
188
189 let items = tokio::task::spawn_blocking(move || {
190 let limit = req.limit.unwrap_or(50);
191
192 let collect = |iter: &mut dyn Iterator<Item = fjall::Guard>| {
193 let mut items = Vec::new();
194 for guard in iter.take(limit) {
195 let (k, v) = guard
196 .into_inner()
197 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
198
199 let key_str = if is_events {
200 if let Ok(arr) = k.as_ref().try_into() {
201 u64::from_be_bytes(arr).to_string()
202 } else {
203 "invalid_u64".to_string()
204 }
205 } else if partition == "blocks" {
206 match cid::Cid::read_bytes(k.as_ref()) {
207 Ok(cid) => cid.to_string(),
208 Err(_) => String::from_utf8_lossy(&k).into_owned(),
209 }
210 } else {
211 String::from_utf8_lossy(&k).into_owned()
212 };
213
214 items.push((key_str, deserialize_value(&partition, &v)));
215 }
216 Ok::<_, StatusCode>(items)
217 };
218
219 let start_bound = if let Some(ref s) = start {
220 std::ops::Bound::Included(s.as_slice())
221 } else {
222 std::ops::Bound::Unbounded
223 };
224
225 let end_bound = if let Some(ref e) = end {
226 std::ops::Bound::Included(e.as_slice())
227 } else {
228 std::ops::Bound::Unbounded
229 };
230
231 if req.reverse == Some(true) {
232 collect(
233 &mut ks
234 .range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>((
235 start_bound,
236 end_bound,
237 ))
238 .rev(),
239 )
240 } else {
241 collect(
242 &mut ks.range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>((
243 start_bound,
244 end_bound,
245 )),
246 )
247 }
248 })
249 .await
250 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??;
251
252 Ok(Json(DebugIterResponse { items }))
253}
254
255fn get_keyspace_by_name(db: &crate::db::Db, name: &str) -> Result<fjall::Keyspace, StatusCode> {
256 match name {
257 "repos" => Ok(db.repos.clone()),
258 "blocks" => Ok(db.blocks.clone()),
259 "cursors" => Ok(db.cursors.clone()),
260 "pending" => Ok(db.pending.clone()),
261 "resync" => Ok(db.resync.clone()),
262 "events" => Ok(db.events.clone()),
263 "counts" => Ok(db.counts.clone()),
264 "records" => Ok(db.records.clone()),
265 _ => Err(StatusCode::BAD_REQUEST),
266 }
267}
268
269#[derive(Deserialize)]
270pub struct DebugCompactRequest {
271 pub partition: String,
272}
273
274pub async fn handle_debug_compact(
275 State(state): State<Arc<AppState>>,
276 Query(req): Query<DebugCompactRequest>,
277) -> Result<StatusCode, StatusCode> {
278 let ks = get_keyspace_by_name(&state.db, &req.partition)?;
279 let state_clone = state.clone();
280
281 tokio::task::spawn_blocking(move || {
282 let _ = ks.remove(b"dummy_tombstone123");
283 let _ = state_clone.db.persist();
284 let _ = ks.rotate_memtable_and_wait();
285 ks.major_compact()
286 })
287 .await
288 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
289 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
290
291 Ok(StatusCode::OK)
292}
293
294#[derive(Deserialize)]
295pub struct DebugRefcountRequest {
296 pub cid: String,
297}
298
299#[derive(Serialize)]
300pub struct DebugRefcountResponse {
301 pub count: Option<i64>,
302}
303
304pub async fn handle_debug_refcount(
305 State(state): State<Arc<AppState>>,
306 Query(req): Query<DebugRefcountRequest>,
307) -> Result<Json<DebugRefcountResponse>, StatusCode> {
308 let cid = cid::Cid::from_str(&req.cid).map_err(|_| StatusCode::BAD_REQUEST)?;
309 let cid_bytes = fjall::Slice::from(cid.to_bytes());
310
311 let count = state
312 .db
313 .block_refcounts
314 .read_sync(cid_bytes.as_ref(), |_, v| *v);
315
316 Ok(Json(DebugRefcountResponse { count }))
317}
318
319#[derive(Deserialize)]
320pub struct DebugSetRefcountRequest {
321 pub cid: String,
322 pub count: i64,
323}
324
325pub async fn handle_set_debug_refcount(
326 State(state): State<Arc<AppState>>,
327 axum::extract::Json(req): axum::extract::Json<DebugSetRefcountRequest>,
328) -> Result<StatusCode, StatusCode> {
329 let cid = cid::Cid::from_str(&req.cid).map_err(|_| StatusCode::BAD_REQUEST)?;
330 let cid_bytes = fjall::Slice::from(cid.to_bytes());
331
332 let _ = state.db.block_refcounts.insert_sync(cid_bytes, req.count);
333
334 Ok(StatusCode::OK)
335}
336
337#[derive(Deserialize)]
338pub struct DebugRepoRefcountsRequest {
339 pub did: String,
340}
341
342#[derive(Serialize)]
343pub struct DebugRepoRefcountsResponse {
344 pub cids: std::collections::HashMap<String, i64>,
345}
346
347pub async fn handle_debug_repo_refcounts(
348 State(state): State<Arc<AppState>>,
349 Query(req): Query<DebugRepoRefcountsRequest>,
350) -> Result<Json<DebugRepoRefcountsResponse>, StatusCode> {
351 let raw_did = jacquard_common::types::ident::AtIdentifier::new(req.did.as_str())
352 .map_err(|_| StatusCode::BAD_REQUEST)?;
353 let did = state
354 .resolver
355 .resolve_did(&raw_did)
356 .await
357 .map_err(|_| StatusCode::BAD_REQUEST)?;
358
359 let state_clone = state.clone();
360
361 let cids = tokio::task::spawn_blocking(move || {
362 let mut unique_cids: std::collections::HashSet<String> = std::collections::HashSet::new();
363 let db = &state_clone.db;
364
365 // 1. Scan records
366 let records_prefix = crate::db::keys::record_prefix_did(&did);
367 for guard in db.records.prefix(&records_prefix) {
368 if let Ok((_k, v)) = guard.into_inner() {
369 if let Ok(cid) = cid::Cid::read_bytes(v.as_ref()) {
370 unique_cids.insert(cid.to_string());
371 }
372 }
373 }
374
375 // 2. Scan events
376 let trimmed_did = crate::db::types::TrimmedDid::from(&did);
377 for guard in db.events.iter() {
378 if let Ok((_k, v)) = guard.into_inner() {
379 if let Ok(evt) = rmp_serde::from_slice::<crate::types::StoredEvent>(v.as_ref()) {
380 if evt.did == trimmed_did {
381 if let Some(cid) = evt.cid {
382 unique_cids.insert(cid.to_string());
383 }
384 }
385 }
386 }
387 }
388
389 let mut counts: std::collections::HashMap<String, i64> = std::collections::HashMap::new();
390 for cid_str in unique_cids {
391 if let Ok(cid) = cid::Cid::from_str(&cid_str) {
392 let cid_bytes = fjall::Slice::from(cid.to_bytes());
393 let count = db.block_refcounts.read_sync(cid_bytes.as_ref(), |_, v| *v).unwrap_or(0);
394 counts.insert(cid_str, count);
395 }
396 }
397 counts
398 })
399 .await
400 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
401
402 Ok(Json(DebugRepoRefcountsResponse { cids }))
403}