this repo has no description
1use crate::api::error::ApiError;
2use crate::scheduled::generate_repo_car_from_user_blocks;
3use crate::state::AppState;
4use crate::sync::car::encode_car_header;
5use crate::sync::util::assert_repo_availability;
6use axum::{
7 extract::{Query, RawQuery, State},
8 http::StatusCode,
9 response::{IntoResponse, Response},
10};
11use cid::Cid;
12use jacquard_repo::storage::BlockStore;
13use serde::Deserialize;
14use std::io::Write;
15use std::str::FromStr;
16use tracing::error;
17
18fn parse_get_blocks_query(query_string: &str) -> Result<(String, Vec<String>), String> {
19 let did = crate::util::parse_repeated_query_param(Some(query_string), "did")
20 .into_iter()
21 .next()
22 .ok_or("Missing required parameter: did")?;
23 let cids = crate::util::parse_repeated_query_param(Some(query_string), "cids");
24 Ok((did, cids))
25}
26
27pub async fn get_blocks(State(state): State<AppState>, RawQuery(query): RawQuery) -> Response {
28 let Some(query_string) = query else {
29 return ApiError::InvalidRequest("Missing query parameters".into()).into_response();
30 };
31
32 let (did, cid_strings) = match parse_get_blocks_query(&query_string) {
33 Ok(parsed) => parsed,
34 Err(msg) => return ApiError::InvalidRequest(msg).into_response(),
35 };
36
37 let _account = match assert_repo_availability(&state.db, &did, false).await {
38 Ok(a) => a,
39 Err(e) => return e.into_response(),
40 };
41
42 let cids: Vec<Cid> = match cid_strings
43 .iter()
44 .map(|s| Cid::from_str(s).map_err(|_| s.clone()))
45 .collect::<Result<Vec<_>, _>>()
46 {
47 Ok(cids) => cids,
48 Err(invalid) => {
49 return ApiError::InvalidRequest(format!("Invalid CID: {}", invalid)).into_response();
50 }
51 };
52
53 if cids.is_empty() {
54 return ApiError::InvalidRequest("No CIDs provided".into()).into_response();
55 }
56
57 let blocks = match state.block_store.get_many(&cids).await {
58 Ok(blocks) => blocks,
59 Err(e) => {
60 error!("Failed to get blocks: {}", e);
61 return ApiError::InternalError(None).into_response();
62 }
63 };
64
65 let missing_cids: Vec<String> = blocks
66 .iter()
67 .zip(&cids)
68 .filter(|(block_opt, _)| block_opt.is_none())
69 .map(|(_, cid)| cid.to_string())
70 .collect();
71 if !missing_cids.is_empty() {
72 return ApiError::InvalidRequest(format!(
73 "Could not find blocks: {}",
74 missing_cids.join(", ")
75 ))
76 .into_response();
77 }
78
79 let header = match crate::sync::car::encode_car_header_null_root() {
80 Ok(h) => h,
81 Err(e) => {
82 error!("Failed to encode CAR header: {}", e);
83 return ApiError::InternalError(None).into_response();
84 }
85 };
86 let mut car_bytes = header;
87 for (i, block_opt) in blocks.into_iter().enumerate() {
88 if let Some(block) = block_opt {
89 let cid = cids[i];
90 let cid_bytes = cid.to_bytes();
91 let total_len = cid_bytes.len() + block.len();
92 let mut writer = Vec::new();
93 crate::sync::car::write_varint(&mut writer, total_len as u64)
94 .expect("Writing to Vec<u8> should never fail");
95 writer
96 .write_all(&cid_bytes)
97 .expect("Writing to Vec<u8> should never fail");
98 writer
99 .write_all(&block)
100 .expect("Writing to Vec<u8> should never fail");
101 car_bytes.extend_from_slice(&writer);
102 }
103 }
104 (
105 StatusCode::OK,
106 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
107 car_bytes,
108 )
109 .into_response()
110}
111
112#[derive(Deserialize)]
113pub struct GetRepoQuery {
114 pub did: String,
115 pub since: Option<String>,
116}
117
118pub async fn get_repo(
119 State(state): State<AppState>,
120 Query(query): Query<GetRepoQuery>,
121) -> Response {
122 let account = match assert_repo_availability(&state.db, &query.did, false).await {
123 Ok(a) => a,
124 Err(e) => return e.into_response(),
125 };
126
127 let Some(head_str) = account.repo_root_cid else {
128 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response();
129 };
130
131 let Ok(head_cid) = Cid::from_str(&head_str) else {
132 return ApiError::InternalError(None).into_response();
133 };
134
135 if let Some(since) = &query.since {
136 return get_repo_since(&state, &query.did, &head_cid, since).await;
137 }
138
139 let car_bytes = match generate_repo_car_from_user_blocks(
140 &state.db,
141 &state.block_store,
142 account.user_id,
143 &head_cid,
144 )
145 .await
146 {
147 Ok(bytes) => bytes,
148 Err(e) => {
149 error!("Failed to generate repo CAR: {}", e);
150 return ApiError::InternalError(None).into_response();
151 }
152 };
153
154 (
155 StatusCode::OK,
156 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
157 car_bytes,
158 )
159 .into_response()
160}
161
162async fn get_repo_since(state: &AppState, did: &str, head_cid: &Cid, since: &str) -> Response {
163 let events = sqlx::query!(
164 r#"
165 SELECT blocks_cids, commit_cid
166 FROM repo_seq
167 WHERE did = $1 AND rev > $2
168 ORDER BY seq DESC
169 "#,
170 did,
171 since
172 )
173 .fetch_all(&state.db)
174 .await;
175
176 let events = match events {
177 Ok(e) => e,
178 Err(e) => {
179 error!("DB error in get_repo_since: {:?}", e);
180 return ApiError::InternalError(Some("Database error".into())).into_response();
181 }
182 };
183
184 let block_cids: Vec<Cid> = events
185 .iter()
186 .flat_map(|event| {
187 let block_cids = event
188 .blocks_cids
189 .as_ref()
190 .map(|cids| cids.iter().filter_map(|s| Cid::from_str(s).ok()).collect())
191 .unwrap_or_else(Vec::new);
192 let commit_cid = event
193 .commit_cid
194 .as_ref()
195 .and_then(|s| Cid::from_str(s).ok());
196 block_cids.into_iter().chain(commit_cid)
197 })
198 .fold(Vec::new(), |mut acc, cid| {
199 if !acc.contains(&cid) {
200 acc.push(cid);
201 }
202 acc
203 });
204
205 let mut car_bytes = match encode_car_header(head_cid) {
206 Ok(h) => h,
207 Err(e) => {
208 return ApiError::InternalError(Some(format!("Failed to encode CAR header: {}", e)))
209 .into_response();
210 }
211 };
212
213 if block_cids.is_empty() {
214 return (
215 StatusCode::OK,
216 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
217 car_bytes,
218 )
219 .into_response();
220 }
221
222 let blocks = match state.block_store.get_many(&block_cids).await {
223 Ok(b) => b,
224 Err(e) => {
225 error!("Block store error in get_repo_since: {:?}", e);
226 return ApiError::InternalError(Some("Failed to get blocks".into())).into_response();
227 }
228 };
229
230 for (i, block_opt) in blocks.into_iter().enumerate() {
231 if let Some(block) = block_opt {
232 let cid = block_cids[i];
233 let cid_bytes = cid.to_bytes();
234 let total_len = cid_bytes.len() + block.len();
235 let mut writer = Vec::new();
236 crate::sync::car::write_varint(&mut writer, total_len as u64)
237 .expect("Writing to Vec<u8> should never fail");
238 writer
239 .write_all(&cid_bytes)
240 .expect("Writing to Vec<u8> should never fail");
241 writer
242 .write_all(&block)
243 .expect("Writing to Vec<u8> should never fail");
244 car_bytes.extend_from_slice(&writer);
245 }
246 }
247
248 (
249 StatusCode::OK,
250 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
251 car_bytes,
252 )
253 .into_response()
254}
255
256#[derive(Deserialize)]
257pub struct GetRecordQuery {
258 pub did: String,
259 pub collection: String,
260 pub rkey: String,
261}
262
263pub async fn get_record(
264 State(state): State<AppState>,
265 Query(query): Query<GetRecordQuery>,
266) -> Response {
267 use jacquard_repo::commit::Commit;
268 use jacquard_repo::mst::Mst;
269 use std::collections::BTreeMap;
270 use std::sync::Arc;
271
272 let account = match assert_repo_availability(&state.db, &query.did, false).await {
273 Ok(a) => a,
274 Err(e) => return e.into_response(),
275 };
276
277 let commit_cid_str = match account.repo_root_cid {
278 Some(cid) => cid,
279 None => {
280 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response();
281 }
282 };
283 let Ok(commit_cid) = Cid::from_str(&commit_cid_str) else {
284 return ApiError::InternalError(Some("Invalid commit CID".into())).into_response();
285 };
286 let commit_bytes = match state.block_store.get(&commit_cid).await {
287 Ok(Some(b)) => b,
288 _ => {
289 return ApiError::InternalError(Some("Commit block not found".into())).into_response();
290 }
291 };
292 let Ok(commit) = Commit::from_cbor(&commit_bytes) else {
293 return ApiError::InternalError(Some("Failed to parse commit".into())).into_response();
294 };
295 let mst = Mst::load(Arc::new(state.block_store.clone()), commit.data, None);
296 let key = format!("{}/{}", query.collection, query.rkey);
297 let record_cid = match mst.get(&key).await {
298 Ok(Some(cid)) => cid,
299 Ok(None) => {
300 return ApiError::RecordNotFound.into_response();
301 }
302 Err(_) => {
303 return ApiError::InternalError(Some("Failed to lookup record".into())).into_response();
304 }
305 };
306 let record_block = match state.block_store.get(&record_cid).await {
307 Ok(Some(b)) => b,
308 _ => {
309 return ApiError::RecordNotFound.into_response();
310 }
311 };
312 let mut proof_blocks: BTreeMap<Cid, bytes::Bytes> = BTreeMap::new();
313 if mst.blocks_for_path(&key, &mut proof_blocks).await.is_err() {
314 return ApiError::InternalError(Some("Failed to build proof path".into())).into_response();
315 }
316 let header = match encode_car_header(&commit_cid) {
317 Ok(h) => h,
318 Err(e) => {
319 error!("Failed to encode CAR header: {}", e);
320 return ApiError::InternalError(None).into_response();
321 }
322 };
323 let mut car_bytes = header;
324 let write_block = |car: &mut Vec<u8>, cid: &Cid, data: &[u8]| {
325 let cid_bytes = cid.to_bytes();
326 let total_len = cid_bytes.len() + data.len();
327 let mut writer = Vec::new();
328 crate::sync::car::write_varint(&mut writer, total_len as u64)
329 .expect("Writing to Vec<u8> should never fail");
330 writer
331 .write_all(&cid_bytes)
332 .expect("Writing to Vec<u8> should never fail");
333 writer
334 .write_all(data)
335 .expect("Writing to Vec<u8> should never fail");
336 car.extend_from_slice(&writer);
337 };
338 write_block(&mut car_bytes, &commit_cid, &commit_bytes);
339 proof_blocks
340 .iter()
341 .for_each(|(cid, data)| write_block(&mut car_bytes, cid, data));
342 write_block(&mut car_bytes, &record_cid, &record_block);
343 (
344 StatusCode::OK,
345 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
346 car_bytes,
347 )
348 .into_response()
349}