this repo has no description
1mod common;
2use common::*;
3
4use axum::{extract::ws::Message, routing::get, Router};
5use bspds::{
6 state::AppState,
7 sync::{firehose::SequencedEvent, relay_client::start_relay_clients},
8};
9use chrono::Utc;
10use tokio::net::TcpListener;
11use tokio::sync::mpsc;
12
13async fn mock_relay_server(
14 listener: TcpListener,
15 event_tx: mpsc::Sender<Vec<u8>>,
16 connected_tx: mpsc::Sender<()>,
17) {
18 let handler = |ws: axum::extract::ws::WebSocketUpgrade| async {
19 ws.on_upgrade(move |mut socket| async move {
20 let _ = connected_tx.send(()).await;
21 while let Some(Ok(msg)) = socket.recv().await {
22 if let Message::Binary(bytes) = msg {
23 let _ = event_tx.send(bytes.to_vec()).await;
24 break;
25 }
26 }
27 })
28 };
29 let app = Router::new().route("/", get(handler));
30
31 axum::serve(listener, app.into_make_service())
32 .await
33 .unwrap();
34}
35
36#[tokio::test]
37async fn test_outbound_relay_client() {
38 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
39 let addr = listener.local_addr().unwrap();
40 let (event_tx, mut event_rx) = mpsc::channel(1);
41 let (connected_tx, _connected_rx) = mpsc::channel::<()>(1);
42 tokio::spawn(mock_relay_server(listener, event_tx, connected_tx));
43 let relay_url = format!("ws://{}", addr);
44
45 let db_url = get_db_connection_string().await;
46 let pool = sqlx::postgres::PgPoolOptions::new()
47 .connect(&db_url)
48 .await
49 .unwrap();
50 let state = AppState::new(pool).await;
51
52 let (ready_tx, ready_rx) = mpsc::channel(1);
53 start_relay_clients(state.clone(), vec![relay_url], Some(ready_rx)).await;
54
55 tokio::time::timeout(
56 tokio::time::Duration::from_secs(5),
57 async {
58 ready_tx.closed().await;
59 }
60 )
61 .await
62 .expect("Timeout waiting for relay client to be ready");
63
64 let dummy_event = SequencedEvent {
65 seq: 1,
66 did: "did:plc:test".to_string(),
67 created_at: Utc::now(),
68 event_type: "commit".to_string(),
69 commit_cid: Some("bafyreihffx5a4o3qbv7vp6qmxpxok5mx5xvlsq6z4x3xv3zqv7vqvc7mzy".to_string()),
70 prev_cid: None,
71 ops: Some(serde_json::json!([])),
72 blobs: Some(vec![]),
73 blocks_cids: Some(vec![]),
74 };
75 state.firehose_tx.send(dummy_event).unwrap();
76
77 let received_bytes = tokio::time::timeout(
78 tokio::time::Duration::from_secs(5),
79 event_rx.recv()
80 )
81 .await
82 .expect("Timeout waiting for event")
83 .expect("Event channel closed");
84
85 assert!(!received_bytes.is_empty());
86}