this repo has no description
1mod common; 2use common::*; 3 4use axum::{extract::ws::Message, routing::get, Router}; 5use bspds::{ 6 state::AppState, 7 sync::{firehose::SequencedEvent, relay_client::start_relay_clients}, 8}; 9use chrono::Utc; 10use tokio::net::TcpListener; 11use tokio::sync::mpsc; 12 13async fn mock_relay_server( 14 listener: TcpListener, 15 event_tx: mpsc::Sender<Vec<u8>>, 16 connected_tx: mpsc::Sender<()>, 17) { 18 let handler = |ws: axum::extract::ws::WebSocketUpgrade| async { 19 ws.on_upgrade(move |mut socket| async move { 20 let _ = connected_tx.send(()).await; 21 while let Some(Ok(msg)) = socket.recv().await { 22 if let Message::Binary(bytes) = msg { 23 let _ = event_tx.send(bytes.to_vec()).await; 24 break; 25 } 26 } 27 }) 28 }; 29 let app = Router::new().route("/", get(handler)); 30 31 axum::serve(listener, app.into_make_service()) 32 .await 33 .unwrap(); 34} 35 36#[tokio::test] 37async fn test_outbound_relay_client() { 38 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 39 let addr = listener.local_addr().unwrap(); 40 let (event_tx, mut event_rx) = mpsc::channel(1); 41 let (connected_tx, _connected_rx) = mpsc::channel::<()>(1); 42 tokio::spawn(mock_relay_server(listener, event_tx, connected_tx)); 43 let relay_url = format!("ws://{}", addr); 44 45 let db_url = get_db_connection_string().await; 46 let pool = sqlx::postgres::PgPoolOptions::new() 47 .connect(&db_url) 48 .await 49 .unwrap(); 50 let state = AppState::new(pool).await; 51 52 let (ready_tx, ready_rx) = mpsc::channel(1); 53 start_relay_clients(state.clone(), vec![relay_url], Some(ready_rx)).await; 54 55 tokio::time::timeout( 56 tokio::time::Duration::from_secs(5), 57 async { 58 ready_tx.closed().await; 59 } 60 ) 61 .await 62 .expect("Timeout waiting for relay client to be ready"); 63 64 let dummy_event = SequencedEvent { 65 seq: 1, 66 did: "did:plc:test".to_string(), 67 created_at: Utc::now(), 68 event_type: "commit".to_string(), 69 commit_cid: Some("bafyreihffx5a4o3qbv7vp6qmxpxok5mx5xvlsq6z4x3xv3zqv7vqvc7mzy".to_string()), 70 prev_cid: None, 71 ops: Some(serde_json::json!([])), 72 blobs: Some(vec![]), 73 blocks_cids: Some(vec![]), 74 }; 75 state.firehose_tx.send(dummy_event).unwrap(); 76 77 let received_bytes = tokio::time::timeout( 78 tokio::time::Duration::from_secs(5), 79 event_rx.recv() 80 ) 81 .await 82 .expect("Timeout waiting for event") 83 .expect("Event channel closed"); 84 85 assert!(!received_bytes.is_empty()); 86}