this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 body::Body,
5 extract::{Query, State},
6 http::StatusCode,
7 http::header,
8 response::{IntoResponse, Response},
9};
10use bytes::Bytes;
11use cid::Cid;
12use jacquard_repo::{commit::Commit, storage::BlockStore};
13use serde::{Deserialize, Serialize};
14use serde_json::json;
15use std::collections::HashSet;
16use std::io::Write;
17use tracing::{error, info};
18
19fn write_varint<W: Write>(mut writer: W, mut value: u64) -> std::io::Result<()> {
20 loop {
21 let mut byte = (value & 0x7F) as u8;
22 value >>= 7;
23 if value != 0 {
24 byte |= 0x80;
25 }
26 writer.write_all(&[byte])?;
27 if value == 0 {
28 break;
29 }
30 }
31 Ok(())
32}
33
34fn ld_write<W: Write>(mut writer: W, data: &[u8]) -> std::io::Result<()> {
35 write_varint(&mut writer, data.len() as u64)?;
36 writer.write_all(data)?;
37 Ok(())
38}
39
40fn encode_car_header(root_cid: &Cid) -> Vec<u8> {
41 let header = serde_ipld_dagcbor::to_vec(&serde_json::json!({
42 "version": 1u64,
43 "roots": [root_cid.to_bytes()]
44 }))
45 .unwrap_or_default();
46 header
47}
48
49#[derive(Deserialize)]
50pub struct GetLatestCommitParams {
51 pub did: String,
52}
53
54#[derive(Serialize)]
55pub struct GetLatestCommitOutput {
56 pub cid: String,
57 pub rev: String,
58}
59
60pub async fn get_latest_commit(
61 State(state): State<AppState>,
62 Query(params): Query<GetLatestCommitParams>,
63) -> Response {
64 let did = params.did.trim();
65
66 if did.is_empty() {
67 return (
68 StatusCode::BAD_REQUEST,
69 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
70 )
71 .into_response();
72 }
73
74 let result = sqlx::query!(
75 r#"
76 SELECT r.repo_root_cid
77 FROM repos r
78 JOIN users u ON r.user_id = u.id
79 WHERE u.did = $1
80 "#,
81 did
82 )
83 .fetch_optional(&state.db)
84 .await;
85
86 match result {
87 Ok(Some(row)) => {
88 (
89 StatusCode::OK,
90 Json(GetLatestCommitOutput {
91 cid: row.repo_root_cid,
92 rev: chrono::Utc::now().timestamp_millis().to_string(),
93 }),
94 )
95 .into_response()
96 }
97 Ok(None) => (
98 StatusCode::NOT_FOUND,
99 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
100 )
101 .into_response(),
102 Err(e) => {
103 error!("DB error in get_latest_commit: {:?}", e);
104 (
105 StatusCode::INTERNAL_SERVER_ERROR,
106 Json(json!({"error": "InternalError"})),
107 )
108 .into_response()
109 }
110 }
111}
112
113#[derive(Deserialize)]
114pub struct ListReposParams {
115 pub limit: Option<i64>,
116 pub cursor: Option<String>,
117}
118
119#[derive(Serialize)]
120#[serde(rename_all = "camelCase")]
121pub struct RepoInfo {
122 pub did: String,
123 pub head: String,
124 pub rev: String,
125 pub active: bool,
126}
127
128#[derive(Serialize)]
129pub struct ListReposOutput {
130 pub cursor: Option<String>,
131 pub repos: Vec<RepoInfo>,
132}
133
134pub async fn list_repos(
135 State(state): State<AppState>,
136 Query(params): Query<ListReposParams>,
137) -> Response {
138 let limit = params.limit.unwrap_or(50).min(1000);
139 let cursor_did = params.cursor.as_deref().unwrap_or("");
140
141 let result = sqlx::query!(
142 r#"
143 SELECT u.did, r.repo_root_cid
144 FROM repos r
145 JOIN users u ON r.user_id = u.id
146 WHERE u.did > $1
147 ORDER BY u.did ASC
148 LIMIT $2
149 "#,
150 cursor_did,
151 limit + 1
152 )
153 .fetch_all(&state.db)
154 .await;
155
156 match result {
157 Ok(rows) => {
158 let has_more = rows.len() as i64 > limit;
159 let repos: Vec<RepoInfo> = rows
160 .iter()
161 .take(limit as usize)
162 .map(|row| {
163 RepoInfo {
164 did: row.did.clone(),
165 head: row.repo_root_cid.clone(),
166 rev: chrono::Utc::now().timestamp_millis().to_string(),
167 active: true,
168 }
169 })
170 .collect();
171
172 let next_cursor = if has_more {
173 repos.last().map(|r| r.did.clone())
174 } else {
175 None
176 };
177
178 (
179 StatusCode::OK,
180 Json(ListReposOutput {
181 cursor: next_cursor,
182 repos,
183 }),
184 )
185 .into_response()
186 }
187 Err(e) => {
188 error!("DB error in list_repos: {:?}", e);
189 (
190 StatusCode::INTERNAL_SERVER_ERROR,
191 Json(json!({"error": "InternalError"})),
192 )
193 .into_response()
194 }
195 }
196}
197
198#[derive(Deserialize)]
199pub struct GetBlobParams {
200 pub did: String,
201 pub cid: String,
202}
203
204pub async fn get_blob(
205 State(state): State<AppState>,
206 Query(params): Query<GetBlobParams>,
207) -> Response {
208 let did = params.did.trim();
209 let cid = params.cid.trim();
210
211 if did.is_empty() {
212 return (
213 StatusCode::BAD_REQUEST,
214 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
215 )
216 .into_response();
217 }
218
219 if cid.is_empty() {
220 return (
221 StatusCode::BAD_REQUEST,
222 Json(json!({"error": "InvalidRequest", "message": "cid is required"})),
223 )
224 .into_response();
225 }
226
227 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
228 .fetch_optional(&state.db)
229 .await;
230
231 match user_exists {
232 Ok(None) => {
233 return (
234 StatusCode::NOT_FOUND,
235 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
236 )
237 .into_response();
238 }
239 Err(e) => {
240 error!("DB error in get_blob: {:?}", e);
241 return (
242 StatusCode::INTERNAL_SERVER_ERROR,
243 Json(json!({"error": "InternalError"})),
244 )
245 .into_response();
246 }
247 Ok(Some(_)) => {}
248 }
249
250 let blob_result = sqlx::query!("SELECT storage_key, mime_type FROM blobs WHERE cid = $1", cid)
251 .fetch_optional(&state.db)
252 .await;
253
254 match blob_result {
255 Ok(Some(row)) => {
256 let storage_key = &row.storage_key;
257 let mime_type = &row.mime_type;
258
259 match state.blob_store.get(&storage_key).await {
260 Ok(data) => Response::builder()
261 .status(StatusCode::OK)
262 .header(header::CONTENT_TYPE, mime_type)
263 .body(Body::from(data))
264 .unwrap(),
265 Err(e) => {
266 error!("Failed to fetch blob from storage: {:?}", e);
267 (
268 StatusCode::NOT_FOUND,
269 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})),
270 )
271 .into_response()
272 }
273 }
274 }
275 Ok(None) => (
276 StatusCode::NOT_FOUND,
277 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})),
278 )
279 .into_response(),
280 Err(e) => {
281 error!("DB error in get_blob: {:?}", e);
282 (
283 StatusCode::INTERNAL_SERVER_ERROR,
284 Json(json!({"error": "InternalError"})),
285 )
286 .into_response()
287 }
288 }
289}
290
291#[derive(Deserialize)]
292pub struct ListBlobsParams {
293 pub did: String,
294 pub since: Option<String>,
295 pub limit: Option<i64>,
296 pub cursor: Option<String>,
297}
298
299#[derive(Serialize)]
300pub struct ListBlobsOutput {
301 pub cursor: Option<String>,
302 pub cids: Vec<String>,
303}
304
305pub async fn list_blobs(
306 State(state): State<AppState>,
307 Query(params): Query<ListBlobsParams>,
308) -> Response {
309 let did = params.did.trim();
310
311 if did.is_empty() {
312 return (
313 StatusCode::BAD_REQUEST,
314 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
315 )
316 .into_response();
317 }
318
319 let limit = params.limit.unwrap_or(500).min(1000);
320 let cursor_cid = params.cursor.as_deref().unwrap_or("");
321
322 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
323 .fetch_optional(&state.db)
324 .await;
325
326 let user_id = match user_result {
327 Ok(Some(row)) => row.id,
328 Ok(None) => {
329 return (
330 StatusCode::NOT_FOUND,
331 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
332 )
333 .into_response();
334 }
335 Err(e) => {
336 error!("DB error in list_blobs: {:?}", e);
337 return (
338 StatusCode::INTERNAL_SERVER_ERROR,
339 Json(json!({"error": "InternalError"})),
340 )
341 .into_response();
342 }
343 };
344
345 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = ¶ms.since {
346 let since_time = chrono::DateTime::parse_from_rfc3339(since)
347 .map(|dt| dt.with_timezone(&chrono::Utc))
348 .unwrap_or_else(|_| chrono::Utc::now());
349 sqlx::query!(
350 r#"
351 SELECT cid FROM blobs
352 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3
353 ORDER BY cid ASC
354 LIMIT $4
355 "#,
356 user_id,
357 cursor_cid,
358 since_time,
359 limit + 1
360 )
361 .fetch_all(&state.db)
362 .await
363 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
364 } else {
365 sqlx::query!(
366 r#"
367 SELECT cid FROM blobs
368 WHERE created_by_user = $1 AND cid > $2
369 ORDER BY cid ASC
370 LIMIT $3
371 "#,
372 user_id,
373 cursor_cid,
374 limit + 1
375 )
376 .fetch_all(&state.db)
377 .await
378 .map(|rows| rows.into_iter().map(|r| r.cid).collect())
379 };
380
381 match cids_result {
382 Ok(cids) => {
383 let has_more = cids.len() as i64 > limit;
384 let cids: Vec<String> = cids
385 .into_iter()
386 .take(limit as usize)
387 .collect();
388
389 let next_cursor = if has_more {
390 cids.last().cloned()
391 } else {
392 None
393 };
394
395 (
396 StatusCode::OK,
397 Json(ListBlobsOutput {
398 cursor: next_cursor,
399 cids,
400 }),
401 )
402 .into_response()
403 }
404 Err(e) => {
405 error!("DB error in list_blobs: {:?}", e);
406 (
407 StatusCode::INTERNAL_SERVER_ERROR,
408 Json(json!({"error": "InternalError"})),
409 )
410 .into_response()
411 }
412 }
413}
414
415#[derive(Deserialize)]
416pub struct GetRepoStatusParams {
417 pub did: String,
418}
419
420#[derive(Serialize)]
421pub struct GetRepoStatusOutput {
422 pub did: String,
423 pub active: bool,
424 pub rev: Option<String>,
425}
426
427pub async fn get_repo_status(
428 State(state): State<AppState>,
429 Query(params): Query<GetRepoStatusParams>,
430) -> Response {
431 let did = params.did.trim();
432
433 if did.is_empty() {
434 return (
435 StatusCode::BAD_REQUEST,
436 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
437 )
438 .into_response();
439 }
440
441 let result = sqlx::query!(
442 r#"
443 SELECT u.did, r.repo_root_cid
444 FROM users u
445 LEFT JOIN repos r ON u.id = r.user_id
446 WHERE u.did = $1
447 "#,
448 did
449 )
450 .fetch_optional(&state.db)
451 .await;
452
453 match result {
454 Ok(Some(row)) => {
455 let rev = Some(chrono::Utc::now().timestamp_millis().to_string());
456
457 (
458 StatusCode::OK,
459 Json(GetRepoStatusOutput {
460 did: row.did,
461 active: true,
462 rev,
463 }),
464 )
465 .into_response()
466 }
467 Ok(None) => (
468 StatusCode::NOT_FOUND,
469 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
470 )
471 .into_response(),
472 Err(e) => {
473 error!("DB error in get_repo_status: {:?}", e);
474 (
475 StatusCode::INTERNAL_SERVER_ERROR,
476 Json(json!({"error": "InternalError"})),
477 )
478 .into_response()
479 }
480 }
481}
482
483#[derive(Deserialize)]
484pub struct NotifyOfUpdateParams {
485 pub hostname: String,
486}
487
488pub async fn notify_of_update(
489 State(_state): State<AppState>,
490 Query(params): Query<NotifyOfUpdateParams>,
491) -> Response {
492 info!("Received notifyOfUpdate from hostname: {}", params.hostname);
493 // TODO: Queue job for crawler interaction or relay notification
494 info!("TODO: Queue job for notifyOfUpdate (not implemented)");
495
496 (StatusCode::OK, Json(json!({}))).into_response()
497}
498
499#[derive(Deserialize)]
500pub struct RequestCrawlInput {
501 pub hostname: String,
502}
503
504pub async fn request_crawl(
505 State(_state): State<AppState>,
506 Json(input): Json<RequestCrawlInput>,
507) -> Response {
508 info!("Received requestCrawl for hostname: {}", input.hostname);
509 info!("TODO: Queue job for requestCrawl (not implemented)");
510
511 (StatusCode::OK, Json(json!({}))).into_response()
512}
513
514#[derive(Deserialize)]
515pub struct GetBlocksParams {
516 pub did: String,
517 pub cids: String,
518}
519
520pub async fn get_blocks(
521 State(state): State<AppState>,
522 Query(params): Query<GetBlocksParams>,
523) -> Response {
524 let did = params.did.trim();
525
526 if did.is_empty() {
527 return (
528 StatusCode::BAD_REQUEST,
529 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
530 )
531 .into_response();
532 }
533
534 let cid_strings: Vec<&str> = params.cids.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).collect();
535
536 if cid_strings.is_empty() {
537 return (
538 StatusCode::BAD_REQUEST,
539 Json(json!({"error": "InvalidRequest", "message": "cids is required"})),
540 )
541 .into_response();
542 }
543
544 let repo_result = sqlx::query!(
545 r#"
546 SELECT r.repo_root_cid
547 FROM repos r
548 JOIN users u ON r.user_id = u.id
549 WHERE u.did = $1
550 "#,
551 did
552 )
553 .fetch_optional(&state.db)
554 .await;
555
556 let repo_root_cid_str = match repo_result {
557 Ok(Some(row)) => row.repo_root_cid,
558 Ok(None) => {
559 return (
560 StatusCode::NOT_FOUND,
561 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
562 )
563 .into_response();
564 }
565 Err(e) => {
566 error!("DB error in get_blocks: {:?}", e);
567 return (
568 StatusCode::INTERNAL_SERVER_ERROR,
569 Json(json!({"error": "InternalError"})),
570 )
571 .into_response();
572 }
573 };
574
575 let root_cid = match repo_root_cid_str.parse::<Cid>() {
576 Ok(c) => c,
577 Err(e) => {
578 error!("Failed to parse root CID: {:?}", e);
579 return (
580 StatusCode::INTERNAL_SERVER_ERROR,
581 Json(json!({"error": "InternalError"})),
582 )
583 .into_response();
584 }
585 };
586
587 let mut requested_cids: Vec<Cid> = Vec::new();
588 for cid_str in &cid_strings {
589 match cid_str.parse::<Cid>() {
590 Ok(c) => requested_cids.push(c),
591 Err(e) => {
592 error!("Failed to parse CID '{}': {:?}", cid_str, e);
593 return (
594 StatusCode::BAD_REQUEST,
595 Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", cid_str)})),
596 )
597 .into_response();
598 }
599 }
600 }
601
602 let mut buf = Vec::new();
603 let header = encode_car_header(&root_cid);
604 if let Err(e) = ld_write(&mut buf, &header) {
605 error!("Failed to write CAR header: {:?}", e);
606 return (
607 StatusCode::INTERNAL_SERVER_ERROR,
608 Json(json!({"error": "InternalError"})),
609 )
610 .into_response();
611 }
612
613 for cid in &requested_cids {
614 let cid_bytes = cid.to_bytes();
615 let block_result = sqlx::query!(
616 "SELECT data FROM blocks WHERE cid = $1",
617 &cid_bytes
618 )
619 .fetch_optional(&state.db)
620 .await;
621
622 match block_result {
623 Ok(Some(row)) => {
624 let mut block_data = Vec::new();
625 block_data.extend_from_slice(&cid_bytes);
626 block_data.extend_from_slice(&row.data);
627 if let Err(e) = ld_write(&mut buf, &block_data) {
628 error!("Failed to write block: {:?}", e);
629 return (
630 StatusCode::INTERNAL_SERVER_ERROR,
631 Json(json!({"error": "InternalError"})),
632 )
633 .into_response();
634 }
635 }
636 Ok(None) => {
637 return (
638 StatusCode::NOT_FOUND,
639 Json(json!({"error": "BlockNotFound", "message": format!("Block not found: {}", cid)})),
640 )
641 .into_response();
642 }
643 Err(e) => {
644 error!("DB error fetching block: {:?}", e);
645 return (
646 StatusCode::INTERNAL_SERVER_ERROR,
647 Json(json!({"error": "InternalError"})),
648 )
649 .into_response();
650 }
651 }
652 }
653
654 Response::builder()
655 .status(StatusCode::OK)
656 .header(header::CONTENT_TYPE, "application/vnd.ipld.car")
657 .body(Body::from(buf))
658 .unwrap()
659}
660
661#[derive(Deserialize)]
662pub struct GetRepoParams {
663 pub did: String,
664 pub since: Option<String>,
665}
666
667pub async fn get_repo(
668 State(state): State<AppState>,
669 Query(params): Query<GetRepoParams>,
670) -> Response {
671 let did = params.did.trim();
672
673 if did.is_empty() {
674 return (
675 StatusCode::BAD_REQUEST,
676 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
677 )
678 .into_response();
679 }
680
681 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
682 .fetch_optional(&state.db)
683 .await;
684
685 let user_id = match user_result {
686 Ok(Some(row)) => row.id,
687 Ok(None) => {
688 return (
689 StatusCode::NOT_FOUND,
690 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
691 )
692 .into_response();
693 }
694 Err(e) => {
695 error!("DB error in get_repo: {:?}", e);
696 return (
697 StatusCode::INTERNAL_SERVER_ERROR,
698 Json(json!({"error": "InternalError"})),
699 )
700 .into_response();
701 }
702 };
703
704 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
705 .fetch_optional(&state.db)
706 .await;
707
708 let repo_root_cid_str = match repo_result {
709 Ok(Some(row)) => row.repo_root_cid,
710 Ok(None) => {
711 return (
712 StatusCode::NOT_FOUND,
713 Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})),
714 )
715 .into_response();
716 }
717 Err(e) => {
718 error!("DB error in get_repo: {:?}", e);
719 return (
720 StatusCode::INTERNAL_SERVER_ERROR,
721 Json(json!({"error": "InternalError"})),
722 )
723 .into_response();
724 }
725 };
726
727 let root_cid = match repo_root_cid_str.parse::<Cid>() {
728 Ok(c) => c,
729 Err(e) => {
730 error!("Failed to parse root CID: {:?}", e);
731 return (
732 StatusCode::INTERNAL_SERVER_ERROR,
733 Json(json!({"error": "InternalError"})),
734 )
735 .into_response();
736 }
737 };
738
739 let commit_bytes = match state.block_store.get(&root_cid).await {
740 Ok(Some(b)) => b,
741 Ok(None) => {
742 error!("Commit block not found: {}", root_cid);
743 return (
744 StatusCode::INTERNAL_SERVER_ERROR,
745 Json(json!({"error": "InternalError"})),
746 )
747 .into_response();
748 }
749 Err(e) => {
750 error!("Failed to load commit block: {:?}", e);
751 return (
752 StatusCode::INTERNAL_SERVER_ERROR,
753 Json(json!({"error": "InternalError"})),
754 )
755 .into_response();
756 }
757 };
758
759 let commit = match Commit::from_cbor(&commit_bytes) {
760 Ok(c) => c,
761 Err(e) => {
762 error!("Failed to parse commit: {:?}", e);
763 return (
764 StatusCode::INTERNAL_SERVER_ERROR,
765 Json(json!({"error": "InternalError"})),
766 )
767 .into_response();
768 }
769 };
770
771 let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new();
772 let mut visited: HashSet<Vec<u8>> = HashSet::new();
773
774 collected_blocks.push((root_cid, commit_bytes.clone()));
775 visited.insert(root_cid.to_bytes());
776
777 let mst_root_cid = commit.data;
778 if !visited.contains(&mst_root_cid.to_bytes()) {
779 visited.insert(mst_root_cid.to_bytes());
780 if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await {
781 collected_blocks.push((mst_root_cid, data));
782 }
783 }
784
785 let records = sqlx::query!("SELECT record_cid FROM records WHERE repo_id = $1", user_id)
786 .fetch_all(&state.db)
787 .await
788 .unwrap_or_default();
789
790 for record in records {
791 if let Ok(cid) = record.record_cid.parse::<Cid>() {
792 if !visited.contains(&cid.to_bytes()) {
793 visited.insert(cid.to_bytes());
794 if let Ok(Some(data)) = state.block_store.get(&cid).await {
795 collected_blocks.push((cid, data));
796 }
797 }
798 }
799 }
800
801 let mut buf = Vec::new();
802 let header = encode_car_header(&root_cid);
803 if let Err(e) = ld_write(&mut buf, &header) {
804 error!("Failed to write CAR header: {:?}", e);
805 return (
806 StatusCode::INTERNAL_SERVER_ERROR,
807 Json(json!({"error": "InternalError"})),
808 )
809 .into_response();
810 }
811
812 for (cid, data) in &collected_blocks {
813 let mut block_data = Vec::new();
814 block_data.extend_from_slice(&cid.to_bytes());
815 block_data.extend_from_slice(data);
816 if let Err(e) = ld_write(&mut buf, &block_data) {
817 error!("Failed to write block: {:?}", e);
818 return (
819 StatusCode::INTERNAL_SERVER_ERROR,
820 Json(json!({"error": "InternalError"})),
821 )
822 .into_response();
823 }
824 }
825
826 Response::builder()
827 .status(StatusCode::OK)
828 .header(header::CONTENT_TYPE, "application/vnd.ipld.car")
829 .body(Body::from(buf))
830 .unwrap()
831}
832
833#[derive(Deserialize)]
834pub struct GetRecordParams {
835 pub did: String,
836 pub collection: String,
837 pub rkey: String,
838}
839
840pub async fn get_record(
841 State(state): State<AppState>,
842 Query(params): Query<GetRecordParams>,
843) -> Response {
844 let did = params.did.trim();
845 let collection = params.collection.trim();
846 let rkey = params.rkey.trim();
847
848 if did.is_empty() {
849 return (
850 StatusCode::BAD_REQUEST,
851 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
852 )
853 .into_response();
854 }
855
856 if collection.is_empty() {
857 return (
858 StatusCode::BAD_REQUEST,
859 Json(json!({"error": "InvalidRequest", "message": "collection is required"})),
860 )
861 .into_response();
862 }
863
864 if rkey.is_empty() {
865 return (
866 StatusCode::BAD_REQUEST,
867 Json(json!({"error": "InvalidRequest", "message": "rkey is required"})),
868 )
869 .into_response();
870 }
871
872 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
873 .fetch_optional(&state.db)
874 .await;
875
876 let user_id = match user_result {
877 Ok(Some(row)) => row.id,
878 Ok(None) => {
879 return (
880 StatusCode::NOT_FOUND,
881 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
882 )
883 .into_response();
884 }
885 Err(e) => {
886 error!("DB error in sync get_record: {:?}", e);
887 return (
888 StatusCode::INTERNAL_SERVER_ERROR,
889 Json(json!({"error": "InternalError"})),
890 )
891 .into_response();
892 }
893 };
894
895 let record_result = sqlx::query!(
896 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
897 user_id,
898 collection,
899 rkey
900 )
901 .fetch_optional(&state.db)
902 .await;
903
904 let record_cid_str = match record_result {
905 Ok(Some(row)) => row.record_cid,
906 Ok(None) => {
907 return (
908 StatusCode::NOT_FOUND,
909 Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
910 )
911 .into_response();
912 }
913 Err(e) => {
914 error!("DB error in sync get_record: {:?}", e);
915 return (
916 StatusCode::INTERNAL_SERVER_ERROR,
917 Json(json!({"error": "InternalError"})),
918 )
919 .into_response();
920 }
921 };
922
923 let record_cid = match record_cid_str.parse::<Cid>() {
924 Ok(c) => c,
925 Err(e) => {
926 error!("Failed to parse record CID: {:?}", e);
927 return (
928 StatusCode::INTERNAL_SERVER_ERROR,
929 Json(json!({"error": "InternalError"})),
930 )
931 .into_response();
932 }
933 };
934
935 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
936 .fetch_optional(&state.db)
937 .await;
938
939 let repo_root_cid_str = match repo_result {
940 Ok(Some(row)) => row.repo_root_cid,
941 Ok(None) => {
942 return (
943 StatusCode::NOT_FOUND,
944 Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})),
945 )
946 .into_response();
947 }
948 Err(e) => {
949 error!("DB error in sync get_record: {:?}", e);
950 return (
951 StatusCode::INTERNAL_SERVER_ERROR,
952 Json(json!({"error": "InternalError"})),
953 )
954 .into_response();
955 }
956 };
957
958 let root_cid = match repo_root_cid_str.parse::<Cid>() {
959 Ok(c) => c,
960 Err(e) => {
961 error!("Failed to parse root CID: {:?}", e);
962 return (
963 StatusCode::INTERNAL_SERVER_ERROR,
964 Json(json!({"error": "InternalError"})),
965 )
966 .into_response();
967 }
968 };
969
970 let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new();
971
972 let commit_bytes = match state.block_store.get(&root_cid).await {
973 Ok(Some(b)) => b,
974 Ok(None) => {
975 error!("Commit block not found: {}", root_cid);
976 return (
977 StatusCode::INTERNAL_SERVER_ERROR,
978 Json(json!({"error": "InternalError"})),
979 )
980 .into_response();
981 }
982 Err(e) => {
983 error!("Failed to load commit block: {:?}", e);
984 return (
985 StatusCode::INTERNAL_SERVER_ERROR,
986 Json(json!({"error": "InternalError"})),
987 )
988 .into_response();
989 }
990 };
991
992 collected_blocks.push((root_cid, commit_bytes.clone()));
993
994 let commit = match Commit::from_cbor(&commit_bytes) {
995 Ok(c) => c,
996 Err(e) => {
997 error!("Failed to parse commit: {:?}", e);
998 return (
999 StatusCode::INTERNAL_SERVER_ERROR,
1000 Json(json!({"error": "InternalError"})),
1001 )
1002 .into_response();
1003 }
1004 };
1005
1006 let mst_root_cid = commit.data;
1007 if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await {
1008 collected_blocks.push((mst_root_cid, data));
1009 }
1010
1011 if let Ok(Some(data)) = state.block_store.get(&record_cid).await {
1012 collected_blocks.push((record_cid, data));
1013 } else {
1014 return (
1015 StatusCode::NOT_FOUND,
1016 Json(json!({"error": "RecordNotFound", "message": "Record block not found"})),
1017 )
1018 .into_response();
1019 }
1020
1021 let mut buf = Vec::new();
1022 let header = encode_car_header(&root_cid);
1023 if let Err(e) = ld_write(&mut buf, &header) {
1024 error!("Failed to write CAR header: {:?}", e);
1025 return (
1026 StatusCode::INTERNAL_SERVER_ERROR,
1027 Json(json!({"error": "InternalError"})),
1028 )
1029 .into_response();
1030 }
1031
1032 for (cid, data) in &collected_blocks {
1033 let mut block_data = Vec::new();
1034 block_data.extend_from_slice(&cid.to_bytes());
1035 block_data.extend_from_slice(data);
1036 if let Err(e) = ld_write(&mut buf, &block_data) {
1037 error!("Failed to write block: {:?}", e);
1038 return (
1039 StatusCode::INTERNAL_SERVER_ERROR,
1040 Json(json!({"error": "InternalError"})),
1041 )
1042 .into_response();
1043 }
1044 }
1045
1046 Response::builder()
1047 .status(StatusCode::OK)
1048 .header(header::CONTENT_TYPE, "application/vnd.ipld.car")
1049 .body(Body::from(buf))
1050 .unwrap()
1051}