this repo has no description
1use crate::state::AppState;
2use crate::sync::import::{apply_import, parse_car, ImportError};
3use crate::sync::verify::CarVerifier;
4use axum::{
5 body::Bytes,
6 extract::State,
7 http::StatusCode,
8 response::{IntoResponse, Response},
9 Json,
10};
11use serde_json::json;
12use tracing::{debug, error, info, warn};
13
14const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024;
15const DEFAULT_MAX_BLOCKS: usize = 50000;
16
17pub async fn import_repo(
18 State(state): State<AppState>,
19 headers: axum::http::HeaderMap,
20 body: Bytes,
21) -> Response {
22 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
23 .map(|v| v != "false" && v != "0")
24 .unwrap_or(true);
25
26 if !accepting_imports {
27 return (
28 StatusCode::BAD_REQUEST,
29 Json(json!({
30 "error": "InvalidRequest",
31 "message": "Service is not accepting repo imports"
32 })),
33 )
34 .into_response();
35 }
36
37 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
38 .ok()
39 .and_then(|s| s.parse().ok())
40 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
41
42 if body.len() > max_size {
43 return (
44 StatusCode::PAYLOAD_TOO_LARGE,
45 Json(json!({
46 "error": "InvalidRequest",
47 "message": format!("Import size exceeds limit of {} bytes", max_size)
48 })),
49 )
50 .into_response();
51 }
52
53 let token = match crate::auth::extract_bearer_token_from_header(
54 headers.get("Authorization").and_then(|h| h.to_str().ok()),
55 ) {
56 Some(t) => t,
57 None => {
58 return (
59 StatusCode::UNAUTHORIZED,
60 Json(json!({"error": "AuthenticationRequired"})),
61 )
62 .into_response();
63 }
64 };
65
66 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
67 Ok(user) => user,
68 Err(e) => {
69 return (
70 StatusCode::UNAUTHORIZED,
71 Json(json!({"error": "AuthenticationFailed", "message": e})),
72 )
73 .into_response();
74 }
75 };
76
77 let did = &auth_user.did;
78
79 let user = match sqlx::query!(
80 "SELECT id, deactivated_at, takedown_ref FROM users WHERE did = $1",
81 did
82 )
83 .fetch_optional(&state.db)
84 .await
85 {
86 Ok(Some(row)) => row,
87 Ok(None) => {
88 return (
89 StatusCode::NOT_FOUND,
90 Json(json!({"error": "AccountNotFound"})),
91 )
92 .into_response();
93 }
94 Err(e) => {
95 error!("DB error fetching user: {:?}", e);
96 return (
97 StatusCode::INTERNAL_SERVER_ERROR,
98 Json(json!({"error": "InternalError"})),
99 )
100 .into_response();
101 }
102 };
103
104 if user.deactivated_at.is_some() {
105 return (
106 StatusCode::FORBIDDEN,
107 Json(json!({
108 "error": "AccountDeactivated",
109 "message": "Account is deactivated"
110 })),
111 )
112 .into_response();
113 }
114
115 if user.takedown_ref.is_some() {
116 return (
117 StatusCode::FORBIDDEN,
118 Json(json!({
119 "error": "AccountTakenDown",
120 "message": "Account has been taken down"
121 })),
122 )
123 .into_response();
124 }
125
126 let user_id = user.id;
127
128 let (root, blocks) = match parse_car(&body).await {
129 Ok((r, b)) => (r, b),
130 Err(ImportError::InvalidRootCount) => {
131 return (
132 StatusCode::BAD_REQUEST,
133 Json(json!({
134 "error": "InvalidRequest",
135 "message": "Expected exactly one root in CAR file"
136 })),
137 )
138 .into_response();
139 }
140 Err(ImportError::CarParse(msg)) => {
141 return (
142 StatusCode::BAD_REQUEST,
143 Json(json!({
144 "error": "InvalidRequest",
145 "message": format!("Failed to parse CAR file: {}", msg)
146 })),
147 )
148 .into_response();
149 }
150 Err(e) => {
151 error!("CAR parsing error: {:?}", e);
152 return (
153 StatusCode::BAD_REQUEST,
154 Json(json!({
155 "error": "InvalidRequest",
156 "message": format!("Invalid CAR file: {}", e)
157 })),
158 )
159 .into_response();
160 }
161 };
162
163 info!(
164 "Importing repo for user {}: {} blocks, root {}",
165 did,
166 blocks.len(),
167 root
168 );
169
170 let root_block = match blocks.get(&root) {
171 Some(b) => b,
172 None => {
173 return (
174 StatusCode::BAD_REQUEST,
175 Json(json!({
176 "error": "InvalidRequest",
177 "message": "Root block not found in CAR file"
178 })),
179 )
180 .into_response();
181 }
182 };
183
184 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
185 Ok(commit) => commit.did().to_string(),
186 Err(e) => {
187 return (
188 StatusCode::BAD_REQUEST,
189 Json(json!({
190 "error": "InvalidRequest",
191 "message": format!("Invalid commit: {}", e)
192 })),
193 )
194 .into_response();
195 }
196 };
197
198 if commit_did != *did {
199 return (
200 StatusCode::FORBIDDEN,
201 Json(json!({
202 "error": "InvalidRequest",
203 "message": format!(
204 "CAR file is for DID {} but you are authenticated as {}",
205 commit_did, did
206 )
207 })),
208 )
209 .into_response();
210 }
211
212 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
213 .map(|v| v == "true" || v == "1")
214 .unwrap_or(false);
215
216 if !skip_verification {
217 debug!("Verifying CAR file signature and structure for DID {}", did);
218 let verifier = CarVerifier::new();
219
220 match verifier.verify_car(did, &root, &blocks).await {
221 Ok(verified) => {
222 debug!(
223 "CAR verification successful: rev={}, data_cid={}",
224 verified.rev, verified.data_cid
225 );
226 }
227 Err(crate::sync::verify::VerifyError::DidMismatch {
228 commit_did,
229 expected_did,
230 }) => {
231 return (
232 StatusCode::FORBIDDEN,
233 Json(json!({
234 "error": "InvalidRequest",
235 "message": format!(
236 "CAR file is for DID {} but you are authenticated as {}",
237 commit_did, expected_did
238 )
239 })),
240 )
241 .into_response();
242 }
243 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
244 return (
245 StatusCode::BAD_REQUEST,
246 Json(json!({
247 "error": "InvalidSignature",
248 "message": "CAR file commit signature verification failed"
249 })),
250 )
251 .into_response();
252 }
253 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
254 warn!("DID resolution failed during import verification: {}", msg);
255 return (
256 StatusCode::BAD_REQUEST,
257 Json(json!({
258 "error": "InvalidRequest",
259 "message": format!("Failed to verify DID: {}", msg)
260 })),
261 )
262 .into_response();
263 }
264 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
265 return (
266 StatusCode::BAD_REQUEST,
267 Json(json!({
268 "error": "InvalidRequest",
269 "message": "DID document does not contain a signing key"
270 })),
271 )
272 .into_response();
273 }
274 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
275 return (
276 StatusCode::BAD_REQUEST,
277 Json(json!({
278 "error": "InvalidRequest",
279 "message": format!("MST validation failed: {}", msg)
280 })),
281 )
282 .into_response();
283 }
284 Err(e) => {
285 error!("CAR verification error: {:?}", e);
286 return (
287 StatusCode::BAD_REQUEST,
288 Json(json!({
289 "error": "InvalidRequest",
290 "message": format!("CAR verification failed: {}", e)
291 })),
292 )
293 .into_response();
294 }
295 }
296 } else {
297 warn!("Skipping CAR signature verification for import (SKIP_IMPORT_VERIFICATION=true)");
298 }
299
300 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
301 .ok()
302 .and_then(|s| s.parse().ok())
303 .unwrap_or(DEFAULT_MAX_BLOCKS);
304
305 match apply_import(&state.db, user_id, root, blocks, max_blocks).await {
306 Ok(records) => {
307 info!(
308 "Successfully imported {} records for user {}",
309 records.len(),
310 did
311 );
312
313 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await {
314 warn!("Failed to sequence import event: {:?}", e);
315 }
316
317 (StatusCode::OK, Json(json!({}))).into_response()
318 }
319 Err(ImportError::SizeLimitExceeded) => (
320 StatusCode::BAD_REQUEST,
321 Json(json!({
322 "error": "InvalidRequest",
323 "message": format!("Import exceeds block limit of {}", max_blocks)
324 })),
325 )
326 .into_response(),
327 Err(ImportError::RepoNotFound) => (
328 StatusCode::NOT_FOUND,
329 Json(json!({
330 "error": "RepoNotFound",
331 "message": "Repository not initialized for this account"
332 })),
333 )
334 .into_response(),
335 Err(ImportError::InvalidCbor(msg)) => (
336 StatusCode::BAD_REQUEST,
337 Json(json!({
338 "error": "InvalidRequest",
339 "message": format!("Invalid CBOR data: {}", msg)
340 })),
341 )
342 .into_response(),
343 Err(ImportError::InvalidCommit(msg)) => (
344 StatusCode::BAD_REQUEST,
345 Json(json!({
346 "error": "InvalidRequest",
347 "message": format!("Invalid commit structure: {}", msg)
348 })),
349 )
350 .into_response(),
351 Err(ImportError::BlockNotFound(cid)) => (
352 StatusCode::BAD_REQUEST,
353 Json(json!({
354 "error": "InvalidRequest",
355 "message": format!("Referenced block not found in CAR: {}", cid)
356 })),
357 )
358 .into_response(),
359 Err(ImportError::ConcurrentModification) => (
360 StatusCode::CONFLICT,
361 Json(json!({
362 "error": "ConcurrentModification",
363 "message": "Repository is being modified by another operation, please retry"
364 })),
365 )
366 .into_response(),
367 Err(ImportError::VerificationFailed(ve)) => (
368 StatusCode::BAD_REQUEST,
369 Json(json!({
370 "error": "VerificationFailed",
371 "message": format!("CAR verification failed: {}", ve)
372 })),
373 )
374 .into_response(),
375 Err(ImportError::DidMismatch { car_did, auth_did }) => (
376 StatusCode::FORBIDDEN,
377 Json(json!({
378 "error": "DidMismatch",
379 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did)
380 })),
381 )
382 .into_response(),
383 Err(e) => {
384 error!("Import error: {:?}", e);
385 (
386 StatusCode::INTERNAL_SERVER_ERROR,
387 Json(json!({"error": "InternalError"})),
388 )
389 .into_response()
390 }
391 }
392}
393
394async fn sequence_import_event(
395 state: &AppState,
396 did: &str,
397 commit_cid: &str,
398) -> Result<(), sqlx::Error> {
399 let prev_cid: Option<String> = None;
400 let ops = serde_json::json!([]);
401 let blobs: Vec<String> = vec![];
402 let blocks_cids: Vec<String> = vec![];
403
404 sqlx::query!(
405 r#"
406 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
407 VALUES ($1, 'commit', $2, $3, $4, $5, $6)
408 "#,
409 did,
410 commit_cid,
411 prev_cid,
412 ops,
413 &blobs,
414 &blocks_cids
415 )
416 .execute(&state.db)
417 .await?;
418
419 Ok(())
420}