this repo has no description
1use crate::state::AppState;
2use crate::sync::car::encode_car_header;
3use axum::{
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7 Json,
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use serde::Deserialize;
12use serde_json::json;
13use std::io::Write;
14use std::str::FromStr;
15use tracing::error;
16
17#[derive(Deserialize)]
18pub struct GetBlocksQuery {
19 pub did: String,
20 pub cids: String,
21}
22
23pub async fn get_blocks(
24 State(state): State<AppState>,
25 Query(query): Query<GetBlocksQuery>,
26) -> Response {
27 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
28 .fetch_optional(&state.db)
29 .await
30 .unwrap_or(None);
31
32 if user_exists.is_none() {
33 return (StatusCode::NOT_FOUND, "Repo not found").into_response();
34 }
35
36 let cids_str: Vec<&str> = query.cids.split(',').collect();
37 let mut cids = Vec::new();
38 for s in cids_str {
39 match Cid::from_str(s) {
40 Ok(cid) => cids.push(cid),
41 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid CID").into_response(),
42 }
43 }
44
45 let blocks_res = state.block_store.get_many(&cids).await;
46 let blocks = match blocks_res {
47 Ok(blocks) => blocks,
48 Err(e) => {
49 error!("Failed to get blocks: {}", e);
50 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get blocks").into_response();
51 }
52 };
53
54 let root_cid = cids.first().cloned().unwrap_or_default();
55
56 if cids.is_empty() {
57 return (StatusCode::BAD_REQUEST, "No CIDs provided").into_response();
58 }
59
60 let header = encode_car_header(&root_cid);
61
62 let mut car_bytes = header;
63
64 for (i, block_opt) in blocks.into_iter().enumerate() {
65 if let Some(block) = block_opt {
66 let cid = cids[i];
67 let cid_bytes = cid.to_bytes();
68 let total_len = cid_bytes.len() + block.len();
69
70 let mut writer = Vec::new();
71 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap();
72 writer.write_all(&cid_bytes).unwrap();
73 writer.write_all(&block).unwrap();
74
75 car_bytes.extend_from_slice(&writer);
76 }
77 }
78
79 (
80 StatusCode::OK,
81 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
82 car_bytes,
83 )
84 .into_response()
85}
86
87#[derive(Deserialize)]
88pub struct GetRepoQuery {
89 pub did: String,
90 pub since: Option<String>,
91}
92
93pub async fn get_repo(
94 State(state): State<AppState>,
95 Query(query): Query<GetRepoQuery>,
96) -> Response {
97 let repo_row = sqlx::query!(
98 r#"
99 SELECT r.repo_root_cid
100 FROM repos r
101 JOIN users u ON u.id = r.user_id
102 WHERE u.did = $1
103 "#,
104 query.did
105 )
106 .fetch_optional(&state.db)
107 .await
108 .unwrap_or(None);
109
110 let head_str = match repo_row {
111 Some(r) => r.repo_root_cid,
112 None => {
113 let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
114 .fetch_optional(&state.db)
115 .await
116 .unwrap_or(None);
117
118 if user_exists.is_none() {
119 return (
120 StatusCode::NOT_FOUND,
121 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
122 )
123 .into_response();
124 } else {
125 return (
126 StatusCode::NOT_FOUND,
127 Json(json!({"error": "RepoNotFound", "message": "Repo not initialized"})),
128 )
129 .into_response();
130 }
131 }
132 };
133
134 let head_cid = match Cid::from_str(&head_str) {
135 Ok(c) => c,
136 Err(_) => {
137 return (
138 StatusCode::INTERNAL_SERVER_ERROR,
139 Json(json!({"error": "InternalError", "message": "Invalid head CID"})),
140 )
141 .into_response();
142 }
143 };
144
145 let mut car_bytes = encode_car_header(&head_cid);
146
147 let mut stack = vec![head_cid];
148 let mut visited = std::collections::HashSet::new();
149 let mut limit = 20000;
150
151 while let Some(cid) = stack.pop() {
152 if visited.contains(&cid) {
153 continue;
154 }
155 visited.insert(cid);
156 if limit == 0 { break; }
157 limit -= 1;
158
159 if let Ok(Some(block)) = state.block_store.get(&cid).await {
160 let cid_bytes = cid.to_bytes();
161 let total_len = cid_bytes.len() + block.len();
162 let mut writer = Vec::new();
163 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap();
164 writer.write_all(&cid_bytes).unwrap();
165 writer.write_all(&block).unwrap();
166 car_bytes.extend_from_slice(&writer);
167
168 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
169 extract_links_json(&value, &mut stack);
170 }
171 }
172 }
173
174 (
175 StatusCode::OK,
176 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
177 car_bytes,
178 )
179 .into_response()
180}
181
182fn extract_links_json(value: &serde_json::Value, stack: &mut Vec<Cid>) {
183 match value {
184 serde_json::Value::Object(map) => {
185 if let Some(serde_json::Value::String(s)) = map.get("/") {
186 if let Ok(cid) = Cid::from_str(s) {
187 stack.push(cid);
188 }
189 } else if let Some(serde_json::Value::String(s)) = map.get("$link") {
190 if let Ok(cid) = Cid::from_str(s) {
191 stack.push(cid);
192 }
193 } else {
194 for v in map.values() {
195 extract_links_json(v, stack);
196 }
197 }
198 }
199 serde_json::Value::Array(arr) => {
200 for v in arr {
201 extract_links_json(v, stack);
202 }
203 }
204 _ => {}
205 }
206}
207
208#[derive(Deserialize)]
209pub struct GetRecordQuery {
210 pub did: String,
211 pub collection: String,
212 pub rkey: String,
213}
214
215pub async fn get_record(
216 State(state): State<AppState>,
217 Query(query): Query<GetRecordQuery>,
218) -> Response {
219 let user = sqlx::query!("SELECT id FROM users WHERE did = $1", query.did)
220 .fetch_optional(&state.db)
221 .await
222 .unwrap_or(None);
223
224 let user_id = match user {
225 Some(u) => u.id,
226 None => {
227 return (
228 StatusCode::NOT_FOUND,
229 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
230 )
231 .into_response();
232 }
233 };
234
235 let record = sqlx::query!(
236 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
237 user_id,
238 query.collection,
239 query.rkey
240 )
241 .fetch_optional(&state.db)
242 .await
243 .unwrap_or(None);
244
245 let record_cid_str = match record {
246 Some(r) => r.record_cid,
247 None => {
248 return (
249 StatusCode::NOT_FOUND,
250 Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
251 )
252 .into_response();
253 }
254 };
255
256 let cid = match Cid::from_str(&record_cid_str) {
257 Ok(c) => c,
258 Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "Invalid CID").into_response(),
259 };
260
261 let block_res = state.block_store.get(&cid).await;
262 let block = match block_res {
263 Ok(Some(b)) => b,
264 _ => return (StatusCode::NOT_FOUND, "Block not found").into_response(),
265 };
266
267 let header = encode_car_header(&cid);
268 let mut car_bytes = header;
269
270 let cid_bytes = cid.to_bytes();
271 let total_len = cid_bytes.len() + block.len();
272 let mut writer = Vec::new();
273 crate::sync::car::write_varint(&mut writer, total_len as u64).unwrap();
274 writer.write_all(&cid_bytes).unwrap();
275 writer.write_all(&block).unwrap();
276 car_bytes.extend_from_slice(&writer);
277
278 (
279 StatusCode::OK,
280 [(axum::http::header::CONTENT_TYPE, "application/vnd.ipld.car")],
281 car_bytes,
282 )
283 .into_response()
284}