this repo has no description
at main 11 kB view raw
1use crate::api::error::ApiError; 2use crate::scheduled::generate_repo_car_from_user_blocks; 3use crate::state::AppState; 4use crate::sync::car::encode_car_header; 5use crate::sync::util::assert_repo_availability; 6use axum::{ 7 extract::{Query, RawQuery, State}, 8 http::StatusCode, 9 response::{IntoResponse, Response}, 10}; 11use cid::Cid; 12use jacquard_repo::storage::BlockStore; 13use serde::Deserialize; 14use std::io::Write; 15use std::str::FromStr; 16use tracing::error; 17 18fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> { 19 let did = crate::util::parse_repeated_query_param(Some(query_string), "did") 20 .into_iter() 21 .next() 22 .ok_or("Missing required parameter: did")?; 23 let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids"); 24 Ok((did, cids)) 25} 26 27pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response { 28 let Some(query_string) = query else { 29 return ApiError::InvalidRequest("Missing query parameters".into()).into_response(); 30 }; 31 32 let (did, cid_strings) = match parse_get_blocks_query(&query_string) { 33 Ok(parsed) => parsed, 34 Err(msg) => return ApiError::InvalidRequest(msg).into_response(), 35 }; 36 37 let _account = match assert_repo_availability(&state.db, &did, false).await { 38 Ok(a) => a, 39 Err(e) => return e.into_response(), 40 }; 41 42 let cids: Vec<Cid> = match cid_strings 43 .iter() 44 .map(|s| Cid::from_str(s).map_err(|_| s.clone())) 45 .collect::<Result<Vec<_>, _>>() 46 { 47 Ok(cids) => cids, 48 Err(invalid) => { 49 return ApiError::InvalidRequest(format!("Invalid CID: {}", invalid)).into_response(); 50 } 51 }; 52 53 if cids.is_empty() { 54 return ApiError::InvalidRequest("No CIDs provided".into()).into_response(); 55 } 56 57 let blocks = match state.block_store.get_many(&cids).await { 58 Ok(blocks) => blocks, 59 Err(e) => { 60 error!("Failed to get blocks: {}", e); 61 return ApiError::InternalError(None).into_response(); 62 } 63 }; 64 65 let missing_cids: Vec<String> = blocks 66 .iter() 67 .zip(&cids) 68 .filter(|(block_opt, _)| block_opt.is_none()) 69 .map(|(_, cid)| cid.to_string()) 70 .collect(); 71 if !missing_cids.is_empty() { 72 return ApiError::InvalidRequest(format!( 73 "Could not find blocks: {}", 74 missing_cids.join(", ") 75 )) 76 .into_response(); 77 } 78 79 let header = match crate::sync::car::encode_car_header_null_root() { 80 Ok(h) => h, 81 Err(e) => { 82 error!("Failed to encode CAR header: {}", e); 83 return ApiError::InternalError(None).into_response(); 84 } 85 }; 86 let mut car_bytes = header; 87 for (i, block_opt) in blocks.into_iter().enumerate() { 88 if let Some(block) = block_opt { 89 let cid = cids[i]; 90 let cid_bytes = cid.to_bytes(); 91 let total_len = cid_bytes.len() + block.len(); 92 let mut writer = Vec::new(); 93 crate::sync::car::write_varint(&mut writer, total_len as u64) 94 .expect("Writing to Vec<u8> should never fail"); 95 writer 96 .write_all(&cid_bytes) 97 .expect("Writing to Vec<u8> should never fail"); 98 writer 99 .write_all(&block) 100 .expect("Writing to Vec<u8> should never fail"); 101 car_bytes.extend_from_slice(&writer); 102 } 103 } 104 ( 105 StatusCode::OK, 106 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 107 car_bytes, 108 ) 109 .into_response() 110} 111 112#[derive(Deserialize)] 113pub struct GetRepoQuery { 114 pub did: String, 115 pub since: Option<String>, 116} 117 118pub async fn get_repo( 119 State(state): State<AppState>, 120 Query(query): Query<GetRepoQuery>, 121) -> Response { 122 let account = match assert_repo_availability(&state.db, &query.did, false).await { 123 Ok(a) => a, 124 Err(e) => return e.into_response(), 125 }; 126 127 let Some(head_str) = account.repo_root_cid else { 128 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response(); 129 }; 130 131 let Ok(head_cid) = Cid::from_str(&head_str) else { 132 return ApiError::InternalError(None).into_response(); 133 }; 134 135 if let Some(since) = &query.since { 136 return get_repo_since(&state, &query.did, &head_cid, since).await; 137 } 138 139 let car_bytes = match generate_repo_car_from_user_blocks( 140 &state.db, 141 &state.block_store, 142 account.user_id, 143 &head_cid, 144 ) 145 .await 146 { 147 Ok(bytes) => bytes, 148 Err(e) => { 149 error!("Failed to generate repo CAR: {}", e); 150 return ApiError::InternalError(None).into_response(); 151 } 152 }; 153 154 ( 155 StatusCode::OK, 156 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 157 car_bytes, 158 ) 159 .into_response() 160} 161 162async fn get_repo_since(state: &AppState, did: &str, head_cid: &Cid, since: &str) -> Response { 163 let events = sqlx::query!( 164 r#" 165 SELECT blocks_cids, commit_cid 166 FROM repo_seq 167 WHERE did = $1 AND rev > $2 168 ORDER BY seq DESC 169 "#, 170 did, 171 since 172 ) 173 .fetch_all(&state.db) 174 .await; 175 176 let events = match events { 177 Ok(e) => e, 178 Err(e) => { 179 error!("DB error in get_repo_since: {:?}", e); 180 return ApiError::InternalError(Some("Database error".into())).into_response(); 181 } 182 }; 183 184 let block_cids: Vec<Cid> = events 185 .iter() 186 .flat_map(|event| { 187 let block_cids = event 188 .blocks_cids 189 .as_ref() 190 .map(|cids| cids.iter().filter_map(|s| Cid::from_str(s).ok()).collect()) 191 .unwrap_or_else(Vec::new); 192 let commit_cid = event 193 .commit_cid 194 .as_ref() 195 .and_then(|s| Cid::from_str(s).ok()); 196 block_cids.into_iter().chain(commit_cid) 197 }) 198 .fold(Vec::new(), |mut acc, cid| { 199 if !acc.contains(&cid) { 200 acc.push(cid); 201 } 202 acc 203 }); 204 205 let mut car_bytes = match encode_car_header(head_cid) { 206 Ok(h) => h, 207 Err(e) => { 208 return ApiError::InternalError(Some(format!("Failed to encode CAR header: {}", e))) 209 .into_response(); 210 } 211 }; 212 213 if block_cids.is_empty() { 214 return ( 215 StatusCode::OK, 216 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 217 car_bytes, 218 ) 219 .into_response(); 220 } 221 222 let blocks = match state.block_store.get_many(&block_cids).await { 223 Ok(b) => b, 224 Err(e) => { 225 error!("Block store error in get_repo_since: {:?}", e); 226 return ApiError::InternalError(Some("Failed to get blocks".into())).into_response(); 227 } 228 }; 229 230 for (i, block_opt) in blocks.into_iter().enumerate() { 231 if let Some(block) = block_opt { 232 let cid = block_cids[i]; 233 let cid_bytes = cid.to_bytes(); 234 let total_len = cid_bytes.len() + block.len(); 235 let mut writer = Vec::new(); 236 crate::sync::car::write_varint(&mut writer, total_len as u64) 237 .expect("Writing to Vec<u8> should never fail"); 238 writer 239 .write_all(&cid_bytes) 240 .expect("Writing to Vec<u8> should never fail"); 241 writer 242 .write_all(&block) 243 .expect("Writing to Vec<u8> should never fail"); 244 car_bytes.extend_from_slice(&writer); 245 } 246 } 247 248 ( 249 StatusCode::OK, 250 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 251 car_bytes, 252 ) 253 .into_response() 254} 255 256#[derive(Deserialize)] 257pub struct GetRecordQuery { 258 pub did: String, 259 pub collection: String, 260 pub rkey: String, 261} 262 263pub async fn get_record( 264 State(state): State<AppState>, 265 Query(query): Query<GetRecordQuery>, 266) -> Response { 267 use jacquard_repo::commit::Commit; 268 use jacquard_repo::mst::Mst; 269 use std::collections::BTreeMap; 270 use std::sync::Arc; 271 272 let account = match assert_repo_availability(&state.db, &query.did, false).await { 273 Ok(a) => a, 274 Err(e) => return e.into_response(), 275 }; 276 277 let commit_cid_str = match account.repo_root_cid { 278 Some(cid) => cid, 279 None => { 280 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response(); 281 } 282 }; 283 let Ok(commit_cid) = Cid::from_str(&commit_cid_str) else { 284 return ApiError::InternalError(Some("Invalid commit CID".into())).into_response(); 285 }; 286 let commit_bytes = match state.block_store.get(&commit_cid).await { 287 Ok(Some(b)) => b, 288 _ => { 289 return ApiError::InternalError(Some("Commit block not found".into())).into_response(); 290 } 291 }; 292 let Ok(commit) = Commit::from_cbor(&commit_bytes) else { 293 return ApiError::InternalError(Some("Failed to parse commit".into())).into_response(); 294 }; 295 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None); 296 let key = format!("{}/{}", query.collection, query.rkey); 297 let record_cid = match mst.get(&key).await { 298 Ok(Some(cid)) => cid, 299 Ok(None) => { 300 return ApiError::RecordNotFound.into_response(); 301 } 302 Err(_) => { 303 return ApiError::InternalError(Some("Failed to lookup record".into())).into_response(); 304 } 305 }; 306 let record_block = match state.block_store.get(&record_cid).await { 307 Ok(Some(b)) => b, 308 _ => { 309 return ApiError::RecordNotFound.into_response(); 310 } 311 }; 312 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new(); 313 if mst.blocks_for_path(&key, &mut proof_blocks).await.is_err() { 314 return ApiError::InternalError(Some("Failed to build proof path".into())).into_response(); 315 } 316 let header = match encode_car_header(&commit_cid) { 317 Ok(h) => h, 318 Err(e) => { 319 error!("Failed to encode CAR header: {}", e); 320 return ApiError::InternalError(None).into_response(); 321 } 322 }; 323 let mut car_bytes = header; 324 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| { 325 let cid_bytes = cid.to_bytes(); 326 let total_len = cid_bytes.len() + data.len(); 327 let mut writer = Vec::new(); 328 crate::sync::car::write_varint(&mut writer, total_len as u64) 329 .expect("Writing to Vec<u8> should never fail"); 330 writer 331 .write_all(&cid_bytes) 332 .expect("Writing to Vec<u8> should never fail"); 333 writer 334 .write_all(data) 335 .expect("Writing to Vec<u8> should never fail"); 336 car.extend_from_slice(&writer); 337 }; 338 write_block(&mut car_bytes, &commit_cid, &commit_bytes); 339 proof_blocks 340 .iter() 341 .for_each(|(cid, data)| write_block(&mut car_bytes, cid, data)); 342 write_block(&mut car_bytes, &record_cid, &record_block); 343 ( 344 StatusCode::OK, 345 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 346 car_bytes, 347 ) 348 .into_response() 349}