Buttplug sex toy control library
at master 134 lines 4.4 kB view raw
1pub mod process_messages; 2use crate::error::IntifaceError; 3use crate::remote_server::ButtplugRemoteServerEvent; 4use async_trait::async_trait; 5use futures::{Stream, StreamExt, pin_mut}; 6pub use process_messages::{EngineMessage, IntifaceMessage}; 7use std::sync::Arc; 8use tokio::{ 9 select, 10 sync::{Notify, broadcast}, 11}; 12use tokio_util::sync::CancellationToken; 13 14const VERSION: &str = env!("CARGO_PKG_VERSION"); 15 16#[async_trait] 17pub trait Frontend: Sync + Send { 18 async fn send(&self, msg: EngineMessage); 19 async fn connect(&self) -> Result<(), IntifaceError>; 20 fn disconnect_notifier(&self) -> Arc<Notify>; 21 fn disconnect(&self); 22 fn event_stream(&self) -> broadcast::Receiver<IntifaceMessage>; 23} 24 25pub async fn frontend_external_event_loop( 26 frontend: Arc<dyn Frontend>, 27 connection_cancellation_token: Arc<CancellationToken>, 28) { 29 let mut external_receiver = frontend.event_stream(); 30 loop { 31 select! { 32 external_message = external_receiver.recv() => { 33 match external_message { 34 Ok(message) => match message { 35 IntifaceMessage::RequestEngineVersion{expected_version:_} => { 36 // TODO We should check the version here and shut down on mismatch. 37 info!("Engine version request received from frontend."); 38 frontend 39 .send(EngineMessage::EngineVersion{ version: VERSION.to_owned() }) 40 .await; 41 }, 42 IntifaceMessage::Stop{} => { 43 connection_cancellation_token.cancel(); 44 info!("Got external stop request"); 45 break; 46 } 47 }, 48 Err(_) => { 49 info!("Frontend sender dropped, assuming connection lost, breaking."); 50 break; 51 } 52 } 53 }, 54 _ = connection_cancellation_token.cancelled() => { 55 info!("Connection cancellation token activated, breaking from frontend external event loop."); 56 break; 57 } 58 } 59 } 60} 61 62pub async fn frontend_server_event_loop( 63 receiver: impl Stream<Item = ButtplugRemoteServerEvent>, 64 frontend: Arc<dyn Frontend>, 65 connection_cancellation_token: CancellationToken, 66) { 67 pin_mut!(receiver); 68 69 loop { 70 select! { 71 maybe_event = receiver.next() => { 72 match maybe_event { 73 Some(event) => match event { 74 ButtplugRemoteServerEvent::ClientConnected(client_name) => { 75 info!("Client connected: {}", client_name); 76 frontend.send(EngineMessage::ClientConnected{client_name}).await; 77 } 78 ButtplugRemoteServerEvent::ClientDisconnected => { 79 info!("Client disconnected."); 80 frontend 81 .send(EngineMessage::ClientDisconnected{}) 82 .await; 83 } 84 ButtplugRemoteServerEvent::DeviceAdded { index: device_id, name: device_name, identifier: device_address, display_name: device_display_name } => { 85 info!("Device Added: {} - {} - {:?}", device_id, device_name, device_address); 86 frontend 87 .send(EngineMessage::DeviceConnected { name: device_name, index: device_id, identifier: device_address, display_name: device_display_name }) 88 .await; 89 } 90 ButtplugRemoteServerEvent::DeviceRemoved { index: device_id } => { 91 info!("Device Removed: {}", device_id); 92 frontend 93 .send(EngineMessage::DeviceDisconnected{index: device_id}) 94 .await; 95 } 96 }, 97 None => { 98 info!("Lost connection with main thread, breaking."); 99 break; 100 }, 101 } 102 }, 103 _ = connection_cancellation_token.cancelled() => { 104 info!("Connection cancellation token activated, breaking from frontend server event loop"); 105 break; 106 } 107 } 108 } 109 info!("Exiting server event receiver loop"); 110} 111/* 112#[derive(Default)] 113struct NullFrontend { 114 notify: Arc<Notify>, 115} 116 117#[async_trait] 118impl Frontend for NullFrontend { 119 async fn send(&self, _: EngineMessage) {} 120 async fn connect(&self) -> Result<(), IntifaceError> { 121 Ok(()) 122 } 123 fn disconnect(&self) { 124 self.notify.notify_waiters(); 125 } 126 fn disconnect_notifier(&self) -> Arc<Notify> { 127 self.notify.clone() 128 } 129 fn event_stream(&self) -> broadcast::Receiver<IntifaceMessage> { 130 let (_, receiver) = broadcast::channel(255); 131 receiver 132 } 133} 134*/