this repo has no description
1use crate::api::ApiError;
2use crate::state::AppState;
3use crate::sync::import::{ImportError, apply_import, parse_car};
4use crate::sync::verify::CarVerifier;
5use axum::{
6 Json,
7 body::Bytes,
8 extract::State,
9 http::StatusCode,
10 response::{IntoResponse, Response},
11};
12use serde_json::json;
13use tracing::{debug, error, info, warn};
14
/// Upper bound on the request body size in bytes (100 MiB), used when the
/// `MAX_IMPORT_SIZE` environment variable is unset or does not parse.
const DEFAULT_MAX_IMPORT_SIZE: usize = 100 << 20;
/// Block limit handed to `apply_import`, used when the `MAX_IMPORT_BLOCKS`
/// environment variable is unset or does not parse.
const DEFAULT_MAX_BLOCKS: usize = 50_000;
17
18pub async fn import_repo(
19 State(state): State<AppState>,
20 headers: axum::http::HeaderMap,
21 body: Bytes,
22) -> Response {
23 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
24 .map(|v| v != "false" && v != "0")
25 .unwrap_or(true);
26 if !accepting_imports {
27 return (
28 StatusCode::BAD_REQUEST,
29 Json(json!({
30 "error": "InvalidRequest",
31 "message": "Service is not accepting repo imports"
32 })),
33 )
34 .into_response();
35 }
36 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
37 .ok()
38 .and_then(|s| s.parse().ok())
39 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
40 if body.len() > max_size {
41 return (
42 StatusCode::PAYLOAD_TOO_LARGE,
43 Json(json!({
44 "error": "InvalidRequest",
45 "message": format!("Import size exceeds limit of {} bytes", max_size)
46 })),
47 )
48 .into_response();
49 }
50 let token = match crate::auth::extract_bearer_token_from_header(
51 headers.get("Authorization").and_then(|h| h.to_str().ok()),
52 ) {
53 Some(t) => t,
54 None => return ApiError::AuthenticationRequired.into_response(),
55 };
56 let auth_user = match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
57 Ok(user) => user,
58 Err(e) => return ApiError::from(e).into_response(),
59 };
60 let did = &auth_user.did;
61 let user = match sqlx::query!(
62 "SELECT id, deactivated_at, takedown_ref FROM users WHERE did = $1",
63 did
64 )
65 .fetch_optional(&state.db)
66 .await
67 {
68 Ok(Some(row)) => row,
69 Ok(None) => {
70 return (
71 StatusCode::NOT_FOUND,
72 Json(json!({"error": "AccountNotFound"})),
73 )
74 .into_response();
75 }
76 Err(e) => {
77 error!("DB error fetching user: {:?}", e);
78 return (
79 StatusCode::INTERNAL_SERVER_ERROR,
80 Json(json!({"error": "InternalError"})),
81 )
82 .into_response();
83 }
84 };
85 if user.takedown_ref.is_some() {
86 return (
87 StatusCode::FORBIDDEN,
88 Json(json!({
89 "error": "AccountTakenDown",
90 "message": "Account has been taken down"
91 })),
92 )
93 .into_response();
94 }
95 let user_id = user.id;
96 let (root, blocks) = match parse_car(&body).await {
97 Ok((r, b)) => (r, b),
98 Err(ImportError::InvalidRootCount) => {
99 return (
100 StatusCode::BAD_REQUEST,
101 Json(json!({
102 "error": "InvalidRequest",
103 "message": "Expected exactly one root in CAR file"
104 })),
105 )
106 .into_response();
107 }
108 Err(ImportError::CarParse(msg)) => {
109 return (
110 StatusCode::BAD_REQUEST,
111 Json(json!({
112 "error": "InvalidRequest",
113 "message": format!("Failed to parse CAR file: {}", msg)
114 })),
115 )
116 .into_response();
117 }
118 Err(e) => {
119 error!("CAR parsing error: {:?}", e);
120 return (
121 StatusCode::BAD_REQUEST,
122 Json(json!({
123 "error": "InvalidRequest",
124 "message": format!("Invalid CAR file: {}", e)
125 })),
126 )
127 .into_response();
128 }
129 };
130 info!(
131 "Importing repo for user {}: {} blocks, root {}",
132 did,
133 blocks.len(),
134 root
135 );
136 let root_block = match blocks.get(&root) {
137 Some(b) => b,
138 None => {
139 return (
140 StatusCode::BAD_REQUEST,
141 Json(json!({
142 "error": "InvalidRequest",
143 "message": "Root block not found in CAR file"
144 })),
145 )
146 .into_response();
147 }
148 };
149 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
150 Ok(commit) => commit.did().to_string(),
151 Err(e) => {
152 return (
153 StatusCode::BAD_REQUEST,
154 Json(json!({
155 "error": "InvalidRequest",
156 "message": format!("Invalid commit: {}", e)
157 })),
158 )
159 .into_response();
160 }
161 };
162 if commit_did != *did {
163 return (
164 StatusCode::FORBIDDEN,
165 Json(json!({
166 "error": "InvalidRequest",
167 "message": format!(
168 "CAR file is for DID {} but you are authenticated as {}",
169 commit_did, did
170 )
171 })),
172 )
173 .into_response();
174 }
175 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
176 .map(|v| v == "true" || v == "1")
177 .unwrap_or(false);
178 let is_migration = user.deactivated_at.is_some();
179 if skip_verification {
180 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)");
181 } else if is_migration {
182 debug!("Verifying CAR file structure for migration (skipping signature verification)");
183 let verifier = CarVerifier::new();
184 match verifier.verify_car_structure_only(did, &root, &blocks) {
185 Ok(verified) => {
186 debug!(
187 "CAR structure verification successful: rev={}, data_cid={}",
188 verified.rev, verified.data_cid
189 );
190 }
191 Err(crate::sync::verify::VerifyError::DidMismatch {
192 commit_did,
193 expected_did,
194 }) => {
195 return (
196 StatusCode::FORBIDDEN,
197 Json(json!({
198 "error": "InvalidRequest",
199 "message": format!(
200 "CAR file is for DID {} but you are authenticated as {}",
201 commit_did, expected_did
202 )
203 })),
204 )
205 .into_response();
206 }
207 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
208 return (
209 StatusCode::BAD_REQUEST,
210 Json(json!({
211 "error": "InvalidRequest",
212 "message": format!("MST validation failed: {}", msg)
213 })),
214 )
215 .into_response();
216 }
217 Err(e) => {
218 error!("CAR structure verification error: {:?}", e);
219 return (
220 StatusCode::BAD_REQUEST,
221 Json(json!({
222 "error": "InvalidRequest",
223 "message": format!("CAR verification failed: {}", e)
224 })),
225 )
226 .into_response();
227 }
228 }
229 } else {
230 debug!("Verifying CAR file signature and structure for DID {}", did);
231 let verifier = CarVerifier::new();
232 match verifier.verify_car(did, &root, &blocks).await {
233 Ok(verified) => {
234 debug!(
235 "CAR verification successful: rev={}, data_cid={}",
236 verified.rev, verified.data_cid
237 );
238 }
239 Err(crate::sync::verify::VerifyError::DidMismatch {
240 commit_did,
241 expected_did,
242 }) => {
243 return (
244 StatusCode::FORBIDDEN,
245 Json(json!({
246 "error": "InvalidRequest",
247 "message": format!(
248 "CAR file is for DID {} but you are authenticated as {}",
249 commit_did, expected_did
250 )
251 })),
252 )
253 .into_response();
254 }
255 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
256 return (
257 StatusCode::BAD_REQUEST,
258 Json(json!({
259 "error": "InvalidSignature",
260 "message": "CAR file commit signature verification failed"
261 })),
262 )
263 .into_response();
264 }
265 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
266 warn!("DID resolution failed during import verification: {}", msg);
267 return (
268 StatusCode::BAD_REQUEST,
269 Json(json!({
270 "error": "InvalidRequest",
271 "message": format!("Failed to verify DID: {}", msg)
272 })),
273 )
274 .into_response();
275 }
276 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
277 return (
278 StatusCode::BAD_REQUEST,
279 Json(json!({
280 "error": "InvalidRequest",
281 "message": "DID document does not contain a signing key"
282 })),
283 )
284 .into_response();
285 }
286 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
287 return (
288 StatusCode::BAD_REQUEST,
289 Json(json!({
290 "error": "InvalidRequest",
291 "message": format!("MST validation failed: {}", msg)
292 })),
293 )
294 .into_response();
295 }
296 Err(e) => {
297 error!("CAR verification error: {:?}", e);
298 return (
299 StatusCode::BAD_REQUEST,
300 Json(json!({
301 "error": "InvalidRequest",
302 "message": format!("CAR verification failed: {}", e)
303 })),
304 )
305 .into_response();
306 }
307 }
308 }
309 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
310 .ok()
311 .and_then(|s| s.parse().ok())
312 .unwrap_or(DEFAULT_MAX_BLOCKS);
313 match apply_import(&state.db, user_id, root, blocks, max_blocks).await {
314 Ok(records) => {
315 info!(
316 "Successfully imported {} records for user {}",
317 records.len(),
318 did
319 );
320 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await {
321 warn!("Failed to sequence import event: {:?}", e);
322 }
323 (StatusCode::OK, Json(json!({}))).into_response()
324 }
325 Err(ImportError::SizeLimitExceeded) => (
326 StatusCode::BAD_REQUEST,
327 Json(json!({
328 "error": "InvalidRequest",
329 "message": format!("Import exceeds block limit of {}", max_blocks)
330 })),
331 )
332 .into_response(),
333 Err(ImportError::RepoNotFound) => (
334 StatusCode::NOT_FOUND,
335 Json(json!({
336 "error": "RepoNotFound",
337 "message": "Repository not initialized for this account"
338 })),
339 )
340 .into_response(),
341 Err(ImportError::InvalidCbor(msg)) => (
342 StatusCode::BAD_REQUEST,
343 Json(json!({
344 "error": "InvalidRequest",
345 "message": format!("Invalid CBOR data: {}", msg)
346 })),
347 )
348 .into_response(),
349 Err(ImportError::InvalidCommit(msg)) => (
350 StatusCode::BAD_REQUEST,
351 Json(json!({
352 "error": "InvalidRequest",
353 "message": format!("Invalid commit structure: {}", msg)
354 })),
355 )
356 .into_response(),
357 Err(ImportError::BlockNotFound(cid)) => (
358 StatusCode::BAD_REQUEST,
359 Json(json!({
360 "error": "InvalidRequest",
361 "message": format!("Referenced block not found in CAR: {}", cid)
362 })),
363 )
364 .into_response(),
365 Err(ImportError::ConcurrentModification) => (
366 StatusCode::CONFLICT,
367 Json(json!({
368 "error": "ConcurrentModification",
369 "message": "Repository is being modified by another operation, please retry"
370 })),
371 )
372 .into_response(),
373 Err(ImportError::VerificationFailed(ve)) => (
374 StatusCode::BAD_REQUEST,
375 Json(json!({
376 "error": "VerificationFailed",
377 "message": format!("CAR verification failed: {}", ve)
378 })),
379 )
380 .into_response(),
381 Err(ImportError::DidMismatch { car_did, auth_did }) => (
382 StatusCode::FORBIDDEN,
383 Json(json!({
384 "error": "DidMismatch",
385 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did)
386 })),
387 )
388 .into_response(),
389 Err(e) => {
390 error!("Import error: {:?}", e);
391 (
392 StatusCode::INTERNAL_SERVER_ERROR,
393 Json(json!({"error": "InternalError"})),
394 )
395 .into_response()
396 }
397 }
398}
399
400async fn sequence_import_event(
401 state: &AppState,
402 did: &str,
403 commit_cid: &str,
404) -> Result<(), sqlx::Error> {
405 let prev_cid: Option<String> = None;
406 let prev_data_cid: Option<String> = None;
407 let ops = serde_json::json!([]);
408 let blobs: Vec<String> = vec![];
409 let blocks_cids: Vec<String> = vec![];
410 let seq_row = sqlx::query!(
411 r#"
412 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids)
413 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7)
414 RETURNING seq
415 "#,
416 did,
417 commit_cid,
418 prev_cid,
419 prev_data_cid,
420 ops,
421 &blobs,
422 &blocks_cids
423 )
424 .fetch_one(&state.db)
425 .await?;
426 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
427 .execute(&state.db)
428 .await?;
429 Ok(())
430}