Live location tracking and playback for the game "manhunt"
at main 99 lines 3.6 kB view raw
1use async_trait::async_trait; 2use axum::extract::ws::Message; 3use futures::StreamExt; 4use log::{error, info, warn}; 5use matchbox_protocol::{JsonPeerEvent, PeerRequest}; 6use matchbox_signaling::{ 7 ClientRequestError, NoCallbacks, SignalingTopology, WsStateMeta, common_logic::parse_request, 8}; 9 10use crate::state::ServerState; 11 12#[derive(Default, Debug)] 13pub struct ServerTopology; 14 15#[async_trait] 16impl SignalingTopology<NoCallbacks, ServerState> for ServerTopology { 17 async fn state_machine(upgrade: WsStateMeta<NoCallbacks, ServerState>) { 18 let WsStateMeta { 19 peer_id, 20 sender, 21 mut receiver, 22 mut state, 23 .. 24 } = upgrade; 25 26 let (host, cancel, other_peers) = state.add_peer(peer_id, sender.clone()); 27 28 let msg = Message::Text(JsonPeerEvent::NewPeer(peer_id).to_string().into()); 29 30 for other_id in other_peers { 31 if let Err(why) = state.try_send(other_id, msg.clone()) { 32 error!("Failed to publish new peer event to {other_id}: {why:?}"); 33 } 34 } 35 36 loop { 37 let next_msg = tokio::select! { 38 biased; 39 40 _ = cancel.cancelled() => { 41 info!("Disconnecting {peer_id} due to host disconnect"); 42 break; 43 } 44 45 next = receiver.next() => { 46 if let Some(next) = next { 47 parse_request(next) 48 } else { 49 info!("Peer {peer_id} has disconnected"); 50 break; 51 } 52 } 53 }; 54 55 let req = match next_msg { 56 Ok(req) => req, 57 Err(e) => match e { 58 ClientRequestError::Axum(e) => { 59 warn!("Peer {peer_id} encountered Axum error: {e:?}. Disconnecting..."); 60 break; 61 } 62 ClientRequestError::Close => { 63 info!("Peer {peer_id} closed connection"); 64 break; 65 } 66 ClientRequestError::Json(_) | ClientRequestError::UnsupportedType(_) => { 67 error!("Error parsing request from {peer_id}: {e:?}"); 68 continue; // Recoverable, although may mean bad state? 69 } 70 }, 71 }; 72 73 if let PeerRequest::Signal { receiver, data } = req { 74 let msg = Message::Text( 75 JsonPeerEvent::Signal { 76 sender: peer_id, 77 data, 78 } 79 .to_string() 80 .into(), 81 ); 82 if let Err(why) = state.try_send(receiver, msg) { 83 error!("Error sending signaling message from {peer_id} to {receiver}: {why:?}"); 84 } 85 } // Other variant, PeerRequest::KeepAlive is just for a heartbeat, do nothing 86 } 87 88 let msg = Message::Text(JsonPeerEvent::PeerLeft(peer_id).to_string().into()); 89 if let Some(other_peers) = state.remove_peer(peer_id, host) { 90 for other_id in other_peers { 91 if let Err(why) = state.try_send(other_id, msg.clone()) { 92 warn!("Failed to alert {other_id} that {peer_id} has disconnected: {why:?}"); 93 } 94 } 95 } else { 96 warn!("Trying to remove peer {peer_id}, which doesn't exist?"); 97 } 98 } 99}