this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use cid::Cid;
9use jacquard_repo::storage::BlockStore;
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::collections::HashMap;
13use std::str::FromStr;
14use tracing::error;
15
16#[derive(Deserialize)]
17pub struct GetRecordInput {
18 pub repo: String,
19 pub collection: String,
20 pub rkey: String,
21 pub cid: Option<String>,
22}
23
24pub async fn get_record(
25 State(state): State<AppState>,
26 Query(input): Query<GetRecordInput>,
27) -> Response {
28 let user_id_opt = if input.repo.starts_with("did:") {
29 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
30 .fetch_optional(&state.db)
31 .await
32 .map(|opt| opt.map(|r| r.id))
33 } else {
34 sqlx::query!("SELECT id FROM users WHERE handle = $1", input.repo)
35 .fetch_optional(&state.db)
36 .await
37 .map(|opt| opt.map(|r| r.id))
38 };
39
40 let user_id: uuid::Uuid = match user_id_opt {
41 Ok(Some(id)) => id,
42 _ => {
43 return (
44 StatusCode::NOT_FOUND,
45 Json(json!({"error": "NotFound", "message": "Repo not found"})),
46 )
47 .into_response();
48 }
49 };
50
51 let record_row = sqlx::query!(
52 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
53 user_id,
54 input.collection,
55 input.rkey
56 )
57 .fetch_optional(&state.db)
58 .await;
59
60 let record_cid_str: String = match record_row {
61 Ok(Some(row)) => row.record_cid,
62 _ => {
63 return (
64 StatusCode::NOT_FOUND,
65 Json(json!({"error": "NotFound", "message": "Record not found"})),
66 )
67 .into_response();
68 }
69 };
70
71 if let Some(expected_cid) = &input.cid {
72 if &record_cid_str != expected_cid {
73 return (
74 StatusCode::NOT_FOUND,
75 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
76 )
77 .into_response();
78 }
79 }
80
81 let cid = match Cid::from_str(&record_cid_str) {
82 Ok(c) => c,
83 Err(_) => {
84 return (
85 StatusCode::INTERNAL_SERVER_ERROR,
86 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
87 )
88 .into_response();
89 }
90 };
91
92 let block = match state.block_store.get(&cid).await {
93 Ok(Some(b)) => b,
94 _ => {
95 return (
96 StatusCode::INTERNAL_SERVER_ERROR,
97 Json(json!({"error": "InternalError", "message": "Record block not found"})),
98 )
99 .into_response();
100 }
101 };
102
103 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
104 Ok(v) => v,
105 Err(e) => {
106 error!("Failed to deserialize record: {:?}", e);
107 return (
108 StatusCode::INTERNAL_SERVER_ERROR,
109 Json(json!({"error": "InternalError"})),
110 )
111 .into_response();
112 }
113 };
114
115 Json(json!({
116 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
117 "cid": record_cid_str,
118 "value": value
119 }))
120 .into_response()
121}
122
123#[derive(Deserialize)]
124pub struct ListRecordsInput {
125 pub repo: String,
126 pub collection: String,
127 pub limit: Option<i32>,
128 pub cursor: Option<String>,
129 #[serde(rename = "rkeyStart")]
130 pub rkey_start: Option<String>,
131 #[serde(rename = "rkeyEnd")]
132 pub rkey_end: Option<String>,
133 pub reverse: Option<bool>,
134}
135
136#[derive(Serialize)]
137pub struct ListRecordsOutput {
138 pub cursor: Option<String>,
139 pub records: Vec<serde_json::Value>,
140}
141
142pub async fn list_records(
143 State(state): State<AppState>,
144 Query(input): Query<ListRecordsInput>,
145) -> Response {
146 let user_id_opt = if input.repo.starts_with("did:") {
147 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
148 .fetch_optional(&state.db)
149 .await
150 .map(|opt| opt.map(|r| r.id))
151 } else {
152 sqlx::query!("SELECT id FROM users WHERE handle = $1", input.repo)
153 .fetch_optional(&state.db)
154 .await
155 .map(|opt| opt.map(|r| r.id))
156 };
157
158 let user_id: uuid::Uuid = match user_id_opt {
159 Ok(Some(id)) => id,
160 _ => {
161 return (
162 StatusCode::NOT_FOUND,
163 Json(json!({"error": "NotFound", "message": "Repo not found"})),
164 )
165 .into_response();
166 }
167 };
168
169 let limit = input.limit.unwrap_or(50).clamp(1, 100);
170 let reverse = input.reverse.unwrap_or(false);
171 let limit_i64 = limit as i64;
172 let order = if reverse { "ASC" } else { "DESC" };
173
174 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
175 let comparator = if reverse { ">" } else { "<" };
176 let query = format!(
177 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
178 comparator, order
179 );
180 sqlx::query_as(&query)
181 .bind(user_id)
182 .bind(&input.collection)
183 .bind(cursor)
184 .bind(limit_i64)
185 .fetch_all(&state.db)
186 .await
187 } else {
188 let mut conditions = vec!["repo_id = $1", "collection = $2"];
189 let mut param_idx = 3;
190
191 if input.rkey_start.is_some() {
192 conditions.push("rkey > $3");
193 param_idx += 1;
194 }
195
196 if input.rkey_end.is_some() {
197 conditions.push(if param_idx == 3 { "rkey < $3" } else { "rkey < $4" });
198 param_idx += 1;
199 }
200
201 let limit_idx = param_idx;
202
203 let query = format!(
204 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
205 conditions.join(" AND "),
206 order,
207 limit_idx
208 );
209
210 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
211 .bind(user_id)
212 .bind(&input.collection);
213
214 if let Some(start) = &input.rkey_start {
215 query_builder = query_builder.bind(start);
216 }
217 if let Some(end) = &input.rkey_end {
218 query_builder = query_builder.bind(end);
219 }
220
221 query_builder.bind(limit_i64).fetch_all(&state.db).await
222 };
223
224 let rows = match rows_res {
225 Ok(r) => r,
226 Err(e) => {
227 error!("Error listing records: {:?}", e);
228 return (
229 StatusCode::INTERNAL_SERVER_ERROR,
230 Json(json!({"error": "InternalError"})),
231 )
232 .into_response();
233 }
234 };
235
236 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
237
238 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
239 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
240
241 for (rkey, cid_str) in &rows {
242 if let Ok(cid) = Cid::from_str(cid_str) {
243 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
244 cids.push(cid);
245 }
246 }
247
248 let blocks = match state.block_store.get_many(&cids).await {
249 Ok(b) => b,
250 Err(e) => {
251 error!("Error fetching blocks: {:?}", e);
252 return (
253 StatusCode::INTERNAL_SERVER_ERROR,
254 Json(json!({"error": "InternalError"})),
255 )
256 .into_response();
257 }
258 };
259
260 let mut records = Vec::new();
261 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
262 if let Some(block) = block_opt {
263 if let Some((rkey, cid_str)) = cid_to_rkey.get(cid) {
264 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
265 records.push(json!({
266 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
267 "cid": cid_str,
268 "value": value
269 }));
270 }
271 }
272 }
273 }
274
275 Json(ListRecordsOutput {
276 cursor: last_rkey,
277 records,
278 })
279 .into_response()
280}