use crate::state::AppState; use crate::sync::firehose::SequencedEvent; use crate::sync::util::format_event_for_sending; use axum::{ extract::{ws::Message, ws::WebSocket, ws::WebSocketUpgrade, Query, State}, response::Response, }; use futures::{sink::SinkExt, stream::StreamExt}; use serde::Deserialize; use tracing::{error, info, warn}; #[derive(Deserialize)] pub struct SubscribeReposParams { pub cursor: Option, } #[axum::debug_handler] pub async fn subscribe_repos( ws: WebSocketUpgrade, State(state): State, Query(params): Query, ) -> Response { ws.on_upgrade(move |socket| handle_socket(socket, state, params)) } async fn send_event( socket: &mut WebSocket, state: &AppState, event: SequencedEvent, ) -> Result<(), anyhow::Error> { let bytes = format_event_for_sending(state, event).await?; socket.send(Message::Binary(bytes.into())).await?; Ok(()) } async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) { info!(cursor = ?params.cursor, "New firehose subscriber"); if let Some(cursor) = params.cursor { let events = sqlx::query_as!( SequencedEvent, r#" SELECT seq, did, created_at, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids FROM repo_seq WHERE seq > $1 ORDER BY seq ASC "#, cursor ) .fetch_all(&state.db) .await; match events { Ok(events) => { for event in events { if let Err(e) = send_event(&mut socket, &state, event).await { warn!("Failed to send backfill event: {}", e); return; } } } Err(e) => { error!("Failed to fetch backfill events: {}", e); socket.close().await.ok(); return; } } } let mut rx = state.firehose_tx.subscribe(); loop { tokio::select! { Ok(event) = rx.recv() => { if let Err(e) = send_event(&mut socket, &state, event).await { warn!("Failed to send event: {}", e); break; } } Some(Ok(msg)) = socket.next() => { if let Message::Close(_) = msg { info!("Client closed connection"); break; } } else => { break; } } } }