this repo has no description
1use crate::state::AppState;
2use crate::sync::util::format_event_for_sending;
3use futures::{sink::SinkExt, stream::StreamExt};
4use std::time::Duration;
5use tokio::sync::mpsc;
6use tokio_tungstenite::{connect_async, tungstenite::Message};
7use tracing::{error, info, warn};
8
9async fn run_relay_client(state: AppState, url: String, ready_tx: Option<mpsc::Sender<()>>) {
10 info!("Starting firehose client for relay: {}", url);
11 loop {
12 match connect_async(&url).await {
13 Ok((mut ws_stream, _)) => {
14 info!("Connected to firehose relay: {}", url);
15 if let Some(tx) = ready_tx.as_ref() {
16 tx.send(()).await.ok();
17 }
18
19 let mut rx = state.firehose_tx.subscribe();
20
21 loop {
22 tokio::select! {
23 Ok(event) = rx.recv() => {
24 match format_event_for_sending(&state, event).await {
25 Ok(bytes) => {
26 if let Err(e) = ws_stream.send(Message::Binary(bytes.into())).await {
27 warn!("Failed to send event to {}: {}. Disconnecting.", url, e);
28 break;
29 }
30 }
31 Err(e) => {
32 error!("Failed to format event for relay {}: {}", url, e);
33 }
34 }
35 }
36 Some(msg) = ws_stream.next() => {
37 if let Ok(Message::Close(_)) = msg {
38 warn!("Relay {} closed connection.", url);
39 break;
40 }
41 }
42 else => break,
43 }
44 }
45 }
46 Err(e) => {
47 error!("Failed to connect to firehose relay {}: {}", url, e);
48 }
49 }
50 warn!(
51 "Disconnected from {}. Reconnecting in 5 seconds...",
52 url
53 );
54 tokio::time::sleep(Duration::from_secs(5)).await;
55 }
56}
57
58pub async fn start_relay_clients(
59 state: AppState,
60 relays: Vec<String>,
61 mut ready_rx: Option<mpsc::Receiver<()>>,
62) {
63 if relays.is_empty() {
64 return;
65 }
66
67 let (ready_tx, mut internal_ready_rx) = mpsc::channel(1);
68
69 for url in relays {
70 let ready_tx = if ready_rx.is_some() {
71 Some(ready_tx.clone())
72 } else {
73 None
74 };
75 tokio::spawn(run_relay_client(state.clone(), url, ready_tx));
76 }
77
78 if let Some(mut rx) = ready_rx.take() {
79 tokio::spawn(async move {
80 internal_ready_rx.recv().await;
81 rx.close();
82 });
83 }
84}