this repo has no description
1use crate::state::AppState;
2use crate::sync::car::encode_car_header;
3use axum::{
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7 Json,
8};
9use cid::Cid;
10use ipld_core::ipld::Ipld;
11use jacquard_repo::storage::BlockStore;
12use serde::Deserialize;
13use serde_json::json;
14use std::io::Write;
15use std::str::FromStr;
16use tracing::error;
17
18const MAX_REPO_BLOCKS_TRAVERSAL: usize = 20_000;
19
20#[derive(Deserialize)]
21pub struct GetBlocksQuery {
22 pub did: String,
23 pub cids: String,
24}
25
26pub async fn get_blocks(
27 State(state): State<AppState>,
28 Query(query): Query<GetBlocksQuery>,
29) -> Response {
30 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
31 .fetch_optional(&state.db)
32 .await
33 .unwrap_or(None);
34
35 if user_exists.is_none() {
36 return (StatusCode::NOT_FOUND, "Repo not found").into_response();
37 }
38
39 let cids_str: Vec<&str> = query.cids.split(',').collect();
40 let mut cids = Vec::new();
41 for s in cids_str {
42 match Cid::from_str(s) {
43 Ok(cid) => cids.push(cid),
44 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(),
45 }
46 }
47
48 let blocks_res = state.block_store.get_many(&cids).await;
49 let blocks = match blocks_res {
50 Ok(blocks) => blocks,
51 Err(e) => {
52 error!("Failed to get blocks: {}", e);
53 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response();
54 }
55 };
56
57 if cids.is_empty() {
58 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response();
59 }
60
61 let root_cid = cids[0];
62
63 let header = match encode_car_header(&root_cid) {
64 Ok(h) => h,
65 Err(e) => {
66 error!("Failed to encode CAR header: {}", e);
67 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to encode CAR").into_response();
68 }
69 };
70
71 let mut car_bytes = header;
72
73 for (i, block_opt) in blocks.into_iter().enumerate() {
74 if let Some(block) = block_opt {
75 let cid = cids[i];
76 let cid_bytes = cid.to_bytes();
77 let total_len = cid_bytes.len() + block.len();
78
79 let mut writer = Vec::new();
80 crate::sync::car::write_varint(&mut writer, total_len as u64)
81 .expect("Writing to Vec<u8> should never fail");
82 writer.write_all(&cid_bytes)
83 .expect("Writing to Vec<u8> should never fail");
84 writer.write_all(&block)
85 .expect("Writing to Vec<u8> should never fail");
86
87 car_bytes.extend_from_slice(&writer);
88 }
89 }
90
91 (
92 StatusCode::OK,
93 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
94 car_bytes,
95 )
96 .into_response()
97}
98
99#[derive(Deserialize)]
100pub struct GetRepoQuery {
101 pub did: String,
102 pub since: Option<String>,
103}
104
105pub async fn get_repo(
106 State(state): State<AppState>,
107 Query(query): Query<GetRepoQuery>,
108) -> Response {
109 let repo_row = sqlx::query!(
110 r#"
111 SELECT r.repo_root_cid
112 FROM repos r
113 JOIN users u ON u.id = r.user_id
114 WHERE u.did = $1
115 "#,
116 query.did
117 )
118 .fetch_optional(&state.db)
119 .await
120 .unwrap_or(None);
121
122 let head_str = match repo_row {
123 Some(r) => r.repo_root_cid,
124 None => {
125 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
126 .fetch_optional(&state.db)
127 .await
128 .unwrap_or(None);
129
130 if user_exists.is_none() {
131 return (
132 StatusCode::NOT_FOUND,
133 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
134 )
135 .into_response();
136 } else {
137 return (
138 StatusCode::NOT_FOUND,
139 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})),
140 )
141 .into_response();
142 }
143 }
144 };
145
146 let head_cid = match Cid::from_str(&head_str) {
147 Ok(c) => c,
148 Err(_) => {
149 return (
150 StatusCode::INTERNAL_SERVER_ERROR,
151 Json(json!({"error": "InternalError", "message": "Invalid head CID"})),
152 )
153 .into_response();
154 }
155 };
156
157 let mut car_bytes = match encode_car_header(&head_cid) {
158 Ok(h) => h,
159 Err(e) => {
160 return (
161 StatusCode::INTERNAL_SERVER_ERROR,
162 Json(json!({"error": "InternalError", "message": format!("Failed to encode CAR header: {}", e)})),
163 )
164 .into_response();
165 }
166 };
167
168 let mut stack = vec![head_cid];
169 let mut visited = std::collections::HashSet::new();
170 let mut remaining = MAX_REPO_BLOCKS_TRAVERSAL;
171
172 while let Some(cid) = stack.pop() {
173 if visited.contains(&cid) {
174 continue;
175 }
176 visited.insert(cid);
177 if remaining == 0 { break; }
178 remaining -= 1;
179
180 if let Ok(Some(block)) = state.block_store.get(&cid).await {
181 let cid_bytes = cid.to_bytes();
182 let total_len = cid_bytes.len() + block.len();
183 let mut writer = Vec::new();
184 crate::sync::car::write_varint(&mut writer, total_len as u64)
185 .expect("Writing to Vec<u8> should never fail");
186 writer.write_all(&cid_bytes)
187 .expect("Writing to Vec<u8> should never fail");
188 writer.write_all(&block)
189 .expect("Writing to Vec<u8> should never fail");
190 car_bytes.extend_from_slice(&writer);
191
192 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
193 extract_links_ipld(&value, &mut stack);
194 }
195 }
196 }
197
198 (
199 StatusCode::OK,
200 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
201 car_bytes,
202 )
203 .into_response()
204}
205
206fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) {
207 match value {
208 Ipld::Link(cid) => {
209 stack.push(*cid);
210 }
211 Ipld::Map(map) => {
212 for v in map.values() {
213 extract_links_ipld(v, stack);
214 }
215 }
216 Ipld::List(arr) => {
217 for v in arr {
218 extract_links_ipld(v, stack);
219 }
220 }
221 _ => {}
222 }
223}
224
225#[derive(Deserialize)]
226pub struct GetRecordQuery {
227 pub did: String,
228 pub collection: String,
229 pub rkey: String,
230}
231
232pub async fn get_record(
233 State(state): State<AppState>,
234 Query(query): Query<GetRecordQuery>,
235) -> Response {
236 let user = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
237 .fetch_optional(&state.db)
238 .await
239 .unwrap_or(None);
240
241 let user_id = match user {
242 Some(u) => u.id,
243 None => {
244 return (
245 StatusCode::NOT_FOUND,
246 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
247 )
248 .into_response();
249 }
250 };
251
252 let record = sqlx::query!(
253 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
254 user_id,
255 query.collection,
256 query.rkey
257 )
258 .fetch_optional(&state.db)
259 .await
260 .unwrap_or(None);
261
262 let record_cid_str = match record {
263 Some(r) => r.record_cid,
264 None => {
265 return (
266 StatusCode::NOT_FOUND,
267 Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
268 )
269 .into_response();
270 }
271 };
272
273 let cid = match Cid::from_str(&record_cid_str) {
274 Ok(c) => c,
275 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "Invalid CID").into_response(),
276 };
277
278 let block_res = state.block_store.get(&cid).await;
279 let block = match block_res {
280 Ok(Some(b)) => b,
281 _ => return (StatusCode::NOT_FOUND, "Block not found").into_response(),
282 };
283
284 let header = match encode_car_header(&cid) {
285 Ok(h) => h,
286 Err(e) => {
287 return (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to encode CAR header: {}", e)).into_response();
288 }
289 };
290 let mut car_bytes = header;
291
292 let cid_bytes = cid.to_bytes();
293 let total_len = cid_bytes.len() + block.len();
294 let mut writer = Vec::new();
295 crate::sync::car::write_varint(&mut writer, total_len as u64)
296 .expect("Writing to Vec<u8> should never fail");
297 writer.write_all(&cid_bytes)
298 .expect("Writing to Vec<u8> should never fail");
299 writer.write_all(&block)
300 .expect("Writing to Vec<u8> should never fail");
301 car_bytes.extend_from_slice(&writer);
302
303 (
304 StatusCode::OK,
305 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
306 car_bytes,
307 )
308 .into_response()
309}