this repo has no description
1use crate::state::AppState; 2use crate::sync::util::format_event_for_sending; 3use futures::{sink::SinkExt, stream::StreamExt}; 4use std::time::Duration; 5use tokio::sync::mpsc; 6use tokio_tungstenite::{connect_async, tungstenite::Message}; 7use tracing::{error, info, warn}; 8 9async fn run_relay_client(state: AppState, url: String, ready_tx: Option<mpsc::Sender<()>>) { 10 info!("Starting firehose client for relay: {}", url); 11 loop { 12 match connect_async(&url).await { 13 Ok((mut ws_stream, _)) => { 14 info!("Connected to firehose relay: {}", url); 15 if let Some(tx) = ready_tx.as_ref() { 16 tx.send(()).await.ok(); 17 } 18 19 let mut rx = state.firehose_tx.subscribe(); 20 21 loop { 22 tokio::select! { 23 Ok(event) = rx.recv() => { 24 match format_event_for_sending(&state, event).await { 25 Ok(bytes) => { 26 if let Err(e) = ws_stream.send(Message::Binary(bytes.into())).await { 27 warn!("Failed to send event to {}: {}. Disconnecting.", url, e); 28 break; 29 } 30 } 31 Err(e) => { 32 error!("Failed to format event for relay {}: {}", url, e); 33 } 34 } 35 } 36 Some(msg) = ws_stream.next() => { 37 if let Ok(Message::Close(_)) = msg { 38 warn!("Relay {} closed connection.", url); 39 break; 40 } 41 } 42 else => break, 43 } 44 } 45 } 46 Err(e) => { 47 error!("Failed to connect to firehose relay {}: {}", url, e); 48 } 49 } 50 warn!( 51 "Disconnected from {}. Reconnecting in 5 seconds...", 52 url 53 ); 54 tokio::time::sleep(Duration::from_secs(5)).await; 55 } 56} 57 58pub async fn start_relay_clients( 59 state: AppState, 60 relays: Vec<String>, 61 mut ready_rx: Option<mpsc::Receiver<()>>, 62) { 63 if relays.is_empty() { 64 return; 65 } 66 67 let (ready_tx, mut internal_ready_rx) = mpsc::channel(1); 68 69 for url in relays { 70 let ready_tx = if ready_rx.is_some() { 71 Some(ready_tx.clone()) 72 } else { 73 None 74 }; 75 tokio::spawn(run_relay_client(state.clone(), url, ready_tx)); 76 } 77 78 if let Some(mut rx) = ready_rx.take() { 79 tokio::spawn(async move { 80 internal_ready_rx.recv().await; 81 rx.close(); 82 }); 83 } 84}