at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::db::keys;
2use crate::types::{RepoState, ResyncState, StoredEvent};
3use crate::{api::AppState, db::types::TrimmedDid};
4use axum::{
5 Json,
6 extract::{Query, State},
7 http::StatusCode,
8};
9use jacquard::types::cid::Cid;
10use jacquard::types::ident::AtIdentifier;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use std::str::FromStr;
14use std::sync::Arc;
15
16#[derive(Deserialize)]
17pub struct DebugCountRequest {
18 pub did: String,
19 pub collection: String,
20}
21
22#[derive(Serialize)]
23pub struct DebugCountResponse {
24 pub count: usize,
25}
26
27pub fn router() -> axum::Router<Arc<AppState>> {
28 axum::Router::new()
29 .route("/debug/count", axum::routing::get(handle_debug_count))
30 .route("/debug/get", axum::routing::get(handle_debug_get))
31 .route("/debug/iter", axum::routing::get(handle_debug_iter))
32}
33
34pub async fn handle_debug_count(
35 State(state): State<Arc<AppState>>,
36 Query(req): Query<DebugCountRequest>,
37) -> Result<Json<DebugCountResponse>, StatusCode> {
38 let did = state
39 .resolver
40 .resolve_did(&AtIdentifier::new(req.did.as_str()).map_err(|_| StatusCode::BAD_REQUEST)?)
41 .await
42 .map_err(|_| StatusCode::BAD_REQUEST)?;
43
44 let db = &state.db;
45 let ks = db
46 .record_partition(&req.collection)
47 .map_err(|_| StatusCode::NOT_FOUND)?;
48
49 // {TrimmedDid}\x00
50 let mut prefix = Vec::new();
51 TrimmedDid::from(&did).write_to_vec(&mut prefix);
52 prefix.push(keys::SEP);
53
54 let count = tokio::task::spawn_blocking(move || {
55 let start_key = prefix.clone();
56 let mut end_key = prefix.clone();
57 if let Some(msg) = end_key.last_mut() {
58 *msg += 1;
59 }
60
61 ks.range(start_key..end_key).count()
62 })
63 .await
64 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
65
66 Ok(Json(DebugCountResponse { count }))
67}
68
69#[derive(Deserialize)]
70pub struct DebugGetRequest {
71 pub partition: String,
72 pub key: String,
73}
74
75#[derive(Serialize)]
76pub struct DebugGetResponse {
77 pub value: Option<Value>,
78}
79
80fn deserialize_value(partition: &str, value: &[u8]) -> Value {
81 match partition {
82 "repos" => {
83 if let Ok(state) = rmp_serde::from_slice::<RepoState>(value) {
84 return serde_json::to_value(state).unwrap_or(Value::Null);
85 }
86 }
87 "resync" => {
88 if let Ok(state) = rmp_serde::from_slice::<ResyncState>(value) {
89 return serde_json::to_value(state).unwrap_or(Value::Null);
90 }
91 }
92 "events" => {
93 if let Ok(event) = rmp_serde::from_slice::<StoredEvent>(value) {
94 return serde_json::to_value(event).unwrap_or(Value::Null);
95 }
96 }
97 "records" => {
98 if let Ok(s) = String::from_utf8(value.to_vec()) {
99 match Cid::from_str(&s) {
100 Ok(cid) => return serde_json::to_value(cid).unwrap_or(Value::String(s)),
101 Err(_) => return Value::String(s),
102 }
103 }
104 }
105 "counts" | "cursors" => {
106 if let Ok(arr) = value.try_into() {
107 return Value::Number(u64::from_be_bytes(arr).into());
108 }
109 if let Ok(s) = String::from_utf8(value.to_vec()) {
110 return Value::String(s);
111 }
112 }
113 "blocks" => {
114 if let Ok(val) = serde_ipld_dagcbor::from_slice::<Value>(value) {
115 return val;
116 }
117 }
118 "pending" => return Value::Null,
119 _ => {}
120 }
121 Value::String(hex::encode(value))
122}
123
124pub async fn handle_debug_get(
125 State(state): State<Arc<AppState>>,
126 Query(req): Query<DebugGetRequest>,
127) -> Result<Json<DebugGetResponse>, StatusCode> {
128 let ks = get_keyspace_by_name(&state.db, &req.partition)?;
129
130 let key = if req.partition == "events" {
131 let id = req
132 .key
133 .parse::<u64>()
134 .map_err(|_| StatusCode::BAD_REQUEST)?;
135 id.to_be_bytes().to_vec()
136 } else {
137 req.key.into_bytes()
138 };
139
140 let partition = req.partition.clone();
141 let value = crate::db::Db::get(ks, key)
142 .await
143 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
144 .map(|v| deserialize_value(&partition, &v));
145
146 Ok(Json(DebugGetResponse { value }))
147}
148
149#[derive(Deserialize)]
150pub struct DebugIterRequest {
151 pub partition: String,
152 pub start: Option<String>,
153 pub end: Option<String>,
154 pub limit: Option<usize>,
155 pub reverse: Option<bool>,
156}
157
158#[derive(Serialize)]
159pub struct DebugIterResponse {
160 pub items: Vec<(String, Value)>,
161}
162
163pub async fn handle_debug_iter(
164 State(state): State<Arc<AppState>>,
165 Query(req): Query<DebugIterRequest>,
166) -> Result<Json<DebugIterResponse>, StatusCode> {
167 let ks = get_keyspace_by_name(&state.db, &req.partition)?;
168 let is_events = req.partition == "events";
169 let partition = req.partition.clone();
170
171 let parse_bound = |s: Option<String>| -> Result<Option<Vec<u8>>, StatusCode> {
172 match s {
173 Some(s) => {
174 if is_events {
175 let id = s.parse::<u64>().map_err(|_| StatusCode::BAD_REQUEST)?;
176 Ok(Some(id.to_be_bytes().to_vec()))
177 } else {
178 Ok(Some(s.into_bytes()))
179 }
180 }
181 None => Ok(None),
182 }
183 };
184
185 let start = parse_bound(req.start)?;
186 let end = parse_bound(req.end)?;
187
188 let items = tokio::task::spawn_blocking(move || {
189 let limit = req.limit.unwrap_or(50).min(1000);
190
191 let collect = |iter: &mut dyn Iterator<Item = fjall::Guard>| {
192 let mut items = Vec::new();
193 for guard in iter.take(limit) {
194 let (k, v) = guard
195 .into_inner()
196 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
197
198 let key_str = if is_events {
199 if let Ok(arr) = k.as_ref().try_into() {
200 u64::from_be_bytes(arr).to_string()
201 } else {
202 "invalid_u64".to_string()
203 }
204 } else {
205 String::from_utf8_lossy(&k).into_owned()
206 };
207
208 items.push((key_str, deserialize_value(&partition, &v)));
209 }
210 Ok::<_, StatusCode>(items)
211 };
212
213 let start_bound = if let Some(ref s) = start {
214 std::ops::Bound::Included(s.as_slice())
215 } else {
216 std::ops::Bound::Unbounded
217 };
218
219 let end_bound = if let Some(ref e) = end {
220 std::ops::Bound::Included(e.as_slice())
221 } else {
222 std::ops::Bound::Unbounded
223 };
224
225 if req.reverse == Some(true) {
226 collect(
227 &mut ks
228 .range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>((
229 start_bound,
230 end_bound,
231 ))
232 .rev(),
233 )
234 } else {
235 collect(
236 &mut ks.range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>((
237 start_bound,
238 end_bound,
239 )),
240 )
241 }
242 })
243 .await
244 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??;
245
246 Ok(Json(DebugIterResponse { items }))
247}
248
249fn get_keyspace_by_name(db: &crate::db::Db, name: &str) -> Result<fjall::Keyspace, StatusCode> {
250 match name {
251 "repos" => Ok(db.repos.clone()),
252 "blocks" => Ok(db.blocks.clone()),
253 "cursors" => Ok(db.cursors.clone()),
254 "pending" => Ok(db.pending.clone()),
255 "resync" => Ok(db.resync.clone()),
256 "events" => Ok(db.events.clone()),
257 "counts" => Ok(db.counts.clone()),
258 _ => {
259 if let Some(col) = name.strip_prefix(crate::db::RECORDS_PARTITION_PREFIX) {
260 db.record_partition(col).map_err(|_| StatusCode::NOT_FOUND)
261 } else {
262 Err(StatusCode::BAD_REQUEST)
263 }
264 }
265 }
266}