this repo has no description
1use crate::api::ApiError; 2use crate::state::AppState; 3use crate::sync::import::{ImportError, apply_import, parse_car}; 4use crate::sync::verify::CarVerifier; 5use axum::{ 6 Json, 7 body::Bytes, 8 extract::State, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11}; 12use serde_json::json; 13use tracing::{debug, error, info, warn}; 14 15const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024; 16const DEFAULT_MAX_BLOCKS: usize = 50000; 17 18pub async fn import_repo( 19 State(state): State<AppState>, 20 headers: axum::http::HeaderMap, 21 body: Bytes, 22) -> Response { 23 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS") 24 .map(|v| v != "false" && v != "0") 25 .unwrap_or(true); 26 if !accepting_imports { 27 return ( 28 StatusCode::BAD_REQUEST, 29 Json(json!({ 30 "error": "InvalidRequest", 31 "message": "Service is not accepting repo imports" 32 })), 33 ) 34 .into_response(); 35 } 36 let max_size: usize = std::env::var("MAX_IMPORT_SIZE") 37 .ok() 38 .and_then(|s| s.parse().ok()) 39 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE); 40 if body.len() > max_size { 41 return ( 42 StatusCode::PAYLOAD_TOO_LARGE, 43 Json(json!({ 44 "error": "InvalidRequest", 45 "message": format!("Import size exceeds limit of {} bytes", max_size) 46 })), 47 ) 48 .into_response(); 49 } 50 let token = match crate::auth::extract_bearer_token_from_header( 51 headers.get("Authorization").and_then(|h| h.to_str().ok()), 52 ) { 53 Some(t) => t, 54 None => return ApiError::AuthenticationRequired.into_response(), 55 }; 56 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await { 57 Ok(user) => user, 58 Err(e) => return ApiError::from(e).into_response(), 59 }; 60 let did = &auth_user.did; 61 let user = match sqlx::query!( 62 "SELECT id, deactivated_at, takedown_ref FROM users WHERE did = $1", 63 did 64 ) 65 .fetch_optional(&state.db) 66 .await 67 { 68 Ok(Some(row)) => row, 69 Ok(None) => { 70 return ( 71 StatusCode::NOT_FOUND, 72 Json(json!({"error": "AccountNotFound"})), 73 ) 74 .into_response(); 75 } 76 Err(e) => { 77 error!("DB error fetching user: {:?}", e); 78 return ( 79 StatusCode::INTERNAL_SERVER_ERROR, 80 Json(json!({"error": "InternalError"})), 81 ) 82 .into_response(); 83 } 84 }; 85 if user.deactivated_at.is_some() { 86 return ( 87 StatusCode::FORBIDDEN, 88 Json(json!({ 89 "error": "AccountDeactivated", 90 "message": "Account is deactivated" 91 })), 92 ) 93 .into_response(); 94 } 95 if user.takedown_ref.is_some() { 96 return ( 97 StatusCode::FORBIDDEN, 98 Json(json!({ 99 "error": "AccountTakenDown", 100 "message": "Account has been taken down" 101 })), 102 ) 103 .into_response(); 104 } 105 let user_id = user.id; 106 let (root, blocks) = match parse_car(&body).await { 107 Ok((r, b)) => (r, b), 108 Err(ImportError::InvalidRootCount) => { 109 return ( 110 StatusCode::BAD_REQUEST, 111 Json(json!({ 112 "error": "InvalidRequest", 113 "message": "Expected exactly one root in CAR file" 114 })), 115 ) 116 .into_response(); 117 } 118 Err(ImportError::CarParse(msg)) => { 119 return ( 120 StatusCode::BAD_REQUEST, 121 Json(json!({ 122 "error": "InvalidRequest", 123 "message": format!("Failed to parse CAR file: {}", msg) 124 })), 125 ) 126 .into_response(); 127 } 128 Err(e) => { 129 error!("CAR parsing error: {:?}", e); 130 return ( 131 StatusCode::BAD_REQUEST, 132 Json(json!({ 133 "error": "InvalidRequest", 134 "message": format!("Invalid CAR file: {}", e) 135 })), 136 ) 137 .into_response(); 138 } 139 }; 140 info!( 141 "Importing repo for user {}: {} blocks, root {}", 142 did, 143 blocks.len(), 144 root 145 ); 146 let root_block = match blocks.get(&root) { 147 Some(b) => b, 148 None => { 149 return ( 150 StatusCode::BAD_REQUEST, 151 Json(json!({ 152 "error": "InvalidRequest", 153 "message": "Root block not found in CAR file" 154 })), 155 ) 156 .into_response(); 157 } 158 }; 159 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) { 160 Ok(commit) => commit.did().to_string(), 161 Err(e) => { 162 return ( 163 StatusCode::BAD_REQUEST, 164 Json(json!({ 165 "error": "InvalidRequest", 166 "message": format!("Invalid commit: {}", e) 167 })), 168 ) 169 .into_response(); 170 } 171 }; 172 if commit_did != *did { 173 return ( 174 StatusCode::FORBIDDEN, 175 Json(json!({ 176 "error": "InvalidRequest", 177 "message": format!( 178 "CAR file is for DID {} but you are authenticated as {}", 179 commit_did, did 180 ) 181 })), 182 ) 183 .into_response(); 184 } 185 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION") 186 .map(|v| v == "true" || v == "1") 187 .unwrap_or(false); 188 if !skip_verification { 189 debug!("Verifying CAR file signature and structure for DID {}", did); 190 let verifier = CarVerifier::new(); 191 match verifier.verify_car(did, &root, &blocks).await { 192 Ok(verified) => { 193 debug!( 194 "CAR verification successful: rev={}, data_cid={}", 195 verified.rev, verified.data_cid 196 ); 197 } 198 Err(crate::sync::verify::VerifyError::DidMismatch { 199 commit_did, 200 expected_did, 201 }) => { 202 return ( 203 StatusCode::FORBIDDEN, 204 Json(json!({ 205 "error": "InvalidRequest", 206 "message": format!( 207 "CAR file is for DID {} but you are authenticated as {}", 208 commit_did, expected_did 209 ) 210 })), 211 ) 212 .into_response(); 213 } 214 Err(crate::sync::verify::VerifyError::InvalidSignature) => { 215 return ( 216 StatusCode::BAD_REQUEST, 217 Json(json!({ 218 "error": "InvalidSignature", 219 "message": "CAR file commit signature verification failed" 220 })), 221 ) 222 .into_response(); 223 } 224 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => { 225 warn!("DID resolution failed during import verification: {}", msg); 226 return ( 227 StatusCode::BAD_REQUEST, 228 Json(json!({ 229 "error": "InvalidRequest", 230 "message": format!("Failed to verify DID: {}", msg) 231 })), 232 ) 233 .into_response(); 234 } 235 Err(crate::sync::verify::VerifyError::NoSigningKey) => { 236 return ( 237 StatusCode::BAD_REQUEST, 238 Json(json!({ 239 "error": "InvalidRequest", 240 "message": "DID document does not contain a signing key" 241 })), 242 ) 243 .into_response(); 244 } 245 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => { 246 return ( 247 StatusCode::BAD_REQUEST, 248 Json(json!({ 249 "error": "InvalidRequest", 250 "message": format!("MST validation failed: {}", msg) 251 })), 252 ) 253 .into_response(); 254 } 255 Err(e) => { 256 error!("CAR verification error: {:?}", e); 257 return ( 258 StatusCode::BAD_REQUEST, 259 Json(json!({ 260 "error": "InvalidRequest", 261 "message": format!("CAR verification failed: {}", e) 262 })), 263 ) 264 .into_response(); 265 } 266 } 267 } else { 268 warn!("Skipping CAR signature verification for import (SKIP_IMPORT_VERIFICATION=true)"); 269 } 270 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS") 271 .ok() 272 .and_then(|s| s.parse().ok()) 273 .unwrap_or(DEFAULT_MAX_BLOCKS); 274 match apply_import(&state.db, user_id, root, blocks, max_blocks).await { 275 Ok(records) => { 276 info!( 277 "Successfully imported {} records for user {}", 278 records.len(), 279 did 280 ); 281 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await { 282 warn!("Failed to sequence import event: {:?}", e); 283 } 284 (StatusCode::OK, Json(json!({}))).into_response() 285 } 286 Err(ImportError::SizeLimitExceeded) => ( 287 StatusCode::BAD_REQUEST, 288 Json(json!({ 289 "error": "InvalidRequest", 290 "message": format!("Import exceeds block limit of {}", max_blocks) 291 })), 292 ) 293 .into_response(), 294 Err(ImportError::RepoNotFound) => ( 295 StatusCode::NOT_FOUND, 296 Json(json!({ 297 "error": "RepoNotFound", 298 "message": "Repository not initialized for this account" 299 })), 300 ) 301 .into_response(), 302 Err(ImportError::InvalidCbor(msg)) => ( 303 StatusCode::BAD_REQUEST, 304 Json(json!({ 305 "error": "InvalidRequest", 306 "message": format!("Invalid CBOR data: {}", msg) 307 })), 308 ) 309 .into_response(), 310 Err(ImportError::InvalidCommit(msg)) => ( 311 StatusCode::BAD_REQUEST, 312 Json(json!({ 313 "error": "InvalidRequest", 314 "message": format!("Invalid commit structure: {}", msg) 315 })), 316 ) 317 .into_response(), 318 Err(ImportError::BlockNotFound(cid)) => ( 319 StatusCode::BAD_REQUEST, 320 Json(json!({ 321 "error": "InvalidRequest", 322 "message": format!("Referenced block not found in CAR: {}", cid) 323 })), 324 ) 325 .into_response(), 326 Err(ImportError::ConcurrentModification) => ( 327 StatusCode::CONFLICT, 328 Json(json!({ 329 "error": "ConcurrentModification", 330 "message": "Repository is being modified by another operation, please retry" 331 })), 332 ) 333 .into_response(), 334 Err(ImportError::VerificationFailed(ve)) => ( 335 StatusCode::BAD_REQUEST, 336 Json(json!({ 337 "error": "VerificationFailed", 338 "message": format!("CAR verification failed: {}", ve) 339 })), 340 ) 341 .into_response(), 342 Err(ImportError::DidMismatch { car_did, auth_did }) => ( 343 StatusCode::FORBIDDEN, 344 Json(json!({ 345 "error": "DidMismatch", 346 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did) 347 })), 348 ) 349 .into_response(), 350 Err(e) => { 351 error!("Import error: {:?}", e); 352 ( 353 StatusCode::INTERNAL_SERVER_ERROR, 354 Json(json!({"error": "InternalError"})), 355 ) 356 .into_response() 357 } 358 } 359} 360 361async fn sequence_import_event( 362 state: &AppState, 363 did: &str, 364 commit_cid: &str, 365) -> Result<(), sqlx::Error> { 366 let prev_cid: Option<String> = None; 367 let prev_data_cid: Option<String> = None; 368 let ops = serde_json::json!([]); 369 let blobs: Vec<String> = vec![]; 370 let blocks_cids: Vec<String> = vec![]; 371 let seq_row = sqlx::query!( 372 r#" 373 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids) 374 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7) 375 RETURNING seq 376 "#, 377 did, 378 commit_cid, 379 prev_cid, 380 prev_data_cid, 381 ops, 382 &blobs, 383 &blocks_cids 384 ) 385 .fetch_one(&state.db) 386 .await?; 387 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq)) 388 .execute(&state.db) 389 .await?; 390 Ok(()) 391}