this repo has no description
1use crate::api::error::ApiError; 2use crate::state::AppState; 3use crate::sync::car::encode_car_header; 4use crate::sync::util::assert_repo_availability; 5use axum::{ 6 extract::{Query, RawQuery, State}, 7 http::StatusCode, 8 response::{IntoResponse, Response}, 9}; 10use cid::Cid; 11use ipld_core::ipld::Ipld; 12use jacquard_repo::storage::BlockStore; 13use serde::Deserialize; 14use std::io::Write; 15use std::str::FromStr; 16use tracing::error; 17 18const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000; 19 20fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> { 21 let did = crate::util::parse_repeated_query_param(Some(query_string), "did") 22 .into_iter() 23 .next() 24 .ok_or("Missing required parameter: did")?; 25 let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids"); 26 Ok((did, cids)) 27} 28 29pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response { 30 let Some(query_string) = query else { 31 return ApiError::InvalidRequest("Missing query parameters".into()).into_response(); 32 }; 33 34 let (did, cid_strings) = match parse_get_blocks_query(&query_string) { 35 Ok(parsed) => parsed, 36 Err(msg) => return ApiError::InvalidRequest(msg).into_response(), 37 }; 38 39 let _account = match assert_repo_availability(&state.db, &did, false).await { 40 Ok(a) => a, 41 Err(e) => return e.into_response(), 42 }; 43 44 let cids: Vec<Cid> = match cid_strings 45 .iter() 46 .map(|s| Cid::from_str(s).map_err(|_| s.clone())) 47 .collect::<Result<Vec<_>, _>>() 48 { 49 Ok(cids) => cids, 50 Err(invalid) => { 51 return ApiError::InvalidRequest(format!("Invalid CID: {}", invalid)).into_response() 52 } 53 }; 54 55 if cids.is_empty() { 56 return ApiError::InvalidRequest("No CIDs provided".into()).into_response(); 57 } 58 59 let blocks = match state.block_store.get_many(&cids).await { 60 Ok(blocks) => blocks, 61 Err(e) => { 62 error!("Failed to get blocks: {}", e); 63 return ApiError::InternalError(None).into_response(); 64 } 65 }; 66 67 let missing_cids: Vec<String> = blocks 68 .iter() 69 .zip(&cids) 70 .filter_map(|(block_opt, cid)| block_opt.is_none().then(|| cid.to_string())) 71 .collect(); 72 if !missing_cids.is_empty() { 73 return ApiError::InvalidRequest(format!( 74 "Could not find blocks: {}", 75 missing_cids.join(", ") 76 )) 77 .into_response(); 78 } 79 80 let header = match crate::sync::car::encode_car_header_null_root() { 81 Ok(h) => h, 82 Err(e) => { 83 error!("Failed to encode CAR header: {}", e); 84 return ApiError::InternalError(None).into_response(); 85 } 86 }; 87 let mut car_bytes = header; 88 for (i, block_opt) in blocks.into_iter().enumerate() { 89 if let Some(block) = block_opt { 90 let cid = cids[i]; 91 let cid_bytes = cid.to_bytes(); 92 let total_len = cid_bytes.len() + block.len(); 93 let mut writer = Vec::new(); 94 crate::sync::car::write_varint(&mut writer, total_len as u64) 95 .expect("Writing to Vec<u8> should never fail"); 96 writer 97 .write_all(&cid_bytes) 98 .expect("Writing to Vec<u8> should never fail"); 99 writer 100 .write_all(&block) 101 .expect("Writing to Vec<u8> should never fail"); 102 car_bytes.extend_from_slice(&writer); 103 } 104 } 105 ( 106 StatusCode::OK, 107 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 108 car_bytes, 109 ) 110 .into_response() 111} 112 113#[derive(Deserialize)] 114pub struct GetRepoQuery { 115 pub did: String, 116 pub since: Option<String>, 117} 118 119pub async fn get_repo( 120 State(state): State<AppState>, 121 Query(query): Query<GetRepoQuery>, 122) -> Response { 123 let account = match assert_repo_availability(&state.db, &query.did, false).await { 124 Ok(a) => a, 125 Err(e) => return e.into_response(), 126 }; 127 128 let Some(head_str) = account.repo_root_cid else { 129 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response(); 130 }; 131 132 let Ok(head_cid) = Cid::from_str(&head_str) else { 133 return ApiError::InternalError(None).into_response(); 134 }; 135 136 if let Some(since) = &query.since { 137 return get_repo_since(&state, &query.did, &head_cid, since).await; 138 } 139 140 let mut car_bytes = match encode_car_header(&head_cid) { 141 Ok(h) => h, 142 Err(e) => { 143 error!("Failed to encode CAR header: {}", e); 144 return ApiError::InternalError(None).into_response(); 145 } 146 }; 147 let mut stack = vec![head_cid]; 148 let mut visited = std::collections::HashSet::new(); 149 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL; 150 while let Some(cid) = stack.pop() { 151 if visited.contains(&cid) { 152 continue; 153 } 154 visited.insert(cid); 155 if remaining == 0 { 156 break; 157 } 158 remaining -= 1; 159 if let Ok(Some(block)) = state.block_store.get(&cid).await { 160 let cid_bytes = cid.to_bytes(); 161 let total_len = cid_bytes.len() + block.len(); 162 let mut writer = Vec::new(); 163 crate::sync::car::write_varint(&mut writer, total_len as u64) 164 .expect("Writing to Vec<u8> should never fail"); 165 writer 166 .write_all(&cid_bytes) 167 .expect("Writing to Vec<u8> should never fail"); 168 writer 169 .write_all(&block) 170 .expect("Writing to Vec<u8> should never fail"); 171 car_bytes.extend_from_slice(&writer); 172 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 173 extract_links_ipld(&value, &mut stack); 174 } 175 } 176 } 177 ( 178 StatusCode::OK, 179 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 180 car_bytes, 181 ) 182 .into_response() 183} 184 185async fn get_repo_since(state: &AppState, did: &str, head_cid: &Cid, since: &str) -> Response { 186 let events = sqlx::query!( 187 r#" 188 SELECT blocks_cids, commit_cid 189 FROM repo_seq 190 WHERE did = $1 AND rev > $2 191 ORDER BY seq DESC 192 "#, 193 did, 194 since 195 ) 196 .fetch_all(&state.db) 197 .await; 198 199 let events = match events { 200 Ok(e) => e, 201 Err(e) => { 202 error!("DB error in get_repo_since: {:?}", e); 203 return ApiError::InternalError(Some("Database error".into())).into_response(); 204 } 205 }; 206 207 let mut block_cids: Vec<Cid> = Vec::new(); 208 for event in &events { 209 if let Some(cids) = &event.blocks_cids { 210 for cid_str in cids { 211 if let Ok(cid) = Cid::from_str(cid_str) 212 && !block_cids.contains(&cid) 213 { 214 block_cids.push(cid); 215 } 216 } 217 } 218 if let Some(commit_cid_str) = &event.commit_cid 219 && let Ok(cid) = Cid::from_str(commit_cid_str) 220 && !block_cids.contains(&cid) 221 { 222 block_cids.push(cid); 223 } 224 } 225 226 let mut car_bytes = match encode_car_header(head_cid) { 227 Ok(h) => h, 228 Err(e) => { 229 return ApiError::InternalError(Some(format!("Failed to encode CAR header: {}", e))) 230 .into_response(); 231 } 232 }; 233 234 if block_cids.is_empty() { 235 return ( 236 StatusCode::OK, 237 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 238 car_bytes, 239 ) 240 .into_response(); 241 } 242 243 let blocks = match state.block_store.get_many(&block_cids).await { 244 Ok(b) => b, 245 Err(e) => { 246 error!("Block store error in get_repo_since: {:?}", e); 247 return ApiError::InternalError(Some("Failed to get blocks".into())).into_response(); 248 } 249 }; 250 251 for (i, block_opt) in blocks.into_iter().enumerate() { 252 if let Some(block) = block_opt { 253 let cid = block_cids[i]; 254 let cid_bytes = cid.to_bytes(); 255 let total_len = cid_bytes.len() + block.len(); 256 let mut writer = Vec::new(); 257 crate::sync::car::write_varint(&mut writer, total_len as u64) 258 .expect("Writing to Vec<u8> should never fail"); 259 writer 260 .write_all(&cid_bytes) 261 .expect("Writing to Vec<u8> should never fail"); 262 writer 263 .write_all(&block) 264 .expect("Writing to Vec<u8> should never fail"); 265 car_bytes.extend_from_slice(&writer); 266 } 267 } 268 269 ( 270 StatusCode::OK, 271 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 272 car_bytes, 273 ) 274 .into_response() 275} 276 277fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) { 278 match value { 279 Ipld::Link(cid) => { 280 stack.push(*cid); 281 } 282 Ipld::Map(map) => { 283 for v in map.values() { 284 extract_links_ipld(v, stack); 285 } 286 } 287 Ipld::List(arr) => { 288 for v in arr { 289 extract_links_ipld(v, stack); 290 } 291 } 292 _ => {} 293 } 294} 295 296#[derive(Deserialize)] 297pub struct GetRecordQuery { 298 pub did: String, 299 pub collection: String, 300 pub rkey: String, 301} 302 303pub async fn get_record( 304 State(state): State<AppState>, 305 Query(query): Query<GetRecordQuery>, 306) -> Response { 307 use jacquard_repo::commit::Commit; 308 use jacquard_repo::mst::Mst; 309 use std::collections::BTreeMap; 310 use std::sync::Arc; 311 312 let account = match assert_repo_availability(&state.db, &query.did, false).await { 313 Ok(a) => a, 314 Err(e) => return e.into_response(), 315 }; 316 317 let commit_cid_str = match account.repo_root_cid { 318 Some(cid) => cid, 319 None => { 320 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response(); 321 } 322 }; 323 let Ok(commit_cid) = Cid::from_str(&commit_cid_str) else { 324 return ApiError::InternalError(Some("Invalid commit CID".into())).into_response(); 325 }; 326 let commit_bytes = match state.block_store.get(&commit_cid).await { 327 Ok(Some(b)) => b, 328 _ => { 329 return ApiError::InternalError(Some("Commit block not found".into())).into_response(); 330 } 331 }; 332 let Ok(commit) = Commit::from_cbor(&commit_bytes) else { 333 return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(); 334 }; 335 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None); 336 let key = format!("{}/{}", query.collection, query.rkey); 337 let record_cid = match mst.get(&key).await { 338 Ok(Some(cid)) => cid, 339 Ok(None) => { 340 return ApiError::RecordNotFound.into_response(); 341 } 342 Err(_) => { 343 return ApiError::InternalError(Some("Failed to lookup record".into())).into_response(); 344 } 345 }; 346 let record_block = match state.block_store.get(&record_cid).await { 347 Ok(Some(b)) => b, 348 _ => { 349 return ApiError::RecordNotFound.into_response(); 350 } 351 }; 352 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new(); 353 if mst.blocks_for_path(&key, &mut proof_blocks).await.is_err() { 354 return ApiError::InternalError(Some("Failed to build proof path".into())).into_response(); 355 } 356 let header = match encode_car_header(&commit_cid) { 357 Ok(h) => h, 358 Err(e) => { 359 error!("Failed to encode CAR header: {}", e); 360 return ApiError::InternalError(None).into_response(); 361 } 362 }; 363 let mut car_bytes = header; 364 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| { 365 let cid_bytes = cid.to_bytes(); 366 let total_len = cid_bytes.len() + data.len(); 367 let mut writer = Vec::new(); 368 crate::sync::car::write_varint(&mut writer, total_len as u64) 369 .expect("Writing to Vec<u8> should never fail"); 370 writer 371 .write_all(&cid_bytes) 372 .expect("Writing to Vec<u8> should never fail"); 373 writer 374 .write_all(data) 375 .expect("Writing to Vec<u8> should never fail"); 376 car.extend_from_slice(&writer); 377 }; 378 write_block(&mut car_bytes, &commit_cid, &commit_bytes); 379 for (cid, data) in &proof_blocks { 380 write_block(&mut car_bytes, cid, data); 381 } 382 write_block(&mut car_bytes, &record_cid, &record_block); 383 ( 384 StatusCode::OK, 385 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 386 car_bytes, 387 ) 388 .into_response() 389}