this repo has no description
1use crate::api::ApiError;
2use crate::state::AppState;
3use crate::sync::import::{apply_import, parse_car, ImportError};
4use crate::sync::verify::CarVerifier;
5use axum::{
6 body::Bytes,
7 extract::State,
8 http::StatusCode,
9 response::{IntoResponse, Response},
10 Json,
11};
12use serde_json::json;
13use tracing::{debug, error, info, warn};
14const DEFAULT_MAX_IMPORT_SIZE: usize = 100 * 1024 * 1024;
15const DEFAULT_MAX_BLOCKS: usize = 50000;
16pub async fn import_repo(
17 State(state): State<AppState>,
18 headers: axum::http::HeaderMap,
19 body: Bytes,
20) -> Response {
21 let accepting_imports = std::env::var("ACCEPTING_REPO_IMPORTS")
22 .map(|v| v != "false" && v != "0")
23 .unwrap_or(true);
24 if !accepting_imports {
25 return (
26 StatusCode::BAD_REQUEST,
27 Json(json!({
28 "error": "InvalidRequest",
29 "message": "Service is not accepting repo imports"
30 })),
31 )
32 .into_response();
33 }
34 let max_size: usize = std::env::var("MAX_IMPORT_SIZE")
35 .ok()
36 .and_then(|s| s.parse().ok())
37 .unwrap_or(DEFAULT_MAX_IMPORT_SIZE);
38 if body.len() > max_size {
39 return (
40 StatusCode::PAYLOAD_TOO_LARGE,
41 Json(json!({
42 "error": "InvalidRequest",
43 "message": format!("Import size exceeds limit of {} bytes", max_size)
44 })),
45 )
46 .into_response();
47 }
48 let token = match crate::auth::extract_bearer_token_from_header(
49 headers.get("Authorization").and_then(|h| h.to_str().ok()),
50 ) {
51 Some(t) => t,
52 None => return ApiError::AuthenticationRequired.into_response(),
53 };
54 let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
55 Ok(user) => user,
56 Err(e) => return ApiError::from(e).into_response(),
57 };
58 let did = &auth_user.did;
59 let user = match sqlx::query!(
60 "SELECT id, deactivated_at, takedown_ref FROM users WHERE did = $1",
61 did
62 )
63 .fetch_optional(&state.db)
64 .await
65 {
66 Ok(Some(row)) => row,
67 Ok(None) => {
68 return (
69 StatusCode::NOT_FOUND,
70 Json(json!({"error": "AccountNotFound"})),
71 )
72 .into_response();
73 }
74 Err(e) => {
75 error!("DB error fetching user: {:?}", e);
76 return (
77 StatusCode::INTERNAL_SERVER_ERROR,
78 Json(json!({"error": "InternalError"})),
79 )
80 .into_response();
81 }
82 };
83 if user.deactivated_at.is_some() {
84 return (
85 StatusCode::FORBIDDEN,
86 Json(json!({
87 "error": "AccountDeactivated",
88 "message": "Account is deactivated"
89 })),
90 )
91 .into_response();
92 }
93 if user.takedown_ref.is_some() {
94 return (
95 StatusCode::FORBIDDEN,
96 Json(json!({
97 "error": "AccountTakenDown",
98 "message": "Account has been taken down"
99 })),
100 )
101 .into_response();
102 }
103 let user_id = user.id;
104 let (root, blocks) = match parse_car(&body).await {
105 Ok((r, b)) => (r, b),
106 Err(ImportError::InvalidRootCount) => {
107 return (
108 StatusCode::BAD_REQUEST,
109 Json(json!({
110 "error": "InvalidRequest",
111 "message": "Expected exactly one root in CAR file"
112 })),
113 )
114 .into_response();
115 }
116 Err(ImportError::CarParse(msg)) => {
117 return (
118 StatusCode::BAD_REQUEST,
119 Json(json!({
120 "error": "InvalidRequest",
121 "message": format!("Failed to parse CAR file: {}", msg)
122 })),
123 )
124 .into_response();
125 }
126 Err(e) => {
127 error!("CAR parsing error: {:?}", e);
128 return (
129 StatusCode::BAD_REQUEST,
130 Json(json!({
131 "error": "InvalidRequest",
132 "message": format!("Invalid CAR file: {}", e)
133 })),
134 )
135 .into_response();
136 }
137 };
138 info!(
139 "Importing repo for user {}: {} blocks, root {}",
140 did,
141 blocks.len(),
142 root
143 );
144 let root_block = match blocks.get(&root) {
145 Some(b) => b,
146 None => {
147 return (
148 StatusCode::BAD_REQUEST,
149 Json(json!({
150 "error": "InvalidRequest",
151 "message": "Root block not found in CAR file"
152 })),
153 )
154 .into_response();
155 }
156 };
157 let commit_did = match jacquard_repo::commit::Commit::from_cbor(root_block) {
158 Ok(commit) => commit.did().to_string(),
159 Err(e) => {
160 return (
161 StatusCode::BAD_REQUEST,
162 Json(json!({
163 "error": "InvalidRequest",
164 "message": format!("Invalid commit: {}", e)
165 })),
166 )
167 .into_response();
168 }
169 };
170 if commit_did != *did {
171 return (
172 StatusCode::FORBIDDEN,
173 Json(json!({
174 "error": "InvalidRequest",
175 "message": format!(
176 "CAR file is for DID {} but you are authenticated as {}",
177 commit_did, did
178 )
179 })),
180 )
181 .into_response();
182 }
183 let skip_verification = std::env::var("SKIP_IMPORT_VERIFICATION")
184 .map(|v| v == "true" || v == "1")
185 .unwrap_or(false);
186 if !skip_verification {
187 debug!("Verifying CAR file signature and structure for DID {}", did);
188 let verifier = CarVerifier::new();
189 match verifier.verify_car(did, &root, &blocks).await {
190 Ok(verified) => {
191 debug!(
192 "CAR verification successful: rev={}, data_cid={}",
193 verified.rev, verified.data_cid
194 );
195 }
196 Err(crate::sync::verify::VerifyError::DidMismatch {
197 commit_did,
198 expected_did,
199 }) => {
200 return (
201 StatusCode::FORBIDDEN,
202 Json(json!({
203 "error": "InvalidRequest",
204 "message": format!(
205 "CAR file is for DID {} but you are authenticated as {}",
206 commit_did, expected_did
207 )
208 })),
209 )
210 .into_response();
211 }
212 Err(crate::sync::verify::VerifyError::InvalidSignature) => {
213 return (
214 StatusCode::BAD_REQUEST,
215 Json(json!({
216 "error": "InvalidSignature",
217 "message": "CAR file commit signature verification failed"
218 })),
219 )
220 .into_response();
221 }
222 Err(crate::sync::verify::VerifyError::DidResolutionFailed(msg)) => {
223 warn!("DID resolution failed during import verification: {}", msg);
224 return (
225 StatusCode::BAD_REQUEST,
226 Json(json!({
227 "error": "InvalidRequest",
228 "message": format!("Failed to verify DID: {}", msg)
229 })),
230 )
231 .into_response();
232 }
233 Err(crate::sync::verify::VerifyError::NoSigningKey) => {
234 return (
235 StatusCode::BAD_REQUEST,
236 Json(json!({
237 "error": "InvalidRequest",
238 "message": "DID document does not contain a signing key"
239 })),
240 )
241 .into_response();
242 }
243 Err(crate::sync::verify::VerifyError::MstValidationFailed(msg)) => {
244 return (
245 StatusCode::BAD_REQUEST,
246 Json(json!({
247 "error": "InvalidRequest",
248 "message": format!("MST validation failed: {}", msg)
249 })),
250 )
251 .into_response();
252 }
253 Err(e) => {
254 error!("CAR verification error: {:?}", e);
255 return (
256 StatusCode::BAD_REQUEST,
257 Json(json!({
258 "error": "InvalidRequest",
259 "message": format!("CAR verification failed: {}", e)
260 })),
261 )
262 .into_response();
263 }
264 }
265 } else {
266 warn!("Skipping CAR signature verification for import (SKIP_IMPORT_VERIFICATION=true)");
267 }
268 let max_blocks: usize = std::env::var("MAX_IMPORT_BLOCKS")
269 .ok()
270 .and_then(|s| s.parse().ok())
271 .unwrap_or(DEFAULT_MAX_BLOCKS);
272 match apply_import(&state.db, user_id, root, blocks, max_blocks).await {
273 Ok(records) => {
274 info!(
275 "Successfully imported {} records for user {}",
276 records.len(),
277 did
278 );
279 if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await {
280 warn!("Failed to sequence import event: {:?}", e);
281 }
282 (StatusCode::OK, Json(json!({}))).into_response()
283 }
284 Err(ImportError::SizeLimitExceeded) => (
285 StatusCode::BAD_REQUEST,
286 Json(json!({
287 "error": "InvalidRequest",
288 "message": format!("Import exceeds block limit of {}", max_blocks)
289 })),
290 )
291 .into_response(),
292 Err(ImportError::RepoNotFound) => (
293 StatusCode::NOT_FOUND,
294 Json(json!({
295 "error": "RepoNotFound",
296 "message": "Repository not initialized for this account"
297 })),
298 )
299 .into_response(),
300 Err(ImportError::InvalidCbor(msg)) => (
301 StatusCode::BAD_REQUEST,
302 Json(json!({
303 "error": "InvalidRequest",
304 "message": format!("Invalid CBOR data: {}", msg)
305 })),
306 )
307 .into_response(),
308 Err(ImportError::InvalidCommit(msg)) => (
309 StatusCode::BAD_REQUEST,
310 Json(json!({
311 "error": "InvalidRequest",
312 "message": format!("Invalid commit structure: {}", msg)
313 })),
314 )
315 .into_response(),
316 Err(ImportError::BlockNotFound(cid)) => (
317 StatusCode::BAD_REQUEST,
318 Json(json!({
319 "error": "InvalidRequest",
320 "message": format!("Referenced block not found in CAR: {}", cid)
321 })),
322 )
323 .into_response(),
324 Err(ImportError::ConcurrentModification) => (
325 StatusCode::CONFLICT,
326 Json(json!({
327 "error": "ConcurrentModification",
328 "message": "Repository is being modified by another operation, please retry"
329 })),
330 )
331 .into_response(),
332 Err(ImportError::VerificationFailed(ve)) => (
333 StatusCode::BAD_REQUEST,
334 Json(json!({
335 "error": "VerificationFailed",
336 "message": format!("CAR verification failed: {}", ve)
337 })),
338 )
339 .into_response(),
340 Err(ImportError::DidMismatch { car_did, auth_did }) => (
341 StatusCode::FORBIDDEN,
342 Json(json!({
343 "error": "DidMismatch",
344 "message": format!("CAR is for {} but authenticated as {}", car_did, auth_did)
345 })),
346 )
347 .into_response(),
348 Err(e) => {
349 error!("Import error: {:?}", e);
350 (
351 StatusCode::INTERNAL_SERVER_ERROR,
352 Json(json!({"error": "InternalError"})),
353 )
354 .into_response()
355 }
356 }
357}
358async fn sequence_import_event(
359 state: &AppState,
360 did: &str,
361 commit_cid: &str,
362) -> Result<(), sqlx::Error> {
363 let prev_cid: Option<String> = None;
364 let prev_data_cid: Option<String> = None;
365 let ops = serde_json::json!([]);
366 let blobs: Vec<String> = vec![];
367 let blocks_cids: Vec<String> = vec![];
368 let seq_row = sqlx::query!(
369 r#"
370 INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids)
371 VALUES ($1, 'commit', $2, $3, $4, $5, $6, $7)
372 RETURNING seq
373 "#,
374 did,
375 commit_cid,
376 prev_cid,
377 prev_data_cid,
378 ops,
379 &blobs,
380 &blocks_cids
381 )
382 .fetch_one(&state.db)
383 .await?;
384 sqlx::query(&format!("NOTIFY repo_updates, '{}'", seq_row.seq))
385 .execute(&state.db)
386 .await?;
387 Ok(())
388}