this repo has no description
1mod common;
2use common::*;
3
4use axum::{extract::ws::Message, routing::get, Router};
5use bspds::{
6 state::AppState,
7 sync::{firehose::SequencedEvent, relay_client::start_relay_clients},
8};
9use chrono::Utc;
10use tokio::net::TcpListener;
11use tokio::sync::mpsc;
12
13async fn mock_relay_server(
14 listener: TcpListener,
15 event_tx: mpsc::Sender<Vec<u8>>,
16 ready_tx: mpsc::Sender<()>,
17) {
18 let handler = |ws: axum::extract::ws::WebSocketUpgrade| async {
19 ws.on_upgrade(move |mut socket| async move {
20 ready_tx.send(()).await.unwrap();
21 if let Some(Ok(Message::Binary(bytes))) = socket.recv().await {
22 event_tx.send(bytes.to_vec()).await.unwrap();
23 }
24 })
25 };
26 let app = Router::new().route("/", get(handler));
27
28 axum::serve(listener, app.into_make_service())
29 .await
30 .unwrap();
31}
32
33#[tokio::test]
34async fn test_outbound_relay_client() {
35 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
36 let addr = listener.local_addr().unwrap();
37 let (event_tx, mut event_rx) = mpsc::channel(1);
38 let (ready_tx, ready_rx) = mpsc::channel(1);
39 tokio::spawn(mock_relay_server(listener, event_tx, ready_tx));
40 let relay_url = format!("ws://{}", addr);
41
42 let db_url = get_db_connection_string().await;
43 let pool = sqlx::postgres::PgPoolOptions::new()
44 .connect(&db_url)
45 .await
46 .unwrap();
47 let state = AppState::new(pool).await;
48
49 start_relay_clients(state.clone(), vec![relay_url], Some(ready_rx)).await;
50
51 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
52
53 let dummy_event = SequencedEvent {
54 seq: 1,
55 did: "did:plc:test".to_string(),
56 created_at: Utc::now(),
57 event_type: "commit".to_string(),
58 commit_cid: None,
59 prev_cid: None,
60 ops: None,
61 blobs: None,
62 blocks_cids: None,
63 };
64 state.firehose_tx.send(dummy_event).unwrap();
65
66 let received_bytes = event_rx.recv().await.expect("Did not receive event");
67 assert!(!received_bytes.is_empty());
68}