this repo has no description
1use crate::api::ApiError; 2use crate::api::repo::record::create_signed_commit; 3use crate::state::AppState; 4use crate::sync::import::{ImportError, apply_import, parse_car}; 5use crate::sync::verify::CarVerifier; 6use axum::{ 7 Json, 8 body::Bytes, 9 extract::State, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use jacquard::types::{integer::LimitedU32, string::Tid}; 14use jacquard_repo::storage::BlockStore; 15use k256::ecdsa::SigningKey; 16use serde_json::json; 17use tracing::{debug, error, info, warn}; 18 19const DEFAULT_MAX_IMPORT_SIZE: usize = 1024 * 1024 * 1024; 20const DEFAULT_MAX_BLOCKS: usize = 500000; 21 22pub async fn import_repo( 23 State(state): State<AppState>, 24 headers: axum::http::HeaderMap, 25 body: Bytes, 26) -> Response { 27 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS") 28 .map(|v| v != "false" && v != "0") 29 .unwrap_or(true); 30 if !accepting_imports { 31 return ( 32 StatusCode::BAD_REQUEST, 33 Json(json!({ 34 "error": "InvalidRequest", 35 "message": "Service is not accepting repo imports" 36 })), 37 ) 38 .into_response(); 39 } 40 let max_size: usize = std::env::var("MAX_IMPORT_SIZE") 41 .ok() 42 .and_then(|s| s.parse().ok()) 43 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE); 44 if body.len() > max_size { 45 return ( 46 StatusCode::PAYLOAD_TOO_LARGE, 47 Json(json!({ 48 "error": "InvalidRequest", 49 "message": format!("Import size exceeds limit of {} bytes", max_size) 50 })), 51 ) 52 .into_response(); 53 } 54 let token = match crate::auth::extract_bearer_token_from_header( 55 headers.get("Authorization").and_then(|h| h.to_str().ok()), 56 ) { 57 Some(t) => t, 58 None => return ApiError::AuthenticationRequired.into_response(), 59 }; 60 let auth_user = 61 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 62 Ok(user) => user, 63 Err(e) => return ApiError::from(e).into_response(), 64 }; 65 let did = &auth_user.did; 66 let user = match sqlx::query!( 67 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1", 68 did 69 ) 70 .fetch_optional(&state.db) 71 .await 72 { 73 Ok(Some(row)) => row, 74 Ok(None) => { 75 return ( 76 StatusCode::NOT_FOUND, 77 Json(json!({"error": "AccountNotFound"})), 78 ) 79 .into_response(); 80 } 81 Err(e) => { 82 error!("DB error fetching user: {:?}", e); 83 return ( 84 StatusCode::INTERNAL_SERVER_ERROR, 85 Json(json!({"error": "InternalError"})), 86 ) 87 .into_response(); 88 } 89 }; 90 if user.takedown_ref.is_some() { 91 return ( 92 StatusCode::FORBIDDEN, 93 Json(json!({ 94 "error": "AccountTakenDown", 95 "message": "Account has been taken down" 96 })), 97 ) 98 .into_response(); 99 } 100 let user_id = user.id; 101 let (root, blocks) = match parse_car(&body).await { 102 Ok((r, b)) => (r, b), 103 Err(ImportError::InvalidRootCount) => { 104 return ( 105 StatusCode::BAD_REQUEST, 106 Json(json!({ 107 "error": "InvalidRequest", 108 "message": "Expected exactly one root in CAR file" 109 })), 110 ) 111 .into_response(); 112 } 113 Err(ImportError::CarParse(msg)) => { 114 return ( 115 StatusCode::BAD_REQUEST, 116 Json(json!({ 117 "error": "InvalidRequest", 118 "message": format!("Failed to parse CAR file: {}", msg) 119 })), 120 ) 121 .into_response(); 122 } 123 Err(e) => { 124 error!("CAR parsing error: {:?}", e); 125 return ( 126 StatusCode::BAD_REQUEST, 127 Json(json!({ 128 "error": "InvalidRequest", 129 "message": format!("Invalid CAR file: {}", e) 130 })), 131 ) 132 .into_response(); 133 } 134 }; 135 info!( 136 "Importing repo for user {}: {} blocks, root {}", 137 did, 138 blocks.len(), 139 root 140 ); 141 let root_block = match blocks.get(&root) { 142 Some(b) => b, 143 None => { 144 return ( 145 StatusCode::BAD_REQUEST, 146 Json(json!({ 147 "error": "InvalidRequest", 148 "message": "Root block not found in CAR file" 149 })), 150 ) 151 .into_response(); 152 } 153 }; 154 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) { 155 Ok(commit) => commit.did().to_string(), 156 Err(e) => { 157 return ( 158 StatusCode::BAD_REQUEST, 159 Json(json!({ 160 "error": "InvalidRequest", 161 "message": format!("Invalid commit: {}", e) 162 })), 163 ) 164 .into_response(); 165 } 166 }; 167 if commit_did != *did { 168 return ( 169 StatusCode::FORBIDDEN, 170 Json(json!({ 171 "error": "InvalidRequest", 172 "message": format!( 173 "CAR file is for DID {} but you are authenticated as {}", 174 commit_did, did 175 ) 176 })), 177 ) 178 .into_response(); 179 } 180 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION") 181 .map(|v| v == "true" || v == "1") 182 .unwrap_or(false); 183 let is_migration = user.deactivated_at.is_some(); 184 if skip_verification { 185 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)"); 186 } else if is_migration { 187 debug!("Verifying CAR file structure for migration (skipping signature verification)"); 188 let verifier = CarVerifier::new(); 189 match verifier.verify_car_structure_only(did, &root, &blocks) { 190 Ok(verified) => { 191 debug!( 192 "CAR structure verification successful: rev={}, data_cid={}", 193 verified.rev, verified.data_cid 194 ); 195 } 196 Err(crate::sync::verify::VerifyError::DidMismatch { 197 commit_did, 198 expected_did, 199 }) => { 200 return ( 201 StatusCode::FORBIDDEN, 202 Json(json!({ 203 "error": "InvalidRequest", 204 "message": format!( 205 "CAR file is for DID {} but you are authenticated as {}", 206 commit_did, expected_did 207 ) 208 })), 209 ) 210 .into_response(); 211 } 212 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 213 return ( 214 StatusCode::BAD_REQUEST, 215 Json(json!({ 216 "error": "InvalidRequest", 217 "message": format!("MST validation failed: {}", msg) 218 })), 219 ) 220 .into_response(); 221 } 222 Err(e) => { 223 error!("CAR structure verification error: {:?}", e); 224 return ( 225 StatusCode::BAD_REQUEST, 226 Json(json!({ 227 "error": "InvalidRequest", 228 "message": format!("CAR verification failed: {}", e) 229 })), 230 ) 231 .into_response(); 232 } 233 } 234 } else { 235 debug!("Verifying CAR file signature and structure for DID {}", did); 236 let verifier = CarVerifier::new(); 237 match verifier.verify_car(did, &root, &blocks).await { 238 Ok(verified) => { 239 debug!( 240 "CAR verification successful: rev={}, data_cid={}", 241 verified.rev, verified.data_cid 242 ); 243 } 244 Err(crate::sync::verify::VerifyError::DidMismatch { 245 commit_did, 246 expected_did, 247 }) => { 248 return ( 249 StatusCode::FORBIDDEN, 250 Json(json!({ 251 "error": "InvalidRequest", 252 "message": format!( 253 "CAR file is for DID {} but you are authenticated as {}", 254 commit_did, expected_did 255 ) 256 })), 257 ) 258 .into_response(); 259 } 260 Err(crate::sync::verify::VerifyError::InvalidSignature) => { 261 return ( 262 StatusCode::BAD_REQUEST, 263 Json(json!({ 264 "error": "InvalidSignature", 265 "message": "CAR file commit signature verification failed" 266 })), 267 ) 268 .into_response(); 269 } 270 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => { 271 warn!("DID resolution failed during import verification: {}", msg); 272 return ( 273 StatusCode::BAD_REQUEST, 274 Json(json!({ 275 "error": "InvalidRequest", 276 "message": format!("Failed to verify DID: {}", msg) 277 })), 278 ) 279 .into_response(); 280 } 281 Err(crate::sync::verify::VerifyError::NoSigningKey) => { 282 return ( 283 StatusCode::BAD_REQUEST, 284 Json(json!({ 285 "error": "InvalidRequest", 286 "message": "DID document does not contain a signing key" 287 })), 288 ) 289 .into_response(); 290 } 291 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 292 return ( 293 StatusCode::BAD_REQUEST, 294 Json(json!({ 295 "error": "InvalidRequest", 296 "message": format!("MST validation failed: {}", msg) 297 })), 298 ) 299 .into_response(); 300 } 301 Err(e) => { 302 error!("CAR verification error: {:?}", e); 303 return ( 304 StatusCode::BAD_REQUEST, 305 Json(json!({ 306 "error": "InvalidRequest", 307 "message": format!("CAR verification failed: {}", e) 308 })), 309 ) 310 .into_response(); 311 } 312 } 313 } 314 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS") 315 .ok() 316 .and_then(|s| s.parse().ok()) 317 .unwrap_or(DEFAULT_MAX_BLOCKS); 318 match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await { 319 Ok(import_result) => { 320 info!( 321 "Successfully imported {} records for user {}", 322 import_result.records.len(), 323 did 324 ); 325 let mut blob_ref_count = 0; 326 for record in &import_result.records { 327 for blob_ref in &record.blob_refs { 328 let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey); 329 if let Err(e) = sqlx::query!( 330 r#" 331 INSERT INTO record_blobs (repo_id, record_uri, blob_cid) 332 VALUES ($1, $2, $3) 333 ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING 334 "#, 335 user_id, 336 record_uri, 337 blob_ref.cid 338 ) 339 .execute(&state.db) 340 .await 341 { 342 warn!("Failed to insert record_blob for {}: {:?}", record_uri, e); 343 } else { 344 blob_ref_count += 1; 345 } 346 } 347 } 348 if blob_ref_count > 0 { 349 info!( 350 "Recorded {} blob references for imported repo", 351 blob_ref_count 352 ); 353 } 354 let key_row = match sqlx::query!( 355 r#"SELECT uk.key_bytes, uk.encryption_version 356 FROM user_keys uk 357 JOIN users u ON uk.user_id = u.id 358 WHERE u.did = $1"#, 359 did 360 ) 361 .fetch_optional(&state.db) 362 .await 363 { 364 Ok(Some(row)) => row, 365 Ok(None) => { 366 error!("No signing key found for user {}", did); 367 return ( 368 StatusCode::INTERNAL_SERVER_ERROR, 369 Json(json!({"error": "InternalError", "message": "Signing key not found"})), 370 ) 371 .into_response(); 372 } 373 Err(e) => { 374 error!("DB error fetching signing key: {:?}", e); 375 return ( 376 StatusCode::INTERNAL_SERVER_ERROR, 377 Json(json!({"error": "InternalError"})), 378 ) 379 .into_response(); 380 } 381 }; 382 let key_bytes = 383 match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) { 384 Ok(k) => k, 385 Err(e) => { 386 error!("Failed to decrypt signing key: {}", e); 387 return ( 388 StatusCode::INTERNAL_SERVER_ERROR, 389 Json(json!({"error": "InternalError"})), 390 ) 391 .into_response(); 392 } 393 }; 394 let signing_key = match SigningKey::from_slice(&key_bytes) { 395 Ok(k) => k, 396 Err(e) => { 397 error!("Invalid signing key: {:?}", e); 398 return ( 399 StatusCode::INTERNAL_SERVER_ERROR, 400 Json(json!({"error": "InternalError"})), 401 ) 402 .into_response(); 403 } 404 }; 405 let new_rev = Tid::now(LimitedU32::MIN); 406 let new_rev_str = new_rev.to_string(); 407 let (commit_bytes, _sig) = match create_signed_commit( 408 did, 409 import_result.data_cid, 410 &new_rev_str, 411 None, 412 &signing_key, 413 ) { 414 Ok(result) => result, 415 Err(e) => { 416 error!("Failed to create new commit: {}", e); 417 return ( 418 StatusCode::INTERNAL_SERVER_ERROR, 419 Json(json!({"error": "InternalError"})), 420 ) 421 .into_response(); 422 } 423 }; 424 let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await { 425 Ok(cid) => cid, 426 Err(e) => { 427 error!("Failed to store new commit block: {:?}", e); 428 return ( 429 StatusCode::INTERNAL_SERVER_ERROR, 430 Json(json!({"error": "InternalError"})), 431 ) 432 .into_response(); 433 } 434 }; 435 let new_root_str = new_root_cid.to_string(); 436 if let Err(e) = sqlx::query!( 437 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3", 438 new_root_str, 439 &new_rev_str, 440 user_id 441 ) 442 .execute(&state.db) 443 .await 444 { 445 error!("Failed to update repo root: {:?}", e); 446 return ( 447 StatusCode::INTERNAL_SERVER_ERROR, 448 Json(json!({"error": "InternalError"})), 449 ) 450 .into_response(); 451 } 452 let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect(); 453 all_block_cids.push(new_root_cid.to_bytes()); 454 if let Err(e) = sqlx::query!( 455 r#" 456 INSERT INTO user_blocks (user_id, block_cid) 457 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 458 ON CONFLICT (user_id, block_cid) DO NOTHING 459 "#, 460 user_id, 461 &all_block_cids 462 ) 463 .execute(&state.db) 464 .await 465 { 466 error!("Failed to insert user_blocks: {:?}", e); 467 return ( 468 StatusCode::INTERNAL_SERVER_ERROR, 469 Json(json!({"error": "InternalError"})), 470 ) 471 .into_response(); 472 } 473 info!( 474 "Created new commit for imported repo: cid={}, rev={}", 475 new_root_str, new_rev_str 476 ); 477 if !is_migration && let Err(e) = sequence_import_event(&state, did, &new_root_str).await 478 { 479 warn!("Failed to sequence import event: {:?}", e); 480 } 481 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() { 482 let birthdate_pref = json!({ 483 "$type": "app.bsky.actor.defs#personalDetailsPref", 484 "birthDate": "1998-05-06T00:00:00.000Z" 485 }); 486 if let Err(e) = sqlx::query!( 487 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3) 488 ON CONFLICT (user_id, name) DO NOTHING", 489 user_id, 490 "app.bsky.actor.defs#personalDetailsPref", 491 birthdate_pref 492 ) 493 .execute(&state.db) 494 .await 495 { 496 warn!( 497 "Failed to set default birthdate preference for migrated user: {:?}", 498 e 499 ); 500 } 501 } 502 (StatusCode::OK, Json(json!({}))).into_response() 503 } 504 Err(ImportError::SizeLimitExceeded) => ( 505 StatusCode::BAD_REQUEST, 506 Json(json!({ 507 "error": "InvalidRequest", 508 "message": format!("Import exceeds block limit of {}", max_blocks) 509 })), 510 ) 511 .into_response(), 512 Err(ImportError::RepoNotFound) => ( 513 StatusCode::NOT_FOUND, 514 Json(json!({ 515 "error": "RepoNotFound", 516 "message": "Repository not initialized for this account" 517 })), 518 ) 519 .into_response(), 520 Err(ImportError::InvalidCbor(msg)) => ( 521 StatusCode::BAD_REQUEST, 522 Json(json!({ 523 "error": "InvalidRequest", 524 "message": format!("Invalid CBOR data: {}", msg) 525 })), 526 ) 527 .into_response(), 528 Err(ImportError::InvalidCommit(msg)) => ( 529 StatusCode::BAD_REQUEST, 530 Json(json!({ 531 "error": "InvalidRequest", 532 "message": format!("Invalid commit structure: {}", msg) 533 })), 534 ) 535 .into_response(), 536 Err(ImportError::BlockNotFound(cid)) => ( 537 StatusCode::BAD_REQUEST, 538 Json(json!({ 539 "error": "InvalidRequest", 540 "message": format!("Referenced block not found in CAR: {}", cid) 541 })), 542 ) 543 .into_response(), 544 Err(ImportError::ConcurrentModification) => ( 545 StatusCode::CONFLICT, 546 Json(json!({ 547 "error": "ConcurrentModification", 548 "message": "Repository is being modified by another operation, please retry" 549 })), 550 ) 551 .into_response(), 552 Err(ImportError::VerificationFailed(ve)) => ( 553 StatusCode::BAD_REQUEST, 554 Json(json!({ 555 "error": "VerificationFailed", 556 "message": format!("CAR verification failed: {}", ve) 557 })), 558 ) 559 .into_response(), 560 Err(ImportError::DidMismatch { car_did, auth_did }) => ( 561 StatusCode::FORBIDDEN, 562 Json(json!({ 563 "error": "DidMismatch", 564 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did) 565 })), 566 ) 567 .into_response(), 568 Err(e) => { 569 error!("Import error: {:?}", e); 570 ( 571 StatusCode::INTERNAL_SERVER_ERROR, 572 Json(json!({"error": "InternalError"})), 573 ) 574 .into_response() 575 } 576 } 577} 578 579async fn sequence_import_event( 580 state: &AppState, 581 did: &str, 582 commit_cid: &str, 583) -> Result<(), sqlx::Error> { 584 let prev_cid: Option<String> = None; 585 let prev_data_cid: Option<String> = None; 586 let ops = serde_json::json!([]); 587 let blobs: Vec<String> = vec![]; 588 let blocks_cids: Vec<String> = vec![]; 589 let seq_row = sqlx::query!( 590 r#" 591 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids) 592 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7) 593 RETURNING seq 594 "#, 595 did, 596 commit_cid, 597 prev_cid, 598 prev_data_cid, 599 ops, 600 &blobs, 601 &blocks_cids 602 ) 603 .fetch_one(&state.db) 604 .await?; 605 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 606 .execute(&state.db) 607 .await?; 608 Ok(()) 609}