this repo has no description
1use crate::api::ApiError;
2use crate::api::repo::record::create_signed_commit;
3use crate::state::AppState;
4use crate::sync::import::{ImportError, apply_import, parse_car};
5use crate::sync::verify::CarVerifier;
6use axum::{
7 Json,
8 body::Bytes,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use jacquard::types::{integer::LimitedU32, string::Tid};
14use jacquard_repo::storage::BlockStore;
15use k256::ecdsa::SigningKey;
16use serde_json::json;
17use tracing::{debug, error, info, warn};
18
/// Fallback cap, in bytes, on the uploaded CAR body (100 MiB); used when the
/// `MAX_IMPORT_SIZE` environment variable is unset or does not parse as a `usize`.
const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024;
/// Fallback block limit passed to `apply_import`; used when the
/// `MAX_IMPORT_BLOCKS` environment variable is unset or does not parse as a `usize`.
const DEFAULT_MAX_BLOCKS: usize = 50_000;
21
22pub async fn import_repo(
23 State(state): State<AppState>,
24 headers: axum::http::HeaderMap,
25 body: Bytes,
26) -> Response {
27 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
28 .map(|v| v != "false" && v != "0")
29 .unwrap_or(true);
30 if !accepting_imports {
31 return (
32 StatusCode::BAD_REQUEST,
33 Json(json!({
34 "error": "InvalidRequest",
35 "message": "Service is not accepting repo imports"
36 })),
37 )
38 .into_response();
39 }
40 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
41 .ok()
42 .and_then(|s| s.parse().ok())
43 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
44 if body.len() > max_size {
45 return (
46 StatusCode::PAYLOAD_TOO_LARGE,
47 Json(json!({
48 "error": "InvalidRequest",
49 "message": format!("Import size exceeds limit of {} bytes", max_size)
50 })),
51 )
52 .into_response();
53 }
54 let token = match crate::auth::extract_bearer_token_from_header(
55 headers.get("Authorization").and_then(|h| h.to_str().ok()),
56 ) {
57 Some(t) => t,
58 None => return ApiError::AuthenticationRequired.into_response(),
59 };
60 let auth_user =
61 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
62 Ok(user) => user,
63 Err(e) => return ApiError::from(e).into_response(),
64 };
65 let did = &auth_user.did;
66 let user = match sqlx::query!(
67 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1",
68 did
69 )
70 .fetch_optional(&state.db)
71 .await
72 {
73 Ok(Some(row)) => row,
74 Ok(None) => {
75 return (
76 StatusCode::NOT_FOUND,
77 Json(json!({"error": "AccountNotFound"})),
78 )
79 .into_response();
80 }
81 Err(e) => {
82 error!("DB error fetching user: {:?}", e);
83 return (
84 StatusCode::INTERNAL_SERVER_ERROR,
85 Json(json!({"error": "InternalError"})),
86 )
87 .into_response();
88 }
89 };
90 if user.takedown_ref.is_some() {
91 return (
92 StatusCode::FORBIDDEN,
93 Json(json!({
94 "error": "AccountTakenDown",
95 "message": "Account has been taken down"
96 })),
97 )
98 .into_response();
99 }
100 let user_id = user.id;
101 let (root, blocks) = match parse_car(&body).await {
102 Ok((r, b)) => (r, b),
103 Err(ImportError::InvalidRootCount) => {
104 return (
105 StatusCode::BAD_REQUEST,
106 Json(json!({
107 "error": "InvalidRequest",
108 "message": "Expected exactly one root in CAR file"
109 })),
110 )
111 .into_response();
112 }
113 Err(ImportError::CarParse(msg)) => {
114 return (
115 StatusCode::BAD_REQUEST,
116 Json(json!({
117 "error": "InvalidRequest",
118 "message": format!("Failed to parse CAR file: {}", msg)
119 })),
120 )
121 .into_response();
122 }
123 Err(e) => {
124 error!("CAR parsing error: {:?}", e);
125 return (
126 StatusCode::BAD_REQUEST,
127 Json(json!({
128 "error": "InvalidRequest",
129 "message": format!("Invalid CAR file: {}", e)
130 })),
131 )
132 .into_response();
133 }
134 };
135 info!(
136 "Importing repo for user {}: {} blocks, root {}",
137 did,
138 blocks.len(),
139 root
140 );
141 let root_block = match blocks.get(&root) {
142 Some(b) => b,
143 None => {
144 return (
145 StatusCode::BAD_REQUEST,
146 Json(json!({
147 "error": "InvalidRequest",
148 "message": "Root block not found in CAR file"
149 })),
150 )
151 .into_response();
152 }
153 };
154 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
155 Ok(commit) => commit.did().to_string(),
156 Err(e) => {
157 return (
158 StatusCode::BAD_REQUEST,
159 Json(json!({
160 "error": "InvalidRequest",
161 "message": format!("Invalid commit: {}", e)
162 })),
163 )
164 .into_response();
165 }
166 };
167 if commit_did != *did {
168 return (
169 StatusCode::FORBIDDEN,
170 Json(json!({
171 "error": "InvalidRequest",
172 "message": format!(
173 "CAR file is for DID {} but you are authenticated as {}",
174 commit_did, did
175 )
176 })),
177 )
178 .into_response();
179 }
180 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
181 .map(|v| v == "true" || v == "1")
182 .unwrap_or(false);
183 let is_migration = user.deactivated_at.is_some();
184 if skip_verification {
185 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)");
186 } else if is_migration {
187 debug!("Verifying CAR file structure for migration (skipping signature verification)");
188 let verifier = CarVerifier::new();
189 match verifier.verify_car_structure_only(did, &root, &blocks) {
190 Ok(verified) => {
191 debug!(
192 "CAR structure verification successful: rev={}, data_cid={}",
193 verified.rev, verified.data_cid
194 );
195 }
196 Err(crate::sync::verify::VerifyError::DidMismatch {
197 commit_did,
198 expected_did,
199 }) => {
200 return (
201 StatusCode::FORBIDDEN,
202 Json(json!({
203 "error": "InvalidRequest",
204 "message": format!(
205 "CAR file is for DID {} but you are authenticated as {}",
206 commit_did, expected_did
207 )
208 })),
209 )
210 .into_response();
211 }
212 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
213 return (
214 StatusCode::BAD_REQUEST,
215 Json(json!({
216 "error": "InvalidRequest",
217 "message": format!("MST validation failed: {}", msg)
218 })),
219 )
220 .into_response();
221 }
222 Err(e) => {
223 error!("CAR structure verification error: {:?}", e);
224 return (
225 StatusCode::BAD_REQUEST,
226 Json(json!({
227 "error": "InvalidRequest",
228 "message": format!("CAR verification failed: {}", e)
229 })),
230 )
231 .into_response();
232 }
233 }
234 } else {
235 debug!("Verifying CAR file signature and structure for DID {}", did);
236 let verifier = CarVerifier::new();
237 match verifier.verify_car(did, &root, &blocks).await {
238 Ok(verified) => {
239 debug!(
240 "CAR verification successful: rev={}, data_cid={}",
241 verified.rev, verified.data_cid
242 );
243 }
244 Err(crate::sync::verify::VerifyError::DidMismatch {
245 commit_did,
246 expected_did,
247 }) => {
248 return (
249 StatusCode::FORBIDDEN,
250 Json(json!({
251 "error": "InvalidRequest",
252 "message": format!(
253 "CAR file is for DID {} but you are authenticated as {}",
254 commit_did, expected_did
255 )
256 })),
257 )
258 .into_response();
259 }
260 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
261 return (
262 StatusCode::BAD_REQUEST,
263 Json(json!({
264 "error": "InvalidSignature",
265 "message": "CAR file commit signature verification failed"
266 })),
267 )
268 .into_response();
269 }
270 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
271 warn!("DID resolution failed during import verification: {}", msg);
272 return (
273 StatusCode::BAD_REQUEST,
274 Json(json!({
275 "error": "InvalidRequest",
276 "message": format!("Failed to verify DID: {}", msg)
277 })),
278 )
279 .into_response();
280 }
281 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
282 return (
283 StatusCode::BAD_REQUEST,
284 Json(json!({
285 "error": "InvalidRequest",
286 "message": "DID document does not contain a signing key"
287 })),
288 )
289 .into_response();
290 }
291 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
292 return (
293 StatusCode::BAD_REQUEST,
294 Json(json!({
295 "error": "InvalidRequest",
296 "message": format!("MST validation failed: {}", msg)
297 })),
298 )
299 .into_response();
300 }
301 Err(e) => {
302 error!("CAR verification error: {:?}", e);
303 return (
304 StatusCode::BAD_REQUEST,
305 Json(json!({
306 "error": "InvalidRequest",
307 "message": format!("CAR verification failed: {}", e)
308 })),
309 )
310 .into_response();
311 }
312 }
313 }
314 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
315 .ok()
316 .and_then(|s| s.parse().ok())
317 .unwrap_or(DEFAULT_MAX_BLOCKS);
318 match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await {
319 Ok(import_result) => {
320 info!(
321 "Successfully imported {} records for user {}",
322 import_result.records.len(),
323 did
324 );
325 let mut blob_ref_count = 0;
326 for record in &import_result.records {
327 for blob_ref in &record.blob_refs {
328 let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey);
329 if let Err(e) = sqlx::query!(
330 r#"
331 INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
332 VALUES ($1, $2, $3)
333 ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
334 "#,
335 user_id,
336 record_uri,
337 blob_ref.cid
338 )
339 .execute(&state.db)
340 .await
341 {
342 warn!("Failed to insert record_blob for {}: {:?}", record_uri, e);
343 } else {
344 blob_ref_count += 1;
345 }
346 }
347 }
348 if blob_ref_count > 0 {
349 info!("Recorded {} blob references for imported repo", blob_ref_count);
350 }
351 let key_row = match sqlx::query!(
352 r#"SELECT uk.key_bytes, uk.encryption_version
353 FROM user_keys uk
354 JOIN users u ON uk.user_id = u.id
355 WHERE u.did = $1"#,
356 did
357 )
358 .fetch_optional(&state.db)
359 .await
360 {
361 Ok(Some(row)) => row,
362 Ok(None) => {
363 error!("No signing key found for user {}", did);
364 return (
365 StatusCode::INTERNAL_SERVER_ERROR,
366 Json(json!({"error": "InternalError", "message": "Signing key not found"})),
367 )
368 .into_response();
369 }
370 Err(e) => {
371 error!("DB error fetching signing key: {:?}", e);
372 return (
373 StatusCode::INTERNAL_SERVER_ERROR,
374 Json(json!({"error": "InternalError"})),
375 )
376 .into_response();
377 }
378 };
379 let key_bytes =
380 match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) {
381 Ok(k) => k,
382 Err(e) => {
383 error!("Failed to decrypt signing key: {}", e);
384 return (
385 StatusCode::INTERNAL_SERVER_ERROR,
386 Json(json!({"error": "InternalError"})),
387 )
388 .into_response();
389 }
390 };
391 let signing_key = match SigningKey::from_slice(&key_bytes) {
392 Ok(k) => k,
393 Err(e) => {
394 error!("Invalid signing key: {:?}", e);
395 return (
396 StatusCode::INTERNAL_SERVER_ERROR,
397 Json(json!({"error": "InternalError"})),
398 )
399 .into_response();
400 }
401 };
402 let new_rev = Tid::now(LimitedU32::MIN);
403 let new_rev_str = new_rev.to_string();
404 let (commit_bytes, _sig) = match create_signed_commit(
405 did,
406 import_result.data_cid,
407 &new_rev_str,
408 None,
409 &signing_key,
410 ) {
411 Ok(result) => result,
412 Err(e) => {
413 error!("Failed to create new commit: {}", e);
414 return (
415 StatusCode::INTERNAL_SERVER_ERROR,
416 Json(json!({"error": "InternalError"})),
417 )
418 .into_response();
419 }
420 };
421 let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await {
422 Ok(cid) => cid,
423 Err(e) => {
424 error!("Failed to store new commit block: {:?}", e);
425 return (
426 StatusCode::INTERNAL_SERVER_ERROR,
427 Json(json!({"error": "InternalError"})),
428 )
429 .into_response();
430 }
431 };
432 let new_root_str = new_root_cid.to_string();
433 if let Err(e) = sqlx::query!(
434 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3",
435 new_root_str,
436 &new_rev_str,
437 user_id
438 )
439 .execute(&state.db)
440 .await
441 {
442 error!("Failed to update repo root: {:?}", e);
443 return (
444 StatusCode::INTERNAL_SERVER_ERROR,
445 Json(json!({"error": "InternalError"})),
446 )
447 .into_response();
448 }
449 let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect();
450 all_block_cids.push(new_root_cid.to_bytes());
451 if let Err(e) = sqlx::query!(
452 r#"
453 INSERT INTO user_blocks (user_id, block_cid)
454 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
455 ON CONFLICT (user_id, block_cid) DO NOTHING
456 "#,
457 user_id,
458 &all_block_cids
459 )
460 .execute(&state.db)
461 .await
462 {
463 error!("Failed to insert user_blocks: {:?}", e);
464 return (
465 StatusCode::INTERNAL_SERVER_ERROR,
466 Json(json!({"error": "InternalError"})),
467 )
468 .into_response();
469 }
470 info!(
471 "Created new commit for imported repo: cid={}, rev={}",
472 new_root_str, new_rev_str
473 );
474 if !is_migration && let Err(e) = sequence_import_event(&state, did, &new_root_str).await
475 {
476 warn!("Failed to sequence import event: {:?}", e);
477 }
478 (StatusCode::OK, Json(json!({}))).into_response()
479 }
480 Err(ImportError::SizeLimitExceeded) => (
481 StatusCode::BAD_REQUEST,
482 Json(json!({
483 "error": "InvalidRequest",
484 "message": format!("Import exceeds block limit of {}", max_blocks)
485 })),
486 )
487 .into_response(),
488 Err(ImportError::RepoNotFound) => (
489 StatusCode::NOT_FOUND,
490 Json(json!({
491 "error": "RepoNotFound",
492 "message": "Repository not initialized for this account"
493 })),
494 )
495 .into_response(),
496 Err(ImportError::InvalidCbor(msg)) => (
497 StatusCode::BAD_REQUEST,
498 Json(json!({
499 "error": "InvalidRequest",
500 "message": format!("Invalid CBOR data: {}", msg)
501 })),
502 )
503 .into_response(),
504 Err(ImportError::InvalidCommit(msg)) => (
505 StatusCode::BAD_REQUEST,
506 Json(json!({
507 "error": "InvalidRequest",
508 "message": format!("Invalid commit structure: {}", msg)
509 })),
510 )
511 .into_response(),
512 Err(ImportError::BlockNotFound(cid)) => (
513 StatusCode::BAD_REQUEST,
514 Json(json!({
515 "error": "InvalidRequest",
516 "message": format!("Referenced block not found in CAR: {}", cid)
517 })),
518 )
519 .into_response(),
520 Err(ImportError::ConcurrentModification) => (
521 StatusCode::CONFLICT,
522 Json(json!({
523 "error": "ConcurrentModification",
524 "message": "Repository is being modified by another operation, please retry"
525 })),
526 )
527 .into_response(),
528 Err(ImportError::VerificationFailed(ve)) => (
529 StatusCode::BAD_REQUEST,
530 Json(json!({
531 "error": "VerificationFailed",
532 "message": format!("CAR verification failed: {}", ve)
533 })),
534 )
535 .into_response(),
536 Err(ImportError::DidMismatch { car_did, auth_did }) => (
537 StatusCode::FORBIDDEN,
538 Json(json!({
539 "error": "DidMismatch",
540 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did)
541 })),
542 )
543 .into_response(),
544 Err(e) => {
545 error!("Import error: {:?}", e);
546 (
547 StatusCode::INTERNAL_SERVER_ERROR,
548 Json(json!({"error": "InternalError"})),
549 )
550 .into_response()
551 }
552 }
553}
554
555async fn sequence_import_event(
556 state: &AppState,
557 did: &str,
558 commit_cid: &str,
559) -> Result<(), sqlx::Error> {
560 let prev_cid: Option<String> = None;
561 let prev_data_cid: Option<String> = None;
562 let ops = serde_json::json!([]);
563 let blobs: Vec<String> = vec![];
564 let blocks_cids: Vec<String> = vec![];
565 let seq_row = sqlx::query!(
566 r#"
567 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids)
568 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7)
569 RETURNING seq
570 "#,
571 did,
572 commit_cid,
573 prev_cid,
574 prev_data_cid,
575 ops,
576 &blobs,
577 &blocks_cids
578 )
579 .fetch_one(&state.db)
580 .await?;
581 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
582 .execute(&state.db)
583 .await?;
584 Ok(())
585}