this repo has no description
1use crate::state::AppState; 2use crate::sync::firehose::SequencedEvent; 3use crate::sync::util::format_event_for_sending; 4use axum::{ 5 extract::{ws::Message, ws::WebSocket, ws::WebSocketUpgrade, Query, State}, 6 response::Response, 7}; 8use futures::{sink::SinkExt, stream::StreamExt}; 9use serde::Deserialize; 10use tracing::{error, info, warn}; 11 12#[derive(Deserialize)] 13pub struct SubscribeReposParams { 14 pub cursor: Option<i64>, 15} 16 17#[axum::debug_handler] 18pub async fn subscribe_repos( 19 ws: WebSocketUpgrade, 20 State(state): State<AppState>, 21 Query(params): Query<SubscribeReposParams>, 22) -> Response { 23 ws.on_upgrade(move |socket| handle_socket(socket, state, params)) 24} 25 26async fn send_event( 27 socket: &mut WebSocket, 28 state: &AppState, 29 event: SequencedEvent, 30) -> Result<(), anyhow::Error> { 31 let bytes = format_event_for_sending(state, event).await?; 32 socket.send(Message::Binary(bytes.into())).await?; 33 Ok(()) 34} 35 36async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) { 37 info!(cursor = ?params.cursor, "New firehose subscriber"); 38 39 if let Some(cursor) = params.cursor { 40 let events = sqlx::query_as!( 41 SequencedEvent, 42 r#" 43 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids 44 FROM repo_seq 45 WHERE seq > $1 46 ORDER BY seq ASC 47 "#, 48 cursor 49 ) 50 .fetch_all(&state.db) 51 .await; 52 53 match events { 54 Ok(events) => { 55 for event in events { 56 if let Err(e) = send_event(&mut socket, &state, event).await { 57 warn!("Failed to send backfill event: {}", e); 58 return; 59 } 60 } 61 } 62 Err(e) => { 63 error!("Failed to fetch backfill events: {}", e); 64 socket.close().await.ok(); 65 return; 66 } 67 } 68 } 69 70 let mut rx = state.firehose_tx.subscribe(); 71 72 loop { 73 tokio::select! { 74 Ok(event) = rx.recv() => { 75 if let Err(e) = send_event(&mut socket, &state, event).await { 76 warn!("Failed to send event: {}", e); 77 break; 78 } 79 } 80 Some(Ok(msg)) = socket.next() => { 81 if let Message::Close(_) = msg { 82 info!("Client closed connection"); 83 break; 84 } 85 } 86 else => { 87 break; 88 } 89 } 90 } 91}