this repo has no description
1use crate::state::AppState; 2use crate::sync::car::encode_car_header; 3use axum::{ 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7 Json, 8}; 9use cid::Cid; 10use ipld_core::ipld::Ipld; 11use jacquard_repo::storage::BlockStore; 12use serde::Deserialize; 13use serde_json::json; 14use std::io::Write; 15use std::str::FromStr; 16use tracing::error; 17 18const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000; 19 20#[derive(Deserialize)] 21pub struct GetBlocksQuery { 22 pub did: String, 23 pub cids: String, 24} 25 26pub async fn get_blocks( 27 State(state): State<AppState>, 28 Query(query): Query<GetBlocksQuery>, 29) -> Response { 30 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 31 .fetch_optional(&state.db) 32 .await 33 .unwrap_or(None); 34 if user_exists.is_none() { 35 return (StatusCode::NOT_FOUND, "Repo not found").into_response(); 36 } 37 let cids_str: Vec<&str> = query.cids.split(',').collect(); 38 let mut cids = Vec::new(); 39 for s in cids_str { 40 match Cid::from_str(s) { 41 Ok(cid) => cids.push(cid), 42 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(), 43 } 44 } 45 let blocks_res = state.block_store.get_many(&cids).await; 46 let blocks = match blocks_res { 47 Ok(blocks) => blocks, 48 Err(e) => { 49 error!("Failed to get blocks: {}", e); 50 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response(); 51 } 52 }; 53 if cids.is_empty() { 54 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response(); 55 } 56 let root_cid = cids[0]; 57 let header = match encode_car_header(&root_cid) { 58 Ok(h) => h, 59 Err(e) => { 60 error!("Failed to encode CAR header: {}", e); 61 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to encode CAR").into_response(); 62 } 63 }; 64 let mut car_bytes = header; 65 for (i, block_opt) in blocks.into_iter().enumerate() { 66 if let Some(block) = block_opt { 67 let cid = cids[i]; 68 let cid_bytes = cid.to_bytes(); 69 let total_len = cid_bytes.len() + block.len(); 70 let mut writer = Vec::new(); 71 crate::sync::car::write_varint(&mut writer, total_len as u64) 72 .expect("Writing to Vec<u8> should never fail"); 73 writer.write_all(&cid_bytes) 74 .expect("Writing to Vec<u8> should never fail"); 75 writer.write_all(&block) 76 .expect("Writing to Vec<u8> should never fail"); 77 car_bytes.extend_from_slice(&writer); 78 } 79 } 80 ( 81 StatusCode::OK, 82 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 83 car_bytes, 84 ) 85 .into_response() 86} 87 88#[derive(Deserialize)] 89pub struct GetRepoQuery { 90 pub did: String, 91 pub since: Option<String>, 92} 93 94pub async fn get_repo( 95 State(state): State<AppState>, 96 Query(query): Query<GetRepoQuery>, 97) -> Response { 98 let repo_row = sqlx::query!( 99 r#" 100 SELECT r.repo_root_cid 101 FROM repos r 102 JOIN users u ON u.id = r.user_id 103 WHERE u.did = $1 104 "#, 105 query.did 106 ) 107 .fetch_optional(&state.db) 108 .await 109 .unwrap_or(None); 110 let head_str = match repo_row { 111 Some(r) => r.repo_root_cid, 112 None => { 113 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 114 .fetch_optional(&state.db) 115 .await 116 .unwrap_or(None); 117 if user_exists.is_none() { 118 return ( 119 StatusCode::NOT_FOUND, 120 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 121 ) 122 .into_response(); 123 } else { 124 return ( 125 StatusCode::NOT_FOUND, 126 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 127 ) 128 .into_response(); 129 } 130 } 131 }; 132 let head_cid = match Cid::from_str(&head_str) { 133 Ok(c) => c, 134 Err(_) => { 135 return ( 136 StatusCode::INTERNAL_SERVER_ERROR, 137 Json(json!({"error": "InternalError", "message": "Invalid head CID"})), 138 ) 139 .into_response(); 140 } 141 }; 142 let mut car_bytes = match encode_car_header(&head_cid) { 143 Ok(h) => h, 144 Err(e) => { 145 return ( 146 StatusCode::INTERNAL_SERVER_ERROR, 147 Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})), 148 ) 149 .into_response(); 150 } 151 }; 152 let mut stack = vec![head_cid]; 153 let mut visited = std::collections::HashSet::new(); 154 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL; 155 while let Some(cid) = stack.pop() { 156 if visited.contains(&cid) { 157 continue; 158 } 159 visited.insert(cid); 160 if remaining == 0 { break; } 161 remaining -= 1; 162 if let Ok(Some(block)) = state.block_store.get(&cid).await { 163 let cid_bytes = cid.to_bytes(); 164 let total_len = cid_bytes.len() + block.len(); 165 let mut writer = Vec::new(); 166 crate::sync::car::write_varint(&mut writer, total_len as u64) 167 .expect("Writing to Vec<u8> should never fail"); 168 writer.write_all(&cid_bytes) 169 .expect("Writing to Vec<u8> should never fail"); 170 writer.write_all(&block) 171 .expect("Writing to Vec<u8> should never fail"); 172 car_bytes.extend_from_slice(&writer); 173 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 174 extract_links_ipld(&value, &mut stack); 175 } 176 } 177 } 178 ( 179 StatusCode::OK, 180 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 181 car_bytes, 182 ) 183 .into_response() 184} 185 186fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) { 187 match value { 188 Ipld::Link(cid) => { 189 stack.push(*cid); 190 } 191 Ipld::Map(map) => { 192 for v in map.values() { 193 extract_links_ipld(v, stack); 194 } 195 } 196 Ipld::List(arr) => { 197 for v in arr { 198 extract_links_ipld(v, stack); 199 } 200 } 201 _ => {} 202 } 203} 204 205#[derive(Deserialize)] 206pub struct GetRecordQuery { 207 pub did: String, 208 pub collection: String, 209 pub rkey: String, 210} 211 212pub async fn get_record( 213 State(state): State<AppState>, 214 Query(query): Query<GetRecordQuery>, 215) -> Response { 216 use jacquard_repo::commit::Commit; 217 use jacquard_repo::mst::Mst; 218 use std::collections::BTreeMap; 219 use std::sync::Arc; 220 221 let repo_row = sqlx::query!( 222 r#" 223 SELECT r.repo_root_cid 224 FROM repos r 225 JOIN users u ON u.id = r.user_id 226 WHERE u.did = $1 227 "#, 228 query.did 229 ) 230 .fetch_optional(&state.db) 231 .await 232 .unwrap_or(None); 233 let commit_cid_str = match repo_row { 234 Some(r) => r.repo_root_cid, 235 None => { 236 return ( 237 StatusCode::NOT_FOUND, 238 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 239 ) 240 .into_response(); 241 } 242 }; 243 let commit_cid = match Cid::from_str(&commit_cid_str) { 244 Ok(c) => c, 245 Err(_) => { 246 return ( 247 StatusCode::INTERNAL_SERVER_ERROR, 248 Json(json!({"error": "InternalError", "message": "Invalid commit CID"})), 249 ) 250 .into_response(); 251 } 252 }; 253 let commit_bytes = match state.block_store.get(&commit_cid).await { 254 Ok(Some(b)) => b, 255 _ => { 256 return ( 257 StatusCode::INTERNAL_SERVER_ERROR, 258 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 259 ) 260 .into_response(); 261 } 262 }; 263 let commit = match Commit::from_cbor(&commit_bytes) { 264 Ok(c) => c, 265 Err(_) => { 266 return ( 267 StatusCode::INTERNAL_SERVER_ERROR, 268 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 269 ) 270 .into_response(); 271 } 272 }; 273 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None); 274 let key = format!("{}/{}", query.collection, query.rkey); 275 let record_cid = match mst.get(&key).await { 276 Ok(Some(cid)) => cid, 277 Ok(None) => { 278 return ( 279 StatusCode::NOT_FOUND, 280 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 281 ) 282 .into_response(); 283 } 284 Err(_) => { 285 return ( 286 StatusCode::INTERNAL_SERVER_ERROR, 287 Json(json!({"error": "InternalError", "message": "Failed to lookup record"})), 288 ) 289 .into_response(); 290 } 291 }; 292 let record_block = match state.block_store.get(&record_cid).await { 293 Ok(Some(b)) => b, 294 _ => { 295 return ( 296 StatusCode::NOT_FOUND, 297 Json(json!({"error": "RecordNotFound", "message": "Record block not found"})), 298 ) 299 .into_response(); 300 } 301 }; 302 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new(); 303 if let Err(_) = mst.blocks_for_path(&key, &mut proof_blocks).await { 304 return ( 305 StatusCode::INTERNAL_SERVER_ERROR, 306 Json(json!({"error": "InternalError", "message": "Failed to build proof path"})), 307 ) 308 .into_response(); 309 } 310 let header = match encode_car_header(&commit_cid) { 311 Ok(h) => h, 312 Err(e) => { 313 error!("Failed to encode CAR header: {}", e); 314 return ( 315 StatusCode::INTERNAL_SERVER_ERROR, 316 Json(json!({"error": "InternalError"})), 317 ) 318 .into_response(); 319 } 320 }; 321 let mut car_bytes = header; 322 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| { 323 let cid_bytes = cid.to_bytes(); 324 let total_len = cid_bytes.len() + data.len(); 325 let mut writer = Vec::new(); 326 crate::sync::car::write_varint(&mut writer, total_len as u64) 327 .expect("Writing to Vec<u8> should never fail"); 328 writer.write_all(&cid_bytes) 329 .expect("Writing to Vec<u8> should never fail"); 330 writer.write_all(data) 331 .expect("Writing to Vec<u8> should never fail"); 332 car.extend_from_slice(&writer); 333 }; 334 write_block(&mut car_bytes, &commit_cid, &commit_bytes); 335 for (cid, data) in &proof_blocks { 336 write_block(&mut car_bytes, cid, data); 337 } 338 write_block(&mut car_bytes, &record_cid, &record_block); 339 ( 340 StatusCode::OK, 341 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 342 car_bytes, 343 ) 344 .into_response() 345}