this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 body::Body, 5 extract::{Query, State}, 6 http::StatusCode, 7 http::header, 8 response::{IntoResponse, Response}, 9}; 10use bytes::Bytes; 11use cid::Cid; 12use jacquard_repo::{commit::Commit, storage::BlockStore}; 13use serde::{Deserialize, Serialize}; 14use serde_json::json; 15use std::collections::HashSet; 16use std::io::Write; 17use tracing::{error, info}; 18 19fn write_varint<W: Write>(mut writer: W, mut value: u64) -> std::io::Result<()> { 20 loop { 21 let mut byte = (value & 0x7F) as u8; 22 value >>= 7; 23 if value != 0 { 24 byte |= 0x80; 25 } 26 writer.write_all(&[byte])?; 27 if value == 0 { 28 break; 29 } 30 } 31 Ok(()) 32} 33 34fn ld_write<W: Write>(mut writer: W, data: &[u8]) -> std::io::Result<()> { 35 write_varint(&mut writer, data.len() as u64)?; 36 writer.write_all(data)?; 37 Ok(()) 38} 39 40fn encode_car_header(root_cid: &Cid) -> Vec<u8> { 41 let header = serde_ipld_dagcbor::to_vec(&serde_json::json!({ 42 "version": 1u64, 43 "roots": [root_cid.to_bytes()] 44 })) 45 .unwrap_or_default(); 46 header 47} 48 49#[derive(Deserialize)] 50pub struct GetLatestCommitParams { 51 pub did: String, 52} 53 54#[derive(Serialize)] 55pub struct GetLatestCommitOutput { 56 pub cid: String, 57 pub rev: String, 58} 59 60pub async fn get_latest_commit( 61 State(state): State<AppState>, 62 Query(params): Query<GetLatestCommitParams>, 63) -> Response { 64 let did = params.did.trim(); 65 66 if did.is_empty() { 67 return ( 68 StatusCode::BAD_REQUEST, 69 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 70 ) 71 .into_response(); 72 } 73 74 let result = sqlx::query!( 75 r#" 76 SELECT r.repo_root_cid 77 FROM repos r 78 JOIN users u ON r.user_id = u.id 79 WHERE u.did = $1 80 "#, 81 did 82 ) 83 .fetch_optional(&state.db) 84 .await; 85 86 match result { 87 Ok(Some(row)) => { 88 ( 89 StatusCode::OK, 90 Json(GetLatestCommitOutput { 91 cid: row.repo_root_cid, 92 rev: chrono::Utc::now().timestamp_millis().to_string(), 93 }), 94 ) 95 .into_response() 96 } 97 Ok(None) => ( 98 StatusCode::NOT_FOUND, 99 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 100 ) 101 .into_response(), 102 Err(e) => { 103 error!("DB error in get_latest_commit: {:?}", e); 104 ( 105 StatusCode::INTERNAL_SERVER_ERROR, 106 Json(json!({"error": "InternalError"})), 107 ) 108 .into_response() 109 } 110 } 111} 112 113#[derive(Deserialize)] 114pub struct ListReposParams { 115 pub limit: Option<i64>, 116 pub cursor: Option<String>, 117} 118 119#[derive(Serialize)] 120#[serde(rename_all = "camelCase")] 121pub struct RepoInfo { 122 pub did: String, 123 pub head: String, 124 pub rev: String, 125 pub active: bool, 126} 127 128#[derive(Serialize)] 129pub struct ListReposOutput { 130 pub cursor: Option<String>, 131 pub repos: Vec<RepoInfo>, 132} 133 134pub async fn list_repos( 135 State(state): State<AppState>, 136 Query(params): Query<ListReposParams>, 137) -> Response { 138 let limit = params.limit.unwrap_or(50).min(1000); 139 let cursor_did = params.cursor.as_deref().unwrap_or(""); 140 141 let result = sqlx::query!( 142 r#" 143 SELECT u.did, r.repo_root_cid 144 FROM repos r 145 JOIN users u ON r.user_id = u.id 146 WHERE u.did > $1 147 ORDER BY u.did ASC 148 LIMIT $2 149 "#, 150 cursor_did, 151 limit + 1 152 ) 153 .fetch_all(&state.db) 154 .await; 155 156 match result { 157 Ok(rows) => { 158 let has_more = rows.len() as i64 > limit; 159 let repos: Vec<RepoInfo> = rows 160 .iter() 161 .take(limit as usize) 162 .map(|row| { 163 RepoInfo { 164 did: row.did.clone(), 165 head: row.repo_root_cid.clone(), 166 rev: chrono::Utc::now().timestamp_millis().to_string(), 167 active: true, 168 } 169 }) 170 .collect(); 171 172 let next_cursor = if has_more { 173 repos.last().map(|r| r.did.clone()) 174 } else { 175 None 176 }; 177 178 ( 179 StatusCode::OK, 180 Json(ListReposOutput { 181 cursor: next_cursor, 182 repos, 183 }), 184 ) 185 .into_response() 186 } 187 Err(e) => { 188 error!("DB error in list_repos: {:?}", e); 189 ( 190 StatusCode::INTERNAL_SERVER_ERROR, 191 Json(json!({"error": "InternalError"})), 192 ) 193 .into_response() 194 } 195 } 196} 197 198#[derive(Deserialize)] 199pub struct GetBlobParams { 200 pub did: String, 201 pub cid: String, 202} 203 204pub async fn get_blob( 205 State(state): State<AppState>, 206 Query(params): Query<GetBlobParams>, 207) -> Response { 208 let did = params.did.trim(); 209 let cid = params.cid.trim(); 210 211 if did.is_empty() { 212 return ( 213 StatusCode::BAD_REQUEST, 214 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 215 ) 216 .into_response(); 217 } 218 219 if cid.is_empty() { 220 return ( 221 StatusCode::BAD_REQUEST, 222 Json(json!({"error": "InvalidRequest", "message": "cid is required"})), 223 ) 224 .into_response(); 225 } 226 227 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 228 .fetch_optional(&state.db) 229 .await; 230 231 match user_exists { 232 Ok(None) => { 233 return ( 234 StatusCode::NOT_FOUND, 235 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 236 ) 237 .into_response(); 238 } 239 Err(e) => { 240 error!("DB error in get_blob: {:?}", e); 241 return ( 242 StatusCode::INTERNAL_SERVER_ERROR, 243 Json(json!({"error": "InternalError"})), 244 ) 245 .into_response(); 246 } 247 Ok(Some(_)) => {} 248 } 249 250 let blob_result = sqlx::query!("SELECT storage_key, mime_type FROM blobs WHERE cid = $1", cid) 251 .fetch_optional(&state.db) 252 .await; 253 254 match blob_result { 255 Ok(Some(row)) => { 256 let storage_key = &row.storage_key; 257 let mime_type = &row.mime_type; 258 259 match state.blob_store.get(&storage_key).await { 260 Ok(data) => Response::builder() 261 .status(StatusCode::OK) 262 .header(header::CONTENT_TYPE, mime_type) 263 .body(Body::from(data)) 264 .unwrap(), 265 Err(e) => { 266 error!("Failed to fetch blob from storage: {:?}", e); 267 ( 268 StatusCode::NOT_FOUND, 269 Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})), 270 ) 271 .into_response() 272 } 273 } 274 } 275 Ok(None) => ( 276 StatusCode::NOT_FOUND, 277 Json(json!({"error": "BlobNotFound", "message": "Blob not found"})), 278 ) 279 .into_response(), 280 Err(e) => { 281 error!("DB error in get_blob: {:?}", e); 282 ( 283 StatusCode::INTERNAL_SERVER_ERROR, 284 Json(json!({"error": "InternalError"})), 285 ) 286 .into_response() 287 } 288 } 289} 290 291#[derive(Deserialize)] 292pub struct ListBlobsParams { 293 pub did: String, 294 pub since: Option<String>, 295 pub limit: Option<i64>, 296 pub cursor: Option<String>, 297} 298 299#[derive(Serialize)] 300pub struct ListBlobsOutput { 301 pub cursor: Option<String>, 302 pub cids: Vec<String>, 303} 304 305pub async fn list_blobs( 306 State(state): State<AppState>, 307 Query(params): Query<ListBlobsParams>, 308) -> Response { 309 let did = params.did.trim(); 310 311 if did.is_empty() { 312 return ( 313 StatusCode::BAD_REQUEST, 314 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 315 ) 316 .into_response(); 317 } 318 319 let limit = params.limit.unwrap_or(500).min(1000); 320 let cursor_cid = params.cursor.as_deref().unwrap_or(""); 321 322 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 323 .fetch_optional(&state.db) 324 .await; 325 326 let user_id = match user_result { 327 Ok(Some(row)) => row.id, 328 Ok(None) => { 329 return ( 330 StatusCode::NOT_FOUND, 331 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 332 ) 333 .into_response(); 334 } 335 Err(e) => { 336 error!("DB error in list_blobs: {:?}", e); 337 return ( 338 StatusCode::INTERNAL_SERVER_ERROR, 339 Json(json!({"error": "InternalError"})), 340 ) 341 .into_response(); 342 } 343 }; 344 345 let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = &params.since { 346 let since_time = chrono::DateTime::parse_from_rfc3339(since) 347 .map(|dt| dt.with_timezone(&chrono::Utc)) 348 .unwrap_or_else(|_| chrono::Utc::now()); 349 sqlx::query!( 350 r#" 351 SELECT cid FROM blobs 352 WHERE created_by_user = $1 AND cid > $2 AND created_at > $3 353 ORDER BY cid ASC 354 LIMIT $4 355 "#, 356 user_id, 357 cursor_cid, 358 since_time, 359 limit + 1 360 ) 361 .fetch_all(&state.db) 362 .await 363 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 364 } else { 365 sqlx::query!( 366 r#" 367 SELECT cid FROM blobs 368 WHERE created_by_user = $1 AND cid > $2 369 ORDER BY cid ASC 370 LIMIT $3 371 "#, 372 user_id, 373 cursor_cid, 374 limit + 1 375 ) 376 .fetch_all(&state.db) 377 .await 378 .map(|rows| rows.into_iter().map(|r| r.cid).collect()) 379 }; 380 381 match cids_result { 382 Ok(cids) => { 383 let has_more = cids.len() as i64 > limit; 384 let cids: Vec<String> = cids 385 .into_iter() 386 .take(limit as usize) 387 .collect(); 388 389 let next_cursor = if has_more { 390 cids.last().cloned() 391 } else { 392 None 393 }; 394 395 ( 396 StatusCode::OK, 397 Json(ListBlobsOutput { 398 cursor: next_cursor, 399 cids, 400 }), 401 ) 402 .into_response() 403 } 404 Err(e) => { 405 error!("DB error in list_blobs: {:?}", e); 406 ( 407 StatusCode::INTERNAL_SERVER_ERROR, 408 Json(json!({"error": "InternalError"})), 409 ) 410 .into_response() 411 } 412 } 413} 414 415#[derive(Deserialize)] 416pub struct GetRepoStatusParams { 417 pub did: String, 418} 419 420#[derive(Serialize)] 421pub struct GetRepoStatusOutput { 422 pub did: String, 423 pub active: bool, 424 pub rev: Option<String>, 425} 426 427pub async fn get_repo_status( 428 State(state): State<AppState>, 429 Query(params): Query<GetRepoStatusParams>, 430) -> Response { 431 let did = params.did.trim(); 432 433 if did.is_empty() { 434 return ( 435 StatusCode::BAD_REQUEST, 436 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 437 ) 438 .into_response(); 439 } 440 441 let result = sqlx::query!( 442 r#" 443 SELECT u.did, r.repo_root_cid 444 FROM users u 445 LEFT JOIN repos r ON u.id = r.user_id 446 WHERE u.did = $1 447 "#, 448 did 449 ) 450 .fetch_optional(&state.db) 451 .await; 452 453 match result { 454 Ok(Some(row)) => { 455 let rev = Some(chrono::Utc::now().timestamp_millis().to_string()); 456 457 ( 458 StatusCode::OK, 459 Json(GetRepoStatusOutput { 460 did: row.did, 461 active: true, 462 rev, 463 }), 464 ) 465 .into_response() 466 } 467 Ok(None) => ( 468 StatusCode::NOT_FOUND, 469 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 470 ) 471 .into_response(), 472 Err(e) => { 473 error!("DB error in get_repo_status: {:?}", e); 474 ( 475 StatusCode::INTERNAL_SERVER_ERROR, 476 Json(json!({"error": "InternalError"})), 477 ) 478 .into_response() 479 } 480 } 481} 482 483#[derive(Deserialize)] 484pub struct NotifyOfUpdateParams { 485 pub hostname: String, 486} 487 488pub async fn notify_of_update( 489 State(_state): State<AppState>, 490 Query(params): Query<NotifyOfUpdateParams>, 491) -> Response { 492 info!("Received notifyOfUpdate from hostname: {}", params.hostname); 493 // TODO: Queue job for crawler interaction or relay notification 494 info!("TODO: Queue job for notifyOfUpdate (not implemented)"); 495 496 (StatusCode::OK, Json(json!({}))).into_response() 497} 498 499#[derive(Deserialize)] 500pub struct RequestCrawlInput { 501 pub hostname: String, 502} 503 504pub async fn request_crawl( 505 State(_state): State<AppState>, 506 Json(input): Json<RequestCrawlInput>, 507) -> Response { 508 info!("Received requestCrawl for hostname: {}", input.hostname); 509 info!("TODO: Queue job for requestCrawl (not implemented)"); 510 511 (StatusCode::OK, Json(json!({}))).into_response() 512} 513 514#[derive(Deserialize)] 515pub struct GetBlocksParams { 516 pub did: String, 517 pub cids: String, 518} 519 520pub async fn get_blocks( 521 State(state): State<AppState>, 522 Query(params): Query<GetBlocksParams>, 523) -> Response { 524 let did = params.did.trim(); 525 526 if did.is_empty() { 527 return ( 528 StatusCode::BAD_REQUEST, 529 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 530 ) 531 .into_response(); 532 } 533 534 let cid_strings: Vec<&str> = params.cids.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).collect(); 535 536 if cid_strings.is_empty() { 537 return ( 538 StatusCode::BAD_REQUEST, 539 Json(json!({"error": "InvalidRequest", "message": "cids is required"})), 540 ) 541 .into_response(); 542 } 543 544 let repo_result = sqlx::query!( 545 r#" 546 SELECT r.repo_root_cid 547 FROM repos r 548 JOIN users u ON r.user_id = u.id 549 WHERE u.did = $1 550 "#, 551 did 552 ) 553 .fetch_optional(&state.db) 554 .await; 555 556 let repo_root_cid_str = match repo_result { 557 Ok(Some(row)) => row.repo_root_cid, 558 Ok(None) => { 559 return ( 560 StatusCode::NOT_FOUND, 561 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 562 ) 563 .into_response(); 564 } 565 Err(e) => { 566 error!("DB error in get_blocks: {:?}", e); 567 return ( 568 StatusCode::INTERNAL_SERVER_ERROR, 569 Json(json!({"error": "InternalError"})), 570 ) 571 .into_response(); 572 } 573 }; 574 575 let root_cid = match repo_root_cid_str.parse::<Cid>() { 576 Ok(c) => c, 577 Err(e) => { 578 error!("Failed to parse root CID: {:?}", e); 579 return ( 580 StatusCode::INTERNAL_SERVER_ERROR, 581 Json(json!({"error": "InternalError"})), 582 ) 583 .into_response(); 584 } 585 }; 586 587 let mut requested_cids: Vec<Cid> = Vec::new(); 588 for cid_str in &cid_strings { 589 match cid_str.parse::<Cid>() { 590 Ok(c) => requested_cids.push(c), 591 Err(e) => { 592 error!("Failed to parse CID '{}': {:?}", cid_str, e); 593 return ( 594 StatusCode::BAD_REQUEST, 595 Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", cid_str)})), 596 ) 597 .into_response(); 598 } 599 } 600 } 601 602 let mut buf = Vec::new(); 603 let header = encode_car_header(&root_cid); 604 if let Err(e) = ld_write(&mut buf, &header) { 605 error!("Failed to write CAR header: {:?}", e); 606 return ( 607 StatusCode::INTERNAL_SERVER_ERROR, 608 Json(json!({"error": "InternalError"})), 609 ) 610 .into_response(); 611 } 612 613 for cid in &requested_cids { 614 let cid_bytes = cid.to_bytes(); 615 let block_result = sqlx::query!( 616 "SELECT data FROM blocks WHERE cid = $1", 617 &cid_bytes 618 ) 619 .fetch_optional(&state.db) 620 .await; 621 622 match block_result { 623 Ok(Some(row)) => { 624 let mut block_data = Vec::new(); 625 block_data.extend_from_slice(&cid_bytes); 626 block_data.extend_from_slice(&row.data); 627 if let Err(e) = ld_write(&mut buf, &block_data) { 628 error!("Failed to write block: {:?}", e); 629 return ( 630 StatusCode::INTERNAL_SERVER_ERROR, 631 Json(json!({"error": "InternalError"})), 632 ) 633 .into_response(); 634 } 635 } 636 Ok(None) => { 637 return ( 638 StatusCode::NOT_FOUND, 639 Json(json!({"error": "BlockNotFound", "message": format!("Block not found: {}", cid)})), 640 ) 641 .into_response(); 642 } 643 Err(e) => { 644 error!("DB error fetching block: {:?}", e); 645 return ( 646 StatusCode::INTERNAL_SERVER_ERROR, 647 Json(json!({"error": "InternalError"})), 648 ) 649 .into_response(); 650 } 651 } 652 } 653 654 Response::builder() 655 .status(StatusCode::OK) 656 .header(header::CONTENT_TYPE, "application/vnd.ipld.car") 657 .body(Body::from(buf)) 658 .unwrap() 659} 660 661#[derive(Deserialize)] 662pub struct GetRepoParams { 663 pub did: String, 664 pub since: Option<String>, 665} 666 667pub async fn get_repo( 668 State(state): State<AppState>, 669 Query(params): Query<GetRepoParams>, 670) -> Response { 671 let did = params.did.trim(); 672 673 if did.is_empty() { 674 return ( 675 StatusCode::BAD_REQUEST, 676 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 677 ) 678 .into_response(); 679 } 680 681 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 682 .fetch_optional(&state.db) 683 .await; 684 685 let user_id = match user_result { 686 Ok(Some(row)) => row.id, 687 Ok(None) => { 688 return ( 689 StatusCode::NOT_FOUND, 690 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 691 ) 692 .into_response(); 693 } 694 Err(e) => { 695 error!("DB error in get_repo: {:?}", e); 696 return ( 697 StatusCode::INTERNAL_SERVER_ERROR, 698 Json(json!({"error": "InternalError"})), 699 ) 700 .into_response(); 701 } 702 }; 703 704 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 705 .fetch_optional(&state.db) 706 .await; 707 708 let repo_root_cid_str = match repo_result { 709 Ok(Some(row)) => row.repo_root_cid, 710 Ok(None) => { 711 return ( 712 StatusCode::NOT_FOUND, 713 Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})), 714 ) 715 .into_response(); 716 } 717 Err(e) => { 718 error!("DB error in get_repo: {:?}", e); 719 return ( 720 StatusCode::INTERNAL_SERVER_ERROR, 721 Json(json!({"error": "InternalError"})), 722 ) 723 .into_response(); 724 } 725 }; 726 727 let root_cid = match repo_root_cid_str.parse::<Cid>() { 728 Ok(c) => c, 729 Err(e) => { 730 error!("Failed to parse root CID: {:?}", e); 731 return ( 732 StatusCode::INTERNAL_SERVER_ERROR, 733 Json(json!({"error": "InternalError"})), 734 ) 735 .into_response(); 736 } 737 }; 738 739 let commit_bytes = match state.block_store.get(&root_cid).await { 740 Ok(Some(b)) => b, 741 Ok(None) => { 742 error!("Commit block not found: {}", root_cid); 743 return ( 744 StatusCode::INTERNAL_SERVER_ERROR, 745 Json(json!({"error": "InternalError"})), 746 ) 747 .into_response(); 748 } 749 Err(e) => { 750 error!("Failed to load commit block: {:?}", e); 751 return ( 752 StatusCode::INTERNAL_SERVER_ERROR, 753 Json(json!({"error": "InternalError"})), 754 ) 755 .into_response(); 756 } 757 }; 758 759 let commit = match Commit::from_cbor(&commit_bytes) { 760 Ok(c) => c, 761 Err(e) => { 762 error!("Failed to parse commit: {:?}", e); 763 return ( 764 StatusCode::INTERNAL_SERVER_ERROR, 765 Json(json!({"error": "InternalError"})), 766 ) 767 .into_response(); 768 } 769 }; 770 771 let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new(); 772 let mut visited: HashSet<Vec<u8>> = HashSet::new(); 773 774 collected_blocks.push((root_cid, commit_bytes.clone())); 775 visited.insert(root_cid.to_bytes()); 776 777 let mst_root_cid = commit.data; 778 if !visited.contains(&mst_root_cid.to_bytes()) { 779 visited.insert(mst_root_cid.to_bytes()); 780 if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await { 781 collected_blocks.push((mst_root_cid, data)); 782 } 783 } 784 785 let records = sqlx::query!("SELECT record_cid FROM records WHERE repo_id = $1", user_id) 786 .fetch_all(&state.db) 787 .await 788 .unwrap_or_default(); 789 790 for record in records { 791 if let Ok(cid) = record.record_cid.parse::<Cid>() { 792 if !visited.contains(&cid.to_bytes()) { 793 visited.insert(cid.to_bytes()); 794 if let Ok(Some(data)) = state.block_store.get(&cid).await { 795 collected_blocks.push((cid, data)); 796 } 797 } 798 } 799 } 800 801 let mut buf = Vec::new(); 802 let header = encode_car_header(&root_cid); 803 if let Err(e) = ld_write(&mut buf, &header) { 804 error!("Failed to write CAR header: {:?}", e); 805 return ( 806 StatusCode::INTERNAL_SERVER_ERROR, 807 Json(json!({"error": "InternalError"})), 808 ) 809 .into_response(); 810 } 811 812 for (cid, data) in &collected_blocks { 813 let mut block_data = Vec::new(); 814 block_data.extend_from_slice(&cid.to_bytes()); 815 block_data.extend_from_slice(data); 816 if let Err(e) = ld_write(&mut buf, &block_data) { 817 error!("Failed to write block: {:?}", e); 818 return ( 819 StatusCode::INTERNAL_SERVER_ERROR, 820 Json(json!({"error": "InternalError"})), 821 ) 822 .into_response(); 823 } 824 } 825 826 Response::builder() 827 .status(StatusCode::OK) 828 .header(header::CONTENT_TYPE, "application/vnd.ipld.car") 829 .body(Body::from(buf)) 830 .unwrap() 831} 832 833#[derive(Deserialize)] 834pub struct GetRecordParams { 835 pub did: String, 836 pub collection: String, 837 pub rkey: String, 838} 839 840pub async fn get_record( 841 State(state): State<AppState>, 842 Query(params): Query<GetRecordParams>, 843) -> Response { 844 let did = params.did.trim(); 845 let collection = params.collection.trim(); 846 let rkey = params.rkey.trim(); 847 848 if did.is_empty() { 849 return ( 850 StatusCode::BAD_REQUEST, 851 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 852 ) 853 .into_response(); 854 } 855 856 if collection.is_empty() { 857 return ( 858 StatusCode::BAD_REQUEST, 859 Json(json!({"error": "InvalidRequest", "message": "collection is required"})), 860 ) 861 .into_response(); 862 } 863 864 if rkey.is_empty() { 865 return ( 866 StatusCode::BAD_REQUEST, 867 Json(json!({"error": "InvalidRequest", "message": "rkey is required"})), 868 ) 869 .into_response(); 870 } 871 872 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 873 .fetch_optional(&state.db) 874 .await; 875 876 let user_id = match user_result { 877 Ok(Some(row)) => row.id, 878 Ok(None) => { 879 return ( 880 StatusCode::NOT_FOUND, 881 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 882 ) 883 .into_response(); 884 } 885 Err(e) => { 886 error!("DB error in sync get_record: {:?}", e); 887 return ( 888 StatusCode::INTERNAL_SERVER_ERROR, 889 Json(json!({"error": "InternalError"})), 890 ) 891 .into_response(); 892 } 893 }; 894 895 let record_result = sqlx::query!( 896 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 897 user_id, 898 collection, 899 rkey 900 ) 901 .fetch_optional(&state.db) 902 .await; 903 904 let record_cid_str = match record_result { 905 Ok(Some(row)) => row.record_cid, 906 Ok(None) => { 907 return ( 908 StatusCode::NOT_FOUND, 909 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 910 ) 911 .into_response(); 912 } 913 Err(e) => { 914 error!("DB error in sync get_record: {:?}", e); 915 return ( 916 StatusCode::INTERNAL_SERVER_ERROR, 917 Json(json!({"error": "InternalError"})), 918 ) 919 .into_response(); 920 } 921 }; 922 923 let record_cid = match record_cid_str.parse::<Cid>() { 924 Ok(c) => c, 925 Err(e) => { 926 error!("Failed to parse record CID: {:?}", e); 927 return ( 928 StatusCode::INTERNAL_SERVER_ERROR, 929 Json(json!({"error": "InternalError"})), 930 ) 931 .into_response(); 932 } 933 }; 934 935 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 936 .fetch_optional(&state.db) 937 .await; 938 939 let repo_root_cid_str = match repo_result { 940 Ok(Some(row)) => row.repo_root_cid, 941 Ok(None) => { 942 return ( 943 StatusCode::NOT_FOUND, 944 Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})), 945 ) 946 .into_response(); 947 } 948 Err(e) => { 949 error!("DB error in sync get_record: {:?}", e); 950 return ( 951 StatusCode::INTERNAL_SERVER_ERROR, 952 Json(json!({"error": "InternalError"})), 953 ) 954 .into_response(); 955 } 956 }; 957 958 let root_cid = match repo_root_cid_str.parse::<Cid>() { 959 Ok(c) => c, 960 Err(e) => { 961 error!("Failed to parse root CID: {:?}", e); 962 return ( 963 StatusCode::INTERNAL_SERVER_ERROR, 964 Json(json!({"error": "InternalError"})), 965 ) 966 .into_response(); 967 } 968 }; 969 970 let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new(); 971 972 let commit_bytes = match state.block_store.get(&root_cid).await { 973 Ok(Some(b)) => b, 974 Ok(None) => { 975 error!("Commit block not found: {}", root_cid); 976 return ( 977 StatusCode::INTERNAL_SERVER_ERROR, 978 Json(json!({"error": "InternalError"})), 979 ) 980 .into_response(); 981 } 982 Err(e) => { 983 error!("Failed to load commit block: {:?}", e); 984 return ( 985 StatusCode::INTERNAL_SERVER_ERROR, 986 Json(json!({"error": "InternalError"})), 987 ) 988 .into_response(); 989 } 990 }; 991 992 collected_blocks.push((root_cid, commit_bytes.clone())); 993 994 let commit = match Commit::from_cbor(&commit_bytes) { 995 Ok(c) => c, 996 Err(e) => { 997 error!("Failed to parse commit: {:?}", e); 998 return ( 999 StatusCode::INTERNAL_SERVER_ERROR, 1000 Json(json!({"error": "InternalError"})), 1001 ) 1002 .into_response(); 1003 } 1004 }; 1005 1006 let mst_root_cid = commit.data; 1007 if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await { 1008 collected_blocks.push((mst_root_cid, data)); 1009 } 1010 1011 if let Ok(Some(data)) = state.block_store.get(&record_cid).await { 1012 collected_blocks.push((record_cid, data)); 1013 } else { 1014 return ( 1015 StatusCode::NOT_FOUND, 1016 Json(json!({"error": "RecordNotFound", "message": "Record block not found"})), 1017 ) 1018 .into_response(); 1019 } 1020 1021 let mut buf = Vec::new(); 1022 let header = encode_car_header(&root_cid); 1023 if let Err(e) = ld_write(&mut buf, &header) { 1024 error!("Failed to write CAR header: {:?}", e); 1025 return ( 1026 StatusCode::INTERNAL_SERVER_ERROR, 1027 Json(json!({"error": "InternalError"})), 1028 ) 1029 .into_response(); 1030 } 1031 1032 for (cid, data) in &collected_blocks { 1033 let mut block_data = Vec::new(); 1034 block_data.extend_from_slice(&cid.to_bytes()); 1035 block_data.extend_from_slice(data); 1036 if let Err(e) = ld_write(&mut buf, &block_data) { 1037 error!("Failed to write block: {:?}", e); 1038 return ( 1039 StatusCode::INTERNAL_SERVER_ERROR, 1040 Json(json!({"error": "InternalError"})), 1041 ) 1042 .into_response(); 1043 } 1044 } 1045 1046 Response::builder() 1047 .status(StatusCode::OK) 1048 .header(header::CONTENT_TYPE, "application/vnd.ipld.car") 1049 .body(Body::from(buf)) 1050 .unwrap() 1051}