Lightweight decentralized “knot” server prototype using Iroh + ATProto.
at master 257 lines 8.2 kB view raw
1use crate::manifest::Manifest; 2use crate::{Error, Result}; 3use std::sync::Mutex; 4use iroh::protocol::Router; 5use iroh::{Endpoint, NodeAddr, PublicKey, Watcher}; 6use iroh_base::ticket::NodeTicket; 7use iroh_blobs::store::mem::MemStore; 8use iroh_blobs::api::TempTag; 9use iroh_blobs::{BlobsProtocol, Hash}; 10use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; 11use std::str::FromStr; 12use tokio::time::{timeout, Duration}; 13 14fn wrap_err(context: &str, err: impl std::fmt::Display) -> Error { 15 Error::Invalid(format!("{context}: {err}")) 16} 17 18pub struct IrohNode { 19 endpoint: Endpoint, 20 store: MemStore, 21 router: Router, 22 pinned: Mutex<Vec<TempTag>>, 23} 24 25impl IrohNode { 26 pub async fn new() -> Result<Self> { 27 let endpoint = Endpoint::builder() 28 .discovery_n0() 29 .bind() 30 .await 31 .map_err(|err| wrap_err("endpoint bind", err))?; 32 let store = MemStore::default(); 33 let blobs = BlobsProtocol::new(&store, endpoint.clone(), None); 34 let router = Router::builder(endpoint.clone()) 35 .accept(iroh_blobs::ALPN, blobs) 36 .spawn(); 37 38 Ok(Self { 39 endpoint, 40 store, 41 router, 42 pinned: Mutex::new(Vec::new()), 43 }) 44 } 45 46 pub async fn node_addr(&self) -> Result<NodeAddr> { 47 let bound = self.endpoint.bound_sockets(); 48 if bound.is_empty() { 49 return Err(Error::Invalid("no bound sockets available".into())); 50 } 51 let advertise = load_advertise_config()?; 52 let direct_addrs = compute_direct_addresses(&bound, &advertise); 53 let node_id = self.endpoint.node_id(); 54 Ok(NodeAddr::new(node_id).with_direct_addresses(direct_addrs)) 55 } 56 57 pub fn node_id(&self) -> PublicKey { 58 self.endpoint.node_id() 59 } 60 61 pub async fn node_ticket(&self) -> Result<String> { 62 let relay_url = timeout(Duration::from_secs(5), async { 63 let mut watcher = self.endpoint.home_relay(); 64 watcher.initialized().await 65 }) 66 .await 67 .ok(); 68 69 // Prefer a relay-only ticket to avoid embedding stale IP addresses. 70 let node_addr = if let Some(relay_url) = relay_url { 71 NodeAddr::new(self.node_id()).with_relay_url(relay_url) 72 } else { 73 // Fall back to direct addresses for local/offline environments. 74 self.node_addr().await? 75 }; 76 77 Ok(NodeTicket::from(node_addr).to_string()) 78 } 79 80 pub fn add_peer(&self, addr: NodeAddr) -> Result<()> { 81 self.endpoint 82 .add_node_addr(addr) 83 .map_err(|err| wrap_err("add_node_addr", err))?; 84 Ok(()) 85 } 86 87 pub async fn add_bytes(&self, data: &[u8]) -> Result<Hash> { 88 let tag = self 89 .store 90 .add_slice(data) 91 .temp_tag() 92 .await 93 .map_err(|err| wrap_err("store add_slice", err))?; 94 let hash = *tag.hash(); 95 let mut guard = self 96 .pinned 97 .lock() 98 .map_err(|_| Error::Invalid("pinned lock poisoned".into()))?; 99 guard.push(tag); 100 Ok(hash) 101 } 102 103 pub async fn add_manifest(&self, manifest: &Manifest) -> Result<Hash> { 104 let bytes = manifest.to_bytes_pretty()?; 105 self.add_bytes(&bytes).await 106 } 107 108 pub async fn get_manifest(&self, hash: Hash) -> Result<Manifest> { 109 let bytes = self 110 .store 111 .get_bytes(hash) 112 .await 113 .map_err(|err| wrap_err("store get_bytes", err))?; 114 Manifest::from_bytes(&bytes) 115 } 116 117 pub async fn get_bytes(&self, hash: Hash) -> Result<Vec<u8>> { 118 let bytes = self 119 .store 120 .get_bytes(hash) 121 .await 122 .map_err(|err| wrap_err("store get_bytes", err))?; 123 Ok(bytes.to_vec()) 124 } 125 126 pub async fn fetch_manifest_from_peer(&self, hash: Hash, peer: NodeAddr) -> Result<Manifest> { 127 let bytes = self.fetch_from_peer(hash, peer).await?; 128 Manifest::from_bytes(&bytes) 129 } 130 131 pub async fn fetch_from_peer(&self, hash: Hash, peer: NodeAddr) -> Result<Vec<u8>> { 132 // If the peer is this node, fetch directly from the local store. 133 if peer.node_id == self.node_id() { 134 return self.get_bytes(hash).await; 135 } 136 self.add_peer(peer.clone())?; 137 let conn = self 138 .endpoint 139 .connect(peer, iroh_blobs::ALPN) 140 .await 141 .map_err(|err| wrap_err("endpoint connect", err))?; 142 let remote = self.store.remote(); 143 remote 144 .fetch(conn, hash) 145 .await 146 .map_err(|err| wrap_err("remote fetch", err))?; 147 let bytes = self 148 .store 149 .get_bytes(hash) 150 .await 151 .map_err(|err| wrap_err("store get_bytes", err))?; 152 Ok(bytes.to_vec()) 153 } 154 155 156 pub async fn shutdown(self) -> Result<()> { 157 self.router 158 .shutdown() 159 .await 160 .map_err(|err| wrap_err("router shutdown", err))?; 161 let _ = self.store.shutdown().await; 162 Ok(()) 163 } 164} 165 166#[derive(Debug, Clone, Default)] 167struct AdvertiseConfig { 168 addrs: Option<Vec<SocketAddr>>, 169 ip: Option<IpAddr>, 170} 171 172fn load_advertise_config() -> Result<AdvertiseConfig> { 173 let addrs = std::env::var("KNOT_ADVERTISE_ADDRS").ok(); 174 if let Some(raw) = addrs { 175 let mut parsed = Vec::new(); 176 for part in raw.split(',') { 177 let trimmed = part.trim(); 178 if trimmed.is_empty() { 179 continue; 180 } 181 let addr = SocketAddr::from_str(trimmed).map_err(|err| { 182 Error::Invalid(format!("invalid KNOT_ADVERTISE_ADDRS entry {trimmed}: {err}")) 183 })?; 184 parsed.push(addr); 185 } 186 if !parsed.is_empty() { 187 return Ok(AdvertiseConfig { 188 addrs: Some(parsed), 189 ip: None, 190 }); 191 } 192 } 193 let ip = std::env::var("KNOT_ADVERTISE_IP") 194 .ok() 195 .map(|raw| { 196 IpAddr::from_str(raw.trim()).map_err(|err| { 197 Error::Invalid(format!("invalid KNOT_ADVERTISE_IP {}: {err}", raw.trim())) 198 }) 199 }) 200 .transpose()?; 201 Ok(AdvertiseConfig { addrs: None, ip }) 202} 203 204fn compute_direct_addresses(bound: &[SocketAddr], advertise: &AdvertiseConfig) -> Vec<SocketAddr> { 205 if let Some(addrs) = &advertise.addrs { 206 return addrs.clone(); 207 } 208 if let Some(ip) = advertise.ip { 209 return bound 210 .iter() 211 .map(|addr| SocketAddr::new(ip, addr.port())) 212 .collect(); 213 } 214 bound 215 .iter() 216 .map(|addr| match addr { 217 SocketAddr::V4(v4) => SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), v4.port()), 218 SocketAddr::V6(v6) => SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), v6.port()), 219 }) 220 .collect() 221} 222 223#[cfg(test)] 224mod tests { 225 use super::{compute_direct_addresses, AdvertiseConfig}; 226 use std::net::{IpAddr, Ipv4Addr, SocketAddr}; 227 228 #[test] 229 fn direct_addresses_default_to_loopback() { 230 let bound = vec![SocketAddr::from(([0, 0, 0, 0], 1234))]; 231 let cfg = AdvertiseConfig::default(); 232 let addrs = compute_direct_addresses(&bound, &cfg); 233 assert_eq!(addrs, vec![SocketAddr::from(([127, 0, 0, 1], 1234))]); 234 } 235 236 #[test] 237 fn direct_addresses_use_advertised_ip() { 238 let bound = vec![SocketAddr::from(([0, 0, 0, 0], 1234))]; 239 let cfg = AdvertiseConfig { 240 addrs: None, 241 ip: Some(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 5))), 242 }; 243 let addrs = compute_direct_addresses(&bound, &cfg); 244 assert_eq!(addrs, vec![SocketAddr::from(([192, 168, 1, 5], 1234))]); 245 } 246 247 #[test] 248 fn direct_addresses_use_explicit_addrs() { 249 let bound = vec![SocketAddr::from(([0, 0, 0, 0], 1234))]; 250 let cfg = AdvertiseConfig { 251 addrs: Some(vec![SocketAddr::from(([10, 0, 0, 9], 9999))]), 252 ip: None, 253 }; 254 let addrs = compute_direct_addresses(&bound, &cfg); 255 assert_eq!(addrs, vec![SocketAddr::from(([10, 0, 0, 9], 9999))]); 256 } 257}