this repo has no description
1use crate::api::ApiError;
2use crate::state::AppState;
3use crate::sync::import::{ImportError, apply_import, parse_car};
4use crate::sync::verify::CarVerifier;
5use axum::{
6 Json,
7 body::Bytes,
8 extract::State,
9 http::StatusCode,
10 response::{IntoResponse, Response},
11};
12use serde_json::json;
13use tracing::{debug, error, info, warn};
14
/// Fallback limit on the import request body, in bytes (100 MiB).
///
/// Used when the `MAX_IMPORT_SIZE` environment variable is unset or not a valid `usize`.
const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024;
/// Fallback block limit passed to `apply_import`.
///
/// Used when the `MAX_IMPORT_BLOCKS` environment variable is unset or not a valid `usize`.
const DEFAULT_MAX_BLOCKS: usize = 50_000;
17
18pub async fn import_repo(
19 State(state): State<AppState>,
20 headers: axum::http::HeaderMap,
21 body: Bytes,
22) -> Response {
23 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
24 .map(|v| v != "false" && v != "0")
25 .unwrap_or(true);
26 if !accepting_imports {
27 return (
28 StatusCode::BAD_REQUEST,
29 Json(json!({
30 "error": "InvalidRequest",
31 "message": "Service is not accepting repo imports"
32 })),
33 )
34 .into_response();
35 }
36 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
37 .ok()
38 .and_then(|s| s.parse().ok())
39 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
40 if body.len() > max_size {
41 return (
42 StatusCode::PAYLOAD_TOO_LARGE,
43 Json(json!({
44 "error": "InvalidRequest",
45 "message": format!("Import size exceeds limit of {} bytes", max_size)
46 })),
47 )
48 .into_response();
49 }
50 let token = match crate::auth::extract_bearer_token_from_header(
51 headers.get("Authorization").and_then(|h| h.to_str().ok()),
52 ) {
53 Some(t) => t,
54 None => return ApiError::AuthenticationRequired.into_response(),
55 };
56 let auth_user =
57 match crate::auth::validate_bearer_token_allow_deactivated(&state.db, &token).await {
58 Ok(user) => user,
59 Err(e) => return ApiError::from(e).into_response(),
60 };
61 let did = &auth_user.did;
62 let user = match sqlx::query!(
63 "SELECT id, handle, deactivated_at, takedown_ref FROM users WHERE did = $1",
64 did
65 )
66 .fetch_optional(&state.db)
67 .await
68 {
69 Ok(Some(row)) => row,
70 Ok(None) => {
71 return (
72 StatusCode::NOT_FOUND,
73 Json(json!({"error": "AccountNotFound"})),
74 )
75 .into_response();
76 }
77 Err(e) => {
78 error!("DB error fetching user: {:?}", e);
79 return (
80 StatusCode::INTERNAL_SERVER_ERROR,
81 Json(json!({"error": "InternalError"})),
82 )
83 .into_response();
84 }
85 };
86 if user.takedown_ref.is_some() {
87 return (
88 StatusCode::FORBIDDEN,
89 Json(json!({
90 "error": "AccountTakenDown",
91 "message": "Account has been taken down"
92 })),
93 )
94 .into_response();
95 }
96 let user_id = user.id;
97 let (root, blocks) = match parse_car(&body).await {
98 Ok((r, b)) => (r, b),
99 Err(ImportError::InvalidRootCount) => {
100 return (
101 StatusCode::BAD_REQUEST,
102 Json(json!({
103 "error": "InvalidRequest",
104 "message": "Expected exactly one root in CAR file"
105 })),
106 )
107 .into_response();
108 }
109 Err(ImportError::CarParse(msg)) => {
110 return (
111 StatusCode::BAD_REQUEST,
112 Json(json!({
113 "error": "InvalidRequest",
114 "message": format!("Failed to parse CAR file: {}", msg)
115 })),
116 )
117 .into_response();
118 }
119 Err(e) => {
120 error!("CAR parsing error: {:?}", e);
121 return (
122 StatusCode::BAD_REQUEST,
123 Json(json!({
124 "error": "InvalidRequest",
125 "message": format!("Invalid CAR file: {}", e)
126 })),
127 )
128 .into_response();
129 }
130 };
131 info!(
132 "Importing repo for user {}: {} blocks, root {}",
133 did,
134 blocks.len(),
135 root
136 );
137 let root_block = match blocks.get(&root) {
138 Some(b) => b,
139 None => {
140 return (
141 StatusCode::BAD_REQUEST,
142 Json(json!({
143 "error": "InvalidRequest",
144 "message": "Root block not found in CAR file"
145 })),
146 )
147 .into_response();
148 }
149 };
150 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
151 Ok(commit) => commit.did().to_string(),
152 Err(e) => {
153 return (
154 StatusCode::BAD_REQUEST,
155 Json(json!({
156 "error": "InvalidRequest",
157 "message": format!("Invalid commit: {}", e)
158 })),
159 )
160 .into_response();
161 }
162 };
163 if commit_did != *did {
164 return (
165 StatusCode::FORBIDDEN,
166 Json(json!({
167 "error": "InvalidRequest",
168 "message": format!(
169 "CAR file is for DID {} but you are authenticated as {}",
170 commit_did, did
171 )
172 })),
173 )
174 .into_response();
175 }
176 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
177 .map(|v| v == "true" || v == "1")
178 .unwrap_or(false);
179 let is_migration = user.deactivated_at.is_some();
180 if skip_verification {
181 warn!("Skipping all CAR verification for import (SKIP_IMPORT_VERIFICATION=true)");
182 } else if is_migration {
183 debug!("Verifying CAR file structure for migration (skipping signature verification)");
184 let verifier = CarVerifier::new();
185 match verifier.verify_car_structure_only(did, &root, &blocks) {
186 Ok(verified) => {
187 debug!(
188 "CAR structure verification successful: rev={}, data_cid={}",
189 verified.rev, verified.data_cid
190 );
191 }
192 Err(crate::sync::verify::VerifyError::DidMismatch {
193 commit_did,
194 expected_did,
195 }) => {
196 return (
197 StatusCode::FORBIDDEN,
198 Json(json!({
199 "error": "InvalidRequest",
200 "message": format!(
201 "CAR file is for DID {} but you are authenticated as {}",
202 commit_did, expected_did
203 )
204 })),
205 )
206 .into_response();
207 }
208 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
209 return (
210 StatusCode::BAD_REQUEST,
211 Json(json!({
212 "error": "InvalidRequest",
213 "message": format!("MST validation failed: {}", msg)
214 })),
215 )
216 .into_response();
217 }
218 Err(e) => {
219 error!("CAR structure verification error: {:?}", e);
220 return (
221 StatusCode::BAD_REQUEST,
222 Json(json!({
223 "error": "InvalidRequest",
224 "message": format!("CAR verification failed: {}", e)
225 })),
226 )
227 .into_response();
228 }
229 }
230 } else {
231 debug!("Verifying CAR file signature and structure for DID {}", did);
232 let verifier = CarVerifier::new();
233 match verifier.verify_car(did, &root, &blocks).await {
234 Ok(verified) => {
235 debug!(
236 "CAR verification successful: rev={}, data_cid={}",
237 verified.rev, verified.data_cid
238 );
239 }
240 Err(crate::sync::verify::VerifyError::DidMismatch {
241 commit_did,
242 expected_did,
243 }) => {
244 return (
245 StatusCode::FORBIDDEN,
246 Json(json!({
247 "error": "InvalidRequest",
248 "message": format!(
249 "CAR file is for DID {} but you are authenticated as {}",
250 commit_did, expected_did
251 )
252 })),
253 )
254 .into_response();
255 }
256 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
257 return (
258 StatusCode::BAD_REQUEST,
259 Json(json!({
260 "error": "InvalidSignature",
261 "message": "CAR file commit signature verification failed"
262 })),
263 )
264 .into_response();
265 }
266 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
267 warn!("DID resolution failed during import verification: {}", msg);
268 return (
269 StatusCode::BAD_REQUEST,
270 Json(json!({
271 "error": "InvalidRequest",
272 "message": format!("Failed to verify DID: {}", msg)
273 })),
274 )
275 .into_response();
276 }
277 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
278 return (
279 StatusCode::BAD_REQUEST,
280 Json(json!({
281 "error": "InvalidRequest",
282 "message": "DID document does not contain a signing key"
283 })),
284 )
285 .into_response();
286 }
287 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
288 return (
289 StatusCode::BAD_REQUEST,
290 Json(json!({
291 "error": "InvalidRequest",
292 "message": format!("MST validation failed: {}", msg)
293 })),
294 )
295 .into_response();
296 }
297 Err(e) => {
298 error!("CAR verification error: {:?}", e);
299 return (
300 StatusCode::BAD_REQUEST,
301 Json(json!({
302 "error": "InvalidRequest",
303 "message": format!("CAR verification failed: {}", e)
304 })),
305 )
306 .into_response();
307 }
308 }
309 }
310 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
311 .ok()
312 .and_then(|s| s.parse().ok())
313 .unwrap_or(DEFAULT_MAX_BLOCKS);
314 match apply_import(&state.db, user_id, root, blocks, max_blocks).await {
315 Ok(records) => {
316 info!(
317 "Successfully imported {} records for user {}",
318 records.len(),
319 did
320 );
321 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await {
322 warn!("Failed to sequence import event: {:?}", e);
323 }
324 (StatusCode::OK, Json(json!({}))).into_response()
325 }
326 Err(ImportError::SizeLimitExceeded) => (
327 StatusCode::BAD_REQUEST,
328 Json(json!({
329 "error": "InvalidRequest",
330 "message": format!("Import exceeds block limit of {}", max_blocks)
331 })),
332 )
333 .into_response(),
334 Err(ImportError::RepoNotFound) => (
335 StatusCode::NOT_FOUND,
336 Json(json!({
337 "error": "RepoNotFound",
338 "message": "Repository not initialized for this account"
339 })),
340 )
341 .into_response(),
342 Err(ImportError::InvalidCbor(msg)) => (
343 StatusCode::BAD_REQUEST,
344 Json(json!({
345 "error": "InvalidRequest",
346 "message": format!("Invalid CBOR data: {}", msg)
347 })),
348 )
349 .into_response(),
350 Err(ImportError::InvalidCommit(msg)) => (
351 StatusCode::BAD_REQUEST,
352 Json(json!({
353 "error": "InvalidRequest",
354 "message": format!("Invalid commit structure: {}", msg)
355 })),
356 )
357 .into_response(),
358 Err(ImportError::BlockNotFound(cid)) => (
359 StatusCode::BAD_REQUEST,
360 Json(json!({
361 "error": "InvalidRequest",
362 "message": format!("Referenced block not found in CAR: {}", cid)
363 })),
364 )
365 .into_response(),
366 Err(ImportError::ConcurrentModification) => (
367 StatusCode::CONFLICT,
368 Json(json!({
369 "error": "ConcurrentModification",
370 "message": "Repository is being modified by another operation, please retry"
371 })),
372 )
373 .into_response(),
374 Err(ImportError::VerificationFailed(ve)) => (
375 StatusCode::BAD_REQUEST,
376 Json(json!({
377 "error": "VerificationFailed",
378 "message": format!("CAR verification failed: {}", ve)
379 })),
380 )
381 .into_response(),
382 Err(ImportError::DidMismatch { car_did, auth_did }) => (
383 StatusCode::FORBIDDEN,
384 Json(json!({
385 "error": "DidMismatch",
386 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did)
387 })),
388 )
389 .into_response(),
390 Err(e) => {
391 error!("Import error: {:?}", e);
392 (
393 StatusCode::INTERNAL_SERVER_ERROR,
394 Json(json!({"error": "InternalError"})),
395 )
396 .into_response()
397 }
398 }
399}
400
401async fn sequence_import_event(
402 state: &AppState,
403 did: &str,
404 commit_cid: &str,
405) -> Result<(), sqlx::Error> {
406 let prev_cid: Option<String> = None;
407 let prev_data_cid: Option<String> = None;
408 let ops = serde_json::json!([]);
409 let blobs: Vec<String> = vec![];
410 let blocks_cids: Vec<String> = vec![];
411 let seq_row = sqlx::query!(
412 r#"
413 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids)
414 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7)
415 RETURNING seq
416 "#,
417 did,
418 commit_cid,
419 prev_cid,
420 prev_data_cid,
421 ops,
422 &blobs,
423 &blocks_cids
424 )
425 .fetch_one(&state.db)
426 .await?;
427 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
428 .execute(&state.db)
429 .await?;
430 Ok(())
431}