this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::util::format_event_for_sending; 4use axum::{ 5 extract::{ws::Message, ws::WebSocket, ws::WebSocketUpgrade, Query, State}, 6 response::Response, 7}; 8use futures::{sink::SinkExt, stream::StreamExt}; 9use serde::Deserialize; 10use tracing::{error, info, warn}; 11 12const BACKFILL_BATCH_SIZE: i64 = 1000; 13 14#[derive(Deserialize)] 15pub struct SubscribeReposParams { 16 pub cursor: Option<i64>, 17} 18 19#[axum::debug_handler] 20pub async fn subscribe_repos( 21 ws: WebSocketUpgrade, 22 State(state): State<AppState>, 23 Query(params): Query<SubscribeReposParams>, 24) -> Response { 25 ws.on_upgrade(move |socket| handle_socket(socket, state, params)) 26} 27 28async fn send_event( 29 socket: &mut WebSocket, 30 state: &AppState, 31 event: SequencedEvent, 32) -> Result<(), anyhow::Error> { 33 let bytes = format_event_for_sending(state, event).await?; 34 socket.send(Message::Binary(bytes.into())).await?; 35 Ok(()) 36} 37 38async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) { 39 info!(cursor = ?params.cursor, "New firehose subscriber"); 40 41 if let Some(cursor) = params.cursor { 42 let mut current_cursor = cursor; 43 loop { 44 let events = sqlx::query_as!( 45 SequencedEvent, 46 r#" 47 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids 48 FROM repo_seq 49 WHERE seq > $1 50 ORDER BY seq ASC 51 LIMIT $2 52 "#, 53 current_cursor, 54 BACKFILL_BATCH_SIZE 55 ) 56 .fetch_all(&state.db) 57 .await; 58 59 match events { 60 Ok(events) => { 61 if events.is_empty() { 62 break; 63 } 64 for event in &events { 65 current_cursor = event.seq; 66 if let Err(e) = send_event(&mut socket, &state, event.clone()).await { 67 warn!("Failed to send backfill event: {}", e); 68 return; 69 } 70 } 71 if (events.len() as i64) < BACKFILL_BATCH_SIZE { 72 break; 73 } 74 } 75 Err(e) => { 76 error!("Failed to fetch backfill events: {}", e); 77 socket.close().await.ok(); 78 return; 79 } 80 } 81 } 82 } 83 84 let mut rx = state.firehose_tx.subscribe(); 85 86 loop { 87 tokio::select! { 88 Ok(event) = rx.recv() => { 89 if let Err(e) = send_event(&mut socket, &state, event).await { 90 warn!("Failed to send event: {}", e); 91 break; 92 } 93 } 94 Some(Ok(msg)) = socket.next() => { 95 if let Message::Close(_) = msg { 96 info!("Client closed connection"); 97 break; 98 } 99 } 100 else => { 101 break; 102 } 103 } 104 } 105}