this repo has no description
1use crate::state::AppState;
2use crate::sync::car::{encode_car_header, ld_write};
3use axum::{
4 Json,
5 body::Body,
6 extract::{Query, State},
7 http::StatusCode,
8 http::header,
9 response::{IntoResponse, Response},
10};
11use bytes::Bytes;
12use cid::Cid;
13use jacquard_repo::{commit::Commit, storage::BlockStore};
14use serde::Deserialize;
15use serde_json::json;
16use std::collections::HashSet;
17use tracing::error;
18
19#[derive(Deserialize)]
20pub struct GetBlocksParams {
21 pub did: String,
22 pub cids: String,
23}
24
25pub async fn get_blocks(
26 State(state): State<AppState>,
27 Query(params): Query<GetBlocksParams>,
28) -> Response {
29 let did = params.did.trim();
30
31 if did.is_empty() {
32 return (
33 StatusCode::BAD_REQUEST,
34 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
35 )
36 .into_response();
37 }
38
39 let cid_strings: Vec<&str> = params.cids.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).collect();
40
41 if cid_strings.is_empty() {
42 return (
43 StatusCode::BAD_REQUEST,
44 Json(json!({"error": "InvalidRequest", "message": "cids is required"})),
45 )
46 .into_response();
47 }
48
49 let repo_result = sqlx::query!(
50 r#"
51 SELECT r.repo_root_cid
52 FROM repos r
53 JOIN users u ON r.user_id = u.id
54 WHERE u.did = $1
55 "#,
56 did
57 )
58 .fetch_optional(&state.db)
59 .await;
60
61 let repo_root_cid_str = match repo_result {
62 Ok(Some(row)) => row.repo_root_cid,
63 Ok(None) => {
64 return (
65 StatusCode::NOT_FOUND,
66 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
67 )
68 .into_response();
69 }
70 Err(e) => {
71 error!("DB error in get_blocks: {:?}", e);
72 return (
73 StatusCode::INTERNAL_SERVER_ERROR,
74 Json(json!({"error": "InternalError"})),
75 )
76 .into_response();
77 }
78 };
79
80 let root_cid = match repo_root_cid_str.parse::<Cid>() {
81 Ok(c) => c,
82 Err(e) => {
83 error!("Failed to parse root CID: {:?}", e);
84 return (
85 StatusCode::INTERNAL_SERVER_ERROR,
86 Json(json!({"error": "InternalError"})),
87 )
88 .into_response();
89 }
90 };
91
92 let mut requested_cids: Vec<Cid> = Vec::new();
93 for cid_str in &cid_strings {
94 match cid_str.parse::<Cid>() {
95 Ok(c) => requested_cids.push(c),
96 Err(e) => {
97 error!("Failed to parse CID '{}': {:?}", cid_str, e);
98 return (
99 StatusCode::BAD_REQUEST,
100 Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", cid_str)})),
101 )
102 .into_response();
103 }
104 }
105 }
106
107 let mut buf = Vec::new();
108 let car_header = encode_car_header(&root_cid);
109 if let Err(e) = ld_write(&mut buf, &car_header) {
110 error!("Failed to write CAR header: {:?}", e);
111 return (
112 StatusCode::INTERNAL_SERVER_ERROR,
113 Json(json!({"error": "InternalError"})),
114 )
115 .into_response();
116 }
117
118 for cid in &requested_cids {
119 let cid_bytes = cid.to_bytes();
120 let block_result = sqlx::query!(
121 "SELECT data FROM blocks WHERE cid = $1",
122 &cid_bytes
123 )
124 .fetch_optional(&state.db)
125 .await;
126
127 match block_result {
128 Ok(Some(row)) => {
129 let mut block_data = Vec::new();
130 block_data.extend_from_slice(&cid_bytes);
131 block_data.extend_from_slice(&row.data);
132 if let Err(e) = ld_write(&mut buf, &block_data) {
133 error!("Failed to write block: {:?}", e);
134 return (
135 StatusCode::INTERNAL_SERVER_ERROR,
136 Json(json!({"error": "InternalError"})),
137 )
138 .into_response();
139 }
140 }
141 Ok(None) => {
142 return (
143 StatusCode::NOT_FOUND,
144 Json(json!({"error": "BlockNotFound", "message": format!("Block not found: {}", cid)})),
145 )
146 .into_response();
147 }
148 Err(e) => {
149 error!("DB error fetching block: {:?}", e);
150 return (
151 StatusCode::INTERNAL_SERVER_ERROR,
152 Json(json!({"error": "InternalError"})),
153 )
154 .into_response();
155 }
156 }
157 }
158
159 Response::builder()
160 .status(StatusCode::OK)
161 .header(header::CONTENT_TYPE, "application/vnd.ipld.car")
162 .body(Body::from(buf))
163 .unwrap()
164}
165
166#[derive(Deserialize)]
167pub struct GetRepoParams {
168 pub did: String,
169 pub since: Option<String>,
170}
171
172pub async fn get_repo(
173 State(state): State<AppState>,
174 Query(params): Query<GetRepoParams>,
175) -> Response {
176 let did = params.did.trim();
177
178 if did.is_empty() {
179 return (
180 StatusCode::BAD_REQUEST,
181 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
182 )
183 .into_response();
184 }
185
186 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
187 .fetch_optional(&state.db)
188 .await;
189
190 let user_id = match user_result {
191 Ok(Some(row)) => row.id,
192 Ok(None) => {
193 return (
194 StatusCode::NOT_FOUND,
195 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
196 )
197 .into_response();
198 }
199 Err(e) => {
200 error!("DB error in get_repo: {:?}", e);
201 return (
202 StatusCode::INTERNAL_SERVER_ERROR,
203 Json(json!({"error": "InternalError"})),
204 )
205 .into_response();
206 }
207 };
208
209 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
210 .fetch_optional(&state.db)
211 .await;
212
213 let repo_root_cid_str = match repo_result {
214 Ok(Some(row)) => row.repo_root_cid,
215 Ok(None) => {
216 return (
217 StatusCode::NOT_FOUND,
218 Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})),
219 )
220 .into_response();
221 }
222 Err(e) => {
223 error!("DB error in get_repo: {:?}", e);
224 return (
225 StatusCode::INTERNAL_SERVER_ERROR,
226 Json(json!({"error": "InternalError"})),
227 )
228 .into_response();
229 }
230 };
231
232 let root_cid = match repo_root_cid_str.parse::<Cid>() {
233 Ok(c) => c,
234 Err(e) => {
235 error!("Failed to parse root CID: {:?}", e);
236 return (
237 StatusCode::INTERNAL_SERVER_ERROR,
238 Json(json!({"error": "InternalError"})),
239 )
240 .into_response();
241 }
242 };
243
244 let commit_bytes = match state.block_store.get(&root_cid).await {
245 Ok(Some(b)) => b,
246 Ok(None) => {
247 error!("Commit block not found: {}", root_cid);
248 return (
249 StatusCode::INTERNAL_SERVER_ERROR,
250 Json(json!({"error": "InternalError"})),
251 )
252 .into_response();
253 }
254 Err(e) => {
255 error!("Failed to load commit block: {:?}", e);
256 return (
257 StatusCode::INTERNAL_SERVER_ERROR,
258 Json(json!({"error": "InternalError"})),
259 )
260 .into_response();
261 }
262 };
263
264 let commit = match Commit::from_cbor(&commit_bytes) {
265 Ok(c) => c,
266 Err(e) => {
267 error!("Failed to parse commit: {:?}", e);
268 return (
269 StatusCode::INTERNAL_SERVER_ERROR,
270 Json(json!({"error": "InternalError"})),
271 )
272 .into_response();
273 }
274 };
275
276 let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new();
277 let mut visited: HashSet<Vec<u8>> = HashSet::new();
278
279 collected_blocks.push((root_cid, commit_bytes.clone()));
280 visited.insert(root_cid.to_bytes());
281
282 let mst_root_cid = commit.data;
283 if !visited.contains(&mst_root_cid.to_bytes()) {
284 visited.insert(mst_root_cid.to_bytes());
285 if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await {
286 collected_blocks.push((mst_root_cid, data));
287 }
288 }
289
290 let records = sqlx::query!("SELECT record_cid FROM records WHERE repo_id = $1", user_id)
291 .fetch_all(&state.db)
292 .await
293 .unwrap_or_default();
294
295 for record in records {
296 if let Ok(cid) = record.record_cid.parse::<Cid>() {
297 if !visited.contains(&cid.to_bytes()) {
298 visited.insert(cid.to_bytes());
299 if let Ok(Some(data)) = state.block_store.get(&cid).await {
300 collected_blocks.push((cid, data));
301 }
302 }
303 }
304 }
305
306 let mut buf = Vec::new();
307 let car_header = encode_car_header(&root_cid);
308 if let Err(e) = ld_write(&mut buf, &car_header) {
309 error!("Failed to write CAR header: {:?}", e);
310 return (
311 StatusCode::INTERNAL_SERVER_ERROR,
312 Json(json!({"error": "InternalError"})),
313 )
314 .into_response();
315 }
316
317 for (cid, data) in &collected_blocks {
318 let mut block_data = Vec::new();
319 block_data.extend_from_slice(&cid.to_bytes());
320 block_data.extend_from_slice(data);
321 if let Err(e) = ld_write(&mut buf, &block_data) {
322 error!("Failed to write block: {:?}", e);
323 return (
324 StatusCode::INTERNAL_SERVER_ERROR,
325 Json(json!({"error": "InternalError"})),
326 )
327 .into_response();
328 }
329 }
330
331 Response::builder()
332 .status(StatusCode::OK)
333 .header(header::CONTENT_TYPE, "application/vnd.ipld.car")
334 .body(Body::from(buf))
335 .unwrap()
336}
337
338#[derive(Deserialize)]
339pub struct GetRecordParams {
340 pub did: String,
341 pub collection: String,
342 pub rkey: String,
343}
344
345pub async fn get_record(
346 State(state): State<AppState>,
347 Query(params): Query<GetRecordParams>,
348) -> Response {
349 let did = params.did.trim();
350 let collection = params.collection.trim();
351 let rkey = params.rkey.trim();
352
353 if did.is_empty() {
354 return (
355 StatusCode::BAD_REQUEST,
356 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
357 )
358 .into_response();
359 }
360
361 if collection.is_empty() {
362 return (
363 StatusCode::BAD_REQUEST,
364 Json(json!({"error": "InvalidRequest", "message": "collection is required"})),
365 )
366 .into_response();
367 }
368
369 if rkey.is_empty() {
370 return (
371 StatusCode::BAD_REQUEST,
372 Json(json!({"error": "InvalidRequest", "message": "rkey is required"})),
373 )
374 .into_response();
375 }
376
377 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
378 .fetch_optional(&state.db)
379 .await;
380
381 let user_id = match user_result {
382 Ok(Some(row)) => row.id,
383 Ok(None) => {
384 return (
385 StatusCode::NOT_FOUND,
386 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
387 )
388 .into_response();
389 }
390 Err(e) => {
391 error!("DB error in sync get_record: {:?}", e);
392 return (
393 StatusCode::INTERNAL_SERVER_ERROR,
394 Json(json!({"error": "InternalError"})),
395 )
396 .into_response();
397 }
398 };
399
400 let record_result = sqlx::query!(
401 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
402 user_id,
403 collection,
404 rkey
405 )
406 .fetch_optional(&state.db)
407 .await;
408
409 let record_cid_str = match record_result {
410 Ok(Some(row)) => row.record_cid,
411 Ok(None) => {
412 return (
413 StatusCode::NOT_FOUND,
414 Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
415 )
416 .into_response();
417 }
418 Err(e) => {
419 error!("DB error in sync get_record: {:?}", e);
420 return (
421 StatusCode::INTERNAL_SERVER_ERROR,
422 Json(json!({"error": "InternalError"})),
423 )
424 .into_response();
425 }
426 };
427
428 let record_cid = match record_cid_str.parse::<Cid>() {
429 Ok(c) => c,
430 Err(e) => {
431 error!("Failed to parse record CID: {:?}", e);
432 return (
433 StatusCode::INTERNAL_SERVER_ERROR,
434 Json(json!({"error": "InternalError"})),
435 )
436 .into_response();
437 }
438 };
439
440 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
441 .fetch_optional(&state.db)
442 .await;
443
444 let repo_root_cid_str = match repo_result {
445 Ok(Some(row)) => row.repo_root_cid,
446 Ok(None) => {
447 return (
448 StatusCode::NOT_FOUND,
449 Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})),
450 )
451 .into_response();
452 }
453 Err(e) => {
454 error!("DB error in sync get_record: {:?}", e);
455 return (
456 StatusCode::INTERNAL_SERVER_ERROR,
457 Json(json!({"error": "InternalError"})),
458 )
459 .into_response();
460 }
461 };
462
463 let root_cid = match repo_root_cid_str.parse::<Cid>() {
464 Ok(c) => c,
465 Err(e) => {
466 error!("Failed to parse root CID: {:?}", e);
467 return (
468 StatusCode::INTERNAL_SERVER_ERROR,
469 Json(json!({"error": "InternalError"})),
470 )
471 .into_response();
472 }
473 };
474
475 let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new();
476
477 let commit_bytes = match state.block_store.get(&root_cid).await {
478 Ok(Some(b)) => b,
479 Ok(None) => {
480 error!("Commit block not found: {}", root_cid);
481 return (
482 StatusCode::INTERNAL_SERVER_ERROR,
483 Json(json!({"error": "InternalError"})),
484 )
485 .into_response();
486 }
487 Err(e) => {
488 error!("Failed to load commit block: {:?}", e);
489 return (
490 StatusCode::INTERNAL_SERVER_ERROR,
491 Json(json!({"error": "InternalError"})),
492 )
493 .into_response();
494 }
495 };
496
497 collected_blocks.push((root_cid, commit_bytes.clone()));
498
499 let commit = match Commit::from_cbor(&commit_bytes) {
500 Ok(c) => c,
501 Err(e) => {
502 error!("Failed to parse commit: {:?}", e);
503 return (
504 StatusCode::INTERNAL_SERVER_ERROR,
505 Json(json!({"error": "InternalError"})),
506 )
507 .into_response();
508 }
509 };
510
511 let mst_root_cid = commit.data;
512 if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await {
513 collected_blocks.push((mst_root_cid, data));
514 }
515
516 if let Ok(Some(data)) = state.block_store.get(&record_cid).await {
517 collected_blocks.push((record_cid, data));
518 } else {
519 return (
520 StatusCode::NOT_FOUND,
521 Json(json!({"error": "RecordNotFound", "message": "Record block not found"})),
522 )
523 .into_response();
524 }
525
526 let mut buf = Vec::new();
527 let car_header = encode_car_header(&root_cid);
528 if let Err(e) = ld_write(&mut buf, &car_header) {
529 error!("Failed to write CAR header: {:?}", e);
530 return (
531 StatusCode::INTERNAL_SERVER_ERROR,
532 Json(json!({"error": "InternalError"})),
533 )
534 .into_response();
535 }
536
537 for (cid, data) in &collected_blocks {
538 let mut block_data = Vec::new();
539 block_data.extend_from_slice(&cid.to_bytes());
540 block_data.extend_from_slice(data);
541 if let Err(e) = ld_write(&mut buf, &block_data) {
542 error!("Failed to write block: {:?}", e);
543 return (
544 StatusCode::INTERNAL_SERVER_ERROR,
545 Json(json!({"error": "InternalError"})),
546 )
547 .into_response();
548 }
549 }
550
551 Response::builder()
552 .status(StatusCode::OK)
553 .header(header::CONTENT_TYPE, "application/vnd.ipld.car")
554 .body(Body::from(buf))
555 .unwrap()
556}