···23## Ben
45-- [ ] Transport : Packet splitting
6-- [ ] Transport : Handle Errors
7-- [ ] Transport : Mark game started on client
0008- [ ] State : Event history tracking
9- [ ] State : Post game sync
10- [ ] API : Handling Profile Syncing
···23## Ben
45+- [x] Transport : Packet splitting
6+- [x] Transport : Handle Errors
7+- [x] Transport : Mark game started on client
8+- [x] API : Command to check if a game exists and is open for fast error checking
9+- [x] Transport : Switch to burst message processing for less time in the
10+ critical path
11- [ ] State : Event history tracking
12- [ ] State : Post game sync
13- [ ] API : Handling Profile Syncing
···17 /// Contains location history of the given player, used after the game to sync location
18 /// histories
19 PostGameSync(Id, Vec<Location>),
0000020}
···17 /// Contains location history of the given player, used after the game to sync location
18 /// histories
19 PostGameSync(Id, Vec<Location>),
20+ /// A player has been disconnected and removed from the game (because of error or otherwise).
21+ /// The player should be removed from all state
22+ DroppedPlayer(Id),
23+ /// The underlying transport has disconnected
24+ TransportDisconnect,
25}
+50-21
backend/src/game/mod.rs
···3use powerups::PowerUpType;
4pub use settings::GameSettings;
5use std::{collections::HashMap, sync::Arc, time::Duration};
06use uuid::Uuid;
78use tokio::{sync::RwLock, time::MissedTickBehavior};
···13mod settings;
14mod state;
15mod transport;
001617pub use location::{Location, LocationService};
18pub use state::GameState;
···31 transport: Arc<T>,
32 location: L,
33 interval: Duration,
034}
3536impl<L: LocationService, T: Transport> Game<L, T> {
···41 settings: GameSettings,
42 transport: Arc<T>,
43 location: L,
044 ) -> Self {
45 let state = GameState::new(settings, my_id, initial_caught_state);
4647 Self {
48 transport,
049 location,
50 interval,
51 state: RwLock::new(state),
···104 }
105 }
106107- async fn consume_event(&self, event: GameEvent) {
108- let mut state = self.state.write().await;
109-110 match event {
111 GameEvent::Ping(player_ping) => state.add_ping(player_ping),
112 GameEvent::ForcePing(target, display) => {
113 if target != state.id {
114- return;
115 }
116117 let ping = if let Some(display) = display {
···129 GameEvent::PlayerCaught(player) => {
130 state.mark_caught(player);
131 state.remove_ping(player);
000000132 }
133 GameEvent::PostGameSync(_, _locations) => {}
134 }
00135 }
136137 /// Perform a tick for a specific moment in time
138- async fn tick(&self, now: UtcDT) {
139- let mut state = self.state.write().await;
140-141 // Push to location history
142 if let Some(location) = self.location.get_loc() {
143 state.push_loc(location);
···186187 #[cfg(test)]
188 pub async fn force_tick(&self, now: UtcDT) {
189- self.tick(now).await;
00000190 }
191192 /// Main loop of the game, handles ticking and receiving messages from [Transport].
193- pub async fn main_loop(&self) {
194 let mut interval = tokio::time::interval(self.interval);
195196 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
197198- loop {
199 tokio::select! {
0200201- biased;
000000202203- Some(msg) = self.transport.receive_message() => {
204- self.consume_event(msg).await;
205- // TODO: Check all caught, end game
206 }
207208 _ = interval.tick() => {
209- let now = Utc::now();
210- self.tick(now).await;
211 }
212- };
213- }
0000214 }
215}
216···232 }
233234 impl Transport for MockTransport {
235- async fn receive_message(&self) -> Option<GameEvent> {
236 let mut rx = self.rx.lock().await;
237- rx.recv().await
00238 }
239240 async fn send_message(&self, msg: GameEvent) {
···301 settings.clone(),
302 Arc::new(transport),
303 location,
0304 );
305306 (id as u32, Arc::new(game))
···319 for game in self.games.values() {
320 let game = game.clone();
321 tokio::spawn(async move {
322- game.main_loop().await;
323 });
324 yield_now().await;
325 }
···3use powerups::PowerUpType;
4pub use settings::GameSettings;
5use std::{collections::HashMap, sync::Arc, time::Duration};
6+use tokio_util::sync::CancellationToken;
7use uuid::Uuid;
89use tokio::{sync::RwLock, time::MissedTickBehavior};
···14mod settings;
15mod state;
16mod transport;
17+18+use crate::prelude::*;
1920pub use location::{Location, LocationService};
21pub use state::GameState;
···34 transport: Arc<T>,
35 location: L,
36 interval: Duration,
37+ transport_cancel_token: CancellationToken,
38}
3940impl<L: LocationService, T: Transport> Game<L, T> {
···45 settings: GameSettings,
46 transport: Arc<T>,
47 location: L,
48+ transport_cancel_token: CancellationToken,
49 ) -> Self {
50 let state = GameState::new(settings, my_id, initial_caught_state);
5152 Self {
53 transport,
54+ transport_cancel_token,
55 location,
56 interval,
57 state: RwLock::new(state),
···110 }
111 }
112113+ async fn consume_event(&self, state: &mut GameState, event: GameEvent) -> Result {
00114 match event {
115 GameEvent::Ping(player_ping) => state.add_ping(player_ping),
116 GameEvent::ForcePing(target, display) => {
117 if target != state.id {
118+ return Ok(());
119 }
120121 let ping = if let Some(display) = display {
···133 GameEvent::PlayerCaught(player) => {
134 state.mark_caught(player);
135 state.remove_ping(player);
136+ }
137+ GameEvent::DroppedPlayer(id) => {
138+ state.remove_player(id);
139+ }
140+ GameEvent::TransportDisconnect => {
141+ bail!("Transport disconnected");
142 }
143 GameEvent::PostGameSync(_, _locations) => {}
144 }
145+146+ Ok(())
147 }
148149 /// Perform a tick for a specific moment in time
150+ async fn tick(&self, state: &mut GameState, now: UtcDT) {
00151 // Push to location history
152 if let Some(location) = self.location.get_loc() {
153 state.push_loc(location);
···196197 #[cfg(test)]
198 pub async fn force_tick(&self, now: UtcDT) {
199+ let mut state = self.state.write().await;
200+ self.tick(&mut state, now).await;
201+ }
202+203+ pub fn quit_game(&self) {
204+ self.transport_cancel_token.cancel();
205 }
206207 /// Main loop of the game, handles ticking and receiving messages from [Transport].
208+ pub async fn main_loop(&self) -> Result {
209 let mut interval = tokio::time::interval(self.interval);
210211 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
212213+ let res = 'game: loop {
214 tokio::select! {
215+ biased;
216217+ events = self.transport.receive_messages() => {
218+ let mut state = self.state.write().await;
219+ for event in events {
220+ if let Err(why) = self.consume_event(&mut state, event).await {
221+ break 'game Err(why);
222+ }
223+ }
224225+ if state.should_end() {
226+ break Ok(());
227+ }
228 }
229230 _ = interval.tick() => {
231+ let mut state = self.state.write().await;
232+ self.tick(&mut state, Utc::now()).await;
233 }
234+ }
235+ };
236+237+ self.transport_cancel_token.cancel();
238+239+ res
240 }
241}
242···258 }
259260 impl Transport for MockTransport {
261+ async fn receive_messages(&self) -> impl Iterator<Item = GameEvent> {
262 let mut rx = self.rx.lock().await;
263+ let mut buf = Vec::with_capacity(20);
264+ rx.recv_many(&mut buf, 20).await;
265+ buf.into_iter()
266 }
267268 async fn send_message(&self, msg: GameEvent) {
···329 settings.clone(),
330 Arc::new(transport),
331 location,
332+ CancellationToken::new(),
333 );
334335 (id as u32, Arc::new(game))
···348 for game in self.games.values() {
349 let game = game.clone();
350 tokio::spawn(async move {
351+ game.main_loop().await.expect("Game Start Fail");
352 });
353 yield_now().await;
354 }
+11
backend/src/game/state.rs
···240 self.pings.get(&player)
241 }
24200000243 /// Remove a ping from the map
244 pub fn remove_ping(&mut self, player: Id) -> Option<PlayerPing> {
245 self.pings.remove(&player)
···281 /// Create a [PlayerPing] with the latest location as another player
282 pub fn create_ping(&self, id: Id) -> Option<PlayerPing> {
283 self.get_loc().map(|loc| PlayerPing::new(*loc, id, self.id))
000000284 }
285286 /// Player has gotten a powerup, rolls to see which powerup and stores it
···240 self.pings.get(&player)
241 }
242243+ /// Check if the game should be ended (due to all players being caught)
244+ pub fn should_end(&self) -> bool {
245+ self.caught_state.values().all(|v| *v)
246+ }
247+248 /// Remove a ping from the map
249 pub fn remove_ping(&mut self, player: Id) -> Option<PlayerPing> {
250 self.pings.remove(&player)
···286 /// Create a [PlayerPing] with the latest location as another player
287 pub fn create_ping(&self, id: Id) -> Option<PlayerPing> {
288 self.get_loc().map(|loc| PlayerPing::new(*loc, id, self.id))
289+ }
290+291+ /// Remove a player from the game by their ID number
292+ pub fn remove_player(&mut self, id: Id) {
293+ self.pings.remove(&id);
294+ self.caught_state.remove(&id);
295 }
296297 /// Player has gotten a powerup, rolls to see which powerup and stores it
+1-1
backend/src/game/transport.rs
···23pub trait Transport {
4 /// Receive an event
5- async fn receive_message(&self) -> Option<GameEvent>;
6 /// Send an event
7 async fn send_message(&self, msg: GameEvent);
8}
···23pub trait Transport {
4 /// Receive an event
5+ async fn receive_messages(&self) -> impl Iterator<Item = GameEvent>;
6 /// Send an event
7 async fn send_message(&self, msg: GameEvent);
8}
···1-use std::{collections::HashSet, time::Duration};
000203use futures::FutureExt;
04use matchbox_socket::{PeerId, PeerState, WebRtcSocket};
5use serde::{Deserialize, Serialize};
6use tokio::sync::{Mutex, RwLock};
07use uuid::Uuid;
89use crate::{
10 game::{GameEvent, Transport},
11 lobby::LobbyMessage,
012};
130000000014#[derive(Debug, Serialize, Deserialize, Clone)]
15pub enum TransportMessage {
16 /// Message related to the actual game
17- Game(GameEvent),
018 /// Message related to the pre-game lobby
19- Lobby(LobbyMessage),
20 /// Internal message when peer connects
21 PeerConnect,
22 /// Internal message when peer disconnects
23 PeerDisconnect,
0000000000000000000000000000000000000000000000000000000000000000000024}
2526type OutgoingMsgPair = (Option<Uuid>, TransportMessage);
···59 .expect("Failed to add to outgoing queue");
60 }
6162- pub async fn recv_transport_message(&self) -> Option<IncomingMsgPair> {
63 let mut incoming_rx = self.incoming.1.lock().await;
64- incoming_rx.recv().await
0065 }
6667 pub async fn get_my_id(&self) -> Option<Uuid> {
68 *self.my_id.read().await
69 }
7071- pub async fn transport_loop(&self) {
00000000000000000000000000000000000072 let (mut socket, loop_fut) = WebRtcSocket::new_reliable(&self.ws_url);
7374 let loop_fut = loop_fut.fuse();
···7980 let mut timer = tokio::time::interval(Duration::from_millis(100));
8100082 loop {
83 for (peer, state) in socket.update_peers() {
84 let msg = match state {
···91 TransportMessage::PeerDisconnect
92 }
93 };
94- self.incoming
95- .0
96- .send((peer.0, msg))
97- .await
98- .expect("Failed to push to incoming queue");
99 }
100101- for (peer, data) in socket.channel_mut(0).receive() {
102- if let Ok(msg) = rmp_serde::from_slice(&data) {
103- self.incoming
104- .0
105- .send((peer.0, msg))
106- .await
107- .expect("Failed to push to incoming queue");
0000000000000000000000000000000000000000000108 }
109 }
11000000000000111 if my_id.is_none() {
112 if let Some(new_id) = socket.id() {
113 my_id = Some(new_id.0);
···117118 let mut outgoing_rx = self.outgoing.1.lock().await;
11900120 tokio::select! {
000000121 _ = timer.tick() => {
122 // Transport Tick
123 continue;
124 }
125126- Some((peer, msg)) = outgoing_rx.recv() => {
127- let encoded = rmp_serde::to_vec(&msg).unwrap();
128-129- if let Some(peer) = peer {
130- let channel = socket.channel_mut(0);
131- let data = encoded.into_boxed_slice();
132- channel.send(data, PeerId(peer));
133- } else {
134- // Send to self as well
135- if let Some(myself) = my_id {
136- self.incoming.0.send((myself, msg)).await.expect("Failed to push to incoming queue");
137- }
138- let channel = socket.channel_mut(0);
139-140- for peer in all_peers.iter() {
141- // TODO: Any way around having to clone here?
142- let data = encoded.clone().into_boxed_slice();
143- channel.send(data, *peer);
144- }
145- }
146 }
147148 _ = &mut loop_fut => {
···155}
156157impl Transport for MatchboxTransport {
158- async fn receive_message(&self) -> Option<GameEvent> {
159- self.recv_transport_message()
160 .await
161- .and_then(|(_, msg)| match msg {
162- TransportMessage::Game(game_event) => Some(game_event),
00163 _ => None,
164 })
165 }
166167 async fn send_message(&self, msg: GameEvent) {
168- let msg = TransportMessage::Game(msg);
169- self.send_transport_message(None, msg).await;
170 }
171}
···1+use std::{
2+ collections::{HashMap, HashSet},
3+ time::Duration,
4+};
56+use anyhow::Context;
7use futures::FutureExt;
8+use log::error;
9use matchbox_socket::{PeerId, PeerState, WebRtcSocket};
10use serde::{Deserialize, Serialize};
11use tokio::sync::{Mutex, RwLock};
12+use tokio_util::sync::CancellationToken;
13use uuid::Uuid;
1415use crate::{
16 game::{GameEvent, Transport},
17 lobby::LobbyMessage,
18+ prelude::*,
19};
2021+#[derive(Serialize, Deserialize, Debug, Clone)]
22+pub struct TransportChunk {
23+ id: u64,
24+ current: usize,
25+ total: usize,
26+ data: Vec<u8>,
27+}
28+29#[derive(Debug, Serialize, Deserialize, Clone)]
30pub enum TransportMessage {
31 /// Message related to the actual game
32+ /// Boxed for space reasons
33+ Game(Box<GameEvent>),
34 /// Message related to the pre-game lobby
35+ Lobby(Box<LobbyMessage>),
36 /// Internal message when peer connects
37 PeerConnect,
38 /// Internal message when peer disconnects
39 PeerDisconnect,
40+ /// Event sent when the transport gets disconnected, used to help consumers know when to stop
41+ /// consuming messages
42+ Disconnected,
43+ /// Internal message for packet chunking
44+ Seq(TransportChunk),
45+}
46+47+// Max packet size according to: https://github.com/johanhelsing/matchbox/issues/272
48+const MAX_PACKET_SIZE: usize = 65535;
49+50+// Align packets with a bit of extra space for [TransportMessage::Seq] header
51+const PACKET_ALIGNMENT: usize = MAX_PACKET_SIZE - 128;
52+53+impl TransportMessage {
54+ pub fn serialize(&self) -> Vec<u8> {
55+ rmp_serde::to_vec(self).expect("Failed to encode")
56+ }
57+58+ pub fn deserialize(data: &[u8]) -> Result<Self> {
59+ rmp_serde::from_slice(data).context("While deserializing message")
60+ }
61+62+ pub fn from_packets(packets: impl Iterator<Item = Box<[u8]>>) -> Result<Self> {
63+ let full_data = packets.flatten().collect::<Box<[u8]>>();
64+ Self::deserialize(&full_data).context("While decoding a multi-part message")
65+ }
66+67+ pub fn to_packets(&self) -> Vec<Vec<u8>> {
68+ let bytes = self.serialize();
69+ if bytes.len() > MAX_PACKET_SIZE {
70+ let id = rand::random_range(0..u64::MAX);
71+ let packets_needed = bytes.len().div_ceil(PACKET_ALIGNMENT);
72+ let rem = bytes.len() % PACKET_ALIGNMENT;
73+ let bytes = bytes.into_boxed_slice();
74+ (0..packets_needed)
75+ .map(|idx| {
76+ let start = PACKET_ALIGNMENT * idx;
77+ let end = if idx == packets_needed - 1 {
78+ start + rem
79+ } else {
80+ PACKET_ALIGNMENT * (idx + 1)
81+ };
82+ let data = bytes[start..end].to_vec();
83+ let chunk = TransportChunk {
84+ id,
85+ current: idx,
86+ total: packets_needed,
87+ data,
88+ };
89+ TransportMessage::Seq(chunk).serialize()
90+ })
91+ .collect()
92+ } else {
93+ vec![bytes]
94+ }
95+ }
96+}
97+98+impl From<GameEvent> for TransportMessage {
99+ fn from(v: GameEvent) -> Self {
100+ Self::Game(Box::new(v))
101+ }
102+}
103+104+impl From<LobbyMessage> for TransportMessage {
105+ fn from(v: LobbyMessage) -> Self {
106+ Self::Lobby(Box::new(v))
107+ }
108}
109110type OutgoingMsgPair = (Option<Uuid>, TransportMessage);
···143 .expect("Failed to add to outgoing queue");
144 }
145146+ pub async fn recv_transport_messages(&self) -> Vec<IncomingMsgPair> {
147 let mut incoming_rx = self.incoming.1.lock().await;
148+ let mut buffer = Vec::with_capacity(60);
149+ incoming_rx.recv_many(&mut buffer, 60).await;
150+ buffer
151 }
152153 pub async fn get_my_id(&self) -> Option<Uuid> {
154 *self.my_id.read().await
155 }
156157+ async fn push_incoming(&self, id: Uuid, msg: TransportMessage) {
158+ self.incoming
159+ .0
160+ .send((id, msg))
161+ .await
162+ .expect("Failed to push to incoming queue");
163+ }
164+165+ async fn handle_send(
166+ &self,
167+ socket: &mut WebRtcSocket,
168+ all_peers: &HashSet<PeerId>,
169+ messages: impl Iterator<Item = OutgoingMsgPair>,
170+ ) {
171+ let packets = messages.flat_map(|(id, msg)| {
172+ msg.to_packets()
173+ .into_iter()
174+ .map(move |packet| (id, packet.into_boxed_slice()))
175+ });
176+177+ for (peer, packet) in packets {
178+ if let Some(peer) = peer {
179+ let channel = socket.channel_mut(0);
180+ channel.send(packet, PeerId(peer));
181+ } else {
182+ let channel = socket.channel_mut(0);
183+184+ for peer in all_peers.iter() {
185+ // TODO: Any way around having to clone here?
186+ let data = packet.clone();
187+ channel.send(data, *peer);
188+ }
189+ }
190+ }
191+ }
192+193+ pub async fn transport_loop(&self, cancel: CancellationToken) {
194 let (mut socket, loop_fut) = WebRtcSocket::new_reliable(&self.ws_url);
195196 let loop_fut = loop_fut.fuse();
···201202 let mut timer = tokio::time::interval(Duration::from_millis(100));
203204+ let mut partial_packets =
205+ HashMap::<u64, (Uuid, HashMap<usize, Option<Vec<u8>>>)>::with_capacity(3);
206+207 loop {
208 for (peer, state) in socket.update_peers() {
209 let msg = match state {
···216 TransportMessage::PeerDisconnect
217 }
218 };
219+ self.push_incoming(peer.0, msg).await;
0000220 }
221222+ let messages = socket.channel_mut(0).receive();
223+224+ let mut messages = messages
225+ .into_iter()
226+ .filter_map(|(id, data)| {
227+ let msg = TransportMessage::deserialize(&data).ok();
228+229+ if let Some(TransportMessage::Seq(TransportChunk {
230+ id: multipart_id,
231+ current,
232+ total,
233+ data,
234+ })) = msg
235+ {
236+ if let Some((_, map)) = partial_packets.get_mut(&multipart_id) {
237+ map.insert(current, Some(data));
238+ } else {
239+ let mut map = HashMap::from_iter((0..total).map(|idx| (idx, None)));
240+ map.insert(current, Some(data));
241+ partial_packets.insert(multipart_id, (id.0, map));
242+ }
243+ None
244+ } else {
245+ msg.map(|msg| (id.0, msg))
246+ }
247+ })
248+ .collect::<Vec<_>>();
249+250+ let complete_messages = partial_packets
251+ .keys()
252+ .copied()
253+ .filter(|id| {
254+ partial_packets
255+ .get(id)
256+ .is_some_and(|(_, v)| v.values().all(Option::is_some))
257+ })
258+ .collect::<Vec<_>>();
259+260+ for id in complete_messages {
261+ let (peer, packet_map) = partial_packets.remove(&id).unwrap();
262+263+ let res = TransportMessage::from_packets(
264+ packet_map
265+ .into_values()
266+ .map(|v| v.unwrap().into_boxed_slice()),
267+ );
268+269+ match res {
270+ Ok(msg) => messages.push((peer, msg)),
271+ Err(why) => error!("Error receiving message: {why:?}"),
272 }
273 }
274275+ let push_iter = self
276+ .incoming
277+ .0
278+ .reserve_many(messages.len())
279+ .await
280+ .expect("Couldn't reserve space");
281+282+ for (sender, msg) in push_iter.zip(messages.into_iter()) {
283+ sender.send(msg);
284+ }
285+286 if my_id.is_none() {
287 if let Some(new_id) = socket.id() {
288 my_id = Some(new_id.0);
···292293 let mut outgoing_rx = self.outgoing.1.lock().await;
294295+ let mut buffer = Vec::with_capacity(30);
296+297 tokio::select! {
298+299+ _ = cancel.cancelled() => {
300+ socket.close();
301+302+ }
303+304 _ = timer.tick() => {
305 // Transport Tick
306 continue;
307 }
308309+ _ = outgoing_rx.recv_many(&mut buffer, 30) => {
310+ self.handle_send(&mut socket, &all_peers, buffer.drain(..)).await;
000000000000000000311 }
312313 _ = &mut loop_fut => {
···320}
321322impl Transport for MatchboxTransport {
323+ async fn receive_messages(&self) -> impl Iterator<Item = GameEvent> {
324+ self.recv_transport_messages()
325 .await
326+ .into_iter()
327+ .filter_map(|(id, msg)| match msg {
328+ TransportMessage::Game(game_event) => Some(*game_event),
329+ TransportMessage::PeerDisconnect => Some(GameEvent::DroppedPlayer(id)),
330 _ => None,
331 })
332 }
333334 async fn send_message(&self, msg: GameEvent) {
335+ self.send_transport_message(None, msg.into()).await;
0336 }
337}
+19
frontend/src/bindings.ts
···147 if (e instanceof Error) throw e;
148 else return { status: "error", error: e as any };
149 }
000000000000150 }
151};
152153/** user-defined events **/
154000000155/** user-defined constants **/
156157/** user-defined types **/
158159export type AppScreen = "Setup" | "Menu" | "Lobby" | "Game";
0160/**
161 * Settings for the game, host is the only person able to change these
162 */
···147 if (e instanceof Error) throw e;
148 else return { status: "error", error: e as any };
149 }
150+ },
151+ /**
152+ * (Screen: Menu) Check if a room code is valid to join, use this before starting a game
153+ * for faster error checking.
154+ */
155+ async checkRoomCode(code: string): Promise<Result<boolean, string>> {
156+ try {
157+ return { status: "ok", data: await TAURI_INVOKE("check_room_code", { code }) };
158+ } catch (e) {
159+ if (e instanceof Error) throw e;
160+ else return { status: "error", error: e as any };
161+ }
162 }
163};
164165/** user-defined events **/
166167+export const events = __makeEvents__<{
168+ changeScreen: ChangeScreen;
169+}>({
170+ changeScreen: "change-screen"
171+});
172+173/** user-defined constants **/
174175/** user-defined types **/
176177export type AppScreen = "Setup" | "Menu" | "Lobby" | "Game";
178+export type ChangeScreen = AppScreen;
179/**
180 * Settings for the game, host is the only person able to change these
181 */