this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::util::{format_event_for_sending, format_event_with_prefetched_blocks, prefetch_blocks_for_events}; 4use axum::{ 5 extract::{ws::Message, ws::WebSocket, ws::WebSocketUpgrade, Query, State}, 6 response::Response, 7}; 8use futures::{sink::SinkExt, stream::StreamExt}; 9use serde::Deserialize; 10use std::sync::atomic::{AtomicUsize, Ordering}; 11use tokio::sync::broadcast::error::RecvError; 12use tracing::{error, info, warn}; 13const BACKFILL_BATCH_SIZE: i64 = 1000; 14static SUBSCRIBER_COUNT: AtomicUsize = AtomicUsize::new(0); 15#[derive(Deserialize)] 16pub struct SubscribeReposParams { 17 pub cursor: Option<i64>, 18} 19#[axum::debug_handler] 20pub async fn subscribe_repos( 21 ws: WebSocketUpgrade, 22 State(state): State<AppState>, 23 Query(params): Query<SubscribeReposParams>, 24) -> Response { 25 ws.on_upgrade(move |socket| handle_socket(socket, state, params)) 26} 27async fn send_event( 28 socket: &mut WebSocket, 29 state: &AppState, 30 event: SequencedEvent, 31) -> Result<(), anyhow::Error> { 32 let bytes = format_event_for_sending(state, event).await?; 33 socket.send(Message::Binary(bytes.into())).await?; 34 Ok(()) 35} 36pub fn get_subscriber_count() -> usize { 37 SUBSCRIBER_COUNT.load(Ordering::SeqCst) 38} 39async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) { 40 let count = SUBSCRIBER_COUNT.fetch_add(1, Ordering::SeqCst) + 1; 41 crate::metrics::set_firehose_subscribers(count); 42 info!(cursor = ?params.cursor, subscribers = count, "New firehose subscriber"); 43 let _ = handle_socket_inner(&mut socket, &state, params).await; 44 let count = SUBSCRIBER_COUNT.fetch_sub(1, Ordering::SeqCst) - 1; 45 crate::metrics::set_firehose_subscribers(count); 46 info!(subscribers = count, "Firehose subscriber disconnected"); 47} 48async fn handle_socket_inner(socket: &mut WebSocket, state: &AppState, params: SubscribeReposParams) -> Result<(), ()> { 49 if let Some(cursor) = params.cursor { 50 let mut current_cursor = cursor; 51 loop { 52 let events = sqlx::query_as!( 53 SequencedEvent, 54 r#" 55 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status 56 FROM repo_seq 57 WHERE seq > $1 58 ORDER BY seq ASC 59 LIMIT $2 60 "#, 61 current_cursor, 62 BACKFILL_BATCH_SIZE 63 ) 64 .fetch_all(&state.db) 65 .await; 66 match events { 67 Ok(events) => { 68 if events.is_empty() { 69 break; 70 } 71 let events_count = events.len(); 72 let prefetched = match prefetch_blocks_for_events(state, &events).await { 73 Ok(blocks) => blocks, 74 Err(e) => { 75 error!("Failed to prefetch blocks for backfill: {}", e); 76 socket.close().await.ok(); 77 return Err(()); 78 } 79 }; 80 for event in events { 81 current_cursor = event.seq; 82 let bytes = match format_event_with_prefetched_blocks(event, &prefetched).await { 83 Ok(b) => b, 84 Err(e) => { 85 warn!("Failed to format backfill event: {}", e); 86 return Err(()); 87 } 88 }; 89 if let Err(e) = socket.send(Message::Binary(bytes.into())).await { 90 warn!("Failed to send backfill event: {}", e); 91 return Err(()); 92 } 93 crate::metrics::record_firehose_event(); 94 } 95 if (events_count as i64) < BACKFILL_BATCH_SIZE { 96 break; 97 } 98 } 99 Err(e) => { 100 error!("Failed to fetch backfill events: {}", e); 101 socket.close().await.ok(); 102 return Err(()); 103 } 104 } 105 } 106 } 107 let mut rx = state.firehose_tx.subscribe(); 108 let max_lag_before_disconnect: u64 = std::env::var("FIREHOSE_MAX_LAG") 109 .ok() 110 .and_then(|v| v.parse().ok()) 111 .unwrap_or(5000); 112 loop { 113 tokio::select! { 114 result = rx.recv() => { 115 match result { 116 Ok(event) => { 117 if let Err(e) = send_event(socket, state, event).await { 118 warn!("Failed to send event: {}", e); 119 break; 120 } 121 crate::metrics::record_firehose_event(); 122 } 123 Err(RecvError::Lagged(skipped)) => { 124 warn!(skipped = skipped, "Firehose subscriber lagged behind"); 125 if skipped > max_lag_before_disconnect { 126 warn!(skipped = skipped, max_lag = max_lag_before_disconnect, 127 "Disconnecting slow firehose consumer"); 128 break; 129 } 130 } 131 Err(RecvError::Closed) => { 132 info!("Firehose channel closed"); 133 break; 134 } 135 } 136 } 137 Some(Ok(msg)) = socket.next() => { 138 if let Message::Close(_) = msg { 139 info!("Client closed connection"); 140 break; 141 } 142 } 143 else => { 144 break; 145 } 146 } 147 } 148 Ok(()) 149}