···1212use snafu::Snafu;
1313use tokio::sync::Mutex;
14141515-use super::db;
1615use crate::public_key::PublicKey;
17161817// the files here are just copied from iroh-smol-kv-uniffi/src/code
···2221 mod subscribe_mode;
2322 pub use subscribe_mode::SubscribeMode;
2423}
2525-use db::util::format_bytes;
2624pub use kv::{SubscribeMode, TimeBound};
2525+use util::format_bytes;
27262827/// Error creating a new database node.
2928#[derive(Debug, Snafu, uniffi::Error)]
···542541543542#[cfg(test)]
544543mod tests {
545545- use super::db::util;
544544+ use super::util;
546545547546 #[test]
548547 fn escape_unescape() {
···11-use std::sync::LazyLock;
22-33-/// We export the entire API at top level since this is what go-uniffi-bindgen will do anyway.
44-mod streams;
55-pub use streams::*;
66-mod db;
77-pub use db::*;
88-#[cfg(test)]
99-mod tests;
1010-1111-/// Lazily initialized Tokio runtime for use in uniffi methods that need a runtime.
1212-static RUNTIME: LazyLock<tokio::runtime::Runtime> =
1313- LazyLock::new(|| tokio::runtime::Runtime::new().unwrap());
-809
rust/iroh-streamplace/src/node/streams.rs
···11-use std::{
22- collections::{BTreeMap, BTreeSet, HashSet},
33- str::FromStr,
44- sync::Arc,
55-};
66-77-use bytes::Bytes;
88-use iroh::{NodeId, PublicKey, RelayMode, SecretKey, Watcher};
99-use iroh_base::ticket::NodeTicket;
1010-use iroh_gossip::{net::Gossip, proto::TopicId};
1111-use irpc::{WithChannels, rpc::RemoteService};
1212-use irpc_iroh::{IrohProtocol, IrohRemoteConnection};
1313-use n0_future::{TryFutureExt, future::Boxed};
1414-1515-mod rpc {
1616- //! Protocol API
1717- use bytes::Bytes;
1818- use iroh::NodeId;
1919- use irpc::{channel::oneshot, rpc_requests};
2020- use serde::{Deserialize, Serialize};
2121-2222- pub const ALPN: &[u8] = b"/iroh/streamplace/1";
2323-2424- /// Subscribe to the given `key`
2525- #[derive(Debug, Serialize, Deserialize)]
2626- pub struct Subscribe {
2727- pub key: String,
2828- // TODO: verify
2929- pub remote_id: NodeId,
3030- }
3131-3232- /// Unsubscribe from the given `key`
3333- #[derive(Debug, Serialize, Deserialize)]
3434- pub struct Unsubscribe {
3535- pub key: String,
3636- // TODO: verify
3737- pub remote_id: NodeId,
3838- }
3939-4040- // #[derive(Debug, Serialize, Deserialize)]
4141- // pub struct SendSegment {
4242- // pub key: String,
4343- // pub data: Bytes,
4444- // }
4545-4646- #[derive(Debug, Clone, Serialize, Deserialize)]
4747- pub struct RecvSegment {
4848- pub key: String,
4949- pub data: Bytes,
5050- }
5151-5252- // Use the macro to generate both the Protocol and Message enums
5353- // plus implement Channels for each type
5454- #[rpc_requests(message = Message)]
5555- #[derive(Serialize, Deserialize, Debug)]
5656- pub enum Protocol {
5757- #[rpc(tx=oneshot::Sender<()>)]
5858- Subscribe(Subscribe),
5959- #[rpc(tx=oneshot::Sender<()>)]
6060- Unsubscribe(Unsubscribe),
6161- #[rpc(tx=oneshot::Sender<()>)]
6262- RecvSegment(RecvSegment),
6363- }
6464-}
6565-6666-mod api {
6767- //! Protocol API
6868- use bytes::Bytes;
6969- use iroh::{NodeAddr, NodeId};
7070- use irpc::{channel::oneshot, rpc_requests};
7171- use serde::{Deserialize, Serialize};
7272-7373- /// Subscribe to the given `key`
7474- #[derive(Debug, Serialize, Deserialize)]
7575- pub struct Subscribe {
7676- pub key: String,
7777- // TODO: verify
7878- pub remote_id: NodeId,
7979- }
8080-8181- /// Unsubscribe from the given `key`
8282- #[derive(Debug, Serialize, Deserialize)]
8383- pub struct Unsubscribe {
8484- pub key: String,
8585- // TODO: verify
8686- pub remote_id: NodeId,
8787- }
8888-8989- #[derive(Debug, Serialize, Deserialize)]
9090- pub struct SendSegment {
9191- pub key: String,
9292- pub data: Bytes,
9393- }
9494-9595- #[derive(Debug, Serialize, Deserialize)]
9696- pub struct JoinPeers {
9797- pub peers: Vec<NodeAddr>,
9898- }
9999-100100- #[derive(Debug, Serialize, Deserialize)]
101101- pub struct AddTickets {
102102- pub peers: Vec<NodeAddr>,
103103- }
104104-105105- #[derive(Debug, Serialize, Deserialize)]
106106- pub struct GetNodeAddr;
107107-108108- #[derive(Debug, Serialize, Deserialize)]
109109- pub struct Shutdown;
110110-111111- // Use the macro to generate both the Protocol and Message enums
112112- // plus implement Channels for each type
113113- #[rpc_requests(message = Message)]
114114- #[derive(Serialize, Deserialize, Debug)]
115115- pub enum Protocol {
116116- #[rpc(tx=oneshot::Sender<()>)]
117117- Subscribe(Subscribe),
118118- #[rpc(tx=oneshot::Sender<()>)]
119119- Unsubscribe(Unsubscribe),
120120- #[rpc(tx=oneshot::Sender<()>)]
121121- SendSegment(SendSegment),
122122- #[rpc(tx=oneshot::Sender<()>)]
123123- JoinPeers(JoinPeers),
124124- #[rpc(tx=oneshot::Sender<()>)]
125125- AddTickets(AddTickets),
126126- #[rpc(tx=oneshot::Sender<NodeAddr>)]
127127- GetNodeAddr(GetNodeAddr),
128128- #[rpc(tx=oneshot::Sender<()>)]
129129- Shutdown(Shutdown),
130130- }
131131-}
132132-use api::{Message as ApiMessage, Protocol as ApiProtocol};
133133-use n0_future::{FuturesUnordered, StreamExt};
134134-use rpc::{Message as RpcMessage, Protocol as RpcProtocol};
135135-use snafu::Snafu;
136136-use tokio::sync::oneshot;
137137-use tracing::{Instrument, debug, error, trace, trace_span, warn};
138138-139139-use super::{Config, CreateError, JoinPeersError, PutError, db, streams::rpc::RecvSegment};
140140-use crate::node::ShutdownError;
141141-142142-pub(crate) enum HandlerMode {
143143- Sender,
144144- Forwarder,
145145- Receiver(Box<dyn Fn(String, Vec<u8>) -> Boxed<()> + Send + Sync + 'static>),
146146-}
147147-148148-impl HandlerMode {
149149- pub fn receiver_fn<F, Fut>(f: F) -> Self
150150- where
151151- F: Fn(String, Vec<u8>) -> Fut + Send + Sync + 'static,
152152- Fut: std::future::Future<Output = ()> + Send + 'static,
153153- {
154154- Self::Receiver(Box::new(move |name, data| Box::pin(f(name, data))))
155155- }
156156-157157- pub fn receiver(handler: Arc<dyn DataHandler>) -> Self {
158158- Self::receiver_fn(move |id, data| {
159159- let handler = handler.clone();
160160- async move {
161161- handler.handle_data(id, data).await;
162162- }
163163- })
164164- }
165165-166166- pub fn mode_str(&self) -> &'static str {
167167- match self {
168168- HandlerMode::Sender => "sender",
169169- HandlerMode::Forwarder => "forwarder",
170170- HandlerMode::Receiver(_) => "receiver",
171171- }
172172- }
173173-}
174174-175175-type Tasks = FuturesUnordered<Boxed<(NodeId, Result<(), RpcTaskError>)>>;
176176-177177-/// Actor that contains both a kv db for metadata and a handler for the rpc protocol.
178178-///
179179-/// This can be used both for sender and receiver nodes. Sender nodes will just set the
180180-/// handler to None.
181181-struct Actor {
182182- /// Receiver for rpc messages from remote nodes
183183- rpc_rx: tokio::sync::mpsc::Receiver<RpcMessage>,
184184- /// Receiver for API messages from the user
185185- api_rx: tokio::sync::mpsc::Receiver<ApiMessage>,
186186- /// nodes I need to send to for each stream
187187- subscribers: BTreeMap<String, BTreeSet<NodeId>>,
188188- /// nodes I am subscribed to
189189- subscriptions: BTreeMap<String, NodeId>,
190190- /// lightweight typed connection pool
191191- connections: ConnectionPool,
192192- /// How to handle incoming data
193193- handler: HandlerMode,
194194- /// Iroh protocol router, I need to keep it around to keep the protocol alive
195195- router: iroh::protocol::Router,
196196- /// Metadata db
197197- client: db::Db,
198198- /// Write scope for this node for the metadata db
199199- write: db::WriteScope,
200200- /// Ongoing tasks
201201- tasks: Tasks,
202202- /// Configuration, needed for timeouts etc.
203203- config: Arc<super::Config>,
204204-}
205205-206206-#[derive(Debug, Clone)]
207207-struct Connection {
208208- id: NodeId,
209209- rpc: irpc::Client<RpcProtocol>,
210210-}
211211-212212-#[derive(Debug, Snafu)]
213213-enum RpcTaskError {
214214- #[snafu(transparent)]
215215- Task { source: irpc::Error },
216216- #[snafu(transparent)]
217217- Timeout { source: tokio::time::error::Elapsed },
218218-}
219219-220220-struct ConnectionPool {
221221- endpoint: iroh::Endpoint,
222222- connections: BTreeMap<NodeId, Connection>,
223223-}
224224-225225-impl ConnectionPool {
226226- fn new(endpoint: iroh::Endpoint) -> Self {
227227- Self {
228228- endpoint,
229229- connections: BTreeMap::new(),
230230- }
231231- }
232232-233233- /// Cheap conn pool hack
234234- fn get(&mut self, remote: &NodeId) -> Connection {
235235- if !self.connections.contains_key(remote) {
236236- let conn = IrohRemoteConnection::new(
237237- self.endpoint.clone(),
238238- (*remote).into(),
239239- rpc::ALPN.to_vec(),
240240- );
241241- let conn = Connection {
242242- rpc: irpc::Client::boxed(conn),
243243- id: *remote,
244244- };
245245- self.connections.insert(*remote, conn);
246246- }
247247- self.connections
248248- .get_mut(remote)
249249- .expect("just inserted")
250250- .clone()
251251- }
252252-253253- fn remove(&mut self, remote: &NodeId) {
254254- self.connections.remove(remote);
255255- }
256256-}
257257-258258-impl Actor {
259259- pub async fn spawn(
260260- endpoint: iroh::Endpoint,
261261- topic: iroh_gossip::proto::TopicId,
262262- config: super::Config,
263263- handler: HandlerMode,
264264- ) -> Result<(Node, impl Future<Output = ()>), iroh_gossip::api::ApiError> {
265265- let (rpc_tx, rpc_rx) = tokio::sync::mpsc::channel::<RpcMessage>(32);
266266- let (api_tx, api_rx) = tokio::sync::mpsc::channel::<ApiMessage>(32);
267267- let gossip = Gossip::builder().spawn(endpoint.clone());
268268- let id = endpoint.node_id();
269269- let router = iroh::protocol::Router::builder(endpoint.clone())
270270- .accept(iroh_gossip::ALPN, gossip.clone())
271271- .accept(
272272- rpc::ALPN,
273273- IrohProtocol::new(rpc::Protocol::remote_handler(rpc_tx.into())),
274274- )
275275- .spawn();
276276- let topic = gossip.subscribe(topic, vec![]).await?;
277277- let secret = router.endpoint().secret_key().clone();
278278- let db_config = Default::default();
279279- let client = iroh_smol_kv::Client::local(topic, db_config);
280280- let write = db::WriteScope::new(client.write(secret.clone()));
281281- let client = db::Db::new(client);
282282- let actor = Self {
283283- rpc_rx,
284284- api_rx,
285285- subscribers: BTreeMap::new(),
286286- subscriptions: BTreeMap::new(),
287287- connections: ConnectionPool::new(router.endpoint().clone()),
288288- handler,
289289- router,
290290- write: write.clone(),
291291- client: client.clone(),
292292- tasks: FuturesUnordered::new(),
293293- config: Arc::new(config),
294294- };
295295- let api = Node {
296296- client: Arc::new(client),
297297- write: Arc::new(write),
298298- api: irpc::Client::local(api_tx),
299299- };
300300- Ok((
301301- api,
302302- actor
303303- .run()
304304- .instrument(trace_span!("actor", id=%id.fmt_short())),
305305- ))
306306- }
307307-308308- async fn run(mut self) {
309309- loop {
310310- tokio::select! {
311311- msg = self.rpc_rx.recv() => {
312312- let Some(msg) = msg else {
313313- error!("rpc channel closed");
314314- break;
315315- };
316316- self.handle_rpc(msg).instrument(trace_span!("rpc")).await;
317317- }
318318- msg = self.api_rx.recv() => {
319319- let Some(msg) = msg else {
320320- break;
321321- };
322322- if let Some(shutdown) = self.handle_api(msg).instrument(trace_span!("api")).await {
323323- shutdown.send(()).await.ok();
324324- break;
325325- }
326326- }
327327- res = self.tasks.next(), if !self.tasks.is_empty() => {
328328- let Some((remote_id, res)) = res else {
329329- error!("task finished but no result");
330330- break;
331331- };
332332- match res {
333333- Ok(()) => {}
334334- Err(RpcTaskError::Timeout { source }) => {
335335- warn!("call to {remote_id} timed out: {source}");
336336- }
337337- Err(RpcTaskError::Task { source }) => {
338338- warn!("call to {remote_id} failed: {source}");
339339- }
340340- }
341341- self.connections.remove(&remote_id);
342342- }
343343- }
344344- }
345345- }
346346-347347- async fn update_subscriber_meta(&mut self, key: &str) {
348348- let n = self
349349- .subscribers
350350- .get(key)
351351- .map(|s| s.len())
352352- .unwrap_or_default();
353353- let v = n.to_string().into_bytes();
354354- self.write
355355- .put_impl(Some(key.as_bytes().to_vec()), b"subscribers", v.into())
356356- .await
357357- .ok();
358358- }
359359-360360- /// Requests from remote nodes
361361- async fn handle_rpc(&mut self, msg: RpcMessage) {
362362- match msg {
363363- RpcMessage::Subscribe(msg) => {
364364- trace!("{:?}", msg.inner);
365365- let WithChannels {
366366- tx,
367367- inner: rpc::Subscribe { key, remote_id },
368368- ..
369369- } = msg;
370370- self.subscribers
371371- .entry(key.clone())
372372- .or_default()
373373- .insert(remote_id);
374374- self.update_subscriber_meta(&key).await;
375375- tx.send(()).await.ok();
376376- }
377377- RpcMessage::Unsubscribe(msg) => {
378378- debug!("{:?}", msg.inner);
379379- let WithChannels {
380380- tx,
381381- inner: rpc::Unsubscribe { key, remote_id },
382382- ..
383383- } = msg;
384384- if let Some(e) = self.subscribers.get_mut(&key)
385385- && !e.remove(&remote_id)
386386- {
387387- warn!(
388388- "unsubscribe: no subscription for {} from {}",
389389- key, remote_id
390390- );
391391- }
392392- if let Some(subscriptions) = self.subscribers.get(&key)
393393- && subscriptions.is_empty()
394394- {
395395- self.subscribers.remove(&key);
396396- }
397397- self.update_subscriber_meta(&key).await;
398398- tx.send(()).await.ok();
399399- }
400400- RpcMessage::RecvSegment(msg) => {
401401- trace!("{:?}", msg.inner);
402402- let WithChannels {
403403- tx,
404404- inner: rpc::RecvSegment { key, data },
405405- ..
406406- } = msg;
407407- match &self.handler {
408408- HandlerMode::Sender => {
409409- warn!("received segment but in sender mode");
410410- }
411411- HandlerMode::Forwarder => {
412412- if let Some(remotes) = self.subscribers.get(&key) {
413413- Self::handle_send(
414414- &mut self.tasks,
415415- &mut self.connections,
416416- &self.config,
417417- key,
418418- data,
419419- remotes,
420420- );
421421- } else {
422422- trace!("no subscribers for stream {}", key);
423423- }
424424- }
425425- HandlerMode::Receiver(handler) => {
426426- if self.subscriptions.contains_key(&key) {
427427- handler(key, data.to_vec()).await;
428428- } else {
429429- warn!("received segment for unsubscribed key: {}", key);
430430- }
431431- }
432432- };
433433- tx.send(()).await.ok();
434434- }
435435- }
436436- }
437437-438438- async fn handle_api(&mut self, msg: ApiMessage) -> Option<irpc::channel::oneshot::Sender<()>> {
439439- match msg {
440440- ApiMessage::SendSegment(msg) => {
441441- trace!("{:?}", msg.inner);
442442- let WithChannels {
443443- tx,
444444- inner: api::SendSegment { key, data },
445445- ..
446446- } = msg;
447447- if let Some(remotes) = self.subscribers.get(&key) {
448448- Self::handle_send(
449449- &mut self.tasks,
450450- &mut self.connections,
451451- &self.config,
452452- key,
453453- data,
454454- remotes,
455455- );
456456- } else {
457457- trace!("no subscribers for stream {}", key);
458458- }
459459- tx.send(()).await.ok();
460460- }
461461- ApiMessage::Subscribe(msg) => {
462462- trace!("{:?}", msg.inner);
463463- let WithChannels {
464464- tx,
465465- inner: api::Subscribe { key, remote_id },
466466- ..
467467- } = msg;
468468- let conn = self.connections.get(&remote_id);
469469- conn.rpc
470470- .rpc(rpc::Subscribe {
471471- key: key.clone(),
472472- remote_id: self.node_id(),
473473- })
474474- .await
475475- .ok();
476476- self.subscriptions.insert(key, remote_id);
477477- tx.send(()).await.ok();
478478- }
479479- ApiMessage::Unsubscribe(msg) => {
480480- trace!("{:?}", msg.inner);
481481- let WithChannels {
482482- tx,
483483- inner: api::Unsubscribe { key, remote_id },
484484- ..
485485- } = msg;
486486- let conn = self.connections.get(&remote_id);
487487- conn.rpc
488488- .rpc(rpc::Unsubscribe {
489489- key: key.clone(),
490490- remote_id: self.node_id(),
491491- })
492492- .await
493493- .ok();
494494- self.subscriptions.remove(&key);
495495- tx.send(()).await.ok();
496496- }
497497- ApiMessage::AddTickets(msg) => {
498498- trace!("{:?}", msg.inner);
499499- let WithChannels {
500500- tx,
501501- inner: api::AddTickets { peers },
502502- ..
503503- } = msg;
504504- for addr in &peers {
505505- self.router.endpoint().add_node_addr(addr.clone()).ok();
506506- }
507507- // self.client.inner().join_peers(ids).await.ok();
508508- tx.send(()).await.ok();
509509- }
510510- ApiMessage::JoinPeers(msg) => {
511511- trace!("{:?}", msg.inner);
512512- let WithChannels {
513513- tx,
514514- inner: api::JoinPeers { peers },
515515- ..
516516- } = msg;
517517- let ids = peers
518518- .iter()
519519- .map(|a| a.node_id)
520520- .filter(|id| *id != self.node_id())
521521- .collect::<HashSet<_>>();
522522- for addr in &peers {
523523- self.router.endpoint().add_node_addr(addr.clone()).ok();
524524- }
525525- self.client.inner().join_peers(ids).await.ok();
526526- tx.send(()).await.ok();
527527- }
528528- ApiMessage::GetNodeAddr(msg) => {
529529- trace!("{:?}", msg.inner);
530530- let WithChannels { tx, .. } = msg;
531531- if !self.config.disable_relay {
532532- // don't await home relay if we have disabled relays, this will hang forever
533533- self.router.endpoint().home_relay().initialized().await;
534534- }
535535- let addr = self.router.endpoint().node_addr().initialized().await;
536536- tx.send(addr).await.ok();
537537- }
538538- ApiMessage::Shutdown(msg) => {
539539- return Some(msg.tx);
540540- }
541541- }
542542- None
543543- }
544544-545545- fn handle_send(
546546- tasks: &mut Tasks,
547547- connections: &mut ConnectionPool,
548548- config: &Arc<Config>,
549549- key: String,
550550- data: Bytes,
551551- remotes: &BTreeSet<NodeId>,
552552- ) {
553553- let msg = rpc::RecvSegment { key, data };
554554- for remote in remotes {
555555- trace!("sending to stream {}: {}", msg.key, remote);
556556- let conn = connections.get(remote);
557557- tasks.push(Box::pin(Self::forward_task(
558558- config.clone(),
559559- conn,
560560- msg.clone(),
561561- )));
562562- }
563563- }
564564-565565- async fn forward_task(
566566- config: Arc<super::Config>,
567567- conn: Connection,
568568- msg: RecvSegment,
569569- ) -> (NodeId, Result<(), RpcTaskError>) {
570570- let id = conn.id;
571571- let res = async move {
572572- tokio::time::timeout(config.max_send_duration, conn.rpc.rpc(msg)).await??;
573573- Ok(())
574574- }
575575- .await;
576576- (id, res)
577577- }
578578-579579- fn node_id(&self) -> PublicKey {
580580- self.router.endpoint().node_id()
581581- }
582582-}
583583-584584-/// Iroh-streamplace node that can send, forward or receive stream segments.
585585-#[derive(Clone, uniffi::Object)]
586586-pub struct Node {
587587- client: Arc<db::Db>,
588588- write: Arc<db::WriteScope>,
589589- api: irpc::Client<ApiProtocol>,
590590-}
591591-592592-impl Node {
593593- pub(crate) async fn new_in_runtime(
594594- config: super::Config,
595595- handler: HandlerMode,
596596- ) -> Result<Arc<Self>, CreateError> {
597597- let mode_str = Bytes::from(handler.mode_str());
598598- let secret_key =
599599- SecretKey::from_bytes(&<[u8; 32]>::try_from(config.key.clone()).map_err(|e| {
600600- CreateError::PrivateKey {
601601- size: e.len() as u64,
602602- }
603603- })?);
604604- let topic =
605605- TopicId::from_bytes(<[u8; 32]>::try_from(config.topic.clone()).map_err(|e| {
606606- CreateError::Topic {
607607- size: e.len() as u64,
608608- }
609609- })?);
610610- let relay_mode = if config.disable_relay {
611611- RelayMode::Disabled
612612- } else {
613613- RelayMode::Default
614614- };
615615- let endpoint = iroh::Endpoint::builder()
616616- .secret_key(secret_key)
617617- .relay_mode(relay_mode)
618618- .bind()
619619- .await
620620- .map_err(|e| CreateError::Bind {
621621- message: e.to_string(),
622622- })?;
623623- let (api, actor) = Actor::spawn(endpoint, topic, config, handler)
624624- .await
625625- .map_err(|e| CreateError::Subscribe {
626626- message: e.to_string(),
627627- })?;
628628- api.node_scope()
629629- .put_impl(Option::<Vec<u8>>::None, b"mode", mode_str)
630630- .await
631631- .ok();
632632- tokio::spawn(actor);
633633- Ok(Arc::new(api))
634634- }
635635-}
636636-637637-/// DataHandler trait that is exported to go for receiving data callbacks.
638638-#[uniffi::export(with_foreign)]
639639-#[async_trait::async_trait]
640640-pub trait DataHandler: Send + Sync {
641641- async fn handle_data(&self, topic: String, data: Vec<u8>);
642642-}
643643-644644-#[uniffi::export]
645645-impl Node {
646646- /// Create a new streamplace client node.
647647- #[uniffi::constructor]
648648- pub async fn sender(config: super::Config) -> Result<Arc<Self>, CreateError> {
649649- super::RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Sender))
650650- }
651651-652652- #[uniffi::constructor]
653653- pub async fn forwarder(config: super::Config) -> Result<Arc<Self>, CreateError> {
654654- super::RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::Forwarder))
655655- }
656656-657657- #[uniffi::constructor]
658658- pub async fn receiver(
659659- config: super::Config,
660660- handler: Arc<dyn DataHandler>,
661661- ) -> Result<Arc<Self>, CreateError> {
662662- super::RUNTIME.block_on(Self::new_in_runtime(config, HandlerMode::receiver(handler)))
663663- }
664664-665665- /// Get a handle to the db to watch for changes locally or globally.
666666- pub fn db(&self) -> Arc<db::Db> {
667667- self.client.clone()
668668- }
669669-670670- /// Get a handle to the write scope for this node.
671671- ///
672672- /// This is equivalent to calling `db.write(...)` with the secret key used to create the node.
673673- pub fn node_scope(&self) -> Arc<db::WriteScope> {
674674- self.write.clone()
675675- }
676676-677677- /// Subscribe to updates for a given stream from a remote node.
678678- pub async fn subscribe(
679679- &self,
680680- key: String,
681681- remote_id: Arc<crate::public_key::PublicKey>,
682682- ) -> Result<(), PutError> {
683683- self.api
684684- .rpc(api::Subscribe {
685685- key,
686686- remote_id: remote_id.as_ref().into(),
687687- })
688688- .await
689689- .map_err(|e| PutError::Irpc {
690690- message: e.to_string(),
691691- })
692692- }
693693-694694- /// Unsubscribe from updates for a given stream from a remote node.
695695- pub async fn unsubscribe(
696696- &self,
697697- key: String,
698698- remote_id: Arc<crate::public_key::PublicKey>,
699699- ) -> Result<(), PutError> {
700700- self.api
701701- .rpc(api::Unsubscribe {
702702- key,
703703- remote_id: remote_id.as_ref().into(),
704704- })
705705- .await
706706- .map_err(|e| PutError::Irpc {
707707- message: e.to_string(),
708708- })
709709- }
710710-711711- /// Send a segment to all subscribers of the given stream.
712712- pub async fn send_segment(&self, key: String, data: Vec<u8>) -> Result<(), PutError> {
713713- self.api
714714- .rpc(api::SendSegment {
715715- key,
716716- data: data.into(),
717717- })
718718- .await
719719- .map_err(|e| PutError::Irpc {
720720- message: e.to_string(),
721721- })
722722- }
723723-724724- /// Join peers by their node tickets.
725725- pub async fn join_peers(&self, peers: Vec<String>) -> Result<(), JoinPeersError> {
726726- let peers = peers
727727- .iter()
728728- .map(|p| NodeTicket::from_str(p))
729729- .collect::<Result<Vec<_>, _>>()
730730- .map_err(|e| JoinPeersError::Ticket {
731731- message: e.to_string(),
732732- })?;
733733- let addrs = peers
734734- .iter()
735735- .map(|t| t.node_addr().clone())
736736- .collect::<Vec<_>>();
737737- self.api
738738- .rpc(api::JoinPeers { peers: addrs })
739739- .await
740740- .map_err(|e| JoinPeersError::Irpc {
741741- message: e.to_string(),
742742- })
743743- }
744744-745745- /// Add tickets for remote peers
746746- pub async fn add_tickets(&self, peers: Vec<String>) -> Result<(), JoinPeersError> {
747747- let peers = peers
748748- .iter()
749749- .map(|p| NodeTicket::from_str(p))
750750- .collect::<Result<Vec<_>, _>>()
751751- .map_err(|e| JoinPeersError::Ticket {
752752- message: e.to_string(),
753753- })?;
754754- let addrs = peers
755755- .iter()
756756- .map(|t| t.node_addr().clone())
757757- .collect::<Vec<_>>();
758758- self.api
759759- .rpc(api::AddTickets { peers: addrs })
760760- .await
761761- .map_err(|e| JoinPeersError::Irpc {
762762- message: e.to_string(),
763763- })
764764- }
765765-766766- /// Get this node's ticket.
767767- pub async fn ticket(&self) -> Result<String, PutError> {
768768- let addr = self
769769- .api
770770- .rpc(api::GetNodeAddr)
771771- .await
772772- .map_err(|e| PutError::Irpc {
773773- message: e.to_string(),
774774- })?;
775775- Ok(NodeTicket::from(addr).to_string())
776776- }
777777-778778- /// Get this node's node ID.
779779- pub async fn node_id(&self) -> Result<Arc<crate::public_key::PublicKey>, PutError> {
780780- let addr = self
781781- .api
782782- .rpc(api::GetNodeAddr)
783783- .await
784784- .map_err(|e| PutError::Irpc {
785785- message: e.to_string(),
786786- })?;
787787- Ok(Arc::new(addr.node_id.into()))
788788- }
789789-790790- /// Shutdown the node, including the streaming system and the metadata db.
791791- pub async fn shutdown(&self) -> Result<(), ShutdownError> {
792792- // shut down both the streams and the db concurrently, even if one fails
793793- let (res1, res2) = tokio::join!(self.shutdown_streams(), self.client.shutdown());
794794- res1?;
795795- res2?;
796796- Ok(())
797797- }
798798-}
799799-800800-impl Node {
801801- async fn shutdown_streams(&self) -> std::result::Result<(), ShutdownError> {
802802- self.api
803803- .rpc(api::Shutdown)
804804- .await
805805- .map_err(|e| ShutdownError::Irpc {
806806- message: e.to_string(),
807807- })
808808- }
809809-}
···44use n0_future::{BufferedStreamExt, StreamExt, stream};
55use testresult::TestResult;
6677-use super::{streams::HandlerMode, *};
77+use super::*;
8899struct TestNode {
1010 node: Arc<Node>,
···144144 let (tx, mut rx) = tokio::sync::mpsc::channel(32);
145145 let handler = Arc::new(TestHandler::new((), tx));
146146 let sender = TestNode::new(HandlerMode::Sender).await?;
147147- let receiver = TestNode::new(HandlerMode::receiver(handler)).await?;
147147+ let receiver = TestNode::new(HandlerMode::Receiver(handler)).await?;
148148 // join the sender to the receiver. This will also configure the receiver endpoint to be able to dial the sender.
149149 receiver.join_peers(vec![sender.ticket.clone()]).await?;
150150 let stream = "teststream".to_string();
···165165 let handler = Arc::new(TestHandler::new((), tx));
166166 let sender = TestNode::new(HandlerMode::Sender).await?;
167167 let forwarder = TestNode::new(HandlerMode::Forwarder).await?;
168168- let receiver = TestNode::new(HandlerMode::receiver(handler)).await?;
168168+ let receiver = TestNode::new(HandlerMode::Receiver(handler)).await?;
169169 // join everyone to everyone, so the receiver can reach the sender via the forwarder.
170170 let tickets = vec![
171171 sender.ticket.clone(),
···196196 let handler = Arc::new(TestHandler::new((), tx));
197197 let sender = TestNode::new(HandlerMode::Sender).await?;
198198 let forwarder = TestNode::new(HandlerMode::Forwarder).await?;
199199- let receiver = TestNode::new(HandlerMode::receiver(handler)).await?;
199199+ let receiver = TestNode::new(HandlerMode::Receiver(handler)).await?;
200200 // join everyone to everyone, so the receiver can reach the sender via the forwarder.
201201 let tickets = vec![
202202 sender.ticket.clone(),
···241241 } else if forwarders.contains(&i) {
242242 HandlerMode::Forwarder
243243 } else {
244244- HandlerMode::receiver(Arc::new(TestHandler::new(i, tx.clone())))
244244+ HandlerMode::Receiver(Arc::new(TestHandler::new(i, tx.clone())))
245245 }
246246 };
247247 let nodes = test_nodes(ntotal, make_handler, true).await?;