this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::{HeaderMap, StatusCode},
6 response::{IntoResponse, Response},
7};
8use base64::Engine;
9use cid::Cid;
10use ipld_core::ipld::Ipld;
11use jacquard_repo::storage::BlockStore;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value, json};
14use std::collections::HashMap;
15use std::str::FromStr;
16use tracing::error;
17
18fn ipld_to_json(ipld: Ipld) -> Value {
19 match ipld {
20 Ipld::Null => Value::Null,
21 Ipld::Bool(b) => Value::Bool(b),
22 Ipld::Integer(i) => {
23 if let Ok(n) = i64::try_from(i) {
24 Value::Number(n.into())
25 } else {
26 Value::String(i.to_string())
27 }
28 }
29 Ipld::Float(f) => serde_json::Number::from_f64(f)
30 .map(Value::Number)
31 .unwrap_or(Value::Null),
32 Ipld::String(s) => Value::String(s),
33 Ipld::Bytes(b) => {
34 let encoded = base64::engine::general_purpose::STANDARD.encode(&b);
35 json!({ "$bytes": encoded })
36 }
37 Ipld::List(arr) => Value::Array(arr.into_iter().map(ipld_to_json).collect()),
38 Ipld::Map(map) => {
39 let obj: Map<String, Value> =
40 map.into_iter().map(|(k, v)| (k, ipld_to_json(v))).collect();
41 Value::Object(obj)
42 }
43 Ipld::Link(cid) => json!({ "$link": cid.to_string() }),
44 }
45}
46
47#[derive(Deserialize)]
48pub struct GetRecordInput {
49 pub repo: String,
50 pub collection: String,
51 pub rkey: String,
52 pub cid: Option<String>,
53}
54
55pub async fn get_record(
56 State(state): State<AppState>,
57 headers: HeaderMap,
58 Query(input): Query<GetRecordInput>,
59) -> Response {
60 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
61 let user_id_opt = if input.repo.starts_with("did:") {
62 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
63 .fetch_optional(&state.db)
64 .await
65 .map(|opt| opt.map(|r| r.id))
66 } else {
67 let handle = if !input.repo.contains('.') {
68 format!("{}.{}", input.repo, hostname)
69 } else {
70 input.repo.clone()
71 };
72 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
73 .fetch_optional(&state.db)
74 .await
75 .map(|opt| opt.map(|r| r.id))
76 };
77 let user_id: uuid::Uuid = match user_id_opt {
78 Ok(Some(id)) => id,
79 Ok(None) => {
80 return (
81 StatusCode::NOT_FOUND,
82 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
83 )
84 .into_response();
85 }
86 Err(_) => {
87 return (
88 StatusCode::INTERNAL_SERVER_ERROR,
89 Json(json!({"error": "InternalError"})),
90 )
91 .into_response();
92 }
93 };
94 let record_row = sqlx::query!(
95 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
96 user_id,
97 input.collection,
98 input.rkey
99 )
100 .fetch_optional(&state.db)
101 .await;
102 let record_cid_str: String = match record_row {
103 Ok(Some(row)) => row.record_cid,
104 _ => {
105 return (
106 StatusCode::NOT_FOUND,
107 Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
108 )
109 .into_response();
110 }
111 };
112 if let Some(expected_cid) = &input.cid
113 && &record_cid_str != expected_cid
114 {
115 return (
116 StatusCode::NOT_FOUND,
117 Json(json!({"error": "RecordNotFound", "message": "Record CID mismatch"})),
118 )
119 .into_response();
120 }
121 let cid = match Cid::from_str(&record_cid_str) {
122 Ok(c) => c,
123 Err(_) => {
124 return (
125 StatusCode::INTERNAL_SERVER_ERROR,
126 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
127 )
128 .into_response();
129 }
130 };
131 let block = match state.block_store.get(&cid).await {
132 Ok(Some(b)) => b,
133 _ => {
134 return (
135 StatusCode::INTERNAL_SERVER_ERROR,
136 Json(json!({"error": "InternalError", "message": "Record block not found"})),
137 )
138 .into_response();
139 }
140 };
141 let ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block) {
142 Ok(v) => v,
143 Err(e) => {
144 error!("Failed to deserialize record: {:?}", e);
145 return (
146 StatusCode::INTERNAL_SERVER_ERROR,
147 Json(json!({"error": "InternalError"})),
148 )
149 .into_response();
150 }
151 };
152 let value = ipld_to_json(ipld);
153 Json(json!({
154 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
155 "cid": record_cid_str,
156 "value": value
157 }))
158 .into_response()
159}
160#[derive(Deserialize)]
161pub struct ListRecordsInput {
162 pub repo: String,
163 pub collection: String,
164 pub limit: Option<i32>,
165 pub cursor: Option<String>,
166 #[serde(rename = "rkeyStart")]
167 pub rkey_start: Option<String>,
168 #[serde(rename = "rkeyEnd")]
169 pub rkey_end: Option<String>,
170 pub reverse: Option<bool>,
171}
172#[derive(Serialize)]
173pub struct ListRecordsOutput {
174 #[serde(skip_serializing_if = "Option::is_none")]
175 pub cursor: Option<String>,
176 pub records: Vec<serde_json::Value>,
177}
178
179pub async fn list_records(
180 State(state): State<AppState>,
181 Query(input): Query<ListRecordsInput>,
182) -> Response {
183 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
184 let user_id_opt = if input.repo.starts_with("did:") {
185 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
186 .fetch_optional(&state.db)
187 .await
188 .map(|opt| opt.map(|r| r.id))
189 } else {
190 let handle = if !input.repo.contains('.') {
191 format!("{}.{}", input.repo, hostname)
192 } else {
193 input.repo.clone()
194 };
195 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
196 .fetch_optional(&state.db)
197 .await
198 .map(|opt| opt.map(|r| r.id))
199 };
200 let user_id: uuid::Uuid = match user_id_opt {
201 Ok(Some(id)) => id,
202 Ok(None) => {
203 return (
204 StatusCode::NOT_FOUND,
205 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
206 )
207 .into_response();
208 }
209 Err(_) => {
210 return (
211 StatusCode::INTERNAL_SERVER_ERROR,
212 Json(json!({"error": "InternalError"})),
213 )
214 .into_response();
215 }
216 };
217 let limit = input.limit.unwrap_or(50).clamp(1, 100);
218 let reverse = input.reverse.unwrap_or(false);
219 let limit_i64 = limit as i64;
220 let order = if reverse { "ASC" } else { "DESC" };
221 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
222 let comparator = if reverse { ">" } else { "<" };
223 let query = format!(
224 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
225 comparator, order
226 );
227 sqlx::query_as(&query)
228 .bind(user_id)
229 .bind(&input.collection)
230 .bind(cursor)
231 .bind(limit_i64)
232 .fetch_all(&state.db)
233 .await
234 } else {
235 let mut conditions = vec!["repo_id = $1", "collection = $2"];
236 let mut param_idx = 3;
237 if input.rkey_start.is_some() {
238 conditions.push("rkey > $3");
239 param_idx += 1;
240 }
241 if input.rkey_end.is_some() {
242 conditions.push(if param_idx == 3 {
243 "rkey < $3"
244 } else {
245 "rkey < $4"
246 });
247 param_idx += 1;
248 }
249 let limit_idx = param_idx;
250 let query = format!(
251 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
252 conditions.join(" AND "),
253 order,
254 limit_idx
255 );
256 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
257 .bind(user_id)
258 .bind(&input.collection);
259 if let Some(start) = &input.rkey_start {
260 query_builder = query_builder.bind(start);
261 }
262 if let Some(end) = &input.rkey_end {
263 query_builder = query_builder.bind(end);
264 }
265 query_builder.bind(limit_i64).fetch_all(&state.db).await
266 };
267 let rows = match rows_res {
268 Ok(r) => r,
269 Err(e) => {
270 error!("Error listing records: {:?}", e);
271 return (
272 StatusCode::INTERNAL_SERVER_ERROR,
273 Json(json!({"error": "InternalError"})),
274 )
275 .into_response();
276 }
277 };
278 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
279 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
280 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
281 for (rkey, cid_str) in &rows {
282 if let Ok(cid) = Cid::from_str(cid_str) {
283 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
284 cids.push(cid);
285 }
286 }
287 let blocks = match state.block_store.get_many(&cids).await {
288 Ok(b) => b,
289 Err(e) => {
290 error!("Error fetching blocks: {:?}", e);
291 return (
292 StatusCode::INTERNAL_SERVER_ERROR,
293 Json(json!({"error": "InternalError"})),
294 )
295 .into_response();
296 }
297 };
298 let mut records = Vec::new();
299 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
300 if let Some(block) = block_opt
301 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
302 && let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block)
303 {
304 let value = ipld_to_json(ipld);
305 records.push(json!({
306 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
307 "cid": cid_str,
308 "value": value
309 }));
310 }
311 }
312 Json(ListRecordsOutput {
313 cursor: last_rkey,
314 records,
315 })
316 .into_response()
317}