this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::util::{
4 format_event_for_sending, format_event_with_prefetched_blocks, prefetch_blocks_for_events,
5};
6use axum::{
7 extract::{Query, State, ws::Message, ws::WebSocket, ws::WebSocketUpgrade},
8 response::Response,
9};
10use futures::{sink::SinkExt, stream::StreamExt};
11use serde::Deserialize;
12use std::sync::atomic::{AtomicUsize, Ordering};
13use tokio::sync::broadcast::error::RecvError;
14use tracing::{error, info, warn};
15
/// Maximum number of stored events fetched per database query while replaying history to a
/// subscriber that connected with a cursor. Typed `i64` because it is bound as the SQL `LIMIT`.
const BACKFILL_BATCH_SIZE: i64 = 1_000;
17
/// Number of firehose WebSocket connections currently being served by this process.
/// Incremented when a socket handler starts and decremented when it finishes.
static SUBSCRIBER_COUNT: AtomicUsize = AtomicUsize::new(0);
19
20#[derive(Deserialize)]
21pub struct SubscribeReposParams {
22 pub cursor: Option<i64>,
23}
24
25#[axum::debug_handler]
26pub async fn subscribe_repos(
27 ws: WebSocketUpgrade,
28 State(state): State<AppState>,
29 Query(params): Query<SubscribeReposParams>,
30) -> Response {
31 ws.on_upgrade(move |socket| handle_socket(socket, state, params))
32}
33
34async fn send_event(
35 socket: &mut WebSocket,
36 state: &AppState,
37 event: SequencedEvent,
38) -> Result<(), anyhow::Error> {
39 let bytes = format_event_for_sending(state, event).await?;
40 socket.send(Message::Binary(bytes.into())).await?;
41 Ok(())
42}
43
/// Returns the current value of the process-wide firehose subscriber counter.
pub fn get_subscriber_count() -> usize {
    SUBSCRIBER_COUNT.load(Ordering::SeqCst)
}
47
48async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) {
49 let count = SUBSCRIBER_COUNT.fetch_add(1, Ordering::SeqCst) + 1;
50 crate::metrics::set_firehose_subscribers(count);
51 info!(cursor = ?params.cursor, subscribers = count, "New firehose subscriber");
52 let _ = handle_socket_inner(&mut socket, &state, params).await;
53 let count = SUBSCRIBER_COUNT.fetch_sub(1, Ordering::SeqCst) - 1;
54 crate::metrics::set_firehose_subscribers(count);
55 info!(subscribers = count, "Firehose subscriber disconnected");
56}
57
58async fn handle_socket_inner(
59 socket: &mut WebSocket,
60 state: &AppState,
61 params: SubscribeReposParams,
62) -> Result<(), ()> {
63 if let Some(cursor) = params.cursor {
64 let mut current_cursor = cursor;
65 loop {
66 let events = sqlx::query_as!(
67 SequencedEvent,
68 r#"
69 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev
70 FROM repo_seq
71 WHERE seq > $1
72 ORDER BY seq ASC
73 LIMIT $2
74 "#,
75 current_cursor,
76 BACKFILL_BATCH_SIZE
77 )
78 .fetch_all(&state.db)
79 .await;
80 match events {
81 Ok(events) => {
82 if events.is_empty() {
83 break;
84 }
85 let events_count = events.len();
86 let prefetched = match prefetch_blocks_for_events(state, &events).await {
87 Ok(blocks) => blocks,
88 Err(e) => {
89 error!("Failed to prefetch blocks for backfill: {}", e);
90 socket.close().await.ok();
91 return Err(());
92 }
93 };
94 for event in events {
95 current_cursor = event.seq;
96 let bytes =
97 match format_event_with_prefetched_blocks(event, &prefetched).await {
98 Ok(b) => b,
99 Err(e) => {
100 warn!("Failed to format backfill event: {}", e);
101 return Err(());
102 }
103 };
104 if let Err(e) = socket.send(Message::Binary(bytes.into())).await {
105 warn!("Failed to send backfill event: {}", e);
106 return Err(());
107 }
108 crate::metrics::record_firehose_event();
109 }
110 if (events_count as i64) < BACKFILL_BATCH_SIZE {
111 break;
112 }
113 }
114 Err(e) => {
115 error!("Failed to fetch backfill events: {}", e);
116 socket.close().await.ok();
117 return Err(());
118 }
119 }
120 }
121 }
122 let mut rx = state.firehose_tx.subscribe();
123 let max_lag_before_disconnect: u64 = std::env::var("FIREHOSE_MAX_LAG")
124 .ok()
125 .and_then(|v| v.parse().ok())
126 .unwrap_or(5000);
127 loop {
128 tokio::select! {
129 result = rx.recv() => {
130 match result {
131 Ok(event) => {
132 if let Err(e) = send_event(socket, state, event).await {
133 warn!("Failed to send event: {}", e);
134 break;
135 }
136 crate::metrics::record_firehose_event();
137 }
138 Err(RecvError::Lagged(skipped)) => {
139 warn!(skipped = skipped, "Firehose subscriber lagged behind");
140 if skipped > max_lag_before_disconnect {
141 warn!(skipped = skipped, max_lag = max_lag_before_disconnect,
142 "Disconnecting slow firehose consumer");
143 break;
144 }
145 }
146 Err(RecvError::Closed) => {
147 info!("Firehose channel closed");
148 break;
149 }
150 }
151 }
152 Some(Ok(msg)) = socket.next() => {
153 if let Message::Close(_) = msg {
154 info!("Client closed connection");
155 break;
156 }
157 }
158 else => {
159 break;
160 }
161 }
162 }
163 Ok(())
164}