this repo has no description
1use crate::state::AppState; 2use crate::sync::car::encode_car_header; 3use axum::{ 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7 Json, 8}; 9use cid::Cid; 10use ipld_core::ipld::Ipld; 11use jacquard_repo::storage::BlockStore; 12use serde::Deserialize; 13use serde_json::json; 14use std::io::Write; 15use std::str::FromStr; 16use tracing::error; 17 18#[derive(Deserialize)] 19pub struct GetBlocksQuery { 20 pub did: String, 21 pub cids: String, 22} 23 24pub async fn get_blocks( 25 State(state): State<AppState>, 26 Query(query): Query<GetBlocksQuery>, 27) -> Response { 28 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 29 .fetch_optional(&state.db) 30 .await 31 .unwrap_or(None); 32 33 if user_exists.is_none() { 34 return (StatusCode::NOT_FOUND, "Repo not found").into_response(); 35 } 36 37 let cids_str: Vec<&str> = query.cids.split(',').collect(); 38 let mut cids = Vec::new(); 39 for s in cids_str { 40 match Cid::from_str(s) { 41 Ok(cid) => cids.push(cid), 42 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(), 43 } 44 } 45 46 let blocks_res = state.block_store.get_many(&cids).await; 47 let blocks = match blocks_res { 48 Ok(blocks) => blocks, 49 Err(e) => { 50 error!("Failed to get blocks: {}", e); 51 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response(); 52 } 53 }; 54 55 let root_cid = cids.first().cloned().unwrap_or_default(); 56 57 if cids.is_empty() { 58 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response(); 59 } 60 61 let header = encode_car_header(&root_cid); 62 63 let mut car_bytes = header; 64 65 for (i, block_opt) in blocks.into_iter().enumerate() { 66 if let Some(block) = block_opt { 67 let cid = cids[i]; 68 let cid_bytes = cid.to_bytes(); 69 let total_len = cid_bytes.len() + block.len(); 70 71 let mut writer = Vec::new(); 72 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap(); 73 writer.write_all(&cid_bytes).unwrap(); 74 writer.write_all(&block).unwrap(); 75 76 car_bytes.extend_from_slice(&writer); 77 } 78 } 79 80 ( 81 StatusCode::OK, 82 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 83 car_bytes, 84 ) 85 .into_response() 86} 87 88#[derive(Deserialize)] 89pub struct GetRepoQuery { 90 pub did: String, 91 pub since: Option<String>, 92} 93 94pub async fn get_repo( 95 State(state): State<AppState>, 96 Query(query): Query<GetRepoQuery>, 97) -> Response { 98 let repo_row = sqlx::query!( 99 r#" 100 SELECT r.repo_root_cid 101 FROM repos r 102 JOIN users u ON u.id = r.user_id 103 WHERE u.did = $1 104 "#, 105 query.did 106 ) 107 .fetch_optional(&state.db) 108 .await 109 .unwrap_or(None); 110 111 let head_str = match repo_row { 112 Some(r) => r.repo_root_cid, 113 None => { 114 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 115 .fetch_optional(&state.db) 116 .await 117 .unwrap_or(None); 118 119 if user_exists.is_none() { 120 return ( 121 StatusCode::NOT_FOUND, 122 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 123 ) 124 .into_response(); 125 } else { 126 return ( 127 StatusCode::NOT_FOUND, 128 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 129 ) 130 .into_response(); 131 } 132 } 133 }; 134 135 let head_cid = match Cid::from_str(&head_str) { 136 Ok(c) => c, 137 Err(_) => { 138 return ( 139 StatusCode::INTERNAL_SERVER_ERROR, 140 Json(json!({"error": "InternalError", "message": "Invalid head CID"})), 141 ) 142 .into_response(); 143 } 144 }; 145 146 let mut car_bytes = encode_car_header(&head_cid); 147 148 let mut stack = vec![head_cid]; 149 let mut visited = std::collections::HashSet::new(); 150 let mut limit = 20000; 151 152 while let Some(cid) = stack.pop() { 153 if visited.contains(&cid) { 154 continue; 155 } 156 visited.insert(cid); 157 if limit == 0 { break; } 158 limit -= 1; 159 160 if let Ok(Some(block)) = state.block_store.get(&cid).await { 161 let cid_bytes = cid.to_bytes(); 162 let total_len = cid_bytes.len() + block.len(); 163 let mut writer = Vec::new(); 164 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap(); 165 writer.write_all(&cid_bytes).unwrap(); 166 writer.write_all(&block).unwrap(); 167 car_bytes.extend_from_slice(&writer); 168 169 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) { 170 extract_links_ipld(&value, &mut stack); 171 } 172 } 173 } 174 175 ( 176 StatusCode::OK, 177 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 178 car_bytes, 179 ) 180 .into_response() 181} 182 183fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) { 184 match value { 185 Ipld::Link(cid) => { 186 stack.push(*cid); 187 } 188 Ipld::Map(map) => { 189 for v in map.values() { 190 extract_links_ipld(v, stack); 191 } 192 } 193 Ipld::List(arr) => { 194 for v in arr { 195 extract_links_ipld(v, stack); 196 } 197 } 198 _ => {} 199 } 200} 201 202#[derive(Deserialize)] 203pub struct GetRecordQuery { 204 pub did: String, 205 pub collection: String, 206 pub rkey: String, 207} 208 209pub async fn get_record( 210 State(state): State<AppState>, 211 Query(query): Query<GetRecordQuery>, 212) -> Response { 213 let user = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 214 .fetch_optional(&state.db) 215 .await 216 .unwrap_or(None); 217 218 let user_id = match user { 219 Some(u) => u.id, 220 None => { 221 return ( 222 StatusCode::NOT_FOUND, 223 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 224 ) 225 .into_response(); 226 } 227 }; 228 229 let record = sqlx::query!( 230 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 231 user_id, 232 query.collection, 233 query.rkey 234 ) 235 .fetch_optional(&state.db) 236 .await 237 .unwrap_or(None); 238 239 let record_cid_str = match record { 240 Some(r) => r.record_cid, 241 None => { 242 return ( 243 StatusCode::NOT_FOUND, 244 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 245 ) 246 .into_response(); 247 } 248 }; 249 250 let cid = match Cid::from_str(&record_cid_str) { 251 Ok(c) => c, 252 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "Invalid CID").into_response(), 253 }; 254 255 let block_res = state.block_store.get(&cid).await; 256 let block = match block_res { 257 Ok(Some(b)) => b, 258 _ => return (StatusCode::NOT_FOUND, "Block not found").into_response(), 259 }; 260 261 let header = encode_car_header(&cid); 262 let mut car_bytes = header; 263 264 let cid_bytes = cid.to_bytes(); 265 let total_len = cid_bytes.len() + block.len(); 266 let mut writer = Vec::new(); 267 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap(); 268 writer.write_all(&cid_bytes).unwrap(); 269 writer.write_all(&block).unwrap(); 270 car_bytes.extend_from_slice(&writer); 271 272 ( 273 StatusCode::OK, 274 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 275 car_bytes, 276 ) 277 .into_response() 278}