// AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
// Tags: at-protocol, atproto, indexer, rust, fjall
// Revision 8990a2ff5651c71cb6fa29aa3c44fa4212b6a4f6 (290 lines, 9.7 kB)
1use crate::api::AppState; 2use crate::db::keys; 3use crate::db::refcount::RefcountedBatch; 4use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredEvent}; 5use axum::Router; 6use axum::http::StatusCode; 7use axum::routing::{get, post}; 8use axum::{ 9 extract::{ 10 Query, State, 11 ws::{Message, WebSocket, WebSocketUpgrade}, 12 }, 13 response::IntoResponse, 14}; 15use jacquard_common::{CowStr, RawData}; 16use miette::{Context, IntoDiagnostic}; 17use serde::Deserialize; 18use std::sync::Arc; 19use tokio::sync::{broadcast, mpsc, oneshot}; 20use tracing::{error, info_span, warn}; 21 22pub fn router() -> Router<Arc<AppState>> { 23 Router::new() 24 .route("/", get(handle_stream)) 25 .route("/ack", post(handle_ack)) 26} 27 28#[derive(Deserialize)] 29pub struct AckBody { 30 pub ids: Vec<u64>, 31} 32 33pub async fn handle_ack( 34 State(state): State<Arc<AppState>>, 35 axum::Json(body): axum::Json<AckBody>, 36) -> Result<StatusCode, (StatusCode, String)> { 37 if body.ids.is_empty() { 38 return Ok(StatusCode::OK); 39 } 40 41 let state = state.clone(); 42 let ids = body.ids; 43 tokio::task::spawn_blocking(move || { 44 let mut batch = RefcountedBatch::new(&state.db); 45 for &id in &ids { 46 let _entered = tracing::info_span!("ack", id).entered(); 47 let key = keys::event_key(id); 48 let Some(event_bytes) = state.db.events.get(&key).into_diagnostic()? 
else { 49 tracing::warn!("event bytes not found"); 50 continue; 51 }; 52 let evt = rmp_serde::from_slice::<StoredEvent>(&event_bytes).into_diagnostic()?; 53 if let Some(cid) = evt.cid { 54 tracing::debug!(cid = %cid, "acking event"); 55 batch.update_block_refcount(fjall::Slice::from(cid.to_bytes()), -1)?; 56 } else { 57 tracing::debug!("acking event with NO cid"); 58 } 59 batch.batch_mut().remove(&state.db.events, key); 60 } 61 batch 62 .commit() 63 .into_diagnostic() 64 .wrap_err("failed to delete events")?; 65 Ok(StatusCode::OK) 66 }) 67 .await 68 .into_diagnostic() 69 .wrap_err("panicked while deleting events") 70 .flatten() 71 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 72} 73 74#[derive(Deserialize)] 75pub struct StreamQuery { 76 pub cursor: Option<u64>, 77} 78 79pub async fn handle_stream( 80 State(state): State<Arc<AppState>>, 81 Query(query): Query<StreamQuery>, 82 ws: WebSocketUpgrade, 83) -> impl IntoResponse { 84 ws.on_upgrade(move |socket| handle_socket(socket, state, query)) 85} 86 87async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, query: StreamQuery) { 88 let (tx, mut rx) = mpsc::channel(500); 89 let (cancel_tx, cancel_rx) = oneshot::channel::<()>(); 90 91 let runtime = tokio::runtime::Handle::current(); 92 let id = std::time::SystemTime::UNIX_EPOCH 93 .elapsed() 94 .unwrap() 95 .as_secs(); 96 97 let thread = std::thread::Builder::new() 98 .name(format!("stream-handler-{id}")) 99 .spawn(move || { 100 let _runtime_guard = runtime.enter(); 101 stream(state, cancel_rx, tx, query, id); 102 }) 103 .expect("failed to spawn stream handler thread"); 104 105 while let Some(msg) = rx.recv().await { 106 if let Err(e) = socket.send(msg).await { 107 error!(err = %e, "failed to send ws message"); 108 break; 109 } 110 } 111 112 let _ = cancel_tx.send(()); 113 if let Err(e) = thread.join() { 114 error!(err = ?e, "stream handler thread panicked"); 115 } 116} 117 118fn stream( 119 state: Arc<AppState>, 120 mut cancel: 
oneshot::Receiver<()>, 121 tx: mpsc::Sender<Message>, 122 query: StreamQuery, 123 id: u64, 124) { 125 let db = &state.db; 126 let mut event_rx = db.event_tx.subscribe(); 127 let ks = db.events.clone(); 128 let mut current_id = match query.cursor { 129 Some(cursor) => cursor.saturating_sub(1), 130 None => { 131 let max_id = db.next_event_id.load(std::sync::atomic::Ordering::SeqCst); 132 max_id.saturating_sub(1) 133 } 134 }; 135 let runtime = tokio::runtime::Handle::current(); 136 137 let span = info_span!("stream", id); 138 let _entered_span = span.enter(); 139 140 loop { 141 // 1. catch up from DB 142 loop { 143 let mut found = false; 144 for item in ks.range(keys::event_key(current_id + 1)..) { 145 let (k, v) = match item.into_inner() { 146 Ok((k, v)) => (k, v), 147 Err(e) => { 148 error!(err = %e, "failed to read event from db"); 149 break; 150 } 151 }; 152 let id = match k 153 .as_ref() 154 .try_into() 155 .into_diagnostic() 156 .wrap_err("expected event id to be 8 bytes") 157 .map(u64::from_be_bytes) 158 { 159 Ok(id) => id, 160 Err(e) => { 161 error!(err = %e, "failed to parse event id"); 162 continue; 163 } 164 }; 165 current_id = id; 166 167 let StoredEvent { 168 live, 169 did, 170 rev, 171 collection, 172 rkey, 173 action, 174 cid, 175 } = match rmp_serde::from_slice(&v) { 176 Ok(e) => e, 177 Err(e) => { 178 error!(err = %e, "failed to deserialize stored event"); 179 continue; 180 } 181 }; 182 183 let _entered = info_span!("record", cid = ?cid.map(|c| c.to_string())).entered(); 184 185 let marshallable = { 186 let record_val; 187 let block_bytes = cid 188 .map(|cid| db.blocks.get(&cid.to_bytes())) 189 .transpose() 190 .map(Option::flatten); 191 match block_bytes { 192 Ok(Some(block_bytes)) => { 193 match serde_ipld_dagcbor::from_slice::<RawData>(&block_bytes) { 194 Ok(val) => record_val = serde_json::to_value(val).ok(), 195 Err(e) => { 196 error!(err = %e, "cant parse block, must be corrupted?"); 197 return; 198 } 199 } 200 } 201 Ok(None) => { 202 warn!( 203 
"block not found, possibly repo deleted but events not evicted yet?" 204 ); 205 continue; 206 } 207 Err(e) => { 208 error!(err = %e, "can't get block"); 209 crate::db::check_poisoned(&e); 210 return; 211 } 212 } 213 214 MarshallableEvt { 215 id, 216 event_type: "record".into(), 217 record: Some(RecordEvt { 218 live, 219 did: did.to_did(), 220 rev: CowStr::Owned(rev.to_tid().into()), 221 collection, 222 rkey: CowStr::Owned(rkey.to_smolstr().into()), 223 action: CowStr::Borrowed(action.as_str()), 224 record: record_val, 225 cid: cid.map(|c| jacquard_common::types::cid::Cid::ipld(c).into()), 226 }), 227 identity: None, 228 account: None, 229 } 230 }; 231 232 let json_str = match serde_json::to_string(&marshallable) { 233 Ok(s) => s, 234 Err(e) => { 235 error!(err = %e, "failed to serialize ws event"); 236 continue; 237 } 238 }; 239 240 if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) { 241 error!(err = %e, "failed to send ws message"); 242 return; 243 } 244 245 found = true; 246 } 247 if !found { 248 break; 249 } 250 } 251 252 // 2. wait for live events 253 let next_event = runtime.block_on(async { 254 tokio::select! { 255 res = event_rx.recv() => Some(res), 256 _ = &mut cancel => None, 257 } 258 }); 259 260 let Some(next_event) = next_event else { 261 break; 262 }; 263 264 match next_event { 265 Ok(BroadcastEvent::Persisted(_)) => { 266 // just wake up and run catch-up loop again 267 } 268 Ok(BroadcastEvent::Ephemeral(evt)) => { 269 // send ephemeral event directly 270 let json_str = match serde_json::to_string(&evt) { 271 Ok(s) => s, 272 Err(e) => { 273 error!(err = %e, "failed to serialize ws event"); 274 continue; 275 } 276 }; 277 if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) { 278 error!(err = %e, "failed to send ws message"); 279 return; 280 } 281 } 282 Err(broadcast::error::RecvError::Lagged(_)) => { 283 // continue to catch up 284 } 285 Err(broadcast::error::RecvError::Closed) => { 286 break; 287 } 288 } 289 } 290}