use iroh::{EndpointId, address_lookup::DiscoveryEvent}; use p2p_beaver::{PairingError, PairingManager, PeerEvent}; use parking_lot::Mutex; use std::sync::Arc; use std::sync::mpsc::{Receiver, channel}; type PairingReceiver = Receiver; // Helper: create a named pairing manager. async fn create_manager(name: &str) -> (PairingManager, PairingReceiver) { let (sender, receiver) = channel(); let manager = PairingManager::create(sender, name).await; (manager, receiver) } // Helper: wait for 2 managers to have discovered each other. // Returns the endpoints discovered by each receiver. fn wait_for_discovery( receiver1: Arc>, receiver2: Arc>, ) -> (EndpointId, EndpointId) { let handle1 = std::thread::spawn(move || { loop { match receiver1.lock().recv() { Ok(event) => match event { PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => { return endpoint_info.endpoint_id; } _ => {} }, Err(err) => { panic!("Should not error! {err:?}"); } } } }); let handle2 = std::thread::spawn(move || { loop { match receiver2.lock().recv() { Ok(event) => match event { PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => { return endpoint_info.endpoint_id; } _ => {} }, Err(err) => { panic!("Should not error! {err:?}"); } } } }); let endpoint1 = handle1.join().unwrap(); let endpoint2 = handle2.join().unwrap(); (endpoint1, endpoint2) } // Easier assert for PairingError macro_rules! assert_pairing { ($observed:expr, $expected:pat) => { match $observed.err().unwrap() { $expected => {} _ => panic!("expected: {{$expected}} but got {{$observed}}"), } }; } // Discover and stop. #[tokio::test(flavor = "multi_thread")] async fn discover_and_shutdown() { let (mut manager1, receiver1) = create_manager("test-1").await; let (mut manager2, receiver2) = create_manager("test-2").await; let handle1 = std::thread::spawn(move || { loop { match receiver1.recv() { Ok(event) => match event { PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. 
}) => { let endpoint_name = format!("{}", endpoint_info.user_data().unwrap()); assert_eq!(endpoint_name, "test-2"); break; } _ => {} }, Err(_err) => { break; } } } }); let handle2 = std::thread::spawn(move || { loop { match receiver2.recv() { Ok(event) => match event { PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => { let endpoint_name = format!("{}", endpoint_info.user_data().unwrap()); assert_eq!(endpoint_name, "test-1"); break; } _ => {} }, Err(_err) => { break; } } } }); let _ = handle1.join(); let _ = handle2.join(); manager1.stop().await; manager2.stop().await; } // Discover, and wait for second peer to disappear #[tokio::test(flavor = "multi_thread")] async fn discover_and_expire() { env_logger::init(); let (mut manager1, receiver1) = create_manager("test-1").await; let (mut manager2, receiver2) = create_manager("test-2").await; let receiver1 = Arc::new(Mutex::new(receiver1)); let receiver2 = Arc::new(Mutex::new(receiver2)); let (_endpoint1, endpoint2) = wait_for_discovery(Arc::clone(&receiver1), Arc::clone(&receiver2)); let handle1 = std::thread::spawn(move || { loop { match receiver1.lock().recv() { Ok(event) => { println!("mgr1 event: {event:?}"); match event { PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => { let endpoint_name = format!("{}", endpoint_info.user_data().unwrap()); assert_eq!(endpoint_name, "test-2"); } PeerEvent::Discovery(DiscoveryEvent::Expired { endpoint_id }) => { assert_eq!(endpoint_id, endpoint2); break; } _ => {} } } Err(_err) => { break; } } } }); assert_eq!(manager2.peers().await.len(), 1); assert_eq!(manager1.peers().await.len(), 1); manager2.stop().await; let _ = handle1.join(); assert_eq!(manager1.peers().await.len(), 0); manager1.stop().await; } // Peer1 requests pairing, peer2 rejects it. 
// Flow: manager1 sends the request to the endpoint seen on its own channel;
// manager2's listener thread answers the resulting `PairingRequest` with a
// rejection. Both listeners then wait for `PairingRejected`, and the request
// itself must resolve to `PairingError::Rejected`.
#[tokio::test(flavor = "multi_thread")]
async fn reject_pairing() {
    // `env_logger::init()` panics when a logger is already installed, which
    // happens as soon as a second test in this binary runs; `try_init` only
    // reports that case through its (ignored) result.
    let _ = env_logger::try_init();

    let (mut manager1, receiver1) = create_manager("test-1").await;
    let (mut manager2, receiver2) = create_manager("test-2").await;

    let receiver1 = Arc::new(Mutex::new(receiver1));
    let receiver2 = Arc::new(Mutex::new(receiver2));

    // `endpoint1` / `endpoint2` are the endpoints seen on manager1's /
    // manager2's channel respectively.
    let (endpoint1, endpoint2) =
        wait_for_discovery(Arc::clone(&receiver1), Arc::clone(&receiver2));

    let handle1 = std::thread::spawn(move || {
        // Ends on the expected rejection or when the channel is closed.
        while let Ok(event) = receiver1.lock().recv() {
            println!("mgr1 event: {event:?}");
            if let PeerEvent::PairingRejected(endpoint) = event {
                assert_eq!(endpoint, endpoint1);
                break;
            }
        }
    });

    let mgr = manager2.clone();
    let handle2 = std::thread::spawn(move || {
        // This plain thread has no async context, so it needs its own runtime
        // to drive `reject_pairing`. It is built once, not once per event.
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        while let Ok(event) = receiver2.lock().recv() {
            println!("mgr2 event: {event:?}");
            match event {
                PeerEvent::PairingRequest(endpoint) => {
                    assert_eq!(endpoint, endpoint2);
                    rt.block_on(async {
                        mgr.reject_pairing(&endpoint)
                            .await
                            .expect("failed to reject pairing");
                    });
                }
                PeerEvent::PairingRejected(endpoint) => {
                    assert_eq!(endpoint, endpoint2);
                    break;
                }
                _ => {}
            }
        }
    });

    let response = manager1.request_pairing(&endpoint1).await;
    assert_pairing!(response, PairingError::Rejected);

    // Propagate panics: an ignored `join` result would hide failed assertions
    // made inside the listener threads.
    handle1.join().expect("manager1 listener thread panicked");
    handle2.join().expect("manager2 listener thread panicked");

    manager1.stop().await;
    manager2.stop().await;
}

// Peer1 requests pairing, peer2 accepts it.
// Flow: manager1 sends the request to the endpoint seen on its own channel;
// manager2's listener thread answers the resulting `PairingRequest` by
// accepting it. Both listeners then wait for `PairingAccepted`, the request
// must succeed, and each manager must still list exactly one peer.
#[tokio::test(flavor = "multi_thread")]
async fn accept_pairing() {
    // `env_logger::init()` panics when a logger is already installed, which
    // happens as soon as a second test in this binary runs; `try_init` only
    // reports that case through its (ignored) result.
    let _ = env_logger::try_init();

    let (mut manager1, receiver1) = create_manager("test-1").await;
    let (mut manager2, receiver2) = create_manager("test-2").await;

    let receiver1 = Arc::new(Mutex::new(receiver1));
    let receiver2 = Arc::new(Mutex::new(receiver2));

    // `endpoint1` / `endpoint2` are the endpoints seen on manager1's /
    // manager2's channel respectively.
    let (endpoint1, endpoint2) =
        wait_for_discovery(Arc::clone(&receiver1), Arc::clone(&receiver2));

    println!("Discovery done");

    let handle1 = std::thread::spawn(move || {
        // Ends on the expected acceptance or when the channel is closed.
        while let Ok(event) = receiver1.lock().recv() {
            println!("mgr1 event: {event:?}");
            if let PeerEvent::PairingAccepted(endpoint) = event {
                assert_eq!(endpoint, endpoint1);
                break;
            }
        }
    });

    let mgr = manager2.clone();
    let handle2 = std::thread::spawn(move || {
        // This plain thread has no async context, so it needs its own runtime
        // to drive `accept_pairing`. It is built once, not once per event.
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        while let Ok(event) = receiver2.lock().recv() {
            println!("mgr2 event: {event:?}");
            match event {
                PeerEvent::PairingRequest(endpoint) => {
                    assert_eq!(endpoint, endpoint2);
                    rt.block_on(async {
                        mgr.accept_pairing(&endpoint)
                            .await
                            .expect("failed to accept pairing");
                    });
                }
                PeerEvent::PairingAccepted(endpoint) => {
                    assert_eq!(endpoint, endpoint2);
                    break;
                }
                _ => {}
            }
        }
    });

    let response = manager1.request_pairing(&endpoint1).await;
    assert!(response.is_ok());

    // Propagate panics: an ignored `join` result would hide failed assertions
    // made inside the listener threads.
    handle1.join().expect("manager1 listener thread panicked");
    handle2.join().expect("manager2 listener thread panicked");

    assert_eq!(manager1.peers().await.len(), 1);
    assert_eq!(manager2.peers().await.len(), 1);

    manager1.stop().await;
    manager2.stop().await;
}