this repo has no description
1use crate::api::error::ApiError; 2use crate::api::repo::record::create_signed_commit; 3use crate::api::EmptyResponse; 4use crate::state::AppState; 5use crate::sync::import::{ImportError, apply_import, parse_car}; 6use crate::sync::verify::CarVerifier; 7use axum::{ 8 body::Bytes, 9 extract::State, 10 response::{IntoResponse, Response}, 11}; 12use jacquard::types::{integer::LimitedU32, string::Tid}; 13use jacquard_repo::storage::BlockStore; 14use k256::ecdsa::SigningKey; 15use serde_json::json; 16use tracing::{debug, error, info, warn}; 17 18const DEFAULT_MAX_IMPORT_SIZE: usize = 1024 * 1024 * 1024; 19const DEFAULT_MAX_BLOCKS: usize = 500000; 20 21pub async fn import_repo( 22 State(state): State<AppState>, 23 headers: axum::http::HeaderMap, 24 body: Bytes, 25) -> Response { 26 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS") 27 .map(|v| v != "false" && v != "0") 28 .unwrap_or(true); 29 if !accepting_imports { 30 return ApiError::InvalidRequest("Service is not accepting repo imports".into()) 31 .into_response(); 32 } 33 let max_size: usize = std::env::var("MAX_IMPORT_SIZE") 34 .ok() 35 .and_then(|s| s.parse().ok()) 36 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE); 37 if body.len() > max_size { 38 return ApiError::PayloadTooLarge(format!( 39 "Import size exceeds limit of {} bytes", 40 max_size 41 )) 42 .into_response(); 43 } 44 let token = match crate::auth::extract_bearer_token_from_header( 45 headers.get("Authorization").and_then(|h| h.to_str().ok()), 46 ) { 47 Some(t) => t, 48 None => return ApiError::AuthenticationRequired.into_response(), 49 }; 50 let auth_user = 51 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 52 Ok(user) => user, 53 Err(e) => return ApiError::from(e).into_response(), 54 }; 55 let did = &auth_user.did; 56 let user = match sqlx::query!( 57 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1", 58 did 59 ) 60 .fetch_optional(&state.db) 61 .await 62 { 63 Ok(Some(row)) => row, 64 Ok(None) => { 65 return ApiError::AccountNotFound.into_response(); 66 } 67 Err(e) => { 68 error!("DB error fetching user: {:?}", e); 69 return ApiError::InternalError(None).into_response(); 70 } 71 }; 72 if user.takedown_ref.is_some() { 73 return ApiError::AccountTakedown.into_response(); 74 } 75 let user_id = user.id; 76 let (root, blocks) = match parse_car(&body).await { 77 Ok((r, b)) => (r, b), 78 Err(ImportError::InvalidRootCount) => { 79 return ApiError::InvalidRequest("Expected exactly one root in CAR file".into()) 80 .into_response(); 81 } 82 Err(ImportError::CarParse(msg)) => { 83 return ApiError::InvalidRequest(format!("Failed to parse CAR file: {}", msg)) 84 .into_response(); 85 } 86 Err(e) => { 87 error!("CAR parsing error: {:?}", e); 88 return ApiError::InvalidRequest(format!("Invalid CAR file: {}", e)).into_response(); 89 } 90 }; 91 info!( 92 "Importing repo for user {}: {} blocks, root {}", 93 did, 94 blocks.len(), 95 root 96 ); 97 let Some(root_block) = blocks.get(&root) else { 98 return ApiError::InvalidRequest("Root block not found in CAR file".into()).into_response(); 99 }; 100 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) { 101 Ok(commit) => commit.did().to_string(), 102 Err(e) => { 103 return ApiError::InvalidRequest(format!("Invalid commit: {}", e)).into_response(); 104 } 105 }; 106 if commit_did != *did { 107 return ApiError::InvalidRepo(format!( 108 "CAR file is for DID {} but you are authenticated as {}", 109 commit_did, did 110 )) 111 .into_response(); 112 } 113 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION") 114 .map(|v| v == "true" || v == "1") 115 .unwrap_or(false); 116 let is_migration = user.deactivated_at.is_some(); 117 if skip_verification { 118 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)"); 119 } else if is_migration { 120 debug!("Verifying CAR file structure for migration (skipping signature verification)"); 121 let verifier = CarVerifier::new(); 122 match verifier.verify_car_structure_only(did, &root, &blocks) { 123 Ok(verified) => { 124 debug!( 125 "CAR structure verification successful: rev={}, data_cid={}", 126 verified.rev, verified.data_cid 127 ); 128 } 129 Err(crate::sync::verify::VerifyError::DidMismatch { 130 commit_did, 131 expected_did, 132 }) => { 133 return ApiError::InvalidRepo(format!( 134 "CAR file is for DID {} but you are authenticated as {}", 135 commit_did, expected_did 136 )) 137 .into_response(); 138 } 139 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 140 return ApiError::InvalidRequest(format!("MST validation failed: {}", msg)) 141 .into_response(); 142 } 143 Err(e) => { 144 error!("CAR structure verification error: {:?}", e); 145 return ApiError::InvalidRequest(format!("CAR verification failed: {}", e)) 146 .into_response(); 147 } 148 } 149 } else { 150 debug!("Verifying CAR file signature and structure for DID {}", did); 151 let verifier = CarVerifier::new(); 152 match verifier.verify_car(did, &root, &blocks).await { 153 Ok(verified) => { 154 debug!( 155 "CAR verification successful: rev={}, data_cid={}", 156 verified.rev, verified.data_cid 157 ); 158 } 159 Err(crate::sync::verify::VerifyError::DidMismatch { 160 commit_did, 161 expected_did, 162 }) => { 163 return ApiError::InvalidRepo(format!( 164 "CAR file is for DID {} but you are authenticated as {}", 165 commit_did, expected_did 166 )) 167 .into_response(); 168 } 169 Err(crate::sync::verify::VerifyError::InvalidSignature) => { 170 return ApiError::InvalidRequest( 171 "CAR file commit signature verification failed".into(), 172 ) 173 .into_response(); 174 } 175 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => { 176 warn!("DID resolution failed during import verification: {}", msg); 177 return ApiError::InvalidRequest(format!("Failed to verify DID: {}", msg)) 178 .into_response(); 179 } 180 Err(crate::sync::verify::VerifyError::NoSigningKey) => { 181 return ApiError::InvalidRequest( 182 "DID document does not contain a signing key".into(), 183 ) 184 .into_response(); 185 } 186 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 187 return ApiError::InvalidRequest(format!("MST validation failed: {}", msg)) 188 .into_response(); 189 } 190 Err(e) => { 191 error!("CAR verification error: {:?}", e); 192 return ApiError::InvalidRequest(format!("CAR verification failed: {}", e)) 193 .into_response(); 194 } 195 } 196 } 197 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS") 198 .ok() 199 .and_then(|s| s.parse().ok()) 200 .unwrap_or(DEFAULT_MAX_BLOCKS); 201 match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await { 202 Ok(import_result) => { 203 info!( 204 "Successfully imported {} records for user {}", 205 import_result.records.len(), 206 did 207 ); 208 let mut blob_ref_count = 0; 209 for record in &import_result.records { 210 for blob_ref in &record.blob_refs { 211 let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey); 212 if let Err(e) = sqlx::query!( 213 r#" 214 INSERT INTO record_blobs (repo_id, record_uri, blob_cid) 215 VALUES ($1, $2, $3) 216 ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING 217 "#, 218 user_id, 219 record_uri, 220 blob_ref.cid 221 ) 222 .execute(&state.db) 223 .await 224 { 225 warn!("Failed to insert record_blob for {}: {:?}", record_uri, e); 226 } else { 227 blob_ref_count += 1; 228 } 229 } 230 } 231 if blob_ref_count > 0 { 232 info!( 233 "Recorded {} blob references for imported repo", 234 blob_ref_count 235 ); 236 } 237 let key_row = match sqlx::query!( 238 r#"SELECT uk.key_bytes, uk.encryption_version 239 FROM user_keys uk 240 JOIN users u ON uk.user_id = u.id 241 WHERE u.did = $1"#, 242 did 243 ) 244 .fetch_optional(&state.db) 245 .await 246 { 247 Ok(Some(row)) => row, 248 Ok(None) => { 249 error!("No signing key found for user {}", did); 250 return ApiError::InternalError(Some("Signing key not found".into())) 251 .into_response(); 252 } 253 Err(e) => { 254 error!("DB error fetching signing key: {:?}", e); 255 return ApiError::InternalError(None).into_response(); 256 } 257 }; 258 let key_bytes = 259 match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) { 260 Ok(k) => k, 261 Err(e) => { 262 error!("Failed to decrypt signing key: {}", e); 263 return ApiError::InternalError(None).into_response(); 264 } 265 }; 266 let signing_key = match SigningKey::from_slice(&key_bytes) { 267 Ok(k) => k, 268 Err(e) => { 269 error!("Invalid signing key: {:?}", e); 270 return ApiError::InternalError(None).into_response(); 271 } 272 }; 273 let new_rev = Tid::now(LimitedU32::MIN); 274 let new_rev_str = new_rev.to_string(); 275 let (commit_bytes, _sig) = match create_signed_commit( 276 did, 277 import_result.data_cid, 278 &new_rev_str, 279 None, 280 &signing_key, 281 ) { 282 Ok(result) => result, 283 Err(e) => { 284 error!("Failed to create new commit: {}", e); 285 return ApiError::InternalError(None).into_response(); 286 } 287 }; 288 let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await { 289 Ok(cid) => cid, 290 Err(e) => { 291 error!("Failed to store new commit block: {:?}", e); 292 return ApiError::InternalError(None).into_response(); 293 } 294 }; 295 let new_root_str = new_root_cid.to_string(); 296 if let Err(e) = sqlx::query!( 297 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3", 298 new_root_str, 299 &new_rev_str, 300 user_id 301 ) 302 .execute(&state.db) 303 .await 304 { 305 error!("Failed to update repo root: {:?}", e); 306 return ApiError::InternalError(None).into_response(); 307 } 308 let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect(); 309 all_block_cids.push(new_root_cid.to_bytes()); 310 if let Err(e) = sqlx::query!( 311 r#" 312 INSERT INTO user_blocks (user_id, block_cid) 313 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 314 ON CONFLICT (user_id, block_cid) DO NOTHING 315 "#, 316 user_id, 317 &all_block_cids 318 ) 319 .execute(&state.db) 320 .await 321 { 322 error!("Failed to insert user_blocks: {:?}", e); 323 return ApiError::InternalError(None).into_response(); 324 } 325 info!( 326 "Created new commit for imported repo: cid={}, rev={}", 327 new_root_str, new_rev_str 328 ); 329 if !is_migration && let Err(e) = sequence_import_event(&state, did, &new_root_str).await 330 { 331 warn!("Failed to sequence import event: {:?}", e); 332 } 333 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() { 334 let birthdate_pref = json!({ 335 "$type": "app.bsky.actor.defs#personalDetailsPref", 336 "birthDate": "1998-05-06T00:00:00.000Z" 337 }); 338 if let Err(e) = sqlx::query!( 339 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3) 340 ON CONFLICT (user_id, name) DO NOTHING", 341 user_id, 342 "app.bsky.actor.defs#personalDetailsPref", 343 birthdate_pref 344 ) 345 .execute(&state.db) 346 .await 347 { 348 warn!( 349 "Failed to set default birthdate preference for migrated user: {:?}", 350 e 351 ); 352 } 353 } 354 EmptyResponse::ok().into_response() 355 } 356 Err(ImportError::SizeLimitExceeded) => { 357 ApiError::PayloadTooLarge(format!("Import exceeds block limit of {}", max_blocks)) 358 .into_response() 359 } 360 Err(ImportError::RepoNotFound) => { 361 ApiError::RepoNotFound(Some("Repository not initialized for this account".into())) 362 .into_response() 363 } 364 Err(ImportError::InvalidCbor(msg)) => { 365 ApiError::InvalidRequest(format!("Invalid CBOR data: {}", msg)).into_response() 366 } 367 Err(ImportError::InvalidCommit(msg)) => { 368 ApiError::InvalidRequest(format!("Invalid commit structure: {}", msg)).into_response() 369 } 370 Err(ImportError::BlockNotFound(cid)) => { 371 ApiError::InvalidRequest(format!("Referenced block not found in CAR: {}", cid)) 372 .into_response() 373 } 374 Err(ImportError::ConcurrentModification) => ApiError::InvalidSwap(Some("Repository is being modified by another operation, please retry".into(),)) 375 .into_response(), 376 Err(ImportError::VerificationFailed(ve)) => { 377 ApiError::InvalidRequest(format!("CAR verification failed: {}", ve)).into_response() 378 } 379 Err(ImportError::DidMismatch { car_did, auth_did }) => { 380 ApiError::InvalidRequest(format!( 381 "CAR is for {} but authenticated as {}", 382 car_did, auth_did 383 )) 384 .into_response() 385 } 386 Err(e) => { 387 error!("Import error: {:?}", e); 388 ApiError::InternalError(None).into_response() 389 } 390 } 391} 392 393async fn sequence_import_event( 394 state: &AppState, 395 did: &str, 396 commit_cid: &str, 397) -> Result<(), sqlx::Error> { 398 let prev_cid: Option<String> = None; 399 let prev_data_cid: Option<String> = None; 400 let ops = serde_json::json!([]); 401 let blobs: Vec<String> = vec![]; 402 let blocks_cids: Vec<String> = vec![]; 403 let seq_row = sqlx::query!( 404 r#" 405 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids) 406 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7) 407 RETURNING seq 408 "#, 409 did, 410 commit_cid, 411 prev_cid, 412 prev_data_cid, 413 ops, 414 &blobs, 415 &blocks_cids 416 ) 417 .fetch_one(&state.db) 418 .await?; 419 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 420 .execute(&state.db) 421 .await?; 422 Ok(()) 423}