this repo has no description
1use crate::state::AppState;
2use crate::sync::car::encode_car_header;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use ipld_core::ipld::Ipld;
11use jacquard_repo::storage::BlockStore;
12use serde::Deserialize;
13use serde_json::json;
14use std::io::Write;
15use std::str::FromStr;
16use tracing::error;
17
18const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000;
19
20#[derive(Deserialize)]
21pub struct GetBlocksQuery {
22 pub did: String,
23 pub cids: String,
24}
25
26pub async fn get_blocks(
27 State(state): State<AppState>,
28 Query(query): Query<GetBlocksQuery>,
29) -> Response {
30 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
31 .fetch_optional(&state.db)
32 .await
33 .unwrap_or(None);
34 if user_exists.is_none() {
35 return (StatusCode::NOT_FOUND, "Repo not found").into_response();
36 }
37 let cids_str: Vec<&str> = query.cids.split(',').collect();
38 let mut cids = Vec::new();
39 for s in cids_str {
40 match Cid::from_str(s) {
41 Ok(cid) => cids.push(cid),
42 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(),
43 }
44 }
45 let blocks_res = state.block_store.get_many(&cids).await;
46 let blocks = match blocks_res {
47 Ok(blocks) => blocks,
48 Err(e) => {
49 error!("Failed to get blocks: {}", e);
50 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response();
51 }
52 };
53 if cids.is_empty() {
54 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response();
55 }
56 let root_cid = cids[0];
57 let header = match encode_car_header(&root_cid) {
58 Ok(h) => h,
59 Err(e) => {
60 error!("Failed to encode CAR header: {}", e);
61 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to encode CAR").into_response();
62 }
63 };
64 let mut car_bytes = header;
65 for (i, block_opt) in blocks.into_iter().enumerate() {
66 if let Some(block) = block_opt {
67 let cid = cids[i];
68 let cid_bytes = cid.to_bytes();
69 let total_len = cid_bytes.len() + block.len();
70 let mut writer = Vec::new();
71 crate::sync::car::write_varint(&mut writer, total_len as u64)
72 .expect("Writing to Vec<u8> should never fail");
73 writer
74 .write_all(&cid_bytes)
75 .expect("Writing to Vec<u8> should never fail");
76 writer
77 .write_all(&block)
78 .expect("Writing to Vec<u8> should never fail");
79 car_bytes.extend_from_slice(&writer);
80 }
81 }
82 (
83 StatusCode::OK,
84 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
85 car_bytes,
86 )
87 .into_response()
88}
89
90#[derive(Deserialize)]
91pub struct GetRepoQuery {
92 pub did: String,
93 pub since: Option<String>,
94}
95
96pub async fn get_repo(
97 State(state): State<AppState>,
98 Query(query): Query<GetRepoQuery>,
99) -> Response {
100 let repo_row = sqlx::query!(
101 r#"
102 SELECT r.repo_root_cid
103 FROM repos r
104 JOIN users u ON u.id = r.user_id
105 WHERE u.did = $1
106 "#,
107 query.did
108 )
109 .fetch_optional(&state.db)
110 .await
111 .unwrap_or(None);
112 let head_str = match repo_row {
113 Some(r) => r.repo_root_cid,
114 None => {
115 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
116 .fetch_optional(&state.db)
117 .await
118 .unwrap_or(None);
119 if user_exists.is_none() {
120 return (
121 StatusCode::NOT_FOUND,
122 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
123 )
124 .into_response();
125 } else {
126 return (
127 StatusCode::NOT_FOUND,
128 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})),
129 )
130 .into_response();
131 }
132 }
133 };
134 let head_cid = match Cid::from_str(&head_str) {
135 Ok(c) => c,
136 Err(_) => {
137 return (
138 StatusCode::INTERNAL_SERVER_ERROR,
139 Json(json!({"error": "InternalError", "message": "Invalid head CID"})),
140 )
141 .into_response();
142 }
143 };
144 let mut car_bytes = match encode_car_header(&head_cid) {
145 Ok(h) => h,
146 Err(e) => {
147 return (
148 StatusCode::INTERNAL_SERVER_ERROR,
149 Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})),
150 )
151 .into_response();
152 }
153 };
154 let mut stack = vec![head_cid];
155 let mut visited = std::collections::HashSet::new();
156 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL;
157 while let Some(cid) = stack.pop() {
158 if visited.contains(&cid) {
159 continue;
160 }
161 visited.insert(cid);
162 if remaining == 0 {
163 break;
164 }
165 remaining -= 1;
166 if let Ok(Some(block)) = state.block_store.get(&cid).await {
167 let cid_bytes = cid.to_bytes();
168 let total_len = cid_bytes.len() + block.len();
169 let mut writer = Vec::new();
170 crate::sync::car::write_varint(&mut writer, total_len as u64)
171 .expect("Writing to Vec<u8> should never fail");
172 writer
173 .write_all(&cid_bytes)
174 .expect("Writing to Vec<u8> should never fail");
175 writer
176 .write_all(&block)
177 .expect("Writing to Vec<u8> should never fail");
178 car_bytes.extend_from_slice(&writer);
179 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
180 extract_links_ipld(&value, &mut stack);
181 }
182 }
183 }
184 (
185 StatusCode::OK,
186 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
187 car_bytes,
188 )
189 .into_response()
190}
191
192fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) {
193 match value {
194 Ipld::Link(cid) => {
195 stack.push(*cid);
196 }
197 Ipld::Map(map) => {
198 for v in map.values() {
199 extract_links_ipld(v, stack);
200 }
201 }
202 Ipld::List(arr) => {
203 for v in arr {
204 extract_links_ipld(v, stack);
205 }
206 }
207 _ => {}
208 }
209}
210
211#[derive(Deserialize)]
212pub struct GetRecordQuery {
213 pub did: String,
214 pub collection: String,
215 pub rkey: String,
216}
217
218pub async fn get_record(
219 State(state): State<AppState>,
220 Query(query): Query<GetRecordQuery>,
221) -> Response {
222 use jacquard_repo::commit::Commit;
223 use jacquard_repo::mst::Mst;
224 use std::collections::BTreeMap;
225 use std::sync::Arc;
226
227 let repo_row = sqlx::query!(
228 r#"
229 SELECT r.repo_root_cid
230 FROM repos r
231 JOIN users u ON u.id = r.user_id
232 WHERE u.did = $1
233 "#,
234 query.did
235 )
236 .fetch_optional(&state.db)
237 .await
238 .unwrap_or(None);
239 let commit_cid_str = match repo_row {
240 Some(r) => r.repo_root_cid,
241 None => {
242 return (
243 StatusCode::NOT_FOUND,
244 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
245 )
246 .into_response();
247 }
248 };
249 let commit_cid = match Cid::from_str(&commit_cid_str) {
250 Ok(c) => c,
251 Err(_) => {
252 return (
253 StatusCode::INTERNAL_SERVER_ERROR,
254 Json(json!({"error": "InternalError", "message": "Invalid commit CID"})),
255 )
256 .into_response();
257 }
258 };
259 let commit_bytes = match state.block_store.get(&commit_cid).await {
260 Ok(Some(b)) => b,
261 _ => {
262 return (
263 StatusCode::INTERNAL_SERVER_ERROR,
264 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
265 )
266 .into_response();
267 }
268 };
269 let commit = match Commit::from_cbor(&commit_bytes) {
270 Ok(c) => c,
271 Err(_) => {
272 return (
273 StatusCode::INTERNAL_SERVER_ERROR,
274 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
275 )
276 .into_response();
277 }
278 };
279 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None);
280 let key = format!("{}/{}", query.collection, query.rkey);
281 let record_cid = match mst.get(&key).await {
282 Ok(Some(cid)) => cid,
283 Ok(None) => {
284 return (
285 StatusCode::NOT_FOUND,
286 Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
287 )
288 .into_response();
289 }
290 Err(_) => {
291 return (
292 StatusCode::INTERNAL_SERVER_ERROR,
293 Json(json!({"error": "InternalError", "message": "Failed to lookup record"})),
294 )
295 .into_response();
296 }
297 };
298 let record_block = match state.block_store.get(&record_cid).await {
299 Ok(Some(b)) => b,
300 _ => {
301 return (
302 StatusCode::NOT_FOUND,
303 Json(json!({"error": "RecordNotFound", "message": "Record block not found"})),
304 )
305 .into_response();
306 }
307 };
308 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new();
309 if mst.blocks_for_path(&key, &mut proof_blocks).await.is_err() {
310 return (
311 StatusCode::INTERNAL_SERVER_ERROR,
312 Json(json!({"error": "InternalError", "message": "Failed to build proof path"})),
313 )
314 .into_response();
315 }
316 let header = match encode_car_header(&commit_cid) {
317 Ok(h) => h,
318 Err(e) => {
319 error!("Failed to encode CAR header: {}", e);
320 return (
321 StatusCode::INTERNAL_SERVER_ERROR,
322 Json(json!({"error": "InternalError"})),
323 )
324 .into_response();
325 }
326 };
327 let mut car_bytes = header;
328 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| {
329 let cid_bytes = cid.to_bytes();
330 let total_len = cid_bytes.len() + data.len();
331 let mut writer = Vec::new();
332 crate::sync::car::write_varint(&mut writer, total_len as u64)
333 .expect("Writing to Vec<u8> should never fail");
334 writer
335 .write_all(&cid_bytes)
336 .expect("Writing to Vec<u8> should never fail");
337 writer
338 .write_all(data)
339 .expect("Writing to Vec<u8> should never fail");
340 car.extend_from_slice(&writer);
341 };
342 write_block(&mut car_bytes, &commit_cid, &commit_bytes);
343 for (cid, data) in &proof_blocks {
344 write_block(&mut car_bytes, cid, data);
345 }
346 write_block(&mut car_bytes, &record_cid, &record_block);
347 (
348 StatusCode::OK,
349 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
350 car_bytes,
351 )
352 .into_response()
353}