this repo has no description
1use crate::state::AppState;
2use crate::sync::car::encode_car_header;
3use axum::{
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7 Json,
8};
9use cid::Cid;
10use ipld_core::ipld::Ipld;
11use jacquard_repo::storage::BlockStore;
12use serde::Deserialize;
13use serde_json::json;
14use std::io::Write;
15use std::str::FromStr;
16use tracing::error;
17
18const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000;
19
20#[derive(Deserialize)]
21pub struct GetBlocksQuery {
22 pub did: String,
23 pub cids: String,
24}
25
26pub async fn get_blocks(
27 State(state): State<AppState>,
28 Query(query): Query<GetBlocksQuery>,
29) -> Response {
30 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
31 .fetch_optional(&state.db)
32 .await
33 .unwrap_or(None);
34 if user_exists.is_none() {
35 return (StatusCode::NOT_FOUND, "Repo not found").into_response();
36 }
37 let cids_str: Vec<&str> = query.cids.split(',').collect();
38 let mut cids = Vec::new();
39 for s in cids_str {
40 match Cid::from_str(s) {
41 Ok(cid) => cids.push(cid),
42 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(),
43 }
44 }
45 let blocks_res = state.block_store.get_many(&cids).await;
46 let blocks = match blocks_res {
47 Ok(blocks) => blocks,
48 Err(e) => {
49 error!("Failed to get blocks: {}", e);
50 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response();
51 }
52 };
53 if cids.is_empty() {
54 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response();
55 }
56 let root_cid = cids[0];
57 let header = match encode_car_header(&root_cid) {
58 Ok(h) => h,
59 Err(e) => {
60 error!("Failed to encode CAR header: {}", e);
61 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to encode CAR").into_response();
62 }
63 };
64 let mut car_bytes = header;
65 for (i, block_opt) in blocks.into_iter().enumerate() {
66 if let Some(block) = block_opt {
67 let cid = cids[i];
68 let cid_bytes = cid.to_bytes();
69 let total_len = cid_bytes.len() + block.len();
70 let mut writer = Vec::new();
71 crate::sync::car::write_varint(&mut writer, total_len as u64)
72 .expect("Writing to Vec<u8> should never fail");
73 writer.write_all(&cid_bytes)
74 .expect("Writing to Vec<u8> should never fail");
75 writer.write_all(&block)
76 .expect("Writing to Vec<u8> should never fail");
77 car_bytes.extend_from_slice(&writer);
78 }
79 }
80 (
81 StatusCode::OK,
82 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
83 car_bytes,
84 )
85 .into_response()
86}
87
88#[derive(Deserialize)]
89pub struct GetRepoQuery {
90 pub did: String,
91 pub since: Option<String>,
92}
93
94pub async fn get_repo(
95 State(state): State<AppState>,
96 Query(query): Query<GetRepoQuery>,
97) -> Response {
98 let repo_row = sqlx::query!(
99 r#"
100 SELECT r.repo_root_cid
101 FROM repos r
102 JOIN users u ON u.id = r.user_id
103 WHERE u.did = $1
104 "#,
105 query.did
106 )
107 .fetch_optional(&state.db)
108 .await
109 .unwrap_or(None);
110 let head_str = match repo_row {
111 Some(r) => r.repo_root_cid,
112 None => {
113 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
114 .fetch_optional(&state.db)
115 .await
116 .unwrap_or(None);
117 if user_exists.is_none() {
118 return (
119 StatusCode::NOT_FOUND,
120 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
121 )
122 .into_response();
123 } else {
124 return (
125 StatusCode::NOT_FOUND,
126 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})),
127 )
128 .into_response();
129 }
130 }
131 };
132 let head_cid = match Cid::from_str(&head_str) {
133 Ok(c) => c,
134 Err(_) => {
135 return (
136 StatusCode::INTERNAL_SERVER_ERROR,
137 Json(json!({"error": "InternalError", "message": "Invalid head CID"})),
138 )
139 .into_response();
140 }
141 };
142 let mut car_bytes = match encode_car_header(&head_cid) {
143 Ok(h) => h,
144 Err(e) => {
145 return (
146 StatusCode::INTERNAL_SERVER_ERROR,
147 Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})),
148 )
149 .into_response();
150 }
151 };
152 let mut stack = vec![head_cid];
153 let mut visited = std::collections::HashSet::new();
154 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL;
155 while let Some(cid) = stack.pop() {
156 if visited.contains(&cid) {
157 continue;
158 }
159 visited.insert(cid);
160 if remaining == 0 { break; }
161 remaining -= 1;
162 if let Ok(Some(block)) = state.block_store.get(&cid).await {
163 let cid_bytes = cid.to_bytes();
164 let total_len = cid_bytes.len() + block.len();
165 let mut writer = Vec::new();
166 crate::sync::car::write_varint(&mut writer, total_len as u64)
167 .expect("Writing to Vec<u8> should never fail");
168 writer.write_all(&cid_bytes)
169 .expect("Writing to Vec<u8> should never fail");
170 writer.write_all(&block)
171 .expect("Writing to Vec<u8> should never fail");
172 car_bytes.extend_from_slice(&writer);
173 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
174 extract_links_ipld(&value, &mut stack);
175 }
176 }
177 }
178 (
179 StatusCode::OK,
180 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
181 car_bytes,
182 )
183 .into_response()
184}
185
186fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) {
187 match value {
188 Ipld::Link(cid) => {
189 stack.push(*cid);
190 }
191 Ipld::Map(map) => {
192 for v in map.values() {
193 extract_links_ipld(v, stack);
194 }
195 }
196 Ipld::List(arr) => {
197 for v in arr {
198 extract_links_ipld(v, stack);
199 }
200 }
201 _ => {}
202 }
203}
204
205#[derive(Deserialize)]
206pub struct GetRecordQuery {
207 pub did: String,
208 pub collection: String,
209 pub rkey: String,
210}
211
212pub async fn get_record(
213 State(state): State<AppState>,
214 Query(query): Query<GetRecordQuery>,
215) -> Response {
216 use jacquard_repo::commit::Commit;
217 use jacquard_repo::mst::Mst;
218 use std::collections::BTreeMap;
219 use std::sync::Arc;
220
221 let repo_row = sqlx::query!(
222 r#"
223 SELECT r.repo_root_cid
224 FROM repos r
225 JOIN users u ON u.id = r.user_id
226 WHERE u.did = $1
227 "#,
228 query.did
229 )
230 .fetch_optional(&state.db)
231 .await
232 .unwrap_or(None);
233 let commit_cid_str = match repo_row {
234 Some(r) => r.repo_root_cid,
235 None => {
236 return (
237 StatusCode::NOT_FOUND,
238 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
239 )
240 .into_response();
241 }
242 };
243 let commit_cid = match Cid::from_str(&commit_cid_str) {
244 Ok(c) => c,
245 Err(_) => {
246 return (
247 StatusCode::INTERNAL_SERVER_ERROR,
248 Json(json!({"error": "InternalError", "message": "Invalid commit CID"})),
249 )
250 .into_response();
251 }
252 };
253 let commit_bytes = match state.block_store.get(&commit_cid).await {
254 Ok(Some(b)) => b,
255 _ => {
256 return (
257 StatusCode::INTERNAL_SERVER_ERROR,
258 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
259 )
260 .into_response();
261 }
262 };
263 let commit = match Commit::from_cbor(&commit_bytes) {
264 Ok(c) => c,
265 Err(_) => {
266 return (
267 StatusCode::INTERNAL_SERVER_ERROR,
268 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
269 )
270 .into_response();
271 }
272 };
273 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None);
274 let key = format!("{}/{}", query.collection, query.rkey);
275 let record_cid = match mst.get(&key).await {
276 Ok(Some(cid)) => cid,
277 Ok(None) => {
278 return (
279 StatusCode::NOT_FOUND,
280 Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
281 )
282 .into_response();
283 }
284 Err(_) => {
285 return (
286 StatusCode::INTERNAL_SERVER_ERROR,
287 Json(json!({"error": "InternalError", "message": "Failed to lookup record"})),
288 )
289 .into_response();
290 }
291 };
292 let record_block = match state.block_store.get(&record_cid).await {
293 Ok(Some(b)) => b,
294 _ => {
295 return (
296 StatusCode::NOT_FOUND,
297 Json(json!({"error": "RecordNotFound", "message": "Record block not found"})),
298 )
299 .into_response();
300 }
301 };
302 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new();
303 if let Err(_) = mst.blocks_for_path(&key, &mut proof_blocks).await {
304 return (
305 StatusCode::INTERNAL_SERVER_ERROR,
306 Json(json!({"error": "InternalError", "message": "Failed to build proof path"})),
307 )
308 .into_response();
309 }
310 let header = match encode_car_header(&commit_cid) {
311 Ok(h) => h,
312 Err(e) => {
313 error!("Failed to encode CAR header: {}", e);
314 return (
315 StatusCode::INTERNAL_SERVER_ERROR,
316 Json(json!({"error": "InternalError"})),
317 )
318 .into_response();
319 }
320 };
321 let mut car_bytes = header;
322 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| {
323 let cid_bytes = cid.to_bytes();
324 let total_len = cid_bytes.len() + data.len();
325 let mut writer = Vec::new();
326 crate::sync::car::write_varint(&mut writer, total_len as u64)
327 .expect("Writing to Vec<u8> should never fail");
328 writer.write_all(&cid_bytes)
329 .expect("Writing to Vec<u8> should never fail");
330 writer.write_all(data)
331 .expect("Writing to Vec<u8> should never fail");
332 car.extend_from_slice(&writer);
333 };
334 write_block(&mut car_bytes, &commit_cid, &commit_bytes);
335 for (cid, data) in &proof_blocks {
336 write_block(&mut car_bytes, cid, data);
337 }
338 write_block(&mut car_bytes, &record_cid, &record_block);
339 (
340 StatusCode::OK,
341 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
342 car_bytes,
343 )
344 .into_response()
345}