use crate::manifest::Manifest; use crate::{Error, Result}; use std::sync::Mutex; use iroh::protocol::Router; use iroh::{Endpoint, NodeAddr, PublicKey, Watcher}; use iroh_base::ticket::NodeTicket; use iroh_blobs::store::mem::MemStore; use iroh_blobs::api::TempTag; use iroh_blobs::{BlobsProtocol, Hash}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::str::FromStr; use tokio::time::{timeout, Duration}; fn wrap_err(context: &str, err: impl std::fmt::Display) -> Error { Error::Invalid(format!("{context}: {err}")) } pub struct IrohNode { endpoint: Endpoint, store: MemStore, router: Router, pinned: Mutex>, } impl IrohNode { pub async fn new() -> Result { let endpoint = Endpoint::builder() .discovery_n0() .bind() .await .map_err(|err| wrap_err("endpoint bind", err))?; let store = MemStore::default(); let blobs = BlobsProtocol::new(&store, endpoint.clone(), None); let router = Router::builder(endpoint.clone()) .accept(iroh_blobs::ALPN, blobs) .spawn(); Ok(Self { endpoint, store, router, pinned: Mutex::new(Vec::new()), }) } pub async fn node_addr(&self) -> Result { let bound = self.endpoint.bound_sockets(); if bound.is_empty() { return Err(Error::Invalid("no bound sockets available".into())); } let advertise = load_advertise_config()?; let direct_addrs = compute_direct_addresses(&bound, &advertise); let node_id = self.endpoint.node_id(); Ok(NodeAddr::new(node_id).with_direct_addresses(direct_addrs)) } pub fn node_id(&self) -> PublicKey { self.endpoint.node_id() } pub async fn node_ticket(&self) -> Result { let relay_url = timeout(Duration::from_secs(5), async { let mut watcher = self.endpoint.home_relay(); watcher.initialized().await }) .await .ok(); // Prefer a relay-only ticket to avoid embedding stale IP addresses. let node_addr = if let Some(relay_url) = relay_url { NodeAddr::new(self.node_id()).with_relay_url(relay_url) } else { // Fall back to direct addresses for local/offline environments. self.node_addr().await? }; Ok(NodeTicket::from(node_addr).to_string()) } pub fn add_peer(&self, addr: NodeAddr) -> Result<()> { self.endpoint .add_node_addr(addr) .map_err(|err| wrap_err("add_node_addr", err))?; Ok(()) } pub async fn add_bytes(&self, data: &[u8]) -> Result { let tag = self .store .add_slice(data) .temp_tag() .await .map_err(|err| wrap_err("store add_slice", err))?; let hash = *tag.hash(); let mut guard = self .pinned .lock() .map_err(|_| Error::Invalid("pinned lock poisoned".into()))?; guard.push(tag); Ok(hash) } pub async fn add_manifest(&self, manifest: &Manifest) -> Result { let bytes = manifest.to_bytes_pretty()?; self.add_bytes(&bytes).await } pub async fn get_manifest(&self, hash: Hash) -> Result { let bytes = self .store .get_bytes(hash) .await .map_err(|err| wrap_err("store get_bytes", err))?; Manifest::from_bytes(&bytes) } pub async fn get_bytes(&self, hash: Hash) -> Result> { let bytes = self .store .get_bytes(hash) .await .map_err(|err| wrap_err("store get_bytes", err))?; Ok(bytes.to_vec()) } pub async fn fetch_manifest_from_peer(&self, hash: Hash, peer: NodeAddr) -> Result { let bytes = self.fetch_from_peer(hash, peer).await?; Manifest::from_bytes(&bytes) } pub async fn fetch_from_peer(&self, hash: Hash, peer: NodeAddr) -> Result> { // If the peer is this node, fetch directly from the local store. if peer.node_id == self.node_id() { return self.get_bytes(hash).await; } self.add_peer(peer.clone())?; let conn = self .endpoint .connect(peer, iroh_blobs::ALPN) .await .map_err(|err| wrap_err("endpoint connect", err))?; let remote = self.store.remote(); remote .fetch(conn, hash) .await .map_err(|err| wrap_err("remote fetch", err))?; let bytes = self .store .get_bytes(hash) .await .map_err(|err| wrap_err("store get_bytes", err))?; Ok(bytes.to_vec()) } pub async fn shutdown(self) -> Result<()> { self.router .shutdown() .await .map_err(|err| wrap_err("router shutdown", err))?; let _ = self.store.shutdown().await; Ok(()) } } #[derive(Debug, Clone, Default)] struct AdvertiseConfig { addrs: Option>, ip: Option, } fn load_advertise_config() -> Result { let addrs = std::env::var("KNOT_ADVERTISE_ADDRS").ok(); if let Some(raw) = addrs { let mut parsed = Vec::new(); for part in raw.split(',') { let trimmed = part.trim(); if trimmed.is_empty() { continue; } let addr = SocketAddr::from_str(trimmed).map_err(|err| { Error::Invalid(format!("invalid KNOT_ADVERTISE_ADDRS entry {trimmed}: {err}")) })?; parsed.push(addr); } if !parsed.is_empty() { return Ok(AdvertiseConfig { addrs: Some(parsed), ip: None, }); } } let ip = std::env::var("KNOT_ADVERTISE_IP") .ok() .map(|raw| { IpAddr::from_str(raw.trim()).map_err(|err| { Error::Invalid(format!("invalid KNOT_ADVERTISE_IP {}: {err}", raw.trim())) }) }) .transpose()?; Ok(AdvertiseConfig { addrs: None, ip }) } fn compute_direct_addresses(bound: &[SocketAddr], advertise: &AdvertiseConfig) -> Vec { if let Some(addrs) = &advertise.addrs { return addrs.clone(); } if let Some(ip) = advertise.ip { return bound .iter() .map(|addr| SocketAddr::new(ip, addr.port())) .collect(); } bound .iter() .map(|addr| match addr { SocketAddr::V4(v4) => SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), v4.port()), SocketAddr::V6(v6) => SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), v6.port()), }) .collect() } #[cfg(test)] mod tests { use super::{compute_direct_addresses, AdvertiseConfig}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; #[test] fn direct_addresses_default_to_loopback() { let bound = vec![SocketAddr::from(([0, 0, 0, 0], 1234))]; let cfg = AdvertiseConfig::default(); let addrs = compute_direct_addresses(&bound, &cfg); assert_eq!(addrs, vec![SocketAddr::from(([127, 0, 0, 1], 1234))]); } #[test] fn direct_addresses_use_advertised_ip() { let bound = vec![SocketAddr::from(([0, 0, 0, 0], 1234))]; let cfg = AdvertiseConfig { addrs: None, ip: Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 5))), }; let addrs = compute_direct_addresses(&bound, &cfg); assert_eq!(addrs, vec![SocketAddr::from(([192, 168, 1, 5], 1234))]); } #[test] fn direct_addresses_use_explicit_addrs() { let bound = vec![SocketAddr::from(([0, 0, 0, 0], 1234))]; let cfg = AdvertiseConfig { addrs: Some(vec![SocketAddr::from(([10, 0, 0, 9], 9999))]), ip: None, }; let addrs = compute_direct_addresses(&bound, &cfg); assert_eq!(addrs, vec![SocketAddr::from(([10, 0, 0, 9], 9999))]); } }