this repo has no description
1use crate::api::ApiError; 2use crate::state::AppState; 3use crate::sync::import::{ImportError, apply_import, parse_car}; 4use crate::sync::verify::CarVerifier; 5use axum::{ 6 Json, 7 body::Bytes, 8 extract::State, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11}; 12use serde_json::json; 13use tracing::{debug, error, info, warn}; 14 15const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024; 16const DEFAULT_MAX_BLOCKS: usize = 50000; 17 18pub async fn import_repo( 19 State(state): State<AppState>, 20 headers: axum::http::HeaderMap, 21 body: Bytes, 22) -> Response { 23 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS") 24 .map(|v| v != "false" && v != "0") 25 .unwrap_or(true); 26 if !accepting_imports { 27 return ( 28 StatusCode::BAD_REQUEST, 29 Json(json!({ 30 "error": "InvalidRequest", 31 "message": "Service is not accepting repo imports" 32 })), 33 ) 34 .into_response(); 35 } 36 let max_size: usize = std::env::var("MAX_IMPORT_SIZE") 37 .ok() 38 .and_then(|s| s.parse().ok()) 39 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE); 40 if body.len() > max_size { 41 return ( 42 StatusCode::PAYLOAD_TOO_LARGE, 43 Json(json!({ 44 "error": "InvalidRequest", 45 "message": format!("Import size exceeds limit of {} bytes", max_size) 46 })), 47 ) 48 .into_response(); 49 } 50 let token = match crate::auth::extract_bearer_token_from_header( 51 headers.get("Authorization").and_then(|h| h.to_str().ok()), 52 ) { 53 Some(t) => t, 54 None => return ApiError::AuthenticationRequired.into_response(), 55 }; 56 let auth_user = match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await { 57 Ok(user) => user, 58 Err(e) => return ApiError::from(e).into_response(), 59 }; 60 let did = &auth_user.did; 61 let user = match sqlx::query!( 62 "SELECT id, deactivated_at, takedown_ref FROM users WHERE did = $1", 63 did 64 ) 65 .fetch_optional(&state.db) 66 .await 67 { 68 Ok(Some(row)) => row, 69 Ok(None) => { 70 return ( 71 StatusCode::NOT_FOUND, 72 Json(json!({"error": "AccountNotFound"})), 73 ) 74 .into_response(); 75 } 76 Err(e) => { 77 error!("DB error fetching user: {:?}", e); 78 return ( 79 StatusCode::INTERNAL_SERVER_ERROR, 80 Json(json!({"error": "InternalError"})), 81 ) 82 .into_response(); 83 } 84 }; 85 if user.takedown_ref.is_some() { 86 return ( 87 StatusCode::FORBIDDEN, 88 Json(json!({ 89 "error": "AccountTakenDown", 90 "message": "Account has been taken down" 91 })), 92 ) 93 .into_response(); 94 } 95 let user_id = user.id; 96 let (root, blocks) = match parse_car(&body).await { 97 Ok((r, b)) => (r, b), 98 Err(ImportError::InvalidRootCount) => { 99 return ( 100 StatusCode::BAD_REQUEST, 101 Json(json!({ 102 "error": "InvalidRequest", 103 "message": "Expected exactly one root in CAR file" 104 })), 105 ) 106 .into_response(); 107 } 108 Err(ImportError::CarParse(msg)) => { 109 return ( 110 StatusCode::BAD_REQUEST, 111 Json(json!({ 112 "error": "InvalidRequest", 113 "message": format!("Failed to parse CAR file: {}", msg) 114 })), 115 ) 116 .into_response(); 117 } 118 Err(e) => { 119 error!("CAR parsing error: {:?}", e); 120 return ( 121 StatusCode::BAD_REQUEST, 122 Json(json!({ 123 "error": "InvalidRequest", 124 "message": format!("Invalid CAR file: {}", e) 125 })), 126 ) 127 .into_response(); 128 } 129 }; 130 info!( 131 "Importing repo for user {}: {} blocks, root {}", 132 did, 133 blocks.len(), 134 root 135 ); 136 let root_block = match blocks.get(&root) { 137 Some(b) => b, 138 None => { 139 return ( 140 StatusCode::BAD_REQUEST, 141 Json(json!({ 142 "error": "InvalidRequest", 143 "message": "Root block not found in CAR file" 144 })), 145 ) 146 .into_response(); 147 } 148 }; 149 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) { 150 Ok(commit) => commit.did().to_string(), 151 Err(e) => { 152 return ( 153 StatusCode::BAD_REQUEST, 154 Json(json!({ 155 "error": "InvalidRequest", 156 "message": format!("Invalid commit: {}", e) 157 })), 158 ) 159 .into_response(); 160 } 161 }; 162 if commit_did != *did { 163 return ( 164 StatusCode::FORBIDDEN, 165 Json(json!({ 166 "error": "InvalidRequest", 167 "message": format!( 168 "CAR file is for DID {} but you are authenticated as {}", 169 commit_did, did 170 ) 171 })), 172 ) 173 .into_response(); 174 } 175 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION") 176 .map(|v| v == "true" || v == "1") 177 .unwrap_or(false); 178 let is_migration = user.deactivated_at.is_some(); 179 if skip_verification { 180 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)"); 181 } else if is_migration { 182 debug!("Verifying CAR file structure for migration (skipping signature verification)"); 183 let verifier = CarVerifier::new(); 184 match verifier.verify_car_structure_only(did, &root, &blocks) { 185 Ok(verified) => { 186 debug!( 187 "CAR structure verification successful: rev={}, data_cid={}", 188 verified.rev, verified.data_cid 189 ); 190 } 191 Err(crate::sync::verify::VerifyError::DidMismatch { 192 commit_did, 193 expected_did, 194 }) => { 195 return ( 196 StatusCode::FORBIDDEN, 197 Json(json!({ 198 "error": "InvalidRequest", 199 "message": format!( 200 "CAR file is for DID {} but you are authenticated as {}", 201 commit_did, expected_did 202 ) 203 })), 204 ) 205 .into_response(); 206 } 207 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 208 return ( 209 StatusCode::BAD_REQUEST, 210 Json(json!({ 211 "error": "InvalidRequest", 212 "message": format!("MST validation failed: {}", msg) 213 })), 214 ) 215 .into_response(); 216 } 217 Err(e) => { 218 error!("CAR structure verification error: {:?}", e); 219 return ( 220 StatusCode::BAD_REQUEST, 221 Json(json!({ 222 "error": "InvalidRequest", 223 "message": format!("CAR verification failed: {}", e) 224 })), 225 ) 226 .into_response(); 227 } 228 } 229 } else { 230 debug!("Verifying CAR file signature and structure for DID {}", did); 231 let verifier = CarVerifier::new(); 232 match verifier.verify_car(did, &root, &blocks).await { 233 Ok(verified) => { 234 debug!( 235 "CAR verification successful: rev={}, data_cid={}", 236 verified.rev, verified.data_cid 237 ); 238 } 239 Err(crate::sync::verify::VerifyError::DidMismatch { 240 commit_did, 241 expected_did, 242 }) => { 243 return ( 244 StatusCode::FORBIDDEN, 245 Json(json!({ 246 "error": "InvalidRequest", 247 "message": format!( 248 "CAR file is for DID {} but you are authenticated as {}", 249 commit_did, expected_did 250 ) 251 })), 252 ) 253 .into_response(); 254 } 255 Err(crate::sync::verify::VerifyError::InvalidSignature) => { 256 return ( 257 StatusCode::BAD_REQUEST, 258 Json(json!({ 259 "error": "InvalidSignature", 260 "message": "CAR file commit signature verification failed" 261 })), 262 ) 263 .into_response(); 264 } 265 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => { 266 warn!("DID resolution failed during import verification: {}", msg); 267 return ( 268 StatusCode::BAD_REQUEST, 269 Json(json!({ 270 "error": "InvalidRequest", 271 "message": format!("Failed to verify DID: {}", msg) 272 })), 273 ) 274 .into_response(); 275 } 276 Err(crate::sync::verify::VerifyError::NoSigningKey) => { 277 return ( 278 StatusCode::BAD_REQUEST, 279 Json(json!({ 280 "error": "InvalidRequest", 281 "message": "DID document does not contain a signing key" 282 })), 283 ) 284 .into_response(); 285 } 286 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 287 return ( 288 StatusCode::BAD_REQUEST, 289 Json(json!({ 290 "error": "InvalidRequest", 291 "message": format!("MST validation failed: {}", msg) 292 })), 293 ) 294 .into_response(); 295 } 296 Err(e) => { 297 error!("CAR verification error: {:?}", e); 298 return ( 299 StatusCode::BAD_REQUEST, 300 Json(json!({ 301 "error": "InvalidRequest", 302 "message": format!("CAR verification failed: {}", e) 303 })), 304 ) 305 .into_response(); 306 } 307 } 308 } 309 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS") 310 .ok() 311 .and_then(|s| s.parse().ok()) 312 .unwrap_or(DEFAULT_MAX_BLOCKS); 313 match apply_import(&state.db, user_id, root, blocks, max_blocks).await { 314 Ok(records) => { 315 info!( 316 "Successfully imported {} records for user {}", 317 records.len(), 318 did 319 ); 320 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await { 321 warn!("Failed to sequence import event: {:?}", e); 322 } 323 (StatusCode::OK, Json(json!({}))).into_response() 324 } 325 Err(ImportError::SizeLimitExceeded) => ( 326 StatusCode::BAD_REQUEST, 327 Json(json!({ 328 "error": "InvalidRequest", 329 "message": format!("Import exceeds block limit of {}", max_blocks) 330 })), 331 ) 332 .into_response(), 333 Err(ImportError::RepoNotFound) => ( 334 StatusCode::NOT_FOUND, 335 Json(json!({ 336 "error": "RepoNotFound", 337 "message": "Repository not initialized for this account" 338 })), 339 ) 340 .into_response(), 341 Err(ImportError::InvalidCbor(msg)) => ( 342 StatusCode::BAD_REQUEST, 343 Json(json!({ 344 "error": "InvalidRequest", 345 "message": format!("Invalid CBOR data: {}", msg) 346 })), 347 ) 348 .into_response(), 349 Err(ImportError::InvalidCommit(msg)) => ( 350 StatusCode::BAD_REQUEST, 351 Json(json!({ 352 "error": "InvalidRequest", 353 "message": format!("Invalid commit structure: {}", msg) 354 })), 355 ) 356 .into_response(), 357 Err(ImportError::BlockNotFound(cid)) => ( 358 StatusCode::BAD_REQUEST, 359 Json(json!({ 360 "error": "InvalidRequest", 361 "message": format!("Referenced block not found in CAR: {}", cid) 362 })), 363 ) 364 .into_response(), 365 Err(ImportError::ConcurrentModification) => ( 366 StatusCode::CONFLICT, 367 Json(json!({ 368 "error": "ConcurrentModification", 369 "message": "Repository is being modified by another operation, please retry" 370 })), 371 ) 372 .into_response(), 373 Err(ImportError::VerificationFailed(ve)) => ( 374 StatusCode::BAD_REQUEST, 375 Json(json!({ 376 "error": "VerificationFailed", 377 "message": format!("CAR verification failed: {}", ve) 378 })), 379 ) 380 .into_response(), 381 Err(ImportError::DidMismatch { car_did, auth_did }) => ( 382 StatusCode::FORBIDDEN, 383 Json(json!({ 384 "error": "DidMismatch", 385 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did) 386 })), 387 ) 388 .into_response(), 389 Err(e) => { 390 error!("Import error: {:?}", e); 391 ( 392 StatusCode::INTERNAL_SERVER_ERROR, 393 Json(json!({"error": "InternalError"})), 394 ) 395 .into_response() 396 } 397 } 398} 399 400async fn sequence_import_event( 401 state: &AppState, 402 did: &str, 403 commit_cid: &str, 404) -> Result<(), sqlx::Error> { 405 let prev_cid: Option<String> = None; 406 let prev_data_cid: Option<String> = None; 407 let ops = serde_json::json!([]); 408 let blobs: Vec<String> = vec![]; 409 let blocks_cids: Vec<String> = vec![]; 410 let seq_row = sqlx::query!( 411 r#" 412 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids) 413 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7) 414 RETURNING seq 415 "#, 416 did, 417 commit_cid, 418 prev_cid, 419 prev_data_cid, 420 ops, 421 &blobs, 422 &blocks_cids 423 ) 424 .fetch_one(&state.db) 425 .await?; 426 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 427 .execute(&state.db) 428 .await?; 429 Ok(()) 430}