AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::api::AppState;
2use crate::db::keys;
3use crate::db::refcount::RefcountedBatch;
4use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredEvent};
5use axum::Router;
6use axum::http::StatusCode;
7use axum::routing::{get, post};
8use axum::{
9 extract::{
10 Query, State,
11 ws::{Message, WebSocket, WebSocketUpgrade},
12 },
13 response::IntoResponse,
14};
15use jacquard_common::{CowStr, RawData};
16use miette::{Context, IntoDiagnostic};
17use serde::Deserialize;
18use std::sync::Arc;
19use tokio::sync::{broadcast, mpsc, oneshot};
20use tracing::{error, info_span, warn};
21
22pub fn router() -> Router<Arc<AppState>> {
23 Router::new()
24 .route("/", get(handle_stream))
25 .route("/ack", post(handle_ack))
26}
27
28#[derive(Deserialize)]
29pub struct AckBody {
30 pub ids: Vec<u64>,
31}
32
33pub async fn handle_ack(
34 State(state): State<Arc<AppState>>,
35 axum::Json(body): axum::Json<AckBody>,
36) -> Result<StatusCode, (StatusCode, String)> {
37 if body.ids.is_empty() {
38 return Ok(StatusCode::OK);
39 }
40
41 let state = state.clone();
42 let ids = body.ids;
43 tokio::task::spawn_blocking(move || {
44 let mut batch = RefcountedBatch::new(&state.db);
45 for &id in &ids {
46 let _entered = tracing::info_span!("ack", id).entered();
47 let key = keys::event_key(id);
48 let Some(event_bytes) = state.db.events.get(&key).into_diagnostic()? else {
49 tracing::warn!("event bytes not found");
50 continue;
51 };
52 let evt = rmp_serde::from_slice::<StoredEvent>(&event_bytes).into_diagnostic()?;
53 if let Some(cid) = evt.cid {
54 tracing::debug!(cid = %cid, "acking event");
55 batch.update_block_refcount(fjall::Slice::from(cid.to_bytes()), -1)?;
56 } else {
57 tracing::debug!("acking event with NO cid");
58 }
59 batch.batch_mut().remove(&state.db.events, key);
60 }
61 batch
62 .commit()
63 .into_diagnostic()
64 .wrap_err("failed to delete events")?;
65 Ok(StatusCode::OK)
66 })
67 .await
68 .into_diagnostic()
69 .wrap_err("panicked while deleting events")
70 .flatten()
71 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
72}
73
74#[derive(Deserialize)]
75pub struct StreamQuery {
76 pub cursor: Option<u64>,
77}
78
79pub async fn handle_stream(
80 State(state): State<Arc<AppState>>,
81 Query(query): Query<StreamQuery>,
82 ws: WebSocketUpgrade,
83) -> impl IntoResponse {
84 ws.on_upgrade(move |socket| handle_socket(socket, state, query))
85}
86
87async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, query: StreamQuery) {
88 let (tx, mut rx) = mpsc::channel(500);
89 let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
90
91 let runtime = tokio::runtime::Handle::current();
92 let id = std::time::SystemTime::UNIX_EPOCH
93 .elapsed()
94 .unwrap()
95 .as_secs();
96
97 let thread = std::thread::Builder::new()
98 .name(format!("stream-handler-{id}"))
99 .spawn(move || {
100 let _runtime_guard = runtime.enter();
101 stream(state, cancel_rx, tx, query, id);
102 })
103 .expect("failed to spawn stream handler thread");
104
105 while let Some(msg) = rx.recv().await {
106 if let Err(e) = socket.send(msg).await {
107 error!(err = %e, "failed to send ws message");
108 break;
109 }
110 }
111
112 let _ = cancel_tx.send(());
113 if let Err(e) = thread.join() {
114 error!(err = ?e, "stream handler thread panicked");
115 }
116}
117
118fn stream(
119 state: Arc<AppState>,
120 mut cancel: oneshot::Receiver<()>,
121 tx: mpsc::Sender<Message>,
122 query: StreamQuery,
123 id: u64,
124) {
125 let db = &state.db;
126 let mut event_rx = db.event_tx.subscribe();
127 let ks = db.events.clone();
128 let mut current_id = match query.cursor {
129 Some(cursor) => cursor.saturating_sub(1),
130 None => {
131 let max_id = db.next_event_id.load(std::sync::atomic::Ordering::SeqCst);
132 max_id.saturating_sub(1)
133 }
134 };
135 let runtime = tokio::runtime::Handle::current();
136
137 let span = info_span!("stream", id);
138 let _entered_span = span.enter();
139
140 loop {
141 // 1. catch up from DB
142 loop {
143 let mut found = false;
144 for item in ks.range(keys::event_key(current_id + 1)..) {
145 let (k, v) = match item.into_inner() {
146 Ok((k, v)) => (k, v),
147 Err(e) => {
148 error!(err = %e, "failed to read event from db");
149 break;
150 }
151 };
152 let id = match k
153 .as_ref()
154 .try_into()
155 .into_diagnostic()
156 .wrap_err("expected event id to be 8 bytes")
157 .map(u64::from_be_bytes)
158 {
159 Ok(id) => id,
160 Err(e) => {
161 error!(err = %e, "failed to parse event id");
162 continue;
163 }
164 };
165 current_id = id;
166
167 let StoredEvent {
168 live,
169 did,
170 rev,
171 collection,
172 rkey,
173 action,
174 cid,
175 } = match rmp_serde::from_slice(&v) {
176 Ok(e) => e,
177 Err(e) => {
178 error!(err = %e, "failed to deserialize stored event");
179 continue;
180 }
181 };
182
183 let _entered = info_span!("record", cid = ?cid.map(|c| c.to_string())).entered();
184
185 let marshallable = {
186 let record_val;
187 let block_bytes = cid
188 .map(|cid| db.blocks.get(&cid.to_bytes()))
189 .transpose()
190 .map(Option::flatten);
191 match block_bytes {
192 Ok(Some(block_bytes)) => {
193 match serde_ipld_dagcbor::from_slice::<RawData>(&block_bytes) {
194 Ok(val) => record_val = serde_json::to_value(val).ok(),
195 Err(e) => {
196 error!(err = %e, "cant parse block, must be corrupted?");
197 return;
198 }
199 }
200 }
201 Ok(None) => {
202 warn!(
203 "block not found, possibly repo deleted but events not evicted yet?"
204 );
205 continue;
206 }
207 Err(e) => {
208 error!(err = %e, "can't get block");
209 crate::db::check_poisoned(&e);
210 return;
211 }
212 }
213
214 MarshallableEvt {
215 id,
216 event_type: "record".into(),
217 record: Some(RecordEvt {
218 live,
219 did: did.to_did(),
220 rev: CowStr::Owned(rev.to_tid().into()),
221 collection,
222 rkey: CowStr::Owned(rkey.to_smolstr().into()),
223 action: CowStr::Borrowed(action.as_str()),
224 record: record_val,
225 cid: cid.map(|c| jacquard_common::types::cid::Cid::ipld(c).into()),
226 }),
227 identity: None,
228 account: None,
229 }
230 };
231
232 let json_str = match serde_json::to_string(&marshallable) {
233 Ok(s) => s,
234 Err(e) => {
235 error!(err = %e, "failed to serialize ws event");
236 continue;
237 }
238 };
239
240 if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
241 error!(err = %e, "failed to send ws message");
242 return;
243 }
244
245 found = true;
246 }
247 if !found {
248 break;
249 }
250 }
251
252 // 2. wait for live events
253 let next_event = runtime.block_on(async {
254 tokio::select! {
255 res = event_rx.recv() => Some(res),
256 _ = &mut cancel => None,
257 }
258 });
259
260 let Some(next_event) = next_event else {
261 break;
262 };
263
264 match next_event {
265 Ok(BroadcastEvent::Persisted(_)) => {
266 // just wake up and run catch-up loop again
267 }
268 Ok(BroadcastEvent::Ephemeral(evt)) => {
269 // send ephemeral event directly
270 let json_str = match serde_json::to_string(&evt) {
271 Ok(s) => s,
272 Err(e) => {
273 error!(err = %e, "failed to serialize ws event");
274 continue;
275 }
276 };
277 if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
278 error!(err = %e, "failed to send ws message");
279 return;
280 }
281 }
282 Err(broadcast::error::RecvError::Lagged(_)) => {
283 // continue to catch up
284 }
285 Err(broadcast::error::RecvError::Closed) => {
286 break;
287 }
288 }
289 }
290}