this repo has no description
1use crate::state::AppState;
2use crate::sync::car::encode_car_header;
3use axum::{
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7 Json,
8};
9use cid::Cid;
10use ipld_core::ipld::Ipld;
11use jacquard_repo::storage::BlockStore;
12use serde::Deserialize;
13use serde_json::json;
14use std::io::Write;
15use std::str::FromStr;
16use tracing::error;
17
18#[derive(Deserialize)]
19pub struct GetBlocksQuery {
20 pub did: String,
21 pub cids: String,
22}
23
24pub async fn get_blocks(
25 State(state): State<AppState>,
26 Query(query): Query<GetBlocksQuery>,
27) -> Response {
28 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
29 .fetch_optional(&state.db)
30 .await
31 .unwrap_or(None);
32
33 if user_exists.is_none() {
34 return (StatusCode::NOT_FOUND, "Repo not found").into_response();
35 }
36
37 let cids_str: Vec<&str> = query.cids.split(',').collect();
38 let mut cids = Vec::new();
39 for s in cids_str {
40 match Cid::from_str(s) {
41 Ok(cid) => cids.push(cid),
42 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(),
43 }
44 }
45
46 let blocks_res = state.block_store.get_many(&cids).await;
47 let blocks = match blocks_res {
48 Ok(blocks) => blocks,
49 Err(e) => {
50 error!("Failed to get blocks: {}", e);
51 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response();
52 }
53 };
54
55 let root_cid = cids.first().cloned().unwrap_or_default();
56
57 if cids.is_empty() {
58 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response();
59 }
60
61 let header = encode_car_header(&root_cid);
62
63 let mut car_bytes = header;
64
65 for (i, block_opt) in blocks.into_iter().enumerate() {
66 if let Some(block) = block_opt {
67 let cid = cids[i];
68 let cid_bytes = cid.to_bytes();
69 let total_len = cid_bytes.len() + block.len();
70
71 let mut writer = Vec::new();
72 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap();
73 writer.write_all(&cid_bytes).unwrap();
74 writer.write_all(&block).unwrap();
75
76 car_bytes.extend_from_slice(&writer);
77 }
78 }
79
80 (
81 StatusCode::OK,
82 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
83 car_bytes,
84 )
85 .into_response()
86}
87
88#[derive(Deserialize)]
89pub struct GetRepoQuery {
90 pub did: String,
91 pub since: Option<String>,
92}
93
94pub async fn get_repo(
95 State(state): State<AppState>,
96 Query(query): Query<GetRepoQuery>,
97) -> Response {
98 let repo_row = sqlx::query!(
99 r#"
100 SELECT r.repo_root_cid
101 FROM repos r
102 JOIN users u ON u.id = r.user_id
103 WHERE u.did = $1
104 "#,
105 query.did
106 )
107 .fetch_optional(&state.db)
108 .await
109 .unwrap_or(None);
110
111 let head_str = match repo_row {
112 Some(r) => r.repo_root_cid,
113 None => {
114 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
115 .fetch_optional(&state.db)
116 .await
117 .unwrap_or(None);
118
119 if user_exists.is_none() {
120 return (
121 StatusCode::NOT_FOUND,
122 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
123 )
124 .into_response();
125 } else {
126 return (
127 StatusCode::NOT_FOUND,
128 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})),
129 )
130 .into_response();
131 }
132 }
133 };
134
135 let head_cid = match Cid::from_str(&head_str) {
136 Ok(c) => c,
137 Err(_) => {
138 return (
139 StatusCode::INTERNAL_SERVER_ERROR,
140 Json(json!({"error": "InternalError", "message": "Invalid head CID"})),
141 )
142 .into_response();
143 }
144 };
145
146 let mut car_bytes = encode_car_header(&head_cid);
147
148 let mut stack = vec![head_cid];
149 let mut visited = std::collections::HashSet::new();
150 let mut limit = 20000;
151
152 while let Some(cid) = stack.pop() {
153 if visited.contains(&cid) {
154 continue;
155 }
156 visited.insert(cid);
157 if limit == 0 { break; }
158 limit -= 1;
159
160 if let Ok(Some(block)) = state.block_store.get(&cid).await {
161 let cid_bytes = cid.to_bytes();
162 let total_len = cid_bytes.len() + block.len();
163 let mut writer = Vec::new();
164 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap();
165 writer.write_all(&cid_bytes).unwrap();
166 writer.write_all(&block).unwrap();
167 car_bytes.extend_from_slice(&writer);
168
169 if let Ok(value) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
170 extract_links_ipld(&value, &mut stack);
171 }
172 }
173 }
174
175 (
176 StatusCode::OK,
177 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
178 car_bytes,
179 )
180 .into_response()
181}
182
183fn extract_links_ipld(value: &Ipld, stack: &mut Vec<Cid>) {
184 match value {
185 Ipld::Link(cid) => {
186 stack.push(*cid);
187 }
188 Ipld::Map(map) => {
189 for v in map.values() {
190 extract_links_ipld(v, stack);
191 }
192 }
193 Ipld::List(arr) => {
194 for v in arr {
195 extract_links_ipld(v, stack);
196 }
197 }
198 _ => {}
199 }
200}
201
202#[derive(Deserialize)]
203pub struct GetRecordQuery {
204 pub did: String,
205 pub collection: String,
206 pub rkey: String,
207}
208
209pub async fn get_record(
210 State(state): State<AppState>,
211 Query(query): Query<GetRecordQuery>,
212) -> Response {
213 let user = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
214 .fetch_optional(&state.db)
215 .await
216 .unwrap_or(None);
217
218 let user_id = match user {
219 Some(u) => u.id,
220 None => {
221 return (
222 StatusCode::NOT_FOUND,
223 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
224 )
225 .into_response();
226 }
227 };
228
229 let record = sqlx::query!(
230 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
231 user_id,
232 query.collection,
233 query.rkey
234 )
235 .fetch_optional(&state.db)
236 .await
237 .unwrap_or(None);
238
239 let record_cid_str = match record {
240 Some(r) => r.record_cid,
241 None => {
242 return (
243 StatusCode::NOT_FOUND,
244 Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
245 )
246 .into_response();
247 }
248 };
249
250 let cid = match Cid::from_str(&record_cid_str) {
251 Ok(c) => c,
252 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "Invalid CID").into_response(),
253 };
254
255 let block_res = state.block_store.get(&cid).await;
256 let block = match block_res {
257 Ok(Some(b)) => b,
258 _ => return (StatusCode::NOT_FOUND, "Block not found").into_response(),
259 };
260
261 let header = encode_car_header(&cid);
262 let mut car_bytes = header;
263
264 let cid_bytes = cid.to_bytes();
265 let total_len = cid_bytes.len() + block.len();
266 let mut writer = Vec::new();
267 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap();
268 writer.write_all(&cid_bytes).unwrap();
269 writer.write_all(&block).unwrap();
270 car_bytes.extend_from_slice(&writer);
271
272 (
273 StatusCode::OK,
274 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
275 car_bytes,
276 )
277 .into_response()
278}