Live location tracking and playback for the game "manhunt"
1use async_trait::async_trait;
2use axum::extract::ws::Message;
3use futures::StreamExt;
4use log::{error, info, warn};
5use matchbox_protocol::{JsonPeerEvent, PeerRequest};
6use matchbox_signaling::{
7 ClientRequestError, NoCallbacks, SignalingTopology, WsStateMeta, common_logic::parse_request,
8};
9
10use crate::state::ServerState;
11
12#[derive(Default, Debug)]
13pub struct ServerTopology;
14
15#[async_trait]
16impl SignalingTopology<NoCallbacks, ServerState> for ServerTopology {
17 async fn state_machine(upgrade: WsStateMeta<NoCallbacks, ServerState>) {
18 let WsStateMeta {
19 peer_id,
20 sender,
21 mut receiver,
22 mut state,
23 ..
24 } = upgrade;
25
26 let (host, cancel, other_peers) = state.add_peer(peer_id, sender.clone());
27
28 let msg = Message::Text(JsonPeerEvent::NewPeer(peer_id).to_string().into());
29
30 for other_id in other_peers {
31 if let Err(why) = state.try_send(other_id, msg.clone()) {
32 error!("Failed to publish new peer event to {other_id}: {why:?}");
33 }
34 }
35
36 loop {
37 let next_msg = tokio::select! {
38 biased;
39
40 _ = cancel.cancelled() => {
41 info!("Disconnecting {peer_id} due to host disconnect");
42 break;
43 }
44
45 next = receiver.next() => {
46 if let Some(next) = next {
47 parse_request(next)
48 } else {
49 info!("Peer {peer_id} has disconnected");
50 break;
51 }
52 }
53 };
54
55 let req = match next_msg {
56 Ok(req) => req,
57 Err(e) => match e {
58 ClientRequestError::Axum(e) => {
59 warn!("Peer {peer_id} encountered Axum error: {e:?}. Disconnecting...");
60 break;
61 }
62 ClientRequestError::Close => {
63 info!("Peer {peer_id} closed connection");
64 break;
65 }
66 ClientRequestError::Json(_) | ClientRequestError::UnsupportedType(_) => {
67 error!("Error parsing request from {peer_id}: {e:?}");
68 continue; // Recoverable, although may mean bad state?
69 }
70 },
71 };
72
73 if let PeerRequest::Signal { receiver, data } = req {
74 let msg = Message::Text(
75 JsonPeerEvent::Signal {
76 sender: peer_id,
77 data,
78 }
79 .to_string()
80 .into(),
81 );
82 if let Err(why) = state.try_send(receiver, msg) {
83 error!("Error sending signaling message from {peer_id} to {receiver}: {why:?}");
84 }
85 } // Other variant, PeerRequest::KeepAlive is just for a heartbeat, do nothing
86 }
87
88 let msg = Message::Text(JsonPeerEvent::PeerLeft(peer_id).to_string().into());
89 if let Some(other_peers) = state.remove_peer(peer_id, host) {
90 for other_id in other_peers {
91 if let Err(why) = state.try_send(other_id, msg.clone()) {
92 warn!("Failed to alert {other_id} that {peer_id} has disconnected: {why:?}");
93 }
94 }
95 } else {
96 warn!("Trying to remove peer {peer_id}, which doesn't exist?");
97 }
98 }
99}