this repo has no description
1use crate::state::AppState;
2use crate::sync::util::format_event_for_sending;
3use futures::{sink::SinkExt, stream::StreamExt};
4use std::time::Duration;
5use tokio::sync::mpsc;
6use tokio_tungstenite::{connect_async, tungstenite::Message};
7use tracing::{error, info, warn};
8
9async fn run_relay_client(state: AppState, url: String, ready_tx: Option<mpsc::Sender<()>>) {
10 info!("Starting firehose client for relay: {}", url);
11 loop {
12 match connect_async(&url).await {
13 Ok((mut ws_stream, _)) => {
14 info!("Connected to firehose relay: {}", url);
15 let mut rx = state.firehose_tx.subscribe();
16 if let Some(tx) = ready_tx.as_ref() {
17 tx.send(()).await.ok();
18 }
19
20 loop {
21 tokio::select! {
22 Ok(event) = rx.recv() => {
23 match format_event_for_sending(&state, event).await {
24 Ok(bytes) => {
25 if let Err(e) = ws_stream.send(Message::Binary(bytes.into())).await {
26 warn!("Failed to send event to {}: {}. Disconnecting.", url, e);
27 break;
28 }
29 }
30 Err(e) => {
31 error!("Failed to format event for relay {}: {}", url, e);
32 }
33 }
34 }
35 Some(msg) = ws_stream.next() => {
36 if let Ok(Message::Close(_)) = msg {
37 warn!("Relay {} closed connection.", url);
38 break;
39 }
40 }
41 else => break,
42 }
43 }
44 }
45 Err(e) => {
46 error!("Failed to connect to firehose relay {}: {}", url, e);
47 }
48 }
49 warn!(
50 "Disconnected from {}. Reconnecting in 5 seconds...",
51 url
52 );
53 tokio::time::sleep(Duration::from_secs(5)).await;
54 }
55}
56
57pub async fn start_relay_clients(
58 state: AppState,
59 relays: Vec<String>,
60 mut ready_rx: Option<mpsc::Receiver<()>>,
61) {
62 if relays.is_empty() {
63 return;
64 }
65
66 let (ready_tx, mut internal_ready_rx) = mpsc::channel(1);
67
68 for url in relays {
69 let ready_tx = if ready_rx.is_some() {
70 Some(ready_tx.clone())
71 } else {
72 None
73 };
74 tokio::spawn(run_relay_client(state.clone(), url, ready_tx));
75 }
76
77 if let Some(mut rx) = ready_rx.take() {
78 tokio::spawn(async move {
79 internal_ready_rx.recv().await;
80 rx.close();
81 });
82 }
83}