An RPi Pico-powered Lightning Detector
1use embassy_futures::select::select;
2use embassy_net::tcp::{TcpReader, TcpSocket, TcpWriter};
3use embassy_sync::{
4 blocking_mutex::raw::NoopRawMutex, lazy_lock::LazyLock, mutex::Mutex, once_lock::OnceLock,
5};
6use sachy_fmt::unwrap;
7use snow::{Builder, Keypair, TransportState, params::NoiseParams};
8
9use crate::{
10 constants::NOISE_PSK, errors::PicoError, rpc::RpcServer, updates::UpdateConnection,
11 utils::try_buffer,
12};
13
/// Noise protocol name used for every session; parsed into [`PARAMS`] on first use.
pub static NOISE_PROTO: &str = "Noise_XXpsk3_25519_ChaChaPoly_BLAKE2s";

/// [`NOISE_PROTO`] parsed into `NoiseParams`, computed lazily on first access.
///
/// A parse failure goes through `unwrap!` with the message below
/// (presumably a panic -- confirm against `sachy_fmt`).
static PARAMS: LazyLock<NoiseParams> =
    LazyLock::new(|| unwrap!(NOISE_PROTO.parse(), "Unable to parse Noise proto schema"));

/// Local static keypair, generated once by the first call to `noise_handshake`
/// and reused by every later handshake. Despite the name it stores the whole
/// `Keypair`; only its `private` half is handed to the Noise builder.
static LOCAL_PRIVATE_KEY: OnceLock<Keypair> = OnceLock::new();
20
/// An established Noise session over a TCP connection.
///
/// Created by `NoiseSession::initialize` once the handshake has completed.
pub struct NoiseSession {
    /// Post-handshake cipher state. It sits behind an async mutex because the
    /// read loop and the write loop both lock it through `&self`.
    transport: Mutex<NoopRawMutex, TransportState>,
}
24
25impl NoiseSession {
26 pub async fn initialize<'device>(tcp: &mut TcpSocket<'device>) -> Result<Self, PicoError> {
27 let state = noise_handshake(tcp).await?;
28
29 Ok(Self {
30 transport: Mutex::new(state),
31 })
32 }
33
34 pub async fn run<'device>(self, tcp: &mut TcpSocket<'device>) {
35 let (reader, writer) = tcp.split();
36
37 select(self.read_loop(reader), self.write_loop(writer)).await;
38 }
39
40 async fn read_loop<'device>(&self, mut reader: TcpReader<'device>) {
41 let mut buffer = unwrap!(try_buffer(8192));
42
43 let (packet_buf, msg_buf) = buffer.split_at_mut(4096);
44
45 loop {
46 let Ok(received) = noise_recv(&mut reader, packet_buf).await else {
47 break;
48 };
49
50 if let Ok(msg) = self.transport.lock().await.read_message(received, msg_buf)
51 && let Ok(req) = striker_proto::receive_request(&mut msg_buf[..msg])
52 {
53 let Some((resp, resp_tx)) = RpcServer::handle_request(req)
54 .await
55 .zip(UpdateConnection::can_update())
56 else {
57 break;
58 };
59 resp_tx.try_send(resp).ok();
60 }
61 }
62 }
63
64 async fn write_loop<'device>(&self, mut writer: TcpWriter<'device>) {
65 let outgoing = UpdateConnection::get_receiver();
66
67 let mut buffer = unwrap!(try_buffer(8192));
68
69 let (msg_buf, enc_buf) = buffer.split_at_mut(4096);
70
71 loop {
72 let data = outgoing.receive().await;
73
74 let packet = unwrap!(striker_proto::send_response(data, msg_buf));
75 let written = unwrap!(
76 self.transport.lock().await.write_message(packet, enc_buf),
77 "Payload too big"
78 );
79
80 if noise_send(&mut writer, &enc_buf[..written]).await.is_err() {
81 break;
82 }
83
84 if writer.flush().await.is_err() {
85 break;
86 }
87 }
88 }
89}
90
91async fn noise_handshake<'device>(
92 tcp: &mut TcpSocket<'device>,
93) -> Result<TransportState, PicoError> {
94 let builder = Builder::new(PARAMS.get().clone());
95 let static_key = LOCAL_PRIVATE_KEY
96 .get_or_init(|| unwrap!(builder.generate_keypair(), "Failed to generate key pair"));
97
98 let mut noise = builder
99 .local_private_key(&static_key.private)?
100 .psk(3, &NOISE_PSK)?
101 .build_responder()?;
102
103 let (mut reader, mut writer) = tcp.split();
104
105 let mut buffer = try_buffer(4096)?;
106
107 let (payload, packet) = buffer.split_at_mut(2048);
108
109 noise.read_message(noise_recv(&mut reader, packet).await?, payload)?;
110
111 let len = noise.write_message(&[], payload)?;
112
113 noise_send(&mut writer, &payload[..len]).await?;
114
115 noise.read_message(noise_recv(&mut reader, packet).await?, payload)?;
116
117 let transport = noise.into_transport_mode()?;
118
119 Ok(transport)
120}
121
122/// Hyper-basic stream transport receiver. 16-bit BE size followed by payload.
123async fn noise_recv<'device, 'buffer>(
124 stream: &mut TcpReader<'device>,
125 packet: &'buffer mut [u8],
126) -> Result<&'buffer [u8], PicoError> {
127 loop {
128 if let Some(written) = stream
129 .read_with(|buf| {
130 buf.split_at_checked(2).map_or((0, None), |(size, rest)| {
131 let mut msg_len_buf = [0u8; 2];
132 msg_len_buf.copy_from_slice(size);
133 let buf_size = usize::from(u16::from_be_bytes(msg_len_buf));
134 packet[..buf_size].copy_from_slice(&rest[..buf_size]);
135
136 (2 + buf_size, Some(buf_size))
137 })
138 })
139 .await?
140 {
141 return Ok(&packet[..written]);
142 }
143 }
144}
145
146async fn noise_send<'device>(
147 stream: &mut TcpWriter<'device>,
148 payload: &[u8],
149) -> Result<(), PicoError> {
150 let len = u16::try_from(payload.len())?;
151 while !stream
152 .write_with(|buf| {
153 buf.split_at_mut_checked(2)
154 .map_or((0, false), |(msg_size, rest)| {
155 msg_size.copy_from_slice(&len.to_be_bytes());
156 rest[..payload.len()].copy_from_slice(payload);
157 (2 + payload.len(), true)
158 })
159 })
160 .await?
161 {}
162 stream.flush().await?;
163 Ok(())
164}