this repo has no description
1use crate::api::ApiError;
2use crate::state::AppState;
3use crate::sync::import::{ImportError, apply_import, parse_car};
4use crate::sync::verify::CarVerifier;
5use axum::{
6 Json,
7 body::Bytes,
8 extract::State,
9 http::StatusCode,
10 response::{IntoResponse, Response},
11};
12use serde_json::json;
13use tracing::{debug, error, info, warn};
14
15const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024;
16const DEFAULT_MAX_BLOCKS: usize = 50000;
17
18pub async fn import_repo(
19 State(state): State<AppState>,
20 headers: axum::http::HeaderMap,
21 body: Bytes,
22) -> Response {
23 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
24 .map(|v| v != "false" && v != "0")
25 .unwrap_or(true);
26 if !accepting_imports {
27 return (
28 StatusCode::BAD_REQUEST,
29 Json(json!({
30 "error": "InvalidRequest",
31 "message": "Service is not accepting repo imports"
32 })),
33 )
34 .into_response();
35 }
36 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
37 .ok()
38 .and_then(|s| s.parse().ok())
39 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
40 if body.len() > max_size {
41 return (
42 StatusCode::PAYLOAD_TOO_LARGE,
43 Json(json!({
44 "error": "InvalidRequest",
45 "message": format!("Import size exceeds limit of {} bytes", max_size)
46 })),
47 )
48 .into_response();
49 }
50 let token = match crate::auth::extract_bearer_token_from_header(
51 headers.get("Authorization").and_then(|h| h.to_str().ok()),
52 ) {
53 Some(t) => t,
54 None => return ApiError::AuthenticationRequired.into_response(),
55 };
56 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
57 Ok(user) => user,
58 Err(e) => return ApiError::from(e).into_response(),
59 };
60 let did = &auth_user.did;
61 let user = match sqlx::query!(
62 "SELECT id, deactivated_at, takedown_ref FROM users WHERE did = $1",
63 did
64 )
65 .fetch_optional(&state.db)
66 .await
67 {
68 Ok(Some(row)) => row,
69 Ok(None) => {
70 return (
71 StatusCode::NOT_FOUND,
72 Json(json!({"error": "AccountNotFound"})),
73 )
74 .into_response();
75 }
76 Err(e) => {
77 error!("DB error fetching user: {:?}", e);
78 return (
79 StatusCode::INTERNAL_SERVER_ERROR,
80 Json(json!({"error": "InternalError"})),
81 )
82 .into_response();
83 }
84 };
85 if user.deactivated_at.is_some() {
86 return (
87 StatusCode::FORBIDDEN,
88 Json(json!({
89 "error": "AccountDeactivated",
90 "message": "Account is deactivated"
91 })),
92 )
93 .into_response();
94 }
95 if user.takedown_ref.is_some() {
96 return (
97 StatusCode::FORBIDDEN,
98 Json(json!({
99 "error": "AccountTakenDown",
100 "message": "Account has been taken down"
101 })),
102 )
103 .into_response();
104 }
105 let user_id = user.id;
106 let (root, blocks) = match parse_car(&body).await {
107 Ok((r, b)) => (r, b),
108 Err(ImportError::InvalidRootCount) => {
109 return (
110 StatusCode::BAD_REQUEST,
111 Json(json!({
112 "error": "InvalidRequest",
113 "message": "Expected exactly one root in CAR file"
114 })),
115 )
116 .into_response();
117 }
118 Err(ImportError::CarParse(msg)) => {
119 return (
120 StatusCode::BAD_REQUEST,
121 Json(json!({
122 "error": "InvalidRequest",
123 "message": format!("Failed to parse CAR file: {}", msg)
124 })),
125 )
126 .into_response();
127 }
128 Err(e) => {
129 error!("CAR parsing error: {:?}", e);
130 return (
131 StatusCode::BAD_REQUEST,
132 Json(json!({
133 "error": "InvalidRequest",
134 "message": format!("Invalid CAR file: {}", e)
135 })),
136 )
137 .into_response();
138 }
139 };
140 info!(
141 "Importing repo for user {}: {} blocks, root {}",
142 did,
143 blocks.len(),
144 root
145 );
146 let root_block = match blocks.get(&root) {
147 Some(b) => b,
148 None => {
149 return (
150 StatusCode::BAD_REQUEST,
151 Json(json!({
152 "error": "InvalidRequest",
153 "message": "Root block not found in CAR file"
154 })),
155 )
156 .into_response();
157 }
158 };
159 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
160 Ok(commit) => commit.did().to_string(),
161 Err(e) => {
162 return (
163 StatusCode::BAD_REQUEST,
164 Json(json!({
165 "error": "InvalidRequest",
166 "message": format!("Invalid commit: {}", e)
167 })),
168 )
169 .into_response();
170 }
171 };
172 if commit_did != *did {
173 return (
174 StatusCode::FORBIDDEN,
175 Json(json!({
176 "error": "InvalidRequest",
177 "message": format!(
178 "CAR file is for DID {} but you are authenticated as {}",
179 commit_did, did
180 )
181 })),
182 )
183 .into_response();
184 }
185 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
186 .map(|v| v == "true" || v == "1")
187 .unwrap_or(false);
188 if !skip_verification {
189 debug!("Verifying CAR file signature and structure for DID {}", did);
190 let verifier = CarVerifier::new();
191 match verifier.verify_car(did, &root, &blocks).await {
192 Ok(verified) => {
193 debug!(
194 "CAR verification successful: rev={}, data_cid={}",
195 verified.rev, verified.data_cid
196 );
197 }
198 Err(crate::sync::verify::VerifyError::DidMismatch {
199 commit_did,
200 expected_did,
201 }) => {
202 return (
203 StatusCode::FORBIDDEN,
204 Json(json!({
205 "error": "InvalidRequest",
206 "message": format!(
207 "CAR file is for DID {} but you are authenticated as {}",
208 commit_did, expected_did
209 )
210 })),
211 )
212 .into_response();
213 }
214 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
215 return (
216 StatusCode::BAD_REQUEST,
217 Json(json!({
218 "error": "InvalidSignature",
219 "message": "CAR file commit signature verification failed"
220 })),
221 )
222 .into_response();
223 }
224 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
225 warn!("DID resolution failed during import verification: {}", msg);
226 return (
227 StatusCode::BAD_REQUEST,
228 Json(json!({
229 "error": "InvalidRequest",
230 "message": format!("Failed to verify DID: {}", msg)
231 })),
232 )
233 .into_response();
234 }
235 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
236 return (
237 StatusCode::BAD_REQUEST,
238 Json(json!({
239 "error": "InvalidRequest",
240 "message": "DID document does not contain a signing key"
241 })),
242 )
243 .into_response();
244 }
245 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
246 return (
247 StatusCode::BAD_REQUEST,
248 Json(json!({
249 "error": "InvalidRequest",
250 "message": format!("MST validation failed: {}", msg)
251 })),
252 )
253 .into_response();
254 }
255 Err(e) => {
256 error!("CAR verification error: {:?}", e);
257 return (
258 StatusCode::BAD_REQUEST,
259 Json(json!({
260 "error": "InvalidRequest",
261 "message": format!("CAR verification failed: {}", e)
262 })),
263 )
264 .into_response();
265 }
266 }
267 } else {
268 warn!("Skipping CAR signature verification for import (SKIP_IMPORT_VERIFICATION=true)");
269 }
270 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
271 .ok()
272 .and_then(|s| s.parse().ok())
273 .unwrap_or(DEFAULT_MAX_BLOCKS);
274 match apply_import(&state.db, user_id, root, blocks, max_blocks).await {
275 Ok(records) => {
276 info!(
277 "Successfully imported {} records for user {}",
278 records.len(),
279 did
280 );
281 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await {
282 warn!("Failed to sequence import event: {:?}", e);
283 }
284 (StatusCode::OK, Json(json!({}))).into_response()
285 }
286 Err(ImportError::SizeLimitExceeded) => (
287 StatusCode::BAD_REQUEST,
288 Json(json!({
289 "error": "InvalidRequest",
290 "message": format!("Import exceeds block limit of {}", max_blocks)
291 })),
292 )
293 .into_response(),
294 Err(ImportError::RepoNotFound) => (
295 StatusCode::NOT_FOUND,
296 Json(json!({
297 "error": "RepoNotFound",
298 "message": "Repository not initialized for this account"
299 })),
300 )
301 .into_response(),
302 Err(ImportError::InvalidCbor(msg)) => (
303 StatusCode::BAD_REQUEST,
304 Json(json!({
305 "error": "InvalidRequest",
306 "message": format!("Invalid CBOR data: {}", msg)
307 })),
308 )
309 .into_response(),
310 Err(ImportError::InvalidCommit(msg)) => (
311 StatusCode::BAD_REQUEST,
312 Json(json!({
313 "error": "InvalidRequest",
314 "message": format!("Invalid commit structure: {}", msg)
315 })),
316 )
317 .into_response(),
318 Err(ImportError::BlockNotFound(cid)) => (
319 StatusCode::BAD_REQUEST,
320 Json(json!({
321 "error": "InvalidRequest",
322 "message": format!("Referenced block not found in CAR: {}", cid)
323 })),
324 )
325 .into_response(),
326 Err(ImportError::ConcurrentModification) => (
327 StatusCode::CONFLICT,
328 Json(json!({
329 "error": "ConcurrentModification",
330 "message": "Repository is being modified by another operation, please retry"
331 })),
332 )
333 .into_response(),
334 Err(ImportError::VerificationFailed(ve)) => (
335 StatusCode::BAD_REQUEST,
336 Json(json!({
337 "error": "VerificationFailed",
338 "message": format!("CAR verification failed: {}", ve)
339 })),
340 )
341 .into_response(),
342 Err(ImportError::DidMismatch { car_did, auth_did }) => (
343 StatusCode::FORBIDDEN,
344 Json(json!({
345 "error": "DidMismatch",
346 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did)
347 })),
348 )
349 .into_response(),
350 Err(e) => {
351 error!("Import error: {:?}", e);
352 (
353 StatusCode::INTERNAL_SERVER_ERROR,
354 Json(json!({"error": "InternalError"})),
355 )
356 .into_response()
357 }
358 }
359}
360
361async fn sequence_import_event(
362 state: &AppState,
363 did: &str,
364 commit_cid: &str,
365) -> Result<(), sqlx::Error> {
366 let prev_cid: Option<String> = None;
367 let prev_data_cid: Option<String> = None;
368 let ops = serde_json::json!([]);
369 let blobs: Vec<String> = vec![];
370 let blocks_cids: Vec<String> = vec![];
371 let seq_row = sqlx::query!(
372 r#"
373 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids)
374 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7)
375 RETURNING seq
376 "#,
377 did,
378 commit_cid,
379 prev_cid,
380 prev_data_cid,
381 ops,
382 &blobs,
383 &blocks_cids
384 )
385 .fetch_one(&state.db)
386 .await?;
387 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
388 .execute(&state.db)
389 .await?;
390 Ok(())
391}