Live location tracking and playback for the game "manhunt"
at ben/frontend 329 lines 11 kB view raw
1#![allow(clippy::result_large_err)] 2 3use manhunt_logic::{ 4 Game as BaseGame, GameSettings, Lobby as BaseLobby, Location, LocationService, PlayerProfile, 5 StartGameInfo, StateUpdateSender, 6}; 7use manhunt_test_shared::*; 8use manhunt_transport::{MatchboxTransport, request_room_code}; 9use std::{sync::Arc, time::Duration}; 10use tokio::{ 11 io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, 12 sync::{Mutex, mpsc}, 13}; 14 15struct DummyLocationService; 16 17impl LocationService for DummyLocationService { 18 fn get_loc(&self) -> Option<manhunt_logic::Location> { 19 Some(Location { 20 lat: 0.0, 21 long: 0.0, 22 heading: None, 23 }) 24 } 25} 26 27struct UpdateSender(mpsc::Sender<()>); 28 29impl StateUpdateSender for UpdateSender { 30 fn send_update(&self) { 31 let tx = self.0.clone(); 32 tokio::spawn(async move { 33 tx.send(()).await.expect("Failed to send"); 34 }); 35 } 36} 37 38type Game = BaseGame<DummyLocationService, MatchboxTransport, UpdateSender>; 39type Lobby = BaseLobby<MatchboxTransport, UpdateSender>; 40 41#[derive(Default)] 42enum DaemonScreen { 43 #[default] 44 PreConnect, 45 Lobby(Arc<Lobby>), 46 Game(Arc<Game>), 47} 48 49impl DaemonScreen { 50 pub fn as_update(&self) -> ScreenUpdate { 51 match self { 52 Self::PreConnect => ScreenUpdate::PreConnect, 53 Self::Game(_) => ScreenUpdate::Game, 54 Self::Lobby(_) => ScreenUpdate::Lobby, 55 } 56 } 57} 58 59type StateHandle = Arc<Mutex<DaemonState>>; 60 61struct DaemonState { 62 screen: DaemonScreen, 63 profile: PlayerProfile, 64 responses: mpsc::Sender<TestingResponse>, 65 updates: (mpsc::Sender<()>, Mutex<mpsc::Receiver<()>>), 66} 67 68impl DaemonState { 69 pub fn new(name: impl Into<String>, responses: mpsc::Sender<TestingResponse>) -> Self { 70 tokio::time::pause(); 71 let screen = DaemonScreen::default(); 72 let (tx, rx) = mpsc::channel(2); 73 Self { 74 screen, 75 responses, 76 profile: PlayerProfile { 77 display_name: name.into(), 78 pfp_base64: None, 79 }, 80 updates: (tx, Mutex::new(rx)), 81 } 82 } 83 84 async fn change_screen(&mut self, new_screen: DaemonScreen) { 85 let update = new_screen.as_update(); 86 self.screen = new_screen; 87 self.push_resp(update).await; 88 } 89 90 async fn lobby_loop(&self, handle: StateHandle) { 91 if let DaemonScreen::Lobby(lobby) = &self.screen { 92 let lobby = lobby.clone(); 93 tokio::spawn(async move { 94 let res = lobby.main_loop().await; 95 let handle2 = handle.clone(); 96 let mut state = handle.lock().await; 97 match res { 98 Ok(Some(start)) => { 99 state.start_game(handle2, start).await; 100 } 101 Ok(None) => { 102 state.change_screen(DaemonScreen::PreConnect).await; 103 } 104 Err(why) => { 105 state.push_resp(why).await; 106 state.change_screen(DaemonScreen::PreConnect).await; 107 } 108 } 109 }); 110 } 111 } 112 113 async fn game_loop(&self, handle: StateHandle) { 114 if let DaemonScreen::Game(game) = &self.screen { 115 let game = game.clone(); 116 tokio::spawn(async move { 117 let res = game.main_loop().await; 118 let mut state = handle.lock().await; 119 match res { 120 Ok(Some(history)) => { 121 state.push_resp(history).await; 122 } 123 Ok(None) => {} 124 Err(why) => { 125 state.push_resp(why).await; 126 } 127 } 128 state.change_screen(DaemonScreen::PreConnect).await; 129 }); 130 } 131 } 132 133 async fn push_resp(&self, resp: impl Into<TestingResponse>) { 134 self.responses 135 .send(resp.into()) 136 .await 137 .expect("Failed to push response"); 138 } 139 140 fn sender(&self) -> UpdateSender { 141 UpdateSender(self.updates.0.clone()) 142 } 143 144 const INTERVAL: Duration = Duration::from_secs(1); 145 146 async fn start_game(&mut self, handle: StateHandle, start: StartGameInfo) { 147 if let DaemonScreen::Lobby(lobby) = &self.screen { 148 let transport = lobby.clone_transport(); 149 let updates = self.sender(); 150 let location = DummyLocationService; 151 152 let game = Game::new(Self::INTERVAL, start, transport, location, updates); 153 154 self.change_screen(DaemonScreen::Game(Arc::new(game))).await; 155 self.game_loop(handle).await; 156 } 157 } 158 159 pub async fn create_lobby(&mut self, handle: StateHandle, settings: GameSettings) -> Result { 160 let sender = self.sender(); 161 162 let code = request_room_code() 163 .await 164 .context("Failed to get room code")?; 165 166 let lobby = Lobby::new(&code, true, self.profile.clone(), settings, sender) 167 .await 168 .context("Failed to start lobby")?; 169 170 self.change_screen(DaemonScreen::Lobby(lobby)).await; 171 self.lobby_loop(handle).await; 172 173 Ok(()) 174 } 175 176 pub async fn join_lobby(&mut self, handle: StateHandle, code: &str) -> Result { 177 let sender = self.sender(); 178 // TODO: Lobby should not require this on join, use an [Option]? 179 let settings = GameSettings::default(); 180 181 let lobby = Lobby::new(code, false, self.profile.clone(), settings, sender) 182 .await 183 .context("Failed to join lobby")?; 184 185 self.change_screen(DaemonScreen::Lobby(lobby)).await; 186 self.lobby_loop(handle).await; 187 188 Ok(()) 189 } 190 191 fn assert_screen(&self, expected: ScreenUpdate) -> Result<(), TestingResponse> { 192 if self.screen.as_update() == expected { 193 Ok(()) 194 } else { 195 Err(TestingResponse::WrongScreen) 196 } 197 } 198 199 async fn process_lobby_req(&mut self, req: LobbyRequest) { 200 if let DaemonScreen::Lobby(lobby) = &self.screen { 201 let lobby = lobby.clone(); 202 match req { 203 LobbyRequest::SwitchTeams(seeker) => lobby.switch_teams(seeker).await, 204 LobbyRequest::HostStartGame => lobby.start_game().await, 205 LobbyRequest::HostUpdateSettings(game_settings) => { 206 lobby.update_settings(game_settings).await 207 } 208 LobbyRequest::Leave => lobby.quit_lobby().await, 209 } 210 } 211 } 212 213 async fn process_game_req(&mut self, req: GameRequest) { 214 if let DaemonScreen::Game(game) = &self.screen { 215 let game = game.clone(); 216 match req { 217 GameRequest::NextTick => tokio::time::sleep(Self::INTERVAL).await, 218 GameRequest::MarkCaught => game.mark_caught().await, 219 GameRequest::GetPowerup => game.get_powerup().await, 220 GameRequest::UsePowerup => game.use_powerup().await, 221 GameRequest::ForcePowerup(power_up_type) => { 222 let mut state = game.lock_state().await; 223 state.force_set_powerup(power_up_type); 224 } 225 GameRequest::Quit => game.quit_game().await, 226 } 227 } 228 } 229 230 pub async fn process_req( 231 &mut self, 232 handle: StateHandle, 233 req: TestingRequest, 234 ) -> Result<(), TestingResponse> { 235 match req { 236 TestingRequest::StartLobby(game_settings) => { 237 self.assert_screen(ScreenUpdate::PreConnect)?; 238 self.create_lobby(handle, game_settings).await?; 239 } 240 TestingRequest::JoinLobby(code) => { 241 self.assert_screen(ScreenUpdate::PreConnect)?; 242 self.join_lobby(handle, &code).await?; 243 } 244 TestingRequest::LobbyReq(lobby_request) => { 245 self.assert_screen(ScreenUpdate::Lobby)?; 246 self.process_lobby_req(lobby_request).await; 247 } 248 TestingRequest::GameReq(game_request) => { 249 self.assert_screen(ScreenUpdate::Game)?; 250 self.process_game_req(game_request).await; 251 } 252 } 253 Ok(()) 254 } 255} 256 257use interprocess::local_socket::{ListenerOptions, tokio::prelude::*}; 258 259const CLI_MSG: &str = "Usage: manhunt-test-daemon SOCKET_NAME PLAYER_NAME"; 260 261#[tokio::main(flavor = "current_thread")] 262pub async fn main() -> Result { 263 let args = std::env::args().collect::<Vec<_>>(); 264 let raw_socket_name = args.get(1).cloned().expect(CLI_MSG); 265 let player_name = args.get(2).cloned().expect(CLI_MSG); 266 let socket_name = get_socket_name(raw_socket_name)?; 267 let opts = ListenerOptions::new().name(socket_name); 268 let listener = opts.create_tokio().context("Failed to bind to socket")?; 269 let (resp_tx, mut resp_rx) = mpsc::channel::<TestingResponse>(40); 270 271 let handle = Arc::new(Mutex::new(DaemonState::new(player_name, resp_tx))); 272 273 eprintln!("Testing Daemon Ready"); 274 275 'server: loop { 276 let res = tokio::select! { 277 res = listener.accept() => { 278 res 279 }, 280 Ok(_) = tokio::signal::ctrl_c() => { 281 break 'server; 282 } 283 }; 284 285 match res { 286 Ok(stream) => { 287 let mut recv = BufReader::new(&stream); 288 let mut send = &stream; 289 290 let mut buffer = String::with_capacity(256); 291 292 loop { 293 tokio::select! { 294 Ok(_) = tokio::signal::ctrl_c() => { 295 break 'server; 296 } 297 res = recv.read_line(&mut buffer) => { 298 match res { 299 Ok(0) => { 300 break; 301 } 302 Ok(_amnt) => { 303 let req = serde_json::from_str(&buffer).expect("Failed to parse"); 304 buffer.clear(); 305 let handle2 = handle.clone(); 306 let mut state = handle.lock().await; 307 if let Err(resp) = state.process_req(handle2, req).await { 308 let encoded = serde_json::to_vec(&resp).expect("Failed to encode"); 309 send.write_all(&encoded).await.expect("Failed to send"); 310 } 311 } 312 Err(why) => { 313 eprintln!("Read Error: {why:?}"); 314 } 315 } 316 } 317 Some(resp) = resp_rx.recv() => { 318 let encoded = serde_json::to_vec(&resp).expect("Failed to encode"); 319 send.write_all(&encoded).await.expect("Failed to send"); 320 } 321 } 322 } 323 } 324 Err(why) => eprintln!("Error from connection: {why:?}"), 325 } 326 } 327 328 Ok(()) 329}