this repo has no description
1use crate::api::ApiError; 2use crate::state::AppState; 3use crate::sync::import::{apply_import, parse_car, ImportError}; 4use crate::sync::verify::CarVerifier; 5use axum::{ 6 body::Bytes, 7 extract::State, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10 Json, 11}; 12use serde_json::json; 13use tracing::{debug, error, info, warn}; 14const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024; 15const DEFAULT_MAX_BLOCKS: usize = 50000; 16pub async fn import_repo( 17 State(state): State<AppState>, 18 headers: axum::http::HeaderMap, 19 body: Bytes, 20) -> Response { 21 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS") 22 .map(|v| v != "false" && v != "0") 23 .unwrap_or(true); 24 if !accepting_imports { 25 return ( 26 StatusCode::BAD_REQUEST, 27 Json(json!({ 28 "error": "InvalidRequest", 29 "message": "Service is not accepting repo imports" 30 })), 31 ) 32 .into_response(); 33 } 34 let max_size: usize = std::env::var("MAX_IMPORT_SIZE") 35 .ok() 36 .and_then(|s| s.parse().ok()) 37 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE); 38 if body.len() > max_size { 39 return ( 40 StatusCode::PAYLOAD_TOO_LARGE, 41 Json(json!({ 42 "error": "InvalidRequest", 43 "message": format!("Import size exceeds limit of {} bytes", max_size) 44 })), 45 ) 46 .into_response(); 47 } 48 let token = match crate::auth::extract_bearer_token_from_header( 49 headers.get("Authorization").and_then(|h| h.to_str().ok()), 50 ) { 51 Some(t) => t, 52 None => return ApiError::AuthenticationRequired.into_response(), 53 }; 54 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 55 Ok(user) => user, 56 Err(e) => return ApiError::from(e).into_response(), 57 }; 58 let did = &auth_user.did; 59 let user = match sqlx::query!( 60 "SELECT id, deactivated_at, takedown_ref FROM users WHERE did = $1", 61 did 62 ) 63 .fetch_optional(&state.db) 64 .await 65 { 66 Ok(Some(row)) => row, 67 Ok(None) => { 68 return ( 69 StatusCode::NOT_FOUND, 70 Json(json!({"error": "AccountNotFound"})), 71 ) 72 .into_response(); 73 } 74 Err(e) => { 75 error!("DB error fetching user: {:?}", e); 76 return ( 77 StatusCode::INTERNAL_SERVER_ERROR, 78 Json(json!({"error": "InternalError"})), 79 ) 80 .into_response(); 81 } 82 }; 83 if user.deactivated_at.is_some() { 84 return ( 85 StatusCode::FORBIDDEN, 86 Json(json!({ 87 "error": "AccountDeactivated", 88 "message": "Account is deactivated" 89 })), 90 ) 91 .into_response(); 92 } 93 if user.takedown_ref.is_some() { 94 return ( 95 StatusCode::FORBIDDEN, 96 Json(json!({ 97 "error": "AccountTakenDown", 98 "message": "Account has been taken down" 99 })), 100 ) 101 .into_response(); 102 } 103 let user_id = user.id; 104 let (root, blocks) = match parse_car(&body).await { 105 Ok((r, b)) => (r, b), 106 Err(ImportError::InvalidRootCount) => { 107 return ( 108 StatusCode::BAD_REQUEST, 109 Json(json!({ 110 "error": "InvalidRequest", 111 "message": "Expected exactly one root in CAR file" 112 })), 113 ) 114 .into_response(); 115 } 116 Err(ImportError::CarParse(msg)) => { 117 return ( 118 StatusCode::BAD_REQUEST, 119 Json(json!({ 120 "error": "InvalidRequest", 121 "message": format!("Failed to parse CAR file: {}", msg) 122 })), 123 ) 124 .into_response(); 125 } 126 Err(e) => { 127 error!("CAR parsing error: {:?}", e); 128 return ( 129 StatusCode::BAD_REQUEST, 130 Json(json!({ 131 "error": "InvalidRequest", 132 "message": format!("Invalid CAR file: {}", e) 133 })), 134 ) 135 .into_response(); 136 } 137 }; 138 info!( 139 "Importing repo for user {}: {} blocks, root {}", 140 did, 141 blocks.len(), 142 root 143 ); 144 let root_block = match blocks.get(&root) { 145 Some(b) => b, 146 None => { 147 return ( 148 StatusCode::BAD_REQUEST, 149 Json(json!({ 150 "error": "InvalidRequest", 151 "message": "Root block not found in CAR file" 152 })), 153 ) 154 .into_response(); 155 } 156 }; 157 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) { 158 Ok(commit) => commit.did().to_string(), 159 Err(e) => { 160 return ( 161 StatusCode::BAD_REQUEST, 162 Json(json!({ 163 "error": "InvalidRequest", 164 "message": format!("Invalid commit: {}", e) 165 })), 166 ) 167 .into_response(); 168 } 169 }; 170 if commit_did != *did { 171 return ( 172 StatusCode::FORBIDDEN, 173 Json(json!({ 174 "error": "InvalidRequest", 175 "message": format!( 176 "CAR file is for DID {} but you are authenticated as {}", 177 commit_did, did 178 ) 179 })), 180 ) 181 .into_response(); 182 } 183 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION") 184 .map(|v| v == "true" || v == "1") 185 .unwrap_or(false); 186 if !skip_verification { 187 debug!("Verifying CAR file signature and structure for DID {}", did); 188 let verifier = CarVerifier::new(); 189 match verifier.verify_car(did, &root, &blocks).await { 190 Ok(verified) => { 191 debug!( 192 "CAR verification successful: rev={}, data_cid={}", 193 verified.rev, verified.data_cid 194 ); 195 } 196 Err(crate::sync::verify::VerifyError::DidMismatch { 197 commit_did, 198 expected_did, 199 }) => { 200 return ( 201 StatusCode::FORBIDDEN, 202 Json(json!({ 203 "error": "InvalidRequest", 204 "message": format!( 205 "CAR file is for DID {} but you are authenticated as {}", 206 commit_did, expected_did 207 ) 208 })), 209 ) 210 .into_response(); 211 } 212 Err(crate::sync::verify::VerifyError::InvalidSignature) => { 213 return ( 214 StatusCode::BAD_REQUEST, 215 Json(json!({ 216 "error": "InvalidSignature", 217 "message": "CAR file commit signature verification failed" 218 })), 219 ) 220 .into_response(); 221 } 222 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => { 223 warn!("DID resolution failed during import verification: {}", msg); 224 return ( 225 StatusCode::BAD_REQUEST, 226 Json(json!({ 227 "error": "InvalidRequest", 228 "message": format!("Failed to verify DID: {}", msg) 229 })), 230 ) 231 .into_response(); 232 } 233 Err(crate::sync::verify::VerifyError::NoSigningKey) => { 234 return ( 235 StatusCode::BAD_REQUEST, 236 Json(json!({ 237 "error": "InvalidRequest", 238 "message": "DID document does not contain a signing key" 239 })), 240 ) 241 .into_response(); 242 } 243 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 244 return ( 245 StatusCode::BAD_REQUEST, 246 Json(json!({ 247 "error": "InvalidRequest", 248 "message": format!("MST validation failed: {}", msg) 249 })), 250 ) 251 .into_response(); 252 } 253 Err(e) => { 254 error!("CAR verification error: {:?}", e); 255 return ( 256 StatusCode::BAD_REQUEST, 257 Json(json!({ 258 "error": "InvalidRequest", 259 "message": format!("CAR verification failed: {}", e) 260 })), 261 ) 262 .into_response(); 263 } 264 } 265 } else { 266 warn!("Skipping CAR signature verification for import (SKIP_IMPORT_VERIFICATION=true)"); 267 } 268 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS") 269 .ok() 270 .and_then(|s| s.parse().ok()) 271 .unwrap_or(DEFAULT_MAX_BLOCKS); 272 match apply_import(&state.db, user_id, root, blocks, max_blocks).await { 273 Ok(records) => { 274 info!( 275 "Successfully imported {} records for user {}", 276 records.len(), 277 did 278 ); 279 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await { 280 warn!("Failed to sequence import event: {:?}", e); 281 } 282 (StatusCode::OK, Json(json!({}))).into_response() 283 } 284 Err(ImportError::SizeLimitExceeded) => ( 285 StatusCode::BAD_REQUEST, 286 Json(json!({ 287 "error": "InvalidRequest", 288 "message": format!("Import exceeds block limit of {}", max_blocks) 289 })), 290 ) 291 .into_response(), 292 Err(ImportError::RepoNotFound) => ( 293 StatusCode::NOT_FOUND, 294 Json(json!({ 295 "error": "RepoNotFound", 296 "message": "Repository not initialized for this account" 297 })), 298 ) 299 .into_response(), 300 Err(ImportError::InvalidCbor(msg)) => ( 301 StatusCode::BAD_REQUEST, 302 Json(json!({ 303 "error": "InvalidRequest", 304 "message": format!("Invalid CBOR data: {}", msg) 305 })), 306 ) 307 .into_response(), 308 Err(ImportError::InvalidCommit(msg)) => ( 309 StatusCode::BAD_REQUEST, 310 Json(json!({ 311 "error": "InvalidRequest", 312 "message": format!("Invalid commit structure: {}", msg) 313 })), 314 ) 315 .into_response(), 316 Err(ImportError::BlockNotFound(cid)) => ( 317 StatusCode::BAD_REQUEST, 318 Json(json!({ 319 "error": "InvalidRequest", 320 "message": format!("Referenced block not found in CAR: {}", cid) 321 })), 322 ) 323 .into_response(), 324 Err(ImportError::ConcurrentModification) => ( 325 StatusCode::CONFLICT, 326 Json(json!({ 327 "error": "ConcurrentModification", 328 "message": "Repository is being modified by another operation, please retry" 329 })), 330 ) 331 .into_response(), 332 Err(ImportError::VerificationFailed(ve)) => ( 333 StatusCode::BAD_REQUEST, 334 Json(json!({ 335 "error": "VerificationFailed", 336 "message": format!("CAR verification failed: {}", ve) 337 })), 338 ) 339 .into_response(), 340 Err(ImportError::DidMismatch { car_did, auth_did }) => ( 341 StatusCode::FORBIDDEN, 342 Json(json!({ 343 "error": "DidMismatch", 344 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did) 345 })), 346 ) 347 .into_response(), 348 Err(e) => { 349 error!("Import error: {:?}", e); 350 ( 351 StatusCode::INTERNAL_SERVER_ERROR, 352 Json(json!({"error": "InternalError"})), 353 ) 354 .into_response() 355 } 356 } 357} 358async fn sequence_import_event( 359 state: &AppState, 360 did: &str, 361 commit_cid: &str, 362) -> Result<(), sqlx::Error> { 363 let prev_cid: Option<String> = None; 364 let prev_data_cid: Option<String> = None; 365 let ops = serde_json::json!([]); 366 let blobs: Vec<String> = vec![]; 367 let blocks_cids: Vec<String> = vec![]; 368 let seq_row = sqlx::query!( 369 r#" 370 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids) 371 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7) 372 RETURNING seq 373 "#, 374 did, 375 commit_cid, 376 prev_cid, 377 prev_data_cid, 378 ops, 379 &blobs, 380 &blocks_cids 381 ) 382 .fetch_one(&state.db) 383 .await?; 384 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 385 .execute(&state.db) 386 .await?; 387 Ok(()) 388}