this repo has no description
1use crate::state::AppState; 2use crate::sync::util::format_event_for_sending; 3use futures::{sink::SinkExt, stream::StreamExt}; 4use std::time::Duration; 5use tokio::sync::mpsc; 6use tokio_tungstenite::{connect_async, tungstenite::Message}; 7use tracing::{error, info, warn}; 8 9async fn run_relay_client(state: AppState, url: String, ready_tx: Option<mpsc::Sender<()>>) { 10 info!("Starting firehose client for relay: {}", url); 11 loop { 12 match connect_async(&url).await { 13 Ok((mut ws_stream, _)) => { 14 info!("Connected to firehose relay: {}", url); 15 let mut rx = state.firehose_tx.subscribe(); 16 if let Some(tx) = ready_tx.as_ref() { 17 tx.send(()).await.ok(); 18 } 19 20 loop { 21 tokio::select! { 22 Ok(event) = rx.recv() => { 23 match format_event_for_sending(&state, event).await { 24 Ok(bytes) => { 25 if let Err(e) = ws_stream.send(Message::Binary(bytes.into())).await { 26 warn!("Failed to send event to {}: {}. Disconnecting.", url, e); 27 break; 28 } 29 } 30 Err(e) => { 31 error!("Failed to format event for relay {}: {}", url, e); 32 } 33 } 34 } 35 Some(msg) = ws_stream.next() => { 36 if let Ok(Message::Close(_)) = msg { 37 warn!("Relay {} closed connection.", url); 38 break; 39 } 40 } 41 else => break, 42 } 43 } 44 } 45 Err(e) => { 46 error!("Failed to connect to firehose relay {}: {}", url, e); 47 } 48 } 49 warn!( 50 "Disconnected from {}. Reconnecting in 5 seconds...", 51 url 52 ); 53 tokio::time::sleep(Duration::from_secs(5)).await; 54 } 55} 56 57pub async fn start_relay_clients( 58 state: AppState, 59 relays: Vec<String>, 60 mut ready_rx: Option<mpsc::Receiver<()>>, 61) { 62 if relays.is_empty() { 63 return; 64 } 65 66 let (ready_tx, mut internal_ready_rx) = mpsc::channel(1); 67 68 for url in relays { 69 let ready_tx = if ready_rx.is_some() { 70 Some(ready_tx.clone()) 71 } else { 72 None 73 }; 74 tokio::spawn(run_relay_client(state.clone(), url, ready_tx)); 75 } 76 77 if let Some(mut rx) = ready_rx.take() { 78 tokio::spawn(async move { 79 internal_ready_rx.recv().await; 80 rx.close(); 81 }); 82 } 83}