at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at 25e6d3c7400a40dfbadcae56258603ec3353c476 272 lines 9.1 kB view raw
1use crate::api::{AppState, XrpcResult}; 2use crate::db::types::DbRkey; 3use crate::db::{self, Db, keys}; 4use axum::{Json, Router, extract::State, http::StatusCode}; 5use futures::TryFutureExt; 6use jacquard::cowstr::ToCowStr; 7use jacquard::types::ident::AtIdentifier; 8use jacquard::{ 9 IntoStatic, 10 api::com_atproto::repo::{ 11 get_record::{GetRecordError, GetRecordOutput, GetRecordRequest}, 12 list_records::{ListRecordsOutput, ListRecordsRequest, Record as RepoRecord}, 13 }, 14 xrpc::XrpcRequest, 15}; 16use jacquard_api::com_atproto::repo::{get_record::GetRecord, list_records::ListRecords}; 17use jacquard_axum::{ExtractXrpc, IntoRouter, XrpcErrorResponse}; 18use jacquard_common::{ 19 types::{ 20 string::{AtUri, Cid}, 21 value::Data, 22 }, 23 xrpc::{GenericXrpcError, XrpcError}, 24}; 25use miette::IntoDiagnostic; 26use serde::{Deserialize, Serialize}; 27use smol_str::ToSmolStr; 28use std::{fmt::Display, sync::Arc}; 29use tokio::task::spawn_blocking; 30 31pub fn router() -> Router<Arc<AppState>> { 32 Router::new() 33 .merge(GetRecordRequest::into_router(handle_get_record)) 34 .merge(ListRecordsRequest::into_router(handle_list_records)) 35 .merge(CountRecords::into_router(handle_count_records)) 36} 37 38fn internal_error<E: std::error::Error + IntoStatic>( 39 nsid: &'static str, 40 message: impl Display, 41) -> XrpcErrorResponse<E> { 42 XrpcErrorResponse { 43 status: StatusCode::INTERNAL_SERVER_ERROR, 44 error: XrpcError::Generic(GenericXrpcError { 45 error: "InternalError".into(), 46 message: Some(message.to_smolstr()), 47 nsid, 48 method: "GET", 49 http_status: StatusCode::INTERNAL_SERVER_ERROR, 50 }), 51 } 52} 53 54fn bad_request<E: std::error::Error + IntoStatic>( 55 nsid: &'static str, 56 message: impl Display, 57) -> XrpcErrorResponse<E> { 58 XrpcErrorResponse { 59 status: StatusCode::BAD_REQUEST, 60 error: XrpcError::Generic(GenericXrpcError { 61 error: "InvalidRequest".into(), 62 message: Some(message.to_smolstr()), 63 nsid, 64 method: "GET", 65 http_status: StatusCode::BAD_REQUEST, 66 }), 67 } 68} 69 70pub async fn handle_get_record( 71 State(state): State<Arc<AppState>>, 72 ExtractXrpc(req): ExtractXrpc<GetRecordRequest>, 73) -> Result<Json<GetRecordOutput<'static>>, XrpcErrorResponse<GetRecordError<'static>>> { 74 let db = &state.db; 75 let did = state 76 .resolver 77 .resolve_did(&req.repo) 78 .await 79 .map_err(|e| bad_request(GetRecord::NSID, e))?; 80 81 let db_key = keys::record_key( 82 &did, 83 req.collection.as_str(), 84 &DbRkey::new(req.rkey.0.as_str()), 85 ); 86 87 let cid_bytes = Db::get(db.records.clone(), db_key) 88 .await 89 .map_err(|e| internal_error(GetRecord::NSID, e))?; 90 91 if let Some(cid_bytes) = cid_bytes { 92 // lookup block using binary cid 93 let block_bytes = Db::get(db.blocks.clone(), &cid_bytes) 94 .await 95 .map_err(|e| internal_error(GetRecord::NSID, e))? 96 .ok_or_else(|| internal_error(GetRecord::NSID, "not found"))?; 97 98 let value: Data = serde_ipld_dagcbor::from_slice(&block_bytes) 99 .map_err(|e| internal_error(GetRecord::NSID, e))?; 100 101 let cid = Cid::new(&cid_bytes) 102 .map_err(|e| internal_error(GetRecord::NSID, e))? 103 .into_static(); 104 105 Ok(Json(GetRecordOutput { 106 uri: AtUri::from_parts_owned( 107 did.as_str(), 108 req.collection.as_str(), 109 req.rkey.0.as_str(), 110 ) 111 .unwrap(), 112 cid: Some(Cid::Str(cid.to_cowstr()).into_static()), 113 value: value.into_static(), 114 extra_data: Default::default(), 115 })) 116 } else { 117 Err(XrpcErrorResponse { 118 status: StatusCode::NOT_FOUND, 119 error: XrpcError::Xrpc(GetRecordError::RecordNotFound(None)), 120 }) 121 } 122} 123 124pub async fn handle_list_records( 125 State(state): State<Arc<AppState>>, 126 ExtractXrpc(req): ExtractXrpc<ListRecordsRequest>, 127) -> Result<Json<ListRecordsOutput<'static>>, XrpcErrorResponse<GenericXrpcError>> { 128 let db = &state.db; 129 let did = state 130 .resolver 131 .resolve_did(&req.repo) 132 .await 133 .map_err(|e| bad_request(ListRecords::NSID, e))?; 134 135 let ks = db.records.clone(); 136 137 let prefix = keys::record_prefix_collection(&did, req.collection.as_str()); 138 139 let limit = req.limit.unwrap_or(50).min(100) as usize; 140 let reverse = req.reverse.unwrap_or(false); 141 let blocks_ks = db.blocks.clone(); 142 143 let (results, cursor) = tokio::task::spawn_blocking(move || { 144 let mut results = Vec::new(); 145 let mut cursor = None; 146 147 let iter: Box<dyn Iterator<Item = _>> = if !reverse { 148 let mut end_prefix = prefix.clone(); 149 if let Some(last) = end_prefix.last_mut() { 150 *last += 1; 151 } 152 153 let end_key = if let Some(cursor) = &req.cursor { 154 let mut k = prefix.clone(); 155 k.extend_from_slice(cursor.as_bytes()); 156 k 157 } else { 158 end_prefix 159 }; 160 161 Box::new(ks.range(prefix.as_slice()..end_key.as_slice()).rev()) 162 } else { 163 let start_key = if let Some(cursor) = &req.cursor { 164 let mut k = prefix.clone(); 165 k.extend_from_slice(cursor.as_bytes()); 166 k.push(0); 167 k 168 } else { 169 prefix.clone() 170 }; 171 172 Box::new(ks.range(start_key.as_slice()..)) 173 }; 174 175 for item in iter { 176 let (key, cid_bytes) = item.into_inner().into_diagnostic()?; 177 178 if !key.starts_with(prefix.as_slice()) { 179 break; 180 } 181 182 let rkey = keys::parse_rkey(&key[prefix.len()..])?; 183 if results.len() >= limit { 184 cursor = Some(rkey); 185 break; 186 } 187 188 // look up using binary cid bytes from the record 189 if let Ok(Some(block_bytes)) = blocks_ks.get(&cid_bytes) { 190 let val: Data = serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null); 191 let cid = 192 Cid::Str(Cid::new(&cid_bytes).into_diagnostic()?.to_cowstr()).into_static(); 193 results.push(RepoRecord { 194 uri: AtUri::from_parts_owned( 195 did.as_str(), 196 req.collection.as_str(), 197 rkey.to_smolstr(), 198 ) 199 .into_diagnostic()?, 200 cid, 201 value: val.into_static(), 202 extra_data: Default::default(), 203 }); 204 } 205 } 206 Result::<_, miette::Report>::Ok((results, cursor)) 207 }) 208 .await 209 .map_err(|e| internal_error(ListRecords::NSID, e))? 210 .map_err(|e| internal_error(ListRecords::NSID, e))?; 211 212 Ok(Json(ListRecordsOutput { 213 records: results, 214 cursor: cursor.map(|c| jacquard::CowStr::Owned(c.to_smolstr())), 215 extra_data: Default::default(), 216 })) 217} 218 219#[derive(Serialize, Deserialize, jacquard_derive::IntoStatic)] 220pub struct CountRecordsOutput { 221 pub count: u64, 222} 223 224pub struct CountRecordsResponse; 225impl jacquard_common::xrpc::XrpcResp for CountRecordsResponse { 226 const NSID: &'static str = "systems.gaze.hydrant.countRecords"; 227 const ENCODING: &'static str = "application/json"; 228 type Output<'de> = CountRecordsOutput; 229 type Err<'de> = GenericXrpcError; 230} 231 232#[derive(Serialize, Deserialize, jacquard_derive::IntoStatic)] 233pub struct CountRecordsRequest<'i> { 234 #[serde(borrow)] 235 pub identifier: AtIdentifier<'i>, 236 pub collection: String, 237} 238 239impl<'a> jacquard_common::xrpc::XrpcRequest for CountRecordsRequest<'a> { 240 const NSID: &'static str = "systems.gaze.hydrant.countRecords"; 241 const METHOD: jacquard_common::xrpc::XrpcMethod = jacquard_common::xrpc::XrpcMethod::Query; 242 type Response = CountRecordsResponse; 243} 244 245pub struct CountRecords; 246impl jacquard_common::xrpc::XrpcEndpoint for CountRecords { 247 const PATH: &'static str = "/xrpc/systems.gaze.hydrant.countRecords"; 248 const METHOD: jacquard_common::xrpc::XrpcMethod = jacquard_common::xrpc::XrpcMethod::Query; 249 type Request<'de> = CountRecordsRequest<'de>; 250 type Response = CountRecordsResponse; 251} 252 253#[axum::debug_handler] 254pub async fn handle_count_records( 255 State(state): State<Arc<AppState>>, 256 ExtractXrpc(req): ExtractXrpc<CountRecords>, 257) -> XrpcResult<Json<CountRecordsOutput>> { 258 let did = state 259 .resolver 260 .resolve_did(&req.identifier) 261 .await 262 .map_err(|e| bad_request(CountRecordsRequest::NSID, e))?; 263 264 let count = spawn_blocking(move || { 265 db::get_record_count(&state.db, &did, &req.collection) 266 .map_err(|e| internal_error(CountRecordsRequest::NSID, e)) 267 }) 268 .map_err(|e| internal_error(CountRecordsRequest::NSID, e)) 269 .await??; 270 271 Ok(Json(CountRecordsOutput { count })) 272}