this repo has no description
1use crate::api::error::ApiError; 2use crate::state::AppState; 3use crate::sync::car::encode_car_header; 4use crate::sync::util::assert_repo_availability; 5use axum::{ 6 extract::{Query, RawQuery, State}, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9}; 10use cid::Cid; 11use ipld_core::ipld::Ipld; 12use jacquard_repo::storage::BlockStore; 13use serde::Deserialize; 14use std::io::Write; 15use std::str::FromStr; 16use tracing::error; 17 18const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000; 19 20fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> { 21 let did = crate::util::parse_repeated_query_param(Some(query_string), "did") 22 .into_iter() 23 .next() 24 .ok_or("Missing required parameter: did")?; 25 let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids"); 26 Ok((did, cids)) 27} 28 29pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response { 30 let Some(query_string) = query else { 31 return ApiError::InvalidRequest("Missing query parameters".into()).into_response(); 32 }; 33 34 let (did, cid_strings) = match parse_get_blocks_query(&query_string) { 35 Ok(parsed) => parsed, 36 Err(msg) => return ApiError::InvalidRequest(msg).into_response(), 37 }; 38 39 let _account = match assert_repo_availability(&state.db, &did, false).await { 40 Ok(a) => a, 41 Err(e) => return e.into_response(), 42 }; 43 44 let cids: Vec<Cid> = match cid_strings 45 .iter() 46 .map(|s| Cid::from_str(s).map_err(|_| s.clone())) 47 .collect::<Result<Vec<_>, _>>() 48 { 49 Ok(cids) => cids, 50 Err(invalid) => { 51 return ApiError::InvalidRequest(format!("Invalid CID: {}", invalid)).into_response(); 52 } 53 }; 54 55 if cids.is_empty() { 56 return ApiError::InvalidRequest("No CIDs provided".into()).into_response(); 57 } 58 59 let blocks = match state.block_store.get_many(&cids).await { 60 Ok(blocks) => blocks, 61 Err(e) => { 62 error!("Failed to get blocks: {}", e); 63 return ApiError::InternalError(None).into_response(); 64 } 65 }; 66 67 let missing_cids: Vec<String> = blocks 68 .iter() 69 .zip(&cids) 70 .filter(|(block_opt, _)| block_opt.is_none()) 71 .map(|(_, cid)| cid.to_string()) 72 .collect(); 73 if !missing_cids.is_empty() { 74 return ApiError::InvalidRequest(format!( 75 "Could not find blocks: {}", 76 missing_cids.join(", ") 77 )) 78 .into_response(); 79 } 80 81 let header = match crate::sync::car::encode_car_header_null_root() { 82 Ok(h) => h, 83 Err(e) => { 84 error!("Failed to encode CAR header: {}", e); 85 return ApiError::InternalError(None).into_response(); 86 } 87 }; 88 let mut car_bytes = header; 89 for (i, block_opt) in blocks.into_iter().enumerate() { 90 if let Some(block) = block_opt { 91 let cid = cids[i]; 92 let cid_bytes = cid.to_bytes(); 93 let total_len = cid_bytes.len() + block.len(); 94 let mut writer = Vec::new(); 95 crate::sync::car::write_varint(&mut writer, total_len as u64) 96 .expect("Writing to Vec<u8> should never fail"); 97 writer 98 .write_all(&cid_bytes) 99 .expect("Writing to Vec<u8> should never fail"); 100 writer 101 .write_all(&block) 102 .expect("Writing to Vec<u8> should never fail"); 103 car_bytes.extend_from_slice(&writer); 104 } 105 } 106 ( 107 StatusCode::OK, 108 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 109 car_bytes, 110 ) 111 .into_response() 112} 113 114#[derive(Deserialize)] 115pub struct GetRepoQuery { 116 pub did: String, 117 pub since: Option<String>, 118} 119 120pub async fn get_repo( 121 State(state): State<AppState>, 122 Query(query): Query<GetRepoQuery>, 123) -> Response { 124 let account = match assert_repo_availability(&state.db, &query.did, false).await { 125 Ok(a) => a, 126 Err(e) => return e.into_response(), 127 }; 128 129 let Some(head_str) = account.repo_root_cid else { 130 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response(); 131 }; 132 133 let Ok(head_cid) = Cid::from_str(&head_str) else { 134 return ApiError::InternalError(None).into_response(); 135 }; 136 137 if let Some(since) = &query.since { 138 return get_repo_since(&state, &query.did, &head_cid, since).await; 139 } 140 141 let mut car_bytes = match encode_car_header(&head_cid) { 142 Ok(h) => h, 143 Err(e) => { 144 error!("Failed to encode CAR header: {}", e); 145 return ApiError::InternalError(None).into_response(); 146 } 147 }; 148 let mut stack = vec![head_cid]; 149 let mut visited = std::collections::HashSet::new(); 150 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL; 151 while let Some(cid) = stack.pop() { 152 if visited.contains(&cid) { 153 continue; 154 } 155 visited.insert(cid); 156 if remaining == 0 { 157 break; 158 } 159 remaining -= 1; 160 if let Ok(Some(block)) = state.block_store.get(&cid).await { 161 let cid_bytes = cid.to_bytes(); 162 let total_len = cid_bytes.len() + block.len(); 163 let mut writer = Vec::new(); 164 crate::sync::car::write_varint(&mut writer, total_len as u64) 165 .expect("Writing to Vec<u8> should never fail"); 166 writer 167 .write_all(&cid_bytes) 168 .expect("Writing to Vec<u8> should never fail"); 169 writer 170 .write_all(&block) 171 .expect("Writing to Vec<u8> should never fail"); 172 car_bytes.extend_from_slice(&writer); 173 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 174 extract_links_ipld(&value, &mut stack); 175 } 176 } 177 } 178 ( 179 StatusCode::OK, 180 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 181 car_bytes, 182 ) 183 .into_response() 184} 185 186async fn get_repo_since(state: &AppState, did: &str, head_cid: &Cid, since: &str) -> Response { 187 let events = sqlx::query!( 188 r#" 189 SELECT blocks_cids, commit_cid 190 FROM repo_seq 191 WHERE did = $1 AND rev > $2 192 ORDER BY seq DESC 193 "#, 194 did, 195 since 196 ) 197 .fetch_all(&state.db) 198 .await; 199 200 let events = match events { 201 Ok(e) => e, 202 Err(e) => { 203 error!("DB error in get_repo_since: {:?}", e); 204 return ApiError::InternalError(Some("Database error".into())).into_response(); 205 } 206 }; 207 208 let mut block_cids: Vec<Cid> = Vec::new(); 209 for event in &events { 210 if let Some(cids) = &event.blocks_cids { 211 for cid_str in cids { 212 if let Ok(cid) = Cid::from_str(cid_str) 213 && !block_cids.contains(&cid) 214 { 215 block_cids.push(cid); 216 } 217 } 218 } 219 if let Some(commit_cid_str) = &event.commit_cid 220 && let Ok(cid) = Cid::from_str(commit_cid_str) 221 && !block_cids.contains(&cid) 222 { 223 block_cids.push(cid); 224 } 225 } 226 227 let mut car_bytes = match encode_car_header(head_cid) { 228 Ok(h) => h, 229 Err(e) => { 230 return ApiError::InternalError(Some(format!("Failed to encode CAR header: {}", e))) 231 .into_response(); 232 } 233 }; 234 235 if block_cids.is_empty() { 236 return ( 237 StatusCode::OK, 238 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 239 car_bytes, 240 ) 241 .into_response(); 242 } 243 244 let blocks = match state.block_store.get_many(&block_cids).await { 245 Ok(b) => b, 246 Err(e) => { 247 error!("Block store error in get_repo_since: {:?}", e); 248 return ApiError::InternalError(Some("Failed to get blocks".into())).into_response(); 249 } 250 }; 251 252 for (i, block_opt) in blocks.into_iter().enumerate() { 253 if let Some(block) = block_opt { 254 let cid = block_cids[i]; 255 let cid_bytes = cid.to_bytes(); 256 let total_len = cid_bytes.len() + block.len(); 257 let mut writer = Vec::new(); 258 crate::sync::car::write_varint(&mut writer, total_len as u64) 259 .expect("Writing to Vec<u8> should never fail"); 260 writer 261 .write_all(&cid_bytes) 262 .expect("Writing to Vec<u8> should never fail"); 263 writer 264 .write_all(&block) 265 .expect("Writing to Vec<u8> should never fail"); 266 car_bytes.extend_from_slice(&writer); 267 } 268 } 269 270 ( 271 StatusCode::OK, 272 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 273 car_bytes, 274 ) 275 .into_response() 276} 277 278fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) { 279 match value { 280 Ipld::Link(cid) => { 281 stack.push(*cid); 282 } 283 Ipld::Map(map) => { 284 for v in map.values() { 285 extract_links_ipld(v, stack); 286 } 287 } 288 Ipld::List(arr) => { 289 for v in arr { 290 extract_links_ipld(v, stack); 291 } 292 } 293 _ => {} 294 } 295} 296 297#[derive(Deserialize)] 298pub struct GetRecordQuery { 299 pub did: String, 300 pub collection: String, 301 pub rkey: String, 302} 303 304pub async fn get_record( 305 State(state): State<AppState>, 306 Query(query): Query<GetRecordQuery>, 307) -> Response { 308 use jacquard_repo::commit::Commit; 309 use jacquard_repo::mst::Mst; 310 use std::collections::BTreeMap; 311 use std::sync::Arc; 312 313 let account = match assert_repo_availability(&state.db, &query.did, false).await { 314 Ok(a) => a, 315 Err(e) => return e.into_response(), 316 }; 317 318 let commit_cid_str = match account.repo_root_cid { 319 Some(cid) => cid, 320 None => { 321 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response(); 322 } 323 }; 324 let Ok(commit_cid) = Cid::from_str(&commit_cid_str) else { 325 return ApiError::InternalError(Some("Invalid commit CID".into())).into_response(); 326 }; 327 let commit_bytes = match state.block_store.get(&commit_cid).await { 328 Ok(Some(b)) => b, 329 _ => { 330 return ApiError::InternalError(Some("Commit block not found".into())).into_response(); 331 } 332 }; 333 let Ok(commit) = Commit::from_cbor(&commit_bytes) else { 334 return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(); 335 }; 336 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None); 337 let key = format!("{}/{}", query.collection, query.rkey); 338 let record_cid = match mst.get(&key).await { 339 Ok(Some(cid)) => cid, 340 Ok(None) => { 341 return ApiError::RecordNotFound.into_response(); 342 } 343 Err(_) => { 344 return ApiError::InternalError(Some("Failed to lookup record".into())).into_response(); 345 } 346 }; 347 let record_block = match state.block_store.get(&record_cid).await { 348 Ok(Some(b)) => b, 349 _ => { 350 return ApiError::RecordNotFound.into_response(); 351 } 352 }; 353 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new(); 354 if mst.blocks_for_path(&key, &mut proof_blocks).await.is_err() { 355 return ApiError::InternalError(Some("Failed to build proof path".into())).into_response(); 356 } 357 let header = match encode_car_header(&commit_cid) { 358 Ok(h) => h, 359 Err(e) => { 360 error!("Failed to encode CAR header: {}", e); 361 return ApiError::InternalError(None).into_response(); 362 } 363 }; 364 let mut car_bytes = header; 365 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| { 366 let cid_bytes = cid.to_bytes(); 367 let total_len = cid_bytes.len() + data.len(); 368 let mut writer = Vec::new(); 369 crate::sync::car::write_varint(&mut writer, total_len as u64) 370 .expect("Writing to Vec<u8> should never fail"); 371 writer 372 .write_all(&cid_bytes) 373 .expect("Writing to Vec<u8> should never fail"); 374 writer 375 .write_all(data) 376 .expect("Writing to Vec<u8> should never fail"); 377 car.extend_from_slice(&writer); 378 }; 379 write_block(&mut car_bytes, &commit_cid, &commit_bytes); 380 for (cid, data) in &proof_blocks { 381 write_block(&mut car_bytes, cid, data); 382 } 383 write_block(&mut car_bytes, &record_cid, &record_block); 384 ( 385 StatusCode::OK, 386 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 387 car_bytes, 388 ) 389 .into_response() 390}