this repo has no description
1use crate::state::AppState; 2use crate::sync::car::encode_car_header; 3use axum::{ 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7 Json, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use serde::Deserialize; 12use serde_json::json; 13use std::io::Write; 14use std::str::FromStr; 15use tracing::error; 16 17#[derive(Deserialize)] 18pub struct GetBlocksQuery { 19 pub did: String, 20 pub cids: String, 21} 22 23pub async fn get_blocks( 24 State(state): State<AppState>, 25 Query(query): Query<GetBlocksQuery>, 26) -> Response { 27 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 28 .fetch_optional(&state.db) 29 .await 30 .unwrap_or(None); 31 32 if user_exists.is_none() { 33 return (StatusCode::NOT_FOUND, "Repo not found").into_response(); 34 } 35 36 let cids_str: Vec<&str> = query.cids.split(',').collect(); 37 let mut cids = Vec::new(); 38 for s in cids_str { 39 match Cid::from_str(s) { 40 Ok(cid) => cids.push(cid), 41 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(), 42 } 43 } 44 45 let blocks_res = state.block_store.get_many(&cids).await; 46 let blocks = match blocks_res { 47 Ok(blocks) => blocks, 48 Err(e) => { 49 error!("Failed to get blocks: {}", e); 50 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response(); 51 } 52 }; 53 54 let root_cid = cids.first().cloned().unwrap_or_default(); 55 56 if cids.is_empty() { 57 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response(); 58 } 59 60 let header = encode_car_header(&root_cid); 61 62 let mut car_bytes = header; 63 64 for (i, block_opt) in blocks.into_iter().enumerate() { 65 if let Some(block) = block_opt { 66 let cid = cids[i]; 67 let cid_bytes = cid.to_bytes(); 68 let total_len = cid_bytes.len() + block.len(); 69 70 let mut writer = Vec::new(); 71 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap(); 72 writer.write_all(&cid_bytes).unwrap(); 73 writer.write_all(&block).unwrap(); 74 75 car_bytes.extend_from_slice(&writer); 76 } 77 } 78 79 ( 80 StatusCode::OK, 81 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 82 car_bytes, 83 ) 84 .into_response() 85} 86 87#[derive(Deserialize)] 88pub struct GetRepoQuery { 89 pub did: String, 90 pub since: Option<String>, 91} 92 93pub async fn get_repo( 94 State(state): State<AppState>, 95 Query(query): Query<GetRepoQuery>, 96) -> Response { 97 let repo_row = sqlx::query!( 98 r#" 99 SELECT r.repo_root_cid 100 FROM repos r 101 JOIN users u ON u.id = r.user_id 102 WHERE u.did = $1 103 "#, 104 query.did 105 ) 106 .fetch_optional(&state.db) 107 .await 108 .unwrap_or(None); 109 110 let head_str = match repo_row { 111 Some(r) => r.repo_root_cid, 112 None => { 113 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 114 .fetch_optional(&state.db) 115 .await 116 .unwrap_or(None); 117 118 if user_exists.is_none() { 119 return ( 120 StatusCode::NOT_FOUND, 121 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 122 ) 123 .into_response(); 124 } else { 125 return ( 126 StatusCode::NOT_FOUND, 127 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})), 128 ) 129 .into_response(); 130 } 131 } 132 }; 133 134 let head_cid = match Cid::from_str(&head_str) { 135 Ok(c) => c, 136 Err(_) => { 137 return ( 138 StatusCode::INTERNAL_SERVER_ERROR, 139 Json(json!({"error": "InternalError", "message": "Invalid head CID"})), 140 ) 141 .into_response(); 142 } 143 }; 144 145 let mut car_bytes = encode_car_header(&head_cid); 146 147 let mut stack = vec![head_cid]; 148 let mut visited = std::collections::HashSet::new(); 149 let mut limit = 20000; 150 151 while let Some(cid) = stack.pop() { 152 if visited.contains(&cid) { 153 continue; 154 } 155 visited.insert(cid); 156 if limit == 0 { break; } 157 limit -= 1; 158 159 if let Ok(Some(block)) = state.block_store.get(&cid).await { 160 let cid_bytes = cid.to_bytes(); 161 let total_len = cid_bytes.len() + block.len(); 162 let mut writer = Vec::new(); 163 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap(); 164 writer.write_all(&cid_bytes).unwrap(); 165 writer.write_all(&block).unwrap(); 166 car_bytes.extend_from_slice(&writer); 167 168 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 169 extract_links_json(&value, &mut stack); 170 } 171 } 172 } 173 174 ( 175 StatusCode::OK, 176 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 177 car_bytes, 178 ) 179 .into_response() 180} 181 182fn extract_links_json(value: &serde_json::Value, stack: &mut Vec<Cid>) { 183 match value { 184 serde_json::Value::Object(map) => { 185 if let Some(serde_json::Value::String(s)) = map.get("/") { 186 if let Ok(cid) = Cid::from_str(s) { 187 stack.push(cid); 188 } 189 } else if let Some(serde_json::Value::String(s)) = map.get("$link") { 190 if let Ok(cid) = Cid::from_str(s) { 191 stack.push(cid); 192 } 193 } else { 194 for v in map.values() { 195 extract_links_json(v, stack); 196 } 197 } 198 } 199 serde_json::Value::Array(arr) => { 200 for v in arr { 201 extract_links_json(v, stack); 202 } 203 } 204 _ => {} 205 } 206} 207 208#[derive(Deserialize)] 209pub struct GetRecordQuery { 210 pub did: String, 211 pub collection: String, 212 pub rkey: String, 213} 214 215pub async fn get_record( 216 State(state): State<AppState>, 217 Query(query): Query<GetRecordQuery>, 218) -> Response { 219 let user = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did) 220 .fetch_optional(&state.db) 221 .await 222 .unwrap_or(None); 223 224 let user_id = match user { 225 Some(u) => u.id, 226 None => { 227 return ( 228 StatusCode::NOT_FOUND, 229 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 230 ) 231 .into_response(); 232 } 233 }; 234 235 let record = sqlx::query!( 236 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 237 user_id, 238 query.collection, 239 query.rkey 240 ) 241 .fetch_optional(&state.db) 242 .await 243 .unwrap_or(None); 244 245 let record_cid_str = match record { 246 Some(r) => r.record_cid, 247 None => { 248 return ( 249 StatusCode::NOT_FOUND, 250 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 251 ) 252 .into_response(); 253 } 254 }; 255 256 let cid = match Cid::from_str(&record_cid_str) { 257 Ok(c) => c, 258 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "Invalid CID").into_response(), 259 }; 260 261 let block_res = state.block_store.get(&cid).await; 262 let block = match block_res { 263 Ok(Some(b)) => b, 264 _ => return (StatusCode::NOT_FOUND, "Block not found").into_response(), 265 }; 266 267 let header = encode_car_header(&cid); 268 let mut car_bytes = header; 269 270 let cid_bytes = cid.to_bytes(); 271 let total_len = cid_bytes.len() + block.len(); 272 let mut writer = Vec::new(); 273 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap(); 274 writer.write_all(&cid_bytes).unwrap(); 275 writer.write_all(&block).unwrap(); 276 car_bytes.extend_from_slice(&writer); 277 278 ( 279 StatusCode::OK, 280 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")], 281 car_bytes, 282 ) 283 .into_response() 284}