A RPi Pico powered Lightning Detector
at main 164 lines 5.1 kB view raw
1use embassy_futures::select::select; 2use embassy_net::tcp::{TcpReader, TcpSocket, TcpWriter}; 3use embassy_sync::{ 4 blocking_mutex::raw::NoopRawMutex, lazy_lock::LazyLock, mutex::Mutex, once_lock::OnceLock, 5}; 6use sachy_fmt::unwrap; 7use snow::{Builder, Keypair, TransportState, params::NoiseParams}; 8 9use crate::{ 10 constants::NOISE_PSK, errors::PicoError, rpc::RpcServer, updates::UpdateConnection, 11 utils::try_buffer, 12}; 13 14pub static NOISE_PROTO: &str = "Noise_XXpsk3_25519_ChaChaPoly_BLAKE2s"; 15 16static PARAMS: LazyLock<NoiseParams> = 17 LazyLock::new(|| unwrap!(NOISE_PROTO.parse(), "Unable to parse Noise proto schema")); 18 19static LOCAL_PRIVATE_KEY: OnceLock<Keypair> = OnceLock::new(); 20 21pub struct NoiseSession { 22 transport: Mutex<NoopRawMutex, TransportState>, 23} 24 25impl NoiseSession { 26 pub async fn initialize<'device>(tcp: &mut TcpSocket<'device>) -> Result<Self, PicoError> { 27 let state = noise_handshake(tcp).await?; 28 29 Ok(Self { 30 transport: Mutex::new(state), 31 }) 32 } 33 34 pub async fn run<'device>(self, tcp: &mut TcpSocket<'device>) { 35 let (reader, writer) = tcp.split(); 36 37 select(self.read_loop(reader), self.write_loop(writer)).await; 38 } 39 40 async fn read_loop<'device>(&self, mut reader: TcpReader<'device>) { 41 let mut buffer = unwrap!(try_buffer(8192)); 42 43 let (packet_buf, msg_buf) = buffer.split_at_mut(4096); 44 45 loop { 46 let Ok(received) = noise_recv(&mut reader, packet_buf).await else { 47 break; 48 }; 49 50 if let Ok(msg) = self.transport.lock().await.read_message(received, msg_buf) 51 && let Ok(req) = striker_proto::receive_request(&mut msg_buf[..msg]) 52 { 53 let Some((resp, resp_tx)) = RpcServer::handle_request(req) 54 .await 55 .zip(UpdateConnection::can_update()) 56 else { 57 break; 58 }; 59 resp_tx.try_send(resp).ok(); 60 } 61 } 62 } 63 64 async fn write_loop<'device>(&self, mut writer: TcpWriter<'device>) { 65 let outgoing = UpdateConnection::get_receiver(); 66 67 let mut buffer = unwrap!(try_buffer(8192)); 68 69 let (msg_buf, enc_buf) = buffer.split_at_mut(4096); 70 71 loop { 72 let data = outgoing.receive().await; 73 74 let packet = unwrap!(striker_proto::send_response(data, msg_buf)); 75 let written = unwrap!( 76 self.transport.lock().await.write_message(packet, enc_buf), 77 "Payload too big" 78 ); 79 80 if noise_send(&mut writer, &enc_buf[..written]).await.is_err() { 81 break; 82 } 83 84 if writer.flush().await.is_err() { 85 break; 86 } 87 } 88 } 89} 90 91async fn noise_handshake<'device>( 92 tcp: &mut TcpSocket<'device>, 93) -> Result<TransportState, PicoError> { 94 let builder = Builder::new(PARAMS.get().clone()); 95 let static_key = LOCAL_PRIVATE_KEY 96 .get_or_init(|| unwrap!(builder.generate_keypair(), "Failed to generate key pair")); 97 98 let mut noise = builder 99 .local_private_key(&static_key.private)? 100 .psk(3, &NOISE_PSK)? 101 .build_responder()?; 102 103 let (mut reader, mut writer) = tcp.split(); 104 105 let mut buffer = try_buffer(4096)?; 106 107 let (payload, packet) = buffer.split_at_mut(2048); 108 109 noise.read_message(noise_recv(&mut reader, packet).await?, payload)?; 110 111 let len = noise.write_message(&[], payload)?; 112 113 noise_send(&mut writer, &payload[..len]).await?; 114 115 noise.read_message(noise_recv(&mut reader, packet).await?, payload)?; 116 117 let transport = noise.into_transport_mode()?; 118 119 Ok(transport) 120} 121 122/// Hyper-basic stream transport receiver. 16-bit BE size followed by payload. 123async fn noise_recv<'device, 'buffer>( 124 stream: &mut TcpReader<'device>, 125 packet: &'buffer mut [u8], 126) -> Result<&'buffer [u8], PicoError> { 127 loop { 128 if let Some(written) = stream 129 .read_with(|buf| { 130 buf.split_at_checked(2).map_or((0, None), |(size, rest)| { 131 let mut msg_len_buf = [0u8; 2]; 132 msg_len_buf.copy_from_slice(size); 133 let buf_size = usize::from(u16::from_be_bytes(msg_len_buf)); 134 packet[..buf_size].copy_from_slice(&rest[..buf_size]); 135 136 (2 + buf_size, Some(buf_size)) 137 }) 138 }) 139 .await? 140 { 141 return Ok(&packet[..written]); 142 } 143 } 144} 145 146async fn noise_send<'device>( 147 stream: &mut TcpWriter<'device>, 148 payload: &[u8], 149) -> Result<(), PicoError> { 150 let len = u16::try_from(payload.len())?; 151 while !stream 152 .write_with(|buf| { 153 buf.split_at_mut_checked(2) 154 .map_or((0, false), |(msg_size, rest)| { 155 msg_size.copy_from_slice(&len.to_be_bytes()); 156 rest[..payload.len()].copy_from_slice(payload); 157 (2 + payload.len(), true) 158 }) 159 }) 160 .await? 161 {} 162 stream.flush().await?; 163 Ok(()) 164}