this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::util::format_event_for_sending;
4use axum::{
5 extract::{ws::Message, ws::WebSocket, ws::WebSocketUpgrade, Query, State},
6 response::Response,
7};
8use futures::{sink::SinkExt, stream::StreamExt};
9use serde::Deserialize;
10use tracing::{error, info, warn};
11
12#[derive(Deserialize)]
13pub struct SubscribeReposParams {
14 pub cursor: Option<i64>,
15}
16
17#[axum::debug_handler]
18pub async fn subscribe_repos(
19 ws: WebSocketUpgrade,
20 State(state): State<AppState>,
21 Query(params): Query<SubscribeReposParams>,
22) -> Response {
23 ws.on_upgrade(move |socket| handle_socket(socket, state, params))
24}
25
26async fn send_event(
27 socket: &mut WebSocket,
28 state: &AppState,
29 event: SequencedEvent,
30) -> Result<(), anyhow::Error> {
31 let bytes = format_event_for_sending(state, event).await?;
32 socket.send(Message::Binary(bytes.into())).await?;
33 Ok(())
34}
35
36async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) {
37 info!(cursor = ?params.cursor, "New firehose subscriber");
38
39 if let Some(cursor) = params.cursor {
40 let events = sqlx::query_as!(
41 SequencedEvent,
42 r#"
43 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids
44 FROM repo_seq
45 WHERE seq > $1
46 ORDER BY seq ASC
47 "#,
48 cursor
49 )
50 .fetch_all(&state.db)
51 .await;
52
53 match events {
54 Ok(events) => {
55 for event in events {
56 if let Err(e) = send_event(&mut socket, &state, event).await {
57 warn!("Failed to send backfill event: {}", e);
58 return;
59 }
60 }
61 }
62 Err(e) => {
63 error!("Failed to fetch backfill events: {}", e);
64 socket.close().await.ok();
65 return;
66 }
67 }
68 }
69
70 let mut rx = state.firehose_tx.subscribe();
71
72 loop {
73 tokio::select! {
74 Ok(event) = rx.recv() => {
75 if let Err(e) = send_event(&mut socket, &state, event).await {
76 warn!("Failed to send event: {}", e);
77 break;
78 }
79 }
80 Some(Ok(msg)) = socket.next() => {
81 if let Message::Close(_) = msg {
82 info!("Client closed connection");
83 break;
84 }
85 }
86 else => {
87 break;
88 }
89 }
90 }
91}