this repo has no description
1use crate::state::AppState; 2use crate::sync::import::{apply_import, parse_car, ImportError}; 3use crate::sync::verify::CarVerifier; 4use axum::{ 5 body::Bytes, 6 extract::State, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9 Json, 10}; 11use serde_json::json; 12use tracing::{debug, error, info, warn}; 13 14const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024; 15const DEFAULT_MAX_BLOCKS: usize = 50000; 16 17pub async fn import_repo( 18 State(state): State<AppState>, 19 headers: axum::http::HeaderMap, 20 body: Bytes, 21) -> Response { 22 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS") 23 .map(|v| v != "false" && v != "0") 24 .unwrap_or(true); 25 26 if !accepting_imports { 27 return ( 28 StatusCode::BAD_REQUEST, 29 Json(json!({ 30 "error": "InvalidRequest", 31 "message": "Service is not accepting repo imports" 32 })), 33 ) 34 .into_response(); 35 } 36 37 let max_size: usize = std::env::var("MAX_IMPORT_SIZE") 38 .ok() 39 .and_then(|s| s.parse().ok()) 40 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE); 41 42 if body.len() > max_size { 43 return ( 44 StatusCode::PAYLOAD_TOO_LARGE, 45 Json(json!({ 46 "error": "InvalidRequest", 47 "message": format!("Import size exceeds limit of {} bytes", max_size) 48 })), 49 ) 50 .into_response(); 51 } 52 53 let token = match crate::auth::extract_bearer_token_from_header( 54 headers.get("Authorization").and_then(|h| h.to_str().ok()), 55 ) { 56 Some(t) => t, 57 None => { 58 return ( 59 StatusCode::UNAUTHORIZED, 60 Json(json!({"error": "AuthenticationRequired"})), 61 ) 62 .into_response(); 63 } 64 }; 65 66 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 67 Ok(user) => user, 68 Err(e) => { 69 return ( 70 StatusCode::UNAUTHORIZED, 71 Json(json!({"error": "AuthenticationFailed", "message": e})), 72 ) 73 .into_response(); 74 } 75 }; 76 77 let did = &auth_user.did; 78 79 let user = match sqlx::query!( 80 "SELECT id, deactivated_at, takedown_ref FROM users WHERE did = $1", 81 did 82 ) 83 .fetch_optional(&state.db) 84 .await 85 { 86 Ok(Some(row)) => row, 87 Ok(None) => { 88 return ( 89 StatusCode::NOT_FOUND, 90 Json(json!({"error": "AccountNotFound"})), 91 ) 92 .into_response(); 93 } 94 Err(e) => { 95 error!("DB error fetching user: {:?}", e); 96 return ( 97 StatusCode::INTERNAL_SERVER_ERROR, 98 Json(json!({"error": "InternalError"})), 99 ) 100 .into_response(); 101 } 102 }; 103 104 if user.deactivated_at.is_some() { 105 return ( 106 StatusCode::FORBIDDEN, 107 Json(json!({ 108 "error": "AccountDeactivated", 109 "message": "Account is deactivated" 110 })), 111 ) 112 .into_response(); 113 } 114 115 if user.takedown_ref.is_some() { 116 return ( 117 StatusCode::FORBIDDEN, 118 Json(json!({ 119 "error": "AccountTakenDown", 120 "message": "Account has been taken down" 121 })), 122 ) 123 .into_response(); 124 } 125 126 let user_id = user.id; 127 128 let (root, blocks) = match parse_car(&body).await { 129 Ok((r, b)) => (r, b), 130 Err(ImportError::InvalidRootCount) => { 131 return ( 132 StatusCode::BAD_REQUEST, 133 Json(json!({ 134 "error": "InvalidRequest", 135 "message": "Expected exactly one root in CAR file" 136 })), 137 ) 138 .into_response(); 139 } 140 Err(ImportError::CarParse(msg)) => { 141 return ( 142 StatusCode::BAD_REQUEST, 143 Json(json!({ 144 "error": "InvalidRequest", 145 "message": format!("Failed to parse CAR file: {}", msg) 146 })), 147 ) 148 .into_response(); 149 } 150 Err(e) => { 151 error!("CAR parsing error: {:?}", e); 152 return ( 153 StatusCode::BAD_REQUEST, 154 Json(json!({ 155 "error": "InvalidRequest", 156 "message": format!("Invalid CAR file: {}", e) 157 })), 158 ) 159 .into_response(); 160 } 161 }; 162 163 info!( 164 "Importing repo for user {}: {} blocks, root {}", 165 did, 166 blocks.len(), 167 root 168 ); 169 170 let root_block = match blocks.get(&root) { 171 Some(b) => b, 172 None => { 173 return ( 174 StatusCode::BAD_REQUEST, 175 Json(json!({ 176 "error": "InvalidRequest", 177 "message": "Root block not found in CAR file" 178 })), 179 ) 180 .into_response(); 181 } 182 }; 183 184 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) { 185 Ok(commit) => commit.did().to_string(), 186 Err(e) => { 187 return ( 188 StatusCode::BAD_REQUEST, 189 Json(json!({ 190 "error": "InvalidRequest", 191 "message": format!("Invalid commit: {}", e) 192 })), 193 ) 194 .into_response(); 195 } 196 }; 197 198 if commit_did != *did { 199 return ( 200 StatusCode::FORBIDDEN, 201 Json(json!({ 202 "error": "InvalidRequest", 203 "message": format!( 204 "CAR file is for DID {} but you are authenticated as {}", 205 commit_did, did 206 ) 207 })), 208 ) 209 .into_response(); 210 } 211 212 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION") 213 .map(|v| v == "true" || v == "1") 214 .unwrap_or(false); 215 216 if !skip_verification { 217 debug!("Verifying CAR file signature and structure for DID {}", did); 218 let verifier = CarVerifier::new(); 219 220 match verifier.verify_car(did, &root, &blocks).await { 221 Ok(verified) => { 222 debug!( 223 "CAR verification successful: rev={}, data_cid={}", 224 verified.rev, verified.data_cid 225 ); 226 } 227 Err(crate::sync::verify::VerifyError::DidMismatch { 228 commit_did, 229 expected_did, 230 }) => { 231 return ( 232 StatusCode::FORBIDDEN, 233 Json(json!({ 234 "error": "InvalidRequest", 235 "message": format!( 236 "CAR file is for DID {} but you are authenticated as {}", 237 commit_did, expected_did 238 ) 239 })), 240 ) 241 .into_response(); 242 } 243 Err(crate::sync::verify::VerifyError::InvalidSignature) => { 244 return ( 245 StatusCode::BAD_REQUEST, 246 Json(json!({ 247 "error": "InvalidSignature", 248 "message": "CAR file commit signature verification failed" 249 })), 250 ) 251 .into_response(); 252 } 253 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => { 254 warn!("DID resolution failed during import verification: {}", msg); 255 return ( 256 StatusCode::BAD_REQUEST, 257 Json(json!({ 258 "error": "InvalidRequest", 259 "message": format!("Failed to verify DID: {}", msg) 260 })), 261 ) 262 .into_response(); 263 } 264 Err(crate::sync::verify::VerifyError::NoSigningKey) => { 265 return ( 266 StatusCode::BAD_REQUEST, 267 Json(json!({ 268 "error": "InvalidRequest", 269 "message": "DID document does not contain a signing key" 270 })), 271 ) 272 .into_response(); 273 } 274 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 275 return ( 276 StatusCode::BAD_REQUEST, 277 Json(json!({ 278 "error": "InvalidRequest", 279 "message": format!("MST validation failed: {}", msg) 280 })), 281 ) 282 .into_response(); 283 } 284 Err(e) => { 285 error!("CAR verification error: {:?}", e); 286 return ( 287 StatusCode::BAD_REQUEST, 288 Json(json!({ 289 "error": "InvalidRequest", 290 "message": format!("CAR verification failed: {}", e) 291 })), 292 ) 293 .into_response(); 294 } 295 } 296 } else { 297 warn!("Skipping CAR signature verification for import (SKIP_IMPORT_VERIFICATION=true)"); 298 } 299 300 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS") 301 .ok() 302 .and_then(|s| s.parse().ok()) 303 .unwrap_or(DEFAULT_MAX_BLOCKS); 304 305 match apply_import(&state.db, user_id, root, blocks, max_blocks).await { 306 Ok(records) => { 307 info!( 308 "Successfully imported {} records for user {}", 309 records.len(), 310 did 311 ); 312 313 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await { 314 warn!("Failed to sequence import event: {:?}", e); 315 } 316 317 (StatusCode::OK, Json(json!({}))).into_response() 318 } 319 Err(ImportError::SizeLimitExceeded) => ( 320 StatusCode::BAD_REQUEST, 321 Json(json!({ 322 "error": "InvalidRequest", 323 "message": format!("Import exceeds block limit of {}", max_blocks) 324 })), 325 ) 326 .into_response(), 327 Err(ImportError::RepoNotFound) => ( 328 StatusCode::NOT_FOUND, 329 Json(json!({ 330 "error": "RepoNotFound", 331 "message": "Repository not initialized for this account" 332 })), 333 ) 334 .into_response(), 335 Err(ImportError::InvalidCbor(msg)) => ( 336 StatusCode::BAD_REQUEST, 337 Json(json!({ 338 "error": "InvalidRequest", 339 "message": format!("Invalid CBOR data: {}", msg) 340 })), 341 ) 342 .into_response(), 343 Err(ImportError::InvalidCommit(msg)) => ( 344 StatusCode::BAD_REQUEST, 345 Json(json!({ 346 "error": "InvalidRequest", 347 "message": format!("Invalid commit structure: {}", msg) 348 })), 349 ) 350 .into_response(), 351 Err(ImportError::BlockNotFound(cid)) => ( 352 StatusCode::BAD_REQUEST, 353 Json(json!({ 354 "error": "InvalidRequest", 355 "message": format!("Referenced block not found in CAR: {}", cid) 356 })), 357 ) 358 .into_response(), 359 Err(ImportError::ConcurrentModification) => ( 360 StatusCode::CONFLICT, 361 Json(json!({ 362 "error": "ConcurrentModification", 363 "message": "Repository is being modified by another operation, please retry" 364 })), 365 ) 366 .into_response(), 367 Err(ImportError::VerificationFailed(ve)) => ( 368 StatusCode::BAD_REQUEST, 369 Json(json!({ 370 "error": "VerificationFailed", 371 "message": format!("CAR verification failed: {}", ve) 372 })), 373 ) 374 .into_response(), 375 Err(ImportError::DidMismatch { car_did, auth_did }) => ( 376 StatusCode::FORBIDDEN, 377 Json(json!({ 378 "error": "DidMismatch", 379 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did) 380 })), 381 ) 382 .into_response(), 383 Err(e) => { 384 error!("Import error: {:?}", e); 385 ( 386 StatusCode::INTERNAL_SERVER_ERROR, 387 Json(json!({"error": "InternalError"})), 388 ) 389 .into_response() 390 } 391 } 392} 393 394async fn sequence_import_event( 395 state: &AppState, 396 did: &str, 397 commit_cid: &str, 398) -> Result<(), sqlx::Error> { 399 let prev_cid: Option<String> = None; 400 let ops = serde_json::json!([]); 401 let blobs: Vec<String> = vec![]; 402 let blocks_cids: Vec<String> = vec![]; 403 404 sqlx::query!( 405 r#" 406 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids) 407 VALUES ($1, 'commit', $2, $3, $4, $5, $6) 408 "#, 409 did, 410 commit_cid, 411 prev_cid, 412 ops, 413 &blobs, 414 &blocks_cids 415 ) 416 .execute(&state.db) 417 .await?; 418 419 Ok(()) 420}