at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at e2867db221dd41c63dd8ebd8816b6481e4223737 183 lines 7.1 kB view raw
1use crate::api::AppState; 2use crate::db::keys; 3use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredEvent}; 4use axum::{ 5 extract::{ 6 Query, State, 7 ws::{Message, WebSocket, WebSocketUpgrade}, 8 }, 9 response::IntoResponse, 10}; 11use jacquard::CowStr; 12use jacquard_common::types::value::RawData; 13use miette::{Context, IntoDiagnostic}; 14use serde::Deserialize; 15use std::sync::Arc; 16use tokio::sync::{broadcast, mpsc}; 17use tracing::error; 18 19#[derive(Deserialize)] 20pub struct StreamQuery { 21 pub cursor: Option<u64>, 22} 23 24pub async fn handle_stream( 25 State(state): State<Arc<AppState>>, 26 Query(query): Query<StreamQuery>, 27 ws: WebSocketUpgrade, 28) -> impl IntoResponse { 29 ws.on_upgrade(move |socket| handle_socket(socket, state, query)) 30} 31 32async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, query: StreamQuery) { 33 let (tx, mut rx) = mpsc::channel(500); 34 35 std::thread::Builder::new() 36 .name(format!( 37 "stream-handler-{}", 38 std::time::SystemTime::UNIX_EPOCH 39 .elapsed() 40 .unwrap() 41 .as_secs() 42 )) 43 .spawn(move || { 44 let db = &state.db; 45 let mut event_rx = db.event_tx.subscribe(); 46 let ks = db.events.clone(); 47 let mut current_id = match query.cursor { 48 Some(cursor) => cursor.saturating_sub(1), 49 None => { 50 let max_id = db.next_event_id.load(std::sync::atomic::Ordering::SeqCst); 51 max_id.saturating_sub(1) 52 } 53 }; 54 55 loop { 56 // 1. catch up from DB 57 loop { 58 let mut found = false; 59 for item in ks.range(keys::event_key(current_id + 1)..) { 60 let (k, v) = match item.into_inner() { 61 Ok((k, v)) => (k, v), 62 Err(e) => { 63 error!("failed to read event from db: {e}"); 64 break; 65 } 66 }; 67 let id = match k 68 .as_ref() 69 .try_into() 70 .into_diagnostic() 71 .wrap_err("expected event id to be 8 bytes") 72 .map(u64::from_be_bytes) 73 { 74 Ok(id) => id, 75 Err(e) => { 76 error!("failed to parse event id: {e}"); 77 continue; 78 } 79 }; 80 current_id = id; 81 82 let StoredEvent { 83 live, 84 did, 85 rev, 86 collection, 87 rkey, 88 action, 89 cid, 90 } = match rmp_serde::from_slice(&v) { 91 Ok(e) => e, 92 Err(e) => { 93 error!("failed to deserialize stored event: {e}"); 94 continue; 95 } 96 }; 97 98 let marshallable = { 99 let mut record_val = None; 100 if let Some(cid) = &cid { 101 if let Ok(Some(block_bytes)) = db.blocks.get(&cid.to_bytes()) { 102 if let Ok(raw_data) = 103 serde_ipld_dagcbor::from_slice::<RawData>(&block_bytes) 104 { 105 record_val = serde_json::to_value(raw_data).ok(); 106 } 107 } 108 } 109 110 MarshallableEvt { 111 id, 112 event_type: "record".into(), 113 record: Some(RecordEvt { 114 live, 115 did: did.to_did(), 116 rev: CowStr::Owned(rev.to_tid().into()), 117 collection, 118 rkey: CowStr::Owned(rkey.to_smolstr().into()), 119 action: CowStr::Borrowed(action.as_str()), 120 record: record_val, 121 cid: cid.map(|c| jacquard::types::cid::Cid::ipld(c).into()), 122 }), 123 identity: None, 124 account: None, 125 } 126 }; 127 128 let json_str = match serde_json::to_string(&marshallable) { 129 Ok(s) => s, 130 Err(e) => { 131 error!("failed to serialize ws event: {e}"); 132 continue; 133 } 134 }; 135 136 if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) { 137 error!("failed to send ws message: {e}"); 138 return; 139 } 140 141 found = true; 142 } 143 if !found { 144 break; 145 } 146 } 147 148 // 2. wait for live events 149 match event_rx.blocking_recv() { 150 Ok(BroadcastEvent::Persisted(_)) => { 151 // just wake up and run catch-up loop again 152 } 153 Ok(BroadcastEvent::Ephemeral(evt)) => { 154 // send ephemeral event directly 155 let json_str = match serde_json::to_string(&evt) { 156 Ok(s) => s, 157 Err(e) => { 158 error!("failed to serialize ws event: {e}"); 159 continue; 160 } 161 }; 162 if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) { 163 error!("failed to send ws message: {e}"); 164 return; 165 } 166 } 167 Err(broadcast::error::RecvError::Lagged(_)) => { 168 // continue to catch up 169 } 170 Err(broadcast::error::RecvError::Closed) => { 171 break; 172 } 173 } 174 } 175 }) 176 .expect("failed to spawn stream handler thread"); 177 178 while let Some(msg) = rx.recv().await { 179 if socket.send(msg).await.is_err() { 180 break; 181 } 182 } 183}