this repo has no description
1use crate::api::ApiError; 2use crate::state::AppState; 3use crate::sync::import::{ImportError, apply_import, parse_car}; 4use crate::sync::verify::CarVerifier; 5use axum::{ 6 Json, 7 body::Bytes, 8 extract::State, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11}; 12use serde_json::json; 13use tracing::{debug, error, info, warn}; 14 15const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024; 16const DEFAULT_MAX_BLOCKS: usize = 50000; 17 18pub async fn import_repo( 19 State(state): State<AppState>, 20 headers: axum::http::HeaderMap, 21 body: Bytes, 22) -> Response { 23 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS") 24 .map(|v| v != "false" && v != "0") 25 .unwrap_or(true); 26 if !accepting_imports { 27 return ( 28 StatusCode::BAD_REQUEST, 29 Json(json!({ 30 "error": "InvalidRequest", 31 "message": "Service is not accepting repo imports" 32 })), 33 ) 34 .into_response(); 35 } 36 let max_size: usize = std::env::var("MAX_IMPORT_SIZE") 37 .ok() 38 .and_then(|s| s.parse().ok()) 39 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE); 40 if body.len() > max_size { 41 return ( 42 StatusCode::PAYLOAD_TOO_LARGE, 43 Json(json!({ 44 "error": "InvalidRequest", 45 "message": format!("Import size exceeds limit of {} bytes", max_size) 46 })), 47 ) 48 .into_response(); 49 } 50 let token = match crate::auth::extract_bearer_token_from_header( 51 headers.get("Authorization").and_then(|h| h.to_str().ok()), 52 ) { 53 Some(t) => t, 54 None => return ApiError::AuthenticationRequired.into_response(), 55 }; 56 let auth_user = 57 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 58 Ok(user) => user, 59 Err(e) => return ApiError::from(e).into_response(), 60 }; 61 let did = &auth_user.did; 62 let user = match sqlx::query!( 63 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1", 64 did 65 ) 66 .fetch_optional(&state.db) 67 .await 68 { 69 Ok(Some(row)) => row, 70 Ok(None) => { 71 return ( 72 StatusCode::NOT_FOUND, 73 Json(json!({"error": "AccountNotFound"})), 74 ) 75 .into_response(); 76 } 77 Err(e) => { 78 error!("DB error fetching user: {:?}", e); 79 return ( 80 StatusCode::INTERNAL_SERVER_ERROR, 81 Json(json!({"error": "InternalError"})), 82 ) 83 .into_response(); 84 } 85 }; 86 if user.takedown_ref.is_some() { 87 return ( 88 StatusCode::FORBIDDEN, 89 Json(json!({ 90 "error": "AccountTakenDown", 91 "message": "Account has been taken down" 92 })), 93 ) 94 .into_response(); 95 } 96 let user_id = user.id; 97 let (root, blocks) = match parse_car(&body).await { 98 Ok((r, b)) => (r, b), 99 Err(ImportError::InvalidRootCount) => { 100 return ( 101 StatusCode::BAD_REQUEST, 102 Json(json!({ 103 "error": "InvalidRequest", 104 "message": "Expected exactly one root in CAR file" 105 })), 106 ) 107 .into_response(); 108 } 109 Err(ImportError::CarParse(msg)) => { 110 return ( 111 StatusCode::BAD_REQUEST, 112 Json(json!({ 113 "error": "InvalidRequest", 114 "message": format!("Failed to parse CAR file: {}", msg) 115 })), 116 ) 117 .into_response(); 118 } 119 Err(e) => { 120 error!("CAR parsing error: {:?}", e); 121 return ( 122 StatusCode::BAD_REQUEST, 123 Json(json!({ 124 "error": "InvalidRequest", 125 "message": format!("Invalid CAR file: {}", e) 126 })), 127 ) 128 .into_response(); 129 } 130 }; 131 info!( 132 "Importing repo for user {}: {} blocks, root {}", 133 did, 134 blocks.len(), 135 root 136 ); 137 let root_block = match blocks.get(&root) { 138 Some(b) => b, 139 None => { 140 return ( 141 StatusCode::BAD_REQUEST, 142 Json(json!({ 143 "error": "InvalidRequest", 144 "message": "Root block not found in CAR file" 145 })), 146 ) 147 .into_response(); 148 } 149 }; 150 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) { 151 Ok(commit) => commit.did().to_string(), 152 Err(e) => { 153 return ( 154 StatusCode::BAD_REQUEST, 155 Json(json!({ 156 "error": "InvalidRequest", 157 "message": format!("Invalid commit: {}", e) 158 })), 159 ) 160 .into_response(); 161 } 162 }; 163 if commit_did != *did { 164 return ( 165 StatusCode::FORBIDDEN, 166 Json(json!({ 167 "error": "InvalidRequest", 168 "message": format!( 169 "CAR file is for DID {} but you are authenticated as {}", 170 commit_did, did 171 ) 172 })), 173 ) 174 .into_response(); 175 } 176 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION") 177 .map(|v| v == "true" || v == "1") 178 .unwrap_or(false); 179 let is_migration = user.deactivated_at.is_some(); 180 if skip_verification { 181 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)"); 182 } else if is_migration { 183 debug!("Verifying CAR file structure for migration (skipping signature verification)"); 184 let verifier = CarVerifier::new(); 185 match verifier.verify_car_structure_only(did, &root, &blocks) { 186 Ok(verified) => { 187 debug!( 188 "CAR structure verification successful: rev={}, data_cid={}", 189 verified.rev, verified.data_cid 190 ); 191 } 192 Err(crate::sync::verify::VerifyError::DidMismatch { 193 commit_did, 194 expected_did, 195 }) => { 196 return ( 197 StatusCode::FORBIDDEN, 198 Json(json!({ 199 "error": "InvalidRequest", 200 "message": format!( 201 "CAR file is for DID {} but you are authenticated as {}", 202 commit_did, expected_did 203 ) 204 })), 205 ) 206 .into_response(); 207 } 208 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 209 return ( 210 StatusCode::BAD_REQUEST, 211 Json(json!({ 212 "error": "InvalidRequest", 213 "message": format!("MST validation failed: {}", msg) 214 })), 215 ) 216 .into_response(); 217 } 218 Err(e) => { 219 error!("CAR structure verification error: {:?}", e); 220 return ( 221 StatusCode::BAD_REQUEST, 222 Json(json!({ 223 "error": "InvalidRequest", 224 "message": format!("CAR verification failed: {}", e) 225 })), 226 ) 227 .into_response(); 228 } 229 } 230 } else { 231 debug!("Verifying CAR file signature and structure for DID {}", did); 232 let verifier = CarVerifier::new(); 233 match verifier.verify_car(did, &root, &blocks).await { 234 Ok(verified) => { 235 debug!( 236 "CAR verification successful: rev={}, data_cid={}", 237 verified.rev, verified.data_cid 238 ); 239 } 240 Err(crate::sync::verify::VerifyError::DidMismatch { 241 commit_did, 242 expected_did, 243 }) => { 244 return ( 245 StatusCode::FORBIDDEN, 246 Json(json!({ 247 "error": "InvalidRequest", 248 "message": format!( 249 "CAR file is for DID {} but you are authenticated as {}", 250 commit_did, expected_did 251 ) 252 })), 253 ) 254 .into_response(); 255 } 256 Err(crate::sync::verify::VerifyError::InvalidSignature) => { 257 return ( 258 StatusCode::BAD_REQUEST, 259 Json(json!({ 260 "error": "InvalidSignature", 261 "message": "CAR file commit signature verification failed" 262 })), 263 ) 264 .into_response(); 265 } 266 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => { 267 warn!("DID resolution failed during import verification: {}", msg); 268 return ( 269 StatusCode::BAD_REQUEST, 270 Json(json!({ 271 "error": "InvalidRequest", 272 "message": format!("Failed to verify DID: {}", msg) 273 })), 274 ) 275 .into_response(); 276 } 277 Err(crate::sync::verify::VerifyError::NoSigningKey) => { 278 return ( 279 StatusCode::BAD_REQUEST, 280 Json(json!({ 281 "error": "InvalidRequest", 282 "message": "DID document does not contain a signing key" 283 })), 284 ) 285 .into_response(); 286 } 287 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 288 return ( 289 StatusCode::BAD_REQUEST, 290 Json(json!({ 291 "error": "InvalidRequest", 292 "message": format!("MST validation failed: {}", msg) 293 })), 294 ) 295 .into_response(); 296 } 297 Err(e) => { 298 error!("CAR verification error: {:?}", e); 299 return ( 300 StatusCode::BAD_REQUEST, 301 Json(json!({ 302 "error": "InvalidRequest", 303 "message": format!("CAR verification failed: {}", e) 304 })), 305 ) 306 .into_response(); 307 } 308 } 309 } 310 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS") 311 .ok() 312 .and_then(|s| s.parse().ok()) 313 .unwrap_or(DEFAULT_MAX_BLOCKS); 314 match apply_import(&state.db, user_id, root, blocks, max_blocks).await { 315 Ok(records) => { 316 info!( 317 "Successfully imported {} records for user {}", 318 records.len(), 319 did 320 ); 321 if !is_migration { 322 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await { 323 warn!("Failed to sequence import event: {:?}", e); 324 } 325 } 326 (StatusCode::OK, Json(json!({}))).into_response() 327 } 328 Err(ImportError::SizeLimitExceeded) => ( 329 StatusCode::BAD_REQUEST, 330 Json(json!({ 331 "error": "InvalidRequest", 332 "message": format!("Import exceeds block limit of {}", max_blocks) 333 })), 334 ) 335 .into_response(), 336 Err(ImportError::RepoNotFound) => ( 337 StatusCode::NOT_FOUND, 338 Json(json!({ 339 "error": "RepoNotFound", 340 "message": "Repository not initialized for this account" 341 })), 342 ) 343 .into_response(), 344 Err(ImportError::InvalidCbor(msg)) => ( 345 StatusCode::BAD_REQUEST, 346 Json(json!({ 347 "error": "InvalidRequest", 348 "message": format!("Invalid CBOR data: {}", msg) 349 })), 350 ) 351 .into_response(), 352 Err(ImportError::InvalidCommit(msg)) => ( 353 StatusCode::BAD_REQUEST, 354 Json(json!({ 355 "error": "InvalidRequest", 356 "message": format!("Invalid commit structure: {}", msg) 357 })), 358 ) 359 .into_response(), 360 Err(ImportError::BlockNotFound(cid)) => ( 361 StatusCode::BAD_REQUEST, 362 Json(json!({ 363 "error": "InvalidRequest", 364 "message": format!("Referenced block not found in CAR: {}", cid) 365 })), 366 ) 367 .into_response(), 368 Err(ImportError::ConcurrentModification) => ( 369 StatusCode::CONFLICT, 370 Json(json!({ 371 "error": "ConcurrentModification", 372 "message": "Repository is being modified by another operation, please retry" 373 })), 374 ) 375 .into_response(), 376 Err(ImportError::VerificationFailed(ve)) => ( 377 StatusCode::BAD_REQUEST, 378 Json(json!({ 379 "error": "VerificationFailed", 380 "message": format!("CAR verification failed: {}", ve) 381 })), 382 ) 383 .into_response(), 384 Err(ImportError::DidMismatch { car_did, auth_did }) => ( 385 StatusCode::FORBIDDEN, 386 Json(json!({ 387 "error": "DidMismatch", 388 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did) 389 })), 390 ) 391 .into_response(), 392 Err(e) => { 393 error!("Import error: {:?}", e); 394 ( 395 StatusCode::INTERNAL_SERVER_ERROR, 396 Json(json!({"error": "InternalError"})), 397 ) 398 .into_response() 399 } 400 } 401} 402 403async fn sequence_import_event( 404 state: &AppState, 405 did: &str, 406 commit_cid: &str, 407) -> Result<(), sqlx::Error> { 408 let prev_cid: Option<String> = None; 409 let prev_data_cid: Option<String> = None; 410 let ops = serde_json::json!([]); 411 let blobs: Vec<String> = vec![]; 412 let blocks_cids: Vec<String> = vec![]; 413 let seq_row = sqlx::query!( 414 r#" 415 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids) 416 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7) 417 RETURNING seq 418 "#, 419 did, 420 commit_cid, 421 prev_cid, 422 prev_data_cid, 423 ops, 424 &blobs, 425 &blocks_cids 426 ) 427 .fetch_one(&state.db) 428 .await?; 429 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 430 .execute(&state.db) 431 .await?; 432 Ok(()) 433}