use crate::api::EmptyResponse;
use crate::api::error::ApiError;
use crate::api::repo::record::create_signed_commit;
use crate::auth::BearerAuthAllowDeactivated;
use crate::state::AppState;
use crate::sync::import::{ImportError, apply_import, parse_car};
use crate::sync::verify::CarVerifier;
use crate::types::Did;
use axum::{
    body::Bytes,
    extract::State,
    response::{IntoResponse, Response},
};
use jacquard::types::{integer::LimitedU32, string::Tid};
use jacquard_repo::storage::BlockStore;
use k256::ecdsa::SigningKey;
use serde_json::json;
use tracing::{debug, error, info, warn};

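// Default import limits: 1 GiB CAR payload and 500,000 blocks.
// Both can be overridden via the MAX_IMPORT_SIZE and MAX_IMPORT_BLOCKS env vars.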
const DEFAULT_MAX_IMPORT_SIZE: usize = 1024 * 1024 * 1024;
const DEFAULT_MAX_BLOCKS: usize = 500000;

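/// Imports a full repository from an uploaded CAR file: the CAR is parsed and
/// verified, its blocks and records are applied to the authenticated user's repo,
/// and a fresh signed commit is created over the imported data.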
pub async fn import_repo(
    State(state): State<AppState>,
    auth: BearerAuthAllowDeactivated,
    body: Bytes,
) -> Response {
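    // Operators can disable repo imports entirely via ACCEPTING_REPO_IMPORTS (enabled by default).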
    let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
        .map(|v| v != "false" && v != "0")
        .unwrap_or(true);
    if !accepting_imports {
        return ApiError::InvalidRequest("Service is not accepting repo imports".into())
            .into_response();
    }
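    // Reject oversized CAR payloads before doing any parsing work.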
    let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
    if body.len() > max_size {
        return ApiError::PayloadTooLarge(format!(
            "Import size exceeds limit of {} bytes",
            max_size
        ))
        .into_response();
    }
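    // Look up the authenticated account. Deactivated accounts are allowed here
    // (incoming migrations), but taken-down accounts are rejected.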
    let auth_user = auth.0;
    let did = &auth_user.did;
    let user = match sqlx::query!(
        "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1",
        did
    )
    .fetch_optional(&state.db)
    .await
    {
        Ok(Some(row)) => row,
        Ok(None) => {
            return ApiError::AccountNotFound.into_response();
        }
        Err(e) => {
            error!("DB error fetching user: {:?}", e);
            return ApiError::InternalError(None).into_response();
        }
    };
    if user.takedown_ref.is_some() {
        return ApiError::AccountTakedown.into_response();
    }
    let user_id = user.id;
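    // Parse the CAR file into its root CID and a map of blocks keyed by CID.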
    let (root, blocks) = match parse_car(&body).await {
        Ok((r, b)) => (r, b),
        Err(ImportError::InvalidRootCount) => {
            return ApiError::InvalidRequest("Expected exactly one root in CAR file".into())
                .into_response();
        }
        Err(ImportError::CarParse(msg)) => {
            return ApiError::InvalidRequest(format!("Failed to parse CAR file: {}", msg))
                .into_response();
        }
        Err(e) => {
            error!("CAR parsing error: {:?}", e);
            return ApiError::InvalidRequest(format!("Invalid CAR file: {}", e)).into_response();
        }
    };
    info!(
        "Importing repo for user {}: {} blocks, root {}",
        did,
        blocks.len(),
        root
    );
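    // The root block must decode to a commit whose DID matches the authenticated user.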
    let Some(root_block) = blocks.get(&root) else {
        return ApiError::InvalidRequest("Root block not found in CAR file".into()).into_response();
    };
    let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
        Ok(commit) => commit.did().to_string(),
        Err(e) => {
            return ApiError::InvalidRequest(format!("Invalid commit: {}", e)).into_response();
        }
    };
    if commit_did != *did {
        return ApiError::InvalidRepo(format!(
            "CAR file is for DID {} but you are authenticated as {}",
            commit_did, did
        ))
        .into_response();
    }
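    // Pick a verification strategy: skip entirely if SKIP_IMPORT_VERIFICATION is set,
    // verify structure only (no signature check) for migrations into deactivated accounts,
    // or run full signature and MST verification otherwise.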
    let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
        .map(|v| v == "true" || v == "1")
        .unwrap_or(false);
    let is_migration = user.deactivated_at.is_some();
    if skip_verification {
        warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)");
    } else if is_migration {
        debug!("Verifying CAR file structure for migration (skipping signature verification)");
        let verifier = CarVerifier::new();
        match verifier.verify_car_structure_only(did, &root, &blocks) {
            Ok(verified) => {
                debug!(
                    "CAR structure verification successful: rev={}, data_cid={}",
                    verified.rev, verified.data_cid
                );
            }
            Err(crate::sync::verify::VerifyError::DidMismatch {
                commit_did,
                expected_did,
            }) => {
                return ApiError::InvalidRepo(format!(
                    "CAR file is for DID {} but you are authenticated as {}",
                    commit_did, expected_did
                ))
                .into_response();
            }
            Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
                return ApiError::InvalidRequest(format!("MST validation failed: {}", msg))
                    .into_response();
            }
            Err(e) => {
                error!("CAR structure verification error: {:?}", e);
                return ApiError::InvalidRequest(format!("CAR verification failed: {}", e))
                    .into_response();
            }
        }
    } else {
        debug!("Verifying CAR file signature and structure for DID {}", did);
        let verifier = CarVerifier::new();
        match verifier.verify_car(did, &root, &blocks).await {
            Ok(verified) => {
                debug!(
                    "CAR verification successful: rev={}, data_cid={}",
                    verified.rev, verified.data_cid
                );
            }
            Err(crate::sync::verify::VerifyError::DidMismatch {
                commit_did,
                expected_did,
            }) => {
                return ApiError::InvalidRepo(format!(
                    "CAR file is for DID {} but you are authenticated as {}",
                    commit_did, expected_did
                ))
                .into_response();
            }
            Err(crate::sync::verify::VerifyError::InvalidSignature) => {
                return ApiError::InvalidRequest(
                    "CAR file commit signature verification failed".into(),
                )
                .into_response();
            }
            Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
                warn!("DID resolution failed during import verification: {}", msg);
                return ApiError::InvalidRequest(format!("Failed to verify DID: {}", msg))
                    .into_response();
            }
            Err(crate::sync::verify::VerifyError::NoSigningKey) => {
                return ApiError::InvalidRequest(
                    "DID document does not contain a signing key".into(),
                )
                .into_response();
            }
            Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
                return ApiError::InvalidRequest(format!("MST validation failed: {}", msg))
                    .into_response();
            }
            Err(e) => {
                error!("CAR verification error: {:?}", e);
                return ApiError::InvalidRequest(format!("CAR verification failed: {}", e))
                    .into_response();
            }
        }
    }
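    // Apply the imported blocks and records to storage, enforcing the block-count limit.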
    let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(DEFAULT_MAX_BLOCKS);
    match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await {
        Ok(import_result) => {
            info!(
                "Successfully imported {} records for user {}",
                import_result.records.len(),
                did
            );
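            // Collect (record URI, blob CID) pairs from the imported records so their
            // blob references can be tracked in record_blobs.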
            let blob_refs: Vec<(String, String)> = import_result
                .records
                .iter()
                .flat_map(|record| {
                    let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey);
                    record
                        .blob_refs
                        .iter()
                        .map(move |blob_ref| (record_uri.clone(), blob_ref.cid.clone()))
                })
                .collect();

            if !blob_refs.is_empty() {
                let (record_uris, blob_cids): (Vec<String>, Vec<String>) =
                    blob_refs.into_iter().unzip();

                match sqlx::query!(
                    r#"
                    INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                    SELECT $1, * FROM UNNEST($2::text[], $3::text[])
                    ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
                    "#,
                    user_id,
                    &record_uris,
                    &blob_cids
                )
                .execute(&state.db)
                .await
                {
                    Ok(result) => {
                        info!(
                            "Recorded {} blob references for imported repo",
                            result.rows_affected()
                        );
                    }
                    Err(e) => {
                        warn!("Failed to insert record_blobs: {:?}", e);
                    }
                }
            }
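            // Fetch and decrypt the account's repo signing key so a new commit can be signed.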
            let key_row = match sqlx::query!(
                r#"SELECT uk.key_bytes, uk.encryption_version
                   FROM user_keys uk
                   JOIN users u ON uk.user_id = u.id
                   WHERE u.did = $1"#,
                did
            )
            .fetch_optional(&state.db)
            .await
            {
                Ok(Some(row)) => row,
                Ok(None) => {
                    error!("No signing key found for user {}", did);
                    return ApiError::InternalError(Some("Signing key not found".into()))
                        .into_response();
                }
                Err(e) => {
                    error!("DB error fetching signing key: {:?}", e);
                    return ApiError::InternalError(None).into_response();
                }
            };
            let key_bytes =
                match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) {
                    Ok(k) => k,
                    Err(e) => {
                        error!("Failed to decrypt signing key: {}", e);
                        return ApiError::InternalError(None).into_response();
                    }
                };
            let signing_key = match SigningKey::from_slice(&key_bytes) {
                Ok(k) => k,
                Err(e) => {
                    error!("Invalid signing key: {:?}", e);
                    return ApiError::InternalError(None).into_response();
                }
            };
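            // Sign a new commit with a fresh revision (TID) over the imported data root
            // and persist the commit block to the block store.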
            let new_rev = Tid::now(LimitedU32::MIN);
            let new_rev_str = new_rev.to_string();
            let (commit_bytes, _sig) = match create_signed_commit(
                did,
                import_result.data_cid,
                &new_rev_str,
                None,
                &signing_key,
            ) {
                Ok(result) => result,
                Err(e) => {
                    error!("Failed to create new commit: {}", e);
                    return ApiError::InternalError(None).into_response();
                }
            };
            let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await {
                Ok(cid) => cid,
                Err(e) => {
                    error!("Failed to store new commit block: {:?}", e);
                    return ApiError::InternalError(None).into_response();
                }
            };
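            // Point the repo at the new commit and record ownership of every imported
            // block plus the newly created commit block.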
            let new_root_str = new_root_cid.to_string();
            if let Err(e) = sqlx::query!(
                "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3",
                new_root_str,
                &new_rev_str,
                user_id
            )
            .execute(&state.db)
            .await
            {
                error!("Failed to update repo root: {:?}", e);
                return ApiError::InternalError(None).into_response();
            }
            let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect();
            all_block_cids.push(new_root_cid.to_bytes());
            if let Err(e) = sqlx::query!(
                r#"
                INSERT INTO user_blocks (user_id, block_cid)
                SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
                ON CONFLICT (user_id, block_cid) DO NOTHING
                "#,
                user_id,
                &all_block_cids
            )
            .execute(&state.db)
            .await
            {
                error!("Failed to insert user_blocks: {:?}", e);
                return ApiError::InternalError(None).into_response();
            }
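            // Sequence a commit event for active accounts; imports into deactivated
            // (migrating) accounts are not sequenced here.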
            info!(
                "Created new commit for imported repo: cid={}, rev={}",
                new_root_str, new_rev_str
            );
            if !is_migration
                && let Err(e) = sequence_import_event(&state, did, &new_root_str).await
            {
                warn!("Failed to sequence import event: {:?}", e);
            }
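            // Optional override: when PDS_AGE_ASSURANCE_OVERRIDE is set, seed a default
            // birthdate preference for the migrated account (skipped if one already exists).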
            if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() {
                let birthdate_pref = json!({
                    "$type": "app.bsky.actor.defs#personalDetailsPref",
                    "birthDate": "1998-05-06T00:00:00.000Z"
                });
                if let Err(e) = sqlx::query!(
                    "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3)
                     ON CONFLICT (user_id, name) DO NOTHING",
                    user_id,
                    "app.bsky.actor.defs#personalDetailsPref",
                    birthdate_pref
                )
                .execute(&state.db)
                .await
                {
                    warn!(
                        "Failed to set default birthdate preference for migrated user: {:?}",
                        e
                    );
                }
            }
            EmptyResponse::ok().into_response()
        }
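        // Map apply_import failures to client-facing API errors.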
        Err(ImportError::SizeLimitExceeded) => {
            ApiError::PayloadTooLarge(format!("Import exceeds block limit of {}", max_blocks))
                .into_response()
        }
        Err(ImportError::RepoNotFound) => {
            ApiError::RepoNotFound(Some("Repository not initialized for this account".into()))
                .into_response()
        }
        Err(ImportError::InvalidCbor(msg)) => {
            ApiError::InvalidRequest(format!("Invalid CBOR data: {}", msg)).into_response()
        }
        Err(ImportError::InvalidCommit(msg)) => {
            ApiError::InvalidRequest(format!("Invalid commit structure: {}", msg)).into_response()
        }
        Err(ImportError::BlockNotFound(cid)) => {
            ApiError::InvalidRequest(format!("Referenced block not found in CAR: {}", cid))
                .into_response()
        }
        Err(ImportError::ConcurrentModification) => ApiError::InvalidSwap(Some(
            "Repository is being modified by another operation, please retry".into(),
        ))
        .into_response(),
        Err(ImportError::VerificationFailed(ve)) => {
            ApiError::InvalidRequest(format!("CAR verification failed: {}", ve)).into_response()
        }
        Err(ImportError::DidMismatch { car_did, auth_did }) => ApiError::InvalidRequest(format!(
            "CAR is for {} but authenticated as {}",
            car_did, auth_did
        ))
        .into_response(),
        Err(e) => {
            error!("Import error: {:?}", e);
            ApiError::InternalError(None).into_response()
        }
    }
}

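/// Records a minimal `commit` event for the imported repo in `repo_seq` (no previous
/// commit, ops, blobs, or block CIDs) and notifies listeners of the new sequence number.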
async fn sequence_import_event(
    state: &AppState,
    did: &Did,
    commit_cid: &str,
) -> Result<(), sqlx::Error> {
    let prev_cid: Option<String> = None;
    let prev_data_cid: Option<String> = None;
    let ops = serde_json::json!([]);
    let blobs: Vec<String> = vec![];
    let blocks_cids: Vec<String> = vec![];
    let did_str = did.as_str();

    let mut tx = state.db.begin().await?;

    let seq_row = sqlx::query!(
        r#"
        INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids)
        VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7)
        RETURNING seq
        "#,
        did_str,
        commit_cid,
        prev_cid,
        prev_data_cid,
        ops,
        &blobs,
        &blocks_cids
    )
    .fetch_one(&mut *tx)
    .await?;

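    // Notify listeners on the repo_updates channel, passing the new sequence number as payload.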
    sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
        .execute(&mut *tx)
        .await?;

    tx.commit().await?;
    Ok(())
}