AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at c4d3d93cd5d53e6df1920232aee510a40372f85d 166 lines 6.6 kB view raw
1use crate::api::AppState; 2use crate::db::keys; 3use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredEvent}; 4use axum::{ 5 extract::{ 6 ws::{Message, WebSocket, WebSocketUpgrade}, 7 Query, State, 8 }, 9 response::IntoResponse, 10}; 11use jacquard_common::types::value::RawData; 12use serde::Deserialize; 13use std::sync::Arc; 14use tokio::sync::{broadcast, mpsc}; 15 16#[derive(Deserialize)] 17pub struct StreamQuery { 18 pub cursor: Option<u64>, 19} 20 21pub async fn handle_stream( 22 State(state): State<Arc<AppState>>, 23 Query(query): Query<StreamQuery>, 24 ws: WebSocketUpgrade, 25) -> impl IntoResponse { 26 ws.on_upgrade(move |socket| handle_socket(socket, state, query)) 27} 28 29async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, query: StreamQuery) { 30 let (tx, mut rx) = mpsc::channel(500); 31 32 std::thread::Builder::new() 33 .name(format!( 34 "stream-handler-{}", 35 std::time::SystemTime::UNIX_EPOCH 36 .elapsed() 37 .unwrap() 38 .as_secs() 39 )) 40 .spawn(move || { 41 let db = &state.db; 42 let mut event_rx = db.event_tx.subscribe(); 43 let ks = db.events.clone(); 44 let mut current_id = match query.cursor { 45 Some(cursor) => cursor.saturating_sub(1), 46 None => { 47 let max_id = db.next_event_id.load(std::sync::atomic::Ordering::SeqCst); 48 max_id.saturating_sub(1) 49 } 50 }; 51 52 loop { 53 // 1. catch up from DB 54 loop { 55 let mut found = false; 56 for item in ks.range(keys::event_key((current_id + 1) as i64)..) 
{ 57 if let Ok((k, v)) = item.into_inner() { 58 let mut buf = [0u8; 8]; 59 buf.copy_from_slice(&k); 60 let id = u64::from_be_bytes(buf); 61 current_id = id; 62 63 let stored_evt: StoredEvent = match rmp_serde::from_slice(&v) { 64 Ok(e) => e, 65 Err(_) => continue, 66 }; 67 68 let marshallable = match stored_evt { 69 StoredEvent::Record { 70 live, 71 did, 72 rev, 73 collection, 74 rkey, 75 action, 76 cid, 77 } => { 78 let mut record_val = None; 79 if let Some(cid_str) = &cid { 80 if let Ok(Some(block_bytes)) = 81 db.blocks.get(keys::block_key(cid_str)) 82 { 83 if let Ok(raw_data) = 84 serde_ipld_dagcbor::from_slice::<RawData>( 85 &block_bytes, 86 ) 87 { 88 record_val = serde_json::to_value(raw_data).ok(); 89 } 90 } 91 } 92 93 MarshallableEvt { 94 id, 95 event_type: "record".into(), 96 record: Some(RecordEvt { 97 live, 98 did, 99 rev, 100 collection, 101 rkey, 102 action, 103 record: record_val, 104 cid, 105 }), 106 identity: None, 107 } 108 } 109 StoredEvent::Identity(identity) => MarshallableEvt { 110 id, 111 event_type: "identity".into(), 112 record: None, 113 identity: Some(identity), 114 }, 115 }; 116 117 let json_str = match serde_json::to_string(&marshallable) { 118 Ok(s) => s, 119 Err(_) => continue, 120 }; 121 122 if tx.blocking_send(Message::Text(json_str.into())).is_err() { 123 return; 124 } 125 found = true; 126 } else { 127 break; 128 } 129 } 130 if !found { 131 break; 132 } 133 } 134 135 // 2. 
wait for live events 136 match event_rx.blocking_recv() { 137 Ok(BroadcastEvent::Persisted(_)) => { 138 // just wake up and run catch-up loop again 139 } 140 Ok(BroadcastEvent::Ephemeral(evt)) => { 141 // send ephemeral event directly 142 let json_str = match serde_json::to_string(&evt) { 143 Ok(s) => s, 144 Err(_) => continue, 145 }; 146 if tx.blocking_send(Message::Text(json_str.into())).is_err() { 147 return; 148 } 149 } 150 Err(broadcast::error::RecvError::Lagged(_)) => { 151 // continue to catch up 152 } 153 Err(broadcast::error::RecvError::Closed) => { 154 break; 155 } 156 } 157 } 158 }) 159 .expect("failed to spawn stream handler thread"); 160 161 while let Some(msg) = rx.recv().await { 162 if socket.send(msg).await.is_err() { 163 break; 164 } 165 } 166}