this repo has no description
1use crate::state::AppState;
2use crate::sync::firehose::SequencedEvent;
3use crate::sync::util::{format_event_for_sending, format_event_with_prefetched_blocks, prefetch_blocks_for_events};
4use axum::{
5 extract::{ws::Message, ws::WebSocket, ws::WebSocketUpgrade, Query, State},
6 response::Response,
7};
8use futures::{sink::SinkExt, stream::StreamExt};
9use serde::Deserialize;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use tokio::sync::broadcast::error::RecvError;
12use tracing::{error, info, warn};
13const BACKFILL_BATCH_SIZE: i64 = 1000;
14static SUBSCRIBER_COUNT: AtomicUsize = AtomicUsize::new(0);
15#[derive(Deserialize)]
16pub struct SubscribeReposParams {
17 pub cursor: Option<i64>,
18}
19#[axum::debug_handler]
20pub async fn subscribe_repos(
21 ws: WebSocketUpgrade,
22 State(state): State<AppState>,
23 Query(params): Query<SubscribeReposParams>,
24) -> Response {
25 ws.on_upgrade(move |socket| handle_socket(socket, state, params))
26}
27async fn send_event(
28 socket: &mut WebSocket,
29 state: &AppState,
30 event: SequencedEvent,
31) -> Result<(), anyhow::Error> {
32 let bytes = format_event_for_sending(state, event).await?;
33 socket.send(Message::Binary(bytes.into())).await?;
34 Ok(())
35}
36pub fn get_subscriber_count() -> usize {
37 SUBSCRIBER_COUNT.load(Ordering::SeqCst)
38}
39async fn handle_socket(mut socket: WebSocket, state: AppState, params: SubscribeReposParams) {
40 let count = SUBSCRIBER_COUNT.fetch_add(1, Ordering::SeqCst) + 1;
41 crate::metrics::set_firehose_subscribers(count);
42 info!(cursor = ?params.cursor, subscribers = count, "New firehose subscriber");
43 let _ = handle_socket_inner(&mut socket, &state, params).await;
44 let count = SUBSCRIBER_COUNT.fetch_sub(1, Ordering::SeqCst) - 1;
45 crate::metrics::set_firehose_subscribers(count);
46 info!(subscribers = count, "Firehose subscriber disconnected");
47}
48async fn handle_socket_inner(socket: &mut WebSocket, state: &AppState, params: SubscribeReposParams) -> Result<(), ()> {
49 if let Some(cursor) = params.cursor {
50 let mut current_cursor = cursor;
51 loop {
52 let events = sqlx::query_as!(
53 SequencedEvent,
54 r#"
55 SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status
56 FROM repo_seq
57 WHERE seq > $1
58 ORDER BY seq ASC
59 LIMIT $2
60 "#,
61 current_cursor,
62 BACKFILL_BATCH_SIZE
63 )
64 .fetch_all(&state.db)
65 .await;
66 match events {
67 Ok(events) => {
68 if events.is_empty() {
69 break;
70 }
71 let events_count = events.len();
72 let prefetched = match prefetch_blocks_for_events(state, &events).await {
73 Ok(blocks) => blocks,
74 Err(e) => {
75 error!("Failed to prefetch blocks for backfill: {}", e);
76 socket.close().await.ok();
77 return Err(());
78 }
79 };
80 for event in events {
81 current_cursor = event.seq;
82 let bytes = match format_event_with_prefetched_blocks(event, &prefetched).await {
83 Ok(b) => b,
84 Err(e) => {
85 warn!("Failed to format backfill event: {}", e);
86 return Err(());
87 }
88 };
89 if let Err(e) = socket.send(Message::Binary(bytes.into())).await {
90 warn!("Failed to send backfill event: {}", e);
91 return Err(());
92 }
93 crate::metrics::record_firehose_event();
94 }
95 if (events_count as i64) < BACKFILL_BATCH_SIZE {
96 break;
97 }
98 }
99 Err(e) => {
100 error!("Failed to fetch backfill events: {}", e);
101 socket.close().await.ok();
102 return Err(());
103 }
104 }
105 }
106 }
107 let mut rx = state.firehose_tx.subscribe();
108 let max_lag_before_disconnect: u64 = std::env::var("FIREHOSE_MAX_LAG")
109 .ok()
110 .and_then(|v| v.parse().ok())
111 .unwrap_or(5000);
112 loop {
113 tokio::select! {
114 result = rx.recv() => {
115 match result {
116 Ok(event) => {
117 if let Err(e) = send_event(socket, state, event).await {
118 warn!("Failed to send event: {}", e);
119 break;
120 }
121 crate::metrics::record_firehose_event();
122 }
123 Err(RecvError::Lagged(skipped)) => {
124 warn!(skipped = skipped, "Firehose subscriber lagged behind");
125 if skipped > max_lag_before_disconnect {
126 warn!(skipped = skipped, max_lag = max_lag_before_disconnect,
127 "Disconnecting slow firehose consumer");
128 break;
129 }
130 }
131 Err(RecvError::Closed) => {
132 info!("Firehose channel closed");
133 break;
134 }
135 }
136 }
137 Some(Ok(msg)) = socket.next() => {
138 if let Message::Close(_) = msg {
139 info!("Client closed connection");
140 break;
141 }
142 }
143 else => {
144 break;
145 }
146 }
147 }
148 Ok(())
149}