this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::util::{
4 format_error_frame, format_event_for_sending, format_event_with_prefetched_blocks,
5 format_info_frame, prefetch_blocks_for_events,
6};
7use axum::{
8 extract::{Query, State, ws::Message, ws::WebSocket, ws::WebSocketUpgrade},
9 response::Response,
10};
11use futures::{sink::SinkExt, stream::StreamExt};
12use serde::Deserialize;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use tokio::sync::broadcast::error::RecvError;
15use tracing::{error, info, warn};
16
/// Maximum number of `repo_seq` rows fetched per query while replaying stored
/// events after a client-supplied cursor.
const BACKFILL_BATCH_SIZE: i64 = 1_000;

/// Number of firehose WebSocket sessions currently being served. Exposed through
/// `get_subscriber_count` and pushed to `crate::metrics::set_firehose_subscribers`.
static SUBSCRIBER_COUNT: AtomicUsize = AtomicUsize::new(0);
20
21#[derive(Deserialize)]
22pub struct SubscribeReposParams {
23 pub cursor: Option<i64>,
24}
25
26#[axum::debug_handler]
27pub async fn subscribe_repos(
28 ws: WebSocketUpgrade,
29 State(state): State<AppState>,
30 Query(params): Query<SubscribeReposParams>,
31) -> Response {
32 ws.on_upgrade(move |socket| handle_socket(socket, state, params))
33}
34
35async fn send_event(
36 socket: &mut WebSocket,
37 state: &AppState,
38 event: SequencedEvent,
39) -> Result<(), anyhow::Error> {
40 let bytes = format_event_for_sending(state, event).await?;
41 socket.send(Message::Binary(bytes.into())).await?;
42 Ok(())
43}
44
45pub fn get_subscriber_count() -> usize {
46 SUBSCRIBER_COUNT.load(Ordering::SeqCst)
47}
48
49async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) {
50 let count = SUBSCRIBER_COUNT.fetch_add(1, Ordering::SeqCst) + 1;
51 crate::metrics::set_firehose_subscribers(count);
52 info!(cursor = ?params.cursor, subscribers = count, "New firehose subscriber");
53 let _ = handle_socket_inner(&mut socket, &state, params).await;
54 let count = SUBSCRIBER_COUNT.fetch_sub(1, Ordering::SeqCst) - 1;
55 crate::metrics::set_firehose_subscribers(count);
56 info!(subscribers = count, "Firehose subscriber disconnected");
57}
58
/// Backfill window, in hours, used when `FIREHOSE_BACKFILL_HOURS` is unset or invalid.
const DEFAULT_BACKFILL_HOURS: i64 = 72;

/// Upper bound on the backfill window (about 100 years). Much larger values make
/// `chrono::Duration::hours` / `Utc::now() - duration` panic when the cutoff
/// timestamp is computed.
const MAX_BACKFILL_HOURS: i64 = 24 * 365 * 100;

/// Returns how many hours of history a cursor-based replay may reach back.
///
/// The value is read from `FIREHOSE_BACKFILL_HOURS`; see `parse_backfill_hours`
/// for how unset or invalid values are handled.
fn get_backfill_hours() -> i64 {
    parse_backfill_hours(std::env::var("FIREHOSE_BACKFILL_HOURS").ok().as_deref())
}

/// Interprets a raw `FIREHOSE_BACKFILL_HOURS` value.
///
/// Surrounding whitespace is ignored. The result depends on the input:
/// - `None`, text that does not parse as an `i64`, or a negative number:
///   `DEFAULT_BACKFILL_HOURS`.
/// - Any other value: that value, clamped to `MAX_BACKFILL_HOURS`.
fn parse_backfill_hours(raw: Option<&str>) -> i64 {
    match raw.and_then(|value| value.trim().parse::<i64>().ok()) {
        // A negative window would put the cutoff in the future.
        Some(hours) if hours < 0 => DEFAULT_BACKFILL_HOURS,
        Some(hours) => hours.min(MAX_BACKFILL_HOURS),
        None => DEFAULT_BACKFILL_HOURS,
    }
}
65
66async fn handle_socket_inner(
67 socket: &mut WebSocket,
68 state: &AppState,
69 params: SubscribeReposParams,
70) -> Result<(), ()> {
71 let mut rx = state.firehose_tx.subscribe();
72 let mut last_seen: i64 = -1;
73
74 if let Some(cursor) = params.cursor {
75 let current_seq = sqlx::query_scalar!("SELECT MAX(seq) FROM repo_seq")
76 .fetch_one(&state.db)
77 .await
78 .ok()
79 .flatten()
80 .unwrap_or(0);
81
82 if cursor > current_seq {
83 if let Ok(error_bytes) =
84 format_error_frame("FutureCursor", Some("Cursor in the future."))
85 {
86 let _ = socket.send(Message::Binary(error_bytes.into())).await;
87 }
88 socket.close().await.ok();
89 return Err(());
90 }
91
92 let backfill_time = chrono::Utc::now() - chrono::Duration::hours(get_backfill_hours());
93
94 let first_event = sqlx::query_as!(
95 SequencedEvent,
96 r#"
97 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev
98 FROM repo_seq
99 WHERE seq > $1
100 ORDER BY seq ASC
101 LIMIT 1
102 "#,
103 cursor
104 )
105 .fetch_optional(&state.db)
106 .await
107 .ok()
108 .flatten();
109
110 let mut current_cursor = cursor;
111
112 if let Some(ref event) = first_event
113 && event.created_at < backfill_time
114 {
115 if let Ok(info_bytes) = format_info_frame(
116 "OutdatedCursor",
117 Some("Requested cursor exceeded limit. Possibly missing events"),
118 ) {
119 let _ = socket.send(Message::Binary(info_bytes.into())).await;
120 }
121
122 let earliest = sqlx::query_scalar!(
123 "SELECT MIN(seq) FROM repo_seq WHERE created_at >= $1",
124 backfill_time
125 )
126 .fetch_one(&state.db)
127 .await
128 .ok()
129 .flatten();
130
131 if let Some(earliest_seq) = earliest {
132 current_cursor = earliest_seq - 1;
133 }
134 }
135
136 last_seen = current_cursor;
137
138 loop {
139 let events = sqlx::query_as!(
140 SequencedEvent,
141 r#"
142 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev
143 FROM repo_seq
144 WHERE seq > $1
145 ORDER BY seq ASC
146 LIMIT $2
147 "#,
148 current_cursor,
149 BACKFILL_BATCH_SIZE
150 )
151 .fetch_all(&state.db)
152 .await;
153 match events {
154 Ok(events) => {
155 if events.is_empty() {
156 break;
157 }
158 let events_count = events.len();
159 let prefetched = match prefetch_blocks_for_events(state, &events).await {
160 Ok(blocks) => blocks,
161 Err(e) => {
162 error!("Failed to prefetch blocks for backfill: {}", e);
163 socket.close().await.ok();
164 return Err(());
165 }
166 };
167 for event in events {
168 current_cursor = event.seq;
169 last_seen = event.seq;
170 let bytes =
171 match format_event_with_prefetched_blocks(event, &prefetched).await {
172 Ok(b) => b,
173 Err(e) => {
174 warn!("Failed to format backfill event: {}", e);
175 return Err(());
176 }
177 };
178 if let Err(e) = socket.send(Message::Binary(bytes.into())).await {
179 warn!("Failed to send backfill event: {}", e);
180 return Err(());
181 }
182 crate::metrics::record_firehose_event();
183 }
184 if (events_count as i64) < BACKFILL_BATCH_SIZE {
185 break;
186 }
187 }
188 Err(e) => {
189 error!("Failed to fetch backfill events: {}", e);
190 socket.close().await.ok();
191 return Err(());
192 }
193 }
194 }
195
196 let cutover_events = sqlx::query_as!(
197 SequencedEvent,
198 r#"
199 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev
200 FROM repo_seq
201 WHERE seq > $1
202 ORDER BY seq ASC
203 "#,
204 last_seen
205 )
206 .fetch_all(&state.db)
207 .await;
208
209 if let Ok(events) = cutover_events
210 && !events.is_empty()
211 {
212 let prefetched = match prefetch_blocks_for_events(state, &events).await {
213 Ok(blocks) => blocks,
214 Err(e) => {
215 error!("Failed to prefetch blocks for cutover: {}", e);
216 socket.close().await.ok();
217 return Err(());
218 }
219 };
220 for event in events {
221 last_seen = event.seq;
222 let bytes = match format_event_with_prefetched_blocks(event, &prefetched).await {
223 Ok(b) => b,
224 Err(e) => {
225 warn!("Failed to format cutover event: {}", e);
226 return Err(());
227 }
228 };
229 if let Err(e) = socket.send(Message::Binary(bytes.into())).await {
230 warn!("Failed to send cutover event: {}", e);
231 return Err(());
232 }
233 crate::metrics::record_firehose_event();
234 }
235 }
236 }
237 let max_lag_before_disconnect: u64 = std::env::var("FIREHOSE_MAX_LAG")
238 .ok()
239 .and_then(|v| v.parse().ok())
240 .unwrap_or(5000);
241 loop {
242 tokio::select! {
243 result = rx.recv() => {
244 match result {
245 Ok(event) => {
246 if event.seq <= last_seen {
247 continue;
248 }
249 last_seen = event.seq;
250 if let Err(e) = send_event(socket, state, event).await {
251 warn!("Failed to send event: {}", e);
252 break;
253 }
254 crate::metrics::record_firehose_event();
255 }
256 Err(RecvError::Lagged(skipped)) => {
257 warn!(skipped = skipped, "Firehose subscriber lagged behind");
258 if skipped > max_lag_before_disconnect {
259 warn!(skipped = skipped, max_lag = max_lag_before_disconnect,
260 "Disconnecting slow firehose consumer");
261 break;
262 }
263 }
264 Err(RecvError::Closed) => {
265 info!("Firehose channel closed");
266 break;
267 }
268 }
269 }
270 Some(Ok(msg)) = socket.next() => {
271 if let Message::Close(_) = msg {
272 info!("Client closed connection");
273 break;
274 }
275 }
276 else => {
277 break;
278 }
279 }
280 }
281 Ok(())
282}