P2P support library for the beaver compute environment
1use iroh::{EndpointId, address_lookup::DiscoveryEvent};
2use p2p_beaver::{PairingError, PairingManager, PeerEvent};
3use parking_lot::Mutex;
4use std::sync::Arc;
5use std::sync::mpsc::{Receiver, channel};
6
// Receiving half of the std `mpsc` channel whose sending half is handed to
// `PairingManager::create`; the tests read `PeerEvent`s from it.
type PairingReceiver = Receiver<PeerEvent>;
8
9// Helper: create a named pairing manager.
10async fn create_manager(name: &str) -> (PairingManager, PairingReceiver) {
11 let (sender, receiver) = channel();
12 let manager = PairingManager::create(sender, name).await;
13 (manager, receiver)
14}
15
16// Helper: wait for 2 managers to have discovered each other.
17// Returns the endpoints discovered by each receiver.
18fn wait_for_discovery(
19 receiver1: Arc<Mutex<PairingReceiver>>,
20 receiver2: Arc<Mutex<PairingReceiver>>,
21) -> (EndpointId, EndpointId) {
22 let handle1 = std::thread::spawn(move || {
23 loop {
24 match receiver1.lock().recv() {
25 Ok(event) => match event {
26 PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => {
27 return endpoint_info.endpoint_id;
28 }
29 _ => {}
30 },
31 Err(err) => {
32 panic!("Should not error! {err:?}");
33 }
34 }
35 }
36 });
37
38 let handle2 = std::thread::spawn(move || {
39 loop {
40 match receiver2.lock().recv() {
41 Ok(event) => match event {
42 PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => {
43 return endpoint_info.endpoint_id;
44 }
45 _ => {}
46 },
47 Err(err) => {
48 panic!("Should not error! {err:?}");
49 }
50 }
51 }
52 });
53
54 let endpoint1 = handle1.join().unwrap();
55 let endpoint2 = handle2.join().unwrap();
56
57 (endpoint1, endpoint2)
58}
59
// Easier assert for PairingError
//
// `$observed` is a `Result`-valued expression (it is consumed); `$expected`
// is a pattern the error must match. Panics with the expected pattern and the
// actual error when they differ, or with the offending expression when the
// result is `Ok`. The error type must implement `Debug` so it can be printed.
macro_rules! assert_pairing {
    ($observed:expr, $expected:pat) => {
        match $observed {
            Err($expected) => {}
            // Metavariables are not expanded inside string literals, so the
            // pattern is rendered with `stringify!` and the actual error is
            // passed as a format argument.
            Err(other) => panic!(
                "expected: {} but got {:?}",
                stringify!($expected),
                other
            ),
            Ok(_) => panic!(
                "expected: {} but {} succeeded",
                stringify!($expected),
                stringify!($observed)
            ),
        }
    };
}
69
70// Discover and stop.
71#[tokio::test(flavor = "multi_thread")]
72async fn discover_and_shutdown() {
73 let (mut manager1, receiver1) = create_manager("test-1").await;
74 let (mut manager2, receiver2) = create_manager("test-2").await;
75
76 let handle1 = std::thread::spawn(move || {
77 loop {
78 match receiver1.recv() {
79 Ok(event) => match event {
80 PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => {
81 let endpoint_name = format!("{}", endpoint_info.user_data().unwrap());
82 assert_eq!(endpoint_name, "test-2");
83 break;
84 }
85 _ => {}
86 },
87 Err(_err) => {
88 break;
89 }
90 }
91 }
92 });
93
94 let handle2 = std::thread::spawn(move || {
95 loop {
96 match receiver2.recv() {
97 Ok(event) => match event {
98 PeerEvent::Discovery(DiscoveryEvent::Discovered { endpoint_info, .. }) => {
99 let endpoint_name = format!("{}", endpoint_info.user_data().unwrap());
100 assert_eq!(endpoint_name, "test-1");
101 break;
102 }
103 _ => {}
104 },
105 Err(_err) => {
106 break;
107 }
108 }
109 }
110 });
111
112 let _ = handle1.join();
113 let _ = handle2.join();
114
115 manager1.stop().await;
116 manager2.stop().await;
117}
118
119// Discover, and wait for second peer to disappear
120#[tokio::test(flavor = "multi_thread")]
121async fn discover_and_expire() {
122 env_logger::init();
123
124 let (mut manager1, receiver1) = create_manager("test-1").await;
125 let (mut manager2, receiver2) = create_manager("test-2").await;
126
127 let receiver1 = Arc::new(Mutex::new(receiver1));
128 let receiver2 = Arc::new(Mutex::new(receiver2));
129
130 let (_endpoint1, endpoint2) =
131 wait_for_discovery(Arc::clone(&receiver1), Arc::clone(&receiver2));
132
133 let handle1 = std::thread::spawn(move || {
134 loop {
135 match receiver1.lock().recv() {
136 Ok(event) => {
137 println!("mgr1 event: {event:?}");
138 match event {
139 PeerEvent::Discovery(DiscoveryEvent::Discovered {
140 endpoint_info, ..
141 }) => {
142 let endpoint_name = format!("{}", endpoint_info.user_data().unwrap());
143 assert_eq!(endpoint_name, "test-2");
144 }
145 PeerEvent::Discovery(DiscoveryEvent::Expired { endpoint_id }) => {
146 assert_eq!(endpoint_id, endpoint2);
147 break;
148 }
149 _ => {}
150 }
151 }
152 Err(_err) => {
153 break;
154 }
155 }
156 }
157 });
158
159 assert_eq!(manager2.peers().await.len(), 1);
160 assert_eq!(manager1.peers().await.len(), 1);
161 manager2.stop().await;
162
163 let _ = handle1.join();
164 assert_eq!(manager1.peers().await.len(), 0);
165 manager1.stop().await;
166}
167
168// Peer1 requests pairing, peer2 rejects it.
169#[tokio::test(flavor = "multi_thread")]
170async fn reject_pairing() {
171 env_logger::init();
172
173 let (mut manager1, receiver1) = create_manager("test-1").await;
174 let (mut manager2, receiver2) = create_manager("test-2").await;
175
176 let receiver1 = Arc::new(Mutex::new(receiver1));
177 let receiver2 = Arc::new(Mutex::new(receiver2));
178
179 let (endpoint1, endpoint2) = wait_for_discovery(Arc::clone(&receiver1), Arc::clone(&receiver2));
180
181 let handle1 = std::thread::spawn(move || {
182 loop {
183 match receiver1.lock().recv() {
184 Ok(event) => {
185 println!("mgr1 event: {event:?}");
186 match event {
187 PeerEvent::PairingRejected(endpoint) => {
188 assert_eq!(endpoint, endpoint1);
189 break;
190 }
191 _ => {}
192 }
193 }
194 Err(_err) => {
195 break;
196 }
197 }
198 }
199 });
200
201 let mgr = manager2.clone();
202 let handle2 = std::thread::spawn(move || {
203 loop {
204 let rt = tokio::runtime::Builder::new_current_thread()
205 .enable_all()
206 .build()
207 .unwrap();
208
209 match receiver2.lock().recv() {
210 Ok(event) => {
211 println!("mgr2 event: {event:?}");
212 let mgr = mgr.clone();
213 match event {
214 PeerEvent::PairingRequest(endpoint) => {
215 assert_eq!(endpoint, endpoint2);
216 rt.block_on(async {
217 mgr.reject_pairing(&endpoint)
218 .await
219 .expect("failed to reject pairing");
220 });
221 }
222 PeerEvent::PairingRejected(endpoint) => {
223 assert_eq!(endpoint, endpoint2);
224 break;
225 }
226 _ => {}
227 }
228 }
229 Err(_err) => {
230 break;
231 }
232 }
233 }
234 });
235
236 let response = manager1.request_pairing(&endpoint1).await;
237 assert_pairing!(response, PairingError::Rejected);
238
239 let _ = handle1.join();
240 let _ = handle2.join();
241
242 manager1.stop().await;
243 manager2.stop().await;
244}
245
246// Peer1 requests pairing, peer2 accepts it.
247#[tokio::test(flavor = "multi_thread")]
248async fn accept_pairing() {
249 env_logger::init();
250
251 let (mut manager1, receiver1) = create_manager("test-1").await;
252 let (mut manager2, receiver2) = create_manager("test-2").await;
253
254 let receiver1 = Arc::new(Mutex::new(receiver1));
255 let receiver2 = Arc::new(Mutex::new(receiver2));
256
257 let (endpoint1, endpoint2) = wait_for_discovery(Arc::clone(&receiver1), Arc::clone(&receiver2));
258
259 println!("Discovery done");
260
261 let handle1 = std::thread::spawn(move || {
262 loop {
263 match receiver1.lock().recv() {
264 Ok(event) => {
265 println!("mgr1 event: {event:?}");
266 match event {
267 PeerEvent::PairingAccepted(endpoint) => {
268 assert_eq!(endpoint, endpoint1);
269 break;
270 }
271 _ => {}
272 }
273 }
274 Err(_err) => {
275 break;
276 }
277 }
278 }
279 });
280
281 let mgr = manager2.clone();
282 let handle2 = std::thread::spawn(move || {
283 loop {
284 let rt = tokio::runtime::Builder::new_current_thread()
285 .enable_all()
286 .build()
287 .unwrap();
288
289 match receiver2.lock().recv() {
290 Ok(event) => {
291 println!("mgr2 event: {event:?}");
292 let mgr = mgr.clone();
293 match event {
294 PeerEvent::PairingRequest(endpoint) => {
295 assert_eq!(endpoint, endpoint2);
296 rt.block_on(async {
297 mgr.accept_pairing(&endpoint)
298 .await
299 .expect("failed to accept pairing");
300 });
301 }
302 PeerEvent::PairingAccepted(endpoint) => {
303 assert_eq!(endpoint, endpoint2);
304 break;
305 }
306 _ => {}
307 }
308 }
309 Err(_err) => {
310 break;
311 }
312 }
313 }
314 });
315
316 let response = manager1.request_pairing(&endpoint1).await;
317 assert!(response.is_ok());
318
319 let _ = handle1.join();
320 let _ = handle2.join();
321
322 assert_eq!(manager1.peers().await.len(), 1);
323 assert_eq!(manager2.peers().await.len(), 1);
324
325 manager1.stop().await;
326 manager2.stop().await;
327}