this repo has no description
1use crate::api::error::ApiError;
2use crate::api::repo::record::create_signed_commit;
3use crate::api::EmptyResponse;
4use crate::state::AppState;
5use crate::sync::import::{ImportError, apply_import, parse_car};
6use crate::sync::verify::CarVerifier;
7use axum::{
8 body::Bytes,
9 extract::State,
10 response::{IntoResponse, Response},
11};
12use jacquard::types::{integer::LimitedU32, string::Tid};
13use jacquard_repo::storage::BlockStore;
14use k256::ecdsa::SigningKey;
15use serde_json::json;
16use tracing::{debug, error, info, warn};
17
18const DEFAULT_MAX_IMPORT_SIZE: usize = 1024 * 1024 * 1024;
19const DEFAULT_MAX_BLOCKS: usize = 500000;
20
21pub async fn import_repo(
22 State(state): State<AppState>,
23 headers: axum::http::HeaderMap,
24 body: Bytes,
25) -> Response {
26 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
27 .map(|v| v != "false" && v != "0")
28 .unwrap_or(true);
29 if !accepting_imports {
30 return ApiError::InvalidRequest("Service is not accepting repo imports".into())
31 .into_response();
32 }
33 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
34 .ok()
35 .and_then(|s| s.parse().ok())
36 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
37 if body.len() > max_size {
38 return ApiError::PayloadTooLarge(format!(
39 "Import size exceeds limit of {} bytes",
40 max_size
41 ))
42 .into_response();
43 }
44 let token = match crate::auth::extract_bearer_token_from_header(
45 headers.get("Authorization").and_then(|h| h.to_str().ok()),
46 ) {
47 Some(t) => t,
48 None => return ApiError::AuthenticationRequired.into_response(),
49 };
50 let auth_user =
51 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
52 Ok(user) => user,
53 Err(e) => return ApiError::from(e).into_response(),
54 };
55 let did = &auth_user.did;
56 let user = match sqlx::query!(
57 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1",
58 did
59 )
60 .fetch_optional(&state.db)
61 .await
62 {
63 Ok(Some(row)) => row,
64 Ok(None) => {
65 return ApiError::AccountNotFound.into_response();
66 }
67 Err(e) => {
68 error!("DB error fetching user: {:?}", e);
69 return ApiError::InternalError(None).into_response();
70 }
71 };
72 if user.takedown_ref.is_some() {
73 return ApiError::AccountTakedown.into_response();
74 }
75 let user_id = user.id;
76 let (root, blocks) = match parse_car(&body).await {
77 Ok((r, b)) => (r, b),
78 Err(ImportError::InvalidRootCount) => {
79 return ApiError::InvalidRequest("Expected exactly one root in CAR file".into())
80 .into_response();
81 }
82 Err(ImportError::CarParse(msg)) => {
83 return ApiError::InvalidRequest(format!("Failed to parse CAR file: {}", msg))
84 .into_response();
85 }
86 Err(e) => {
87 error!("CAR parsing error: {:?}", e);
88 return ApiError::InvalidRequest(format!("Invalid CAR file: {}", e)).into_response();
89 }
90 };
91 info!(
92 "Importing repo for user {}: {} blocks, root {}",
93 did,
94 blocks.len(),
95 root
96 );
97 let Some(root_block) = blocks.get(&root) else {
98 return ApiError::InvalidRequest("Root block not found in CAR file".into()).into_response();
99 };
100 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
101 Ok(commit) => commit.did().to_string(),
102 Err(e) => {
103 return ApiError::InvalidRequest(format!("Invalid commit: {}", e)).into_response();
104 }
105 };
106 if commit_did != *did {
107 return ApiError::InvalidRepo(format!(
108 "CAR file is for DID {} but you are authenticated as {}",
109 commit_did, did
110 ))
111 .into_response();
112 }
113 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
114 .map(|v| v == "true" || v == "1")
115 .unwrap_or(false);
116 let is_migration = user.deactivated_at.is_some();
117 if skip_verification {
118 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)");
119 } else if is_migration {
120 debug!("Verifying CAR file structure for migration (skipping signature verification)");
121 let verifier = CarVerifier::new();
122 match verifier.verify_car_structure_only(did, &root, &blocks) {
123 Ok(verified) => {
124 debug!(
125 "CAR structure verification successful: rev={}, data_cid={}",
126 verified.rev, verified.data_cid
127 );
128 }
129 Err(crate::sync::verify::VerifyError::DidMismatch {
130 commit_did,
131 expected_did,
132 }) => {
133 return ApiError::InvalidRepo(format!(
134 "CAR file is for DID {} but you are authenticated as {}",
135 commit_did, expected_did
136 ))
137 .into_response();
138 }
139 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
140 return ApiError::InvalidRequest(format!("MST validation failed: {}", msg))
141 .into_response();
142 }
143 Err(e) => {
144 error!("CAR structure verification error: {:?}", e);
145 return ApiError::InvalidRequest(format!("CAR verification failed: {}", e))
146 .into_response();
147 }
148 }
149 } else {
150 debug!("Verifying CAR file signature and structure for DID {}", did);
151 let verifier = CarVerifier::new();
152 match verifier.verify_car(did, &root, &blocks).await {
153 Ok(verified) => {
154 debug!(
155 "CAR verification successful: rev={}, data_cid={}",
156 verified.rev, verified.data_cid
157 );
158 }
159 Err(crate::sync::verify::VerifyError::DidMismatch {
160 commit_did,
161 expected_did,
162 }) => {
163 return ApiError::InvalidRepo(format!(
164 "CAR file is for DID {} but you are authenticated as {}",
165 commit_did, expected_did
166 ))
167 .into_response();
168 }
169 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
170 return ApiError::InvalidRequest(
171 "CAR file commit signature verification failed".into(),
172 )
173 .into_response();
174 }
175 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
176 warn!("DID resolution failed during import verification: {}", msg);
177 return ApiError::InvalidRequest(format!("Failed to verify DID: {}", msg))
178 .into_response();
179 }
180 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
181 return ApiError::InvalidRequest(
182 "DID document does not contain a signing key".into(),
183 )
184 .into_response();
185 }
186 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
187 return ApiError::InvalidRequest(format!("MST validation failed: {}", msg))
188 .into_response();
189 }
190 Err(e) => {
191 error!("CAR verification error: {:?}", e);
192 return ApiError::InvalidRequest(format!("CAR verification failed: {}", e))
193 .into_response();
194 }
195 }
196 }
197 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
198 .ok()
199 .and_then(|s| s.parse().ok())
200 .unwrap_or(DEFAULT_MAX_BLOCKS);
201 match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await {
202 Ok(import_result) => {
203 info!(
204 "Successfully imported {} records for user {}",
205 import_result.records.len(),
206 did
207 );
208 let mut blob_ref_count = 0;
209 for record in &import_result.records {
210 for blob_ref in &record.blob_refs {
211 let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey);
212 if let Err(e) = sqlx::query!(
213 r#"
214 INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
215 VALUES ($1, $2, $3)
216 ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
217 "#,
218 user_id,
219 record_uri,
220 blob_ref.cid
221 )
222 .execute(&state.db)
223 .await
224 {
225 warn!("Failed to insert record_blob for {}: {:?}", record_uri, e);
226 } else {
227 blob_ref_count += 1;
228 }
229 }
230 }
231 if blob_ref_count > 0 {
232 info!(
233 "Recorded {} blob references for imported repo",
234 blob_ref_count
235 );
236 }
237 let key_row = match sqlx::query!(
238 r#"SELECT uk.key_bytes, uk.encryption_version
239 FROM user_keys uk
240 JOIN users u ON uk.user_id = u.id
241 WHERE u.did = $1"#,
242 did
243 )
244 .fetch_optional(&state.db)
245 .await
246 {
247 Ok(Some(row)) => row,
248 Ok(None) => {
249 error!("No signing key found for user {}", did);
250 return ApiError::InternalError(Some("Signing key not found".into()))
251 .into_response();
252 }
253 Err(e) => {
254 error!("DB error fetching signing key: {:?}", e);
255 return ApiError::InternalError(None).into_response();
256 }
257 };
258 let key_bytes =
259 match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) {
260 Ok(k) => k,
261 Err(e) => {
262 error!("Failed to decrypt signing key: {}", e);
263 return ApiError::InternalError(None).into_response();
264 }
265 };
266 let signing_key = match SigningKey::from_slice(&key_bytes) {
267 Ok(k) => k,
268 Err(e) => {
269 error!("Invalid signing key: {:?}", e);
270 return ApiError::InternalError(None).into_response();
271 }
272 };
273 let new_rev = Tid::now(LimitedU32::MIN);
274 let new_rev_str = new_rev.to_string();
275 let (commit_bytes, _sig) = match create_signed_commit(
276 did,
277 import_result.data_cid,
278 &new_rev_str,
279 None,
280 &signing_key,
281 ) {
282 Ok(result) => result,
283 Err(e) => {
284 error!("Failed to create new commit: {}", e);
285 return ApiError::InternalError(None).into_response();
286 }
287 };
288 let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await {
289 Ok(cid) => cid,
290 Err(e) => {
291 error!("Failed to store new commit block: {:?}", e);
292 return ApiError::InternalError(None).into_response();
293 }
294 };
295 let new_root_str = new_root_cid.to_string();
296 if let Err(e) = sqlx::query!(
297 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3",
298 new_root_str,
299 &new_rev_str,
300 user_id
301 )
302 .execute(&state.db)
303 .await
304 {
305 error!("Failed to update repo root: {:?}", e);
306 return ApiError::InternalError(None).into_response();
307 }
308 let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect();
309 all_block_cids.push(new_root_cid.to_bytes());
310 if let Err(e) = sqlx::query!(
311 r#"
312 INSERT INTO user_blocks (user_id, block_cid)
313 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
314 ON CONFLICT (user_id, block_cid) DO NOTHING
315 "#,
316 user_id,
317 &all_block_cids
318 )
319 .execute(&state.db)
320 .await
321 {
322 error!("Failed to insert user_blocks: {:?}", e);
323 return ApiError::InternalError(None).into_response();
324 }
325 info!(
326 "Created new commit for imported repo: cid={}, rev={}",
327 new_root_str, new_rev_str
328 );
329 if !is_migration && let Err(e) = sequence_import_event(&state, did, &new_root_str).await
330 {
331 warn!("Failed to sequence import event: {:?}", e);
332 }
333 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() {
334 let birthdate_pref = json!({
335 "$type": "app.bsky.actor.defs#personalDetailsPref",
336 "birthDate": "1998-05-06T00:00:00.000Z"
337 });
338 if let Err(e) = sqlx::query!(
339 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3)
340 ON CONFLICT (user_id, name) DO NOTHING",
341 user_id,
342 "app.bsky.actor.defs#personalDetailsPref",
343 birthdate_pref
344 )
345 .execute(&state.db)
346 .await
347 {
348 warn!(
349 "Failed to set default birthdate preference for migrated user: {:?}",
350 e
351 );
352 }
353 }
354 EmptyResponse::ok().into_response()
355 }
356 Err(ImportError::SizeLimitExceeded) => {
357 ApiError::PayloadTooLarge(format!("Import exceeds block limit of {}", max_blocks))
358 .into_response()
359 }
360 Err(ImportError::RepoNotFound) => {
361 ApiError::RepoNotFound(Some("Repository not initialized for this account".into()))
362 .into_response()
363 }
364 Err(ImportError::InvalidCbor(msg)) => {
365 ApiError::InvalidRequest(format!("Invalid CBOR data: {}", msg)).into_response()
366 }
367 Err(ImportError::InvalidCommit(msg)) => {
368 ApiError::InvalidRequest(format!("Invalid commit structure: {}", msg)).into_response()
369 }
370 Err(ImportError::BlockNotFound(cid)) => {
371 ApiError::InvalidRequest(format!("Referenced block not found in CAR: {}", cid))
372 .into_response()
373 }
374 Err(ImportError::ConcurrentModification) => ApiError::InvalidSwap(Some("Repository is being modified by another operation, please retry".into(),))
375 .into_response(),
376 Err(ImportError::VerificationFailed(ve)) => {
377 ApiError::InvalidRequest(format!("CAR verification failed: {}", ve)).into_response()
378 }
379 Err(ImportError::DidMismatch { car_did, auth_did }) => {
380 ApiError::InvalidRequest(format!(
381 "CAR is for {} but authenticated as {}",
382 car_did, auth_did
383 ))
384 .into_response()
385 }
386 Err(e) => {
387 error!("Import error: {:?}", e);
388 ApiError::InternalError(None).into_response()
389 }
390 }
391}
392
393async fn sequence_import_event(
394 state: &AppState,
395 did: &str,
396 commit_cid: &str,
397) -> Result<(), sqlx::Error> {
398 let prev_cid: Option<String> = None;
399 let prev_data_cid: Option<String> = None;
400 let ops = serde_json::json!([]);
401 let blobs: Vec<String> = vec![];
402 let blocks_cids: Vec<String> = vec![];
403 let seq_row = sqlx::query!(
404 r#"
405 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids)
406 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7)
407 RETURNING seq
408 "#,
409 did,
410 commit_cid,
411 prev_cid,
412 prev_data_cid,
413 ops,
414 &blobs,
415 &blocks_cids
416 )
417 .fetch_one(&state.db)
418 .await?;
419 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
420 .execute(&state.db)
421 .await?;
422 Ok(())
423}