this repo has no description
1use crate::api::ApiError;
2use crate::state::AppState;
3use crate::sync::import::{apply_import, parse_car, ImportError};
4use crate::sync::verify::CarVerifier;
5use axum::{
6 body::Bytes,
7 extract::State,
8 http::StatusCode,
9 response::{IntoResponse, Response},
10 Json,
11};
12use serde_json::json;
13use tracing::{debug, error, info, warn};
14
15const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024;
16const DEFAULT_MAX_BLOCKS: usize = 50000;
17
18pub async fn import_repo(
19 State(state): State<AppState>,
20 headers: axum::http::HeaderMap,
21 body: Bytes,
22) -> Response {
23 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
24 .map(|v| v != "false" && v != "0")
25 .unwrap_or(true);
26
27 if !accepting_imports {
28 return (
29 StatusCode::BAD_REQUEST,
30 Json(json!({
31 "error": "InvalidRequest",
32 "message": "Service is not accepting repo imports"
33 })),
34 )
35 .into_response();
36 }
37
38 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
39 .ok()
40 .and_then(|s| s.parse().ok())
41 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
42
43 if body.len() > max_size {
44 return (
45 StatusCode::PAYLOAD_TOO_LARGE,
46 Json(json!({
47 "error": "InvalidRequest",
48 "message": format!("Import size exceeds limit of {} bytes", max_size)
49 })),
50 )
51 .into_response();
52 }
53
54 let token = match crate::auth::extract_bearer_token_from_header(
55 headers.get("Authorization").and_then(|h| h.to_str().ok()),
56 ) {
57 Some(t) => t,
58 None => return ApiError::AuthenticationRequired.into_response(),
59 };
60
61 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
62 Ok(user) => user,
63 Err(e) => return ApiError::from(e).into_response(),
64 };
65
66 let did = &auth_user.did;
67
68 let user = match sqlx::query!(
69 "SELECT id, deactivated_at, takedown_ref FROM users WHERE did = $1",
70 did
71 )
72 .fetch_optional(&state.db)
73 .await
74 {
75 Ok(Some(row)) => row,
76 Ok(None) => {
77 return (
78 StatusCode::NOT_FOUND,
79 Json(json!({"error": "AccountNotFound"})),
80 )
81 .into_response();
82 }
83 Err(e) => {
84 error!("DB error fetching user: {:?}", e);
85 return (
86 StatusCode::INTERNAL_SERVER_ERROR,
87 Json(json!({"error": "InternalError"})),
88 )
89 .into_response();
90 }
91 };
92
93 if user.deactivated_at.is_some() {
94 return (
95 StatusCode::FORBIDDEN,
96 Json(json!({
97 "error": "AccountDeactivated",
98 "message": "Account is deactivated"
99 })),
100 )
101 .into_response();
102 }
103
104 if user.takedown_ref.is_some() {
105 return (
106 StatusCode::FORBIDDEN,
107 Json(json!({
108 "error": "AccountTakenDown",
109 "message": "Account has been taken down"
110 })),
111 )
112 .into_response();
113 }
114
115 let user_id = user.id;
116
117 let (root, blocks) = match parse_car(&body).await {
118 Ok((r, b)) => (r, b),
119 Err(ImportError::InvalidRootCount) => {
120 return (
121 StatusCode::BAD_REQUEST,
122 Json(json!({
123 "error": "InvalidRequest",
124 "message": "Expected exactly one root in CAR file"
125 })),
126 )
127 .into_response();
128 }
129 Err(ImportError::CarParse(msg)) => {
130 return (
131 StatusCode::BAD_REQUEST,
132 Json(json!({
133 "error": "InvalidRequest",
134 "message": format!("Failed to parse CAR file: {}", msg)
135 })),
136 )
137 .into_response();
138 }
139 Err(e) => {
140 error!("CAR parsing error: {:?}", e);
141 return (
142 StatusCode::BAD_REQUEST,
143 Json(json!({
144 "error": "InvalidRequest",
145 "message": format!("Invalid CAR file: {}", e)
146 })),
147 )
148 .into_response();
149 }
150 };
151
152 info!(
153 "Importing repo for user {}: {} blocks, root {}",
154 did,
155 blocks.len(),
156 root
157 );
158
159 let root_block = match blocks.get(&root) {
160 Some(b) => b,
161 None => {
162 return (
163 StatusCode::BAD_REQUEST,
164 Json(json!({
165 "error": "InvalidRequest",
166 "message": "Root block not found in CAR file"
167 })),
168 )
169 .into_response();
170 }
171 };
172
173 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
174 Ok(commit) => commit.did().to_string(),
175 Err(e) => {
176 return (
177 StatusCode::BAD_REQUEST,
178 Json(json!({
179 "error": "InvalidRequest",
180 "message": format!("Invalid commit: {}", e)
181 })),
182 )
183 .into_response();
184 }
185 };
186
187 if commit_did != *did {
188 return (
189 StatusCode::FORBIDDEN,
190 Json(json!({
191 "error": "InvalidRequest",
192 "message": format!(
193 "CAR file is for DID {} but you are authenticated as {}",
194 commit_did, did
195 )
196 })),
197 )
198 .into_response();
199 }
200
201 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
202 .map(|v| v == "true" || v == "1")
203 .unwrap_or(false);
204
205 if !skip_verification {
206 debug!("Verifying CAR file signature and structure for DID {}", did);
207 let verifier = CarVerifier::new();
208
209 match verifier.verify_car(did, &root, &blocks).await {
210 Ok(verified) => {
211 debug!(
212 "CAR verification successful: rev={}, data_cid={}",
213 verified.rev, verified.data_cid
214 );
215 }
216 Err(crate::sync::verify::VerifyError::DidMismatch {
217 commit_did,
218 expected_did,
219 }) => {
220 return (
221 StatusCode::FORBIDDEN,
222 Json(json!({
223 "error": "InvalidRequest",
224 "message": format!(
225 "CAR file is for DID {} but you are authenticated as {}",
226 commit_did, expected_did
227 )
228 })),
229 )
230 .into_response();
231 }
232 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
233 return (
234 StatusCode::BAD_REQUEST,
235 Json(json!({
236 "error": "InvalidSignature",
237 "message": "CAR file commit signature verification failed"
238 })),
239 )
240 .into_response();
241 }
242 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
243 warn!("DID resolution failed during import verification: {}", msg);
244 return (
245 StatusCode::BAD_REQUEST,
246 Json(json!({
247 "error": "InvalidRequest",
248 "message": format!("Failed to verify DID: {}", msg)
249 })),
250 )
251 .into_response();
252 }
253 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
254 return (
255 StatusCode::BAD_REQUEST,
256 Json(json!({
257 "error": "InvalidRequest",
258 "message": "DID document does not contain a signing key"
259 })),
260 )
261 .into_response();
262 }
263 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
264 return (
265 StatusCode::BAD_REQUEST,
266 Json(json!({
267 "error": "InvalidRequest",
268 "message": format!("MST validation failed: {}", msg)
269 })),
270 )
271 .into_response();
272 }
273 Err(e) => {
274 error!("CAR verification error: {:?}", e);
275 return (
276 StatusCode::BAD_REQUEST,
277 Json(json!({
278 "error": "InvalidRequest",
279 "message": format!("CAR verification failed: {}", e)
280 })),
281 )
282 .into_response();
283 }
284 }
285 } else {
286 warn!("Skipping CAR signature verification for import (SKIP_IMPORT_VERIFICATION=true)");
287 }
288
289 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
290 .ok()
291 .and_then(|s| s.parse().ok())
292 .unwrap_or(DEFAULT_MAX_BLOCKS);
293
294 match apply_import(&state.db, user_id, root, blocks, max_blocks).await {
295 Ok(records) => {
296 info!(
297 "Successfully imported {} records for user {}",
298 records.len(),
299 did
300 );
301
302 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await {
303 warn!("Failed to sequence import event: {:?}", e);
304 }
305
306 (StatusCode::OK, Json(json!({}))).into_response()
307 }
308 Err(ImportError::SizeLimitExceeded) => (
309 StatusCode::BAD_REQUEST,
310 Json(json!({
311 "error": "InvalidRequest",
312 "message": format!("Import exceeds block limit of {}", max_blocks)
313 })),
314 )
315 .into_response(),
316 Err(ImportError::RepoNotFound) => (
317 StatusCode::NOT_FOUND,
318 Json(json!({
319 "error": "RepoNotFound",
320 "message": "Repository not initialized for this account"
321 })),
322 )
323 .into_response(),
324 Err(ImportError::InvalidCbor(msg)) => (
325 StatusCode::BAD_REQUEST,
326 Json(json!({
327 "error": "InvalidRequest",
328 "message": format!("Invalid CBOR data: {}", msg)
329 })),
330 )
331 .into_response(),
332 Err(ImportError::InvalidCommit(msg)) => (
333 StatusCode::BAD_REQUEST,
334 Json(json!({
335 "error": "InvalidRequest",
336 "message": format!("Invalid commit structure: {}", msg)
337 })),
338 )
339 .into_response(),
340 Err(ImportError::BlockNotFound(cid)) => (
341 StatusCode::BAD_REQUEST,
342 Json(json!({
343 "error": "InvalidRequest",
344 "message": format!("Referenced block not found in CAR: {}", cid)
345 })),
346 )
347 .into_response(),
348 Err(ImportError::ConcurrentModification) => (
349 StatusCode::CONFLICT,
350 Json(json!({
351 "error": "ConcurrentModification",
352 "message": "Repository is being modified by another operation, please retry"
353 })),
354 )
355 .into_response(),
356 Err(ImportError::VerificationFailed(ve)) => (
357 StatusCode::BAD_REQUEST,
358 Json(json!({
359 "error": "VerificationFailed",
360 "message": format!("CAR verification failed: {}", ve)
361 })),
362 )
363 .into_response(),
364 Err(ImportError::DidMismatch { car_did, auth_did }) => (
365 StatusCode::FORBIDDEN,
366 Json(json!({
367 "error": "DidMismatch",
368 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did)
369 })),
370 )
371 .into_response(),
372 Err(e) => {
373 error!("Import error: {:?}", e);
374 (
375 StatusCode::INTERNAL_SERVER_ERROR,
376 Json(json!({"error": "InternalError"})),
377 )
378 .into_response()
379 }
380 }
381}
382
383async fn sequence_import_event(
384 state: &AppState,
385 did: &str,
386 commit_cid: &str,
387) -> Result<(), sqlx::Error> {
388 let prev_cid: Option<String> = None;
389 let ops = serde_json::json!([]);
390 let blobs: Vec<String> = vec![];
391 let blocks_cids: Vec<String> = vec![];
392
393 sqlx::query!(
394 r#"
395 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
396 VALUES ($1, 'commit', $2, $3, $4, $5, $6)
397 "#,
398 did,
399 commit_cid,
400 prev_cid,
401 ops,
402 &blobs,
403 &blocks_cids
404 )
405 .execute(&state.db)
406 .await?;
407
408 Ok(())
409}