this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use cid::Cid;
9use jacquard_repo::storage::BlockStore;
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::collections::HashMap;
13use std::str::FromStr;
14use tracing::error;
15
16#[derive(Deserialize)]
17pub struct GetRecordInput {
18 pub repo: String,
19 pub collection: String,
20 pub rkey: String,
21 pub cid: Option<String>,
22}
23
24pub async fn get_record(
25 State(state): State<AppState>,
26 Query(input): Query<GetRecordInput>,
27) -> Response {
28 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
29
30 let user_id_opt = if input.repo.starts_with("did:") {
31 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
32 .fetch_optional(&state.db)
33 .await
34 .map(|opt| opt.map(|r| r.id))
35 } else {
36 let suffix = format!(".{}", hostname);
37 let short_handle = if input.repo.ends_with(&suffix) {
38 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
39 } else {
40 &input.repo
41 };
42 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
43 .fetch_optional(&state.db)
44 .await
45 .map(|opt| opt.map(|r| r.id))
46 };
47
48 let user_id: uuid::Uuid = match user_id_opt {
49 Ok(Some(id)) => id,
50 _ => {
51 return (
52 StatusCode::NOT_FOUND,
53 Json(json!({"error": "NotFound", "message": "Repo not found"})),
54 )
55 .into_response();
56 }
57 };
58
59 let record_row = sqlx::query!(
60 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
61 user_id,
62 input.collection,
63 input.rkey
64 )
65 .fetch_optional(&state.db)
66 .await;
67
68 let record_cid_str: String = match record_row {
69 Ok(Some(row)) => row.record_cid,
70 _ => {
71 return (
72 StatusCode::NOT_FOUND,
73 Json(json!({"error": "NotFound", "message": "Record not found"})),
74 )
75 .into_response();
76 }
77 };
78
79 if let Some(expected_cid) = &input.cid {
80 if &record_cid_str != expected_cid {
81 return (
82 StatusCode::NOT_FOUND,
83 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
84 )
85 .into_response();
86 }
87 }
88
89 let cid = match Cid::from_str(&record_cid_str) {
90 Ok(c) => c,
91 Err(_) => {
92 return (
93 StatusCode::INTERNAL_SERVER_ERROR,
94 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
95 )
96 .into_response();
97 }
98 };
99
100 let block = match state.block_store.get(&cid).await {
101 Ok(Some(b)) => b,
102 _ => {
103 return (
104 StatusCode::INTERNAL_SERVER_ERROR,
105 Json(json!({"error": "InternalError", "message": "Record block not found"})),
106 )
107 .into_response();
108 }
109 };
110
111 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
112 Ok(v) => v,
113 Err(e) => {
114 error!("Failed to deserialize record: {:?}", e);
115 return (
116 StatusCode::INTERNAL_SERVER_ERROR,
117 Json(json!({"error": "InternalError"})),
118 )
119 .into_response();
120 }
121 };
122
123 Json(json!({
124 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
125 "cid": record_cid_str,
126 "value": value
127 }))
128 .into_response()
129}
130
131#[derive(Deserialize)]
132pub struct ListRecordsInput {
133 pub repo: String,
134 pub collection: String,
135 pub limit: Option<i32>,
136 pub cursor: Option<String>,
137 #[serde(rename = "rkeyStart")]
138 pub rkey_start: Option<String>,
139 #[serde(rename = "rkeyEnd")]
140 pub rkey_end: Option<String>,
141 pub reverse: Option<bool>,
142}
143
144#[derive(Serialize)]
145pub struct ListRecordsOutput {
146 pub cursor: Option<String>,
147 pub records: Vec<serde_json::Value>,
148}
149
150pub async fn list_records(
151 State(state): State<AppState>,
152 Query(input): Query<ListRecordsInput>,
153) -> Response {
154 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
155
156 let user_id_opt = if input.repo.starts_with("did:") {
157 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
158 .fetch_optional(&state.db)
159 .await
160 .map(|opt| opt.map(|r| r.id))
161 } else {
162 let suffix = format!(".{}", hostname);
163 let short_handle = if input.repo.ends_with(&suffix) {
164 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
165 } else {
166 &input.repo
167 };
168 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
169 .fetch_optional(&state.db)
170 .await
171 .map(|opt| opt.map(|r| r.id))
172 };
173
174 let user_id: uuid::Uuid = match user_id_opt {
175 Ok(Some(id)) => id,
176 _ => {
177 return (
178 StatusCode::NOT_FOUND,
179 Json(json!({"error": "NotFound", "message": "Repo not found"})),
180 )
181 .into_response();
182 }
183 };
184
185 let limit = input.limit.unwrap_or(50).clamp(1, 100);
186 let reverse = input.reverse.unwrap_or(false);
187 let limit_i64 = limit as i64;
188 let order = if reverse { "ASC" } else { "DESC" };
189
190 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
191 let comparator = if reverse { ">" } else { "<" };
192 let query = format!(
193 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
194 comparator, order
195 );
196 sqlx::query_as(&query)
197 .bind(user_id)
198 .bind(&input.collection)
199 .bind(cursor)
200 .bind(limit_i64)
201 .fetch_all(&state.db)
202 .await
203 } else {
204 let mut conditions = vec!["repo_id = $1", "collection = $2"];
205 let mut param_idx = 3;
206
207 if input.rkey_start.is_some() {
208 conditions.push("rkey > $3");
209 param_idx += 1;
210 }
211
212 if input.rkey_end.is_some() {
213 conditions.push(if param_idx == 3 { "rkey < $3" } else { "rkey < $4" });
214 param_idx += 1;
215 }
216
217 let limit_idx = param_idx;
218
219 let query = format!(
220 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
221 conditions.join(" AND "),
222 order,
223 limit_idx
224 );
225
226 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
227 .bind(user_id)
228 .bind(&input.collection);
229
230 if let Some(start) = &input.rkey_start {
231 query_builder = query_builder.bind(start);
232 }
233 if let Some(end) = &input.rkey_end {
234 query_builder = query_builder.bind(end);
235 }
236
237 query_builder.bind(limit_i64).fetch_all(&state.db).await
238 };
239
240 let rows = match rows_res {
241 Ok(r) => r,
242 Err(e) => {
243 error!("Error listing records: {:?}", e);
244 return (
245 StatusCode::INTERNAL_SERVER_ERROR,
246 Json(json!({"error": "InternalError"})),
247 )
248 .into_response();
249 }
250 };
251
252 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
253
254 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
255 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
256
257 for (rkey, cid_str) in &rows {
258 if let Ok(cid) = Cid::from_str(cid_str) {
259 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
260 cids.push(cid);
261 }
262 }
263
264 let blocks = match state.block_store.get_many(&cids).await {
265 Ok(b) => b,
266 Err(e) => {
267 error!("Error fetching blocks: {:?}", e);
268 return (
269 StatusCode::INTERNAL_SERVER_ERROR,
270 Json(json!({"error": "InternalError"})),
271 )
272 .into_response();
273 }
274 };
275
276 let mut records = Vec::new();
277 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
278 if let Some(block) = block_opt {
279 if let Some((rkey, cid_str)) = cid_to_rkey.get(cid) {
280 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
281 records.push(json!({
282 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
283 "cid": cid_str,
284 "value": value
285 }));
286 }
287 }
288 }
289 }
290
291 Json(ListRecordsOutput {
292 cursor: last_rkey,
293 records,
294 })
295 .into_response()
296}