this repo has no description
1use crate::api::error::ApiError;
2use crate::state::AppState;
3use crate::sync::car::encode_car_header;
4use crate::sync::util::assert_repo_availability;
5use axum::{
6 extract::{Query, RawQuery, State},
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use cid::Cid;
11use ipld_core::ipld::Ipld;
12use jacquard_repo::storage::BlockStore;
13use serde::Deserialize;
14use std::io::Write;
15use std::str::FromStr;
16use tracing::error;
17
18const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000;
19
20fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> {
21 let did = crate::util::parse_repeated_query_param(Some(query_string), "did")
22 .into_iter()
23 .next()
24 .ok_or("Missing required parameter: did")?;
25 let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids");
26 Ok((did, cids))
27}
28
29pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response {
30 let Some(query_string) = query else {
31 return ApiError::InvalidRequest("Missing query parameters".into()).into_response();
32 };
33
34 let (did, cid_strings) = match parse_get_blocks_query(&query_string) {
35 Ok(parsed) => parsed,
36 Err(msg) => return ApiError::InvalidRequest(msg).into_response(),
37 };
38
39 let _account = match assert_repo_availability(&state.db, &did, false).await {
40 Ok(a) => a,
41 Err(e) => return e.into_response(),
42 };
43
44 let cids: Vec<Cid> = match cid_strings
45 .iter()
46 .map(|s| Cid::from_str(s).map_err(|_| s.clone()))
47 .collect::<Result<Vec<_>, _>>()
48 {
49 Ok(cids) => cids,
50 Err(invalid) => {
51 return ApiError::InvalidRequest(format!("Invalid CID: {}", invalid)).into_response();
52 }
53 };
54
55 if cids.is_empty() {
56 return ApiError::InvalidRequest("No CIDs provided".into()).into_response();
57 }
58
59 let blocks = match state.block_store.get_many(&cids).await {
60 Ok(blocks) => blocks,
61 Err(e) => {
62 error!("Failed to get blocks: {}", e);
63 return ApiError::InternalError(None).into_response();
64 }
65 };
66
67 let missing_cids: Vec<String> = blocks
68 .iter()
69 .zip(&cids)
70 .filter(|(block_opt, _)| block_opt.is_none())
71 .map(|(_, cid)| cid.to_string())
72 .collect();
73 if !missing_cids.is_empty() {
74 return ApiError::InvalidRequest(format!(
75 "Could not find blocks: {}",
76 missing_cids.join(", ")
77 ))
78 .into_response();
79 }
80
81 let header = match crate::sync::car::encode_car_header_null_root() {
82 Ok(h) => h,
83 Err(e) => {
84 error!("Failed to encode CAR header: {}", e);
85 return ApiError::InternalError(None).into_response();
86 }
87 };
88 let mut car_bytes = header;
89 for (i, block_opt) in blocks.into_iter().enumerate() {
90 if let Some(block) = block_opt {
91 let cid = cids[i];
92 let cid_bytes = cid.to_bytes();
93 let total_len = cid_bytes.len() + block.len();
94 let mut writer = Vec::new();
95 crate::sync::car::write_varint(&mut writer, total_len as u64)
96 .expect("Writing to Vec<u8> should never fail");
97 writer
98 .write_all(&cid_bytes)
99 .expect("Writing to Vec<u8> should never fail");
100 writer
101 .write_all(&block)
102 .expect("Writing to Vec<u8> should never fail");
103 car_bytes.extend_from_slice(&writer);
104 }
105 }
106 (
107 StatusCode::OK,
108 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
109 car_bytes,
110 )
111 .into_response()
112}
113
114#[derive(Deserialize)]
115pub struct GetRepoQuery {
116 pub did: String,
117 pub since: Option<String>,
118}
119
120pub async fn get_repo(
121 State(state): State<AppState>,
122 Query(query): Query<GetRepoQuery>,
123) -> Response {
124 let account = match assert_repo_availability(&state.db, &query.did, false).await {
125 Ok(a) => a,
126 Err(e) => return e.into_response(),
127 };
128
129 let Some(head_str) = account.repo_root_cid else {
130 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response();
131 };
132
133 let Ok(head_cid) = Cid::from_str(&head_str) else {
134 return ApiError::InternalError(None).into_response();
135 };
136
137 if let Some(since) = &query.since {
138 return get_repo_since(&state, &query.did, &head_cid, since).await;
139 }
140
141 let mut car_bytes = match encode_car_header(&head_cid) {
142 Ok(h) => h,
143 Err(e) => {
144 error!("Failed to encode CAR header: {}", e);
145 return ApiError::InternalError(None).into_response();
146 }
147 };
148 let mut stack = vec![head_cid];
149 let mut visited = std::collections::HashSet::new();
150 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL;
151 while let Some(cid) = stack.pop() {
152 if visited.contains(&cid) {
153 continue;
154 }
155 visited.insert(cid);
156 if remaining == 0 {
157 break;
158 }
159 remaining -= 1;
160 if let Ok(Some(block)) = state.block_store.get(&cid).await {
161 let cid_bytes = cid.to_bytes();
162 let total_len = cid_bytes.len() + block.len();
163 let mut writer = Vec::new();
164 crate::sync::car::write_varint(&mut writer, total_len as u64)
165 .expect("Writing to Vec<u8> should never fail");
166 writer
167 .write_all(&cid_bytes)
168 .expect("Writing to Vec<u8> should never fail");
169 writer
170 .write_all(&block)
171 .expect("Writing to Vec<u8> should never fail");
172 car_bytes.extend_from_slice(&writer);
173 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
174 extract_links_ipld(&value, &mut stack);
175 }
176 }
177 }
178 (
179 StatusCode::OK,
180 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
181 car_bytes,
182 )
183 .into_response()
184}
185
186async fn get_repo_since(state: &AppState, did: &str, head_cid: &Cid, since: &str) -> Response {
187 let events = sqlx::query!(
188 r#"
189 SELECT blocks_cids, commit_cid
190 FROM repo_seq
191 WHERE did = $1 AND rev > $2
192 ORDER BY seq DESC
193 "#,
194 did,
195 since
196 )
197 .fetch_all(&state.db)
198 .await;
199
200 let events = match events {
201 Ok(e) => e,
202 Err(e) => {
203 error!("DB error in get_repo_since: {:?}", e);
204 return ApiError::InternalError(Some("Database error".into())).into_response();
205 }
206 };
207
208 let mut block_cids: Vec<Cid> = Vec::new();
209 for event in &events {
210 if let Some(cids) = &event.blocks_cids {
211 for cid_str in cids {
212 if let Ok(cid) = Cid::from_str(cid_str)
213 && !block_cids.contains(&cid)
214 {
215 block_cids.push(cid);
216 }
217 }
218 }
219 if let Some(commit_cid_str) = &event.commit_cid
220 && let Ok(cid) = Cid::from_str(commit_cid_str)
221 && !block_cids.contains(&cid)
222 {
223 block_cids.push(cid);
224 }
225 }
226
227 let mut car_bytes = match encode_car_header(head_cid) {
228 Ok(h) => h,
229 Err(e) => {
230 return ApiError::InternalError(Some(format!("Failed to encode CAR header: {}", e)))
231 .into_response();
232 }
233 };
234
235 if block_cids.is_empty() {
236 return (
237 StatusCode::OK,
238 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
239 car_bytes,
240 )
241 .into_response();
242 }
243
244 let blocks = match state.block_store.get_many(&block_cids).await {
245 Ok(b) => b,
246 Err(e) => {
247 error!("Block store error in get_repo_since: {:?}", e);
248 return ApiError::InternalError(Some("Failed to get blocks".into())).into_response();
249 }
250 };
251
252 for (i, block_opt) in blocks.into_iter().enumerate() {
253 if let Some(block) = block_opt {
254 let cid = block_cids[i];
255 let cid_bytes = cid.to_bytes();
256 let total_len = cid_bytes.len() + block.len();
257 let mut writer = Vec::new();
258 crate::sync::car::write_varint(&mut writer, total_len as u64)
259 .expect("Writing to Vec<u8> should never fail");
260 writer
261 .write_all(&cid_bytes)
262 .expect("Writing to Vec<u8> should never fail");
263 writer
264 .write_all(&block)
265 .expect("Writing to Vec<u8> should never fail");
266 car_bytes.extend_from_slice(&writer);
267 }
268 }
269
270 (
271 StatusCode::OK,
272 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
273 car_bytes,
274 )
275 .into_response()
276}
277
278fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) {
279 match value {
280 Ipld::Link(cid) => {
281 stack.push(*cid);
282 }
283 Ipld::Map(map) => {
284 for v in map.values() {
285 extract_links_ipld(v, stack);
286 }
287 }
288 Ipld::List(arr) => {
289 for v in arr {
290 extract_links_ipld(v, stack);
291 }
292 }
293 _ => {}
294 }
295}
296
297#[derive(Deserialize)]
298pub struct GetRecordQuery {
299 pub did: String,
300 pub collection: String,
301 pub rkey: String,
302}
303
304pub async fn get_record(
305 State(state): State<AppState>,
306 Query(query): Query<GetRecordQuery>,
307) -> Response {
308 use jacquard_repo::commit::Commit;
309 use jacquard_repo::mst::Mst;
310 use std::collections::BTreeMap;
311 use std::sync::Arc;
312
313 let account = match assert_repo_availability(&state.db, &query.did, false).await {
314 Ok(a) => a,
315 Err(e) => return e.into_response(),
316 };
317
318 let commit_cid_str = match account.repo_root_cid {
319 Some(cid) => cid,
320 None => {
321 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response();
322 }
323 };
324 let Ok(commit_cid) = Cid::from_str(&commit_cid_str) else {
325 return ApiError::InternalError(Some("Invalid commit CID".into())).into_response();
326 };
327 let commit_bytes = match state.block_store.get(&commit_cid).await {
328 Ok(Some(b)) => b,
329 _ => {
330 return ApiError::InternalError(Some("Commit block not found".into())).into_response();
331 }
332 };
333 let Ok(commit) = Commit::from_cbor(&commit_bytes) else {
334 return ApiError::InternalError(Some("Failed to parse commit".into())).into_response();
335 };
336 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None);
337 let key = format!("{}/{}", query.collection, query.rkey);
338 let record_cid = match mst.get(&key).await {
339 Ok(Some(cid)) => cid,
340 Ok(None) => {
341 return ApiError::RecordNotFound.into_response();
342 }
343 Err(_) => {
344 return ApiError::InternalError(Some("Failed to lookup record".into())).into_response();
345 }
346 };
347 let record_block = match state.block_store.get(&record_cid).await {
348 Ok(Some(b)) => b,
349 _ => {
350 return ApiError::RecordNotFound.into_response();
351 }
352 };
353 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new();
354 if mst.blocks_for_path(&key, &mut proof_blocks).await.is_err() {
355 return ApiError::InternalError(Some("Failed to build proof path".into())).into_response();
356 }
357 let header = match encode_car_header(&commit_cid) {
358 Ok(h) => h,
359 Err(e) => {
360 error!("Failed to encode CAR header: {}", e);
361 return ApiError::InternalError(None).into_response();
362 }
363 };
364 let mut car_bytes = header;
365 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| {
366 let cid_bytes = cid.to_bytes();
367 let total_len = cid_bytes.len() + data.len();
368 let mut writer = Vec::new();
369 crate::sync::car::write_varint(&mut writer, total_len as u64)
370 .expect("Writing to Vec<u8> should never fail");
371 writer
372 .write_all(&cid_bytes)
373 .expect("Writing to Vec<u8> should never fail");
374 writer
375 .write_all(data)
376 .expect("Writing to Vec<u8> should never fail");
377 car.extend_from_slice(&writer);
378 };
379 write_block(&mut car_bytes, &commit_cid, &commit_bytes);
380 for (cid, data) in &proof_blocks {
381 write_block(&mut car_bytes, cid, data);
382 }
383 write_block(&mut car_bytes, &record_cid, &record_block);
384 (
385 StatusCode::OK,
386 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
387 car_bytes,
388 )
389 .into_response()
390}