this repo has no description
1use crate::api::ApiError;
2use crate::api::repo::record::create_signed_commit;
3use crate::state::AppState;
4use crate::sync::import::{ImportError, apply_import, parse_car};
5use crate::sync::verify::CarVerifier;
6use axum::{
7 Json,
8 body::Bytes,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use jacquard::types::{integer::LimitedU32, string::Tid};
14use jacquard_repo::storage::BlockStore;
15use k256::ecdsa::SigningKey;
16use serde_json::json;
17use tracing::{debug, error, info, warn};
18
19const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024;
20const DEFAULT_MAX_BLOCKS: usize = 50000;
21
22pub async fn import_repo(
23 State(state): State<AppState>,
24 headers: axum::http::HeaderMap,
25 body: Bytes,
26) -> Response {
27 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
28 .map(|v| v != "false" && v != "0")
29 .unwrap_or(true);
30 if !accepting_imports {
31 return (
32 StatusCode::BAD_REQUEST,
33 Json(json!({
34 "error": "InvalidRequest",
35 "message": "Service is not accepting repo imports"
36 })),
37 )
38 .into_response();
39 }
40 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
41 .ok()
42 .and_then(|s| s.parse().ok())
43 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
44 if body.len() > max_size {
45 return (
46 StatusCode::PAYLOAD_TOO_LARGE,
47 Json(json!({
48 "error": "InvalidRequest",
49 "message": format!("Import size exceeds limit of {} bytes", max_size)
50 })),
51 )
52 .into_response();
53 }
54 let token = match crate::auth::extract_bearer_token_from_header(
55 headers.get("Authorization").and_then(|h| h.to_str().ok()),
56 ) {
57 Some(t) => t,
58 None => return ApiError::AuthenticationRequired.into_response(),
59 };
60 let auth_user =
61 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
62 Ok(user) => user,
63 Err(e) => return ApiError::from(e).into_response(),
64 };
65 let did = &auth_user.did;
66 let user = match sqlx::query!(
67 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1",
68 did
69 )
70 .fetch_optional(&state.db)
71 .await
72 {
73 Ok(Some(row)) => row,
74 Ok(None) => {
75 return (
76 StatusCode::NOT_FOUND,
77 Json(json!({"error": "AccountNotFound"})),
78 )
79 .into_response();
80 }
81 Err(e) => {
82 error!("DB error fetching user: {:?}", e);
83 return (
84 StatusCode::INTERNAL_SERVER_ERROR,
85 Json(json!({"error": "InternalError"})),
86 )
87 .into_response();
88 }
89 };
90 if user.takedown_ref.is_some() {
91 return (
92 StatusCode::FORBIDDEN,
93 Json(json!({
94 "error": "AccountTakenDown",
95 "message": "Account has been taken down"
96 })),
97 )
98 .into_response();
99 }
100 let user_id = user.id;
101 let (root, blocks) = match parse_car(&body).await {
102 Ok((r, b)) => (r, b),
103 Err(ImportError::InvalidRootCount) => {
104 return (
105 StatusCode::BAD_REQUEST,
106 Json(json!({
107 "error": "InvalidRequest",
108 "message": "Expected exactly one root in CAR file"
109 })),
110 )
111 .into_response();
112 }
113 Err(ImportError::CarParse(msg)) => {
114 return (
115 StatusCode::BAD_REQUEST,
116 Json(json!({
117 "error": "InvalidRequest",
118 "message": format!("Failed to parse CAR file: {}", msg)
119 })),
120 )
121 .into_response();
122 }
123 Err(e) => {
124 error!("CAR parsing error: {:?}", e);
125 return (
126 StatusCode::BAD_REQUEST,
127 Json(json!({
128 "error": "InvalidRequest",
129 "message": format!("Invalid CAR file: {}", e)
130 })),
131 )
132 .into_response();
133 }
134 };
135 info!(
136 "Importing repo for user {}: {} blocks, root {}",
137 did,
138 blocks.len(),
139 root
140 );
141 let root_block = match blocks.get(&root) {
142 Some(b) => b,
143 None => {
144 return (
145 StatusCode::BAD_REQUEST,
146 Json(json!({
147 "error": "InvalidRequest",
148 "message": "Root block not found in CAR file"
149 })),
150 )
151 .into_response();
152 }
153 };
154 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
155 Ok(commit) => commit.did().to_string(),
156 Err(e) => {
157 return (
158 StatusCode::BAD_REQUEST,
159 Json(json!({
160 "error": "InvalidRequest",
161 "message": format!("Invalid commit: {}", e)
162 })),
163 )
164 .into_response();
165 }
166 };
167 if commit_did != *did {
168 return (
169 StatusCode::FORBIDDEN,
170 Json(json!({
171 "error": "InvalidRequest",
172 "message": format!(
173 "CAR file is for DID {} but you are authenticated as {}",
174 commit_did, did
175 )
176 })),
177 )
178 .into_response();
179 }
180 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
181 .map(|v| v == "true" || v == "1")
182 .unwrap_or(false);
183 let is_migration = user.deactivated_at.is_some();
184 if skip_verification {
185 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)");
186 } else if is_migration {
187 debug!("Verifying CAR file structure for migration (skipping signature verification)");
188 let verifier = CarVerifier::new();
189 match verifier.verify_car_structure_only(did, &root, &blocks) {
190 Ok(verified) => {
191 debug!(
192 "CAR structure verification successful: rev={}, data_cid={}",
193 verified.rev, verified.data_cid
194 );
195 }
196 Err(crate::sync::verify::VerifyError::DidMismatch {
197 commit_did,
198 expected_did,
199 }) => {
200 return (
201 StatusCode::FORBIDDEN,
202 Json(json!({
203 "error": "InvalidRequest",
204 "message": format!(
205 "CAR file is for DID {} but you are authenticated as {}",
206 commit_did, expected_did
207 )
208 })),
209 )
210 .into_response();
211 }
212 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
213 return (
214 StatusCode::BAD_REQUEST,
215 Json(json!({
216 "error": "InvalidRequest",
217 "message": format!("MST validation failed: {}", msg)
218 })),
219 )
220 .into_response();
221 }
222 Err(e) => {
223 error!("CAR structure verification error: {:?}", e);
224 return (
225 StatusCode::BAD_REQUEST,
226 Json(json!({
227 "error": "InvalidRequest",
228 "message": format!("CAR verification failed: {}", e)
229 })),
230 )
231 .into_response();
232 }
233 }
234 } else {
235 debug!("Verifying CAR file signature and structure for DID {}", did);
236 let verifier = CarVerifier::new();
237 match verifier.verify_car(did, &root, &blocks).await {
238 Ok(verified) => {
239 debug!(
240 "CAR verification successful: rev={}, data_cid={}",
241 verified.rev, verified.data_cid
242 );
243 }
244 Err(crate::sync::verify::VerifyError::DidMismatch {
245 commit_did,
246 expected_did,
247 }) => {
248 return (
249 StatusCode::FORBIDDEN,
250 Json(json!({
251 "error": "InvalidRequest",
252 "message": format!(
253 "CAR file is for DID {} but you are authenticated as {}",
254 commit_did, expected_did
255 )
256 })),
257 )
258 .into_response();
259 }
260 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
261 return (
262 StatusCode::BAD_REQUEST,
263 Json(json!({
264 "error": "InvalidSignature",
265 "message": "CAR file commit signature verification failed"
266 })),
267 )
268 .into_response();
269 }
270 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
271 warn!("DID resolution failed during import verification: {}", msg);
272 return (
273 StatusCode::BAD_REQUEST,
274 Json(json!({
275 "error": "InvalidRequest",
276 "message": format!("Failed to verify DID: {}", msg)
277 })),
278 )
279 .into_response();
280 }
281 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
282 return (
283 StatusCode::BAD_REQUEST,
284 Json(json!({
285 "error": "InvalidRequest",
286 "message": "DID document does not contain a signing key"
287 })),
288 )
289 .into_response();
290 }
291 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
292 return (
293 StatusCode::BAD_REQUEST,
294 Json(json!({
295 "error": "InvalidRequest",
296 "message": format!("MST validation failed: {}", msg)
297 })),
298 )
299 .into_response();
300 }
301 Err(e) => {
302 error!("CAR verification error: {:?}", e);
303 return (
304 StatusCode::BAD_REQUEST,
305 Json(json!({
306 "error": "InvalidRequest",
307 "message": format!("CAR verification failed: {}", e)
308 })),
309 )
310 .into_response();
311 }
312 }
313 }
314 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
315 .ok()
316 .and_then(|s| s.parse().ok())
317 .unwrap_or(DEFAULT_MAX_BLOCKS);
318 match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await {
319 Ok(import_result) => {
320 info!(
321 "Successfully imported {} records for user {}",
322 import_result.records.len(),
323 did
324 );
325 let key_row = match sqlx::query!(
326 r#"SELECT uk.key_bytes, uk.encryption_version
327 FROM user_keys uk
328 JOIN users u ON uk.user_id = u.id
329 WHERE u.did = $1"#,
330 did
331 )
332 .fetch_optional(&state.db)
333 .await
334 {
335 Ok(Some(row)) => row,
336 Ok(None) => {
337 error!("No signing key found for user {}", did);
338 return (
339 StatusCode::INTERNAL_SERVER_ERROR,
340 Json(json!({"error": "InternalError", "message": "Signing key not found"})),
341 )
342 .into_response();
343 }
344 Err(e) => {
345 error!("DB error fetching signing key: {:?}", e);
346 return (
347 StatusCode::INTERNAL_SERVER_ERROR,
348 Json(json!({"error": "InternalError"})),
349 )
350 .into_response();
351 }
352 };
353 let key_bytes =
354 match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) {
355 Ok(k) => k,
356 Err(e) => {
357 error!("Failed to decrypt signing key: {}", e);
358 return (
359 StatusCode::INTERNAL_SERVER_ERROR,
360 Json(json!({"error": "InternalError"})),
361 )
362 .into_response();
363 }
364 };
365 let signing_key = match SigningKey::from_slice(&key_bytes) {
366 Ok(k) => k,
367 Err(e) => {
368 error!("Invalid signing key: {:?}", e);
369 return (
370 StatusCode::INTERNAL_SERVER_ERROR,
371 Json(json!({"error": "InternalError"})),
372 )
373 .into_response();
374 }
375 };
376 let new_rev = Tid::now(LimitedU32::MIN);
377 let new_rev_str = new_rev.to_string();
378 let (commit_bytes, _sig) = match create_signed_commit(
379 did,
380 import_result.data_cid,
381 &new_rev_str,
382 None,
383 &signing_key,
384 ) {
385 Ok(result) => result,
386 Err(e) => {
387 error!("Failed to create new commit: {}", e);
388 return (
389 StatusCode::INTERNAL_SERVER_ERROR,
390 Json(json!({"error": "InternalError"})),
391 )
392 .into_response();
393 }
394 };
395 let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await {
396 Ok(cid) => cid,
397 Err(e) => {
398 error!("Failed to store new commit block: {:?}", e);
399 return (
400 StatusCode::INTERNAL_SERVER_ERROR,
401 Json(json!({"error": "InternalError"})),
402 )
403 .into_response();
404 }
405 };
406 let new_root_str = new_root_cid.to_string();
407 if let Err(e) = sqlx::query!(
408 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3",
409 new_root_str,
410 &new_rev_str,
411 user_id
412 )
413 .execute(&state.db)
414 .await
415 {
416 error!("Failed to update repo root: {:?}", e);
417 return (
418 StatusCode::INTERNAL_SERVER_ERROR,
419 Json(json!({"error": "InternalError"})),
420 )
421 .into_response();
422 }
423 let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect();
424 all_block_cids.push(new_root_cid.to_bytes());
425 if let Err(e) = sqlx::query!(
426 r#"
427 INSERT INTO user_blocks (user_id, block_cid)
428 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
429 ON CONFLICT (user_id, block_cid) DO NOTHING
430 "#,
431 user_id,
432 &all_block_cids
433 )
434 .execute(&state.db)
435 .await
436 {
437 error!("Failed to insert user_blocks: {:?}", e);
438 return (
439 StatusCode::INTERNAL_SERVER_ERROR,
440 Json(json!({"error": "InternalError"})),
441 )
442 .into_response();
443 }
444 info!(
445 "Created new commit for imported repo: cid={}, rev={}",
446 new_root_str, new_rev_str
447 );
448 if !is_migration && let Err(e) = sequence_import_event(&state, did, &new_root_str).await
449 {
450 warn!("Failed to sequence import event: {:?}", e);
451 }
452 (StatusCode::OK, Json(json!({}))).into_response()
453 }
454 Err(ImportError::SizeLimitExceeded) => (
455 StatusCode::BAD_REQUEST,
456 Json(json!({
457 "error": "InvalidRequest",
458 "message": format!("Import exceeds block limit of {}", max_blocks)
459 })),
460 )
461 .into_response(),
462 Err(ImportError::RepoNotFound) => (
463 StatusCode::NOT_FOUND,
464 Json(json!({
465 "error": "RepoNotFound",
466 "message": "Repository not initialized for this account"
467 })),
468 )
469 .into_response(),
470 Err(ImportError::InvalidCbor(msg)) => (
471 StatusCode::BAD_REQUEST,
472 Json(json!({
473 "error": "InvalidRequest",
474 "message": format!("Invalid CBOR data: {}", msg)
475 })),
476 )
477 .into_response(),
478 Err(ImportError::InvalidCommit(msg)) => (
479 StatusCode::BAD_REQUEST,
480 Json(json!({
481 "error": "InvalidRequest",
482 "message": format!("Invalid commit structure: {}", msg)
483 })),
484 )
485 .into_response(),
486 Err(ImportError::BlockNotFound(cid)) => (
487 StatusCode::BAD_REQUEST,
488 Json(json!({
489 "error": "InvalidRequest",
490 "message": format!("Referenced block not found in CAR: {}", cid)
491 })),
492 )
493 .into_response(),
494 Err(ImportError::ConcurrentModification) => (
495 StatusCode::CONFLICT,
496 Json(json!({
497 "error": "ConcurrentModification",
498 "message": "Repository is being modified by another operation, please retry"
499 })),
500 )
501 .into_response(),
502 Err(ImportError::VerificationFailed(ve)) => (
503 StatusCode::BAD_REQUEST,
504 Json(json!({
505 "error": "VerificationFailed",
506 "message": format!("CAR verification failed: {}", ve)
507 })),
508 )
509 .into_response(),
510 Err(ImportError::DidMismatch { car_did, auth_did }) => (
511 StatusCode::FORBIDDEN,
512 Json(json!({
513 "error": "DidMismatch",
514 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did)
515 })),
516 )
517 .into_response(),
518 Err(e) => {
519 error!("Import error: {:?}", e);
520 (
521 StatusCode::INTERNAL_SERVER_ERROR,
522 Json(json!({"error": "InternalError"})),
523 )
524 .into_response()
525 }
526 }
527}
528
529async fn sequence_import_event(
530 state: &AppState,
531 did: &str,
532 commit_cid: &str,
533) -> Result<(), sqlx::Error> {
534 let prev_cid: Option<String> = None;
535 let prev_data_cid: Option<String> = None;
536 let ops = serde_json::json!([]);
537 let blobs: Vec<String> = vec![];
538 let blocks_cids: Vec<String> = vec![];
539 let seq_row = sqlx::query!(
540 r#"
541 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids)
542 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7)
543 RETURNING seq
544 "#,
545 did,
546 commit_cid,
547 prev_cid,
548 prev_data_cid,
549 ops,
550 &blobs,
551 &blocks_cids
552 )
553 .fetch_one(&state.db)
554 .await?;
555 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
556 .execute(&state.db)
557 .await?;
558 Ok(())
559}