this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::util::{ 4 format_event_for_sending, format_event_with_prefetched_blocks, prefetch_blocks_for_events, 5}; 6use axum::{ 7 extract::{Query, State, ws::Message, ws::WebSocket, ws::WebSocketUpgrade}, 8 response::Response, 9}; 10use futures::{sink::SinkExt, stream::StreamExt}; 11use serde::Deserialize; 12use std::sync::atomic::{AtomicUsize, Ordering}; 13use tokio::sync::broadcast::error::RecvError; 14use tracing::{error, info, warn}; 15 16const BACKFILL_BATCH_SIZE: i64 = 1000; 17 18static SUBSCRIBER_COUNT: AtomicUsize = AtomicUsize::new(0); 19 20#[derive(Deserialize)] 21pub struct SubscribeReposParams { 22 pub cursor: Option<i64>, 23} 24 25#[axum::debug_handler] 26pub async fn subscribe_repos( 27 ws: WebSocketUpgrade, 28 State(state): State<AppState>, 29 Query(params): Query<SubscribeReposParams>, 30) -> Response { 31 ws.on_upgrade(move |socket| handle_socket(socket, state, params)) 32} 33 34async fn send_event( 35 socket: &mut WebSocket, 36 state: &AppState, 37 event: SequencedEvent, 38) -> Result<(), anyhow::Error> { 39 let bytes = format_event_for_sending(state, event).await?; 40 socket.send(Message::Binary(bytes.into())).await?; 41 Ok(()) 42} 43 44pub fn get_subscriber_count() -> usize { 45 SUBSCRIBER_COUNT.load(Ordering::SeqCst) 46} 47 48async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) { 49 let count = SUBSCRIBER_COUNT.fetch_add(1, Ordering::SeqCst) + 1; 50 crate::metrics::set_firehose_subscribers(count); 51 info!(cursor = ?params.cursor, subscribers = count, "New firehose subscriber"); 52 let _ = handle_socket_inner(&mut socket, &state, params).await; 53 let count = SUBSCRIBER_COUNT.fetch_sub(1, Ordering::SeqCst) - 1; 54 crate::metrics::set_firehose_subscribers(count); 55 info!(subscribers = count, "Firehose subscriber disconnected"); 56} 57 58async fn handle_socket_inner( 59 socket: &mut WebSocket, 60 state: &AppState, 61 params: SubscribeReposParams, 62) -> Result<(), ()> { 63 if let Some(cursor) = params.cursor { 64 let mut current_cursor = cursor; 65 loop { 66 let events = sqlx::query_as!( 67 SequencedEvent, 68 r#" 69 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status 70 FROM repo_seq 71 WHERE seq > $1 72 ORDER BY seq ASC 73 LIMIT $2 74 "#, 75 current_cursor, 76 BACKFILL_BATCH_SIZE 77 ) 78 .fetch_all(&state.db) 79 .await; 80 match events { 81 Ok(events) => { 82 if events.is_empty() { 83 break; 84 } 85 let events_count = events.len(); 86 let prefetched = match prefetch_blocks_for_events(state, &events).await { 87 Ok(blocks) => blocks, 88 Err(e) => { 89 error!("Failed to prefetch blocks for backfill: {}", e); 90 socket.close().await.ok(); 91 return Err(()); 92 } 93 }; 94 for event in events { 95 current_cursor = event.seq; 96 let bytes = 97 match format_event_with_prefetched_blocks(event, &prefetched).await { 98 Ok(b) => b, 99 Err(e) => { 100 warn!("Failed to format backfill event: {}", e); 101 return Err(()); 102 } 103 }; 104 if let Err(e) = socket.send(Message::Binary(bytes.into())).await { 105 warn!("Failed to send backfill event: {}", e); 106 return Err(()); 107 } 108 crate::metrics::record_firehose_event(); 109 } 110 if (events_count as i64) < BACKFILL_BATCH_SIZE { 111 break; 112 } 113 } 114 Err(e) => { 115 error!("Failed to fetch backfill events: {}", e); 116 socket.close().await.ok(); 117 return Err(()); 118 } 119 } 120 } 121 } 122 let mut rx = state.firehose_tx.subscribe(); 123 let max_lag_before_disconnect: u64 = std::env::var("FIREHOSE_MAX_LAG") 124 .ok() 125 .and_then(|v| v.parse().ok()) 126 .unwrap_or(5000); 127 loop { 128 tokio::select! { 129 result = rx.recv() => { 130 match result { 131 Ok(event) => { 132 if let Err(e) = send_event(socket, state, event).await { 133 warn!("Failed to send event: {}", e); 134 break; 135 } 136 crate::metrics::record_firehose_event(); 137 } 138 Err(RecvError::Lagged(skipped)) => { 139 warn!(skipped = skipped, "Firehose subscriber lagged behind"); 140 if skipped > max_lag_before_disconnect { 141 warn!(skipped = skipped, max_lag = max_lag_before_disconnect, 142 "Disconnecting slow firehose consumer"); 143 break; 144 } 145 } 146 Err(RecvError::Closed) => { 147 info!("Firehose channel closed"); 148 break; 149 } 150 } 151 } 152 Some(Ok(msg)) = socket.next() => { 153 if let Message::Close(_) = msg { 154 info!("Client closed connection"); 155 break; 156 } 157 } 158 else => { 159 break; 160 } 161 } 162 } 163 Ok(()) 164}