this repo has no description
1use crate::api::ApiError; 2use crate::state::AppState; 3use crate::sync::import::{apply_import, parse_car, ImportError}; 4use crate::sync::verify::CarVerifier; 5use axum::{ 6 body::Bytes, 7 extract::State, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10 Json, 11}; 12use serde_json::json; 13use tracing::{debug, error, info, warn}; 14 15const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024; 16const DEFAULT_MAX_BLOCKS: usize = 50000; 17 18pub async fn import_repo( 19 State(state): State<AppState>, 20 headers: axum::http::HeaderMap, 21 body: Bytes, 22) -> Response { 23 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS") 24 .map(|v| v != "false" && v != "0") 25 .unwrap_or(true); 26 27 if !accepting_imports { 28 return ( 29 StatusCode::BAD_REQUEST, 30 Json(json!({ 31 "error": "InvalidRequest", 32 "message": "Service is not accepting repo imports" 33 })), 34 ) 35 .into_response(); 36 } 37 38 let max_size: usize = std::env::var("MAX_IMPORT_SIZE") 39 .ok() 40 .and_then(|s| s.parse().ok()) 41 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE); 42 43 if body.len() > max_size { 44 return ( 45 StatusCode::PAYLOAD_TOO_LARGE, 46 Json(json!({ 47 "error": "InvalidRequest", 48 "message": format!("Import size exceeds limit of {} bytes", max_size) 49 })), 50 ) 51 .into_response(); 52 } 53 54 let token = match crate::auth::extract_bearer_token_from_header( 55 headers.get("Authorization").and_then(|h| h.to_str().ok()), 56 ) { 57 Some(t) => t, 58 None => return ApiError::AuthenticationRequired.into_response(), 59 }; 60 61 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 62 Ok(user) => user, 63 Err(e) => return ApiError::from(e).into_response(), 64 }; 65 66 let did = &auth_user.did; 67 68 let user = match sqlx::query!( 69 "SELECT id, deactivated_at, takedown_ref FROM users WHERE did = $1", 70 did 71 ) 72 .fetch_optional(&state.db) 73 .await 74 { 75 Ok(Some(row)) => row, 76 Ok(None) => { 77 return ( 78 StatusCode::NOT_FOUND, 79 Json(json!({"error": "AccountNotFound"})), 80 ) 81 .into_response(); 82 } 83 Err(e) => { 84 error!("DB error fetching user: {:?}", e); 85 return ( 86 StatusCode::INTERNAL_SERVER_ERROR, 87 Json(json!({"error": "InternalError"})), 88 ) 89 .into_response(); 90 } 91 }; 92 93 if user.deactivated_at.is_some() { 94 return ( 95 StatusCode::FORBIDDEN, 96 Json(json!({ 97 "error": "AccountDeactivated", 98 "message": "Account is deactivated" 99 })), 100 ) 101 .into_response(); 102 } 103 104 if user.takedown_ref.is_some() { 105 return ( 106 StatusCode::FORBIDDEN, 107 Json(json!({ 108 "error": "AccountTakenDown", 109 "message": "Account has been taken down" 110 })), 111 ) 112 .into_response(); 113 } 114 115 let user_id = user.id; 116 117 let (root, blocks) = match parse_car(&body).await { 118 Ok((r, b)) => (r, b), 119 Err(ImportError::InvalidRootCount) => { 120 return ( 121 StatusCode::BAD_REQUEST, 122 Json(json!({ 123 "error": "InvalidRequest", 124 "message": "Expected exactly one root in CAR file" 125 })), 126 ) 127 .into_response(); 128 } 129 Err(ImportError::CarParse(msg)) => { 130 return ( 131 StatusCode::BAD_REQUEST, 132 Json(json!({ 133 "error": "InvalidRequest", 134 "message": format!("Failed to parse CAR file: {}", msg) 135 })), 136 ) 137 .into_response(); 138 } 139 Err(e) => { 140 error!("CAR parsing error: {:?}", e); 141 return ( 142 StatusCode::BAD_REQUEST, 143 Json(json!({ 144 "error": "InvalidRequest", 145 "message": format!("Invalid CAR file: {}", e) 146 })), 147 ) 148 .into_response(); 149 } 150 }; 151 152 info!( 153 "Importing repo for user {}: {} blocks, root {}", 154 did, 155 blocks.len(), 156 root 157 ); 158 159 let root_block = match blocks.get(&root) { 160 Some(b) => b, 161 None => { 162 return ( 163 StatusCode::BAD_REQUEST, 164 Json(json!({ 165 "error": "InvalidRequest", 166 "message": "Root block not found in CAR file" 167 })), 168 ) 169 .into_response(); 170 } 171 }; 172 173 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) { 174 Ok(commit) => commit.did().to_string(), 175 Err(e) => { 176 return ( 177 StatusCode::BAD_REQUEST, 178 Json(json!({ 179 "error": "InvalidRequest", 180 "message": format!("Invalid commit: {}", e) 181 })), 182 ) 183 .into_response(); 184 } 185 }; 186 187 if commit_did != *did { 188 return ( 189 StatusCode::FORBIDDEN, 190 Json(json!({ 191 "error": "InvalidRequest", 192 "message": format!( 193 "CAR file is for DID {} but you are authenticated as {}", 194 commit_did, did 195 ) 196 })), 197 ) 198 .into_response(); 199 } 200 201 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION") 202 .map(|v| v == "true" || v == "1") 203 .unwrap_or(false); 204 205 if !skip_verification { 206 debug!("Verifying CAR file signature and structure for DID {}", did); 207 let verifier = CarVerifier::new(); 208 209 match verifier.verify_car(did, &root, &blocks).await { 210 Ok(verified) => { 211 debug!( 212 "CAR verification successful: rev={}, data_cid={}", 213 verified.rev, verified.data_cid 214 ); 215 } 216 Err(crate::sync::verify::VerifyError::DidMismatch { 217 commit_did, 218 expected_did, 219 }) => { 220 return ( 221 StatusCode::FORBIDDEN, 222 Json(json!({ 223 "error": "InvalidRequest", 224 "message": format!( 225 "CAR file is for DID {} but you are authenticated as {}", 226 commit_did, expected_did 227 ) 228 })), 229 ) 230 .into_response(); 231 } 232 Err(crate::sync::verify::VerifyError::InvalidSignature) => { 233 return ( 234 StatusCode::BAD_REQUEST, 235 Json(json!({ 236 "error": "InvalidSignature", 237 "message": "CAR file commit signature verification failed" 238 })), 239 ) 240 .into_response(); 241 } 242 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => { 243 warn!("DID resolution failed during import verification: {}", msg); 244 return ( 245 StatusCode::BAD_REQUEST, 246 Json(json!({ 247 "error": "InvalidRequest", 248 "message": format!("Failed to verify DID: {}", msg) 249 })), 250 ) 251 .into_response(); 252 } 253 Err(crate::sync::verify::VerifyError::NoSigningKey) => { 254 return ( 255 StatusCode::BAD_REQUEST, 256 Json(json!({ 257 "error": "InvalidRequest", 258 "message": "DID document does not contain a signing key" 259 })), 260 ) 261 .into_response(); 262 } 263 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 264 return ( 265 StatusCode::BAD_REQUEST, 266 Json(json!({ 267 "error": "InvalidRequest", 268 "message": format!("MST validation failed: {}", msg) 269 })), 270 ) 271 .into_response(); 272 } 273 Err(e) => { 274 error!("CAR verification error: {:?}", e); 275 return ( 276 StatusCode::BAD_REQUEST, 277 Json(json!({ 278 "error": "InvalidRequest", 279 "message": format!("CAR verification failed: {}", e) 280 })), 281 ) 282 .into_response(); 283 } 284 } 285 } else { 286 warn!("Skipping CAR signature verification for import (SKIP_IMPORT_VERIFICATION=true)"); 287 } 288 289 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS") 290 .ok() 291 .and_then(|s| s.parse().ok()) 292 .unwrap_or(DEFAULT_MAX_BLOCKS); 293 294 match apply_import(&state.db, user_id, root, blocks, max_blocks).await { 295 Ok(records) => { 296 info!( 297 "Successfully imported {} records for user {}", 298 records.len(), 299 did 300 ); 301 302 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await { 303 warn!("Failed to sequence import event: {:?}", e); 304 } 305 306 (StatusCode::OK, Json(json!({}))).into_response() 307 } 308 Err(ImportError::SizeLimitExceeded) => ( 309 StatusCode::BAD_REQUEST, 310 Json(json!({ 311 "error": "InvalidRequest", 312 "message": format!("Import exceeds block limit of {}", max_blocks) 313 })), 314 ) 315 .into_response(), 316 Err(ImportError::RepoNotFound) => ( 317 StatusCode::NOT_FOUND, 318 Json(json!({ 319 "error": "RepoNotFound", 320 "message": "Repository not initialized for this account" 321 })), 322 ) 323 .into_response(), 324 Err(ImportError::InvalidCbor(msg)) => ( 325 StatusCode::BAD_REQUEST, 326 Json(json!({ 327 "error": "InvalidRequest", 328 "message": format!("Invalid CBOR data: {}", msg) 329 })), 330 ) 331 .into_response(), 332 Err(ImportError::InvalidCommit(msg)) => ( 333 StatusCode::BAD_REQUEST, 334 Json(json!({ 335 "error": "InvalidRequest", 336 "message": format!("Invalid commit structure: {}", msg) 337 })), 338 ) 339 .into_response(), 340 Err(ImportError::BlockNotFound(cid)) => ( 341 StatusCode::BAD_REQUEST, 342 Json(json!({ 343 "error": "InvalidRequest", 344 "message": format!("Referenced block not found in CAR: {}", cid) 345 })), 346 ) 347 .into_response(), 348 Err(ImportError::ConcurrentModification) => ( 349 StatusCode::CONFLICT, 350 Json(json!({ 351 "error": "ConcurrentModification", 352 "message": "Repository is being modified by another operation, please retry" 353 })), 354 ) 355 .into_response(), 356 Err(ImportError::VerificationFailed(ve)) => ( 357 StatusCode::BAD_REQUEST, 358 Json(json!({ 359 "error": "VerificationFailed", 360 "message": format!("CAR verification failed: {}", ve) 361 })), 362 ) 363 .into_response(), 364 Err(ImportError::DidMismatch { car_did, auth_did }) => ( 365 StatusCode::FORBIDDEN, 366 Json(json!({ 367 "error": "DidMismatch", 368 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did) 369 })), 370 ) 371 .into_response(), 372 Err(e) => { 373 error!("Import error: {:?}", e); 374 ( 375 StatusCode::INTERNAL_SERVER_ERROR, 376 Json(json!({"error": "InternalError"})), 377 ) 378 .into_response() 379 } 380 } 381} 382 383async fn sequence_import_event( 384 state: &AppState, 385 did: &str, 386 commit_cid: &str, 387) -> Result<(), sqlx::Error> { 388 let prev_cid: Option<String> = None; 389 let ops = serde_json::json!([]); 390 let blobs: Vec<String> = vec![]; 391 let blocks_cids: Vec<String> = vec![]; 392 393 sqlx::query!( 394 r#" 395 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids) 396 VALUES ($1, 'commit', $2, $3, $4, $5, $6) 397 "#, 398 did, 399 commit_cid, 400 prev_cid, 401 ops, 402 &blobs, 403 &blocks_cids 404 ) 405 .execute(&state.db) 406 .await?; 407 408 Ok(()) 409}