Lightweight decentralized “knot” server prototype using Iroh + ATProto.
at master 222 lines 6.8 kB view raw
1use std::path::PathBuf; 2use std::sync::Arc; 3 4use axum::body::Body; 5use axum::extract::State; 6use axum::http::{Request, StatusCode}; 7use axum::response::Response; 8use axum::routing::{get, post}; 9use axum::{Json, Router}; 10use http_body_util::BodyExt; 11use serde::{Deserialize, Serialize}; 12 13use crate::atproto::{AtProtoClient, AtProtoRecord, InMemoryAtProto}; 14use crate::cache::RepoCache; 15use crate::flow::sync_repo_from_head; 16use crate::git_backend::{GitBackend, GitRequest}; 17use crate::iroh_node::IrohNode; 18use crate::{Error, Result}; 19 20#[derive(Clone)] 21struct AppState { 22 node: Arc<IrohNode>, 23 cache_dir: PathBuf, 24} 25 26#[derive(Debug, Deserialize)] 27struct SyncRequest { 28 repo_id: String, 29 record_path: PathBuf, 30 cache_dir: Option<PathBuf>, 31} 32 33#[derive(Debug, Serialize)] 34struct SyncResponse { 35 repo_id: String, 36 head_peer: String, 37 manifest_peer: String, 38 packs_fetched: usize, 39 packs_skipped: usize, 40} 41 42pub async fn serve(addr: &str, cache_dir: PathBuf) -> Result<()> { 43 let node = Arc::new(IrohNode::new().await?); 44 let app = app(node, cache_dir); 45 46 let listener = tokio::net::TcpListener::bind(addr) 47 .await 48 .map_err(|err| Error::Invalid(format!("bind {addr}: {err}")))?; 49 axum::serve(listener, app) 50 .await 51 .map_err(|err| Error::Invalid(format!("server error: {err}")))?; 52 Ok(()) 53} 54 55pub fn app(node: Arc<IrohNode>, cache_dir: PathBuf) -> Router { 56 let state = AppState { node, cache_dir }; 57 Router::new() 58 .route("/health", get(health_handler)) 59 .route("/status", get(status_handler)) 60 .route("/git/*path", get(git_handler).post(git_handler)) 61 .route("/sync", post(sync_handler)) 62 .with_state(state) 63} 64 65async fn health_handler() -> StatusCode { 66 StatusCode::OK 67} 68 69#[derive(Debug, Serialize)] 70struct StatusResponse { 71 node_id: String, 72 cache_dir: String, 73} 74 75async fn status_handler( 76 State(state): State<AppState>, 77) -> std::result::Result<Json<StatusResponse>, (StatusCode, String)> { 78 Ok(Json(StatusResponse { 79 node_id: state.node.node_id().to_string(), 80 cache_dir: state.cache_dir.to_string_lossy().to_string(), 81 })) 82} 83 84async fn sync_handler( 85 State(state): State<AppState>, 86 Json(payload): Json<SyncRequest>, 87) -> std::result::Result<Json<SyncResponse>, (StatusCode, String)> { 88 let record_bytes = tokio::fs::read(&payload.record_path) 89 .await 90 .map_err(|err| map_err(err.into()))?; 91 let record: AtProtoRecord = 92 serde_json::from_slice(&record_bytes).map_err(|err| map_err(err.into()))?; 93 if record.repo_id != payload.repo_id { 94 return Err(( 95 StatusCode::BAD_REQUEST, 96 format!( 97 "record repo_id mismatch: expected {}, found {}", 98 payload.repo_id, record.repo_id 99 ), 100 )); 101 } 102 103 let atproto = InMemoryAtProto::default(); 104 atproto.put_record(&record).map_err(map_err)?; 105 let cache_root = payload.cache_dir.unwrap_or_else(|| state.cache_dir.clone()); 106 let cache = RepoCache::new(cache_root); 107 108 let summary = sync_repo_from_head(&atproto, &state.node, &cache, &payload.repo_id) 109 .await 110 .map_err(map_err)?; 111 112 Ok(Json(SyncResponse { 113 repo_id: payload.repo_id, 114 head_peer: summary.head_peer, 115 manifest_peer: summary.manifest_peer, 116 packs_fetched: summary.packs.fetched, 117 packs_skipped: summary.packs.skipped, 118 })) 119} 120 121async fn git_handler( 122 State(state): State<AppState>, 123 req: Request<Body>, 124) -> std::result::Result<Response, (StatusCode, String)> { 125 let (parts, body) = req.into_parts(); 126 let path_info = strip_git_prefix(parts.uri.path())?; 127 let query_string = parts.uri.query().unwrap_or("").to_string(); 128 let content_type = parts 129 .headers 130 .get(axum::http::header::CONTENT_TYPE) 131 .and_then(|value| value.to_str().ok()) 132 .map(|value| value.to_string()); 133 let body_bytes = body 134 .collect() 135 .await 136 .map_err(|err| map_err(Error::Invalid(format!("read body: {err}"))))? 137 .to_bytes() 138 .to_vec(); 139 140 let mut env = std::collections::HashMap::new(); 141 if let Some(user_agent) = parts.headers.get(axum::http::header::USER_AGENT) { 142 if let Ok(value) = user_agent.to_str() { 143 env.insert("HTTP_USER_AGENT".to_string(), value.to_string()); 144 } 145 } 146 147 let git = GitBackend::new(); 148 let response = git 149 .handle_http_backend(GitRequest { 150 method: parts.method.to_string(), 151 path_info, 152 query_string, 153 content_type, 154 body: body_bytes, 155 project_root: state.cache_dir.clone(), 156 env, 157 }) 158 .map_err(map_err)?; 159 160 let mut builder = Response::builder().status(response.status); 161 for (key, value) in response.headers { 162 if let (Ok(name), Ok(value)) = ( 163 key.parse::<axum::http::header::HeaderName>(), 164 value.parse::<axum::http::HeaderValue>(), 165 ) { 166 builder = builder.header(name, value); 167 } 168 } 169 Ok(builder.body(Body::from(response.body)).unwrap()) 170} 171 172fn strip_git_prefix(path: &str) -> std::result::Result<String, (StatusCode, String)> { 173 if let Some(stripped) = path.strip_prefix("/git") { 174 if stripped.is_empty() || stripped == "/" { 175 return Err((StatusCode::BAD_REQUEST, "missing git path".to_string())); 176 } 177 return Ok(stripped.to_string()); 178 } 179 Err((StatusCode::BAD_REQUEST, "invalid git path".to_string())) 180} 181 182#[cfg(test)] 183mod tests { 184 use super::strip_git_prefix; 185 use axum::http::StatusCode; 186 187 #[test] 188 fn strip_git_prefix_accepts_repo_path() { 189 let path = "/git/repo.git/info/refs"; 190 let result = strip_git_prefix(path).unwrap(); 191 assert_eq!(result, "/repo.git/info/refs"); 192 } 193 194 #[test] 195 fn strip_git_prefix_rejects_root() { 196 let err = strip_git_prefix("/git").unwrap_err(); 197 assert_eq!(err.0, StatusCode::BAD_REQUEST); 198 assert!(err.1.contains("missing git path")); 199 } 200 201 #[test] 202 fn strip_git_prefix_rejects_trailing_slash() { 203 let err = strip_git_prefix("/git/").unwrap_err(); 204 assert_eq!(err.0, StatusCode::BAD_REQUEST); 205 assert!(err.1.contains("missing git path")); 206 } 207 208 #[test] 209 fn strip_git_prefix_rejects_invalid_prefix() { 210 let err = strip_git_prefix("/foo/repo.git").unwrap_err(); 211 assert_eq!(err.0, StatusCode::BAD_REQUEST); 212 assert!(err.1.contains("invalid git path")); 213 } 214} 215 216fn map_err(err: Error) -> (StatusCode, String) { 217 match err { 218 Error::Invalid(msg) => (StatusCode::BAD_REQUEST, msg), 219 Error::NotFound(msg) => (StatusCode::NOT_FOUND, msg), 220 other => (StatusCode::INTERNAL_SERVER_ERROR, other.to_string()), 221 } 222}