this repo has no description
1use crate::state::AppState;
2use crate::sync::car::encode_car_header;
3use axum::{
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7 Json,
8};
9use cid::Cid;
10use ipld_core::ipld::Ipld;
11use jacquard_repo::storage::BlockStore;
12use serde::Deserialize;
13use serde_json::json;
14use std::io::Write;
15use std::str::FromStr;
16use tracing::error;
17const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000;
18#[derive(Deserialize)]
19pub struct GetBlocksQuery {
20 pub did: String,
21 pub cids: String,
22}
23pub async fn get_blocks(
24 State(state): State<AppState>,
25 Query(query): Query<GetBlocksQuery>,
26) -> Response {
27 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
28 .fetch_optional(&state.db)
29 .await
30 .unwrap_or(None);
31 if user_exists.is_none() {
32 return (StatusCode::NOT_FOUND, "Repo not found").into_response();
33 }
34 let cids_str: Vec<&str> = query.cids.split(',').collect();
35 let mut cids = Vec::new();
36 for s in cids_str {
37 match Cid::from_str(s) {
38 Ok(cid) => cids.push(cid),
39 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(),
40 }
41 }
42 let blocks_res = state.block_store.get_many(&cids).await;
43 let blocks = match blocks_res {
44 Ok(blocks) => blocks,
45 Err(e) => {
46 error!("Failed to get blocks: {}", e);
47 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response();
48 }
49 };
50 if cids.is_empty() {
51 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response();
52 }
53 let root_cid = cids[0];
54 let header = match encode_car_header(&root_cid) {
55 Ok(h) => h,
56 Err(e) => {
57 error!("Failed to encode CAR header: {}", e);
58 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to encode CAR").into_response();
59 }
60 };
61 let mut car_bytes = header;
62 for (i, block_opt) in blocks.into_iter().enumerate() {
63 if let Some(block) = block_opt {
64 let cid = cids[i];
65 let cid_bytes = cid.to_bytes();
66 let total_len = cid_bytes.len() + block.len();
67 let mut writer = Vec::new();
68 crate::sync::car::write_varint(&mut writer, total_len as u64)
69 .expect("Writing to Vec<u8> should never fail");
70 writer.write_all(&cid_bytes)
71 .expect("Writing to Vec<u8> should never fail");
72 writer.write_all(&block)
73 .expect("Writing to Vec<u8> should never fail");
74 car_bytes.extend_from_slice(&writer);
75 }
76 }
77 (
78 StatusCode::OK,
79 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
80 car_bytes,
81 )
82 .into_response()
83}
84#[derive(Deserialize)]
85pub struct GetRepoQuery {
86 pub did: String,
87 pub since: Option<String>,
88}
89pub async fn get_repo(
90 State(state): State<AppState>,
91 Query(query): Query<GetRepoQuery>,
92) -> Response {
93 let repo_row = sqlx::query!(
94 r#"
95 SELECT r.repo_root_cid
96 FROM repos r
97 JOIN users u ON u.id = r.user_id
98 WHERE u.did = $1
99 "#,
100 query.did
101 )
102 .fetch_optional(&state.db)
103 .await
104 .unwrap_or(None);
105 let head_str = match repo_row {
106 Some(r) => r.repo_root_cid,
107 None => {
108 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
109 .fetch_optional(&state.db)
110 .await
111 .unwrap_or(None);
112 if user_exists.is_none() {
113 return (
114 StatusCode::NOT_FOUND,
115 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
116 )
117 .into_response();
118 } else {
119 return (
120 StatusCode::NOT_FOUND,
121 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})),
122 )
123 .into_response();
124 }
125 }
126 };
127 let head_cid = match Cid::from_str(&head_str) {
128 Ok(c) => c,
129 Err(_) => {
130 return (
131 StatusCode::INTERNAL_SERVER_ERROR,
132 Json(json!({"error": "InternalError", "message": "Invalid head CID"})),
133 )
134 .into_response();
135 }
136 };
137 let mut car_bytes = match encode_car_header(&head_cid) {
138 Ok(h) => h,
139 Err(e) => {
140 return (
141 StatusCode::INTERNAL_SERVER_ERROR,
142 Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})),
143 )
144 .into_response();
145 }
146 };
147 let mut stack = vec![head_cid];
148 let mut visited = std::collections::HashSet::new();
149 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL;
150 while let Some(cid) = stack.pop() {
151 if visited.contains(&cid) {
152 continue;
153 }
154 visited.insert(cid);
155 if remaining == 0 { break; }
156 remaining -= 1;
157 if let Ok(Some(block)) = state.block_store.get(&cid).await {
158 let cid_bytes = cid.to_bytes();
159 let total_len = cid_bytes.len() + block.len();
160 let mut writer = Vec::new();
161 crate::sync::car::write_varint(&mut writer, total_len as u64)
162 .expect("Writing to Vec<u8> should never fail");
163 writer.write_all(&cid_bytes)
164 .expect("Writing to Vec<u8> should never fail");
165 writer.write_all(&block)
166 .expect("Writing to Vec<u8> should never fail");
167 car_bytes.extend_from_slice(&writer);
168 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
169 extract_links_ipld(&value, &mut stack);
170 }
171 }
172 }
173 (
174 StatusCode::OK,
175 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
176 car_bytes,
177 )
178 .into_response()
179}
180fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) {
181 match value {
182 Ipld::Link(cid) => {
183 stack.push(*cid);
184 }
185 Ipld::Map(map) => {
186 for v in map.values() {
187 extract_links_ipld(v, stack);
188 }
189 }
190 Ipld::List(arr) => {
191 for v in arr {
192 extract_links_ipld(v, stack);
193 }
194 }
195 _ => {}
196 }
197}
198#[derive(Deserialize)]
199pub struct GetRecordQuery {
200 pub did: String,
201 pub collection: String,
202 pub rkey: String,
203}
204pub async fn get_record(
205 State(state): State<AppState>,
206 Query(query): Query<GetRecordQuery>,
207) -> Response {
208 use jacquard_repo::commit::Commit;
209 use jacquard_repo::mst::Mst;
210 use std::collections::BTreeMap;
211 use std::sync::Arc;
212 let repo_row = sqlx::query!(
213 r#"
214 SELECT r.repo_root_cid
215 FROM repos r
216 JOIN users u ON u.id = r.user_id
217 WHERE u.did = $1
218 "#,
219 query.did
220 )
221 .fetch_optional(&state.db)
222 .await
223 .unwrap_or(None);
224 let commit_cid_str = match repo_row {
225 Some(r) => r.repo_root_cid,
226 None => {
227 return (
228 StatusCode::NOT_FOUND,
229 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
230 )
231 .into_response();
232 }
233 };
234 let commit_cid = match Cid::from_str(&commit_cid_str) {
235 Ok(c) => c,
236 Err(_) => {
237 return (
238 StatusCode::INTERNAL_SERVER_ERROR,
239 Json(json!({"error": "InternalError", "message": "Invalid commit CID"})),
240 )
241 .into_response();
242 }
243 };
244 let commit_bytes = match state.block_store.get(&commit_cid).await {
245 Ok(Some(b)) => b,
246 _ => {
247 return (
248 StatusCode::INTERNAL_SERVER_ERROR,
249 Json(json!({"error": "InternalError", "message": "Commit block not found"})),
250 )
251 .into_response();
252 }
253 };
254 let commit = match Commit::from_cbor(&commit_bytes) {
255 Ok(c) => c,
256 Err(_) => {
257 return (
258 StatusCode::INTERNAL_SERVER_ERROR,
259 Json(json!({"error": "InternalError", "message": "Failed to parse commit"})),
260 )
261 .into_response();
262 }
263 };
264 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None);
265 let key = format!("{}/{}", query.collection, query.rkey);
266 let record_cid = match mst.get(&key).await {
267 Ok(Some(cid)) => cid,
268 Ok(None) => {
269 return (
270 StatusCode::NOT_FOUND,
271 Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
272 )
273 .into_response();
274 }
275 Err(_) => {
276 return (
277 StatusCode::INTERNAL_SERVER_ERROR,
278 Json(json!({"error": "InternalError", "message": "Failed to lookup record"})),
279 )
280 .into_response();
281 }
282 };
283 let record_block = match state.block_store.get(&record_cid).await {
284 Ok(Some(b)) => b,
285 _ => {
286 return (
287 StatusCode::NOT_FOUND,
288 Json(json!({"error": "RecordNotFound", "message": "Record block not found"})),
289 )
290 .into_response();
291 }
292 };
293 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new();
294 if let Err(_) = mst.blocks_for_path(&key, &mut proof_blocks).await {
295 return (
296 StatusCode::INTERNAL_SERVER_ERROR,
297 Json(json!({"error": "InternalError", "message": "Failed to build proof path"})),
298 )
299 .into_response();
300 }
301 let header = match encode_car_header(&commit_cid) {
302 Ok(h) => h,
303 Err(e) => {
304 error!("Failed to encode CAR header: {}", e);
305 return (
306 StatusCode::INTERNAL_SERVER_ERROR,
307 Json(json!({"error": "InternalError"})),
308 )
309 .into_response();
310 }
311 };
312 let mut car_bytes = header;
313 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| {
314 let cid_bytes = cid.to_bytes();
315 let total_len = cid_bytes.len() + data.len();
316 let mut writer = Vec::new();
317 crate::sync::car::write_varint(&mut writer, total_len as u64)
318 .expect("Writing to Vec<u8> should never fail");
319 writer.write_all(&cid_bytes)
320 .expect("Writing to Vec<u8> should never fail");
321 writer.write_all(data)
322 .expect("Writing to Vec<u8> should never fail");
323 car.extend_from_slice(&writer);
324 };
325 write_block(&mut car_bytes, &commit_cid, &commit_bytes);
326 for (cid, data) in &proof_blocks {
327 write_block(&mut car_bytes, cid, data);
328 }
329 write_block(&mut car_bytes, &record_cid, &record_block);
330 (
331 StatusCode::OK,
332 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
333 car_bytes,
334 )
335 .into_response()
336}