Lightweight decentralized “knot” server prototype using Iroh + ATProto.
at master 330 lines 12 kB view raw
1use std::path::PathBuf; 2use std::sync::Arc; 3use std::io::ErrorKind; 4 5use crate::atproto::{AtProtoClient, AtProtoRecord, InMemoryAtProto, PeerHint}; 6use crate::atproto_remote::AtProtoRemote; 7use crate::cache::RepoCache; 8use crate::flow::sync_repo_from_head; 9use crate::iroh_node::IrohNode; 10use crate::push_flow::{publish_repo_update, PublishSummary}; 11use crate::registry::RepoEntry; 12use crate::repo_resolver::RepoResolver; 13use crate::seed_flow::seed_with_sync; 14use crate::{Error, Result}; 15use tracing::warn; 16use tokio::time::{timeout, Duration}; 17 18#[derive(Clone)] 19pub struct GitService { 20 node: Arc<IrohNode>, 21 resolver: RepoResolver, 22 cache_root: PathBuf, 23} 24 25#[derive(Debug, Clone)] 26pub struct PreparedRepo { 27 pub repo_id: String, 28 pub repo_path: PathBuf, 29 pub record: AtProtoRecord, 30 pub record_path: PathBuf, 31} 32 33impl GitService { 34 pub fn new(node: Arc<IrohNode>, resolver: RepoResolver, cache_root: impl Into<PathBuf>) -> Self { 35 Self { 36 node, 37 resolver, 38 cache_root: cache_root.into(), 39 } 40 } 41 42 pub async fn prepare_repo(&self, path: &str) -> Result<PreparedRepo> { 43 let entry = self.resolver.resolve(path)?; 44 self.prepare_repo_from_entry(&entry).await 45 } 46 47 pub async fn prepare_repo_from_entry(&self, entry: &RepoEntry) -> Result<PreparedRepo> { 48 let mut record = self.load_record(entry).await?; 49 50 let cache = RepoCache::new(self.cache_root.clone()); 51 52 // Refresh from ATProto on-demand so the first clone/pull on a follower machine 53 // doesn't depend on the background loop having already run. 54 if let Some(locator) = &entry.atproto { 55 let remote = AtProtoRemote::new(None); 56 match timeout(Duration::from_secs(5), remote.fetch_record(locator)).await { 57 Ok(Ok(remote_record)) => { 58 if remote_record.repo_id != entry.repo_id { 59 warn!( 60 repo_id = %entry.repo_id, 61 record_repo_id = %remote_record.repo_id, 62 "atproto record repo_id mismatch" 63 ); 64 } else { 65 record = merge_records(&remote_record, &record); 66 let json = serde_json::to_vec_pretty(&record)?; 67 tokio::fs::write(&entry.record_path, json).await?; 68 } 69 } 70 Ok(Err(err)) => { 71 warn!(error=%err, "atproto refresh during prepare failed"); 72 } 73 Err(_) => { 74 warn!("atproto refresh during prepare timed out"); 75 } 76 } 77 } 78 79 // If the record has no peers yet but the cache is warm, seed locally to bootstrap hints. 80 if record.peers.is_empty() { 81 match seed_with_sync(self.node.as_ref(), &cache, &entry.record_path, None).await { 82 Ok(outcome) => { 83 record = outcome.record; 84 } 85 Err(err) => { 86 warn!(error=%err, "seed bootstrap before prepare failed"); 87 } 88 } 89 } 90 91 let atproto = InMemoryAtProto::default(); 92 atproto.put_record(&record)?; 93 sync_repo_from_head(&atproto, &self.node, &cache, &entry.repo_id).await?; 94 let manifest = cache.read_manifest(&entry.repo_id)?; 95 let repo_path = cache.materialize_bare_repo(&entry.repo_id, &manifest)?; 96 97 Ok(PreparedRepo { 98 repo_id: entry.repo_id.clone(), 99 repo_path, 100 record, 101 record_path: entry.record_path.clone(), 102 }) 103 } 104 105 async fn load_record(&self, entry: &RepoEntry) -> Result<AtProtoRecord> { 106 match tokio::fs::read(&entry.record_path).await { 107 Ok(bytes) => { 108 let record: AtProtoRecord = serde_json::from_slice(&bytes)?; 109 if record.repo_id != entry.repo_id { 110 return Err(Error::Invalid(format!( 111 "record repo_id mismatch: expected {}, found {}", 112 entry.repo_id, record.repo_id 113 ))); 114 } 115 Ok(record) 116 } 117 Err(err) if err.kind() == ErrorKind::NotFound => { 118 let locator = entry.atproto.as_ref().ok_or_else(|| { 119 Error::NotFound(format!( 120 "record not found locally and no atproto locator for {}", 121 entry.repo_id 122 )) 123 })?; 124 let remote = AtProtoRemote::new(None); 125 let record = timeout(Duration::from_secs(5), remote.fetch_record(locator)) 126 .await 127 .map_err(|_| Error::Invalid("atproto fetch timed out".into()))??; 128 if record.repo_id != entry.repo_id { 129 return Err(Error::Invalid(format!( 130 "atproto record repo_id mismatch: expected {}, found {}", 131 entry.repo_id, record.repo_id 132 ))); 133 } 134 if let Some(parent) = entry.record_path.parent() { 135 tokio::fs::create_dir_all(parent).await?; 136 } 137 let json = serde_json::to_vec_pretty(&record)?; 138 tokio::fs::write(&entry.record_path, json).await?; 139 Ok(record) 140 } 141 Err(err) => Err(err.into()), 142 } 143 } 144 145 pub async fn publish_repo_update( 146 &self, 147 prepared: &PreparedRepo, 148 new_head: &str, 149 ) -> Result<PublishSummary> { 150 let cache = RepoCache::new(self.cache_root.clone()); 151 publish_repo_update( 152 self.node.as_ref(), 153 &cache, 154 &prepared.repo_id, 155 &prepared.repo_path, 156 new_head, 157 ) 158 .await 159 } 160 161 pub async fn write_back_record( 162 &self, 163 prepared: &PreparedRepo, 164 summary: &PublishSummary, 165 ) -> Result<AtProtoRecord> { 166 let mut record = prepared.record.clone(); 167 record.head = summary.head_hash.clone(); 168 record.manifest = summary.manifest_hash.clone(); 169 170 let ticket = self.node.node_ticket().await?; 171 let hint = PeerHint { 172 node_id: self.node.node_id().to_string(), 173 addrs: Vec::new(), 174 ticket: Some(ticket), 175 }; 176 upsert_peer_hint(&mut record.peers, hint); 177 178 if let Some(parent) = prepared.record_path.parent() { 179 tokio::fs::create_dir_all(parent).await?; 180 } 181 let json = serde_json::to_vec_pretty(&record)?; 182 tokio::fs::write(&prepared.record_path, json).await?; 183 Ok(record) 184 } 185} 186 187fn merge_records(remote: &AtProtoRecord, local: &AtProtoRecord) -> AtProtoRecord { 188 let mut merged = remote.clone(); 189 merged.peers = merge_peer_hints(&remote.peers, &local.peers); 190 merged 191} 192 193fn merge_peer_hints(remote: &[PeerHint], local: &[PeerHint]) -> Vec<PeerHint> { 194 let mut merged: Vec<PeerHint> = remote.to_vec(); 195 for peer in local { 196 if let Some(existing) = merged.iter_mut().find(|p| p.node_id == peer.node_id) { 197 merge_addresses(&mut existing.addrs, &peer.addrs); 198 if existing.ticket.is_none() { 199 existing.ticket = peer.ticket.clone(); 200 } 201 } else { 202 merged.push(peer.clone()); 203 } 204 } 205 for peer in &mut merged { 206 peer.addrs.sort(); 207 peer.addrs.dedup(); 208 } 209 merged 210} 211 212fn merge_addresses(existing: &mut Vec<String>, new_addrs: &[String]) { 213 let mut set: std::collections::BTreeSet<String> = existing.iter().cloned().collect(); 214 for addr in new_addrs { 215 set.insert(addr.clone()); 216 } 217 *existing = set.into_iter().collect(); 218} 219 220#[cfg(test)] 221mod tests { 222 use super::{merge_records, GitService}; 223 use crate::atproto::{AtProtoRecord, PeerHint}; 224 use crate::registry::{AtProtoLocator, RepoEntry, RepoRegistry}; 225 use crate::repo_resolver::RepoResolver; 226 use crate::iroh_node::IrohNode; 227 use axum::extract::Query; 228 use axum::routing::get; 229 use axum::{Json, Router}; 230 use serde::Deserialize; 231 use serde_json::json; 232 use std::fs; 233 use std::sync::Arc; 234 235 #[derive(Deserialize)] 236 struct RecordQuery { 237 repo: String, 238 collection: String, 239 rkey: String, 240 } 241 242 #[test] 243 fn merge_records_prefers_remote_head_and_keeps_remote_ticket() { 244 let mut remote = AtProtoRecord::new("did:plc:test/repo", "head-remote", "manifest-remote"); 245 remote.peers.push(PeerHint { 246 node_id: "node-1".to_string(), 247 addrs: Vec::new(), 248 ticket: Some("ticket-remote".to_string()), 249 }); 250 251 let mut local = AtProtoRecord::new("did:plc:test/repo", "head-local", "manifest-local"); 252 local.peers.push(PeerHint { 253 node_id: "node-1".to_string(), 254 addrs: vec!["127.0.0.1:1234".to_string()], 255 ticket: None, 256 }); 257 258 let merged = merge_records(&remote, &local); 259 assert_eq!(merged.head, "head-remote"); 260 assert_eq!(merged.manifest, "manifest-remote"); 261 assert_eq!(merged.peers.len(), 1); 262 assert_eq!(merged.peers[0].ticket.as_deref(), Some("ticket-remote")); 263 } 264 265 #[tokio::test] 266 async fn load_record_fetches_remote_when_missing_locally() { 267 let repo_id = "did:plc:test/repo"; 268 let remote_record = AtProtoRecord::new(repo_id, "head", "manifest"); 269 270 let app = Router::new().route( 271 "/xrpc/com.atproto.repo.getRecord", 272 get(move |Query(params): Query<RecordQuery>| async move { 273 assert_eq!(params.repo, "did:plc:test"); 274 assert_eq!(params.collection, "app.knot.repo"); 275 assert_eq!(params.rkey, "repo"); 276 let record = remote_record.clone(); 277 Json(json!({ "value": record })) 278 }), 279 ); 280 281 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); 282 let addr = listener.local_addr().unwrap(); 283 tokio::spawn(async move { 284 axum::serve(listener, app).await.unwrap(); 285 }); 286 287 let tmp = tempfile::tempdir().unwrap(); 288 let registry_path = tmp.path().join("registry.json"); 289 let record_path = tmp.path().join("records").join("repo.json"); 290 291 let mut registry = RepoRegistry::default(); 292 let entry = RepoEntry { 293 path: "/example.git".to_string(), 294 repo_id: repo_id.to_string(), 295 record_path: record_path.clone(), 296 atproto: Some(AtProtoLocator { 297 pds: format!("http://{}", addr), 298 repo: "did:plc:test".to_string(), 299 collection: "app.knot.repo".to_string(), 300 rkey: "repo".to_string(), 301 }), 302 }; 303 registry.upsert(entry.clone()); 304 registry.save(&registry_path).unwrap(); 305 306 let node = Arc::new(IrohNode::new().await.unwrap()); 307 let resolver = RepoResolver::new(&registry_path); 308 let service = GitService::new(node.clone(), resolver, tmp.path().join("cache")); 309 310 let loaded = service.load_record(&entry).await.unwrap(); 311 assert_eq!(loaded.repo_id, repo_id); 312 assert!(record_path.exists()); 313 314 if let Ok(node) = Arc::try_unwrap(node) { 315 node.shutdown().await.unwrap(); 316 } 317 // Ensure we wrote a valid record file. 318 let bytes = fs::read(record_path).unwrap(); 319 let parsed: AtProtoRecord = serde_json::from_slice(&bytes).unwrap(); 320 assert_eq!(parsed.repo_id, repo_id); 321 } 322} 323 324fn upsert_peer_hint(peers: &mut Vec<PeerHint>, hint: PeerHint) { 325 if let Some(existing) = peers.iter_mut().find(|peer| peer.node_id == hint.node_id) { 326 *existing = hint; 327 } else { 328 peers.push(hint); 329 } 330}