this repo has no description
1use crate::api::EmptyResponse; 2use crate::api::error::ApiError; 3use crate::api::repo::record::create_signed_commit; 4use crate::auth::BearerAuthAllowDeactivated; 5use crate::state::AppState; 6use crate::sync::import::{ImportError, apply_import, parse_car}; 7use crate::sync::verify::CarVerifier; 8use axum::{ 9 body::Bytes, 10 extract::State, 11 response::{IntoResponse, Response}, 12}; 13use jacquard::types::{integer::LimitedU32, string::Tid}; 14use jacquard_repo::storage::BlockStore; 15use k256::ecdsa::SigningKey; 16use serde_json::json; 17use tracing::{debug, error, info, warn}; 18 19const DEFAULT_MAX_IMPORT_SIZE: usize = 1024 * 1024 * 1024; 20const DEFAULT_MAX_BLOCKS: usize = 500000; 21 22pub async fn import_repo( 23 State(state): State<AppState>, 24 auth: BearerAuthAllowDeactivated, 25 body: Bytes, 26) -> Response { 27 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS") 28 .map(|v| v != "false" && v != "0") 29 .unwrap_or(true); 30 if !accepting_imports { 31 return ApiError::InvalidRequest("Service is not accepting repo imports".into()) 32 .into_response(); 33 } 34 let max_size: usize = std::env::var("MAX_IMPORT_SIZE") 35 .ok() 36 .and_then(|s| s.parse().ok()) 37 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE); 38 if body.len() > max_size { 39 return ApiError::PayloadTooLarge(format!( 40 "Import size exceeds limit of {} bytes", 41 max_size 42 )) 43 .into_response(); 44 } 45 let auth_user = auth.0; 46 let did = &auth_user.did; 47 let user = match sqlx::query!( 48 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1", 49 did 50 ) 51 .fetch_optional(&state.db) 52 .await 53 { 54 Ok(Some(row)) => row, 55 Ok(None) => { 56 return ApiError::AccountNotFound.into_response(); 57 } 58 Err(e) => { 59 error!("DB error fetching user: {:?}", e); 60 return ApiError::InternalError(None).into_response(); 61 } 62 }; 63 if user.takedown_ref.is_some() { 64 return ApiError::AccountTakedown.into_response(); 65 } 66 let user_id = user.id; 67 let (root, blocks) = match parse_car(&body).await { 68 Ok((r, b)) => (r, b), 69 Err(ImportError::InvalidRootCount) => { 70 return ApiError::InvalidRequest("Expected exactly one root in CAR file".into()) 71 .into_response(); 72 } 73 Err(ImportError::CarParse(msg)) => { 74 return ApiError::InvalidRequest(format!("Failed to parse CAR file: {}", msg)) 75 .into_response(); 76 } 77 Err(e) => { 78 error!("CAR parsing error: {:?}", e); 79 return ApiError::InvalidRequest(format!("Invalid CAR file: {}", e)).into_response(); 80 } 81 }; 82 info!( 83 "Importing repo for user {}: {} blocks, root {}", 84 did, 85 blocks.len(), 86 root 87 ); 88 let Some(root_block) = blocks.get(&root) else { 89 return ApiError::InvalidRequest("Root block not found in CAR file".into()).into_response(); 90 }; 91 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) { 92 Ok(commit) => commit.did().to_string(), 93 Err(e) => { 94 return ApiError::InvalidRequest(format!("Invalid commit: {}", e)).into_response(); 95 } 96 }; 97 if commit_did != *did { 98 return ApiError::InvalidRepo(format!( 99 "CAR file is for DID {} but you are authenticated as {}", 100 commit_did, did 101 )) 102 .into_response(); 103 } 104 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION") 105 .map(|v| v == "true" || v == "1") 106 .unwrap_or(false); 107 let is_migration = user.deactivated_at.is_some(); 108 if skip_verification { 109 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)"); 110 } else if is_migration { 111 debug!("Verifying CAR file structure for migration (skipping signature verification)"); 112 let verifier = CarVerifier::new(); 113 match verifier.verify_car_structure_only(did, &root, &blocks) { 114 Ok(verified) => { 115 debug!( 116 "CAR structure verification successful: rev={}, data_cid={}", 117 verified.rev, verified.data_cid 118 ); 119 } 120 Err(crate::sync::verify::VerifyError::DidMismatch { 121 commit_did, 122 expected_did, 123 }) => { 124 return ApiError::InvalidRepo(format!( 125 "CAR file is for DID {} but you are authenticated as {}", 126 commit_did, expected_did 127 )) 128 .into_response(); 129 } 130 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 131 return ApiError::InvalidRequest(format!("MST validation failed: {}", msg)) 132 .into_response(); 133 } 134 Err(e) => { 135 error!("CAR structure verification error: {:?}", e); 136 return ApiError::InvalidRequest(format!("CAR verification failed: {}", e)) 137 .into_response(); 138 } 139 } 140 } else { 141 debug!("Verifying CAR file signature and structure for DID {}", did); 142 let verifier = CarVerifier::new(); 143 match verifier.verify_car(did, &root, &blocks).await { 144 Ok(verified) => { 145 debug!( 146 "CAR verification successful: rev={}, data_cid={}", 147 verified.rev, verified.data_cid 148 ); 149 } 150 Err(crate::sync::verify::VerifyError::DidMismatch { 151 commit_did, 152 expected_did, 153 }) => { 154 return ApiError::InvalidRepo(format!( 155 "CAR file is for DID {} but you are authenticated as {}", 156 commit_did, expected_did 157 )) 158 .into_response(); 159 } 160 Err(crate::sync::verify::VerifyError::InvalidSignature) => { 161 return ApiError::InvalidRequest( 162 "CAR file commit signature verification failed".into(), 163 ) 164 .into_response(); 165 } 166 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => { 167 warn!("DID resolution failed during import verification: {}", msg); 168 return ApiError::InvalidRequest(format!("Failed to verify DID: {}", msg)) 169 .into_response(); 170 } 171 Err(crate::sync::verify::VerifyError::NoSigningKey) => { 172 return ApiError::InvalidRequest( 173 "DID document does not contain a signing key".into(), 174 ) 175 .into_response(); 176 } 177 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 178 return ApiError::InvalidRequest(format!("MST validation failed: {}", msg)) 179 .into_response(); 180 } 181 Err(e) => { 182 error!("CAR verification error: {:?}", e); 183 return ApiError::InvalidRequest(format!("CAR verification failed: {}", e)) 184 .into_response(); 185 } 186 } 187 } 188 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS") 189 .ok() 190 .and_then(|s| s.parse().ok()) 191 .unwrap_or(DEFAULT_MAX_BLOCKS); 192 match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await { 193 Ok(import_result) => { 194 info!( 195 "Successfully imported {} records for user {}", 196 import_result.records.len(), 197 did 198 ); 199 let mut blob_ref_count = 0; 200 for record in &import_result.records { 201 for blob_ref in &record.blob_refs { 202 let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey); 203 if let Err(e) = sqlx::query!( 204 r#" 205 INSERT INTO record_blobs (repo_id, record_uri, blob_cid) 206 VALUES ($1, $2, $3) 207 ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING 208 "#, 209 user_id, 210 record_uri, 211 blob_ref.cid 212 ) 213 .execute(&state.db) 214 .await 215 { 216 warn!("Failed to insert record_blob for {}: {:?}", record_uri, e); 217 } else { 218 blob_ref_count += 1; 219 } 220 } 221 } 222 if blob_ref_count > 0 { 223 info!( 224 "Recorded {} blob references for imported repo", 225 blob_ref_count 226 ); 227 } 228 let key_row = match sqlx::query!( 229 r#"SELECT uk.key_bytes, uk.encryption_version 230 FROM user_keys uk 231 JOIN users u ON uk.user_id = u.id 232 WHERE u.did = $1"#, 233 did 234 ) 235 .fetch_optional(&state.db) 236 .await 237 { 238 Ok(Some(row)) => row, 239 Ok(None) => { 240 error!("No signing key found for user {}", did); 241 return ApiError::InternalError(Some("Signing key not found".into())) 242 .into_response(); 243 } 244 Err(e) => { 245 error!("DB error fetching signing key: {:?}", e); 246 return ApiError::InternalError(None).into_response(); 247 } 248 }; 249 let key_bytes = 250 match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) { 251 Ok(k) => k, 252 Err(e) => { 253 error!("Failed to decrypt signing key: {}", e); 254 return ApiError::InternalError(None).into_response(); 255 } 256 }; 257 let signing_key = match SigningKey::from_slice(&key_bytes) { 258 Ok(k) => k, 259 Err(e) => { 260 error!("Invalid signing key: {:?}", e); 261 return ApiError::InternalError(None).into_response(); 262 } 263 }; 264 let new_rev = Tid::now(LimitedU32::MIN); 265 let new_rev_str = new_rev.to_string(); 266 let (commit_bytes, _sig) = match create_signed_commit( 267 did, 268 import_result.data_cid, 269 &new_rev_str, 270 None, 271 &signing_key, 272 ) { 273 Ok(result) => result, 274 Err(e) => { 275 error!("Failed to create new commit: {}", e); 276 return ApiError::InternalError(None).into_response(); 277 } 278 }; 279 let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await { 280 Ok(cid) => cid, 281 Err(e) => { 282 error!("Failed to store new commit block: {:?}", e); 283 return ApiError::InternalError(None).into_response(); 284 } 285 }; 286 let new_root_str = new_root_cid.to_string(); 287 if let Err(e) = sqlx::query!( 288 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3", 289 new_root_str, 290 &new_rev_str, 291 user_id 292 ) 293 .execute(&state.db) 294 .await 295 { 296 error!("Failed to update repo root: {:?}", e); 297 return ApiError::InternalError(None).into_response(); 298 } 299 let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect(); 300 all_block_cids.push(new_root_cid.to_bytes()); 301 if let Err(e) = sqlx::query!( 302 r#" 303 INSERT INTO user_blocks (user_id, block_cid) 304 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 305 ON CONFLICT (user_id, block_cid) DO NOTHING 306 "#, 307 user_id, 308 &all_block_cids 309 ) 310 .execute(&state.db) 311 .await 312 { 313 error!("Failed to insert user_blocks: {:?}", e); 314 return ApiError::InternalError(None).into_response(); 315 } 316 info!( 317 "Created new commit for imported repo: cid={}, rev={}", 318 new_root_str, new_rev_str 319 ); 320 if !is_migration && let Err(e) = sequence_import_event(&state, did, &new_root_str).await 321 { 322 warn!("Failed to sequence import event: {:?}", e); 323 } 324 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() { 325 let birthdate_pref = json!({ 326 "$type": "app.bsky.actor.defs#personalDetailsPref", 327 "birthDate": "1998-05-06T00:00:00.000Z" 328 }); 329 if let Err(e) = sqlx::query!( 330 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3) 331 ON CONFLICT (user_id, name) DO NOTHING", 332 user_id, 333 "app.bsky.actor.defs#personalDetailsPref", 334 birthdate_pref 335 ) 336 .execute(&state.db) 337 .await 338 { 339 warn!( 340 "Failed to set default birthdate preference for migrated user: {:?}", 341 e 342 ); 343 } 344 } 345 EmptyResponse::ok().into_response() 346 } 347 Err(ImportError::SizeLimitExceeded) => { 348 ApiError::PayloadTooLarge(format!("Import exceeds block limit of {}", max_blocks)) 349 .into_response() 350 } 351 Err(ImportError::RepoNotFound) => { 352 ApiError::RepoNotFound(Some("Repository not initialized for this account".into())) 353 .into_response() 354 } 355 Err(ImportError::InvalidCbor(msg)) => { 356 ApiError::InvalidRequest(format!("Invalid CBOR data: {}", msg)).into_response() 357 } 358 Err(ImportError::InvalidCommit(msg)) => { 359 ApiError::InvalidRequest(format!("Invalid commit structure: {}", msg)).into_response() 360 } 361 Err(ImportError::BlockNotFound(cid)) => { 362 ApiError::InvalidRequest(format!("Referenced block not found in CAR: {}", cid)) 363 .into_response() 364 } 365 Err(ImportError::ConcurrentModification) => ApiError::InvalidSwap(Some( 366 "Repository is being modified by another operation, please retry".into(), 367 )) 368 .into_response(), 369 Err(ImportError::VerificationFailed(ve)) => { 370 ApiError::InvalidRequest(format!("CAR verification failed: {}", ve)).into_response() 371 } 372 Err(ImportError::DidMismatch { car_did, auth_did }) => ApiError::InvalidRequest(format!( 373 "CAR is for {} but authenticated as {}", 374 car_did, auth_did 375 )) 376 .into_response(), 377 Err(e) => { 378 error!("Import error: {:?}", e); 379 ApiError::InternalError(None).into_response() 380 } 381 } 382} 383 384async fn sequence_import_event( 385 state: &AppState, 386 did: &str, 387 commit_cid: &str, 388) -> Result<(), sqlx::Error> { 389 let prev_cid: Option<String> = None; 390 let prev_data_cid: Option<String> = None; 391 let ops = serde_json::json!([]); 392 let blobs: Vec<String> = vec![]; 393 let blocks_cids: Vec<String> = vec![]; 394 let seq_row = sqlx::query!( 395 r#" 396 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids) 397 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7) 398 RETURNING seq 399 "#, 400 did, 401 commit_cid, 402 prev_cid, 403 prev_data_cid, 404 ops, 405 &blobs, 406 &blocks_cids 407 ) 408 .fetch_one(&state.db) 409 .await?; 410 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 411 .execute(&state.db) 412 .await?; 413 Ok(()) 414}