this repo has no description
1use crate::api::EmptyResponse;
2use crate::api::error::ApiError;
3use crate::api::repo::record::create_signed_commit;
4use crate::auth::BearerAuthAllowDeactivated;
5use crate::state::AppState;
6use crate::sync::import::{ImportError, apply_import, parse_car};
7use crate::sync::verify::CarVerifier;
8use axum::{
9 body::Bytes,
10 extract::State,
11 response::{IntoResponse, Response},
12};
13use jacquard::types::{integer::LimitedU32, string::Tid};
14use jacquard_repo::storage::BlockStore;
15use k256::ecdsa::SigningKey;
16use serde_json::json;
17use tracing::{debug, error, info, warn};
18
/// Largest accepted CAR upload in bytes (1 GiB) when `MAX_IMPORT_SIZE` is unset or unparsable.
const DEFAULT_MAX_IMPORT_SIZE: usize = 1 << 30;
/// Block-count limit passed to `apply_import` when `MAX_IMPORT_BLOCKS` is unset or unparsable.
const DEFAULT_MAX_BLOCKS: usize = 500_000;
21
22pub async fn import_repo(
23 State(state): State<AppState>,
24 auth: BearerAuthAllowDeactivated,
25 body: Bytes,
26) -> Response {
27 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
28 .map(|v| v != "false" && v != "0")
29 .unwrap_or(true);
30 if !accepting_imports {
31 return ApiError::InvalidRequest("Service is not accepting repo imports".into())
32 .into_response();
33 }
34 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
35 .ok()
36 .and_then(|s| s.parse().ok())
37 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
38 if body.len() > max_size {
39 return ApiError::PayloadTooLarge(format!(
40 "Import size exceeds limit of {} bytes",
41 max_size
42 ))
43 .into_response();
44 }
45 let auth_user = auth.0;
46 let did = &auth_user.did;
47 let user = match sqlx::query!(
48 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1",
49 did
50 )
51 .fetch_optional(&state.db)
52 .await
53 {
54 Ok(Some(row)) => row,
55 Ok(None) => {
56 return ApiError::AccountNotFound.into_response();
57 }
58 Err(e) => {
59 error!("DB error fetching user: {:?}", e);
60 return ApiError::InternalError(None).into_response();
61 }
62 };
63 if user.takedown_ref.is_some() {
64 return ApiError::AccountTakedown.into_response();
65 }
66 let user_id = user.id;
67 let (root, blocks) = match parse_car(&body).await {
68 Ok((r, b)) => (r, b),
69 Err(ImportError::InvalidRootCount) => {
70 return ApiError::InvalidRequest("Expected exactly one root in CAR file".into())
71 .into_response();
72 }
73 Err(ImportError::CarParse(msg)) => {
74 return ApiError::InvalidRequest(format!("Failed to parse CAR file: {}", msg))
75 .into_response();
76 }
77 Err(e) => {
78 error!("CAR parsing error: {:?}", e);
79 return ApiError::InvalidRequest(format!("Invalid CAR file: {}", e)).into_response();
80 }
81 };
82 info!(
83 "Importing repo for user {}: {} blocks, root {}",
84 did,
85 blocks.len(),
86 root
87 );
88 let Some(root_block) = blocks.get(&root) else {
89 return ApiError::InvalidRequest("Root block not found in CAR file".into()).into_response();
90 };
91 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
92 Ok(commit) => commit.did().to_string(),
93 Err(e) => {
94 return ApiError::InvalidRequest(format!("Invalid commit: {}", e)).into_response();
95 }
96 };
97 if commit_did != *did {
98 return ApiError::InvalidRepo(format!(
99 "CAR file is for DID {} but you are authenticated as {}",
100 commit_did, did
101 ))
102 .into_response();
103 }
104 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
105 .map(|v| v == "true" || v == "1")
106 .unwrap_or(false);
107 let is_migration = user.deactivated_at.is_some();
108 if skip_verification {
109 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)");
110 } else if is_migration {
111 debug!("Verifying CAR file structure for migration (skipping signature verification)");
112 let verifier = CarVerifier::new();
113 match verifier.verify_car_structure_only(did, &root, &blocks) {
114 Ok(verified) => {
115 debug!(
116 "CAR structure verification successful: rev={}, data_cid={}",
117 verified.rev, verified.data_cid
118 );
119 }
120 Err(crate::sync::verify::VerifyError::DidMismatch {
121 commit_did,
122 expected_did,
123 }) => {
124 return ApiError::InvalidRepo(format!(
125 "CAR file is for DID {} but you are authenticated as {}",
126 commit_did, expected_did
127 ))
128 .into_response();
129 }
130 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
131 return ApiError::InvalidRequest(format!("MST validation failed: {}", msg))
132 .into_response();
133 }
134 Err(e) => {
135 error!("CAR structure verification error: {:?}", e);
136 return ApiError::InvalidRequest(format!("CAR verification failed: {}", e))
137 .into_response();
138 }
139 }
140 } else {
141 debug!("Verifying CAR file signature and structure for DID {}", did);
142 let verifier = CarVerifier::new();
143 match verifier.verify_car(did, &root, &blocks).await {
144 Ok(verified) => {
145 debug!(
146 "CAR verification successful: rev={}, data_cid={}",
147 verified.rev, verified.data_cid
148 );
149 }
150 Err(crate::sync::verify::VerifyError::DidMismatch {
151 commit_did,
152 expected_did,
153 }) => {
154 return ApiError::InvalidRepo(format!(
155 "CAR file is for DID {} but you are authenticated as {}",
156 commit_did, expected_did
157 ))
158 .into_response();
159 }
160 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
161 return ApiError::InvalidRequest(
162 "CAR file commit signature verification failed".into(),
163 )
164 .into_response();
165 }
166 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
167 warn!("DID resolution failed during import verification: {}", msg);
168 return ApiError::InvalidRequest(format!("Failed to verify DID: {}", msg))
169 .into_response();
170 }
171 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
172 return ApiError::InvalidRequest(
173 "DID document does not contain a signing key".into(),
174 )
175 .into_response();
176 }
177 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
178 return ApiError::InvalidRequest(format!("MST validation failed: {}", msg))
179 .into_response();
180 }
181 Err(e) => {
182 error!("CAR verification error: {:?}", e);
183 return ApiError::InvalidRequest(format!("CAR verification failed: {}", e))
184 .into_response();
185 }
186 }
187 }
188 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
189 .ok()
190 .and_then(|s| s.parse().ok())
191 .unwrap_or(DEFAULT_MAX_BLOCKS);
192 match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await {
193 Ok(import_result) => {
194 info!(
195 "Successfully imported {} records for user {}",
196 import_result.records.len(),
197 did
198 );
199 let mut blob_ref_count = 0;
200 for record in &import_result.records {
201 for blob_ref in &record.blob_refs {
202 let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey);
203 if let Err(e) = sqlx::query!(
204 r#"
205 INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
206 VALUES ($1, $2, $3)
207 ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
208 "#,
209 user_id,
210 record_uri,
211 blob_ref.cid
212 )
213 .execute(&state.db)
214 .await
215 {
216 warn!("Failed to insert record_blob for {}: {:?}", record_uri, e);
217 } else {
218 blob_ref_count += 1;
219 }
220 }
221 }
222 if blob_ref_count > 0 {
223 info!(
224 "Recorded {} blob references for imported repo",
225 blob_ref_count
226 );
227 }
228 let key_row = match sqlx::query!(
229 r#"SELECT uk.key_bytes, uk.encryption_version
230 FROM user_keys uk
231 JOIN users u ON uk.user_id = u.id
232 WHERE u.did = $1"#,
233 did
234 )
235 .fetch_optional(&state.db)
236 .await
237 {
238 Ok(Some(row)) => row,
239 Ok(None) => {
240 error!("No signing key found for user {}", did);
241 return ApiError::InternalError(Some("Signing key not found".into()))
242 .into_response();
243 }
244 Err(e) => {
245 error!("DB error fetching signing key: {:?}", e);
246 return ApiError::InternalError(None).into_response();
247 }
248 };
249 let key_bytes =
250 match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) {
251 Ok(k) => k,
252 Err(e) => {
253 error!("Failed to decrypt signing key: {}", e);
254 return ApiError::InternalError(None).into_response();
255 }
256 };
257 let signing_key = match SigningKey::from_slice(&key_bytes) {
258 Ok(k) => k,
259 Err(e) => {
260 error!("Invalid signing key: {:?}", e);
261 return ApiError::InternalError(None).into_response();
262 }
263 };
264 let new_rev = Tid::now(LimitedU32::MIN);
265 let new_rev_str = new_rev.to_string();
266 let (commit_bytes, _sig) = match create_signed_commit(
267 did,
268 import_result.data_cid,
269 &new_rev_str,
270 None,
271 &signing_key,
272 ) {
273 Ok(result) => result,
274 Err(e) => {
275 error!("Failed to create new commit: {}", e);
276 return ApiError::InternalError(None).into_response();
277 }
278 };
279 let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await {
280 Ok(cid) => cid,
281 Err(e) => {
282 error!("Failed to store new commit block: {:?}", e);
283 return ApiError::InternalError(None).into_response();
284 }
285 };
286 let new_root_str = new_root_cid.to_string();
287 if let Err(e) = sqlx::query!(
288 "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3",
289 new_root_str,
290 &new_rev_str,
291 user_id
292 )
293 .execute(&state.db)
294 .await
295 {
296 error!("Failed to update repo root: {:?}", e);
297 return ApiError::InternalError(None).into_response();
298 }
299 let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect();
300 all_block_cids.push(new_root_cid.to_bytes());
301 if let Err(e) = sqlx::query!(
302 r#"
303 INSERT INTO user_blocks (user_id, block_cid)
304 SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
305 ON CONFLICT (user_id, block_cid) DO NOTHING
306 "#,
307 user_id,
308 &all_block_cids
309 )
310 .execute(&state.db)
311 .await
312 {
313 error!("Failed to insert user_blocks: {:?}", e);
314 return ApiError::InternalError(None).into_response();
315 }
316 info!(
317 "Created new commit for imported repo: cid={}, rev={}",
318 new_root_str, new_rev_str
319 );
320 if !is_migration && let Err(e) = sequence_import_event(&state, did, &new_root_str).await
321 {
322 warn!("Failed to sequence import event: {:?}", e);
323 }
324 if std::env::var("PDS_AGE_ASSURANCE_OVERRIDE").is_ok() {
325 let birthdate_pref = json!({
326 "$type": "app.bsky.actor.defs#personalDetailsPref",
327 "birthDate": "1998-05-06T00:00:00.000Z"
328 });
329 if let Err(e) = sqlx::query!(
330 "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3)
331 ON CONFLICT (user_id, name) DO NOTHING",
332 user_id,
333 "app.bsky.actor.defs#personalDetailsPref",
334 birthdate_pref
335 )
336 .execute(&state.db)
337 .await
338 {
339 warn!(
340 "Failed to set default birthdate preference for migrated user: {:?}",
341 e
342 );
343 }
344 }
345 EmptyResponse::ok().into_response()
346 }
347 Err(ImportError::SizeLimitExceeded) => {
348 ApiError::PayloadTooLarge(format!("Import exceeds block limit of {}", max_blocks))
349 .into_response()
350 }
351 Err(ImportError::RepoNotFound) => {
352 ApiError::RepoNotFound(Some("Repository not initialized for this account".into()))
353 .into_response()
354 }
355 Err(ImportError::InvalidCbor(msg)) => {
356 ApiError::InvalidRequest(format!("Invalid CBOR data: {}", msg)).into_response()
357 }
358 Err(ImportError::InvalidCommit(msg)) => {
359 ApiError::InvalidRequest(format!("Invalid commit structure: {}", msg)).into_response()
360 }
361 Err(ImportError::BlockNotFound(cid)) => {
362 ApiError::InvalidRequest(format!("Referenced block not found in CAR: {}", cid))
363 .into_response()
364 }
365 Err(ImportError::ConcurrentModification) => ApiError::InvalidSwap(Some(
366 "Repository is being modified by another operation, please retry".into(),
367 ))
368 .into_response(),
369 Err(ImportError::VerificationFailed(ve)) => {
370 ApiError::InvalidRequest(format!("CAR verification failed: {}", ve)).into_response()
371 }
372 Err(ImportError::DidMismatch { car_did, auth_did }) => ApiError::InvalidRequest(format!(
373 "CAR is for {} but authenticated as {}",
374 car_did, auth_did
375 ))
376 .into_response(),
377 Err(e) => {
378 error!("Import error: {:?}", e);
379 ApiError::InternalError(None).into_response()
380 }
381 }
382}
383
384async fn sequence_import_event(
385 state: &AppState,
386 did: &str,
387 commit_cid: &str,
388) -> Result<(), sqlx::Error> {
389 let prev_cid: Option<String> = None;
390 let prev_data_cid: Option<String> = None;
391 let ops = serde_json::json!([]);
392 let blobs: Vec<String> = vec![];
393 let blocks_cids: Vec<String> = vec![];
394 let seq_row = sqlx::query!(
395 r#"
396 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids)
397 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7)
398 RETURNING seq
399 "#,
400 did,
401 commit_cid,
402 prev_cid,
403 prev_data_cid,
404 ops,
405 &blobs,
406 &blocks_cids
407 )
408 .fetch_one(&state.db)
409 .await?;
410 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
411 .execute(&state.db)
412 .await?;
413 Ok(())
414}