this repo has no description
1use crate::api::ApiError;
2use crate::api::repo::record::create_signed_commit;
3use crate::state::AppState;
4use crate::sync::import::{ImportError, apply_import, parse_car};
5use crate::sync::verify::CarVerifier;
6use axum::{
7 Json,
8 body::Bytes,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use jacquard::types::{integer::LimitedU32, string::Tid};
14use jacquard_repo::storage::BlockStore;
15use k256::ecdsa::SigningKey;
16use serde_json::json;
17use tracing::{debug, error, info, warn};
18
19const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024;
20const DEFAULT_MAX_BLOCKS: usize = 50000;
21
22pub async fn import_repo(
23 State(state): State<AppState>,
24 headers: axum::http::HeaderMap,
25 body: Bytes,
26) -> Response {
27 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
28 .map(|v| v != "false" && v != "0")
29 .unwrap_or(true);
30 if !accepting_imports {
31 return (
32 StatusCode::BAD_REQUEST,
33 Json(json!({
34 "error": "InvalidRequest",
35 "message": "Service is not accepting repo imports"
36 })),
37 )
38 .into_response();
39 }
40 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
41 .ok()
42 .and_then(|s| s.parse().ok())
43 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
44 if body.len() > max_size {
45 return (
46 StatusCode::PAYLOAD_TOO_LARGE,
47 Json(json!({
48 "error": "InvalidRequest",
49 "message": format!("Import size exceeds limit of {} bytes", max_size)
50 })),
51 )
52 .into_response();
53 }
54 let token = match crate::auth::extract_bearer_token_from_header(
55 headers.get("Authorization").and_then(|h| h.to_str().ok()),
56 ) {
57 Some(t) => t,
58 None => return ApiError::AuthenticationRequired.into_response(),
59 };
60 let auth_user =
61 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
62 Ok(user) => user,
63 Err(e) => return ApiError::from(e).into_response(),
64 };
65 let did = &auth_user.did;
66 let user = match sqlx::query!(
67 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1",
68 did
69 )
70 .fetch_optional(&state.db)
71 .await
72 {
73 Ok(Some(row)) => row,
74 Ok(None) => {
75 return (
76 StatusCode::NOT_FOUND,
77 Json(json!({"error": "AccountNotFound"})),
78 )
79 .into_response();
80 }
81 Err(e) => {
82 error!("DB error fetching user: {:?}", e);
83 return (
84 StatusCode::INTERNAL_SERVER_ERROR,
85 Json(json!({"error": "InternalError"})),
86 )
87 .into_response();
88 }
89 };
90 if user.takedown_ref.is_some() {
91 return (
92 StatusCode::FORBIDDEN,
93 Json(json!({
94 "error": "AccountTakenDown",
95 "message": "Account has been taken down"
96 })),
97 )
98 .into_response();
99 }
100 let user_id = user.id;
101 let (root, blocks) = match parse_car(&body).await {
102 Ok((r, b)) => (r, b),
103 Err(ImportError::InvalidRootCount) => {
104 return (
105 StatusCode::BAD_REQUEST,
106 Json(json!({
107 "error": "InvalidRequest",
108 "message": "Expected exactly one root in CAR file"
109 })),
110 )
111 .into_response();
112 }
113 Err(ImportError::CarParse(msg)) => {
114 return (
115 StatusCode::BAD_REQUEST,
116 Json(json!({
117 "error": "InvalidRequest",
118 "message": format!("Failed to parse CAR file: {}", msg)
119 })),
120 )
121 .into_response();
122 }
123 Err(e) => {
124 error!("CAR parsing error: {:?}", e);
125 return (
126 StatusCode::BAD_REQUEST,
127 Json(json!({
128 "error": "InvalidRequest",
129 "message": format!("Invalid CAR file: {}", e)
130 })),
131 )
132 .into_response();
133 }
134 };
135 info!(
136 "Importing repo for user {}: {} blocks, root {}",
137 did,
138 blocks.len(),
139 root
140 );
141 let root_block = match blocks.get(&root) {
142 Some(b) => b,
143 None => {
144 return (
145 StatusCode::BAD_REQUEST,
146 Json(json!({
147 "error": "InvalidRequest",
148 "message": "Root block not found in CAR file"
149 })),
150 )
151 .into_response();
152 }
153 };
154 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
155 Ok(commit) => commit.did().to_string(),
156 Err(e) => {
157 return (
158 StatusCode::BAD_REQUEST,
159 Json(json!({
160 "error": "InvalidRequest",
161 "message": format!("Invalid commit: {}", e)
162 })),
163 )
164 .into_response();
165 }
166 };
167 if commit_did != *did {
168 return (
169 StatusCode::FORBIDDEN,
170 Json(json!({
171 "error": "InvalidRequest",
172 "message": format!(
173 "CAR file is for DID {} but you are authenticated as {}",
174 commit_did, did
175 )
176 })),
177 )
178 .into_response();
179 }
180 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
181 .map(|v| v == "true" || v == "1")
182 .unwrap_or(false);
183 let is_migration = user.deactivated_at.is_some();
184 if skip_verification {
185 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)");
186 } else if is_migration {
187 debug!("Verifying CAR file structure for migration (skipping signature verification)");
188 let verifier = CarVerifier::new();
189 match verifier.verify_car_structure_only(did, &root, &blocks) {
190 Ok(verified) => {
191 debug!(
192 "CAR structure verification successful: rev={}, data_cid={}",
193 verified.rev, verified.data_cid
194 );
195 }
196 Err(crate::sync::verify::VerifyError::DidMismatch {
197 commit_did,
198 expected_did,
199 }) => {
200 return (
201 StatusCode::FORBIDDEN,
202 Json(json!({
203 "error": "InvalidRequest",
204 "message": format!(
205 "CAR file is for DID {} but you are authenticated as {}",
206 commit_did, expected_did
207 )
208 })),
209 )
210 .into_response();
211 }
212 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
213 return (
214 StatusCode::BAD_REQUEST,
215 Json(json!({
216 "error": "InvalidRequest",
217 "message": format!("MST validation failed: {}", msg)
218 })),
219 )
220 .into_response();
221 }
222 Err(e) => {
223 error!("CAR structure verification error: {:?}", e);
224 return (
225 StatusCode::BAD_REQUEST,
226 Json(json!({
227 "error": "InvalidRequest",
228 "message": format!("CAR verification failed: {}", e)
229 })),
230 )
231 .into_response();
232 }
233 }
234 } else {
235 debug!("Verifying CAR file signature and structure for DID {}", did);
236 let verifier = CarVerifier::new();
237 match verifier.verify_car(did, &root, &blocks).await {
238 Ok(verified) => {
239 debug!(
240 "CAR verification successful: rev={}, data_cid={}",
241 verified.rev, verified.data_cid
242 );
243 }
244 Err(crate::sync::verify::VerifyError::DidMismatch {
245 commit_did,
246 expected_did,
247 }) => {
248 return (
249 StatusCode::FORBIDDEN,
250 Json(json!({
251 "error": "InvalidRequest",
252 "message": format!(
253 "CAR file is for DID {} but you are authenticated as {}",
254 commit_did, expected_did
255 )
256 })),
257 )
258 .into_response();
259 }
260 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
261 return (
262 StatusCode::BAD_REQUEST,
263 Json(json!({
264 "error": "InvalidSignature",
265 "message": "CAR file commit signature verification failed"
266 })),
267 )
268 .into_response();
269 }
270 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
271 warn!("DID resolution failed during import verification: {}", msg);
272 return (
273 StatusCode::BAD_REQUEST,
274 Json(json!({
275 "error": "InvalidRequest",
276 "message": format!("Failed to verify DID: {}", msg)
277 })),
278 )
279 .into_response();
280 }
281 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
282 return (
283 StatusCode::BAD_REQUEST,
284 Json(json!({
285 "error": "InvalidRequest",
286 "message": "DID document does not contain a signing key"
287 })),
288 )
289 .into_response();
290 }
291 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
292 return (
293 StatusCode::BAD_REQUEST,
294 Json(json!({
295 "error": "InvalidRequest",
296 "message": format!("MST validation failed: {}", msg)
297 })),
298 )
299 .into_response();
300 }
301 Err(e) => {
302 error!("CAR verification error: {:?}", e);
303 return (
304 StatusCode::BAD_REQUEST,
305 Json(json!({
306 "error": "InvalidRequest",
307 "message": format!("CAR verification failed: {}", e)
308 })),
309 )
310 .into_response();
311 }
312 }
313 }
314 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
315 .ok()
316 .and_then(|s| s.parse().ok())
317 .unwrap_or(DEFAULT_MAX_BLOCKS);
318 match apply_import(&state.db, user_id, root, blocks, max_blocks).await {
319 Ok(import_result) => {
320 info!(
321 "Successfully imported {} records for user {}",
322 import_result.records.len(),
323 did
324 );
325 let key_row = match sqlx::query!(
326 r#"SELECT uk.key_bytes, uk.encryption_version
327 FROM user_keys uk
328 JOIN users u ON uk.user_id = u.id
329 WHERE u.did = $1"#,
330 did
331 )
332 .fetch_optional(&state.db)
333 .await
334 {
335 Ok(Some(row)) => row,
336 Ok(None) => {
337 error!("No signing key found for user {}", did);
338 return (
339 StatusCode::INTERNAL_SERVER_ERROR,
340 Json(json!({"error": "InternalError", "message": "Signing key not found"})),
341 )
342 .into_response();
343 }
344 Err(e) => {
345 error!("DB error fetching signing key: {:?}", e);
346 return (
347 StatusCode::INTERNAL_SERVER_ERROR,
348 Json(json!({"error": "InternalError"})),
349 )
350 .into_response();
351 }
352 };
353 let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) {
354 Ok(k) => k,
355 Err(e) => {
356 error!("Failed to decrypt signing key: {}", e);
357 return (
358 StatusCode::INTERNAL_SERVER_ERROR,
359 Json(json!({"error": "InternalError"})),
360 )
361 .into_response();
362 }
363 };
364 let signing_key = match SigningKey::from_slice(&key_bytes) {
365 Ok(k) => k,
366 Err(e) => {
367 error!("Invalid signing key: {:?}", e);
368 return (
369 StatusCode::INTERNAL_SERVER_ERROR,
370 Json(json!({"error": "InternalError"})),
371 )
372 .into_response();
373 }
374 };
375 let new_rev = Tid::now(LimitedU32::MIN);
376 let new_rev_str = new_rev.to_string();
377 let (commit_bytes, _sig) = match create_signed_commit(
378 did,
379 import_result.data_cid,
380 &new_rev_str,
381 None,
382 &signing_key,
383 ) {
384 Ok(result) => result,
385 Err(e) => {
386 error!("Failed to create new commit: {}", e);
387 return (
388 StatusCode::INTERNAL_SERVER_ERROR,
389 Json(json!({"error": "InternalError"})),
390 )
391 .into_response();
392 }
393 };
394 let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await {
395 Ok(cid) => cid,
396 Err(e) => {
397 error!("Failed to store new commit block: {:?}", e);
398 return (
399 StatusCode::INTERNAL_SERVER_ERROR,
400 Json(json!({"error": "InternalError"})),
401 )
402 .into_response();
403 }
404 };
405 let new_root_str = new_root_cid.to_string();
406 if let Err(e) = sqlx::query!(
407 "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2",
408 new_root_str,
409 user_id
410 )
411 .execute(&state.db)
412 .await
413 {
414 error!("Failed to update repo root: {:?}", e);
415 return (
416 StatusCode::INTERNAL_SERVER_ERROR,
417 Json(json!({"error": "InternalError"})),
418 )
419 .into_response();
420 }
421 info!(
422 "Created new commit for imported repo: cid={}, rev={}",
423 new_root_str, new_rev_str
424 );
425 if !is_migration {
426 if let Err(e) = sequence_import_event(&state, did, &new_root_str).await {
427 warn!("Failed to sequence import event: {:?}", e);
428 }
429 }
430 (StatusCode::OK, Json(json!({}))).into_response()
431 }
432 Err(ImportError::SizeLimitExceeded) => (
433 StatusCode::BAD_REQUEST,
434 Json(json!({
435 "error": "InvalidRequest",
436 "message": format!("Import exceeds block limit of {}", max_blocks)
437 })),
438 )
439 .into_response(),
440 Err(ImportError::RepoNotFound) => (
441 StatusCode::NOT_FOUND,
442 Json(json!({
443 "error": "RepoNotFound",
444 "message": "Repository not initialized for this account"
445 })),
446 )
447 .into_response(),
448 Err(ImportError::InvalidCbor(msg)) => (
449 StatusCode::BAD_REQUEST,
450 Json(json!({
451 "error": "InvalidRequest",
452 "message": format!("Invalid CBOR data: {}", msg)
453 })),
454 )
455 .into_response(),
456 Err(ImportError::InvalidCommit(msg)) => (
457 StatusCode::BAD_REQUEST,
458 Json(json!({
459 "error": "InvalidRequest",
460 "message": format!("Invalid commit structure: {}", msg)
461 })),
462 )
463 .into_response(),
464 Err(ImportError::BlockNotFound(cid)) => (
465 StatusCode::BAD_REQUEST,
466 Json(json!({
467 "error": "InvalidRequest",
468 "message": format!("Referenced block not found in CAR: {}", cid)
469 })),
470 )
471 .into_response(),
472 Err(ImportError::ConcurrentModification) => (
473 StatusCode::CONFLICT,
474 Json(json!({
475 "error": "ConcurrentModification",
476 "message": "Repository is being modified by another operation, please retry"
477 })),
478 )
479 .into_response(),
480 Err(ImportError::VerificationFailed(ve)) => (
481 StatusCode::BAD_REQUEST,
482 Json(json!({
483 "error": "VerificationFailed",
484 "message": format!("CAR verification failed: {}", ve)
485 })),
486 )
487 .into_response(),
488 Err(ImportError::DidMismatch { car_did, auth_did }) => (
489 StatusCode::FORBIDDEN,
490 Json(json!({
491 "error": "DidMismatch",
492 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did)
493 })),
494 )
495 .into_response(),
496 Err(e) => {
497 error!("Import error: {:?}", e);
498 (
499 StatusCode::INTERNAL_SERVER_ERROR,
500 Json(json!({"error": "InternalError"})),
501 )
502 .into_response()
503 }
504 }
505}
506
507async fn sequence_import_event(
508 state: &AppState,
509 did: &str,
510 commit_cid: &str,
511) -> Result<(), sqlx::Error> {
512 let prev_cid: Option<String> = None;
513 let prev_data_cid: Option<String> = None;
514 let ops = serde_json::json!([]);
515 let blobs: Vec<String> = vec![];
516 let blocks_cids: Vec<String> = vec![];
517 let seq_row = sqlx::query!(
518 r#"
519 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids)
520 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7)
521 RETURNING seq
522 "#,
523 did,
524 commit_cid,
525 prev_cid,
526 prev_data_cid,
527 ops,
528 &blobs,
529 &blocks_cids
530 )
531 .fetch_one(&state.db)
532 .await?;
533 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
534 .execute(&state.db)
535 .await?;
536 Ok(())
537}