Live location tracking and playback for the game "manhunt"

Tests for signaling, disconnect players in lobby on host leave

bwc9876.dev 9231a51d d903af34

verified
+685 -340
+2
Cargo.lock
··· 2982 2982 "matchbox_protocol", 2983 2983 "matchbox_signaling", 2984 2984 "tokio", 2985 + "tokio-util", 2986 + "uuid", 2985 2987 ] 2986 2988 2987 2989 [[package]]
+5 -2
TODO.md
··· 25 25 - [ ] Backend : More tests 26 26 - [x] Lobby tests 27 27 - [x] Game end test for actual return from loop 28 - - [ ] Testing crate for integration testing from a DSL 29 - - [ ] NixOS VM tests wrapping the testing crate 28 + - [ ] More transport crates tests 29 + - [x] Organize signalling and seperate out more logic 30 + - [x] Signaling tests 31 + - [ ] Testing crate for integration testing? 32 + - [ ] NixOS VM tests wrapping the testing crate? 30 33 - [ ] Nix : Cheat the dependency nightmare and use crane 31 34 - [x] Nix : Fix manhunt.nix to actually build 32 35 - [ ] Frontend : Rework state management, better hooks
+2 -5
manhunt-signaling/Cargo.toml
··· 2 2 name = "manhunt-signaling" 3 3 version = "0.1.0" 4 4 edition = "2024" 5 - default-run = "manhunt-signaling" 6 - 7 - [[bin]] 8 - name = "manhunt-signaling" 9 - path = "main.rs" 10 5 11 6 [dependencies] 12 7 anyhow = "1.0.98" ··· 18 13 matchbox_protocol = "0.12.0" 19 14 matchbox_signaling = "0.12.0" 20 15 tokio = { version = "1.45.1", features = ["macros"] } 16 + tokio-util = "0.7.15" 17 + uuid = "1.17.0"
-333
manhunt-signaling/main.rs
··· 1 - use async_trait::async_trait; 2 - use axum::{ 3 - Error as AxumError, 4 - extract::{Path, ws::Message}, 5 - http::StatusCode, 6 - response::IntoResponse, 7 - routing::{get, post}, 8 - }; 9 - use futures::StreamExt; 10 - use log::{debug, error, info, warn}; 11 - use matchbox_protocol::{JsonPeerEvent, PeerId, PeerRequest}; 12 - use matchbox_signaling::{ 13 - ClientRequestError, NoCallbacks, SignalingError, SignalingServerBuilder, SignalingState, 14 - SignalingTopology, WsStateMeta, 15 - common_logic::{self, StateObj, parse_request}, 16 - }; 17 - 18 - use anyhow::Context; 19 - use std::{ 20 - collections::{HashMap, HashSet}, 21 - net::{IpAddr, Ipv4Addr, SocketAddr}, 22 - result::Result as StdResult, 23 - }; 24 - use tokio::sync::mpsc::UnboundedSender; 25 - 26 - type Result<T = (), E = anyhow::Error> = StdResult<T, E>; 27 - 28 - type RoomId = String; 29 - type Sender = UnboundedSender<Result<Message, AxumError>>; 30 - 31 - #[derive(Debug, Clone)] 32 - struct Match { 33 - pub open_lobby: bool, 34 - pub players: HashSet<PeerId>, 35 - } 36 - 37 - #[derive(Debug, Clone)] 38 - struct Peer { 39 - pub room: RoomId, 40 - sender: Sender, 41 - } 42 - 43 - impl Match { 44 - pub fn new() -> Self { 45 - Self { 46 - open_lobby: true, 47 - players: HashSet::with_capacity(10), 48 - } 49 - } 50 - } 51 - 52 - #[derive(Default, Debug, Clone)] 53 - struct ServerState { 54 - waiting_clients: StateObj<HashMap<SocketAddr, RoomId>>, 55 - queued_clients: StateObj<HashMap<PeerId, RoomId>>, 56 - matches: StateObj<HashMap<RoomId, Match>>, 57 - clients: StateObj<HashMap<PeerId, Peer>>, 58 - } 59 - 60 - impl SignalingState for ServerState {} 61 - 62 - impl ServerState { 63 - fn add_client(&mut self, origin: SocketAddr, code: RoomId) { 64 - self.waiting_clients 65 - .lock() 66 - .unwrap() 67 - .insert(origin, code.clone()); 68 - } 69 - 70 - pub fn room_is_open(&self, room_id: &RoomId) -> bool { 71 - self.matches 72 - .lock() 73 - .unwrap() 74 - .get(room_id) 75 - .is_some_and(|m| m.open_lobby) 76 - } 77 - 78 - /// Mark a match as started, disallowing others from joining 79 - pub fn mark_started(&mut self, room: &RoomId) { 80 - if let Some(mat) = self.matches.lock().unwrap().get_mut(room) { 81 - mat.open_lobby = false; 82 - } 83 - } 84 - 85 - /// Create a new room with the given code, should be called when someone wants to host a game. 86 - /// Returns false if a room with that code already exists. 87 - pub fn create_room(&mut self, origin: SocketAddr, code: RoomId) -> bool { 88 - let mut matches = self.matches.lock().unwrap(); 89 - if matches.contains_key(&code) { 90 - false 91 - } else { 92 - matches.insert(code.clone(), Match::new()); 93 - drop(matches); 94 - self.add_client(origin, code); 95 - true 96 - } 97 - } 98 - 99 - /// Try to join a room by a code, returns `true` if successful 100 - pub fn try_join_room(&mut self, origin: SocketAddr, code: RoomId) -> bool { 101 - if self.room_is_open(&code) { 102 - self.waiting_clients.lock().unwrap().insert(origin, code); 103 - true 104 - } else { 105 - false 106 - } 107 - } 108 - 109 - /// Assign a peer an id 110 - pub fn assign_peer_id(&mut self, origin: SocketAddr, peer_id: PeerId) { 111 - let target_room = self 112 - .waiting_clients 113 - .lock() 114 - .unwrap() 115 - .remove(&origin) 116 - .expect("origin not waiting?"); 117 - 118 - self.queued_clients 119 - .lock() 120 - .unwrap() 121 - .insert(peer_id, target_room); 122 - } 123 - 124 - /// Add a peer to a room, returns other peers in the match currently 125 - pub fn add_peer(&mut self, peer_id: PeerId, sender: Sender) -> Vec<PeerId> { 126 - let target_room = self 127 - .queued_clients 128 - .lock() 129 - .unwrap() 130 - .remove(&peer_id) 131 - .expect("peer not waiting?"); 132 - let mut matches = self.matches.lock().unwrap(); 133 - let mat = matches.get_mut(&target_room).expect("Room not found?"); 134 - let peers = mat.players.iter().copied().collect::<Vec<_>>(); 135 - mat.players.insert(peer_id); 136 - drop(matches); 137 - let peer = Peer { 138 - room: target_room, 139 - sender, 140 - }; 141 - self.clients.lock().unwrap().insert(peer_id, peer); 142 - peers 143 - } 144 - 145 - /// Disconnect a peer from a room. Automatically deletes the room if no peers remain. Returns 146 - /// the removed peer and the set of other peers in the room that need to be notified 147 - pub fn remove_peer(&mut self, peer_id: PeerId) -> Option<Vec<PeerId>> { 148 - let removed_peer = self.clients.lock().unwrap().remove(&peer_id)?; 149 - let other_peers = self 150 - .matches 151 - .lock() 152 - .unwrap() 153 - .get_mut(&removed_peer.room) 154 - .map(|m| { 155 - m.players.remove(&peer_id); 156 - m.players.iter().copied().collect::<Vec<_>>() 157 - }) 158 - .unwrap_or_default(); 159 - 160 - if other_peers.is_empty() { 161 - self.matches.lock().unwrap().remove(&removed_peer.room); 162 - } 163 - 164 - Some(other_peers) 165 - } 166 - 167 - pub fn try_send(&self, peer: PeerId, msg: Message) -> Result<(), SignalingError> { 168 - self.clients 169 - .lock() 170 - .unwrap() 171 - .get(&peer) 172 - .ok_or(SignalingError::UnknownPeer) 173 - .and_then(|peer| common_logic::try_send(&peer.sender, msg)) 174 - } 175 - } 176 - 177 - #[derive(Default, Debug)] 178 - struct ServerTopology; 179 - 180 - #[async_trait] 181 - impl SignalingTopology<NoCallbacks, ServerState> for ServerTopology { 182 - async fn state_machine(upgrade: WsStateMeta<NoCallbacks, ServerState>) { 183 - let WsStateMeta { 184 - peer_id, 185 - sender, 186 - mut receiver, 187 - mut state, 188 - .. 189 - } = upgrade; 190 - 191 - let other_peers = state.add_peer(peer_id, sender.clone()); 192 - 193 - let msg = Message::Text(JsonPeerEvent::NewPeer(peer_id).to_string().into()); 194 - 195 - for other_id in other_peers { 196 - if let Err(why) = state.try_send(other_id, msg.clone()) { 197 - error!("Failed to publish new peer event to {other_id}: {why:?}"); 198 - } 199 - } 200 - 201 - while let Some(req) = receiver.next().await { 202 - let req = match parse_request(req) { 203 - Ok(req) => req, 204 - Err(e) => match e { 205 - ClientRequestError::Axum(e) => { 206 - warn!("Peer {peer_id} encountered Axum error: {e:?}. Disconnecting..."); 207 - break; 208 - } 209 - ClientRequestError::Close => { 210 - info!("Peer {peer_id} closed connection"); 211 - break; 212 - } 213 - ClientRequestError::Json(_) | ClientRequestError::UnsupportedType(_) => { 214 - error!("Error parsing request from {peer_id}: {e:?}"); 215 - continue; // Recoverable, although may mean bad state? 216 - } 217 - }, 218 - }; 219 - 220 - if let PeerRequest::Signal { receiver, data } = req { 221 - let msg = Message::Text( 222 - JsonPeerEvent::Signal { 223 - sender: peer_id, 224 - data, 225 - } 226 - .to_string() 227 - .into(), 228 - ); 229 - if let Err(why) = state.try_send(receiver, msg) { 230 - error!("Error sending signaling message from {peer_id} to {receiver}: {why:?}"); 231 - } 232 - } // Other variant, PeerRequest::KeepAlive is just for a heartbeat, do nothing 233 - } 234 - 235 - info!("Peer {peer_id} disconnected"); 236 - 237 - let msg = Message::Text(JsonPeerEvent::PeerLeft(peer_id).to_string().into()); 238 - if let Some(other_peers) = state.remove_peer(peer_id) { 239 - for other_id in other_peers { 240 - if let Err(why) = state.try_send(other_id, msg.clone()) { 241 - warn!("Failed to alert {other_id} that {peer_id} has disconnected: {why:?}"); 242 - } 243 - } 244 - } else { 245 - warn!("Trying to remove peer {peer_id}, which doesn't exist?"); 246 - } 247 - } 248 - } 249 - 250 - #[tokio::main] 251 - async fn main() -> Result { 252 - colog::init(); 253 - 254 - let args = std::env::args().collect::<Vec<_>>(); 255 - let socket_addr = args 256 - .get(1) 257 - .map(|raw_binding| raw_binding.parse::<SocketAddr>()) 258 - .transpose() 259 - .context("Invalid socket addr passed")? 260 - .unwrap_or(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3536)); 261 - 262 - let mut state = ServerState::default(); 263 - 264 - let server = SignalingServerBuilder::new(socket_addr, ServerTopology, state.clone()) 265 - .on_connection_request({ 266 - let mut state = state.clone(); 267 - move |connection| { 268 - info!("{} is requesting to connect", connection.origin); 269 - debug!("Connection meta: {connection:?}"); 270 - 271 - let err = if let Some(room_code) = connection.path.clone() { 272 - let is_host = connection.query_params.contains_key("create"); 273 - if is_host { 274 - if state.create_room(connection.origin, room_code) { 275 - None 276 - } else { 277 - Some(StatusCode::CONFLICT) 278 - } 279 - } else if state.try_join_room(connection.origin, room_code) { 280 - None 281 - } else { 282 - Some(StatusCode::NOT_FOUND) 283 - } 284 - } else { 285 - Some(StatusCode::BAD_REQUEST) 286 - }; 287 - 288 - if let Some(status) = err { 289 - Err(status.into_response()) 290 - } else { 291 - Ok(true) 292 - } 293 - } 294 - }) 295 - .mutate_router({ 296 - let state = state.clone(); 297 - move |router| { 298 - let mut state2 = state.clone(); 299 - router 300 - .route( 301 - "/room_exists/{id}", 302 - get(move |Path(room_id): Path<String>| async move { 303 - if state.room_is_open(&room_id) { 304 - StatusCode::OK 305 - } else { 306 - StatusCode::NOT_FOUND 307 - } 308 - }), 309 - ) 310 - .route( 311 - "/mark_started/{id}", 312 - post(move |Path(room_id): Path<String>| async move { 313 - state2.mark_started(&room_id); 314 - StatusCode::OK 315 - }), 316 - ) 317 - } 318 - }) 319 - .on_id_assignment({ 320 - move |(socket, id)| { 321 - info!("Assigning id {id} to {socket}"); 322 - state.assign_peer_id(socket, id); 323 - } 324 - }) 325 - .build(); 326 - 327 - info!( 328 - "Starting manhunt signaling server {}", 329 - env!("CARGO_PKG_VERSION") 330 - ); 331 - 332 - server.serve().await.context("Error while running server") 333 - }
+100
manhunt-signaling/src/main.rs
··· 1 + mod state; 2 + mod topology; 3 + 4 + use axum::{ 5 + extract::Path, 6 + http::StatusCode, 7 + response::IntoResponse, 8 + routing::{get, post}, 9 + }; 10 + use log::{debug, info}; 11 + use matchbox_signaling::SignalingServerBuilder; 12 + 13 + use anyhow::Context; 14 + use std::{ 15 + net::{IpAddr, Ipv4Addr, SocketAddr}, 16 + result::Result as StdResult, 17 + }; 18 + 19 + use state::ServerState; 20 + use topology::ServerTopology; 21 + 22 + type Result<T = (), E = anyhow::Error> = StdResult<T, E>; 23 + 24 + #[tokio::main] 25 + async fn main() -> Result { 26 + colog::init(); 27 + 28 + let args = std::env::args().collect::<Vec<_>>(); 29 + let socket_addr = args 30 + .get(1) 31 + .map(|raw_binding| raw_binding.parse::<SocketAddr>()) 32 + .transpose() 33 + .context("Invalid socket addr passed")? 34 + .unwrap_or(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 3536)); 35 + 36 + let mut state = ServerState::default(); 37 + 38 + let server = SignalingServerBuilder::new(socket_addr, ServerTopology, state.clone()) 39 + .on_connection_request({ 40 + let mut state = state.clone(); 41 + move |connection| { 42 + info!("{} is requesting to connect", connection.origin); 43 + debug!("Connection meta: {connection:?}"); 44 + 45 + let err = if let Some(room_code) = connection.path.clone() { 46 + let create = connection.query_params.contains_key("create"); 47 + match state.handle_room(create, connection.origin, room_code) { 48 + Ok(_) => None, 49 + Err(err) => Some(err.into()), 50 + } 51 + } else { 52 + Some(StatusCode::BAD_REQUEST) 53 + }; 54 + 55 + if let Some(status) = err { 56 + Err(status.into_response()) 57 + } else { 58 + Ok(true) 59 + } 60 + } 61 + }) 62 + .mutate_router({ 63 + let state = state.clone(); 64 + move |router| { 65 + let mut state2 = state.clone(); 66 + router 67 + .route( 68 + "/room_exists/{id}", 69 + get(move |Path(room_id): Path<String>| async move { 70 + if state.room_is_open(&room_id) { 71 + StatusCode::OK 72 + } else { 73 + StatusCode::NOT_FOUND 74 + } 75 + }), 76 + ) 77 + .route( 78 + "/mark_started/{id}", 79 + post(move |Path(room_id): Path<String>| async move { 80 + state2.mark_started(&room_id); 81 + StatusCode::OK 82 + }), 83 + ) 84 + } 85 + }) 86 + .on_id_assignment({ 87 + move |(socket, id)| { 88 + info!("Assigning id {id} to {socket}"); 89 + state.assign_peer_id(socket, id); 90 + } 91 + }) 92 + .build(); 93 + 94 + info!( 95 + "Starting manhunt signaling server {}", 96 + env!("CARGO_PKG_VERSION") 97 + ); 98 + 99 + server.serve().await.context("Error while running server") 100 + }
+477
manhunt-signaling/src/state.rs
··· 1 + use std::{ 2 + collections::{HashMap, HashSet}, 3 + net::SocketAddr, 4 + }; 5 + 6 + use axum::{Error as AxumError, extract::ws::Message, http::StatusCode}; 7 + use matchbox_protocol::PeerId; 8 + use matchbox_signaling::{ 9 + SignalingError, SignalingState, 10 + common_logic::{self, StateObj}, 11 + }; 12 + use tokio::sync::mpsc::UnboundedSender; 13 + use tokio_util::sync::CancellationToken; 14 + 15 + pub type RoomId = String; 16 + pub type Sender = UnboundedSender<Result<Message, AxumError>>; 17 + 18 + #[derive(Debug, Clone)] 19 + struct Match { 20 + pub open_lobby: bool, 21 + cancel: CancellationToken, 22 + pub players: HashSet<PeerId>, 23 + } 24 + 25 + #[derive(Debug, Clone)] 26 + struct Peer { 27 + pub room: RoomId, 28 + sender: Sender, 29 + } 30 + 31 + impl Match { 32 + pub fn new() -> Self { 33 + Self { 34 + open_lobby: true, 35 + cancel: CancellationToken::new(), 36 + players: HashSet::with_capacity(10), 37 + } 38 + } 39 + } 40 + 41 + #[derive(Default, Debug, Clone)] 42 + pub struct ServerState { 43 + waiting_clients: StateObj<HashMap<SocketAddr, (RoomId, bool)>>, 44 + queued_clients: StateObj<HashMap<PeerId, (RoomId, bool)>>, 45 + matches: StateObj<HashMap<RoomId, Match>>, 46 + clients: StateObj<HashMap<PeerId, Peer>>, 47 + } 48 + 49 + impl SignalingState for ServerState {} 50 + 51 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 52 + pub enum RoomError { 53 + /// Room already exists 54 + Exists, 55 + /// Room was not found 56 + NotFound, 57 + } 58 + 59 + impl From<RoomError> for StatusCode { 60 + fn from(val: RoomError) -> Self { 61 + match val { 62 + RoomError::Exists => StatusCode::CONFLICT, 63 + RoomError::NotFound => StatusCode::NOT_FOUND, 64 + } 65 + } 66 + } 67 + 68 + impl ServerState { 69 + fn add_client(&mut self, origin: SocketAddr, code: RoomId, host: bool) { 70 + self.waiting_clients 71 + .lock() 72 + .unwrap() 73 + .insert(origin, (code.clone(), host)); 74 + } 75 + 76 + pub fn room_is_open(&self, room_id: &str) -> bool { 77 + self.matches 78 + .lock() 79 + .unwrap() 80 + .get(room_id) 81 + .is_some_and(|m| m.open_lobby) 82 + } 83 + 84 + /// Mark a match as started, disallowing others from joining 85 + pub fn mark_started(&mut self, room: &RoomId) { 86 + if let Some(mat) = self.matches.lock().unwrap().get_mut(room) { 87 + mat.open_lobby = false; 88 + } 89 + } 90 + 91 + /// Create a new room with the given code, should be called when someone wants to host a game. 92 + /// Returns false if a room with that code already exists. 93 + fn create_room(&mut self, origin: SocketAddr, code: RoomId) -> bool { 94 + let mut matches = self.matches.lock().unwrap(); 95 + if matches.contains_key(&code) { 96 + false 97 + } else { 98 + matches.insert(code.clone(), Match::new()); 99 + drop(matches); 100 + self.add_client(origin, code, true); 101 + true 102 + } 103 + } 104 + 105 + /// Try to join a room by a code, returns `true` if successful 106 + fn try_join_room(&mut self, origin: SocketAddr, code: RoomId) -> bool { 107 + if self.room_is_open(&code) { 108 + self.waiting_clients 109 + .lock() 110 + .unwrap() 111 + .insert(origin, (code, false)); 112 + true 113 + } else { 114 + false 115 + } 116 + } 117 + 118 + /// Try to create / join a room 119 + pub fn handle_room( 120 + &mut self, 121 + create: bool, 122 + origin: SocketAddr, 123 + code: RoomId, 124 + ) -> Result<(), RoomError> { 125 + match create { 126 + true => match self.create_room(origin, code) { 127 + true => Ok(()), 128 + false => Err(RoomError::Exists), 129 + }, 130 + false => match self.try_join_room(origin, code) { 131 + true => Ok(()), 132 + false => Err(RoomError::NotFound), 133 + }, 134 + } 135 + } 136 + 137 + /// Assign a peer an id 138 + pub fn assign_peer_id(&mut self, origin: SocketAddr, peer_id: PeerId) { 139 + let target_room = self 140 + .waiting_clients 141 + .lock() 142 + .unwrap() 143 + .remove(&origin) 144 + .expect("origin not waiting?"); 145 + 146 + self.queued_clients 147 + .lock() 148 + .unwrap() 149 + .insert(peer_id, target_room); 150 + } 151 + 152 + /// Add a peer to a room, returns other peers in the match currently 153 + pub fn add_peer( 154 + &mut self, 155 + peer_id: PeerId, 156 + sender: Sender, 157 + ) -> (bool, CancellationToken, Vec<PeerId>) { 158 + let (target_room, host) = self 159 + .queued_clients 160 + .lock() 161 + .unwrap() 162 + .remove(&peer_id) 163 + .expect("peer not waiting?"); 164 + let mut matches = self.matches.lock().unwrap(); 165 + let mat = matches.get_mut(&target_room).expect("Room not found?"); 166 + let peers = mat.players.iter().copied().collect::<Vec<_>>(); 167 + mat.players.insert(peer_id); 168 + let cancel = mat.cancel.clone(); 169 + drop(matches); 170 + let peer = Peer { 171 + room: target_room, 172 + sender, 173 + }; 174 + self.clients.lock().unwrap().insert(peer_id, peer); 175 + (host, cancel, peers) 176 + } 177 + 178 + /// Disconnect a peer from a room. Automatically deletes the room if no peers remain. Returns 179 + /// the removed peer and the set of other peers in the room that need to be notified 180 + pub fn remove_peer(&mut self, peer_id: PeerId, host: bool) -> Option<Vec<PeerId>> { 181 + let removed_peer = self.clients.lock().unwrap().remove(&peer_id)?; 182 + 183 + let mut matches = self.matches.lock().unwrap(); 184 + 185 + let other_peers = matches 186 + .get_mut(&removed_peer.room) 187 + .map(|m| { 188 + m.players.remove(&peer_id); 189 + m.players.iter().copied().collect::<Vec<_>>() 190 + }) 191 + .unwrap_or_default(); 192 + 193 + if host { 194 + if let Some(mat) = matches.get_mut(&removed_peer.room) { 195 + // If we're host, disconnect everyone else 196 + mat.open_lobby = false; 197 + mat.cancel.cancel(); 198 + } 199 + } 200 + 201 + if other_peers.is_empty() { 202 + matches.remove(&removed_peer.room); 203 + } 204 + 205 + Some(other_peers) 206 + } 207 + 208 + pub fn try_send(&self, peer: PeerId, msg: Message) -> Result<(), SignalingError> { 209 + self.clients 210 + .lock() 211 + .unwrap() 212 + .get(&peer) 213 + .ok_or(SignalingError::UnknownPeer) 214 + .and_then(|peer| common_logic::try_send(&peer.sender, msg)) 215 + } 216 + } 217 + 218 + #[cfg(test)] 219 + mod tests { 220 + use std::net::{IpAddr, Ipv4Addr}; 221 + 222 + use uuid::Uuid; 223 + 224 + use super::*; 225 + 226 + const fn origin(p: u16) -> SocketAddr { 227 + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), p) 228 + } 229 + 230 + const fn peer(p: u16) -> PeerId { 231 + PeerId(Uuid::from_u128(p as u128)) 232 + } 233 + 234 + fn dummy_sender() -> Sender { 235 + let (s, _) = tokio::sync::mpsc::unbounded_channel(); 236 + s 237 + } 238 + 239 + fn handle_assign_add(state: &mut ServerState, create: bool, code: &str, p: u16) { 240 + state 241 + .handle_room(create, origin(p), code.to_string()) 242 + .expect("Failed to handle room"); 243 + state.assign_peer_id(origin(p), peer(p)); 244 + state.add_peer(peer(p), dummy_sender()); 245 + } 246 + 247 + fn quick_create(state: &mut ServerState, code: &str, p: u16) { 248 + handle_assign_add(state, true, code, p); 249 + } 250 + 251 + fn quick_join(state: &mut ServerState, code: &str, p: u16) { 252 + handle_assign_add(state, false, code, p); 253 + } 254 + 255 + #[test] 256 + fn test_add_waiting_host() { 257 + let mut state = ServerState::default(); 258 + 259 + let code = "aaa"; 260 + 261 + state 262 + .handle_room(true, origin(1), code.to_string()) 263 + .expect("Could not create room"); 264 + assert_eq!( 265 + *state.waiting_clients.lock().unwrap(), 266 + HashMap::from_iter([(origin(1), (code.to_string(), true))]) 267 + ); 268 + assert!(state.room_is_open(code)) 269 + } 270 + 271 + #[test] 272 + fn test_add_waiting_player() { 273 + let mut state = ServerState::default(); 274 + 275 + let code = "aaaa"; 276 + 277 + quick_create(&mut state, code, 1); 278 + 279 + state 280 + .handle_room(false, origin(2), code.to_string()) 281 + .expect("Failed to join room"); 282 + assert_eq!( 283 + *state.waiting_clients.lock().unwrap(), 284 + HashMap::from_iter([(origin(2), (code.to_string(), false))]) 285 + ); 286 + } 287 + 288 + #[test] 289 + fn test_assign_id() { 290 + let mut state = ServerState::default(); 291 + 292 + let code = "aaa"; 293 + 294 + state 295 + .handle_room(true, origin(1), code.to_string()) 296 + .expect("Could not create room"); 297 + 298 + state.assign_peer_id(origin(1), peer(1)); 299 + 300 + assert!(state.waiting_clients.lock().unwrap().is_empty()); 301 + assert_eq!( 302 + *state.queued_clients.lock().unwrap(), 303 + HashMap::from_iter([(peer(1), (code.to_string(), true))]), 304 + ) 305 + } 306 + 307 + #[test] 308 + fn test_add_peer() { 309 + let mut state = ServerState::default(); 310 + 311 + let code = "aaa"; 312 + 313 + state 314 + .handle_room(true, origin(1), code.to_string()) 315 + .expect("Could not create room"); 316 + 317 + state.assign_peer_id(origin(1), peer(1)); 318 + 319 + let (_, _, others) = state.add_peer(peer(1), dummy_sender()); 320 + 321 + assert!(state.waiting_clients.lock().unwrap().is_empty()); 322 + assert!(state.queued_clients.lock().unwrap().is_empty()); 323 + assert!(others.is_empty()); 324 + assert!( 325 + state 326 + .clients 327 + .lock() 328 + .unwrap() 329 + .get(&peer(1)) 330 + .is_some_and(|p| p.room == code) 331 + ); 332 + assert!( 333 + state 334 + .matches 335 + .lock() 336 + .unwrap() 337 + .get(&code.to_string()) 338 + .is_some_and(|m| m.players.contains(&peer(1))) 339 + ); 340 + } 341 + 342 + #[test] 343 + fn test_join_add_peer() { 344 + let mut state = ServerState::default(); 345 + 346 + let code = "abcd"; 347 + 348 + quick_create(&mut state, code, 1); 349 + 350 + state 351 + .handle_room(false, origin(2), code.to_string()) 352 + .expect("Failed to join"); 353 + state.assign_peer_id(origin(2), peer(2)); 354 + 355 + let (_, _, others) = state.add_peer(peer(2), dummy_sender()); 356 + 357 + assert_eq!(others, vec![peer(1)]); 358 + assert!( 359 + state 360 + .clients 361 + .lock() 362 + .unwrap() 363 + .get(&peer(2)) 364 + .is_some_and(|p| p.room == code) 365 + ); 366 + assert!( 367 + state 368 + .matches 369 + .lock() 370 + .unwrap() 371 + .get(&code.to_string()) 372 + .is_some_and(|m| m.players.contains(&peer(1)) && m.players.contains(&peer(2))) 373 + ); 374 + } 375 + 376 + #[test] 377 + fn test_player_leave() { 378 + let mut state = ServerState::default(); 379 + 380 + let code = "asdfasdfasdfasdf"; 381 + 382 + quick_create(&mut state, code, 1); 383 + quick_join(&mut state, code, 2); 384 + 385 + let others = state.remove_peer(peer(2), false); 386 + 387 + assert_eq!(others, Some(vec![peer(1)])); 388 + assert!( 389 + state 390 + .matches 391 + .lock() 392 + .unwrap() 393 + .get(&code.to_string()) 394 + .is_some_and(|m| m.players.contains(&peer(1)) && !m.players.contains(&peer(2))) 395 + ); 396 + assert!(!state.clients.lock().unwrap().contains_key(&peer(2))); 397 + } 398 + 399 + #[test] 400 + fn test_player_leave_only_one() { 401 + let mut state = ServerState::default(); 402 + 403 + let code = "asdfasdfasdfasdf"; 404 + 405 + quick_create(&mut state, code, 1); 406 + 407 + let others = state.remove_peer(peer(1), true); 408 + 409 + assert!(others.is_some_and(|v| v.is_empty())); 410 + assert!(state.matches.lock().unwrap().is_empty()); 411 + assert!(state.clients.lock().unwrap().is_empty()); 412 + } 413 + 414 + #[test] 415 + fn test_host_leave_with_players() { 416 + let mut state = ServerState::default(); 417 + 418 + let code = "asdfasdfasdfasdf"; 419 + 420 + quick_create(&mut state, code, 1); 421 + quick_join(&mut state, code, 2); 422 + 423 + let others = state.remove_peer(peer(1), true); 424 + 425 + assert_eq!(others, Some(vec![peer(2)])); 426 + let matches = state.matches.lock().unwrap(); 427 + let mat = &matches[&code.to_string()]; 428 + assert!(mat.cancel.is_cancelled()); 429 + assert!(!mat.open_lobby); 430 + } 431 + 432 + #[test] 433 + fn test_join_no_match() { 434 + let mut state = ServerState::default(); 435 + 436 + let code = "asdfasdf"; 437 + 438 + let res = state.handle_room(false, origin(1), code.to_string()); 439 + assert_eq!(res, Err(RoomError::NotFound)); 440 + } 441 + 442 + #[test] 443 + fn test_create_exists() { 444 + let mut state = ServerState::default(); 445 + 446 + let code = "cdf"; 447 + 448 + quick_create(&mut state, code, 1); 449 + 450 + let res = state.handle_room(true, origin(2), code.to_string()); 451 + assert_eq!(res, Err(RoomError::Exists)); 452 + } 453 + 454 + #[test] 455 + fn test_join_started() { 456 + let mut state = ServerState::default(); 457 + 458 + let code = "qwerty"; 459 + 460 + quick_create(&mut state, code, 1); 461 + quick_join(&mut state, code, 2); 462 + 463 + state.mark_started(&code.to_string()); 464 + 465 + assert!( 466 + state 467 + .matches 468 + .lock() 469 + .unwrap() 470 + .get(&code.to_string()) 471 + .is_some_and(|m| !m.open_lobby) 472 + ); 473 + 474 + let res = state.handle_room(false, origin(3), code.to_string()); 475 + assert_eq!(res, Err(RoomError::NotFound)); 476 + } 477 + }
+99
manhunt-signaling/src/topology.rs
··· 1 + use async_trait::async_trait; 2 + use axum::extract::ws::Message; 3 + use futures::StreamExt; 4 + use log::{error, info, warn}; 5 + use matchbox_protocol::{JsonPeerEvent, PeerRequest}; 6 + use matchbox_signaling::{ 7 + ClientRequestError, NoCallbacks, SignalingTopology, WsStateMeta, common_logic::parse_request, 8 + }; 9 + 10 + use crate::state::ServerState; 11 + 12 + #[derive(Default, Debug)] 13 + pub struct ServerTopology; 14 + 15 + #[async_trait] 16 + impl SignalingTopology<NoCallbacks, ServerState> for ServerTopology { 17 + async fn state_machine(upgrade: WsStateMeta<NoCallbacks, ServerState>) { 18 + let WsStateMeta { 19 + peer_id, 20 + sender, 21 + mut receiver, 22 + mut state, 23 + .. 24 + } = upgrade; 25 + 26 + let (host, cancel, other_peers) = state.add_peer(peer_id, sender.clone()); 27 + 28 + let msg = Message::Text(JsonPeerEvent::NewPeer(peer_id).to_string().into()); 29 + 30 + for other_id in other_peers { 31 + if let Err(why) = state.try_send(other_id, msg.clone()) { 32 + error!("Failed to publish new peer event to {other_id}: {why:?}"); 33 + } 34 + } 35 + 36 + loop { 37 + let next_msg = tokio::select! { 38 + biased; 39 + 40 + _ = cancel.cancelled() => { 41 + info!("Disconnecting {peer_id} due to host disconnect"); 42 + break; 43 + } 44 + 45 + next = receiver.next() => { 46 + if let Some(next) = next { 47 + parse_request(next) 48 + } else { 49 + info!("Peer {peer_id} has disconnected"); 50 + break; 51 + } 52 + } 53 + }; 54 + 55 + let req = match next_msg { 56 + Ok(req) => req, 57 + Err(e) => match e { 58 + ClientRequestError::Axum(e) => { 59 + warn!("Peer {peer_id} encountered Axum error: {e:?}. Disconnecting..."); 60 + break; 61 + } 62 + ClientRequestError::Close => { 63 + info!("Peer {peer_id} closed connection"); 64 + break; 65 + } 66 + ClientRequestError::Json(_) | ClientRequestError::UnsupportedType(_) => { 67 + error!("Error parsing request from {peer_id}: {e:?}"); 68 + continue; // Recoverable, although may mean bad state? 69 + } 70 + }, 71 + }; 72 + 73 + if let PeerRequest::Signal { receiver, data } = req { 74 + let msg = Message::Text( 75 + JsonPeerEvent::Signal { 76 + sender: peer_id, 77 + data, 78 + } 79 + .to_string() 80 + .into(), 81 + ); 82 + if let Err(why) = state.try_send(receiver, msg) { 83 + error!("Error sending signaling message from {peer_id} to {receiver}: {why:?}"); 84 + } 85 + } // Other variant, PeerRequest::KeepAlive is just for a heartbeat, do nothing 86 + } 87 + 88 + let msg = Message::Text(JsonPeerEvent::PeerLeft(peer_id).to_string().into()); 89 + if let Some(other_peers) = state.remove_peer(peer_id, host) { 90 + for other_id in other_peers { 91 + if let Err(why) = state.try_send(other_id, msg.clone()) { 92 + warn!("Failed to alert {other_id} that {peer_id} has disconnected: {why:?}"); 93 + } 94 + } 95 + } else { 96 + warn!("Trying to remove peer {peer_id}, which doesn't exist?"); 97 + } 98 + } 99 + }