this repo has no description
1use crate::api::ApiError;
2use crate::api::repo::record::create_signed_commit;
3use crate::state::AppState;
4use crate::sync::import::{ImportError, apply_import, parse_car};
5use crate::sync::verify::CarVerifier;
6use axum::{
7 Json,
8 body::Bytes,
9 extract::State,
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use jacquard::types::{integer::LimitedU32, string::Tid};
14use jacquard_repo::storage::BlockStore;
15use k256::ecdsa::SigningKey;
16use serde_json::json;
17use tracing::{debug, error, info, warn};
18
19const DEFAULT_MAX_IMPORT_SIZE: usize = 1024 * 1024 * 1024;
20const DEFAULT_MAX_BLOCKS: usize = 500000;
21
22pub async fn import_repo(
23 State(state): State<AppState>,
24 headers: axum::http::HeaderMap,
25 body: Bytes,
26) -> Response {
27 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
28 .map(|v| v != "false" && v != "0")
29 .unwrap_or(true);
30 if !accepting_imports {
31 return (
32 StatusCode::BAD_REQUEST,
33 Json(json!({
34 "error": "InvalidRequest",
35 "message": "Service is not accepting repo imports"
36 })),
37 )
38 .into_response();
39 }
40 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
41 .ok()
42 .and_then(|s| s.parse().ok())
43 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
44 if body.len() > max_size {
45 return (
46 StatusCode::PAYLOAD_TOO_LARGE,
47 Json(json!({
48 "error": "InvalidRequest",
49 "message": format!("Import size exceeds limit of {} bytes", max_size)
50 })),
51 )
52 .into_response();
53 }
54 let token = match crate::auth::extract_bearer_token_from_header(
55 headers.get("Authorization").and_then(|h| h.to_str().ok()),
56 ) {
57 Some(t) => t,
58 None => return ApiError::AuthenticationRequired.into_response(),
59 };
60 let auth_user =
61 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
62 Ok(user) => user,
63 Err(e) => return ApiError::from(e).into_response(),
64 };
65 let did = &auth_user.did;
66 let user = match sqlx::query!(
67 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1",
68 did
69 )
70 .fetch_optional(&state.db)
71 .await
72 {
73 Ok(Some(row)) => row,
74 Ok(None) => {
75 return (
76 StatusCode::NOT_FOUND,
77 Json(json!({"error": "AccountNotFound"})),
78 )
79 .into_response();
80 }
81 Err(e) => {
82 error!("DB error fetching user: {:?}", e);
83 return (
84 StatusCode::INTERNAL_SERVER_ERROR,
85 Json(json!({"error": "InternalError"})),
86 )
87 .into_response();
88 }
89 };
90 if user.takedown_ref.is_some() {
91 return (
92 StatusCode::FORBIDDEN,
93 Json(json!({
94 "error": "AccountTakenDown",
95 "message": "Account has been taken down"
96 })),
97 )
98 .into_response();
99 }
100 let user_id = user.id;
101 let (root, blocks) = match parse_car(&body).await {
102 Ok((r, b)) => (r, b),
103 Err(ImportError::InvalidRootCount) => {
104 return (
105 StatusCode::BAD_REQUEST,
106 Json(json!({
107 "error": "InvalidRequest",
108 "message": "Expected exactly one root in CAR file"
109 })),
110 )
111 .into_response();
112 }
113 Err(ImportError::CarParse(msg)) => {
114 return (
115 StatusCode::BAD_REQUEST,
116 Json(json!({
117 "error": "InvalidRequest",
118 "message": format!("Failed to parse CAR file: {}", msg)
119 })),
120 )
121 .into_response();
122 }
123 Err(e) => {
124 error!("CAR parsing error: {:?}", e);
125 return (
126 StatusCode::BAD_REQUEST,
127 Json(json!({
128 "error": "InvalidRequest",
129 "message": format!("Invalid CAR file: {}", e)
130 })),
131 )
132 .into_response();
133 }
134 };
135 info!(
136 "Importing repo for user {}: {} blocks, root {}",
137 did,
138 blocks.len(),
139 root
140 );
141 let root_block = match blocks.get(&root) {
142 Some(b) => b,
143 None => {
144 return (
145 StatusCode::BAD_REQUEST,
146 Json(json!({
147 "error": "InvalidRequest",
148 "message": "Root block not found in CAR file"
149 })),
150 )
151 .into_response();
152 }
153 };
154 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
155 Ok(commit) => commit.did().to_string(),
156 Err(e) => {
157 return (
158 StatusCode::BAD_REQUEST,
159 Json(json!({
160 "error": "InvalidRequest",
161 "message": format!("Invalid commit: {}", e)
162 })),
163 )
164 .into_response();
165 }
166 };
167 if commit_did != *did {
168 return (
169 StatusCode::FORBIDDEN,
170 Json(json!({
171 "error": "InvalidRequest",
172 "message": format!(
173 "CAR file is for DID {} but you are authenticated as {}",
174 commit_did, did
175 )
176 })),
177 )
178 .into_response();
179 }
180 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
181 .map(|v| v == "true" || v == "1")
182 .unwrap_or(false);
183 let is_migration = user.deactivated_at.is_some();
184 if skip_verification {
185 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)");
186 } else if is_migration {
187 debug!("Verifying CAR file structure for migration (skipping signature verification)");
188 let verifier = CarVerifier::new();
189 match verifier.verify_car_structure_only(did, &root, &blocks) {
190 Ok(verified) => {
191 debug!(
192 "CAR structure verification successful: rev={}, data_cid={}",
193 verified.rev, verified.data_cid
194 );
195 }
196 Err(crate::sync::verify::VerifyError::DidMismatch {
197 commit_did,
198 expected_did,
199 }) => {
200 return (
201 StatusCode::FORBIDDEN,
202 Json(json!({
203 "error": "InvalidRequest",
204 "message": format!(
205 "CAR file is for DID {} but you are authenticated as {}",
206 commit_did, expected_did
207 )
208 })),
209 )
210 .into_response();
211 }
212 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
213 return (
214 StatusCode::BAD_REQUEST,
215 Json(json!({
216 "error": "InvalidRequest",
217 "message": format!("MST validation failed: {}", msg)
218 })),
219 )
220 .into_response();
221 }
222 Err(e) => {
223 error!("CAR structure verification error: {:?}", e);
224 return (
225 StatusCode::BAD_REQUEST,
226 Json(json!({
227 "error": "InvalidRequest",
228 "message": format!("CAR verification failed: {}", e)
229 })),
230 )
231 .into_response();
232 }
233 }
234 } else {
235 debug!("Verifying CAR file signature and structure for DID {}", did);
236 let verifier = CarVerifier::new();
237 match verifier.verify_car(did, &root, &blocks).await {
238 Ok(verified) => {
239 debug!(
240 "CAR verification successful: rev={}, data_cid={}",
241 verified.rev, verified.data_cid
242 );
243 }
244 Err(crate::sync::verify::VerifyError::DidMismatch {
245 commit_did,
246 expected_did,
247 }) => {
248 return (
249 StatusCode::FORBIDDEN,
250 Json(json!({
251 "error": "InvalidRequest",
252 "message": format!(
253 "CAR file is for DID {} but you are authenticated as {}",
254 commit_did, expected_did
255 )
256 })),
257 )
258 .into_response();
259 }
260 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
261 return (
262 StatusCode::BAD_REQUEST,
263 Json(json!({
264 "error": "InvalidSignature",
265 "message": "CAR file commit signature verification failed"
266 })),
267 )
268 .into_response();
269 }
270 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
271 warn!("DID resolution failed during import verification: {}", msg);
272 return (
273 StatusCode::BAD_REQUEST,
274 Json(json!({
275 "error": "InvalidRequest",
276 "message": format!("Failed to verify DID: {}", msg)
277 })),
278 )
279 .into_response();
280 }
281 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
282 return (
283 StatusCode::BAD_REQUEST,
284 Json(json!({
285 "error": "InvalidRequest",
286 "message": "DID document does not contain a signing key"
287 })),
288 )
289 .into_response();
290 }
291 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
292 return (
293 StatusCode::BAD_REQUEST,
294 Json(json!({
295 "error": "InvalidRequest",
296 "message": format!("MST validation failed: {}", msg)
297 })),
298 )
299 .into_response();
300 }
301 Err(e) => {
302 error!("CAR verification error: {:?}", e);
303 return (
304 StatusCode::BAD_REQUEST,
305 Json(json!({
306 "error": "InvalidRequest",
307 "message": format!("CAR verification failed: {}", e)
308 })),
309 )
310 .into_response();
311 }
312 }
313 }
314 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
315 .ok()
316 .and_then(|s| s.parse().ok())
317 .unwrap_or(DEFAULT_MAX_BLOCKS);
318 match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await {
319 Ok(import_result) => {
320 info!(
321 "Successfully imported {} records for user {}",
322 import_result.records.len(),
323 did
324 );
325 let mut blob_ref_count = 0;
326 for record in &import_result.records {
327 for blob_ref in &record.blob_refs {
328 let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey);
329 if let Err(e) = sqlx::query!(
330 r#"
331 INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
332 VALUES ($1, $2, $3)
333 ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
334 "#,
335 user_id,
336 record_uri,
337 blob_ref.cid
338 )
339 .execute(&state.db)
340 .await
341 {
342 warn!("Failed to insert record_blob for {}: {:?}", record_uri, e);
343 } else {
344 blob_ref_count += 1;
345 }
346 }
347 }
348 if blob_ref_count > 0 {
349 info!(
350 "Recorded {} blob references for imported repo",
351 blob_ref_count
352 );
353 }
354 let key_row = match sqlx::query!(
355 r#"SELECT uk.key_bytes, uk.encryption_version
356 FROM user_keys uk
357 JOIN users u ON uk.user_id = u.id
358 WHERE u.did = $1"#,
359 did
360 )
361 .fetch_optional(&state.db)
362 .await
363 {
364 Ok(Some(row)) => row,
365 Ok(None) => {
366 error!("No signing key found for user {}", did);
367 return (
368 StatusCode::INTERNAL_SERVER_ERROR,
369 Json(json!({"error": "InternalError", "message": "Signing key not found"})),
370 )
371 .into_response();
372 }
373 Err(e) => {
374 error!("DB error fetching signing key: {:?}", e);
375 return (
376 StatusCode::INTERNAL_SERVER_ERROR,
377 Json(json!({"error": "InternalError"})),
378 )
379 .into_response();
380 }
381 };
382 let key_bytes =
383 match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) {
384 Ok(k) => k,
385 Err(e) => {
386 error!("Failed to decrypt signing key: {}", e);
387 return (
388 StatusCode::INTERNAL_SERVER_ERROR,
389 Json(json!({"error": "InternalError"})),
390 )
391 .into_response();
392 }
393 };
394 let signing_key = match SigningKey::from_slice(&key_bytes) {
395 Ok(k) => k,
396 Err(e) => {
397 error!("Invalid signing key: {:?}", e);
398 return (
399 StatusCode::INTERNAL_SERVER_ERROR,
400 Json(json!({"error": "InternalError"})),
401 )
402 .into_response();
403 }
404 };
405 let new_rev = Tid::now(LimitedU32::MIN);
406 let new_rev_str = new_rev.to_string();
407 let (commit_bytes, _sig) = match create_signed_commit(
408 did,
409 import_result.data_cid,
410 &new_rev_str,
411 None,
412 &signing_key,
413 ) {
414 Ok(result) => result,
415 Err(e) => {
416 error!("Failed to create new commit: {}", e);
417 return (
418 StatusCode::INTERNAL_SERVER_ERROR,
419 Json(json!({"error": "InternalError"})),
420 )
421 .into_response();
422 }
423 };
424 let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await {
425 Ok(cid) => cid,
426 Err(e) => {
427 error!("Failed to store new commit block: {:?}", e);
428 return (
429 StatusCode::INTERNAL_SERVER_ERROR,
430 Json(json!({"error": "InternalError"})),
431 )
432 .into_response();
433 }
434 };
435 let new_root_str = new_root_cid.to_string();
436 if let Err(e) = sqlx::query!(
437 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3",
438 new_root_str,
439 &new_rev_str,
440 user_id
441 )
442 .execute(&state.db)
443 .await
444 {
445 error!("Failed to update repo root: {:?}", e);
446 return (
447 StatusCode::INTERNAL_SERVER_ERROR,
448 Json(json!({"error": "InternalError"})),
449 )
450 .into_response();
451 }
452 let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect();
453 all_block_cids.push(new_root_cid.to_bytes());
454 if let Err(e) = sqlx::query!(
455 r#"
456 INSERT INTO user_blocks (user_id, block_cid)
457 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
458 ON CONFLICT (user_id, block_cid) DO NOTHING
459 "#,
460 user_id,
461 &all_block_cids
462 )
463 .execute(&state.db)
464 .await
465 {
466 error!("Failed to insert user_blocks: {:?}", e);
467 return (
468 StatusCode::INTERNAL_SERVER_ERROR,
469 Json(json!({"error": "InternalError"})),
470 )
471 .into_response();
472 }
473 info!(
474 "Created new commit for imported repo: cid={}, rev={}",
475 new_root_str, new_rev_str
476 );
477 if !is_migration && let Err(e) = sequence_import_event(&state, did, &new_root_str).await
478 {
479 warn!("Failed to sequence import event: {:?}", e);
480 }
481 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() {
482 let birthdate_pref = json!({
483 "$type": "app.bsky.actor.defs#personalDetailsPref",
484 "birthDate": "1998-05-06T00:00:00.000Z"
485 });
486 if let Err(e) = sqlx::query!(
487 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3)
488 ON CONFLICT (user_id, name) DO NOTHING",
489 user_id,
490 "app.bsky.actor.defs#personalDetailsPref",
491 birthdate_pref
492 )
493 .execute(&state.db)
494 .await
495 {
496 warn!(
497 "Failed to set default birthdate preference for migrated user: {:?}",
498 e
499 );
500 }
501 }
502 (StatusCode::OK, Json(json!({}))).into_response()
503 }
504 Err(ImportError::SizeLimitExceeded) => (
505 StatusCode::BAD_REQUEST,
506 Json(json!({
507 "error": "InvalidRequest",
508 "message": format!("Import exceeds block limit of {}", max_blocks)
509 })),
510 )
511 .into_response(),
512 Err(ImportError::RepoNotFound) => (
513 StatusCode::NOT_FOUND,
514 Json(json!({
515 "error": "RepoNotFound",
516 "message": "Repository not initialized for this account"
517 })),
518 )
519 .into_response(),
520 Err(ImportError::InvalidCbor(msg)) => (
521 StatusCode::BAD_REQUEST,
522 Json(json!({
523 "error": "InvalidRequest",
524 "message": format!("Invalid CBOR data: {}", msg)
525 })),
526 )
527 .into_response(),
528 Err(ImportError::InvalidCommit(msg)) => (
529 StatusCode::BAD_REQUEST,
530 Json(json!({
531 "error": "InvalidRequest",
532 "message": format!("Invalid commit structure: {}", msg)
533 })),
534 )
535 .into_response(),
536 Err(ImportError::BlockNotFound(cid)) => (
537 StatusCode::BAD_REQUEST,
538 Json(json!({
539 "error": "InvalidRequest",
540 "message": format!("Referenced block not found in CAR: {}", cid)
541 })),
542 )
543 .into_response(),
544 Err(ImportError::ConcurrentModification) => (
545 StatusCode::CONFLICT,
546 Json(json!({
547 "error": "ConcurrentModification",
548 "message": "Repository is being modified by another operation, please retry"
549 })),
550 )
551 .into_response(),
552 Err(ImportError::VerificationFailed(ve)) => (
553 StatusCode::BAD_REQUEST,
554 Json(json!({
555 "error": "VerificationFailed",
556 "message": format!("CAR verification failed: {}", ve)
557 })),
558 )
559 .into_response(),
560 Err(ImportError::DidMismatch { car_did, auth_did }) => (
561 StatusCode::FORBIDDEN,
562 Json(json!({
563 "error": "DidMismatch",
564 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did)
565 })),
566 )
567 .into_response(),
568 Err(e) => {
569 error!("Import error: {:?}", e);
570 (
571 StatusCode::INTERNAL_SERVER_ERROR,
572 Json(json!({"error": "InternalError"})),
573 )
574 .into_response()
575 }
576 }
577}
578
579async fn sequence_import_event(
580 state: &AppState,
581 did: &str,
582 commit_cid: &str,
583) -> Result<(), sqlx::Error> {
584 let prev_cid: Option<String> = None;
585 let prev_data_cid: Option<String> = None;
586 let ops = serde_json::json!([]);
587 let blobs: Vec<String> = vec![];
588 let blocks_cids: Vec<String> = vec![];
589 let seq_row = sqlx::query!(
590 r#"
591 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids)
592 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7)
593 RETURNING seq
594 "#,
595 did,
596 commit_cid,
597 prev_cid,
598 prev_data_cid,
599 ops,
600 &blobs,
601 &blocks_cids
602 )
603 .fetch_one(&state.db)
604 .await?;
605 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
606 .execute(&state.db)
607 .await?;
608 Ok(())
609}