this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::util::{ 4 format_error_frame, format_event_for_sending, format_event_with_prefetched_blocks, 5 format_info_frame, prefetch_blocks_for_events, 6}; 7use axum::{ 8 extract::{Query, State, ws::Message, ws::WebSocket, ws::WebSocketUpgrade}, 9 response::Response, 10}; 11use futures::{sink::SinkExt, stream::StreamExt}; 12use serde::Deserialize; 13use std::sync::atomic::{AtomicUsize, Ordering}; 14use tokio::sync::broadcast::error::RecvError; 15use tracing::{error, info, warn}; 16 17const BACKFILL_BATCH_SIZE: i64 = 1000; 18 19static SUBSCRIBER_COUNT: AtomicUsize = AtomicUsize::new(0); 20 21#[derive(Deserialize)] 22pub struct SubscribeReposParams { 23 pub cursor: Option<i64>, 24} 25 26#[axum::debug_handler] 27pub async fn subscribe_repos( 28 ws: WebSocketUpgrade, 29 State(state): State<AppState>, 30 Query(params): Query<SubscribeReposParams>, 31) -> Response { 32 ws.on_upgrade(move |socket| handle_socket(socket, state, params)) 33} 34 35async fn send_event( 36 socket: &mut WebSocket, 37 state: &AppState, 38 event: SequencedEvent, 39) -> Result<(), anyhow::Error> { 40 let bytes = format_event_for_sending(state, event).await?; 41 socket.send(Message::Binary(bytes.into())).await?; 42 Ok(()) 43} 44 45pub fn get_subscriber_count() -> usize { 46 SUBSCRIBER_COUNT.load(Ordering::SeqCst) 47} 48 49async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) { 50 let count = SUBSCRIBER_COUNT.fetch_add(1, Ordering::SeqCst) + 1; 51 crate::metrics::set_firehose_subscribers(count); 52 info!(cursor = ?params.cursor, subscribers = count, "New firehose subscriber"); 53 let _ = handle_socket_inner(&mut socket, &state, params).await; 54 let count = SUBSCRIBER_COUNT.fetch_sub(1, Ordering::SeqCst) - 1; 55 crate::metrics::set_firehose_subscribers(count); 56 info!(subscribers = count, "Firehose subscriber disconnected"); 57} 58 59fn get_backfill_hours() -> i64 { 60 std::env::var("FIREHOSE_BACKFILL_HOURS") 61 .ok() 62 .and_then(|v| v.parse().ok()) 63 .unwrap_or(72) 64} 65 66async fn handle_socket_inner( 67 socket: &mut WebSocket, 68 state: &AppState, 69 params: SubscribeReposParams, 70) -> Result<(), ()> { 71 let mut rx = state.firehose_tx.subscribe(); 72 let mut last_seen: i64 = -1; 73 74 if let Some(cursor) = params.cursor { 75 let current_seq = sqlx::query_scalar!("SELECT MAX(seq) FROM repo_seq") 76 .fetch_one(&state.db) 77 .await 78 .ok() 79 .flatten() 80 .unwrap_or(0); 81 82 if cursor > current_seq { 83 if let Ok(error_bytes) = 84 format_error_frame("FutureCursor", Some("Cursor in the future.")) 85 { 86 let _ = socket.send(Message::Binary(error_bytes.into())).await; 87 } 88 socket.close().await.ok(); 89 return Err(()); 90 } 91 92 let backfill_time = chrono::Utc::now() - chrono::Duration::hours(get_backfill_hours()); 93 94 let first_event = sqlx::query_as!( 95 SequencedEvent, 96 r#" 97 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev 98 FROM repo_seq 99 WHERE seq > $1 100 ORDER BY seq ASC 101 LIMIT 1 102 "#, 103 cursor 104 ) 105 .fetch_optional(&state.db) 106 .await 107 .ok() 108 .flatten(); 109 110 let mut current_cursor = cursor; 111 112 if let Some(ref event) = first_event 113 && event.created_at < backfill_time 114 { 115 if let Ok(info_bytes) = format_info_frame( 116 "OutdatedCursor", 117 Some("Requested cursor exceeded limit. Possibly missing events"), 118 ) { 119 let _ = socket.send(Message::Binary(info_bytes.into())).await; 120 } 121 122 let earliest = sqlx::query_scalar!( 123 "SELECT MIN(seq) FROM repo_seq WHERE created_at >= $1", 124 backfill_time 125 ) 126 .fetch_one(&state.db) 127 .await 128 .ok() 129 .flatten(); 130 131 if let Some(earliest_seq) = earliest { 132 current_cursor = earliest_seq - 1; 133 } 134 } 135 136 last_seen = current_cursor; 137 138 loop { 139 let events = sqlx::query_as!( 140 SequencedEvent, 141 r#" 142 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev 143 FROM repo_seq 144 WHERE seq > $1 145 ORDER BY seq ASC 146 LIMIT $2 147 "#, 148 current_cursor, 149 BACKFILL_BATCH_SIZE 150 ) 151 .fetch_all(&state.db) 152 .await; 153 match events { 154 Ok(events) => { 155 if events.is_empty() { 156 break; 157 } 158 let events_count = events.len(); 159 let prefetched = match prefetch_blocks_for_events(state, &events).await { 160 Ok(blocks) => blocks, 161 Err(e) => { 162 error!("Failed to prefetch blocks for backfill: {}", e); 163 socket.close().await.ok(); 164 return Err(()); 165 } 166 }; 167 for event in events { 168 current_cursor = event.seq; 169 last_seen = event.seq; 170 let bytes = 171 match format_event_with_prefetched_blocks(event, &prefetched).await { 172 Ok(b) => b, 173 Err(e) => { 174 warn!("Failed to format backfill event: {}", e); 175 return Err(()); 176 } 177 }; 178 if let Err(e) = socket.send(Message::Binary(bytes.into())).await { 179 warn!("Failed to send backfill event: {}", e); 180 return Err(()); 181 } 182 crate::metrics::record_firehose_event(); 183 } 184 if (events_count as i64) < BACKFILL_BATCH_SIZE { 185 break; 186 } 187 } 188 Err(e) => { 189 error!("Failed to fetch backfill events: {}", e); 190 socket.close().await.ok(); 191 return Err(()); 192 } 193 } 194 } 195 196 let cutover_events = sqlx::query_as!( 197 SequencedEvent, 198 r#" 199 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev 200 FROM repo_seq 201 WHERE seq > $1 202 ORDER BY seq ASC 203 "#, 204 last_seen 205 ) 206 .fetch_all(&state.db) 207 .await; 208 209 if let Ok(events) = cutover_events 210 && !events.is_empty() 211 { 212 let prefetched = match prefetch_blocks_for_events(state, &events).await { 213 Ok(blocks) => blocks, 214 Err(e) => { 215 error!("Failed to prefetch blocks for cutover: {}", e); 216 socket.close().await.ok(); 217 return Err(()); 218 } 219 }; 220 for event in events { 221 last_seen = event.seq; 222 let bytes = match format_event_with_prefetched_blocks(event, &prefetched).await { 223 Ok(b) => b, 224 Err(e) => { 225 warn!("Failed to format cutover event: {}", e); 226 return Err(()); 227 } 228 }; 229 if let Err(e) = socket.send(Message::Binary(bytes.into())).await { 230 warn!("Failed to send cutover event: {}", e); 231 return Err(()); 232 } 233 crate::metrics::record_firehose_event(); 234 } 235 } 236 } 237 let max_lag_before_disconnect: u64 = std::env::var("FIREHOSE_MAX_LAG") 238 .ok() 239 .and_then(|v| v.parse().ok()) 240 .unwrap_or(5000); 241 loop { 242 tokio::select! { 243 result = rx.recv() => { 244 match result { 245 Ok(event) => { 246 if event.seq <= last_seen { 247 continue; 248 } 249 last_seen = event.seq; 250 if let Err(e) = send_event(socket, state, event).await { 251 warn!("Failed to send event: {}", e); 252 break; 253 } 254 crate::metrics::record_firehose_event(); 255 } 256 Err(RecvError::Lagged(skipped)) => { 257 warn!(skipped = skipped, "Firehose subscriber lagged behind"); 258 if skipped > max_lag_before_disconnect { 259 warn!(skipped = skipped, max_lag = max_lag_before_disconnect, 260 "Disconnecting slow firehose consumer"); 261 break; 262 } 263 } 264 Err(RecvError::Closed) => { 265 info!("Firehose channel closed"); 266 break; 267 } 268 } 269 } 270 Some(Ok(msg)) = socket.next() => { 271 if let Message::Close(_) = msg { 272 info!("Client closed connection"); 273 break; 274 } 275 } 276 else => { 277 break; 278 } 279 } 280 } 281 Ok(()) 282}