this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::util::format_event_for_sending;
4use axum::{
5 extract::{ws::Message, ws::WebSocket, ws::WebSocketUpgrade, Query, State},
6 response::Response,
7};
8use futures::{sink::SinkExt, stream::StreamExt};
9use serde::Deserialize;
10use tracing::{error, info, warn};
11
12const BACKFILL_BATCH_SIZE: i64 = 1000;
13
14#[derive(Deserialize)]
15pub struct SubscribeReposParams {
16 pub cursor: Option<i64>,
17}
18
19#[axum::debug_handler]
20pub async fn subscribe_repos(
21 ws: WebSocketUpgrade,
22 State(state): State<AppState>,
23 Query(params): Query<SubscribeReposParams>,
24) -> Response {
25 ws.on_upgrade(move |socket| handle_socket(socket, state, params))
26}
27
28async fn send_event(
29 socket: &mut WebSocket,
30 state: &AppState,
31 event: SequencedEvent,
32) -> Result<(), anyhow::Error> {
33 let bytes = format_event_for_sending(state, event).await?;
34 socket.send(Message::Binary(bytes.into())).await?;
35 Ok(())
36}
37
38async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) {
39 info!(cursor = ?params.cursor, "New firehose subscriber");
40
41 if let Some(cursor) = params.cursor {
42 let mut current_cursor = cursor;
43 loop {
44 let events = sqlx::query_as!(
45 SequencedEvent,
46 r#"
47 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids
48 FROM repo_seq
49 WHERE seq > $1
50 ORDER BY seq ASC
51 LIMIT $2
52 "#,
53 current_cursor,
54 BACKFILL_BATCH_SIZE
55 )
56 .fetch_all(&state.db)
57 .await;
58
59 match events {
60 Ok(events) => {
61 if events.is_empty() {
62 break;
63 }
64 for event in &events {
65 current_cursor = event.seq;
66 if let Err(e) = send_event(&mut socket, &state, event.clone()).await {
67 warn!("Failed to send backfill event: {}", e);
68 return;
69 }
70 }
71 if (events.len() as i64) < BACKFILL_BATCH_SIZE {
72 break;
73 }
74 }
75 Err(e) => {
76 error!("Failed to fetch backfill events: {}", e);
77 socket.close().await.ok();
78 return;
79 }
80 }
81 }
82 }
83
84 let mut rx = state.firehose_tx.subscribe();
85
86 loop {
87 tokio::select! {
88 Ok(event) = rx.recv() => {
89 if let Err(e) = send_event(&mut socket, &state, event).await {
90 warn!("Failed to send event: {}", e);
91 break;
92 }
93 }
94 Some(Ok(msg)) = socket.next() => {
95 if let Message::Close(_) = msg {
96 info!("Client closed connection");
97 break;
98 }
99 }
100 else => {
101 break;
102 }
103 }
104 }
105}