this repo has no description
1use crate::state::AppState; 2use crate::sync::car::encode_car_header; 3use crate::sync::util::assert_repo_availability; 4use axum::{ 5 Json, 6 extract::{Query, RawQuery, State}, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9}; 10use cid::Cid; 11use ipld_core::ipld::Ipld; 12use jacquard_repo::storage::BlockStore; 13use serde::Deserialize; 14use serde_json::json; 15use std::io::Write; 16use std::str::FromStr; 17use tracing::error; 18 19const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000; 20 21fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> { 22 let did = crate::util::parse_repeated_query_param(Some(query_string), "did") 23 .into_iter() 24 .next() 25 .ok_or("Missing required parameter: did")?; 26 let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids"); 27 Ok((did, cids)) 28} 29 30pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response { 31 let query_string = match query { 32 Some(q) => q, 33 None => { 34 return ( 35 StatusCode::BAD_REQUEST, 36 Json(json!({"error": "InvalidRequest", "message": "Missing query parameters"})), 37 ) 38 .into_response(); 39 } 40 }; 41 42 let (did, cid_strings) = match parse_get_blocks_query(&query_string) { 43 Ok(parsed) => parsed, 44 Err(msg) => { 45 return ( 46 StatusCode::BAD_REQUEST, 47 Json(json!({"error": "InvalidRequest", "message": msg})), 48 ) 49 .into_response(); 50 } 51 }; 52 53 let _account = match assert_repo_availability(&state.db, &did, false).await { 54 Ok(a) => a, 55 Err(e) => return e.into_response(), 56 }; 57 58 let mut cids = Vec::new(); 59 for s in &cid_strings { 60 match Cid::from_str(s) { 61 Ok(cid) => cids.push(cid), 62 Err(_) => return ( 63 StatusCode::BAD_REQUEST, 64 Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", s)})), 65 ) 66 .into_response(), 67 } 68 } 69 70 if cids.is_empty() { 71 return ( 72 StatusCode::BAD_REQUEST, 73 Json(json!({"error": "InvalidRequest", "message": "No CIDs provided"})), 74 ) 75 .into_response(); 76 } 77 78 let blocks_res = state.block_store.get_many(&cids).await; 79 let blocks = match blocks_res { 80 Ok(blocks) => blocks, 81 Err(e) => { 82 error!("Failed to get blocks: {}", e); 83 return ( 84 StatusCode::INTERNAL_SERVER_ERROR, 85 Json(json!({"error": "InternalError", "message": "Failed to get blocks"})), 86 ) 87 .into_response(); 88 } 89 }; 90 91 let mut missing_cids: Vec<String> = Vec::new(); 92 for (i, block_opt) in blocks.iter().enumerate() { 93 if block_opt.is_none() { 94 missing_cids.push(cids[i].to_string()); 95 } 96 } 97 if !missing_cids.is_empty() { 98 return ( 99 StatusCode::BAD_REQUEST, 100 Json(json!({ 101 "error": "InvalidRequest", 102 "message": format!("Could not find blocks: {}", missing_cids.join(", ")) 103 })), 104 ) 105 .into_response(); 106 } 107 108 let header = match crate::sync::car::encode_car_header_null_root() { 109 Ok(h) => h, 110 Err(e) => { 111 error!("Failed to encode CAR header: {}", e); 112 return ( 113 StatusCode::INTERNAL_SERVER_ERROR, 114 Json(json!({"error": "InternalError", "message": "Failed to encode CAR"})), 115 ) 116 .into_response(); 117 } 118 }; 119 let mut car_bytes = header; 120 for (i, block_opt) in blocks.into_iter().enumerate() { 121 if let Some(block) = block_opt { 122 let cid = cids[i]; 123 let cid_bytes = cid.to_bytes(); 124 let total_len = cid_bytes.len() + block.len(); 125 let mut writer = Vec::new(); 126 crate::sync::car::write_varint(&mut writer, total_len as u64) 127 .expect("Writing to Vec<u8> should never fail"); 128 writer 129 .write_all(&cid_bytes) 130 .expect("Writing to Vec<u8> should never fail"); 131 writer 132 .write_all(&block) 133 .expect("Writing to Vec<u8> should never fail"); 134 car_bytes.extend_from_slice(&writer); 135 } 136 } 137 ( 138 StatusCode::OK, 139 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 140 car_bytes, 141 ) 142 .into_response() 143} 144 145#[derive(Deserialize)] 146pub struct GetRepoQuery { 147 pub did: String, 148 pub since: Option<String>, 149} 150 151pub async fn get_repo( 152 State(state): State<AppState>, 153 Query(query): Query<GetRepoQuery>, 154) -> Response { 155 let account = match assert_repo_availability(&state.db, &query.did, false).await { 156 Ok(a) => a, 157 Err(e) => return e.into_response(), 158 }; 159 160 let head_str = match account.repo_root_cid { 161 Some(cid) => cid, 162 None => { 163 return ( 164 StatusCode::BAD_REQUEST, 165 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 166 ) 167 .into_response(); 168 } 169 }; 170 171 let head_cid = match Cid::from_str(&head_str) { 172 Ok(c) => c, 173 Err(_) => { 174 return ( 175 StatusCode::INTERNAL_SERVER_ERROR, 176 Json(json!({"error": "InternalError", "message": "Invalid head CID"})), 177 ) 178 .into_response(); 179 } 180 }; 181 182 if let Some(since) = &query.since { 183 return get_repo_since(&state, &query.did, &head_cid, since).await; 184 } 185 186 let mut car_bytes = match encode_car_header(&head_cid) { 187 Ok(h) => h, 188 Err(e) => { 189 return ( 190 StatusCode::INTERNAL_SERVER_ERROR, 191 Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})), 192 ) 193 .into_response(); 194 } 195 }; 196 let mut stack = vec![head_cid]; 197 let mut visited = std::collections::HashSet::new(); 198 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL; 199 while let Some(cid) = stack.pop() { 200 if visited.contains(&cid) { 201 continue; 202 } 203 visited.insert(cid); 204 if remaining == 0 { 205 break; 206 } 207 remaining -= 1; 208 if let Ok(Some(block)) = state.block_store.get(&cid).await { 209 let cid_bytes = cid.to_bytes(); 210 let total_len = cid_bytes.len() + block.len(); 211 let mut writer = Vec::new(); 212 crate::sync::car::write_varint(&mut writer, total_len as u64) 213 .expect("Writing to Vec<u8> should never fail"); 214 writer 215 .write_all(&cid_bytes) 216 .expect("Writing to Vec<u8> should never fail"); 217 writer 218 .write_all(&block) 219 .expect("Writing to Vec<u8> should never fail"); 220 car_bytes.extend_from_slice(&writer); 221 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 222 extract_links_ipld(&value, &mut stack); 223 } 224 } 225 } 226 ( 227 StatusCode::OK, 228 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 229 car_bytes, 230 ) 231 .into_response() 232} 233 234async fn get_repo_since(state: &AppState, did: &str, head_cid: &Cid, since: &str) -> Response { 235 let events = sqlx::query!( 236 r#" 237 SELECT blocks_cids, commit_cid 238 FROM repo_seq 239 WHERE did = $1 AND rev > $2 240 ORDER BY seq DESC 241 "#, 242 did, 243 since 244 ) 245 .fetch_all(&state.db) 246 .await; 247 248 let events = match events { 249 Ok(e) => e, 250 Err(e) => { 251 error!("DB error in get_repo_since: {:?}", e); 252 return ( 253 StatusCode::INTERNAL_SERVER_ERROR, 254 Json(json!({"error": "InternalError", "message": "Database error"})), 255 ) 256 .into_response(); 257 } 258 }; 259 260 let mut block_cids: Vec<Cid> = Vec::new(); 261 for event in &events { 262 if let Some(cids) = &event.blocks_cids { 263 for cid_str in cids { 264 if let Ok(cid) = Cid::from_str(cid_str) 265 && !block_cids.contains(&cid) 266 { 267 block_cids.push(cid); 268 } 269 } 270 } 271 if let Some(commit_cid_str) = &event.commit_cid 272 && let Ok(cid) = Cid::from_str(commit_cid_str) 273 && !block_cids.contains(&cid) 274 { 275 block_cids.push(cid); 276 } 277 } 278 279 let mut car_bytes = match encode_car_header(head_cid) { 280 Ok(h) => h, 281 Err(e) => { 282 return ( 283 StatusCode::INTERNAL_SERVER_ERROR, 284 Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})), 285 ) 286 .into_response(); 287 } 288 }; 289 290 if block_cids.is_empty() { 291 return ( 292 StatusCode::OK, 293 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 294 car_bytes, 295 ) 296 .into_response(); 297 } 298 299 let blocks = match state.block_store.get_many(&block_cids).await { 300 Ok(b) => b, 301 Err(e) => { 302 error!("Block store error in get_repo_since: {:?}", e); 303 return ( 304 StatusCode::INTERNAL_SERVER_ERROR, 305 Json(json!({"error": "InternalError", "message": "Failed to get blocks"})), 306 ) 307 .into_response(); 308 } 309 }; 310 311 for (i, block_opt) in blocks.into_iter().enumerate() { 312 if let Some(block) = block_opt { 313 let cid = block_cids[i]; 314 let cid_bytes = cid.to_bytes(); 315 let total_len = cid_bytes.len() + block.len(); 316 let mut writer = Vec::new(); 317 crate::sync::car::write_varint(&mut writer, total_len as u64) 318 .expect("Writing to Vec<u8> should never fail"); 319 writer 320 .write_all(&cid_bytes) 321 .expect("Writing to Vec<u8> should never fail"); 322 writer 323 .write_all(&block) 324 .expect("Writing to Vec<u8> should never fail"); 325 car_bytes.extend_from_slice(&writer); 326 } 327 } 328 329 ( 330 StatusCode::OK, 331 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 332 car_bytes, 333 ) 334 .into_response() 335} 336 337fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) { 338 match value { 339 Ipld::Link(cid) => { 340 stack.push(*cid); 341 } 342 Ipld::Map(map) => { 343 for v in map.values() { 344 extract_links_ipld(v, stack); 345 } 346 } 347 Ipld::List(arr) => { 348 for v in arr { 349 extract_links_ipld(v, stack); 350 } 351 } 352 _ => {} 353 } 354} 355 356#[derive(Deserialize)] 357pub struct GetRecordQuery { 358 pub did: String, 359 pub collection: String, 360 pub rkey: String, 361} 362 363pub async fn get_record( 364 State(state): State<AppState>, 365 Query(query): Query<GetRecordQuery>, 366) -> Response { 367 use jacquard_repo::commit::Commit; 368 use jacquard_repo::mst::Mst; 369 use std::collections::BTreeMap; 370 use std::sync::Arc; 371 372 let account = match assert_repo_availability(&state.db, &query.did, false).await { 373 Ok(a) => a, 374 Err(e) => return e.into_response(), 375 }; 376 377 let commit_cid_str = match account.repo_root_cid { 378 Some(cid) => cid, 379 None => { 380 return ( 381 StatusCode::BAD_REQUEST, 382 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 383 ) 384 .into_response(); 385 } 386 }; 387 let commit_cid = match Cid::from_str(&commit_cid_str) { 388 Ok(c) => c, 389 Err(_) => { 390 return ( 391 StatusCode::INTERNAL_SERVER_ERROR, 392 Json(json!({"error": "InternalError", "message": "Invalid commit CID"})), 393 ) 394 .into_response(); 395 } 396 }; 397 let commit_bytes = match state.block_store.get(&commit_cid).await { 398 Ok(Some(b)) => b, 399 _ => { 400 return ( 401 StatusCode::INTERNAL_SERVER_ERROR, 402 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 403 ) 404 .into_response(); 405 } 406 }; 407 let commit = match Commit::from_cbor(&commit_bytes) { 408 Ok(c) => c, 409 Err(_) => { 410 return ( 411 StatusCode::INTERNAL_SERVER_ERROR, 412 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 413 ) 414 .into_response(); 415 } 416 }; 417 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None); 418 let key = format!("{}/{}", query.collection, query.rkey); 419 let record_cid = match mst.get(&key).await { 420 Ok(Some(cid)) => cid, 421 Ok(None) => { 422 return ( 423 StatusCode::NOT_FOUND, 424 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 425 ) 426 .into_response(); 427 } 428 Err(_) => { 429 return ( 430 StatusCode::INTERNAL_SERVER_ERROR, 431 Json(json!({"error": "InternalError", "message": "Failed to lookup record"})), 432 ) 433 .into_response(); 434 } 435 }; 436 let record_block = match state.block_store.get(&record_cid).await { 437 Ok(Some(b)) => b, 438 _ => { 439 return ( 440 StatusCode::NOT_FOUND, 441 Json(json!({"error": "RecordNotFound", "message": "Record block not found"})), 442 ) 443 .into_response(); 444 } 445 }; 446 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new(); 447 if mst.blocks_for_path(&key, &mut proof_blocks).await.is_err() { 448 return ( 449 StatusCode::INTERNAL_SERVER_ERROR, 450 Json(json!({"error": "InternalError", "message": "Failed to build proof path"})), 451 ) 452 .into_response(); 453 } 454 let header = match encode_car_header(&commit_cid) { 455 Ok(h) => h, 456 Err(e) => { 457 error!("Failed to encode CAR header: {}", e); 458 return ( 459 StatusCode::INTERNAL_SERVER_ERROR, 460 Json(json!({"error": "InternalError"})), 461 ) 462 .into_response(); 463 } 464 }; 465 let mut car_bytes = header; 466 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| { 467 let cid_bytes = cid.to_bytes(); 468 let total_len = cid_bytes.len() + data.len(); 469 let mut writer = Vec::new(); 470 crate::sync::car::write_varint(&mut writer, total_len as u64) 471 .expect("Writing to Vec<u8> should never fail"); 472 writer 473 .write_all(&cid_bytes) 474 .expect("Writing to Vec<u8> should never fail"); 475 writer 476 .write_all(data) 477 .expect("Writing to Vec<u8> should never fail"); 478 car.extend_from_slice(&writer); 479 }; 480 write_block(&mut car_bytes, &commit_cid, &commit_bytes); 481 for (cid, data) in &proof_blocks { 482 write_block(&mut car_bytes, cid, data); 483 } 484 write_block(&mut car_bytes, &record_cid, &record_block); 485 ( 486 StatusCode::OK, 487 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 488 car_bytes, 489 ) 490 .into_response() 491}