this repo has no description
1use crate::state::AppState; 2use crate::sync::car::encode_car_header; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use ipld_core::ipld::Ipld; 11use jacquard_repo::storage::BlockStore; 12use serde::Deserialize; 13use serde_json::json; 14use std::io::Write; 15use std::str::FromStr; 16use tracing::error; 17 18const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000; 19 20#[derive(Deserialize)] 21pub struct GetBlocksQuery { 22 pub did: String, 23 pub cids: String, 24} 25 26pub async fn get_blocks( 27 State(state): State<AppState>, 28 Query(query): Query<GetBlocksQuery>, 29) -> Response { 30 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 31 .fetch_optional(&state.db) 32 .await 33 .unwrap_or(None); 34 if user_exists.is_none() { 35 return (StatusCode::NOT_FOUND, "Repo not found").into_response(); 36 } 37 let cids_str: Vec<&str> = query.cids.split(',').collect(); 38 let mut cids = Vec::new(); 39 for s in cids_str { 40 match Cid::from_str(s) { 41 Ok(cid) => cids.push(cid), 42 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(), 43 } 44 } 45 let blocks_res = state.block_store.get_many(&cids).await; 46 let blocks = match blocks_res { 47 Ok(blocks) => blocks, 48 Err(e) => { 49 error!("Failed to get blocks: {}", e); 50 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response(); 51 } 52 }; 53 if cids.is_empty() { 54 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response(); 55 } 56 let root_cid = cids[0]; 57 let header = match encode_car_header(&root_cid) { 58 Ok(h) => h, 59 Err(e) => { 60 error!("Failed to encode CAR header: {}", e); 61 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to encode CAR").into_response(); 62 } 63 }; 64 let mut car_bytes = header; 65 for (i, block_opt) in blocks.into_iter().enumerate() { 66 if let Some(block) = block_opt { 67 let cid = cids[i]; 68 let cid_bytes = cid.to_bytes(); 69 let total_len = cid_bytes.len() + block.len(); 70 let mut writer = Vec::new(); 71 crate::sync::car::write_varint(&mut writer, total_len as u64) 72 .expect("Writing to Vec<u8> should never fail"); 73 writer 74 .write_all(&cid_bytes) 75 .expect("Writing to Vec<u8> should never fail"); 76 writer 77 .write_all(&block) 78 .expect("Writing to Vec<u8> should never fail"); 79 car_bytes.extend_from_slice(&writer); 80 } 81 } 82 ( 83 StatusCode::OK, 84 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 85 car_bytes, 86 ) 87 .into_response() 88} 89 90#[derive(Deserialize)] 91pub struct GetRepoQuery { 92 pub did: String, 93 pub since: Option<String>, 94} 95 96pub async fn get_repo( 97 State(state): State<AppState>, 98 Query(query): Query<GetRepoQuery>, 99) -> Response { 100 let repo_row = sqlx::query!( 101 r#" 102 SELECT r.repo_root_cid 103 FROM repos r 104 JOIN users u ON u.id = r.user_id 105 WHERE u.did = $1 106 "#, 107 query.did 108 ) 109 .fetch_optional(&state.db) 110 .await 111 .unwrap_or(None); 112 let head_str = match repo_row { 113 Some(r) => r.repo_root_cid, 114 None => { 115 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 116 .fetch_optional(&state.db) 117 .await 118 .unwrap_or(None); 119 if user_exists.is_none() { 120 return ( 121 StatusCode::NOT_FOUND, 122 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 123 ) 124 .into_response(); 125 } else { 126 return ( 127 StatusCode::NOT_FOUND, 128 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 129 ) 130 .into_response(); 131 } 132 } 133 }; 134 let head_cid = match Cid::from_str(&head_str) { 135 Ok(c) => c, 136 Err(_) => { 137 return ( 138 StatusCode::INTERNAL_SERVER_ERROR, 139 Json(json!({"error": "InternalError", "message": "Invalid head CID"})), 140 ) 141 .into_response(); 142 } 143 }; 144 let mut car_bytes = match encode_car_header(&head_cid) { 145 Ok(h) => h, 146 Err(e) => { 147 return ( 148 StatusCode::INTERNAL_SERVER_ERROR, 149 Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})), 150 ) 151 .into_response(); 152 } 153 }; 154 let mut stack = vec![head_cid]; 155 let mut visited = std::collections::HashSet::new(); 156 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL; 157 while let Some(cid) = stack.pop() { 158 if visited.contains(&cid) { 159 continue; 160 } 161 visited.insert(cid); 162 if remaining == 0 { 163 break; 164 } 165 remaining -= 1; 166 if let Ok(Some(block)) = state.block_store.get(&cid).await { 167 let cid_bytes = cid.to_bytes(); 168 let total_len = cid_bytes.len() + block.len(); 169 let mut writer = Vec::new(); 170 crate::sync::car::write_varint(&mut writer, total_len as u64) 171 .expect("Writing to Vec<u8> should never fail"); 172 writer 173 .write_all(&cid_bytes) 174 .expect("Writing to Vec<u8> should never fail"); 175 writer 176 .write_all(&block) 177 .expect("Writing to Vec<u8> should never fail"); 178 car_bytes.extend_from_slice(&writer); 179 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 180 extract_links_ipld(&value, &mut stack); 181 } 182 } 183 } 184 ( 185 StatusCode::OK, 186 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 187 car_bytes, 188 ) 189 .into_response() 190} 191 192fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) { 193 match value { 194 Ipld::Link(cid) => { 195 stack.push(*cid); 196 } 197 Ipld::Map(map) => { 198 for v in map.values() { 199 extract_links_ipld(v, stack); 200 } 201 } 202 Ipld::List(arr) => { 203 for v in arr { 204 extract_links_ipld(v, stack); 205 } 206 } 207 _ => {} 208 } 209} 210 211#[derive(Deserialize)] 212pub struct GetRecordQuery { 213 pub did: String, 214 pub collection: String, 215 pub rkey: String, 216} 217 218pub async fn get_record( 219 State(state): State<AppState>, 220 Query(query): Query<GetRecordQuery>, 221) -> Response { 222 use jacquard_repo::commit::Commit; 223 use jacquard_repo::mst::Mst; 224 use std::collections::BTreeMap; 225 use std::sync::Arc; 226 227 let repo_row = sqlx::query!( 228 r#" 229 SELECT r.repo_root_cid 230 FROM repos r 231 JOIN users u ON u.id = r.user_id 232 WHERE u.did = $1 233 "#, 234 query.did 235 ) 236 .fetch_optional(&state.db) 237 .await 238 .unwrap_or(None); 239 let commit_cid_str = match repo_row { 240 Some(r) => r.repo_root_cid, 241 None => { 242 return ( 243 StatusCode::NOT_FOUND, 244 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 245 ) 246 .into_response(); 247 } 248 }; 249 let commit_cid = match Cid::from_str(&commit_cid_str) { 250 Ok(c) => c, 251 Err(_) => { 252 return ( 253 StatusCode::INTERNAL_SERVER_ERROR, 254 Json(json!({"error": "InternalError", "message": "Invalid commit CID"})), 255 ) 256 .into_response(); 257 } 258 }; 259 let commit_bytes = match state.block_store.get(&commit_cid).await { 260 Ok(Some(b)) => b, 261 _ => { 262 return ( 263 StatusCode::INTERNAL_SERVER_ERROR, 264 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 265 ) 266 .into_response(); 267 } 268 }; 269 let commit = match Commit::from_cbor(&commit_bytes) { 270 Ok(c) => c, 271 Err(_) => { 272 return ( 273 StatusCode::INTERNAL_SERVER_ERROR, 274 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 275 ) 276 .into_response(); 277 } 278 }; 279 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None); 280 let key = format!("{}/{}", query.collection, query.rkey); 281 let record_cid = match mst.get(&key).await { 282 Ok(Some(cid)) => cid, 283 Ok(None) => { 284 return ( 285 StatusCode::NOT_FOUND, 286 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 287 ) 288 .into_response(); 289 } 290 Err(_) => { 291 return ( 292 StatusCode::INTERNAL_SERVER_ERROR, 293 Json(json!({"error": "InternalError", "message": "Failed to lookup record"})), 294 ) 295 .into_response(); 296 } 297 }; 298 let record_block = match state.block_store.get(&record_cid).await { 299 Ok(Some(b)) => b, 300 _ => { 301 return ( 302 StatusCode::NOT_FOUND, 303 Json(json!({"error": "RecordNotFound", "message": "Record block not found"})), 304 ) 305 .into_response(); 306 } 307 }; 308 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new(); 309 if mst.blocks_for_path(&key, &mut proof_blocks).await.is_err() { 310 return ( 311 StatusCode::INTERNAL_SERVER_ERROR, 312 Json(json!({"error": "InternalError", "message": "Failed to build proof path"})), 313 ) 314 .into_response(); 315 } 316 let header = match encode_car_header(&commit_cid) { 317 Ok(h) => h, 318 Err(e) => { 319 error!("Failed to encode CAR header: {}", e); 320 return ( 321 StatusCode::INTERNAL_SERVER_ERROR, 322 Json(json!({"error": "InternalError"})), 323 ) 324 .into_response(); 325 } 326 }; 327 let mut car_bytes = header; 328 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| { 329 let cid_bytes = cid.to_bytes(); 330 let total_len = cid_bytes.len() + data.len(); 331 let mut writer = Vec::new(); 332 crate::sync::car::write_varint(&mut writer, total_len as u64) 333 .expect("Writing to Vec<u8> should never fail"); 334 writer 335 .write_all(&cid_bytes) 336 .expect("Writing to Vec<u8> should never fail"); 337 writer 338 .write_all(data) 339 .expect("Writing to Vec<u8> should never fail"); 340 car.extend_from_slice(&writer); 341 }; 342 write_block(&mut car_bytes, &commit_cid, &commit_bytes); 343 for (cid, data) in &proof_blocks { 344 write_block(&mut car_bytes, cid, data); 345 } 346 write_block(&mut car_bytes, &record_cid, &record_block); 347 ( 348 StatusCode::OK, 349 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 350 car_bytes, 351 ) 352 .into_response() 353}