this repo has no description
at main 17 kB view raw
1use crate::api::EmptyResponse; 2use crate::api::error::ApiError; 3use crate::api::repo::record::create_signed_commit; 4use crate::auth::BearerAuthAllowDeactivated; 5use crate::state::AppState; 6use crate::sync::import::{ImportError, apply_import, parse_car}; 7use crate::sync::verify::CarVerifier; 8use crate::types::Did; 9use axum::{ 10 body::Bytes, 11 extract::State, 12 response::{IntoResponse, Response}, 13}; 14use jacquard::types::{integer::LimitedU32, string::Tid}; 15use jacquard_repo::storage::BlockStore; 16use k256::ecdsa::SigningKey; 17use serde_json::json; 18use tracing::{debug, error, info, warn}; 19 20const DEFAULT_MAX_IMPORT_SIZE: usize = 1024 * 1024 * 1024; 21const DEFAULT_MAX_BLOCKS: usize = 500000; 22 23pub async fn import_repo( 24 State(state): State<AppState>, 25 auth: BearerAuthAllowDeactivated, 26 body: Bytes, 27) -> Response { 28 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS") 29 .map(|v| v != "false" && v != "0") 30 .unwrap_or(true); 31 if !accepting_imports { 32 return ApiError::InvalidRequest("Service is not accepting repo imports".into()) 33 .into_response(); 34 } 35 let max_size: usize = std::env::var("MAX_IMPORT_SIZE") 36 .ok() 37 .and_then(|s| s.parse().ok()) 38 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE); 39 if body.len() > max_size { 40 return ApiError::PayloadTooLarge(format!( 41 "Import size exceeds limit of {} bytes", 42 max_size 43 )) 44 .into_response(); 45 } 46 let auth_user = auth.0; 47 let did = &auth_user.did; 48 let user = match sqlx::query!( 49 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1", 50 did 51 ) 52 .fetch_optional(&state.db) 53 .await 54 { 55 Ok(Some(row)) => row, 56 Ok(None) => { 57 return ApiError::AccountNotFound.into_response(); 58 } 59 Err(e) => { 60 error!("DB error fetching user: {:?}", e); 61 return ApiError::InternalError(None).into_response(); 62 } 63 }; 64 if user.takedown_ref.is_some() { 65 return ApiError::AccountTakedown.into_response(); 66 } 67 let user_id = user.id; 68 let (root, blocks) = match parse_car(&body).await { 69 Ok((r, b)) => (r, b), 70 Err(ImportError::InvalidRootCount) => { 71 return ApiError::InvalidRequest("Expected exactly one root in CAR file".into()) 72 .into_response(); 73 } 74 Err(ImportError::CarParse(msg)) => { 75 return ApiError::InvalidRequest(format!("Failed to parse CAR file: {}", msg)) 76 .into_response(); 77 } 78 Err(e) => { 79 error!("CAR parsing error: {:?}", e); 80 return ApiError::InvalidRequest(format!("Invalid CAR file: {}", e)).into_response(); 81 } 82 }; 83 info!( 84 "Importing repo for user {}: {} blocks, root {}", 85 did, 86 blocks.len(), 87 root 88 ); 89 let Some(root_block) = blocks.get(&root) else { 90 return ApiError::InvalidRequest("Root block not found in CAR file".into()).into_response(); 91 }; 92 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) { 93 Ok(commit) => commit.did().to_string(), 94 Err(e) => { 95 return ApiError::InvalidRequest(format!("Invalid commit: {}", e)).into_response(); 96 } 97 }; 98 if commit_did != *did { 99 return ApiError::InvalidRepo(format!( 100 "CAR file is for DID {} but you are authenticated as {}", 101 commit_did, did 102 )) 103 .into_response(); 104 } 105 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION") 106 .map(|v| v == "true" || v == "1") 107 .unwrap_or(false); 108 let is_migration = user.deactivated_at.is_some(); 109 if skip_verification { 110 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)"); 111 } else if is_migration { 112 debug!("Verifying CAR file structure for migration (skipping signature verification)"); 113 let verifier = CarVerifier::new(); 114 match verifier.verify_car_structure_only(did, &root, &blocks) { 115 Ok(verified) => { 116 debug!( 117 "CAR structure verification successful: rev={}, data_cid={}", 118 verified.rev, verified.data_cid 119 ); 120 } 121 Err(crate::sync::verify::VerifyError::DidMismatch { 122 commit_did, 123 expected_did, 124 }) => { 125 return ApiError::InvalidRepo(format!( 126 "CAR file is for DID {} but you are authenticated as {}", 127 commit_did, expected_did 128 )) 129 .into_response(); 130 } 131 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 132 return ApiError::InvalidRequest(format!("MST validation failed: {}", msg)) 133 .into_response(); 134 } 135 Err(e) => { 136 error!("CAR structure verification error: {:?}", e); 137 return ApiError::InvalidRequest(format!("CAR verification failed: {}", e)) 138 .into_response(); 139 } 140 } 141 } else { 142 debug!("Verifying CAR file signature and structure for DID {}", did); 143 let verifier = CarVerifier::new(); 144 match verifier.verify_car(did, &root, &blocks).await { 145 Ok(verified) => { 146 debug!( 147 "CAR verification successful: rev={}, data_cid={}", 148 verified.rev, verified.data_cid 149 ); 150 } 151 Err(crate::sync::verify::VerifyError::DidMismatch { 152 commit_did, 153 expected_did, 154 }) => { 155 return ApiError::InvalidRepo(format!( 156 "CAR file is for DID {} but you are authenticated as {}", 157 commit_did, expected_did 158 )) 159 .into_response(); 160 } 161 Err(crate::sync::verify::VerifyError::InvalidSignature) => { 162 return ApiError::InvalidRequest( 163 "CAR file commit signature verification failed".into(), 164 ) 165 .into_response(); 166 } 167 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => { 168 warn!("DID resolution failed during import verification: {}", msg); 169 return ApiError::InvalidRequest(format!("Failed to verify DID: {}", msg)) 170 .into_response(); 171 } 172 Err(crate::sync::verify::VerifyError::NoSigningKey) => { 173 return ApiError::InvalidRequest( 174 "DID document does not contain a signing key".into(), 175 ) 176 .into_response(); 177 } 178 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 179 return ApiError::InvalidRequest(format!("MST validation failed: {}", msg)) 180 .into_response(); 181 } 182 Err(e) => { 183 error!("CAR verification error: {:?}", e); 184 return ApiError::InvalidRequest(format!("CAR verification failed: {}", e)) 185 .into_response(); 186 } 187 } 188 } 189 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS") 190 .ok() 191 .and_then(|s| s.parse().ok()) 192 .unwrap_or(DEFAULT_MAX_BLOCKS); 193 match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await { 194 Ok(import_result) => { 195 info!( 196 "Successfully imported {} records for user {}", 197 import_result.records.len(), 198 did 199 ); 200 let blob_refs: Vec<(String, String)> = import_result 201 .records 202 .iter() 203 .flat_map(|record| { 204 let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey); 205 record 206 .blob_refs 207 .iter() 208 .map(move |blob_ref| (record_uri.clone(), blob_ref.cid.clone())) 209 }) 210 .collect(); 211 212 if !blob_refs.is_empty() { 213 let (record_uris, blob_cids): (Vec<String>, Vec<String>) = 214 blob_refs.into_iter().unzip(); 215 216 match sqlx::query!( 217 r#" 218 INSERT INTO record_blobs (repo_id, record_uri, blob_cid) 219 SELECT $1, * FROM UNNEST($2::text[], $3::text[]) 220 ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING 221 "#, 222 user_id, 223 &record_uris, 224 &blob_cids 225 ) 226 .execute(&state.db) 227 .await 228 { 229 Ok(result) => { 230 info!( 231 "Recorded {} blob references for imported repo", 232 result.rows_affected() 233 ); 234 } 235 Err(e) => { 236 warn!("Failed to insert record_blobs: {:?}", e); 237 } 238 } 239 } 240 let key_row = match sqlx::query!( 241 r#"SELECT uk.key_bytes, uk.encryption_version 242 FROM user_keys uk 243 JOIN users u ON uk.user_id = u.id 244 WHERE u.did = $1"#, 245 did 246 ) 247 .fetch_optional(&state.db) 248 .await 249 { 250 Ok(Some(row)) => row, 251 Ok(None) => { 252 error!("No signing key found for user {}", did); 253 return ApiError::InternalError(Some("Signing key not found".into())) 254 .into_response(); 255 } 256 Err(e) => { 257 error!("DB error fetching signing key: {:?}", e); 258 return ApiError::InternalError(None).into_response(); 259 } 260 }; 261 let key_bytes = 262 match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) { 263 Ok(k) => k, 264 Err(e) => { 265 error!("Failed to decrypt signing key: {}", e); 266 return ApiError::InternalError(None).into_response(); 267 } 268 }; 269 let signing_key = match SigningKey::from_slice(&key_bytes) { 270 Ok(k) => k, 271 Err(e) => { 272 error!("Invalid signing key: {:?}", e); 273 return ApiError::InternalError(None).into_response(); 274 } 275 }; 276 let new_rev = Tid::now(LimitedU32::MIN); 277 let new_rev_str = new_rev.to_string(); 278 let (commit_bytes, _sig) = match create_signed_commit( 279 did, 280 import_result.data_cid, 281 &new_rev_str, 282 None, 283 &signing_key, 284 ) { 285 Ok(result) => result, 286 Err(e) => { 287 error!("Failed to create new commit: {}", e); 288 return ApiError::InternalError(None).into_response(); 289 } 290 }; 291 let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await { 292 Ok(cid) => cid, 293 Err(e) => { 294 error!("Failed to store new commit block: {:?}", e); 295 return ApiError::InternalError(None).into_response(); 296 } 297 }; 298 let new_root_str = new_root_cid.to_string(); 299 if let Err(e) = sqlx::query!( 300 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3", 301 new_root_str, 302 &new_rev_str, 303 user_id 304 ) 305 .execute(&state.db) 306 .await 307 { 308 error!("Failed to update repo root: {:?}", e); 309 return ApiError::InternalError(None).into_response(); 310 } 311 let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect(); 312 all_block_cids.push(new_root_cid.to_bytes()); 313 if let Err(e) = sqlx::query!( 314 r#" 315 INSERT INTO user_blocks (user_id, block_cid) 316 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) 317 ON CONFLICT (user_id, block_cid) DO NOTHING 318 "#, 319 user_id, 320 &all_block_cids 321 ) 322 .execute(&state.db) 323 .await 324 { 325 error!("Failed to insert user_blocks: {:?}", e); 326 return ApiError::InternalError(None).into_response(); 327 } 328 info!( 329 "Created new commit for imported repo: cid={}, rev={}", 330 new_root_str, new_rev_str 331 ); 332 if !is_migration && let Err(e) = sequence_import_event(&state, did, &new_root_str).await 333 { 334 warn!("Failed to sequence import event: {:?}", e); 335 } 336 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() { 337 let birthdate_pref = json!({ 338 "$type": "app.bsky.actor.defs#personalDetailsPref", 339 "birthDate": "1998-05-06T00:00:00.000Z" 340 }); 341 if let Err(e) = sqlx::query!( 342 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3) 343 ON CONFLICT (user_id, name) DO NOTHING", 344 user_id, 345 "app.bsky.actor.defs#personalDetailsPref", 346 birthdate_pref 347 ) 348 .execute(&state.db) 349 .await 350 { 351 warn!( 352 "Failed to set default birthdate preference for migrated user: {:?}", 353 e 354 ); 355 } 356 } 357 EmptyResponse::ok().into_response() 358 } 359 Err(ImportError::SizeLimitExceeded) => { 360 ApiError::PayloadTooLarge(format!("Import exceeds block limit of {}", max_blocks)) 361 .into_response() 362 } 363 Err(ImportError::RepoNotFound) => { 364 ApiError::RepoNotFound(Some("Repository not initialized for this account".into())) 365 .into_response() 366 } 367 Err(ImportError::InvalidCbor(msg)) => { 368 ApiError::InvalidRequest(format!("Invalid CBOR data: {}", msg)).into_response() 369 } 370 Err(ImportError::InvalidCommit(msg)) => { 371 ApiError::InvalidRequest(format!("Invalid commit structure: {}", msg)).into_response() 372 } 373 Err(ImportError::BlockNotFound(cid)) => { 374 ApiError::InvalidRequest(format!("Referenced block not found in CAR: {}", cid)) 375 .into_response() 376 } 377 Err(ImportError::ConcurrentModification) => ApiError::InvalidSwap(Some( 378 "Repository is being modified by another operation, please retry".into(), 379 )) 380 .into_response(), 381 Err(ImportError::VerificationFailed(ve)) => { 382 ApiError::InvalidRequest(format!("CAR verification failed: {}", ve)).into_response() 383 } 384 Err(ImportError::DidMismatch { car_did, auth_did }) => ApiError::InvalidRequest(format!( 385 "CAR is for {} but authenticated as {}", 386 car_did, auth_did 387 )) 388 .into_response(), 389 Err(e) => { 390 error!("Import error: {:?}", e); 391 ApiError::InternalError(None).into_response() 392 } 393 } 394} 395 396async fn sequence_import_event( 397 state: &AppState, 398 did: &Did, 399 commit_cid: &str, 400) -> Result<(), sqlx::Error> { 401 let prev_cid: Option<String> = None; 402 let prev_data_cid: Option<String> = None; 403 let ops = serde_json::json!([]); 404 let blobs: Vec<String> = vec![]; 405 let blocks_cids: Vec<String> = vec![]; 406 let did_str = did.as_str(); 407 408 let mut tx = state.db.begin().await?; 409 410 let seq_row = sqlx::query!( 411 r#" 412 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids) 413 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7) 414 RETURNING seq 415 "#, 416 did_str, 417 commit_cid, 418 prev_cid, 419 prev_data_cid, 420 ops, 421 &blobs, 422 &blocks_cids 423 ) 424 .fetch_one(&mut *tx) 425 .await?; 426 427 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 428 .execute(&mut *tx) 429 .await?; 430 431 tx.commit().await?; 432 Ok(()) 433}