at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::api::{AppState, XrpcResult};
2use crate::db::types::DbRkey;
3use crate::db::{self, Db, keys};
4use axum::{Json, Router, extract::State, http::StatusCode};
5use futures::TryFutureExt;
6use jacquard::cowstr::ToCowStr;
7use jacquard::types::ident::AtIdentifier;
8use jacquard::{
9 IntoStatic,
10 api::com_atproto::repo::{
11 get_record::{GetRecordError, GetRecordOutput, GetRecordRequest},
12 list_records::{ListRecordsOutput, ListRecordsRequest, Record as RepoRecord},
13 },
14 xrpc::XrpcRequest,
15};
16use jacquard_api::com_atproto::repo::{get_record::GetRecord, list_records::ListRecords};
17use jacquard_axum::{ExtractXrpc, IntoRouter, XrpcErrorResponse};
18use jacquard_common::{
19 types::{
20 string::{AtUri, Cid},
21 value::Data,
22 },
23 xrpc::{GenericXrpcError, XrpcError},
24};
25use miette::IntoDiagnostic;
26use serde::{Deserialize, Serialize};
27use smol_str::ToSmolStr;
28use std::{fmt::Display, sync::Arc};
29use tokio::task::spawn_blocking;
30
31pub fn router() -> Router<Arc<AppState>> {
32 Router::new()
33 .merge(GetRecordRequest::into_router(handle_get_record))
34 .merge(ListRecordsRequest::into_router(handle_list_records))
35 .merge(CountRecords::into_router(handle_count_records))
36}
37
38fn internal_error<E: std::error::Error + IntoStatic>(
39 nsid: &'static str,
40 message: impl Display,
41) -> XrpcErrorResponse<E> {
42 XrpcErrorResponse {
43 status: StatusCode::INTERNAL_SERVER_ERROR,
44 error: XrpcError::Generic(GenericXrpcError {
45 error: "InternalError".into(),
46 message: Some(message.to_smolstr()),
47 nsid,
48 method: "GET",
49 http_status: StatusCode::INTERNAL_SERVER_ERROR,
50 }),
51 }
52}
53
54fn bad_request<E: std::error::Error + IntoStatic>(
55 nsid: &'static str,
56 message: impl Display,
57) -> XrpcErrorResponse<E> {
58 XrpcErrorResponse {
59 status: StatusCode::BAD_REQUEST,
60 error: XrpcError::Generic(GenericXrpcError {
61 error: "InvalidRequest".into(),
62 message: Some(message.to_smolstr()),
63 nsid,
64 method: "GET",
65 http_status: StatusCode::BAD_REQUEST,
66 }),
67 }
68}
69
70pub async fn handle_get_record(
71 State(state): State<Arc<AppState>>,
72 ExtractXrpc(req): ExtractXrpc<GetRecordRequest>,
73) -> Result<Json<GetRecordOutput<'static>>, XrpcErrorResponse<GetRecordError<'static>>> {
74 let db = &state.db;
75 let did = state
76 .resolver
77 .resolve_did(&req.repo)
78 .await
79 .map_err(|e| bad_request(GetRecord::NSID, e))?;
80
81 let db_key = keys::record_key(
82 &did,
83 req.collection.as_str(),
84 &DbRkey::new(req.rkey.0.as_str()),
85 );
86
87 let cid_bytes = Db::get(db.records.clone(), db_key)
88 .await
89 .map_err(|e| internal_error(GetRecord::NSID, e))?;
90
91 if let Some(cid_bytes) = cid_bytes {
92 // lookup block using binary cid
93 let block_bytes = Db::get(db.blocks.clone(), &cid_bytes)
94 .await
95 .map_err(|e| internal_error(GetRecord::NSID, e))?
96 .ok_or_else(|| internal_error(GetRecord::NSID, "not found"))?;
97
98 let value: Data = serde_ipld_dagcbor::from_slice(&block_bytes)
99 .map_err(|e| internal_error(GetRecord::NSID, e))?;
100
101 let cid = Cid::new(&cid_bytes)
102 .map_err(|e| internal_error(GetRecord::NSID, e))?
103 .into_static();
104
105 Ok(Json(GetRecordOutput {
106 uri: AtUri::from_parts_owned(
107 did.as_str(),
108 req.collection.as_str(),
109 req.rkey.0.as_str(),
110 )
111 .unwrap(),
112 cid: Some(Cid::Str(cid.to_cowstr()).into_static()),
113 value: value.into_static(),
114 extra_data: Default::default(),
115 }))
116 } else {
117 Err(XrpcErrorResponse {
118 status: StatusCode::NOT_FOUND,
119 error: XrpcError::Xrpc(GetRecordError::RecordNotFound(None)),
120 })
121 }
122}
123
124pub async fn handle_list_records(
125 State(state): State<Arc<AppState>>,
126 ExtractXrpc(req): ExtractXrpc<ListRecordsRequest>,
127) -> Result<Json<ListRecordsOutput<'static>>, XrpcErrorResponse<GenericXrpcError>> {
128 let db = &state.db;
129 let did = state
130 .resolver
131 .resolve_did(&req.repo)
132 .await
133 .map_err(|e| bad_request(ListRecords::NSID, e))?;
134
135 let ks = db.records.clone();
136
137 let prefix = keys::record_prefix_collection(&did, req.collection.as_str());
138
139 let limit = req.limit.unwrap_or(50).min(100) as usize;
140 let reverse = req.reverse.unwrap_or(false);
141 let blocks_ks = db.blocks.clone();
142
143 let (results, cursor) = tokio::task::spawn_blocking(move || {
144 let mut results = Vec::new();
145 let mut cursor = None;
146
147 let iter: Box<dyn Iterator<Item = _>> = if !reverse {
148 let mut end_prefix = prefix.clone();
149 if let Some(last) = end_prefix.last_mut() {
150 *last += 1;
151 }
152
153 let end_key = if let Some(cursor) = &req.cursor {
154 let mut k = prefix.clone();
155 k.extend_from_slice(cursor.as_bytes());
156 k
157 } else {
158 end_prefix
159 };
160
161 Box::new(ks.range(prefix.as_slice()..end_key.as_slice()).rev())
162 } else {
163 let start_key = if let Some(cursor) = &req.cursor {
164 let mut k = prefix.clone();
165 k.extend_from_slice(cursor.as_bytes());
166 k.push(0);
167 k
168 } else {
169 prefix.clone()
170 };
171
172 Box::new(ks.range(start_key.as_slice()..))
173 };
174
175 for item in iter {
176 let (key, cid_bytes) = item.into_inner().into_diagnostic()?;
177
178 if !key.starts_with(prefix.as_slice()) {
179 break;
180 }
181
182 let rkey = keys::parse_rkey(&key[prefix.len()..])?;
183 if results.len() >= limit {
184 cursor = Some(rkey);
185 break;
186 }
187
188 // look up using binary cid bytes from the record
189 if let Ok(Some(block_bytes)) = blocks_ks.get(&cid_bytes) {
190 let val: Data = serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null);
191 let cid =
192 Cid::Str(Cid::new(&cid_bytes).into_diagnostic()?.to_cowstr()).into_static();
193 results.push(RepoRecord {
194 uri: AtUri::from_parts_owned(
195 did.as_str(),
196 req.collection.as_str(),
197 rkey.to_smolstr(),
198 )
199 .into_diagnostic()?,
200 cid,
201 value: val.into_static(),
202 extra_data: Default::default(),
203 });
204 }
205 }
206 Result::<_, miette::Report>::Ok((results, cursor))
207 })
208 .await
209 .map_err(|e| internal_error(ListRecords::NSID, e))?
210 .map_err(|e| internal_error(ListRecords::NSID, e))?;
211
212 Ok(Json(ListRecordsOutput {
213 records: results,
214 cursor: cursor.map(|c| jacquard::CowStr::Owned(c.to_smolstr())),
215 extra_data: Default::default(),
216 }))
217}
218
219#[derive(Serialize, Deserialize, jacquard_derive::IntoStatic)]
220pub struct CountRecordsOutput {
221 pub count: u64,
222}
223
224pub struct CountRecordsResponse;
225impl jacquard_common::xrpc::XrpcResp for CountRecordsResponse {
226 const NSID: &'static str = "systems.gaze.hydrant.countRecords";
227 const ENCODING: &'static str = "application/json";
228 type Output<'de> = CountRecordsOutput;
229 type Err<'de> = GenericXrpcError;
230}
231
232#[derive(Serialize, Deserialize, jacquard_derive::IntoStatic)]
233pub struct CountRecordsRequest<'i> {
234 #[serde(borrow)]
235 pub identifier: AtIdentifier<'i>,
236 pub collection: String,
237}
238
239impl<'a> jacquard_common::xrpc::XrpcRequest for CountRecordsRequest<'a> {
240 const NSID: &'static str = "systems.gaze.hydrant.countRecords";
241 const METHOD: jacquard_common::xrpc::XrpcMethod = jacquard_common::xrpc::XrpcMethod::Query;
242 type Response = CountRecordsResponse;
243}
244
245pub struct CountRecords;
246impl jacquard_common::xrpc::XrpcEndpoint for CountRecords {
247 const PATH: &'static str = "/xrpc/systems.gaze.hydrant.countRecords";
248 const METHOD: jacquard_common::xrpc::XrpcMethod = jacquard_common::xrpc::XrpcMethod::Query;
249 type Request<'de> = CountRecordsRequest<'de>;
250 type Response = CountRecordsResponse;
251}
252
253#[axum::debug_handler]
254pub async fn handle_count_records(
255 State(state): State<Arc<AppState>>,
256 ExtractXrpc(req): ExtractXrpc<CountRecords>,
257) -> XrpcResult<Json<CountRecordsOutput>> {
258 let did = state
259 .resolver
260 .resolve_did(&req.identifier)
261 .await
262 .map_err(|e| bad_request(CountRecordsRequest::NSID, e))?;
263
264 let count = spawn_blocking(move || {
265 db::get_record_count(&state.db, &did, &req.collection)
266 .map_err(|e| internal_error(CountRecordsRequest::NSID, e))
267 })
268 .map_err(|e| internal_error(CountRecordsRequest::NSID, e))
269 .await??;
270
271 Ok(Json(CountRecordsOutput { count }))
272}