AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
at-protocol
atproto
indexer
rust
fjall
1use crate::api::AppState;
2use crate::db::keys;
3use crate::types::{RepoState, ResyncState, StoredEvent};
4use axum::{
5 Json,
6 extract::{Query, State},
7 http::StatusCode,
8};
9use jacquard_common::types::cid::Cid;
10use jacquard_common::types::ident::AtIdentifier;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use std::str::FromStr;
14use std::sync::Arc;
15
16#[derive(Deserialize)]
17pub struct DebugCountRequest {
18 pub did: String,
19 pub collection: String,
20}
21
22#[derive(Serialize)]
23pub struct DebugCountResponse {
24 pub count: usize,
25}
26
27pub fn router() -> axum::Router<Arc<AppState>> {
28 axum::Router::new()
29 .route("/debug/count", axum::routing::get(handle_debug_count))
30 .route("/debug/get", axum::routing::get(handle_debug_get))
31 .route("/debug/iter", axum::routing::get(handle_debug_iter))
32}
33
34pub async fn handle_debug_count(
35 State(state): State<Arc<AppState>>,
36 Query(req): Query<DebugCountRequest>,
37) -> Result<Json<DebugCountResponse>, StatusCode> {
38 let did = state
39 .resolver
40 .resolve_did(&AtIdentifier::new(req.did.as_str()).map_err(|_| StatusCode::BAD_REQUEST)?)
41 .await
42 .map_err(|_| StatusCode::BAD_REQUEST)?;
43
44 let db = &state.db;
45 let ks = db.records.clone();
46
47 // {TrimmedDid}|{collection}|
48 let prefix = keys::record_prefix_collection(&did, &req.collection);
49
50 let count = tokio::task::spawn_blocking(move || {
51 let start_key = prefix.clone();
52 let mut end_key = prefix.clone();
53 if let Some(msg) = end_key.last_mut() {
54 *msg += 1;
55 }
56
57 ks.range(start_key..end_key).count()
58 })
59 .await
60 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
61
62 Ok(Json(DebugCountResponse { count }))
63}
64
65#[derive(Deserialize)]
66pub struct DebugGetRequest {
67 pub partition: String,
68 pub key: String,
69}
70
71#[derive(Serialize)]
72pub struct DebugGetResponse {
73 pub value: Option<Value>,
74}
75
76fn deserialize_value(partition: &str, value: &[u8]) -> Value {
77 match partition {
78 "repos" => {
79 if let Ok(state) = rmp_serde::from_slice::<RepoState>(value) {
80 return serde_json::to_value(state).unwrap_or(Value::Null);
81 }
82 }
83 "resync" => {
84 if let Ok(state) = rmp_serde::from_slice::<ResyncState>(value) {
85 return serde_json::to_value(state).unwrap_or(Value::Null);
86 }
87 }
88 "events" => {
89 if let Ok(event) = rmp_serde::from_slice::<StoredEvent>(value) {
90 return serde_json::to_value(event).unwrap_or(Value::Null);
91 }
92 }
93 "records" => {
94 if let Ok(s) = String::from_utf8(value.to_vec()) {
95 match Cid::from_str(&s) {
96 Ok(cid) => return serde_json::to_value(cid).unwrap_or(Value::String(s)),
97 Err(_) => return Value::String(s),
98 }
99 }
100 }
101 "counts" | "cursors" => {
102 if let Ok(arr) = value.try_into() {
103 return Value::Number(u64::from_be_bytes(arr).into());
104 }
105 if let Ok(s) = String::from_utf8(value.to_vec()) {
106 return Value::String(s);
107 }
108 }
109 "blocks" => {
110 if let Ok(val) = serde_ipld_dagcbor::from_slice::<Value>(value) {
111 return val;
112 }
113 }
114 "pending" => return Value::Null,
115 _ => {}
116 }
117 Value::String(hex::encode(value))
118}
119
120pub async fn handle_debug_get(
121 State(state): State<Arc<AppState>>,
122 Query(req): Query<DebugGetRequest>,
123) -> Result<Json<DebugGetResponse>, StatusCode> {
124 let ks = get_keyspace_by_name(&state.db, &req.partition)?;
125
126 let key = if req.partition == "events" {
127 let id = req
128 .key
129 .parse::<u64>()
130 .map_err(|_| StatusCode::BAD_REQUEST)?;
131 id.to_be_bytes().to_vec()
132 } else {
133 req.key.into_bytes()
134 };
135
136 let partition = req.partition.clone();
137 let value = crate::db::Db::get(ks, key)
138 .await
139 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
140 .map(|v| deserialize_value(&partition, &v));
141
142 Ok(Json(DebugGetResponse { value }))
143}
144
145#[derive(Deserialize)]
146pub struct DebugIterRequest {
147 pub partition: String,
148 pub start: Option<String>,
149 pub end: Option<String>,
150 pub limit: Option<usize>,
151 pub reverse: Option<bool>,
152}
153
154#[derive(Serialize)]
155pub struct DebugIterResponse {
156 pub items: Vec<(String, Value)>,
157}
158
159pub async fn handle_debug_iter(
160 State(state): State<Arc<AppState>>,
161 Query(req): Query<DebugIterRequest>,
162) -> Result<Json<DebugIterResponse>, StatusCode> {
163 let ks = get_keyspace_by_name(&state.db, &req.partition)?;
164 let is_events = req.partition == "events";
165 let partition = req.partition.clone();
166
167 let parse_bound = |s: Option<String>| -> Result<Option<Vec<u8>>, StatusCode> {
168 match s {
169 Some(s) => {
170 if is_events {
171 let id = s.parse::<u64>().map_err(|_| StatusCode::BAD_REQUEST)?;
172 Ok(Some(id.to_be_bytes().to_vec()))
173 } else {
174 Ok(Some(s.into_bytes()))
175 }
176 }
177 None => Ok(None),
178 }
179 };
180
181 let start = parse_bound(req.start)?;
182 let end = parse_bound(req.end)?;
183
184 let items = tokio::task::spawn_blocking(move || {
185 let limit = req.limit.unwrap_or(50).min(1000);
186
187 let collect = |iter: &mut dyn Iterator<Item = fjall::Guard>| {
188 let mut items = Vec::new();
189 for guard in iter.take(limit) {
190 let (k, v) = guard
191 .into_inner()
192 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
193
194 let key_str = if is_events {
195 if let Ok(arr) = k.as_ref().try_into() {
196 u64::from_be_bytes(arr).to_string()
197 } else {
198 "invalid_u64".to_string()
199 }
200 } else {
201 String::from_utf8_lossy(&k).into_owned()
202 };
203
204 items.push((key_str, deserialize_value(&partition, &v)));
205 }
206 Ok::<_, StatusCode>(items)
207 };
208
209 let start_bound = if let Some(ref s) = start {
210 std::ops::Bound::Included(s.as_slice())
211 } else {
212 std::ops::Bound::Unbounded
213 };
214
215 let end_bound = if let Some(ref e) = end {
216 std::ops::Bound::Included(e.as_slice())
217 } else {
218 std::ops::Bound::Unbounded
219 };
220
221 if req.reverse == Some(true) {
222 collect(
223 &mut ks
224 .range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>((
225 start_bound,
226 end_bound,
227 ))
228 .rev(),
229 )
230 } else {
231 collect(
232 &mut ks.range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>((
233 start_bound,
234 end_bound,
235 )),
236 )
237 }
238 })
239 .await
240 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??;
241
242 Ok(Json(DebugIterResponse { items }))
243}
244
245fn get_keyspace_by_name(db: &crate::db::Db, name: &str) -> Result<fjall::Keyspace, StatusCode> {
246 match name {
247 "repos" => Ok(db.repos.clone()),
248 "blocks" => Ok(db.blocks.clone()),
249 "cursors" => Ok(db.cursors.clone()),
250 "pending" => Ok(db.pending.clone()),
251 "resync" => Ok(db.resync.clone()),
252 "events" => Ok(db.events.clone()),
253 "counts" => Ok(db.counts.clone()),
254 "records" => Ok(db.records.clone()),
255 _ => Err(StatusCode::BAD_REQUEST),
256 }
257}