// This repo has no description.
1use crate::state::AppState; 2use crate::sync::car::encode_car_header; 3use axum::{ 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7 Json, 8}; 9use cid::Cid; 10use ipld_core::ipld::Ipld; 11use jacquard_repo::storage::BlockStore; 12use serde::Deserialize; 13use serde_json::json; 14use std::io::Write; 15use std::str::FromStr; 16use tracing::error; 17const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000; 18#[derive(Deserialize)] 19pub struct GetBlocksQuery { 20 pub did: String, 21 pub cids: String, 22} 23pub async fn get_blocks( 24 State(state): State<AppState>, 25 Query(query): Query<GetBlocksQuery>, 26) -> Response { 27 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 28 .fetch_optional(&state.db) 29 .await 30 .unwrap_or(None); 31 if user_exists.is_none() { 32 return (StatusCode::NOT_FOUND, "Repo not found").into_response(); 33 } 34 let cids_str: Vec<&str> = query.cids.split(',').collect(); 35 let mut cids = Vec::new(); 36 for s in cids_str { 37 match Cid::from_str(s) { 38 Ok(cid) => cids.push(cid), 39 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(), 40 } 41 } 42 let blocks_res = state.block_store.get_many(&cids).await; 43 let blocks = match blocks_res { 44 Ok(blocks) => blocks, 45 Err(e) => { 46 error!("Failed to get blocks: {}", e); 47 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response(); 48 } 49 }; 50 if cids.is_empty() { 51 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response(); 52 } 53 let root_cid = cids[0]; 54 let header = match encode_car_header(&root_cid) { 55 Ok(h) => h, 56 Err(e) => { 57 error!("Failed to encode CAR header: {}", e); 58 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to encode CAR").into_response(); 59 } 60 }; 61 let mut car_bytes = header; 62 for (i, block_opt) in blocks.into_iter().enumerate() { 63 if let Some(block) = block_opt { 64 let cid = cids[i]; 65 let cid_bytes = cid.to_bytes(); 66 let total_len 
= cid_bytes.len() + block.len(); 67 let mut writer = Vec::new(); 68 crate::sync::car::write_varint(&mut writer, total_len as u64) 69 .expect("Writing to Vec<u8> should never fail"); 70 writer.write_all(&cid_bytes) 71 .expect("Writing to Vec<u8> should never fail"); 72 writer.write_all(&block) 73 .expect("Writing to Vec<u8> should never fail"); 74 car_bytes.extend_from_slice(&writer); 75 } 76 } 77 ( 78 StatusCode::OK, 79 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 80 car_bytes, 81 ) 82 .into_response() 83} 84#[derive(Deserialize)] 85pub struct GetRepoQuery { 86 pub did: String, 87 pub since: Option<String>, 88} 89pub async fn get_repo( 90 State(state): State<AppState>, 91 Query(query): Query<GetRepoQuery>, 92) -> Response { 93 let repo_row = sqlx::query!( 94 r#" 95 SELECT r.repo_root_cid 96 FROM repos r 97 JOIN users u ON u.id = r.user_id 98 WHERE u.did = $1 99 "#, 100 query.did 101 ) 102 .fetch_optional(&state.db) 103 .await 104 .unwrap_or(None); 105 let head_str = match repo_row { 106 Some(r) => r.repo_root_cid, 107 None => { 108 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 109 .fetch_optional(&state.db) 110 .await 111 .unwrap_or(None); 112 if user_exists.is_none() { 113 return ( 114 StatusCode::NOT_FOUND, 115 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 116 ) 117 .into_response(); 118 } else { 119 return ( 120 StatusCode::NOT_FOUND, 121 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 122 ) 123 .into_response(); 124 } 125 } 126 }; 127 let head_cid = match Cid::from_str(&head_str) { 128 Ok(c) => c, 129 Err(_) => { 130 return ( 131 StatusCode::INTERNAL_SERVER_ERROR, 132 Json(json!({"error": "InternalError", "message": "Invalid head CID"})), 133 ) 134 .into_response(); 135 } 136 }; 137 let mut car_bytes = match encode_car_header(&head_cid) { 138 Ok(h) => h, 139 Err(e) => { 140 return ( 141 StatusCode::INTERNAL_SERVER_ERROR, 142 Json(json!({"error": 
"InternalError", "message": format!("Failed to encode CAR header: {}", e)})), 143 ) 144 .into_response(); 145 } 146 }; 147 let mut stack = vec![head_cid]; 148 let mut visited = std::collections::HashSet::new(); 149 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL; 150 while let Some(cid) = stack.pop() { 151 if visited.contains(&cid) { 152 continue; 153 } 154 visited.insert(cid); 155 if remaining == 0 { break; } 156 remaining -= 1; 157 if let Ok(Some(block)) = state.block_store.get(&cid).await { 158 let cid_bytes = cid.to_bytes(); 159 let total_len = cid_bytes.len() + block.len(); 160 let mut writer = Vec::new(); 161 crate::sync::car::write_varint(&mut writer, total_len as u64) 162 .expect("Writing to Vec<u8> should never fail"); 163 writer.write_all(&cid_bytes) 164 .expect("Writing to Vec<u8> should never fail"); 165 writer.write_all(&block) 166 .expect("Writing to Vec<u8> should never fail"); 167 car_bytes.extend_from_slice(&writer); 168 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 169 extract_links_ipld(&value, &mut stack); 170 } 171 } 172 } 173 ( 174 StatusCode::OK, 175 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 176 car_bytes, 177 ) 178 .into_response() 179} 180fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) { 181 match value { 182 Ipld::Link(cid) => { 183 stack.push(*cid); 184 } 185 Ipld::Map(map) => { 186 for v in map.values() { 187 extract_links_ipld(v, stack); 188 } 189 } 190 Ipld::List(arr) => { 191 for v in arr { 192 extract_links_ipld(v, stack); 193 } 194 } 195 _ => {} 196 } 197} 198#[derive(Deserialize)] 199pub struct GetRecordQuery { 200 pub did: String, 201 pub collection: String, 202 pub rkey: String, 203} 204pub async fn get_record( 205 State(state): State<AppState>, 206 Query(query): Query<GetRecordQuery>, 207) -> Response { 208 use jacquard_repo::commit::Commit; 209 use jacquard_repo::mst::Mst; 210 use std::collections::BTreeMap; 211 use std::sync::Arc; 212 let repo_row = sqlx::query!( 213 r#" 
214 SELECT r.repo_root_cid 215 FROM repos r 216 JOIN users u ON u.id = r.user_id 217 WHERE u.did = $1 218 "#, 219 query.did 220 ) 221 .fetch_optional(&state.db) 222 .await 223 .unwrap_or(None); 224 let commit_cid_str = match repo_row { 225 Some(r) => r.repo_root_cid, 226 None => { 227 return ( 228 StatusCode::NOT_FOUND, 229 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 230 ) 231 .into_response(); 232 } 233 }; 234 let commit_cid = match Cid::from_str(&commit_cid_str) { 235 Ok(c) => c, 236 Err(_) => { 237 return ( 238 StatusCode::INTERNAL_SERVER_ERROR, 239 Json(json!({"error": "InternalError", "message": "Invalid commit CID"})), 240 ) 241 .into_response(); 242 } 243 }; 244 let commit_bytes = match state.block_store.get(&commit_cid).await { 245 Ok(Some(b)) => b, 246 _ => { 247 return ( 248 StatusCode::INTERNAL_SERVER_ERROR, 249 Json(json!({"error": "InternalError", "message": "Commit block not found"})), 250 ) 251 .into_response(); 252 } 253 }; 254 let commit = match Commit::from_cbor(&commit_bytes) { 255 Ok(c) => c, 256 Err(_) => { 257 return ( 258 StatusCode::INTERNAL_SERVER_ERROR, 259 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})), 260 ) 261 .into_response(); 262 } 263 }; 264 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None); 265 let key = format!("{}/{}", query.collection, query.rkey); 266 let record_cid = match mst.get(&key).await { 267 Ok(Some(cid)) => cid, 268 Ok(None) => { 269 return ( 270 StatusCode::NOT_FOUND, 271 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 272 ) 273 .into_response(); 274 } 275 Err(_) => { 276 return ( 277 StatusCode::INTERNAL_SERVER_ERROR, 278 Json(json!({"error": "InternalError", "message": "Failed to lookup record"})), 279 ) 280 .into_response(); 281 } 282 }; 283 let record_block = match state.block_store.get(&record_cid).await { 284 Ok(Some(b)) => b, 285 _ => { 286 return ( 287 StatusCode::NOT_FOUND, 288 Json(json!({"error": 
"RecordNotFound", "message": "Record block not found"})), 289 ) 290 .into_response(); 291 } 292 }; 293 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new(); 294 if let Err(_) = mst.blocks_for_path(&key, &mut proof_blocks).await { 295 return ( 296 StatusCode::INTERNAL_SERVER_ERROR, 297 Json(json!({"error": "InternalError", "message": "Failed to build proof path"})), 298 ) 299 .into_response(); 300 } 301 let header = match encode_car_header(&commit_cid) { 302 Ok(h) => h, 303 Err(e) => { 304 error!("Failed to encode CAR header: {}", e); 305 return ( 306 StatusCode::INTERNAL_SERVER_ERROR, 307 Json(json!({"error": "InternalError"})), 308 ) 309 .into_response(); 310 } 311 }; 312 let mut car_bytes = header; 313 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| { 314 let cid_bytes = cid.to_bytes(); 315 let total_len = cid_bytes.len() + data.len(); 316 let mut writer = Vec::new(); 317 crate::sync::car::write_varint(&mut writer, total_len as u64) 318 .expect("Writing to Vec<u8> should never fail"); 319 writer.write_all(&cid_bytes) 320 .expect("Writing to Vec<u8> should never fail"); 321 writer.write_all(data) 322 .expect("Writing to Vec<u8> should never fail"); 323 car.extend_from_slice(&writer); 324 }; 325 write_block(&mut car_bytes, &commit_cid, &commit_bytes); 326 for (cid, data) in &proof_blocks { 327 write_block(&mut car_bytes, cid, data); 328 } 329 write_block(&mut car_bytes, &record_cid, &record_block); 330 ( 331 StatusCode::OK, 332 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 333 car_bytes, 334 ) 335 .into_response() 336}