Lightweight decentralized “knot” server prototype using Iroh + ATProto.
1use crate::manifest::Manifest;
2use crate::{Error, Result};
3use std::sync::Mutex;
4use iroh::protocol::Router;
5use iroh::{Endpoint, NodeAddr, PublicKey, Watcher};
6use iroh_base::ticket::NodeTicket;
7use iroh_blobs::store::mem::MemStore;
8use iroh_blobs::api::TempTag;
9use iroh_blobs::{BlobsProtocol, Hash};
10use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
11use std::str::FromStr;
12use tokio::time::{timeout, Duration};
13
14fn wrap_err(context: &str, err: impl std::fmt::Display) -> Error {
15 Error::Invalid(format!("{context}: {err}"))
16}
17
18pub struct IrohNode {
19 endpoint: Endpoint,
20 store: MemStore,
21 router: Router,
22 pinned: Mutex<Vec<TempTag>>,
23}
24
25impl IrohNode {
26 pub async fn new() -> Result<Self> {
27 let endpoint = Endpoint::builder()
28 .discovery_n0()
29 .bind()
30 .await
31 .map_err(|err| wrap_err("endpoint bind", err))?;
32 let store = MemStore::default();
33 let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
34 let router = Router::builder(endpoint.clone())
35 .accept(iroh_blobs::ALPN, blobs)
36 .spawn();
37
38 Ok(Self {
39 endpoint,
40 store,
41 router,
42 pinned: Mutex::new(Vec::new()),
43 })
44 }
45
46 pub async fn node_addr(&self) -> Result<NodeAddr> {
47 let bound = self.endpoint.bound_sockets();
48 if bound.is_empty() {
49 return Err(Error::Invalid("no bound sockets available".into()));
50 }
51 let advertise = load_advertise_config()?;
52 let direct_addrs = compute_direct_addresses(&bound, &advertise);
53 let node_id = self.endpoint.node_id();
54 Ok(NodeAddr::new(node_id).with_direct_addresses(direct_addrs))
55 }
56
57 pub fn node_id(&self) -> PublicKey {
58 self.endpoint.node_id()
59 }
60
61 pub async fn node_ticket(&self) -> Result<String> {
62 let relay_url = timeout(Duration::from_secs(5), async {
63 let mut watcher = self.endpoint.home_relay();
64 watcher.initialized().await
65 })
66 .await
67 .ok();
68
69 // Prefer a relay-only ticket to avoid embedding stale IP addresses.
70 let node_addr = if let Some(relay_url) = relay_url {
71 NodeAddr::new(self.node_id()).with_relay_url(relay_url)
72 } else {
73 // Fall back to direct addresses for local/offline environments.
74 self.node_addr().await?
75 };
76
77 Ok(NodeTicket::from(node_addr).to_string())
78 }
79
80 pub fn add_peer(&self, addr: NodeAddr) -> Result<()> {
81 self.endpoint
82 .add_node_addr(addr)
83 .map_err(|err| wrap_err("add_node_addr", err))?;
84 Ok(())
85 }
86
87 pub async fn add_bytes(&self, data: &[u8]) -> Result<Hash> {
88 let tag = self
89 .store
90 .add_slice(data)
91 .temp_tag()
92 .await
93 .map_err(|err| wrap_err("store add_slice", err))?;
94 let hash = *tag.hash();
95 let mut guard = self
96 .pinned
97 .lock()
98 .map_err(|_| Error::Invalid("pinned lock poisoned".into()))?;
99 guard.push(tag);
100 Ok(hash)
101 }
102
103 pub async fn add_manifest(&self, manifest: &Manifest) -> Result<Hash> {
104 let bytes = manifest.to_bytes_pretty()?;
105 self.add_bytes(&bytes).await
106 }
107
108 pub async fn get_manifest(&self, hash: Hash) -> Result<Manifest> {
109 let bytes = self
110 .store
111 .get_bytes(hash)
112 .await
113 .map_err(|err| wrap_err("store get_bytes", err))?;
114 Manifest::from_bytes(&bytes)
115 }
116
117 pub async fn get_bytes(&self, hash: Hash) -> Result<Vec<u8>> {
118 let bytes = self
119 .store
120 .get_bytes(hash)
121 .await
122 .map_err(|err| wrap_err("store get_bytes", err))?;
123 Ok(bytes.to_vec())
124 }
125
126 pub async fn fetch_manifest_from_peer(&self, hash: Hash, peer: NodeAddr) -> Result<Manifest> {
127 let bytes = self.fetch_from_peer(hash, peer).await?;
128 Manifest::from_bytes(&bytes)
129 }
130
131 pub async fn fetch_from_peer(&self, hash: Hash, peer: NodeAddr) -> Result<Vec<u8>> {
132 // If the peer is this node, fetch directly from the local store.
133 if peer.node_id == self.node_id() {
134 return self.get_bytes(hash).await;
135 }
136 self.add_peer(peer.clone())?;
137 let conn = self
138 .endpoint
139 .connect(peer, iroh_blobs::ALPN)
140 .await
141 .map_err(|err| wrap_err("endpoint connect", err))?;
142 let remote = self.store.remote();
143 remote
144 .fetch(conn, hash)
145 .await
146 .map_err(|err| wrap_err("remote fetch", err))?;
147 let bytes = self
148 .store
149 .get_bytes(hash)
150 .await
151 .map_err(|err| wrap_err("store get_bytes", err))?;
152 Ok(bytes.to_vec())
153 }
154
155
156 pub async fn shutdown(self) -> Result<()> {
157 self.router
158 .shutdown()
159 .await
160 .map_err(|err| wrap_err("router shutdown", err))?;
161 let _ = self.store.shutdown().await;
162 Ok(())
163 }
164}
165
/// Optional overrides for the direct addresses this node advertises.
///
/// Built by `load_advertise_config` and consumed by
/// `compute_direct_addresses`, where `addrs` takes precedence over `ip`.
#[derive(Clone, Debug, Default)]
struct AdvertiseConfig {
    /// Complete socket addresses to advertise as given.
    addrs: Option<Vec<SocketAddr>>,
    /// IP address to combine with the port of each bound socket.
    ip: Option<IpAddr>,
}
171
172fn load_advertise_config() -> Result<AdvertiseConfig> {
173 let addrs = std::env::var("KNOT_ADVERTISE_ADDRS").ok();
174 if let Some(raw) = addrs {
175 let mut parsed = Vec::new();
176 for part in raw.split(',') {
177 let trimmed = part.trim();
178 if trimmed.is_empty() {
179 continue;
180 }
181 let addr = SocketAddr::from_str(trimmed).map_err(|err| {
182 Error::Invalid(format!("invalid KNOT_ADVERTISE_ADDRS entry {trimmed}: {err}"))
183 })?;
184 parsed.push(addr);
185 }
186 if !parsed.is_empty() {
187 return Ok(AdvertiseConfig {
188 addrs: Some(parsed),
189 ip: None,
190 });
191 }
192 }
193 let ip = std::env::var("KNOT_ADVERTISE_IP")
194 .ok()
195 .map(|raw| {
196 IpAddr::from_str(raw.trim()).map_err(|err| {
197 Error::Invalid(format!("invalid KNOT_ADVERTISE_IP {}: {err}", raw.trim()))
198 })
199 })
200 .transpose()?;
201 Ok(AdvertiseConfig { addrs: None, ip })
202}
203
204fn compute_direct_addresses(bound: &[SocketAddr], advertise: &AdvertiseConfig) -> Vec<SocketAddr> {
205 if let Some(addrs) = &advertise.addrs {
206 return addrs.clone();
207 }
208 if let Some(ip) = advertise.ip {
209 return bound
210 .iter()
211 .map(|addr| SocketAddr::new(ip, addr.port()))
212 .collect();
213 }
214 bound
215 .iter()
216 .map(|addr| match addr {
217 SocketAddr::V4(v4) => SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), v4.port()),
218 SocketAddr::V6(v6) => SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), v6.port()),
219 })
220 .collect()
221}
222
#[cfg(test)]
mod tests {
    use super::{compute_direct_addresses, AdvertiseConfig};
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    /// Shared fixture: one wildcard IPv4 socket bound on port 1234.
    fn wildcard_bound() -> Vec<SocketAddr> {
        vec![SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)]
    }

    /// With no overrides, the wildcard address is replaced by loopback.
    #[test]
    fn direct_addresses_default_to_loopback() {
        let resolved = compute_direct_addresses(&wildcard_bound(), &AdvertiseConfig::default());
        assert_eq!(
            resolved,
            [SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234)]
        );
    }

    /// An advertised IP is combined with the bound socket's port.
    #[test]
    fn direct_addresses_use_advertised_ip() {
        let advertised = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 5));
        let cfg = AdvertiseConfig {
            addrs: None,
            ip: Some(advertised),
        };
        let resolved = compute_direct_addresses(&wildcard_bound(), &cfg);
        assert_eq!(resolved, [SocketAddr::new(advertised, 1234)]);
    }

    /// Explicit addresses are returned as given, ignoring the bound sockets.
    #[test]
    fn direct_addresses_use_explicit_addrs() {
        let explicit = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 9)), 9999);
        let cfg = AdvertiseConfig {
            addrs: Some(vec![explicit]),
            ip: None,
        };
        let resolved = compute_direct_addresses(&wildcard_bound(), &cfg);
        assert_eq!(resolved, [explicit]);
    }
}