Live location tracking and playback for the game "manhunt"

Transport, lobby, high level APIs

bwc9876.dev 5dac1fd5 07cfb7bb

verified
+612 -31
+1 -1
backend/src/game/location.rs
··· 15 15 } 16 16 17 17 pub trait LocationService { 18 - fn get_loc(&self) -> Location; 18 + fn get_loc(&self) -> Option<Location>; 19 19 }
+22 -17
backend/src/game/mod.rs
··· 1 1 use chrono::{DateTime, Utc}; 2 - use events::GameEvent; 2 + pub use events::GameEvent; 3 + use matchbox_socket::PeerId; 3 4 use powerups::PowerUpType; 4 - use settings::GameSettings; 5 - use std::{collections::HashMap, fmt::Debug, hash::Hash, time::Duration}; 5 + pub use settings::GameSettings; 6 + use std::{collections::HashMap, fmt::Debug, hash::Hash, ops::Deref, sync::Arc, time::Duration}; 7 + use uuid::Uuid; 6 8 7 9 use tokio::{sync::RwLock, time::MissedTickBehavior}; 8 10 ··· 13 15 mod state; 14 16 mod transport; 15 17 16 - use location::LocationService; 18 + pub use location::{Location, LocationService}; 17 19 use state::GameState; 18 - use transport::Transport; 20 + pub use transport::Transport; 19 21 20 22 /// Type used to uniquely identify players in the game 21 23 pub trait PlayerId: 22 24 Debug + Hash + Ord + Eq + PartialEq + Send + Sync + Sized + Copy + Clone 23 25 { 24 26 } 27 + 28 + impl PlayerId for Uuid {} 29 + impl PlayerId for PeerId {} 25 30 26 31 /// Convenence alias for UTC DT 27 32 pub type UtcDT = DateTime<Utc>; ··· 29 34 /// Struct representing an ongoing game, handles communication with 30 35 /// other clients via [Transport], gets location with [LocationService], and provides high-level methods for 31 36 /// taking actions in the game. 32 - struct Game<Id: PlayerId, L: LocationService, T: Transport<Id>> { 37 + pub struct Game<Id: PlayerId, L: LocationService, T: Transport<Id>> { 33 38 state: RwLock<GameState<Id>>, 34 - transport: T, 39 + transport: Arc<T>, 35 40 location: L, 36 41 interval: Duration, 37 42 } ··· 40 45 pub fn new( 41 46 my_id: Id, 42 47 interval: Duration, 43 - random_seed: u64, 44 48 initial_caught_state: HashMap<Id, bool>, 45 49 settings: GameSettings, 46 - transport: T, 50 + transport: Arc<T>, 47 51 location: L, 48 52 ) -> Self { 49 - let state = GameState::<Id>::new(settings, my_id, random_seed, initial_caught_state); 53 + let state = GameState::<Id>::new(settings, my_id, initial_caught_state); 50 54 51 55 Self { 52 56 transport, ··· 139 143 let mut state = self.state.write().await; 140 144 141 145 // Push to location history 142 - let location = self.location.get_loc(); 143 - state.push_loc(location); 146 + if let Some(location) = self.location.get_loc() { 147 + state.push_loc(location); 148 + } 144 149 145 150 // Release Seekers? 146 151 if !state.seekers_released() && state.should_release_seekers(now) { ··· 248 253 struct MockLocation; 249 254 250 255 impl LocationService for MockLocation { 251 - fn get_loc(&self) -> location::Location { 252 - location::Location { 256 + fn get_loc(&self) -> Option<Location> { 257 + Some(location::Location { 253 258 lat: 0.0, 254 259 long: 0.0, 255 260 heading: None, 256 - } 261 + }) 257 262 } 258 263 } 259 264 ··· 295 300 let game = TestGame::new( 296 301 id as u32, 297 302 INTERVAL, 298 - 0, 299 303 initial_caught_state.clone(), 300 304 settings.clone(), 301 - transport, 305 + Arc::new(transport), 302 306 location, 303 307 ); 304 308 ··· 365 369 366 370 fn mk_settings() -> GameSettings { 367 371 GameSettings { 372 + random_seed: 0, 368 373 hiding_time_seconds: 1, 369 374 ping_start: PingStartCondition::Instant, 370 375 ping_minutes_interval: 1,
+17
backend/src/game/settings.rs
··· 17 17 #[derive(Debug, Clone, Serialize, Deserialize)] 18 18 /// Settings for the game, host is the only person able to change these 19 19 pub struct GameSettings { 20 + /// The random seed used for shared rng 21 + pub random_seed: u64, 20 22 /// The number of seconds to wait before seekers are allowed to go 21 23 pub hiding_time_seconds: u32, 22 24 /// Condition to wait for global pings to begin ··· 39 41 Bernoulli::from_ratio(self.powerup_chance, 100).unwrap() 40 42 } 41 43 } 44 + 45 + impl Default for GameSettings { 46 + fn default() -> Self { 47 + Self { 48 + random_seed: rand::random_range(0..=u64::MAX), 49 + hiding_time_seconds: 60, 50 + ping_start: PingStartCondition::Players(2), 51 + ping_minutes_interval: 3, 52 + powerup_start: PingStartCondition::Minutes(5), 53 + powerup_chance: 25, 54 + powerup_minutes_cooldown: 5, 55 + powerup_locations: vec![], 56 + } 57 + } 58 + }
+3 -8
backend/src/game/state.rs
··· 95 95 } 96 96 97 97 impl<Id: PlayerId> GameState<Id> { 98 - pub fn new( 99 - settings: GameSettings, 100 - my_id: Id, 101 - random_seed: u64, 102 - initial_caught_state: HashMap<Id, bool>, 103 - ) -> Self { 104 - let mut rand = ChaCha20Rng::seed_from_u64(random_seed); 98 + pub fn new(settings: GameSettings, my_id: Id, initial_caught_state: HashMap<Id, bool>) -> Self { 99 + let mut rand = ChaCha20Rng::seed_from_u64(settings.random_seed); 105 100 let increment = rand.random_range(-100..100); 106 101 107 102 Self { ··· 112 107 caught_state: initial_caught_state, 113 108 available_powerup: None, 114 109 powerup_bernoulli: settings.get_powerup_bernoulli(), 110 + shared_random_state: settings.random_seed, 115 111 settings, 116 112 last_global_ping: None, 117 113 last_powerup_spawn: None, 118 114 location_history: Vec::with_capacity(30), 119 115 held_powerup: None, 120 116 shared_random_increment: increment, 121 - shared_random_state: random_seed, 122 117 } 123 118 } 124 119
+1 -3
backend/src/game/transport.rs
··· 1 - use futures::{stream::FuturesUnordered, StreamExt}; 2 - 3 - use super::{events::GameEvent, PlayerId, UtcDT}; 1 + use super::{events::GameEvent, PlayerId}; 4 2 5 3 pub trait Transport<Id: PlayerId> { 6 4 /// Receive an event
+141 -2
backend/src/lib.rs
··· 1 - #[allow(unused)] 2 1 mod game; 2 + mod lobby; 3 + mod location; 4 + mod profile; 5 + mod transport; 6 + 7 + use std::{sync::Arc, time::Duration}; 8 + 9 + use game::{Game as BaseGame, GameSettings}; 10 + use lobby::{Lobby, StartGameInfo}; 11 + use location::TauriLocation; 12 + use matchbox_socket::PeerId; 13 + use profile::PlayerProfile; 14 + use tauri::{AppHandle, Manager, State}; 15 + use tokio::sync::RwLock; 16 + use transport::MatchboxTransport; 17 + 18 + type Game = BaseGame<PeerId, TauriLocation, MatchboxTransport>; 19 + 20 + enum AppState { 21 + Setup, 22 + Menu(PlayerProfile), 23 + Lobby(Arc<Lobby>), 24 + Game(Arc<Game>), 25 + } 26 + 27 + type AppStateHandle = RwLock<AppState>; 28 + 29 + fn generate_join_code() -> String { 30 + // 5 character sequence of A-Z 31 + (0..5) 32 + .into_iter() 33 + .map(|_| (('A' as u8) + rand::random_range(0..26)) as char) 34 + .collect::<String>() 35 + } 36 + 37 + const GAME_TICK_RATE: Duration = Duration::from_secs(1); 38 + 39 + const fn server_url() -> &'static str { 40 + if let Some(url) = option_env!("APP_SERVER_URL") { 41 + url 42 + } else { 43 + "ws://localhost:3536" 44 + } 45 + } 46 + 47 + impl AppState { 48 + pub fn start_game(&mut self, app: AppHandle, my_id: PeerId, start: StartGameInfo) { 49 + match self { 50 + AppState::Lobby(lobby) => { 51 + let transport = lobby.clone_transport(); 52 + let location = TauriLocation::new(app.clone()); 53 + let game = Arc::new(Game::new( 54 + my_id, 55 + GAME_TICK_RATE, 56 + start.initial_caught_state, 57 + start.settings, 58 + transport, 59 + location, 60 + )); 61 + *self = AppState::Game(game.clone()); 62 + tokio::spawn(async move { 63 + game.main_loop().await; 64 + }); 65 + } 66 + _ => {} 67 + } 68 + } 69 + 70 + pub fn start_lobby( 71 + &mut self, 72 + join_code: Option<String>, 73 + app: AppHandle, 74 + settings: GameSettings, 75 + ) { 76 + match self { 77 + AppState::Menu(profile) => { 78 + let host = join_code.is_none(); 79 + let room_code = join_code.unwrap_or_else(generate_join_code); 80 + let app_after = app.clone(); 81 + let lobby = Arc::new(Lobby::new( 82 + server_url(), 83 + &room_code, 84 + app, 85 + host, 86 + profile.clone(), 87 + settings, 88 + )); 89 + *self = AppState::Lobby(lobby.clone()); 90 + tokio::spawn(async move { 91 + let (my_id, start) = lobby.open().await; 92 + let app_game = app_after.clone(); 93 + let state_handle = app_after.state::<AppStateHandle>(); 94 + let mut state = state_handle.write().await; 95 + state.start_game(app_game, my_id, start); 96 + }); 97 + } 98 + _ => {} 99 + } 100 + } 101 + } 102 + 103 + #[tauri::command] 104 + async fn go_to_lobby( 105 + app: AppHandle, 106 + join_code: Option<String>, 107 + settings: GameSettings, 108 + state: State<'_, AppStateHandle>, 109 + ) -> Result<(), String> { 110 + let mut state = state.write().await; 111 + state.start_lobby(join_code, app, settings); 112 + Ok(()) 113 + } 114 + 115 + #[tauri::command] 116 + async fn host_start_game(state: State<'_, AppStateHandle>) -> Result<(), String> { 117 + let state = state.read().await; 118 + match &*state { 119 + AppState::Lobby(lobby) => { 120 + lobby.start_game().await; 121 + Ok(()) 122 + } 123 + _ => Err("Invalid AppState".to_string()), 124 + } 125 + } 3 126 4 127 #[cfg_attr(mobile, tauri::mobile_entry_point)] 5 128 pub fn run() { 129 + let state = RwLock::new(AppState::Setup); 130 + 6 131 tauri::Builder::default() 132 + .manage(state) 7 133 .plugin(tauri_plugin_opener::init()) 8 - .invoke_handler(tauri::generate_handler![]) 134 + .plugin(tauri_plugin_geolocation::init()) 135 + .plugin(tauri_plugin_store::Builder::default().build()) 136 + .setup(|app| { 137 + let handle = app.handle().clone(); 138 + tokio::spawn(async move { 139 + if let Some(profile) = PlayerProfile::load_from_store(&handle) { 140 + let state_handle = handle.state::<AppStateHandle>(); 141 + let mut state = state_handle.write().await; 142 + *state = AppState::Menu(profile); 143 + } 144 + }); 145 + Ok(()) 146 + }) 147 + .invoke_handler(tauri::generate_handler![go_to_lobby]) 9 148 .run(tauri::generate_context!()) 10 149 .expect("error while running tauri application"); 11 150 }
+183
backend/src/lobby.rs
··· 1 + use std::{collections::HashMap, path::PathBuf, sync::Arc}; 2 + 3 + use matchbox_socket::PeerId; 4 + use serde::{Deserialize, Serialize}; 5 + use tauri::{path::BaseDirectory, AppHandle, Manager}; 6 + use tokio::sync::Mutex; 7 + 8 + use crate::{ 9 + game::GameSettings, 10 + profile::PlayerProfile, 11 + transport::{MatchboxTransport, TransportMessage}, 12 + }; 13 + 14 + #[derive(Debug, Clone, Serialize, Deserialize)] 15 + pub struct StartGameInfo { 16 + pub settings: GameSettings, 17 + pub initial_caught_state: HashMap<PeerId, bool>, 18 + } 19 + 20 + #[derive(Debug, Clone, Serialize, Deserialize)] 21 + pub enum LobbyMessage { 22 + /// Message sent on a new peer, to sync profiles 23 + PlayerSync(PlayerProfile), 24 + /// Message sent on a new peer from the host, to sync game settings 25 + HostPush(GameSettings), 26 + /// Host signals starting the game 27 + StartGame(StartGameInfo), 28 + /// A player has switched teams 29 + PlayerSwitch(bool), 30 + } 31 + 32 + #[derive(Serialize, Deserialize)] 33 + struct LobbyState { 34 + profiles: HashMap<PeerId, PlayerProfile>, 35 + join_code: String, 36 + /// True represents seeker, false hider 37 + teams: HashMap<PeerId, bool>, 38 + self_seeker: bool, 39 + settings: GameSettings, 40 + } 41 + 42 + pub struct Lobby { 43 + pfp_dir: PathBuf, 44 + is_host: bool, 45 + self_profile: PlayerProfile, 46 + state: Mutex<LobbyState>, 47 + transport: Arc<MatchboxTransport>, 48 + } 49 + 50 + impl Lobby { 51 + pub fn new( 52 + ws_url_base: &str, 53 + join_code: &str, 54 + app: AppHandle, 55 + host: bool, 56 + profile: PlayerProfile, 57 + settings: GameSettings, 58 + ) -> Self { 59 + let pfp_dir = app 60 + .path() 61 + .resolve("pfp_cache", BaseDirectory::Cache) 62 + .expect("Failed to get Cache Dir"); 63 + 64 + Self { 65 + pfp_dir, 66 + transport: Arc::new(MatchboxTransport::new(&format!( 67 + "{ws_url_base}/{join_code}" 68 + ))), 69 + is_host: host, 70 + self_profile: profile, 71 + state: Mutex::new(LobbyState { 72 + teams: HashMap::with_capacity(5), 73 + join_code: join_code.to_string(), 74 + profiles: HashMap::with_capacity(5), 75 + self_seeker: false, 76 + settings, 77 + }), 78 + } 79 + } 80 + 81 + pub fn clone_transport(&self) -> Arc<MatchboxTransport> { 82 + self.transport.clone() 83 + } 84 + 85 + /// Set self as seeker or hider 86 + pub async fn switch_teams(&self, seeker: bool) { 87 + let mut state = self.state.lock().await; 88 + state.self_seeker = seeker; 89 + drop(state); 90 + self.transport 91 + .send_transport_message( 92 + None, 93 + TransportMessage::Lobby(LobbyMessage::PlayerSwitch(seeker)), 94 + ) 95 + .await; 96 + } 97 + 98 + /// (Host) Update game settings 99 + pub async fn update_settings(&self, new_settings: GameSettings) { 100 + if self.is_host { 101 + let mut state = self.state.lock().await; 102 + state.settings = new_settings.clone(); 103 + drop(state); 104 + let msg = TransportMessage::Lobby(LobbyMessage::HostPush(new_settings)); 105 + self.transport.send_transport_message(None, msg).await; 106 + } 107 + } 108 + 109 + /// (Host) Start the game 110 + pub async fn start_game(&self) { 111 + if self.is_host { 112 + if let Some(my_id) = self.transport.get_my_id().await { 113 + let mut state = self.state.lock().await; 114 + let seeker = state.self_seeker; 115 + state.teams.insert(my_id, seeker); 116 + let start_game_info = StartGameInfo { 117 + settings: state.settings.clone(), 118 + initial_caught_state: state.teams.clone(), 119 + }; 120 + let msg = TransportMessage::Lobby(LobbyMessage::StartGame(start_game_info)); 121 + self.transport.send_transport_message(None, msg).await; 122 + } 123 + } 124 + } 125 + 126 + pub async fn open(&self) -> (PeerId, StartGameInfo) { 127 + let transport_inner = self.transport.clone(); 128 + tokio::spawn(async move { transport_inner.transport_loop().await }); 129 + 130 + loop { 131 + if let Some((peer, msg)) = self.transport.recv_transport_message().await { 132 + match msg { 133 + TransportMessage::Game(game_event) => { 134 + eprintln!("Peer {peer:?} sent a GameEvent: {game_event:?}"); 135 + } 136 + TransportMessage::Lobby(lobby_message) => match lobby_message { 137 + LobbyMessage::PlayerSync(player_profile) => { 138 + let mut state = self.state.lock().await; 139 + state.profiles.insert(peer, player_profile); 140 + } 141 + LobbyMessage::HostPush(game_settings) => { 142 + let mut state = self.state.lock().await; 143 + state.settings = game_settings; 144 + } 145 + LobbyMessage::StartGame(start_game_info) => { 146 + break ( 147 + self.transport 148 + .get_my_id() 149 + .await 150 + .expect("Error getting self ID"), 151 + start_game_info, 152 + ); 153 + } 154 + LobbyMessage::PlayerSwitch(seeker) => { 155 + let mut state = self.state.lock().await; 156 + state.teams.insert(peer, seeker); 157 + } 158 + }, 159 + TransportMessage::PeerConnect => { 160 + let msg = LobbyMessage::PlayerSync(self.self_profile.clone()); 161 + let mut state = self.state.lock().await; 162 + state.teams.insert(peer, false); 163 + drop(state); 164 + let msg = TransportMessage::Lobby(msg); 165 + self.transport.send_transport_message(Some(peer), msg).await; 166 + if self.is_host { 167 + let state = self.state.lock().await; 168 + let msg = LobbyMessage::HostPush(state.settings.clone()); 169 + drop(state); 170 + let msg = TransportMessage::Lobby(msg); 171 + self.transport.send_transport_message(Some(peer), msg).await; 172 + } 173 + } 174 + TransportMessage::PeerDisconnect => { 175 + let mut state = self.state.lock().await; 176 + state.profiles.remove(&peer); 177 + state.teams.remove(&peer); 178 + } 179 + } 180 + } 181 + } 182 + } 183 + }
+38
backend/src/location.rs
··· 1 + use tauri::AppHandle; 2 + use tauri_plugin_geolocation::{GeolocationExt, PositionOptions}; 3 + 4 + use crate::game::{Location, LocationService}; 5 + 6 + pub struct TauriLocation(AppHandle); 7 + 8 + impl TauriLocation { 9 + pub fn new(app: AppHandle) -> Self { 10 + Self(app) 11 + } 12 + } 13 + 14 + const OPTIONS: PositionOptions = PositionOptions { 15 + enable_high_accuracy: true, 16 + timeout: 10000, // Unused in our case, set to default 17 + maximum_age: 2000, 18 + }; 19 + 20 + impl LocationService for TauriLocation { 21 + fn get_loc(&self) -> Option<Location> { 22 + match self.0.geolocation().get_current_position(Some(OPTIONS)) { 23 + Ok(pos) => { 24 + let coords = pos.coords; 25 + let loc = Location { 26 + lat: coords.latitude, 27 + long: coords.longitude, 28 + heading: coords.heading, 29 + }; 30 + Some(loc) 31 + } 32 + Err(why) => { 33 + eprintln!("Failed to get loc: {why:?}"); 34 + None 35 + } 36 + } 37 + } 38 + }
+36
backend/src/profile.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + use tauri::AppHandle; 3 + use tauri_plugin_store::StoreExt; 4 + 5 + #[derive(Clone, Debug, Serialize, Deserialize)] 6 + pub struct PlayerProfile { 7 + display_name: String, 8 + pfp_base64: Option<String>, 9 + } 10 + 11 + const STORE_NAME: &str = "profile.json"; 12 + 13 + impl PlayerProfile { 14 + pub fn has_pfp(&self) -> bool { 15 + self.pfp_base64.is_some() 16 + } 17 + 18 + pub fn load_from_store(app: &AppHandle) -> Option<Self> { 19 + let store = app.store(STORE_NAME).expect("Couldn't Create Store"); 20 + 21 + let profile = store 22 + .get("profile") 23 + .and_then(|v| serde_json::from_value::<Self>(v).ok()); 24 + 25 + store.close_resource(); 26 + 27 + profile 28 + } 29 + 30 + pub fn write_to_store(&self, app: &AppHandle) { 31 + let store = app.store(STORE_NAME).expect("Couldn't create store"); 32 + 33 + let value = serde_json::to_value(self.clone()).expect("Failed to serialize"); 34 + store.set("profile", value); 35 + } 36 + }
+170
backend/src/transport.rs
··· 1 + use std::{collections::HashSet, time::Duration}; 2 + 3 + use futures::{FutureExt, SinkExt}; 4 + use matchbox_socket::{PeerId, PeerState, WebRtcSocket}; 5 + use serde::{Deserialize, Serialize}; 6 + use tokio::sync::{Mutex, RwLock}; 7 + 8 + use crate::{ 9 + game::{GameEvent, Transport}, 10 + lobby::LobbyMessage, 11 + }; 12 + 13 + #[derive(Debug, Serialize, Deserialize, Clone)] 14 + pub enum TransportMessage { 15 + /// Message related to the actual game 16 + Game(GameEvent<PeerId>), 17 + /// Message related to the pre-game lobby 18 + Lobby(LobbyMessage), 19 + /// Internal message when peer connects 20 + PeerConnect, 21 + /// Internal message when peer disconnects 22 + PeerDisconnect, 23 + } 24 + 25 + type OutgoingMsgPair = (Option<PeerId>, TransportMessage); 26 + type OutgoingQueueSender = tokio::sync::mpsc::Sender<OutgoingMsgPair>; 27 + type OutgoingQueueReceiver = tokio::sync::mpsc::Receiver<OutgoingMsgPair>; 28 + 29 + type IncomingMsgPair = (PeerId, TransportMessage); 30 + type IncomingQueueSender = tokio::sync::mpsc::Sender<IncomingMsgPair>; 31 + type IncomingQueueReceiver = tokio::sync::mpsc::Receiver<IncomingMsgPair>; 32 + 33 + pub struct MatchboxTransport { 34 + ws_url: String, 35 + incoming: (IncomingQueueSender, Mutex<IncomingQueueReceiver>), 36 + outgoing: (OutgoingQueueSender, Mutex<OutgoingQueueReceiver>), 37 + my_id: RwLock<Option<PeerId>>, 38 + } 39 + 40 + impl MatchboxTransport { 41 + pub fn new(ws_url: &str) -> Self { 42 + let (itx, irx) = tokio::sync::mpsc::channel(15); 43 + let (otx, orx) = tokio::sync::mpsc::channel(15); 44 + 45 + Self { 46 + ws_url: ws_url.to_string(), 47 + incoming: (itx, Mutex::new(irx)), 48 + outgoing: (otx, Mutex::new(orx)), 49 + my_id: RwLock::new(None), 50 + } 51 + } 52 + 53 + pub async fn send_transport_message(&self, peer: Option<PeerId>, msg: TransportMessage) { 54 + self.outgoing 55 + .0 56 + .send((peer, msg)) 57 + .await 58 + .expect("Failed to add to outgoing queue"); 59 + } 60 + 61 + pub async fn recv_transport_message(&self) -> Option<IncomingMsgPair> { 62 + let mut incoming_rx = self.incoming.1.lock().await; 63 + incoming_rx.recv().await 64 + } 65 + 66 + pub async fn get_my_id(&self) -> Option<PeerId> { 67 + *self.my_id.read().await 68 + } 69 + 70 + pub async fn transport_loop(&self) { 71 + let (mut socket, loop_fut) = WebRtcSocket::new_reliable(&self.ws_url); 72 + 73 + let loop_fut = loop_fut.fuse(); 74 + tokio::pin!(loop_fut); 75 + 76 + let mut all_peers = HashSet::<PeerId>::with_capacity(20); 77 + let mut my_id = None; 78 + 79 + let mut timer = tokio::time::interval(Duration::from_millis(100)); 80 + 81 + loop { 82 + for (peer, state) in socket.update_peers() { 83 + let msg = match state { 84 + PeerState::Connected => { 85 + all_peers.insert(peer); 86 + TransportMessage::PeerConnect 87 + } 88 + PeerState::Disconnected => { 89 + all_peers.remove(&peer); 90 + TransportMessage::PeerDisconnect 91 + } 92 + }; 93 + self.incoming 94 + .0 95 + .send((peer, msg)) 96 + .await 97 + .expect("Failed to push to incoming queue"); 98 + } 99 + 100 + for (peer, data) in socket.channel_mut(0).receive() { 101 + if let Ok(msg) = rmp_serde::from_slice(&data) { 102 + self.incoming 103 + .0 104 + .send((peer, msg)) 105 + .await 106 + .expect("Failed to push to incoming queue"); 107 + } 108 + } 109 + 110 + if my_id.is_none() { 111 + if let Some(new_id) = socket.id() { 112 + my_id = Some(new_id); 113 + *self.my_id.write().await = Some(new_id); 114 + } 115 + } 116 + 117 + let mut outgoing_rx = self.outgoing.1.lock().await; 118 + 119 + tokio::select! { 120 + _ = timer.tick() => { 121 + // Transport Tick 122 + continue; 123 + } 124 + 125 + Some((peer, msg)) = outgoing_rx.recv() => { 126 + let encoded = rmp_serde::to_vec(&msg).unwrap(); 127 + 128 + if let Some(peer) = peer { 129 + let channel = socket.channel_mut(0); 130 + let data = encoded.into_boxed_slice(); 131 + channel.send(data, peer); 132 + } else { 133 + // Send to self as well 134 + if let Some(myself) = my_id { 135 + self.incoming.0.send((myself, msg)).await.expect("Failed to push to incoming queue"); 136 + } 137 + let channel = socket.channel_mut(0); 138 + 139 + for peer in all_peers.iter() { 140 + // TODO: Any way around having to clone here? 141 + let data = encoded.clone().into_boxed_slice(); 142 + channel.send(data, *peer); 143 + } 144 + } 145 + } 146 + 147 + _ = &mut loop_fut => { 148 + // Break on disconnect 149 + break; 150 + } 151 + } 152 + } 153 + } 154 + } 155 + 156 + impl Transport<PeerId> for MatchboxTransport { 157 + async fn receive_message(&self) -> Option<GameEvent<PeerId>> { 158 + self.recv_transport_message() 159 + .await 160 + .and_then(|(_, msg)| match msg { 161 + TransportMessage::Game(game_event) => Some(game_event), 162 + _ => None, 163 + }) 164 + } 165 + 166 + async fn send_message(&self, msg: GameEvent<PeerId>) { 167 + let msg = TransportMessage::Game(msg); 168 + self.send_transport_message(None, msg).await; 169 + } 170 + }