Lightweight decentralized “knot” server prototype using Iroh + ATProto.
1use std::path::PathBuf;
2use std::sync::Arc;
3use std::io::ErrorKind;
4
5use crate::atproto::{AtProtoClient, AtProtoRecord, InMemoryAtProto, PeerHint};
6use crate::atproto_remote::AtProtoRemote;
7use crate::cache::RepoCache;
8use crate::flow::sync_repo_from_head;
9use crate::iroh_node::IrohNode;
10use crate::push_flow::{publish_repo_update, PublishSummary};
11use crate::registry::RepoEntry;
12use crate::repo_resolver::RepoResolver;
13use crate::seed_flow::seed_with_sync;
14use crate::{Error, Result};
15use tracing::warn;
16use tokio::time::{timeout, Duration};
17
18#[derive(Clone)]
19pub struct GitService {
20 node: Arc<IrohNode>,
21 resolver: RepoResolver,
22 cache_root: PathBuf,
23}
24
25#[derive(Debug, Clone)]
26pub struct PreparedRepo {
27 pub repo_id: String,
28 pub repo_path: PathBuf,
29 pub record: AtProtoRecord,
30 pub record_path: PathBuf,
31}
32
33impl GitService {
34 pub fn new(node: Arc<IrohNode>, resolver: RepoResolver, cache_root: impl Into<PathBuf>) -> Self {
35 Self {
36 node,
37 resolver,
38 cache_root: cache_root.into(),
39 }
40 }
41
42 pub async fn prepare_repo(&self, path: &str) -> Result<PreparedRepo> {
43 let entry = self.resolver.resolve(path)?;
44 self.prepare_repo_from_entry(&entry).await
45 }
46
47 pub async fn prepare_repo_from_entry(&self, entry: &RepoEntry) -> Result<PreparedRepo> {
48 let mut record = self.load_record(entry).await?;
49
50 let cache = RepoCache::new(self.cache_root.clone());
51
52 // Refresh from ATProto on-demand so the first clone/pull on a follower machine
53 // doesn't depend on the background loop having already run.
54 if let Some(locator) = &entry.atproto {
55 let remote = AtProtoRemote::new(None);
56 match timeout(Duration::from_secs(5), remote.fetch_record(locator)).await {
57 Ok(Ok(remote_record)) => {
58 if remote_record.repo_id != entry.repo_id {
59 warn!(
60 repo_id = %entry.repo_id,
61 record_repo_id = %remote_record.repo_id,
62 "atproto record repo_id mismatch"
63 );
64 } else {
65 record = merge_records(&remote_record, &record);
66 let json = serde_json::to_vec_pretty(&record)?;
67 tokio::fs::write(&entry.record_path, json).await?;
68 }
69 }
70 Ok(Err(err)) => {
71 warn!(error=%err, "atproto refresh during prepare failed");
72 }
73 Err(_) => {
74 warn!("atproto refresh during prepare timed out");
75 }
76 }
77 }
78
79 // If the record has no peers yet but the cache is warm, seed locally to bootstrap hints.
80 if record.peers.is_empty() {
81 match seed_with_sync(self.node.as_ref(), &cache, &entry.record_path, None).await {
82 Ok(outcome) => {
83 record = outcome.record;
84 }
85 Err(err) => {
86 warn!(error=%err, "seed bootstrap before prepare failed");
87 }
88 }
89 }
90
91 let atproto = InMemoryAtProto::default();
92 atproto.put_record(&record)?;
93 sync_repo_from_head(&atproto, &self.node, &cache, &entry.repo_id).await?;
94 let manifest = cache.read_manifest(&entry.repo_id)?;
95 let repo_path = cache.materialize_bare_repo(&entry.repo_id, &manifest)?;
96
97 Ok(PreparedRepo {
98 repo_id: entry.repo_id.clone(),
99 repo_path,
100 record,
101 record_path: entry.record_path.clone(),
102 })
103 }
104
105 async fn load_record(&self, entry: &RepoEntry) -> Result<AtProtoRecord> {
106 match tokio::fs::read(&entry.record_path).await {
107 Ok(bytes) => {
108 let record: AtProtoRecord = serde_json::from_slice(&bytes)?;
109 if record.repo_id != entry.repo_id {
110 return Err(Error::Invalid(format!(
111 "record repo_id mismatch: expected {}, found {}",
112 entry.repo_id, record.repo_id
113 )));
114 }
115 Ok(record)
116 }
117 Err(err) if err.kind() == ErrorKind::NotFound => {
118 let locator = entry.atproto.as_ref().ok_or_else(|| {
119 Error::NotFound(format!(
120 "record not found locally and no atproto locator for {}",
121 entry.repo_id
122 ))
123 })?;
124 let remote = AtProtoRemote::new(None);
125 let record = timeout(Duration::from_secs(5), remote.fetch_record(locator))
126 .await
127 .map_err(|_| Error::Invalid("atproto fetch timed out".into()))??;
128 if record.repo_id != entry.repo_id {
129 return Err(Error::Invalid(format!(
130 "atproto record repo_id mismatch: expected {}, found {}",
131 entry.repo_id, record.repo_id
132 )));
133 }
134 if let Some(parent) = entry.record_path.parent() {
135 tokio::fs::create_dir_all(parent).await?;
136 }
137 let json = serde_json::to_vec_pretty(&record)?;
138 tokio::fs::write(&entry.record_path, json).await?;
139 Ok(record)
140 }
141 Err(err) => Err(err.into()),
142 }
143 }
144
145 pub async fn publish_repo_update(
146 &self,
147 prepared: &PreparedRepo,
148 new_head: &str,
149 ) -> Result<PublishSummary> {
150 let cache = RepoCache::new(self.cache_root.clone());
151 publish_repo_update(
152 self.node.as_ref(),
153 &cache,
154 &prepared.repo_id,
155 &prepared.repo_path,
156 new_head,
157 )
158 .await
159 }
160
161 pub async fn write_back_record(
162 &self,
163 prepared: &PreparedRepo,
164 summary: &PublishSummary,
165 ) -> Result<AtProtoRecord> {
166 let mut record = prepared.record.clone();
167 record.head = summary.head_hash.clone();
168 record.manifest = summary.manifest_hash.clone();
169
170 let ticket = self.node.node_ticket().await?;
171 let hint = PeerHint {
172 node_id: self.node.node_id().to_string(),
173 addrs: Vec::new(),
174 ticket: Some(ticket),
175 };
176 upsert_peer_hint(&mut record.peers, hint);
177
178 if let Some(parent) = prepared.record_path.parent() {
179 tokio::fs::create_dir_all(parent).await?;
180 }
181 let json = serde_json::to_vec_pretty(&record)?;
182 tokio::fs::write(&prepared.record_path, json).await?;
183 Ok(record)
184 }
185}
186
187fn merge_records(remote: &AtProtoRecord, local: &AtProtoRecord) -> AtProtoRecord {
188 let mut merged = remote.clone();
189 merged.peers = merge_peer_hints(&remote.peers, &local.peers);
190 merged
191}
192
193fn merge_peer_hints(remote: &[PeerHint], local: &[PeerHint]) -> Vec<PeerHint> {
194 let mut merged: Vec<PeerHint> = remote.to_vec();
195 for peer in local {
196 if let Some(existing) = merged.iter_mut().find(|p| p.node_id == peer.node_id) {
197 merge_addresses(&mut existing.addrs, &peer.addrs);
198 if existing.ticket.is_none() {
199 existing.ticket = peer.ticket.clone();
200 }
201 } else {
202 merged.push(peer.clone());
203 }
204 }
205 for peer in &mut merged {
206 peer.addrs.sort();
207 peer.addrs.dedup();
208 }
209 merged
210}
211
/// Replaces `existing` with the sorted, duplicate-free union of `existing`
/// and `new_addrs`.
fn merge_addresses(existing: &mut Vec<String>, new_addrs: &[String]) {
    existing.extend(new_addrs.iter().cloned());
    // Equal strings are indistinguishable, so an unstable sort gives the same result.
    existing.sort_unstable();
    existing.dedup();
}
219
#[cfg(test)]
mod tests {
    use super::{merge_records, GitService};
    use crate::atproto::{AtProtoRecord, PeerHint};
    use crate::iroh_node::IrohNode;
    use crate::registry::{AtProtoLocator, RepoEntry, RepoRegistry};
    use crate::repo_resolver::RepoResolver;
    use axum::extract::Query;
    use axum::routing::get;
    use axum::{Json, Router};
    use serde::Deserialize;
    use serde_json::json;
    use std::fs;
    use std::sync::Arc;

    /// Query parameters the fake PDS expects on `com.atproto.repo.getRecord`.
    #[derive(Deserialize)]
    struct RecordQuery {
        repo: String,
        collection: String,
        rkey: String,
    }

    /// Head and manifest come from the remote record; for a peer known to both
    /// sides the remote ticket is kept and the local addresses are merged in.
    #[test]
    fn merge_records_prefers_remote_head_and_keeps_remote_ticket() {
        let mut remote = AtProtoRecord::new("did:plc:test/repo", "head-remote", "manifest-remote");
        remote.peers.push(PeerHint {
            node_id: "node-1".to_string(),
            addrs: Vec::new(),
            ticket: Some("ticket-remote".to_string()),
        });

        let mut local = AtProtoRecord::new("did:plc:test/repo", "head-local", "manifest-local");
        local.peers.push(PeerHint {
            node_id: "node-1".to_string(),
            addrs: vec!["127.0.0.1:1234".to_string()],
            ticket: None,
        });

        let merged = merge_records(&remote, &local);
        assert_eq!(merged.head, "head-remote");
        assert_eq!(merged.manifest, "manifest-remote");
        assert_eq!(merged.peers.len(), 1);
        assert_eq!(merged.peers[0].ticket.as_deref(), Some("ticket-remote"));
        assert_eq!(merged.peers[0].addrs, vec!["127.0.0.1:1234".to_string()]);
    }

    /// With no record file on disk, `load_record` must fetch the record from
    /// the locator's PDS and persist it at `record_path`.
    #[tokio::test]
    async fn load_record_fetches_remote_when_missing_locally() {
        let repo_id = "did:plc:test/repo";
        let remote_record = AtProtoRecord::new(repo_id, "head", "manifest");

        // Minimal fake PDS that serves the record on the getRecord endpoint.
        let app = Router::new().route(
            "/xrpc/com.atproto.repo.getRecord",
            get(move |Query(params): Query<RecordQuery>| async move {
                assert_eq!(params.repo, "did:plc:test");
                assert_eq!(params.collection, "app.knot.repo");
                assert_eq!(params.rkey, "repo");
                let record = remote_record.clone();
                Json(json!({ "value": record }))
            }),
        );

        // Port 0 lets the OS pick a free port.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });

        let tmp = tempfile::tempdir().unwrap();
        let registry_path = tmp.path().join("registry.json");
        // The `records` directory does not exist yet; `load_record` must create it.
        let record_path = tmp.path().join("records").join("repo.json");

        let mut registry = RepoRegistry::default();
        let entry = RepoEntry {
            path: "/example.git".to_string(),
            repo_id: repo_id.to_string(),
            record_path: record_path.clone(),
            atproto: Some(AtProtoLocator {
                pds: format!("http://{}", addr),
                repo: "did:plc:test".to_string(),
                collection: "app.knot.repo".to_string(),
                rkey: "repo".to_string(),
            }),
        };
        registry.upsert(entry.clone());
        registry.save(&registry_path).unwrap();

        let node = Arc::new(IrohNode::new().await.unwrap());
        let resolver = RepoResolver::new(&registry_path);
        let service = GitService::new(node.clone(), resolver, tmp.path().join("cache"));

        let loaded = service.load_record(&entry).await.unwrap();
        assert_eq!(loaded.repo_id, repo_id);
        assert!(record_path.exists());

        // The service holds a clone of the node; release it so `try_unwrap`
        // can succeed and the node is actually shut down.
        drop(service);
        if let Ok(node) = Arc::try_unwrap(node) {
            node.shutdown().await.unwrap();
        }
        // Ensure we wrote a valid record file.
        let bytes = fs::read(record_path).unwrap();
        let parsed: AtProtoRecord = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(parsed.repo_id, repo_id);
    }
}
323
324fn upsert_peer_hint(peers: &mut Vec<PeerHint>, hint: PeerHint) {
325 if let Some(existing) = peers.iter_mut().find(|peer| peer.node_id == hint.node_id) {
326 *existing = hint;
327 } else {
328 peers.push(hint);
329 }
330}