AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
at-protocol
atproto
indexer
rust
fjall
1use crate::api::AppState;
2use crate::db::keys;
3use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredEvent};
4use axum::{
5 extract::{
6 ws::{Message, WebSocket, WebSocketUpgrade},
7 Query, State,
8 },
9 response::IntoResponse,
10};
11use jacquard_common::types::value::RawData;
12use serde::Deserialize;
13use std::sync::Arc;
14use tokio::sync::{broadcast, mpsc};
15
16#[derive(Deserialize)]
17pub struct StreamQuery {
18 pub cursor: Option<u64>,
19}
20
21pub async fn handle_stream(
22 State(state): State<Arc<AppState>>,
23 Query(query): Query<StreamQuery>,
24 ws: WebSocketUpgrade,
25) -> impl IntoResponse {
26 ws.on_upgrade(move |socket| handle_socket(socket, state, query))
27}
28
29async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, query: StreamQuery) {
30 let (tx, mut rx) = mpsc::channel(500);
31
32 std::thread::Builder::new()
33 .name(format!(
34 "stream-handler-{}",
35 std::time::SystemTime::UNIX_EPOCH
36 .elapsed()
37 .unwrap()
38 .as_secs()
39 ))
40 .spawn(move || {
41 let db = &state.db;
42 let mut event_rx = db.event_tx.subscribe();
43 let ks = db.events.clone();
44 let mut current_id = match query.cursor {
45 Some(cursor) => cursor.saturating_sub(1),
46 None => {
47 let max_id = db.next_event_id.load(std::sync::atomic::Ordering::SeqCst);
48 max_id.saturating_sub(1)
49 }
50 };
51
52 loop {
53 // 1. catch up from DB
54 loop {
55 let mut found = false;
56 for item in ks.range(keys::event_key((current_id + 1) as i64)..) {
57 if let Ok((k, v)) = item.into_inner() {
58 let mut buf = [0u8; 8];
59 buf.copy_from_slice(&k);
60 let id = u64::from_be_bytes(buf);
61 current_id = id;
62
63 let stored_evt: StoredEvent = match rmp_serde::from_slice(&v) {
64 Ok(e) => e,
65 Err(_) => continue,
66 };
67
68 let marshallable = match stored_evt {
69 StoredEvent::Record {
70 live,
71 did,
72 rev,
73 collection,
74 rkey,
75 action,
76 cid,
77 } => {
78 let mut record_val = None;
79 if let Some(cid_str) = &cid {
80 if let Ok(Some(block_bytes)) =
81 db.blocks.get(keys::block_key(cid_str))
82 {
83 if let Ok(raw_data) =
84 serde_ipld_dagcbor::from_slice::<RawData>(
85 &block_bytes,
86 )
87 {
88 record_val = serde_json::to_value(raw_data).ok();
89 }
90 }
91 }
92
93 MarshallableEvt {
94 id,
95 event_type: "record".into(),
96 record: Some(RecordEvt {
97 live,
98 did,
99 rev,
100 collection,
101 rkey,
102 action,
103 record: record_val,
104 cid,
105 }),
106 identity: None,
107 }
108 }
109 StoredEvent::Identity(identity) => MarshallableEvt {
110 id,
111 event_type: "identity".into(),
112 record: None,
113 identity: Some(identity),
114 },
115 };
116
117 let json_str = match serde_json::to_string(&marshallable) {
118 Ok(s) => s,
119 Err(_) => continue,
120 };
121
122 if tx.blocking_send(Message::Text(json_str.into())).is_err() {
123 return;
124 }
125 found = true;
126 } else {
127 break;
128 }
129 }
130 if !found {
131 break;
132 }
133 }
134
135 // 2. wait for live events
136 match event_rx.blocking_recv() {
137 Ok(BroadcastEvent::Persisted(_)) => {
138 // just wake up and run catch-up loop again
139 }
140 Ok(BroadcastEvent::Ephemeral(evt)) => {
141 // send ephemeral event directly
142 let json_str = match serde_json::to_string(&evt) {
143 Ok(s) => s,
144 Err(_) => continue,
145 };
146 if tx.blocking_send(Message::Text(json_str.into())).is_err() {
147 return;
148 }
149 }
150 Err(broadcast::error::RecvError::Lagged(_)) => {
151 // continue to catch up
152 }
153 Err(broadcast::error::RecvError::Closed) => {
154 break;
155 }
156 }
157 }
158 })
159 .expect("failed to spawn stream handler thread");
160
161 while let Some(msg) = rx.recv().await {
162 if socket.send(msg).await.is_err() {
163 break;
164 }
165 }
166}