AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::api::AppState;
2use crate::db::keys;
3use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredEvent};
4use axum::{
5 extract::{
6 Query, State,
7 ws::{Message, WebSocket, WebSocketUpgrade},
8 },
9 response::IntoResponse,
10};
11use jacquard::CowStr;
12use jacquard_common::types::value::RawData;
13use miette::{Context, IntoDiagnostic};
14use serde::Deserialize;
15use std::sync::Arc;
16use tokio::sync::{broadcast, mpsc};
17use tracing::error;
18
19#[derive(Deserialize)]
20pub struct StreamQuery {
21 pub cursor: Option<u64>,
22}
23
24pub async fn handle_stream(
25 State(state): State<Arc<AppState>>,
26 Query(query): Query<StreamQuery>,
27 ws: WebSocketUpgrade,
28) -> impl IntoResponse {
29 ws.on_upgrade(move |socket| handle_socket(socket, state, query))
30}
31
32async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, query: StreamQuery) {
33 let (tx, mut rx) = mpsc::channel(500);
34
35 std::thread::Builder::new()
36 .name(format!(
37 "stream-handler-{}",
38 std::time::SystemTime::UNIX_EPOCH
39 .elapsed()
40 .unwrap()
41 .as_secs()
42 ))
43 .spawn(move || {
44 let db = &state.db;
45 let mut event_rx = db.event_tx.subscribe();
46 let ks = db.events.clone();
47 let mut current_id = match query.cursor {
48 Some(cursor) => cursor.saturating_sub(1),
49 None => {
50 let max_id = db.next_event_id.load(std::sync::atomic::Ordering::SeqCst);
51 max_id.saturating_sub(1)
52 }
53 };
54
55 loop {
56 // 1. catch up from DB
57 loop {
58 let mut found = false;
59 for item in ks.range(keys::event_key(current_id + 1)..) {
60 let (k, v) = match item.into_inner() {
61 Ok((k, v)) => (k, v),
62 Err(e) => {
63 error!("failed to read event from db: {e}");
64 break;
65 }
66 };
67 let id = match k
68 .as_ref()
69 .try_into()
70 .into_diagnostic()
71 .wrap_err("expected event id to be 8 bytes")
72 .map(u64::from_be_bytes)
73 {
74 Ok(id) => id,
75 Err(e) => {
76 error!("failed to parse event id: {e}");
77 continue;
78 }
79 };
80 current_id = id;
81
82 let StoredEvent {
83 live,
84 did,
85 rev,
86 collection,
87 rkey,
88 action,
89 cid,
90 } = match rmp_serde::from_slice(&v) {
91 Ok(e) => e,
92 Err(e) => {
93 error!("failed to deserialize stored event: {e}");
94 continue;
95 }
96 };
97
98 let marshallable = {
99 let mut record_val = None;
100 if let Some(cid) = &cid {
101 if let Ok(Some(block_bytes)) = db.blocks.get(&cid.to_bytes()) {
102 if let Ok(raw_data) =
103 serde_ipld_dagcbor::from_slice::<RawData>(&block_bytes)
104 {
105 record_val = serde_json::to_value(raw_data).ok();
106 }
107 }
108 }
109
110 MarshallableEvt {
111 id,
112 event_type: "record".into(),
113 record: Some(RecordEvt {
114 live,
115 did: did.to_did(),
116 rev: CowStr::Owned(rev.to_tid().into()),
117 collection,
118 rkey: CowStr::Owned(rkey.to_smolstr().into()),
119 action: CowStr::Borrowed(action.as_str()),
120 record: record_val,
121 cid: cid.map(|c| jacquard::types::cid::Cid::ipld(c).into()),
122 }),
123 identity: None,
124 account: None,
125 }
126 };
127
128 let json_str = match serde_json::to_string(&marshallable) {
129 Ok(s) => s,
130 Err(e) => {
131 error!("failed to serialize ws event: {e}");
132 continue;
133 }
134 };
135
136 if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
137 error!("failed to send ws message: {e}");
138 return;
139 }
140
141 found = true;
142 }
143 if !found {
144 break;
145 }
146 }
147
148 // 2. wait for live events
149 match event_rx.blocking_recv() {
150 Ok(BroadcastEvent::Persisted(_)) => {
151 // just wake up and run catch-up loop again
152 }
153 Ok(BroadcastEvent::Ephemeral(evt)) => {
154 // send ephemeral event directly
155 let json_str = match serde_json::to_string(&evt) {
156 Ok(s) => s,
157 Err(e) => {
158 error!("failed to serialize ws event: {e}");
159 continue;
160 }
161 };
162 if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
163 error!("failed to send ws message: {e}");
164 return;
165 }
166 }
167 Err(broadcast::error::RecvError::Lagged(_)) => {
168 // continue to catch up
169 }
170 Err(broadcast::error::RecvError::Closed) => {
171 break;
172 }
173 }
174 }
175 })
176 .expect("failed to spawn stream handler thread");
177
178 while let Some(msg) = rx.recv().await {
179 if socket.send(msg).await.is_err() {
180 break;
181 }
182 }
183}