this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::util::{format_event_for_sending, format_event_with_prefetched_blocks, prefetch_blocks_for_events};
4use axum::{
5 extract::{ws::Message, ws::WebSocket, ws::WebSocketUpgrade, Query, State},
6 response::Response,
7};
8use futures::{sink::SinkExt, stream::StreamExt};
9use serde::Deserialize;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use tokio::sync::broadcast::error::RecvError;
12use tracing::{error, info, warn};
13
/// Maximum number of `repo_seq` rows requested by a single backfill query
/// (passed as the SQL `LIMIT`).
const BACKFILL_BATCH_SIZE: i64 = 1_000;

/// Process-wide counter of firehose subscribers; incremented when a socket
/// handler starts and decremented when it finishes.
static SUBSCRIBER_COUNT: AtomicUsize = AtomicUsize::new(0);
17
18#[derive(Deserialize)]
19pub struct SubscribeReposParams {
20 pub cursor: Option<i64>,
21}
22
23#[axum::debug_handler]
24pub async fn subscribe_repos(
25 ws: WebSocketUpgrade,
26 State(state): State<AppState>,
27 Query(params): Query<SubscribeReposParams>,
28) -> Response {
29 ws.on_upgrade(move |socket| handle_socket(socket, state, params))
30}
31
32async fn send_event(
33 socket: &mut WebSocket,
34 state: &AppState,
35 event: SequencedEvent,
36) -> Result<(), anyhow::Error> {
37 let bytes = format_event_for_sending(state, event).await?;
38 socket.send(Message::Binary(bytes.into())).await?;
39 Ok(())
40}
41
42pub fn get_subscriber_count() -> usize {
43 SUBSCRIBER_COUNT.load(Ordering::SeqCst)
44}
45
46async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) {
47 let count = SUBSCRIBER_COUNT.fetch_add(1, Ordering::SeqCst) + 1;
48 crate::metrics::set_firehose_subscribers(count);
49 info!(cursor = ?params.cursor, subscribers = count, "New firehose subscriber");
50 let _ = handle_socket_inner(&mut socket, &state, params).await;
51 let count = SUBSCRIBER_COUNT.fetch_sub(1, Ordering::SeqCst) - 1;
52 crate::metrics::set_firehose_subscribers(count);
53 info!(subscribers = count, "Firehose subscriber disconnected");
54}
55
56async fn handle_socket_inner(socket: &mut WebSocket, state: &AppState, params: SubscribeReposParams) -> Result<(), ()> {
57 if let Some(cursor) = params.cursor {
58 let mut current_cursor = cursor;
59 loop {
60 let events = sqlx::query_as!(
61 SequencedEvent,
62 r#"
63 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status
64 FROM repo_seq
65 WHERE seq > $1
66 ORDER BY seq ASC
67 LIMIT $2
68 "#,
69 current_cursor,
70 BACKFILL_BATCH_SIZE
71 )
72 .fetch_all(&state.db)
73 .await;
74 match events {
75 Ok(events) => {
76 if events.is_empty() {
77 break;
78 }
79 let events_count = events.len();
80 let prefetched = match prefetch_blocks_for_events(state, &events).await {
81 Ok(blocks) => blocks,
82 Err(e) => {
83 error!("Failed to prefetch blocks for backfill: {}", e);
84 socket.close().await.ok();
85 return Err(());
86 }
87 };
88 for event in events {
89 current_cursor = event.seq;
90 let bytes = match format_event_with_prefetched_blocks(event, &prefetched).await {
91 Ok(b) => b,
92 Err(e) => {
93 warn!("Failed to format backfill event: {}", e);
94 return Err(());
95 }
96 };
97 if let Err(e) = socket.send(Message::Binary(bytes.into())).await {
98 warn!("Failed to send backfill event: {}", e);
99 return Err(());
100 }
101 crate::metrics::record_firehose_event();
102 }
103 if (events_count as i64) < BACKFILL_BATCH_SIZE {
104 break;
105 }
106 }
107 Err(e) => {
108 error!("Failed to fetch backfill events: {}", e);
109 socket.close().await.ok();
110 return Err(());
111 }
112 }
113 }
114 }
115 let mut rx = state.firehose_tx.subscribe();
116 let max_lag_before_disconnect: u64 = std::env::var("FIREHOSE_MAX_LAG")
117 .ok()
118 .and_then(|v| v.parse().ok())
119 .unwrap_or(5000);
120 loop {
121 tokio::select! {
122 result = rx.recv() => {
123 match result {
124 Ok(event) => {
125 if let Err(e) = send_event(socket, state, event).await {
126 warn!("Failed to send event: {}", e);
127 break;
128 }
129 crate::metrics::record_firehose_event();
130 }
131 Err(RecvError::Lagged(skipped)) => {
132 warn!(skipped = skipped, "Firehose subscriber lagged behind");
133 if skipped > max_lag_before_disconnect {
134 warn!(skipped = skipped, max_lag = max_lag_before_disconnect,
135 "Disconnecting slow firehose consumer");
136 break;
137 }
138 }
139 Err(RecvError::Closed) => {
140 info!("Firehose channel closed");
141 break;
142 }
143 }
144 }
145 Some(Ok(msg)) = socket.next() => {
146 if let Message::Close(_) = msg {
147 info!("Client closed connection");
148 break;
149 }
150 }
151 else => {
152 break;
153 }
154 }
155 }
156 Ok(())
157}