this repo has no description
1use crate::state::AppState; 2use crate::sync::car::encode_car_header; 3use axum::{ 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7 Json, 8}; 9use cid::Cid; 10use ipld_core::ipld::Ipld; 11use jacquard_repo::storage::BlockStore; 12use serde::Deserialize; 13use serde_json::json; 14use std::io::Write; 15use std::str::FromStr; 16use tracing::error; 17 18const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000; 19 20#[derive(Deserialize)] 21pub struct GetBlocksQuery { 22 pub did: String, 23 pub cids: String, 24} 25 26pub async fn get_blocks( 27 State(state): State<AppState>, 28 Query(query): Query<GetBlocksQuery>, 29) -> Response { 30 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 31 .fetch_optional(&state.db) 32 .await 33 .unwrap_or(None); 34 35 if user_exists.is_none() { 36 return (StatusCode::NOT_FOUND, "Repo not found").into_response(); 37 } 38 39 let cids_str: Vec<&str> = query.cids.split(',').collect(); 40 let mut cids = Vec::new(); 41 for s in cids_str { 42 match Cid::from_str(s) { 43 Ok(cid) => cids.push(cid), 44 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(), 45 } 46 } 47 48 let blocks_res = state.block_store.get_many(&cids).await; 49 let blocks = match blocks_res { 50 Ok(blocks) => blocks, 51 Err(e) => { 52 error!("Failed to get blocks: {}", e); 53 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response(); 54 } 55 }; 56 57 if cids.is_empty() { 58 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response(); 59 } 60 61 let root_cid = cids[0]; 62 63 let header = match encode_car_header(&root_cid) { 64 Ok(h) => h, 65 Err(e) => { 66 error!("Failed to encode CAR header: {}", e); 67 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to encode CAR").into_response(); 68 } 69 }; 70 71 let mut car_bytes = header; 72 73 for (i, block_opt) in blocks.into_iter().enumerate() { 74 if let Some(block) = block_opt { 75 let cid = cids[i]; 76 let cid_bytes = cid.to_bytes(); 77 let total_len = cid_bytes.len() + block.len(); 78 79 let mut writer = Vec::new(); 80 crate::sync::car::write_varint(&mut writer, total_len as u64) 81 .expect("Writing to Vec<u8> should never fail"); 82 writer.write_all(&cid_bytes) 83 .expect("Writing to Vec<u8> should never fail"); 84 writer.write_all(&block) 85 .expect("Writing to Vec<u8> should never fail"); 86 87 car_bytes.extend_from_slice(&writer); 88 } 89 } 90 91 ( 92 StatusCode::OK, 93 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 94 car_bytes, 95 ) 96 .into_response() 97} 98 99#[derive(Deserialize)] 100pub struct GetRepoQuery { 101 pub did: String, 102 pub since: Option<String>, 103} 104 105pub async fn get_repo( 106 State(state): State<AppState>, 107 Query(query): Query<GetRepoQuery>, 108) -> Response { 109 let repo_row = sqlx::query!( 110 r#" 111 SELECT r.repo_root_cid 112 FROM repos r 113 JOIN users u ON u.id = r.user_id 114 WHERE u.did = $1 115 "#, 116 query.did 117 ) 118 .fetch_optional(&state.db) 119 .await 120 .unwrap_or(None); 121 122 let head_str = match repo_row { 123 Some(r) => r.repo_root_cid, 124 None => { 125 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 126 .fetch_optional(&state.db) 127 .await 128 .unwrap_or(None); 129 130 if user_exists.is_none() { 131 return ( 132 StatusCode::NOT_FOUND, 133 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 134 ) 135 .into_response(); 136 } else { 137 return ( 138 StatusCode::NOT_FOUND, 139 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 140 ) 141 .into_response(); 142 } 143 } 144 }; 145 146 let head_cid = match Cid::from_str(&head_str) { 147 Ok(c) => c, 148 Err(_) => { 149 return ( 150 StatusCode::INTERNAL_SERVER_ERROR, 151 Json(json!({"error": "InternalError", "message": "Invalid head CID"})), 152 ) 153 .into_response(); 154 } 155 }; 156 157 let mut car_bytes = match encode_car_header(&head_cid) { 158 Ok(h) => h, 159 Err(e) => { 160 return ( 161 StatusCode::INTERNAL_SERVER_ERROR, 162 Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})), 163 ) 164 .into_response(); 165 } 166 }; 167 168 let mut stack = vec![head_cid]; 169 let mut visited = std::collections::HashSet::new(); 170 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL; 171 172 while let Some(cid) = stack.pop() { 173 if visited.contains(&cid) { 174 continue; 175 } 176 visited.insert(cid); 177 if remaining == 0 { break; } 178 remaining -= 1; 179 180 if let Ok(Some(block)) = state.block_store.get(&cid).await { 181 let cid_bytes = cid.to_bytes(); 182 let total_len = cid_bytes.len() + block.len(); 183 let mut writer = Vec::new(); 184 crate::sync::car::write_varint(&mut writer, total_len as u64) 185 .expect("Writing to Vec<u8> should never fail"); 186 writer.write_all(&cid_bytes) 187 .expect("Writing to Vec<u8> should never fail"); 188 writer.write_all(&block) 189 .expect("Writing to Vec<u8> should never fail"); 190 car_bytes.extend_from_slice(&writer); 191 192 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 193 extract_links_ipld(&value, &mut stack); 194 } 195 } 196 } 197 198 ( 199 StatusCode::OK, 200 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 201 car_bytes, 202 ) 203 .into_response() 204} 205 206fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) { 207 match value { 208 Ipld::Link(cid) => { 209 stack.push(*cid); 210 } 211 Ipld::Map(map) => { 212 for v in map.values() { 213 extract_links_ipld(v, stack); 214 } 215 } 216 Ipld::List(arr) => { 217 for v in arr { 218 extract_links_ipld(v, stack); 219 } 220 } 221 _ => {} 222 } 223} 224 225#[derive(Deserialize)] 226pub struct GetRecordQuery { 227 pub did: String, 228 pub collection: String, 229 pub rkey: String, 230} 231 232pub async fn get_record( 233 State(state): State<AppState>, 234 Query(query): Query<GetRecordQuery>, 235) -> Response { 236 let user = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 237 .fetch_optional(&state.db) 238 .await 239 .unwrap_or(None); 240 241 let user_id = match user { 242 Some(u) => u.id, 243 None => { 244 return ( 245 StatusCode::NOT_FOUND, 246 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 247 ) 248 .into_response(); 249 } 250 }; 251 252 let record = sqlx::query!( 253 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 254 user_id, 255 query.collection, 256 query.rkey 257 ) 258 .fetch_optional(&state.db) 259 .await 260 .unwrap_or(None); 261 262 let record_cid_str = match record { 263 Some(r) => r.record_cid, 264 None => { 265 return ( 266 StatusCode::NOT_FOUND, 267 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 268 ) 269 .into_response(); 270 } 271 }; 272 273 let cid = match Cid::from_str(&record_cid_str) { 274 Ok(c) => c, 275 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "Invalid CID").into_response(), 276 }; 277 278 let block_res = state.block_store.get(&cid).await; 279 let block = match block_res { 280 Ok(Some(b)) => b, 281 _ => return (StatusCode::NOT_FOUND, "Block not found").into_response(), 282 }; 283 284 let header = match encode_car_header(&cid) { 285 Ok(h) => h, 286 Err(e) => { 287 return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to encode CAR header: {}", e)).into_response(); 288 } 289 }; 290 let mut car_bytes = header; 291 292 let cid_bytes = cid.to_bytes(); 293 let total_len = cid_bytes.len() + block.len(); 294 let mut writer = Vec::new(); 295 crate::sync::car::write_varint(&mut writer, total_len as u64) 296 .expect("Writing to Vec<u8> should never fail"); 297 writer.write_all(&cid_bytes) 298 .expect("Writing to Vec<u8> should never fail"); 299 writer.write_all(&block) 300 .expect("Writing to Vec<u8> should never fail"); 301 car_bytes.extend_from_slice(&writer); 302 303 ( 304 StatusCode::OK, 305 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 306 car_bytes, 307 ) 308 .into_response() 309}