this repo has no description
1mod common; 2use common::*; 3 4use axum::{extract::ws::Message, routing::get, Router}; 5use bspds::{ 6 state::AppState, 7 sync::{firehose::SequencedEvent, relay_client::start_relay_clients}, 8}; 9use chrono::Utc; 10use tokio::net::TcpListener; 11use tokio::sync::mpsc; 12 13async fn mock_relay_server( 14 listener: TcpListener, 15 event_tx: mpsc::Sender<Vec<u8>>, 16 ready_tx: mpsc::Sender<()>, 17) { 18 let handler = |ws: axum::extract::ws::WebSocketUpgrade| async { 19 ws.on_upgrade(move |mut socket| async move { 20 ready_tx.send(()).await.unwrap(); 21 if let Some(Ok(Message::Binary(bytes))) = socket.recv().await { 22 event_tx.send(bytes.to_vec()).await.unwrap(); 23 } 24 }) 25 }; 26 let app = Router::new().route("/", get(handler)); 27 28 axum::serve(listener, app.into_make_service()) 29 .await 30 .unwrap(); 31} 32 33#[tokio::test] 34async fn test_outbound_relay_client() { 35 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); 36 let addr = listener.local_addr().unwrap(); 37 let (event_tx, mut event_rx) = mpsc::channel(1); 38 let (ready_tx, ready_rx) = mpsc::channel(1); 39 tokio::spawn(mock_relay_server(listener, event_tx, ready_tx)); 40 let relay_url = format!("ws://{}", addr); 41 42 let db_url = get_db_connection_string().await; 43 let pool = sqlx::postgres::PgPoolOptions::new() 44 .connect(&db_url) 45 .await 46 .unwrap(); 47 let state = AppState::new(pool).await; 48 49 start_relay_clients(state.clone(), vec![relay_url], Some(ready_rx)).await; 50 51 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; 52 53 let dummy_event = SequencedEvent { 54 seq: 1, 55 did: "did:plc:test".to_string(), 56 created_at: Utc::now(), 57 event_type: "commit".to_string(), 58 commit_cid: None, 59 prev_cid: None, 60 ops: None, 61 blobs: None, 62 blocks_cids: None, 63 }; 64 state.firehose_tx.send(dummy_event).unwrap(); 65 66 let received_bytes = event_rx.recv().await.expect("Did not receive event"); 67 assert!(!received_bytes.is_empty()); 68}