this repo has no description
1use crate::state::AppState; 2use crate::sync::car::{encode_car_header, ld_write}; 3use axum::{ 4 Json, 5 body::Body, 6 extract::{Query, State}, 7 http::StatusCode, 8 http::header, 9 response::{IntoResponse, Response}, 10}; 11use bytes::Bytes; 12use cid::Cid; 13use jacquard_repo::{commit::Commit, storage::BlockStore}; 14use serde::Deserialize; 15use serde_json::json; 16use std::collections::HashSet; 17use tracing::error; 18 19#[derive(Deserialize)] 20pub struct GetBlocksParams { 21 pub did: String, 22 pub cids: String, 23} 24 25pub async fn get_blocks( 26 State(state): State<AppState>, 27 Query(params): Query<GetBlocksParams>, 28) -> Response { 29 let did = params.did.trim(); 30 31 if did.is_empty() { 32 return ( 33 StatusCode::BAD_REQUEST, 34 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 35 ) 36 .into_response(); 37 } 38 39 let cid_strings: Vec<&str> = params.cids.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).collect(); 40 41 if cid_strings.is_empty() { 42 return ( 43 StatusCode::BAD_REQUEST, 44 Json(json!({"error": "InvalidRequest", "message": "cids is required"})), 45 ) 46 .into_response(); 47 } 48 49 let repo_result = sqlx::query!( 50 r#" 51 SELECT r.repo_root_cid 52 FROM repos r 53 JOIN users u ON r.user_id = u.id 54 WHERE u.did = $1 55 "#, 56 did 57 ) 58 .fetch_optional(&state.db) 59 .await; 60 61 let repo_root_cid_str = match repo_result { 62 Ok(Some(row)) => row.repo_root_cid, 63 Ok(None) => { 64 return ( 65 StatusCode::NOT_FOUND, 66 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 67 ) 68 .into_response(); 69 } 70 Err(e) => { 71 error!("DB error in get_blocks: {:?}", e); 72 return ( 73 StatusCode::INTERNAL_SERVER_ERROR, 74 Json(json!({"error": "InternalError"})), 75 ) 76 .into_response(); 77 } 78 }; 79 80 let root_cid = match repo_root_cid_str.parse::<Cid>() { 81 Ok(c) => c, 82 Err(e) => { 83 error!("Failed to parse root CID: {:?}", e); 84 return ( 85 StatusCode::INTERNAL_SERVER_ERROR, 86 Json(json!({"error": "InternalError"})), 87 ) 88 .into_response(); 89 } 90 }; 91 92 let mut requested_cids: Vec<Cid> = Vec::new(); 93 for cid_str in &cid_strings { 94 match cid_str.parse::<Cid>() { 95 Ok(c) => requested_cids.push(c), 96 Err(e) => { 97 error!("Failed to parse CID '{}': {:?}", cid_str, e); 98 return ( 99 StatusCode::BAD_REQUEST, 100 Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", cid_str)})), 101 ) 102 .into_response(); 103 } 104 } 105 } 106 107 let mut buf = Vec::new(); 108 let car_header = encode_car_header(&root_cid); 109 if let Err(e) = ld_write(&mut buf, &car_header) { 110 error!("Failed to write CAR header: {:?}", e); 111 return ( 112 StatusCode::INTERNAL_SERVER_ERROR, 113 Json(json!({"error": "InternalError"})), 114 ) 115 .into_response(); 116 } 117 118 for cid in &requested_cids { 119 let cid_bytes = cid.to_bytes(); 120 let block_result = sqlx::query!( 121 "SELECT data FROM blocks WHERE cid = $1", 122 &cid_bytes 123 ) 124 .fetch_optional(&state.db) 125 .await; 126 127 match block_result { 128 Ok(Some(row)) => { 129 let mut block_data = Vec::new(); 130 block_data.extend_from_slice(&cid_bytes); 131 block_data.extend_from_slice(&row.data); 132 if let Err(e) = ld_write(&mut buf, &block_data) { 133 error!("Failed to write block: {:?}", e); 134 return ( 135 StatusCode::INTERNAL_SERVER_ERROR, 136 Json(json!({"error": "InternalError"})), 137 ) 138 .into_response(); 139 } 140 } 141 Ok(None) => { 142 return ( 143 StatusCode::NOT_FOUND, 144 Json(json!({"error": "BlockNotFound", "message": format!("Block not found: {}", cid)})), 145 ) 146 .into_response(); 147 } 148 Err(e) => { 149 error!("DB error fetching block: {:?}", e); 150 return ( 151 StatusCode::INTERNAL_SERVER_ERROR, 152 Json(json!({"error": "InternalError"})), 153 ) 154 .into_response(); 155 } 156 } 157 } 158 159 Response::builder() 160 .status(StatusCode::OK) 161 .header(header::CONTENT_TYPE, "application/vnd.ipld.car") 162 .body(Body::from(buf)) 163 .unwrap() 164} 165 166#[derive(Deserialize)] 167pub struct GetRepoParams { 168 pub did: String, 169 pub since: Option<String>, 170} 171 172pub async fn get_repo( 173 State(state): State<AppState>, 174 Query(params): Query<GetRepoParams>, 175) -> Response { 176 let did = params.did.trim(); 177 178 if did.is_empty() { 179 return ( 180 StatusCode::BAD_REQUEST, 181 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 182 ) 183 .into_response(); 184 } 185 186 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 187 .fetch_optional(&state.db) 188 .await; 189 190 let user_id = match user_result { 191 Ok(Some(row)) => row.id, 192 Ok(None) => { 193 return ( 194 StatusCode::NOT_FOUND, 195 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 196 ) 197 .into_response(); 198 } 199 Err(e) => { 200 error!("DB error in get_repo: {:?}", e); 201 return ( 202 StatusCode::INTERNAL_SERVER_ERROR, 203 Json(json!({"error": "InternalError"})), 204 ) 205 .into_response(); 206 } 207 }; 208 209 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 210 .fetch_optional(&state.db) 211 .await; 212 213 let repo_root_cid_str = match repo_result { 214 Ok(Some(row)) => row.repo_root_cid, 215 Ok(None) => { 216 return ( 217 StatusCode::NOT_FOUND, 218 Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})), 219 ) 220 .into_response(); 221 } 222 Err(e) => { 223 error!("DB error in get_repo: {:?}", e); 224 return ( 225 StatusCode::INTERNAL_SERVER_ERROR, 226 Json(json!({"error": "InternalError"})), 227 ) 228 .into_response(); 229 } 230 }; 231 232 let root_cid = match repo_root_cid_str.parse::<Cid>() { 233 Ok(c) => c, 234 Err(e) => { 235 error!("Failed to parse root CID: {:?}", e); 236 return ( 237 StatusCode::INTERNAL_SERVER_ERROR, 238 Json(json!({"error": "InternalError"})), 239 ) 240 .into_response(); 241 } 242 }; 243 244 let commit_bytes = match state.block_store.get(&root_cid).await { 245 Ok(Some(b)) => b, 246 Ok(None) => { 247 error!("Commit block not found: {}", root_cid); 248 return ( 249 StatusCode::INTERNAL_SERVER_ERROR, 250 Json(json!({"error": "InternalError"})), 251 ) 252 .into_response(); 253 } 254 Err(e) => { 255 error!("Failed to load commit block: {:?}", e); 256 return ( 257 StatusCode::INTERNAL_SERVER_ERROR, 258 Json(json!({"error": "InternalError"})), 259 ) 260 .into_response(); 261 } 262 }; 263 264 let commit = match Commit::from_cbor(&commit_bytes) { 265 Ok(c) => c, 266 Err(e) => { 267 error!("Failed to parse commit: {:?}", e); 268 return ( 269 StatusCode::INTERNAL_SERVER_ERROR, 270 Json(json!({"error": "InternalError"})), 271 ) 272 .into_response(); 273 } 274 }; 275 276 let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new(); 277 let mut visited: HashSet<Vec<u8>> = HashSet::new(); 278 279 collected_blocks.push((root_cid, commit_bytes.clone())); 280 visited.insert(root_cid.to_bytes()); 281 282 let mst_root_cid = commit.data; 283 if !visited.contains(&mst_root_cid.to_bytes()) { 284 visited.insert(mst_root_cid.to_bytes()); 285 if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await { 286 collected_blocks.push((mst_root_cid, data)); 287 } 288 } 289 290 let records = sqlx::query!("SELECT record_cid FROM records WHERE repo_id = $1", user_id) 291 .fetch_all(&state.db) 292 .await 293 .unwrap_or_default(); 294 295 for record in records { 296 if let Ok(cid) = record.record_cid.parse::<Cid>() { 297 if !visited.contains(&cid.to_bytes()) { 298 visited.insert(cid.to_bytes()); 299 if let Ok(Some(data)) = state.block_store.get(&cid).await { 300 collected_blocks.push((cid, data)); 301 } 302 } 303 } 304 } 305 306 let mut buf = Vec::new(); 307 let car_header = encode_car_header(&root_cid); 308 if let Err(e) = ld_write(&mut buf, &car_header) { 309 error!("Failed to write CAR header: {:?}", e); 310 return ( 311 StatusCode::INTERNAL_SERVER_ERROR, 312 Json(json!({"error": "InternalError"})), 313 ) 314 .into_response(); 315 } 316 317 for (cid, data) in &collected_blocks { 318 let mut block_data = Vec::new(); 319 block_data.extend_from_slice(&cid.to_bytes()); 320 block_data.extend_from_slice(data); 321 if let Err(e) = ld_write(&mut buf, &block_data) { 322 error!("Failed to write block: {:?}", e); 323 return ( 324 StatusCode::INTERNAL_SERVER_ERROR, 325 Json(json!({"error": "InternalError"})), 326 ) 327 .into_response(); 328 } 329 } 330 331 Response::builder() 332 .status(StatusCode::OK) 333 .header(header::CONTENT_TYPE, "application/vnd.ipld.car") 334 .body(Body::from(buf)) 335 .unwrap() 336} 337 338#[derive(Deserialize)] 339pub struct GetRecordParams { 340 pub did: String, 341 pub collection: String, 342 pub rkey: String, 343} 344 345pub async fn get_record( 346 State(state): State<AppState>, 347 Query(params): Query<GetRecordParams>, 348) -> Response { 349 let did = params.did.trim(); 350 let collection = params.collection.trim(); 351 let rkey = params.rkey.trim(); 352 353 if did.is_empty() { 354 return ( 355 StatusCode::BAD_REQUEST, 356 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 357 ) 358 .into_response(); 359 } 360 361 if collection.is_empty() { 362 return ( 363 StatusCode::BAD_REQUEST, 364 Json(json!({"error": "InvalidRequest", "message": "collection is required"})), 365 ) 366 .into_response(); 367 } 368 369 if rkey.is_empty() { 370 return ( 371 StatusCode::BAD_REQUEST, 372 Json(json!({"error": "InvalidRequest", "message": "rkey is required"})), 373 ) 374 .into_response(); 375 } 376 377 let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) 378 .fetch_optional(&state.db) 379 .await; 380 381 let user_id = match user_result { 382 Ok(Some(row)) => row.id, 383 Ok(None) => { 384 return ( 385 StatusCode::NOT_FOUND, 386 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 387 ) 388 .into_response(); 389 } 390 Err(e) => { 391 error!("DB error in sync get_record: {:?}", e); 392 return ( 393 StatusCode::INTERNAL_SERVER_ERROR, 394 Json(json!({"error": "InternalError"})), 395 ) 396 .into_response(); 397 } 398 }; 399 400 let record_result = sqlx::query!( 401 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 402 user_id, 403 collection, 404 rkey 405 ) 406 .fetch_optional(&state.db) 407 .await; 408 409 let record_cid_str = match record_result { 410 Ok(Some(row)) => row.record_cid, 411 Ok(None) => { 412 return ( 413 StatusCode::NOT_FOUND, 414 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 415 ) 416 .into_response(); 417 } 418 Err(e) => { 419 error!("DB error in sync get_record: {:?}", e); 420 return ( 421 StatusCode::INTERNAL_SERVER_ERROR, 422 Json(json!({"error": "InternalError"})), 423 ) 424 .into_response(); 425 } 426 }; 427 428 let record_cid = match record_cid_str.parse::<Cid>() { 429 Ok(c) => c, 430 Err(e) => { 431 error!("Failed to parse record CID: {:?}", e); 432 return ( 433 StatusCode::INTERNAL_SERVER_ERROR, 434 Json(json!({"error": "InternalError"})), 435 ) 436 .into_response(); 437 } 438 }; 439 440 let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) 441 .fetch_optional(&state.db) 442 .await; 443 444 let repo_root_cid_str = match repo_result { 445 Ok(Some(row)) => row.repo_root_cid, 446 Ok(None) => { 447 return ( 448 StatusCode::NOT_FOUND, 449 Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})), 450 ) 451 .into_response(); 452 } 453 Err(e) => { 454 error!("DB error in sync get_record: {:?}", e); 455 return ( 456 StatusCode::INTERNAL_SERVER_ERROR, 457 Json(json!({"error": "InternalError"})), 458 ) 459 .into_response(); 460 } 461 }; 462 463 let root_cid = match repo_root_cid_str.parse::<Cid>() { 464 Ok(c) => c, 465 Err(e) => { 466 error!("Failed to parse root CID: {:?}", e); 467 return ( 468 StatusCode::INTERNAL_SERVER_ERROR, 469 Json(json!({"error": "InternalError"})), 470 ) 471 .into_response(); 472 } 473 }; 474 475 let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new(); 476 477 let commit_bytes = match state.block_store.get(&root_cid).await { 478 Ok(Some(b)) => b, 479 Ok(None) => { 480 error!("Commit block not found: {}", root_cid); 481 return ( 482 StatusCode::INTERNAL_SERVER_ERROR, 483 Json(json!({"error": "InternalError"})), 484 ) 485 .into_response(); 486 } 487 Err(e) => { 488 error!("Failed to load commit block: {:?}", e); 489 return ( 490 StatusCode::INTERNAL_SERVER_ERROR, 491 Json(json!({"error": "InternalError"})), 492 ) 493 .into_response(); 494 } 495 }; 496 497 collected_blocks.push((root_cid, commit_bytes.clone())); 498 499 let commit = match Commit::from_cbor(&commit_bytes) { 500 Ok(c) => c, 501 Err(e) => { 502 error!("Failed to parse commit: {:?}", e); 503 return ( 504 StatusCode::INTERNAL_SERVER_ERROR, 505 Json(json!({"error": "InternalError"})), 506 ) 507 .into_response(); 508 } 509 }; 510 511 let mst_root_cid = commit.data; 512 if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await { 513 collected_blocks.push((mst_root_cid, data)); 514 } 515 516 if let Ok(Some(data)) = state.block_store.get(&record_cid).await { 517 collected_blocks.push((record_cid, data)); 518 } else { 519 return ( 520 StatusCode::NOT_FOUND, 521 Json(json!({"error": "RecordNotFound", "message": "Record block not found"})), 522 ) 523 .into_response(); 524 } 525 526 let mut buf = Vec::new(); 527 let car_header = encode_car_header(&root_cid); 528 if let Err(e) = ld_write(&mut buf, &car_header) { 529 error!("Failed to write CAR header: {:?}", e); 530 return ( 531 StatusCode::INTERNAL_SERVER_ERROR, 532 Json(json!({"error": "InternalError"})), 533 ) 534 .into_response(); 535 } 536 537 for (cid, data) in &collected_blocks { 538 let mut block_data = Vec::new(); 539 block_data.extend_from_slice(&cid.to_bytes()); 540 block_data.extend_from_slice(data); 541 if let Err(e) = ld_write(&mut buf, &block_data) { 542 error!("Failed to write block: {:?}", e); 543 return ( 544 StatusCode::INTERNAL_SERVER_ERROR, 545 Json(json!({"error": "InternalError"})), 546 ) 547 .into_response(); 548 } 549 } 550 551 Response::builder() 552 .status(StatusCode::OK) 553 .header(header::CONTENT_TYPE, "application/vnd.ipld.car") 554 .body(Body::from(buf)) 555 .unwrap() 556}