P2P support library for the beaver compute environment
at main 327 lines 11 kB view raw
1use iroh::{EndpointId, address_lookup::DiscoveryEvent}; 2use p2p_beaver::{PairingError, PairingManager, PeerEvent}; 3use parking_lot::Mutex; 4use std::sync::Arc; 5use std::sync::mpsc::{Receiver, channel}; 6 7type PairingReceiver = Receiver<PeerEvent>; 8 9// Helper: create a named pairing manager. 10async fn create_manager(name: &str) -> (PairingManager, PairingReceiver) { 11 let (sender, receiver) = channel(); 12 let manager = PairingManager::create(sender, name).await; 13 (manager, receiver) 14} 15 16// Helper: wait for 2 managers to have discovered each other. 17// Returns the endpoints discovered by each receiver. 18fn wait_for_discovery( 19 receiver1: Arc<Mutex<PairingReceiver>>, 20 receiver2: Arc<Mutex<PairingReceiver>>, 21) -> (EndpointId, EndpointId) { 22 let handle1 = std::thread::spawn(move || { 23 loop { 24 match receiver1.lock().recv() { 25 Ok(event) => match event { 26 PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => { 27 return endpoint_info.endpoint_id; 28 } 29 _ => {} 30 }, 31 Err(err) => { 32 panic!("Should not error! {err:?}"); 33 } 34 } 35 } 36 }); 37 38 let handle2 = std::thread::spawn(move || { 39 loop { 40 match receiver2.lock().recv() { 41 Ok(event) => match event { 42 PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => { 43 return endpoint_info.endpoint_id; 44 } 45 _ => {} 46 }, 47 Err(err) => { 48 panic!("Should not error! {err:?}"); 49 } 50 } 51 } 52 }); 53 54 let endpoint1 = handle1.join().unwrap(); 55 let endpoint2 = handle2.join().unwrap(); 56 57 (endpoint1, endpoint2) 58} 59 60// Easier assert for PairingError 61macro_rules! assert_pairing { 62 ($observed:expr, $expected:pat) => { 63 match $observed.err().unwrap() { 64 $expected => {} 65 _ => panic!("expected: {{$expected}} but got {{$observed}}"), 66 } 67 }; 68} 69 70// Discover and stop. 71#[tokio::test(flavor = "multi_thread")] 72async fn discover_and_shutdown() { 73 let (mut manager1, receiver1) = create_manager("test-1").await; 74 let (mut manager2, receiver2) = create_manager("test-2").await; 75 76 let handle1 = std::thread::spawn(move || { 77 loop { 78 match receiver1.recv() { 79 Ok(event) => match event { 80 PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => { 81 let endpoint_name = format!("{}", endpoint_info.user_data().unwrap()); 82 assert_eq!(endpoint_name, "test-2"); 83 break; 84 } 85 _ => {} 86 }, 87 Err(_err) => { 88 break; 89 } 90 } 91 } 92 }); 93 94 let handle2 = std::thread::spawn(move || { 95 loop { 96 match receiver2.recv() { 97 Ok(event) => match event { 98 PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => { 99 let endpoint_name = format!("{}", endpoint_info.user_data().unwrap()); 100 assert_eq!(endpoint_name, "test-1"); 101 break; 102 } 103 _ => {} 104 }, 105 Err(_err) => { 106 break; 107 } 108 } 109 } 110 }); 111 112 let _ = handle1.join(); 113 let _ = handle2.join(); 114 115 manager1.stop().await; 116 manager2.stop().await; 117} 118 119// Discover, and wait for second peer to disappear 120#[tokio::test(flavor = "multi_thread")] 121async fn discover_and_expire() { 122 env_logger::init(); 123 124 let (mut manager1, receiver1) = create_manager("test-1").await; 125 let (mut manager2, receiver2) = create_manager("test-2").await; 126 127 let receiver1 = Arc::new(Mutex::new(receiver1)); 128 let receiver2 = Arc::new(Mutex::new(receiver2)); 129 130 let (_endpoint1, endpoint2) = 131 wait_for_discovery(Arc::clone(&receiver1), Arc::clone(&receiver2)); 132 133 let handle1 = std::thread::spawn(move || { 134 loop { 135 match receiver1.lock().recv() { 136 Ok(event) => { 137 println!("mgr1 event: {event:?}"); 138 match event { 139 PeerEvent::Discovery(DiscoveryEvent::Discovered { 140 endpoint_info, .. 141 }) => { 142 let endpoint_name = format!("{}", endpoint_info.user_data().unwrap()); 143 assert_eq!(endpoint_name, "test-2"); 144 } 145 PeerEvent::Discovery(DiscoveryEvent::Expired { endpoint_id }) => { 146 assert_eq!(endpoint_id, endpoint2); 147 break; 148 } 149 _ => {} 150 } 151 } 152 Err(_err) => { 153 break; 154 } 155 } 156 } 157 }); 158 159 assert_eq!(manager2.peers().await.len(), 1); 160 assert_eq!(manager1.peers().await.len(), 1); 161 manager2.stop().await; 162 163 let _ = handle1.join(); 164 assert_eq!(manager1.peers().await.len(), 0); 165 manager1.stop().await; 166} 167 168// Peer1 requests pairing, peer2 rejects it. 169#[tokio::test(flavor = "multi_thread")] 170async fn reject_pairing() { 171 env_logger::init(); 172 173 let (mut manager1, receiver1) = create_manager("test-1").await; 174 let (mut manager2, receiver2) = create_manager("test-2").await; 175 176 let receiver1 = Arc::new(Mutex::new(receiver1)); 177 let receiver2 = Arc::new(Mutex::new(receiver2)); 178 179 let (endpoint1, endpoint2) = wait_for_discovery(Arc::clone(&receiver1), Arc::clone(&receiver2)); 180 181 let handle1 = std::thread::spawn(move || { 182 loop { 183 match receiver1.lock().recv() { 184 Ok(event) => { 185 println!("mgr1 event: {event:?}"); 186 match event { 187 PeerEvent::PairingRejected(endpoint) => { 188 assert_eq!(endpoint, endpoint1); 189 break; 190 } 191 _ => {} 192 } 193 } 194 Err(_err) => { 195 break; 196 } 197 } 198 } 199 }); 200 201 let mgr = manager2.clone(); 202 let handle2 = std::thread::spawn(move || { 203 loop { 204 let rt = tokio::runtime::Builder::new_current_thread() 205 .enable_all() 206 .build() 207 .unwrap(); 208 209 match receiver2.lock().recv() { 210 Ok(event) => { 211 println!("mgr2 event: {event:?}"); 212 let mgr = mgr.clone(); 213 match event { 214 PeerEvent::PairingRequest(endpoint) => { 215 assert_eq!(endpoint, endpoint2); 216 rt.block_on(async { 217 mgr.reject_pairing(&endpoint) 218 .await 219 .expect("failed to reject pairing"); 220 }); 221 } 222 PeerEvent::PairingRejected(endpoint) => { 223 assert_eq!(endpoint, endpoint2); 224 break; 225 } 226 _ => {} 227 } 228 } 229 Err(_err) => { 230 break; 231 } 232 } 233 } 234 }); 235 236 let response = manager1.request_pairing(&endpoint1).await; 237 assert_pairing!(response, PairingError::Rejected); 238 239 let _ = handle1.join(); 240 let _ = handle2.join(); 241 242 manager1.stop().await; 243 manager2.stop().await; 244} 245 246// Peer1 requests pairing, peer2 accepts it. 247#[tokio::test(flavor = "multi_thread")] 248async fn accept_pairing() { 249 env_logger::init(); 250 251 let (mut manager1, receiver1) = create_manager("test-1").await; 252 let (mut manager2, receiver2) = create_manager("test-2").await; 253 254 let receiver1 = Arc::new(Mutex::new(receiver1)); 255 let receiver2 = Arc::new(Mutex::new(receiver2)); 256 257 let (endpoint1, endpoint2) = wait_for_discovery(Arc::clone(&receiver1), Arc::clone(&receiver2)); 258 259 println!("Discovery done"); 260 261 let handle1 = std::thread::spawn(move || { 262 loop { 263 match receiver1.lock().recv() { 264 Ok(event) => { 265 println!("mgr1 event: {event:?}"); 266 match event { 267 PeerEvent::PairingAccepted(endpoint) => { 268 assert_eq!(endpoint, endpoint1); 269 break; 270 } 271 _ => {} 272 } 273 } 274 Err(_err) => { 275 break; 276 } 277 } 278 } 279 }); 280 281 let mgr = manager2.clone(); 282 let handle2 = std::thread::spawn(move || { 283 loop { 284 let rt = tokio::runtime::Builder::new_current_thread() 285 .enable_all() 286 .build() 287 .unwrap(); 288 289 match receiver2.lock().recv() { 290 Ok(event) => { 291 println!("mgr2 event: {event:?}"); 292 let mgr = mgr.clone(); 293 match event { 294 PeerEvent::PairingRequest(endpoint) => { 295 assert_eq!(endpoint, endpoint2); 296 rt.block_on(async { 297 mgr.accept_pairing(&endpoint) 298 .await 299 .expect("failed to accept pairing"); 300 }); 301 } 302 PeerEvent::PairingAccepted(endpoint) => { 303 assert_eq!(endpoint, endpoint2); 304 break; 305 } 306 _ => {} 307 } 308 } 309 Err(_err) => { 310 break; 311 } 312 } 313 } 314 }); 315 316 let response = manager1.request_pairing(&endpoint1).await; 317 assert!(response.is_ok()); 318 319 let _ = handle1.join(); 320 let _ = handle2.join(); 321 322 assert_eq!(manager1.peers().await.len(), 1); 323 assert_eq!(manager2.peers().await.len(), 1); 324 325 manager1.stop().await; 326 manager2.stop().await; 327}