this repo has no description
1use crate::api::error::ApiError;
2use crate::state::AppState;
3use crate::sync::car::encode_car_header;
4use crate::sync::util::assert_repo_availability;
5use axum::{
6 extract::{Query, RawQuery, State},
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use cid::Cid;
11use ipld_core::ipld::Ipld;
12use jacquard_repo::storage::BlockStore;
13use serde::Deserialize;
14use std::io::Write;
15use std::str::FromStr;
16use tracing::error;
17
/// Budget for the DAG walk in `get_repo`: the number of distinct CIDs the walk may
/// look up in the block store. When the budget is used up the walk stops and the
/// blocks collected so far are returned.
18const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000;
19
20fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> {
21 let did = crate::util::parse_repeated_query_param(Some(query_string), "did")
22 .into_iter()
23 .next()
24 .ok_or("Missing required parameter: did")?;
25 let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids");
26 Ok((did, cids))
27}
28
29pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response {
30 let Some(query_string) = query else {
31 return ApiError::InvalidRequest("Missing query parameters".into()).into_response();
32 };
33
34 let (did, cid_strings) = match parse_get_blocks_query(&query_string) {
35 Ok(parsed) => parsed,
36 Err(msg) => return ApiError::InvalidRequest(msg).into_response(),
37 };
38
39 let _account = match assert_repo_availability(&state.db, &did, false).await {
40 Ok(a) => a,
41 Err(e) => return e.into_response(),
42 };
43
44 let cids: Vec<Cid> = match cid_strings
45 .iter()
46 .map(|s| Cid::from_str(s).map_err(|_| s.clone()))
47 .collect::<Result<Vec<_>, _>>()
48 {
49 Ok(cids) => cids,
50 Err(invalid) => {
51 return ApiError::InvalidRequest(format!("Invalid CID: {}", invalid)).into_response()
52 }
53 };
54
55 if cids.is_empty() {
56 return ApiError::InvalidRequest("No CIDs provided".into()).into_response();
57 }
58
59 let blocks = match state.block_store.get_many(&cids).await {
60 Ok(blocks) => blocks,
61 Err(e) => {
62 error!("Failed to get blocks: {}", e);
63 return ApiError::InternalError(None).into_response();
64 }
65 };
66
67 let missing_cids: Vec<String> = blocks
68 .iter()
69 .zip(&cids)
70 .filter_map(|(block_opt, cid)| block_opt.is_none().then(|| cid.to_string()))
71 .collect();
72 if !missing_cids.is_empty() {
73 return ApiError::InvalidRequest(format!(
74 "Could not find blocks: {}",
75 missing_cids.join(", ")
76 ))
77 .into_response();
78 }
79
80 let header = match crate::sync::car::encode_car_header_null_root() {
81 Ok(h) => h,
82 Err(e) => {
83 error!("Failed to encode CAR header: {}", e);
84 return ApiError::InternalError(None).into_response();
85 }
86 };
87 let mut car_bytes = header;
88 for (i, block_opt) in blocks.into_iter().enumerate() {
89 if let Some(block) = block_opt {
90 let cid = cids[i];
91 let cid_bytes = cid.to_bytes();
92 let total_len = cid_bytes.len() + block.len();
93 let mut writer = Vec::new();
94 crate::sync::car::write_varint(&mut writer, total_len as u64)
95 .expect("Writing to Vec<u8> should never fail");
96 writer
97 .write_all(&cid_bytes)
98 .expect("Writing to Vec<u8> should never fail");
99 writer
100 .write_all(&block)
101 .expect("Writing to Vec<u8> should never fail");
102 car_bytes.extend_from_slice(&writer);
103 }
104 }
105 (
106 StatusCode::OK,
107 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
108 car_bytes,
109 )
110 .into_response()
111}
112
113#[derive(Deserialize)]
114pub struct GetRepoQuery {
115 pub did: String,
116 pub since: Option<String>,
117}
118
119pub async fn get_repo(
120 State(state): State<AppState>,
121 Query(query): Query<GetRepoQuery>,
122) -> Response {
123 let account = match assert_repo_availability(&state.db, &query.did, false).await {
124 Ok(a) => a,
125 Err(e) => return e.into_response(),
126 };
127
128 let Some(head_str) = account.repo_root_cid else {
129 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response();
130 };
131
132 let Ok(head_cid) = Cid::from_str(&head_str) else {
133 return ApiError::InternalError(None).into_response();
134 };
135
136 if let Some(since) = &query.since {
137 return get_repo_since(&state, &query.did, &head_cid, since).await;
138 }
139
140 let mut car_bytes = match encode_car_header(&head_cid) {
141 Ok(h) => h,
142 Err(e) => {
143 error!("Failed to encode CAR header: {}", e);
144 return ApiError::InternalError(None).into_response();
145 }
146 };
147 let mut stack = vec![head_cid];
148 let mut visited = std::collections::HashSet::new();
149 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL;
150 while let Some(cid) = stack.pop() {
151 if visited.contains(&cid) {
152 continue;
153 }
154 visited.insert(cid);
155 if remaining == 0 {
156 break;
157 }
158 remaining -= 1;
159 if let Ok(Some(block)) = state.block_store.get(&cid).await {
160 let cid_bytes = cid.to_bytes();
161 let total_len = cid_bytes.len() + block.len();
162 let mut writer = Vec::new();
163 crate::sync::car::write_varint(&mut writer, total_len as u64)
164 .expect("Writing to Vec<u8> should never fail");
165 writer
166 .write_all(&cid_bytes)
167 .expect("Writing to Vec<u8> should never fail");
168 writer
169 .write_all(&block)
170 .expect("Writing to Vec<u8> should never fail");
171 car_bytes.extend_from_slice(&writer);
172 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
173 extract_links_ipld(&value, &mut stack);
174 }
175 }
176 }
177 (
178 StatusCode::OK,
179 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
180 car_bytes,
181 )
182 .into_response()
183}
184
185async fn get_repo_since(state: &AppState, did: &str, head_cid: &Cid, since: &str) -> Response {
186 let events = sqlx::query!(
187 r#"
188 SELECT blocks_cids, commit_cid
189 FROM repo_seq
190 WHERE did = $1 AND rev > $2
191 ORDER BY seq DESC
192 "#,
193 did,
194 since
195 )
196 .fetch_all(&state.db)
197 .await;
198
199 let events = match events {
200 Ok(e) => e,
201 Err(e) => {
202 error!("DB error in get_repo_since: {:?}", e);
203 return ApiError::InternalError(Some("Database error".into())).into_response();
204 }
205 };
206
207 let mut block_cids: Vec<Cid> = Vec::new();
208 for event in &events {
209 if let Some(cids) = &event.blocks_cids {
210 for cid_str in cids {
211 if let Ok(cid) = Cid::from_str(cid_str)
212 && !block_cids.contains(&cid)
213 {
214 block_cids.push(cid);
215 }
216 }
217 }
218 if let Some(commit_cid_str) = &event.commit_cid
219 && let Ok(cid) = Cid::from_str(commit_cid_str)
220 && !block_cids.contains(&cid)
221 {
222 block_cids.push(cid);
223 }
224 }
225
226 let mut car_bytes = match encode_car_header(head_cid) {
227 Ok(h) => h,
228 Err(e) => {
229 return ApiError::InternalError(Some(format!("Failed to encode CAR header: {}", e)))
230 .into_response();
231 }
232 };
233
234 if block_cids.is_empty() {
235 return (
236 StatusCode::OK,
237 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
238 car_bytes,
239 )
240 .into_response();
241 }
242
243 let blocks = match state.block_store.get_many(&block_cids).await {
244 Ok(b) => b,
245 Err(e) => {
246 error!("Block store error in get_repo_since: {:?}", e);
247 return ApiError::InternalError(Some("Failed to get blocks".into())).into_response();
248 }
249 };
250
251 for (i, block_opt) in blocks.into_iter().enumerate() {
252 if let Some(block) = block_opt {
253 let cid = block_cids[i];
254 let cid_bytes = cid.to_bytes();
255 let total_len = cid_bytes.len() + block.len();
256 let mut writer = Vec::new();
257 crate::sync::car::write_varint(&mut writer, total_len as u64)
258 .expect("Writing to Vec<u8> should never fail");
259 writer
260 .write_all(&cid_bytes)
261 .expect("Writing to Vec<u8> should never fail");
262 writer
263 .write_all(&block)
264 .expect("Writing to Vec<u8> should never fail");
265 car_bytes.extend_from_slice(&writer);
266 }
267 }
268
269 (
270 StatusCode::OK,
271 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
272 car_bytes,
273 )
274 .into_response()
275}
276
277fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) {
278 match value {
279 Ipld::Link(cid) => {
280 stack.push(*cid);
281 }
282 Ipld::Map(map) => {
283 for v in map.values() {
284 extract_links_ipld(v, stack);
285 }
286 }
287 Ipld::List(arr) => {
288 for v in arr {
289 extract_links_ipld(v, stack);
290 }
291 }
292 _ => {}
293 }
294}
295
296#[derive(Deserialize)]
297pub struct GetRecordQuery {
298 pub did: String,
299 pub collection: String,
300 pub rkey: String,
301}
302
303pub async fn get_record(
304 State(state): State<AppState>,
305 Query(query): Query<GetRecordQuery>,
306) -> Response {
307 use jacquard_repo::commit::Commit;
308 use jacquard_repo::mst::Mst;
309 use std::collections::BTreeMap;
310 use std::sync::Arc;
311
312 let account = match assert_repo_availability(&state.db, &query.did, false).await {
313 Ok(a) => a,
314 Err(e) => return e.into_response(),
315 };
316
317 let commit_cid_str = match account.repo_root_cid {
318 Some(cid) => cid,
319 None => {
320 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response();
321 }
322 };
323 let Ok(commit_cid) = Cid::from_str(&commit_cid_str) else {
324 return ApiError::InternalError(Some("Invalid commit CID".into())).into_response();
325 };
326 let commit_bytes = match state.block_store.get(&commit_cid).await {
327 Ok(Some(b)) => b,
328 _ => {
329 return ApiError::InternalError(Some("Commit block not found".into())).into_response();
330 }
331 };
332 let Ok(commit) = Commit::from_cbor(&commit_bytes) else {
333 return ApiError::InternalError(Some("Failed to parse commit".into())).into_response();
334 };
335 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None);
336 let key = format!("{}/{}", query.collection, query.rkey);
337 let record_cid = match mst.get(&key).await {
338 Ok(Some(cid)) => cid,
339 Ok(None) => {
340 return ApiError::RecordNotFound.into_response();
341 }
342 Err(_) => {
343 return ApiError::InternalError(Some("Failed to lookup record".into())).into_response();
344 }
345 };
346 let record_block = match state.block_store.get(&record_cid).await {
347 Ok(Some(b)) => b,
348 _ => {
349 return ApiError::RecordNotFound.into_response();
350 }
351 };
352 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new();
353 if mst.blocks_for_path(&key, &mut proof_blocks).await.is_err() {
354 return ApiError::InternalError(Some("Failed to build proof path".into())).into_response();
355 }
356 let header = match encode_car_header(&commit_cid) {
357 Ok(h) => h,
358 Err(e) => {
359 error!("Failed to encode CAR header: {}", e);
360 return ApiError::InternalError(None).into_response();
361 }
362 };
363 let mut car_bytes = header;
364 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| {
365 let cid_bytes = cid.to_bytes();
366 let total_len = cid_bytes.len() + data.len();
367 let mut writer = Vec::new();
368 crate::sync::car::write_varint(&mut writer, total_len as u64)
369 .expect("Writing to Vec<u8> should never fail");
370 writer
371 .write_all(&cid_bytes)
372 .expect("Writing to Vec<u8> should never fail");
373 writer
374 .write_all(data)
375 .expect("Writing to Vec<u8> should never fail");
376 car.extend_from_slice(&writer);
377 };
378 write_block(&mut car_bytes, &commit_cid, &commit_bytes);
379 for (cid, data) in &proof_blocks {
380 write_block(&mut car_bytes, cid, data);
381 }
382 write_block(&mut car_bytes, &record_cid, &record_block);
383 (
384 StatusCode::OK,
385 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
386 car_bytes,
387 )
388 .into_response()
389}