this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::util::{format_event_for_sending, format_event_with_prefetched_blocks, prefetch_blocks_for_events}; 4use axum::{ 5 extract::{ws::Message, ws::WebSocket, ws::WebSocketUpgrade, Query, State}, 6 response::Response, 7}; 8use futures::{sink::SinkExt, stream::StreamExt}; 9use serde::Deserialize; 10use std::sync::atomic::{AtomicUsize, Ordering}; 11use tokio::sync::broadcast::error::RecvError; 12use tracing::{error, info, warn}; 13 14const BACKFILL_BATCH_SIZE: i64 = 1000; 15 16static SUBSCRIBER_COUNT: AtomicUsize = AtomicUsize::new(0); 17 18#[derive(Deserialize)] 19pub struct SubscribeReposParams { 20 pub cursor: Option<i64>, 21} 22 23#[axum::debug_handler] 24pub async fn subscribe_repos( 25 ws: WebSocketUpgrade, 26 State(state): State<AppState>, 27 Query(params): Query<SubscribeReposParams>, 28) -> Response { 29 ws.on_upgrade(move |socket| handle_socket(socket, state, params)) 30} 31 32async fn send_event( 33 socket: &mut WebSocket, 34 state: &AppState, 35 event: SequencedEvent, 36) -> Result<(), anyhow::Error> { 37 let bytes = format_event_for_sending(state, event).await?; 38 socket.send(Message::Binary(bytes.into())).await?; 39 Ok(()) 40} 41 42pub fn get_subscriber_count() -> usize { 43 SUBSCRIBER_COUNT.load(Ordering::SeqCst) 44} 45 46async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) { 47 let count = SUBSCRIBER_COUNT.fetch_add(1, Ordering::SeqCst) + 1; 48 crate::metrics::set_firehose_subscribers(count); 49 info!(cursor = ?params.cursor, subscribers = count, "New firehose subscriber"); 50 let _ = handle_socket_inner(&mut socket, &state, params).await; 51 let count = SUBSCRIBER_COUNT.fetch_sub(1, Ordering::SeqCst) - 1; 52 crate::metrics::set_firehose_subscribers(count); 53 info!(subscribers = count, "Firehose subscriber disconnected"); 54} 55 56async fn handle_socket_inner(socket: &mut WebSocket, state: &AppState, params: SubscribeReposParams) -> Result<(), ()> { 57 if let Some(cursor) = params.cursor { 58 let mut current_cursor = cursor; 59 loop { 60 let events = sqlx::query_as!( 61 SequencedEvent, 62 r#" 63 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status 64 FROM repo_seq 65 WHERE seq > $1 66 ORDER BY seq ASC 67 LIMIT $2 68 "#, 69 current_cursor, 70 BACKFILL_BATCH_SIZE 71 ) 72 .fetch_all(&state.db) 73 .await; 74 match events { 75 Ok(events) => { 76 if events.is_empty() { 77 break; 78 } 79 let events_count = events.len(); 80 let prefetched = match prefetch_blocks_for_events(state, &events).await { 81 Ok(blocks) => blocks, 82 Err(e) => { 83 error!("Failed to prefetch blocks for backfill: {}", e); 84 socket.close().await.ok(); 85 return Err(()); 86 } 87 }; 88 for event in events { 89 current_cursor = event.seq; 90 let bytes = match format_event_with_prefetched_blocks(event, &prefetched).await { 91 Ok(b) => b, 92 Err(e) => { 93 warn!("Failed to format backfill event: {}", e); 94 return Err(()); 95 } 96 }; 97 if let Err(e) = socket.send(Message::Binary(bytes.into())).await { 98 warn!("Failed to send backfill event: {}", e); 99 return Err(()); 100 } 101 crate::metrics::record_firehose_event(); 102 } 103 if (events_count as i64) < BACKFILL_BATCH_SIZE { 104 break; 105 } 106 } 107 Err(e) => { 108 error!("Failed to fetch backfill events: {}", e); 109 socket.close().await.ok(); 110 return Err(()); 111 } 112 } 113 } 114 } 115 let mut rx = state.firehose_tx.subscribe(); 116 let max_lag_before_disconnect: u64 = std::env::var("FIREHOSE_MAX_LAG") 117 .ok() 118 .and_then(|v| v.parse().ok()) 119 .unwrap_or(5000); 120 loop { 121 tokio::select! { 122 result = rx.recv() => { 123 match result { 124 Ok(event) => { 125 if let Err(e) = send_event(socket, state, event).await { 126 warn!("Failed to send event: {}", e); 127 break; 128 } 129 crate::metrics::record_firehose_event(); 130 } 131 Err(RecvError::Lagged(skipped)) => { 132 warn!(skipped = skipped, "Firehose subscriber lagged behind"); 133 if skipped > max_lag_before_disconnect { 134 warn!(skipped = skipped, max_lag = max_lag_before_disconnect, 135 "Disconnecting slow firehose consumer"); 136 break; 137 } 138 } 139 Err(RecvError::Closed) => { 140 info!("Firehose channel closed"); 141 break; 142 } 143 } 144 } 145 Some(Ok(msg)) = socket.next() => { 146 if let Message::Close(_) = msg { 147 info!("Client closed connection"); 148 break; 149 } 150 } 151 else => { 152 break; 153 } 154 } 155 } 156 Ok(()) 157}