this repo has no description
1use crate::api::ApiError; 2use crate::state::AppState; 3use crate::sync::import::{ImportError, apply_import, parse_car}; 4use crate::sync::verify::CarVerifier; 5use axum::{ 6 Json, 7 body::Bytes, 8 extract::State, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11}; 12use serde_json::json; 13use tracing::{debug, error, info, warn}; 14 15const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024; 16const DEFAULT_MAX_BLOCKS: usize = 50000; 17 18pub async fn import_repo( 19 State(state): State<AppState>, 20 headers: axum::http::HeaderMap, 21 body: Bytes, 22) -> Response { 23 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS") 24 .map(|v| v != "false" && v != "0") 25 .unwrap_or(true); 26 if !accepting_imports { 27 return ( 28 StatusCode::BAD_REQUEST, 29 Json(json!({ 30 "error": "InvalidRequest", 31 "message": "Service is not accepting repo imports" 32 })), 33 ) 34 .into_response(); 35 } 36 let max_size: usize = std::env::var("MAX_IMPORT_SIZE") 37 .ok() 38 .and_then(|s| s.parse().ok()) 39 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE); 40 if body.len() > max_size { 41 return ( 42 StatusCode::PAYLOAD_TOO_LARGE, 43 Json(json!({ 44 "error": "InvalidRequest", 45 "message": format!("Import size exceeds limit of {} bytes", max_size) 46 })), 47 ) 48 .into_response(); 49 } 50 let token = match crate::auth::extract_bearer_token_from_header( 51 headers.get("Authorization").and_then(|h| h.to_str().ok()), 52 ) { 53 Some(t) => t, 54 None => return ApiError::AuthenticationRequired.into_response(), 55 }; 56 let auth_user = 57 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 58 Ok(user) => user, 59 Err(e) => return ApiError::from(e).into_response(), 60 }; 61 let did = &auth_user.did; 62 let user = match sqlx::query!( 63 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1", 64 did 65 ) 66 .fetch_optional(&state.db) 67 .await 68 { 69 Ok(Some(row)) => row, 70 Ok(None) => { 71 return ( 72 StatusCode::NOT_FOUND, 73 Json(json!({"error": "AccountNotFound"})), 74 ) 75 .into_response(); 76 } 77 Err(e) => { 78 error!("DB error fetching user: {:?}", e); 79 return ( 80 StatusCode::INTERNAL_SERVER_ERROR, 81 Json(json!({"error": "InternalError"})), 82 ) 83 .into_response(); 84 } 85 }; 86 if user.takedown_ref.is_some() { 87 return ( 88 StatusCode::FORBIDDEN, 89 Json(json!({ 90 "error": "AccountTakenDown", 91 "message": "Account has been taken down" 92 })), 93 ) 94 .into_response(); 95 } 96 let user_id = user.id; 97 let (root, blocks) = match parse_car(&body).await { 98 Ok((r, b)) => (r, b), 99 Err(ImportError::InvalidRootCount) => { 100 return ( 101 StatusCode::BAD_REQUEST, 102 Json(json!({ 103 "error": "InvalidRequest", 104 "message": "Expected exactly one root in CAR file" 105 })), 106 ) 107 .into_response(); 108 } 109 Err(ImportError::CarParse(msg)) => { 110 return ( 111 StatusCode::BAD_REQUEST, 112 Json(json!({ 113 "error": "InvalidRequest", 114 "message": format!("Failed to parse CAR file: {}", msg) 115 })), 116 ) 117 .into_response(); 118 } 119 Err(e) => { 120 error!("CAR parsing error: {:?}", e); 121 return ( 122 StatusCode::BAD_REQUEST, 123 Json(json!({ 124 "error": "InvalidRequest", 125 "message": format!("Invalid CAR file: {}", e) 126 })), 127 ) 128 .into_response(); 129 } 130 }; 131 info!( 132 "Importing repo for user {}: {} blocks, root {}", 133 did, 134 blocks.len(), 135 root 136 ); 137 let root_block = match blocks.get(&root) { 138 Some(b) => b, 139 None => { 140 return ( 141 StatusCode::BAD_REQUEST, 142 Json(json!({ 143 "error": "InvalidRequest", 144 "message": "Root block not found in CAR file" 145 })), 146 ) 147 .into_response(); 148 } 149 }; 150 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) { 151 Ok(commit) => commit.did().to_string(), 152 Err(e) => { 153 return ( 154 StatusCode::BAD_REQUEST, 155 Json(json!({ 156 "error": "InvalidRequest", 157 "message": format!("Invalid commit: {}", e) 158 })), 159 ) 160 .into_response(); 161 } 162 }; 163 if commit_did != *did { 164 return ( 165 StatusCode::FORBIDDEN, 166 Json(json!({ 167 "error": "InvalidRequest", 168 "message": format!( 169 "CAR file is for DID {} but you are authenticated as {}", 170 commit_did, did 171 ) 172 })), 173 ) 174 .into_response(); 175 } 176 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION") 177 .map(|v| v == "true" || v == "1") 178 .unwrap_or(false); 179 let is_migration = user.deactivated_at.is_some(); 180 if skip_verification { 181 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)"); 182 } else if is_migration { 183 debug!("Verifying CAR file structure for migration (skipping signature verification)"); 184 let verifier = CarVerifier::new(); 185 match verifier.verify_car_structure_only(did, &root, &blocks) { 186 Ok(verified) => { 187 debug!( 188 "CAR structure verification successful: rev={}, data_cid={}", 189 verified.rev, verified.data_cid 190 ); 191 } 192 Err(crate::sync::verify::VerifyError::DidMismatch { 193 commit_did, 194 expected_did, 195 }) => { 196 return ( 197 StatusCode::FORBIDDEN, 198 Json(json!({ 199 "error": "InvalidRequest", 200 "message": format!( 201 "CAR file is for DID {} but you are authenticated as {}", 202 commit_did, expected_did 203 ) 204 })), 205 ) 206 .into_response(); 207 } 208 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 209 return ( 210 StatusCode::BAD_REQUEST, 211 Json(json!({ 212 "error": "InvalidRequest", 213 "message": format!("MST validation failed: {}", msg) 214 })), 215 ) 216 .into_response(); 217 } 218 Err(e) => { 219 error!("CAR structure verification error: {:?}", e); 220 return ( 221 StatusCode::BAD_REQUEST, 222 Json(json!({ 223 "error": "InvalidRequest", 224 "message": format!("CAR verification failed: {}", e) 225 })), 226 ) 227 .into_response(); 228 } 229 } 230 } else { 231 debug!("Verifying CAR file signature and structure for DID {}", did); 232 let verifier = CarVerifier::new(); 233 match verifier.verify_car(did, &root, &blocks).await { 234 Ok(verified) => { 235 debug!( 236 "CAR verification successful: rev={}, data_cid={}", 237 verified.rev, verified.data_cid 238 ); 239 } 240 Err(crate::sync::verify::VerifyError::DidMismatch { 241 commit_did, 242 expected_did, 243 }) => { 244 return ( 245 StatusCode::FORBIDDEN, 246 Json(json!({ 247 "error": "InvalidRequest", 248 "message": format!( 249 "CAR file is for DID {} but you are authenticated as {}", 250 commit_did, expected_did 251 ) 252 })), 253 ) 254 .into_response(); 255 } 256 Err(crate::sync::verify::VerifyError::InvalidSignature) => { 257 return ( 258 StatusCode::BAD_REQUEST, 259 Json(json!({ 260 "error": "InvalidSignature", 261 "message": "CAR file commit signature verification failed" 262 })), 263 ) 264 .into_response(); 265 } 266 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => { 267 warn!("DID resolution failed during import verification: {}", msg); 268 return ( 269 StatusCode::BAD_REQUEST, 270 Json(json!({ 271 "error": "InvalidRequest", 272 "message": format!("Failed to verify DID: {}", msg) 273 })), 274 ) 275 .into_response(); 276 } 277 Err(crate::sync::verify::VerifyError::NoSigningKey) => { 278 return ( 279 StatusCode::BAD_REQUEST, 280 Json(json!({ 281 "error": "InvalidRequest", 282 "message": "DID document does not contain a signing key" 283 })), 284 ) 285 .into_response(); 286 } 287 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 288 return ( 289 StatusCode::BAD_REQUEST, 290 Json(json!({ 291 "error": "InvalidRequest", 292 "message": format!("MST validation failed: {}", msg) 293 })), 294 ) 295 .into_response(); 296 } 297 Err(e) => { 298 error!("CAR verification error: {:?}", e); 299 return ( 300 StatusCode::BAD_REQUEST, 301 Json(json!({ 302 "error": "InvalidRequest", 303 "message": format!("CAR verification failed: {}", e) 304 })), 305 ) 306 .into_response(); 307 } 308 } 309 } 310 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS") 311 .ok() 312 .and_then(|s| s.parse().ok()) 313 .unwrap_or(DEFAULT_MAX_BLOCKS); 314 match apply_import(&state.db, user_id, root, blocks, max_blocks).await { 315 Ok(records) => { 316 info!( 317 "Successfully imported {} records for user {}", 318 records.len(), 319 did 320 ); 321 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await { 322 warn!("Failed to sequence import event: {:?}", e); 323 } 324 (StatusCode::OK, Json(json!({}))).into_response() 325 } 326 Err(ImportError::SizeLimitExceeded) => ( 327 StatusCode::BAD_REQUEST, 328 Json(json!({ 329 "error": "InvalidRequest", 330 "message": format!("Import exceeds block limit of {}", max_blocks) 331 })), 332 ) 333 .into_response(), 334 Err(ImportError::RepoNotFound) => ( 335 StatusCode::NOT_FOUND, 336 Json(json!({ 337 "error": "RepoNotFound", 338 "message": "Repository not initialized for this account" 339 })), 340 ) 341 .into_response(), 342 Err(ImportError::InvalidCbor(msg)) => ( 343 StatusCode::BAD_REQUEST, 344 Json(json!({ 345 "error": "InvalidRequest", 346 "message": format!("Invalid CBOR data: {}", msg) 347 })), 348 ) 349 .into_response(), 350 Err(ImportError::InvalidCommit(msg)) => ( 351 StatusCode::BAD_REQUEST, 352 Json(json!({ 353 "error": "InvalidRequest", 354 "message": format!("Invalid commit structure: {}", msg) 355 })), 356 ) 357 .into_response(), 358 Err(ImportError::BlockNotFound(cid)) => ( 359 StatusCode::BAD_REQUEST, 360 Json(json!({ 361 "error": "InvalidRequest", 362 "message": format!("Referenced block not found in CAR: {}", cid) 363 })), 364 ) 365 .into_response(), 366 Err(ImportError::ConcurrentModification) => ( 367 StatusCode::CONFLICT, 368 Json(json!({ 369 "error": "ConcurrentModification", 370 "message": "Repository is being modified by another operation, please retry" 371 })), 372 ) 373 .into_response(), 374 Err(ImportError::VerificationFailed(ve)) => ( 375 StatusCode::BAD_REQUEST, 376 Json(json!({ 377 "error": "VerificationFailed", 378 "message": format!("CAR verification failed: {}", ve) 379 })), 380 ) 381 .into_response(), 382 Err(ImportError::DidMismatch { car_did, auth_did }) => ( 383 StatusCode::FORBIDDEN, 384 Json(json!({ 385 "error": "DidMismatch", 386 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did) 387 })), 388 ) 389 .into_response(), 390 Err(e) => { 391 error!("Import error: {:?}", e); 392 ( 393 StatusCode::INTERNAL_SERVER_ERROR, 394 Json(json!({"error": "InternalError"})), 395 ) 396 .into_response() 397 } 398 } 399} 400 401async fn sequence_import_event( 402 state: &AppState, 403 did: &str, 404 commit_cid: &str, 405) -> Result<(), sqlx::Error> { 406 let prev_cid: Option<String> = None; 407 let prev_data_cid: Option<String> = None; 408 let ops = serde_json::json!([]); 409 let blobs: Vec<String> = vec![]; 410 let blocks_cids: Vec<String> = vec![]; 411 let seq_row = sqlx::query!( 412 r#" 413 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids) 414 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7) 415 RETURNING seq 416 "#, 417 did, 418 commit_cid, 419 prev_cid, 420 prev_data_cid, 421 ops, 422 &blobs, 423 &blocks_cids 424 ) 425 .fetch_one(&state.db) 426 .await?; 427 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 428 .execute(&state.db) 429 .await?; 430 Ok(()) 431}