this repo has no description
1use crate::state::AppState;
2use crate::sync::car::encode_car_header;
3use crate::sync::util::assert_repo_availability;
4use axum::{
5 Json,
6 extract::{Query, RawQuery, State},
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use cid::Cid;
11use ipld_core::ipld::Ipld;
12use jacquard_repo::storage::BlockStore;
13use serde::Deserialize;
14use serde_json::json;
15use std::io::Write;
16use std::str::FromStr;
17use tracing::error;
18
/// Upper bound on the number of blocks a full-repository export in
/// `get_repo` will visit while following links from the head commit.
const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000;
20
21fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> {
22 let did = crate::util::parse_repeated_query_param(Some(query_string), "did")
23 .into_iter()
24 .next()
25 .ok_or("Missing required parameter: did")?;
26 let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids");
27 Ok((did, cids))
28}
29
30pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response {
31 let query_string = match query {
32 Some(q) => q,
33 None => {
34 return (
35 StatusCode::BAD_REQUEST,
36 Json(json!({"error": "InvalidRequest", "message": "Missing query parameters"})),
37 )
38 .into_response();
39 }
40 };
41
42 let (did, cid_strings) = match parse_get_blocks_query(&query_string) {
43 Ok(parsed) => parsed,
44 Err(msg) => {
45 return (
46 StatusCode::BAD_REQUEST,
47 Json(json!({"error": "InvalidRequest", "message": msg})),
48 )
49 .into_response();
50 }
51 };
52
53 let _account = match assert_repo_availability(&state.db, &did, false).await {
54 Ok(a) => a,
55 Err(e) => return e.into_response(),
56 };
57
58 let mut cids = Vec::new();
59 for s in &cid_strings {
60 match Cid::from_str(s) {
61 Ok(cid) => cids.push(cid),
62 Err(_) => return (
63 StatusCode::BAD_REQUEST,
64 Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", s)})),
65 )
66 .into_response(),
67 }
68 }
69
70 if cids.is_empty() {
71 return (
72 StatusCode::BAD_REQUEST,
73 Json(json!({"error": "InvalidRequest", "message": "No CIDs provided"})),
74 )
75 .into_response();
76 }
77
78 let blocks_res = state.block_store.get_many(&cids).await;
79 let blocks = match blocks_res {
80 Ok(blocks) => blocks,
81 Err(e) => {
82 error!("Failed to get blocks: {}", e);
83 return (
84 StatusCode::INTERNAL_SERVER_ERROR,
85 Json(json!({"error": "InternalError", "message": "Failed to get blocks"})),
86 )
87 .into_response();
88 }
89 };
90
91 let mut missing_cids: Vec<String> = Vec::new();
92 for (i, block_opt) in blocks.iter().enumerate() {
93 if block_opt.is_none() {
94 missing_cids.push(cids[i].to_string());
95 }
96 }
97 if !missing_cids.is_empty() {
98 return (
99 StatusCode::BAD_REQUEST,
100 Json(json!({
101 "error": "InvalidRequest",
102 "message": format!("Could not find blocks: {}", missing_cids.join(", "))
103 })),
104 )
105 .into_response();
106 }
107
108 let header = match crate::sync::car::encode_car_header_null_root() {
109 Ok(h) => h,
110 Err(e) => {
111 error!("Failed to encode CAR header: {}", e);
112 return (
113 StatusCode::INTERNAL_SERVER_ERROR,
114 Json(json!({"error": "InternalError", "message": "Failed to encode CAR"})),
115 )
116 .into_response();
117 }
118 };
119 let mut car_bytes = header;
120 for (i, block_opt) in blocks.into_iter().enumerate() {
121 if let Some(block) = block_opt {
122 let cid = cids[i];
123 let cid_bytes = cid.to_bytes();
124 let total_len = cid_bytes.len() + block.len();
125 let mut writer = Vec::new();
126 crate::sync::car::write_varint(&mut writer, total_len as u64)
127 .expect("Writing to Vec<u8> should never fail");
128 writer
129 .write_all(&cid_bytes)
130 .expect("Writing to Vec<u8> should never fail");
131 writer
132 .write_all(&block)
133 .expect("Writing to Vec<u8> should never fail");
134 car_bytes.extend_from_slice(&writer);
135 }
136 }
137 (
138 StatusCode::OK,
139 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
140 car_bytes,
141 )
142 .into_response()
143}
144
145#[derive(Deserialize)]
146pub struct GetRepoQuery {
147 pub did: String,
148 pub since: Option<String>,
149}
150
151pub async fn get_repo(
152 State(state): State<AppState>,
153 Query(query): Query<GetRepoQuery>,
154) -> Response {
155 let account = match assert_repo_availability(&state.db, &query.did, false).await {
156 Ok(a) => a,
157 Err(e) => return e.into_response(),
158 };
159
160 let head_str = match account.repo_root_cid {
161 Some(cid) => cid,
162 None => {
163 return (
164 StatusCode::BAD_REQUEST,
165 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})),
166 )
167 .into_response();
168 }
169 };
170
171 let head_cid = match Cid::from_str(&head_str) {
172 Ok(c) => c,
173 Err(_) => {
174 return (
175 StatusCode::INTERNAL_SERVER_ERROR,
176 Json(json!({"error": "InternalError", "message": "Invalid head CID"})),
177 )
178 .into_response();
179 }
180 };
181
182 if let Some(since) = &query.since {
183 return get_repo_since(&state, &query.did, &head_cid, since).await;
184 }
185
186 let mut car_bytes = match encode_car_header(&head_cid) {
187 Ok(h) => h,
188 Err(e) => {
189 return (
190 StatusCode::INTERNAL_SERVER_ERROR,
191 Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})),
192 )
193 .into_response();
194 }
195 };
196 let mut stack = vec![head_cid];
197 let mut visited = std::collections::HashSet::new();
198 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL;
199 while let Some(cid) = stack.pop() {
200 if visited.contains(&cid) {
201 continue;
202 }
203 visited.insert(cid);
204 if remaining == 0 {
205 break;
206 }
207 remaining -= 1;
208 if let Ok(Some(block)) = state.block_store.get(&cid).await {
209 let cid_bytes = cid.to_bytes();
210 let total_len = cid_bytes.len() + block.len();
211 let mut writer = Vec::new();
212 crate::sync::car::write_varint(&mut writer, total_len as u64)
213 .expect("Writing to Vec<u8> should never fail");
214 writer
215 .write_all(&cid_bytes)
216 .expect("Writing to Vec<u8> should never fail");
217 writer
218 .write_all(&block)
219 .expect("Writing to Vec<u8> should never fail");
220 car_bytes.extend_from_slice(&writer);
221 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
222 extract_links_ipld(&value, &mut stack);
223 }
224 }
225 }
226 (
227 StatusCode::OK,
228 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
229 car_bytes,
230 )
231 .into_response()
232}
233
234async fn get_repo_since(state: &AppState, did: &str, head_cid: &Cid, since: &str) -> Response {
235 let events = sqlx::query!(
236 r#"
237 SELECT blocks_cids, commit_cid
238 FROM repo_seq
239 WHERE did = $1 AND rev > $2
240 ORDER BY seq DESC
241 "#,
242 did,
243 since
244 )
245 .fetch_all(&state.db)
246 .await;
247
248 let events = match events {
249 Ok(e) => e,
250 Err(e) => {
251 error!("DB error in get_repo_since: {:?}", e);
252 return (
253 StatusCode::INTERNAL_SERVER_ERROR,
254 Json(json!({"error": "InternalError", "message": "Database error"})),
255 )
256 .into_response();
257 }
258 };
259
260 let mut block_cids: Vec<Cid> = Vec::new();
261 for event in &events {
262 if let Some(cids) = &event.blocks_cids {
263 for cid_str in cids {
264 if let Ok(cid) = Cid::from_str(cid_str)
265 && !block_cids.contains(&cid)
266 {
267 block_cids.push(cid);
268 }
269 }
270 }
271 if let Some(commit_cid_str) = &event.commit_cid
272 && let Ok(cid) = Cid::from_str(commit_cid_str)
273 && !block_cids.contains(&cid)
274 {
275 block_cids.push(cid);
276 }
277 }
278
279 let mut car_bytes = match encode_car_header(head_cid) {
280 Ok(h) => h,
281 Err(e) => {
282 return (
283 StatusCode::INTERNAL_SERVER_ERROR,
284 Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})),
285 )
286 .into_response();
287 }
288 };
289
290 if block_cids.is_empty() {
291 return (
292 StatusCode::OK,
293 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
294 car_bytes,
295 )
296 .into_response();
297 }
298
299 let blocks = match state.block_store.get_many(&block_cids).await {
300 Ok(b) => b,
301 Err(e) => {
302 error!("Block store error in get_repo_since: {:?}", e);
303 return (
304 StatusCode::INTERNAL_SERVER_ERROR,
305 Json(json!({"error": "InternalError", "message": "Failed to get blocks"})),
306 )
307 .into_response();
308 }
309 };
310
311 for (i, block_opt) in blocks.into_iter().enumerate() {
312 if let Some(block) = block_opt {
313 let cid = block_cids[i];
314 let cid_bytes = cid.to_bytes();
315 let total_len = cid_bytes.len() + block.len();
316 let mut writer = Vec::new();
317 crate::sync::car::write_varint(&mut writer, total_len as u64)
318 .expect("Writing to Vec<u8> should never fail");
319 writer
320 .write_all(&cid_bytes)
321 .expect("Writing to Vec<u8> should never fail");
322 writer
323 .write_all(&block)
324 .expect("Writing to Vec<u8> should never fail");
325 car_bytes.extend_from_slice(&writer);
326 }
327 }
328
329 (
330 StatusCode::OK,
331 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
332 car_bytes,
333 )
334 .into_response()
335}
336
337fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) {
338 match value {
339 Ipld::Link(cid) => {
340 stack.push(*cid);
341 }
342 Ipld::Map(map) => {
343 for v in map.values() {
344 extract_links_ipld(v, stack);
345 }
346 }
347 Ipld::List(arr) => {
348 for v in arr {
349 extract_links_ipld(v, stack);
350 }
351 }
352 _ => {}
353 }
354}
355
356#[derive(Deserialize)]
357pub struct GetRecordQuery {
358 pub did: String,
359 pub collection: String,
360 pub rkey: String,
361}
362
363pub async fn get_record(
364 State(state): State<AppState>,
365 Query(query): Query<GetRecordQuery>,
366) -> Response {
367 use jacquard_repo::commit::Commit;
368 use jacquard_repo::mst::Mst;
369 use std::collections::BTreeMap;
370 use std::sync::Arc;
371
372 let account = match assert_repo_availability(&state.db, &query.did, false).await {
373 Ok(a) => a,
374 Err(e) => return e.into_response(),
375 };
376
377 let commit_cid_str = match account.repo_root_cid {
378 Some(cid) => cid,
379 None => {
380 return (
381 StatusCode::BAD_REQUEST,
382 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})),
383 )
384 .into_response();
385 }
386 };
387 let commit_cid = match Cid::from_str(&commit_cid_str) {
388 Ok(c) => c,
389 Err(_) => {
390 return (
391 StatusCode::INTERNAL_SERVER_ERROR,
392 Json(json!({"error": "InternalError", "message": "Invalid commit CID"})),
393 )
394 .into_response();
395 }
396 };
397 let commit_bytes = match state.block_store.get(&commit_cid).await {
398 Ok(Some(b)) => b,
399 _ => {
400 return (
401 StatusCode::INTERNAL_SERVER_ERROR,
402 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
403 )
404 .into_response();
405 }
406 };
407 let commit = match Commit::from_cbor(&commit_bytes) {
408 Ok(c) => c,
409 Err(_) => {
410 return (
411 StatusCode::INTERNAL_SERVER_ERROR,
412 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
413 )
414 .into_response();
415 }
416 };
417 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None);
418 let key = format!("{}/{}", query.collection, query.rkey);
419 let record_cid = match mst.get(&key).await {
420 Ok(Some(cid)) => cid,
421 Ok(None) => {
422 return (
423 StatusCode::NOT_FOUND,
424 Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
425 )
426 .into_response();
427 }
428 Err(_) => {
429 return (
430 StatusCode::INTERNAL_SERVER_ERROR,
431 Json(json!({"error": "InternalError", "message": "Failed to lookup record"})),
432 )
433 .into_response();
434 }
435 };
436 let record_block = match state.block_store.get(&record_cid).await {
437 Ok(Some(b)) => b,
438 _ => {
439 return (
440 StatusCode::NOT_FOUND,
441 Json(json!({"error": "RecordNotFound", "message": "Record block not found"})),
442 )
443 .into_response();
444 }
445 };
446 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new();
447 if mst.blocks_for_path(&key, &mut proof_blocks).await.is_err() {
448 return (
449 StatusCode::INTERNAL_SERVER_ERROR,
450 Json(json!({"error": "InternalError", "message": "Failed to build proof path"})),
451 )
452 .into_response();
453 }
454 let header = match encode_car_header(&commit_cid) {
455 Ok(h) => h,
456 Err(e) => {
457 error!("Failed to encode CAR header: {}", e);
458 return (
459 StatusCode::INTERNAL_SERVER_ERROR,
460 Json(json!({"error": "InternalError"})),
461 )
462 .into_response();
463 }
464 };
465 let mut car_bytes = header;
466 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| {
467 let cid_bytes = cid.to_bytes();
468 let total_len = cid_bytes.len() + data.len();
469 let mut writer = Vec::new();
470 crate::sync::car::write_varint(&mut writer, total_len as u64)
471 .expect("Writing to Vec<u8> should never fail");
472 writer
473 .write_all(&cid_bytes)
474 .expect("Writing to Vec<u8> should never fail");
475 writer
476 .write_all(data)
477 .expect("Writing to Vec<u8> should never fail");
478 car.extend_from_slice(&writer);
479 };
480 write_block(&mut car_bytes, &commit_cid, &commit_bytes);
481 for (cid, data) in &proof_blocks {
482 write_block(&mut car_bytes, cid, data);
483 }
484 write_block(&mut car_bytes, &record_cid, &record_block);
485 (
486 StatusCode::OK,
487 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
488 car_bytes,
489 )
490 .into_response()
491}