this repo has no description
1use crate::api::error::ApiError; 2use crate::scheduled::generate_repo_car_from_user_blocks; 3use crate::state::AppState; 4use crate::sync::car::encode_car_header; 5use crate::sync::util::assert_repo_availability; 6use axum::{ 7 extract::{Query, RawQuery, State}, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use cid::Cid; 12use jacquard_repo::storage::BlockStore; 13use serde::Deserialize; 14use std::io::Write; 15use std::str::FromStr; 16use tracing::error; 17 18fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> { 19 let did = crate::util::parse_repeated_query_param(Some(query_string), "did") 20 .into_iter() 21 .next() 22 .ok_or("Missing required parameter: did")?; 23 let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids"); 24 Ok((did, cids)) 25} 26 27pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response { 28 let Some(query_string) = query else { 29 return ApiError::InvalidRequest("Missing query parameters".into()).into_response(); 30 }; 31 32 let (did, cid_strings) = match parse_get_blocks_query(&query_string) { 33 Ok(parsed) => parsed, 34 Err(msg) => return ApiError::InvalidRequest(msg).into_response(), 35 }; 36 37 let _account = match assert_repo_availability(&state.db, &did, false).await { 38 Ok(a) => a, 39 Err(e) => return e.into_response(), 40 }; 41 42 let cids: Vec<Cid> = match cid_strings 43 .iter() 44 .map(|s| Cid::from_str(s).map_err(|_| s.clone())) 45 .collect::<Result<Vec<_>, _>>() 46 { 47 Ok(cids) => cids, 48 Err(invalid) => { 49 return ApiError::InvalidRequest(format!("Invalid CID: {}", invalid)).into_response(); 50 } 51 }; 52 53 if cids.is_empty() { 54 return ApiError::InvalidRequest("No CIDs provided".into()).into_response(); 55 } 56 57 let blocks = match state.block_store.get_many(&cids).await { 58 Ok(blocks) => blocks, 59 Err(e) => { 60 error!("Failed to get blocks: {}", e); 61 return ApiError::InternalError(None).into_response(); 62 } 63 }; 64 65 let missing_cids: Vec<String> = blocks 66 .iter() 67 .zip(&cids) 68 .filter(|(block_opt, _)| block_opt.is_none()) 69 .map(|(_, cid)| cid.to_string()) 70 .collect(); 71 if !missing_cids.is_empty() { 72 return ApiError::InvalidRequest(format!( 73 "Could not find blocks: {}", 74 missing_cids.join(", ") 75 )) 76 .into_response(); 77 } 78 79 let header = match crate::sync::car::encode_car_header_null_root() { 80 Ok(h) => h, 81 Err(e) => { 82 error!("Failed to encode CAR header: {}", e); 83 return ApiError::InternalError(None).into_response(); 84 } 85 }; 86 let mut car_bytes = header; 87 for (i, block_opt) in blocks.into_iter().enumerate() { 88 if let Some(block) = block_opt { 89 let cid = cids[i]; 90 let cid_bytes = cid.to_bytes(); 91 let total_len = cid_bytes.len() + block.len(); 92 let mut writer = Vec::new(); 93 crate::sync::car::write_varint(&mut writer, total_len as u64) 94 .expect("Writing to Vec<u8> should never fail"); 95 writer 96 .write_all(&cid_bytes) 97 .expect("Writing to Vec<u8> should never fail"); 98 writer 99 .write_all(&block) 100 .expect("Writing to Vec<u8> should never fail"); 101 car_bytes.extend_from_slice(&writer); 102 } 103 } 104 ( 105 StatusCode::OK, 106 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 107 car_bytes, 108 ) 109 .into_response() 110} 111 112#[derive(Deserialize)] 113pub struct GetRepoQuery { 114 pub did: String, 115 pub since: Option<String>, 116} 117 118pub async fn get_repo( 119 State(state): State<AppState>, 120 Query(query): Query<GetRepoQuery>, 121) -> Response { 122 let account = match assert_repo_availability(&state.db, &query.did, false).await { 123 Ok(a) => a, 124 Err(e) => return e.into_response(), 125 }; 126 127 let Some(head_str) = account.repo_root_cid else { 128 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response(); 129 }; 130 131 let Ok(head_cid) = Cid::from_str(&head_str) else { 132 return ApiError::InternalError(None).into_response(); 133 }; 134 135 if let Some(since) = &query.since { 136 return get_repo_since(&state, &query.did, &head_cid, since).await; 137 } 138 139 let car_bytes = match generate_repo_car_from_user_blocks( 140 &state.db, 141 &state.block_store, 142 account.user_id, 143 &head_cid, 144 ) 145 .await 146 { 147 Ok(bytes) => bytes, 148 Err(e) => { 149 error!("Failed to generate repo CAR: {}", e); 150 return ApiError::InternalError(None).into_response(); 151 } 152 }; 153 154 ( 155 StatusCode::OK, 156 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 157 car_bytes, 158 ) 159 .into_response() 160} 161 162async fn get_repo_since(state: &AppState, did: &str, head_cid: &Cid, since: &str) -> Response { 163 let events = sqlx::query!( 164 r#" 165 SELECT blocks_cids, commit_cid 166 FROM repo_seq 167 WHERE did = $1 AND rev > $2 168 ORDER BY seq DESC 169 "#, 170 did, 171 since 172 ) 173 .fetch_all(&state.db) 174 .await; 175 176 let events = match events { 177 Ok(e) => e, 178 Err(e) => { 179 error!("DB error in get_repo_since: {:?}", e); 180 return ApiError::InternalError(Some("Database error".into())).into_response(); 181 } 182 }; 183 184 let mut block_cids: Vec<Cid> = Vec::new(); 185 for event in &events { 186 if let Some(cids) = &event.blocks_cids { 187 for cid_str in cids { 188 if let Ok(cid) = Cid::from_str(cid_str) 189 && !block_cids.contains(&cid) 190 { 191 block_cids.push(cid); 192 } 193 } 194 } 195 if let Some(commit_cid_str) = &event.commit_cid 196 && let Ok(cid) = Cid::from_str(commit_cid_str) 197 && !block_cids.contains(&cid) 198 { 199 block_cids.push(cid); 200 } 201 } 202 203 let mut car_bytes = match encode_car_header(head_cid) { 204 Ok(h) => h, 205 Err(e) => { 206 return ApiError::InternalError(Some(format!("Failed to encode CAR header: {}", e))) 207 .into_response(); 208 } 209 }; 210 211 if block_cids.is_empty() { 212 return ( 213 StatusCode::OK, 214 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 215 car_bytes, 216 ) 217 .into_response(); 218 } 219 220 let blocks = match state.block_store.get_many(&block_cids).await { 221 Ok(b) => b, 222 Err(e) => { 223 error!("Block store error in get_repo_since: {:?}", e); 224 return ApiError::InternalError(Some("Failed to get blocks".into())).into_response(); 225 } 226 }; 227 228 for (i, block_opt) in blocks.into_iter().enumerate() { 229 if let Some(block) = block_opt { 230 let cid = block_cids[i]; 231 let cid_bytes = cid.to_bytes(); 232 let total_len = cid_bytes.len() + block.len(); 233 let mut writer = Vec::new(); 234 crate::sync::car::write_varint(&mut writer, total_len as u64) 235 .expect("Writing to Vec<u8> should never fail"); 236 writer 237 .write_all(&cid_bytes) 238 .expect("Writing to Vec<u8> should never fail"); 239 writer 240 .write_all(&block) 241 .expect("Writing to Vec<u8> should never fail"); 242 car_bytes.extend_from_slice(&writer); 243 } 244 } 245 246 ( 247 StatusCode::OK, 248 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 249 car_bytes, 250 ) 251 .into_response() 252} 253 254#[derive(Deserialize)] 255pub struct GetRecordQuery { 256 pub did: String, 257 pub collection: String, 258 pub rkey: String, 259} 260 261pub async fn get_record( 262 State(state): State<AppState>, 263 Query(query): Query<GetRecordQuery>, 264) -> Response { 265 use jacquard_repo::commit::Commit; 266 use jacquard_repo::mst::Mst; 267 use std::collections::BTreeMap; 268 use std::sync::Arc; 269 270 let account = match assert_repo_availability(&state.db, &query.did, false).await { 271 Ok(a) => a, 272 Err(e) => return e.into_response(), 273 }; 274 275 let commit_cid_str = match account.repo_root_cid { 276 Some(cid) => cid, 277 None => { 278 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response(); 279 } 280 }; 281 let Ok(commit_cid) = Cid::from_str(&commit_cid_str) else { 282 return ApiError::InternalError(Some("Invalid commit CID".into())).into_response(); 283 }; 284 let commit_bytes = match state.block_store.get(&commit_cid).await { 285 Ok(Some(b)) => b, 286 _ => { 287 return ApiError::InternalError(Some("Commit block not found".into())).into_response(); 288 } 289 }; 290 let Ok(commit) = Commit::from_cbor(&commit_bytes) else { 291 return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(); 292 }; 293 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None); 294 let key = format!("{}/{}", query.collection, query.rkey); 295 let record_cid = match mst.get(&key).await { 296 Ok(Some(cid)) => cid, 297 Ok(None) => { 298 return ApiError::RecordNotFound.into_response(); 299 } 300 Err(_) => { 301 return ApiError::InternalError(Some("Failed to lookup record".into())).into_response(); 302 } 303 }; 304 let record_block = match state.block_store.get(&record_cid).await { 305 Ok(Some(b)) => b, 306 _ => { 307 return ApiError::RecordNotFound.into_response(); 308 } 309 }; 310 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new(); 311 if mst.blocks_for_path(&key, &mut proof_blocks).await.is_err() { 312 return ApiError::InternalError(Some("Failed to build proof path".into())).into_response(); 313 } 314 let header = match encode_car_header(&commit_cid) { 315 Ok(h) => h, 316 Err(e) => { 317 error!("Failed to encode CAR header: {}", e); 318 return ApiError::InternalError(None).into_response(); 319 } 320 }; 321 let mut car_bytes = header; 322 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| { 323 let cid_bytes = cid.to_bytes(); 324 let total_len = cid_bytes.len() + data.len(); 325 let mut writer = Vec::new(); 326 crate::sync::car::write_varint(&mut writer, total_len as u64) 327 .expect("Writing to Vec<u8> should never fail"); 328 writer 329 .write_all(&cid_bytes) 330 .expect("Writing to Vec<u8> should never fail"); 331 writer 332 .write_all(data) 333 .expect("Writing to Vec<u8> should never fail"); 334 car.extend_from_slice(&writer); 335 }; 336 write_block(&mut car_bytes, &commit_cid, &commit_bytes); 337 for (cid, data) in &proof_blocks { 338 write_block(&mut car_bytes, cid, data); 339 } 340 write_block(&mut car_bytes, &record_cid, &record_block); 341 ( 342 StatusCode::OK, 343 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 344 car_bytes, 345 ) 346 .into_response() 347}