this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use cid::Cid;
9use jacquard_repo::storage::BlockStore;
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::collections::HashMap;
13use std::str::FromStr;
14use tracing::error;
15
16#[derive(Deserialize)]
17pub struct GetRecordInput {
18 pub repo: String,
19 pub collection: String,
20 pub rkey: String,
21 pub cid: Option<String>,
22}
23
24pub async fn get_record(
25 State(state): State<AppState>,
26 Query(input): Query<GetRecordInput>,
27) -> Response {
28 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
29 let user_id_opt = if input.repo.starts_with("did:") {
30 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
31 .fetch_optional(&state.db)
32 .await
33 .map(|opt| opt.map(|r| r.id))
34 } else {
35 let suffix = format!(".{}", hostname);
36 let short_handle = if input.repo.ends_with(&suffix) {
37 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
38 } else {
39 &input.repo
40 };
41 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
42 .fetch_optional(&state.db)
43 .await
44 .map(|opt| opt.map(|r| r.id))
45 };
46 let user_id: uuid::Uuid = match user_id_opt {
47 Ok(Some(id)) => id,
48 Ok(None) => {
49 return (
50 StatusCode::NOT_FOUND,
51 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
52 )
53 .into_response();
54 }
55 Err(_) => {
56 return (
57 StatusCode::INTERNAL_SERVER_ERROR,
58 Json(json!({"error": "InternalError"})),
59 )
60 .into_response();
61 }
62 };
63 let record_row = sqlx::query!(
64 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
65 user_id,
66 input.collection,
67 input.rkey
68 )
69 .fetch_optional(&state.db)
70 .await;
71 let record_cid_str: String = match record_row {
72 Ok(Some(row)) => row.record_cid,
73 _ => {
74 return (
75 StatusCode::NOT_FOUND,
76 Json(json!({"error": "NotFound", "message": "Record not found"})),
77 )
78 .into_response();
79 }
80 };
81 if let Some(expected_cid) = &input.cid
82 && &record_cid_str != expected_cid {
83 return (
84 StatusCode::NOT_FOUND,
85 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
86 )
87 .into_response();
88 }
89 let cid = match Cid::from_str(&record_cid_str) {
90 Ok(c) => c,
91 Err(_) => {
92 return (
93 StatusCode::INTERNAL_SERVER_ERROR,
94 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
95 )
96 .into_response();
97 }
98 };
99 let block = match state.block_store.get(&cid).await {
100 Ok(Some(b)) => b,
101 _ => {
102 return (
103 StatusCode::INTERNAL_SERVER_ERROR,
104 Json(json!({"error": "InternalError", "message": "Record block not found"})),
105 )
106 .into_response();
107 }
108 };
109 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
110 Ok(v) => v,
111 Err(e) => {
112 error!("Failed to deserialize record: {:?}", e);
113 return (
114 StatusCode::INTERNAL_SERVER_ERROR,
115 Json(json!({"error": "InternalError"})),
116 )
117 .into_response();
118 }
119 };
120 Json(json!({
121 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
122 "cid": record_cid_str,
123 "value": value
124 }))
125 .into_response()
126}
127#[derive(Deserialize)]
128pub struct ListRecordsInput {
129 pub repo: String,
130 pub collection: String,
131 pub limit: Option<i32>,
132 pub cursor: Option<String>,
133 #[serde(rename = "rkeyStart")]
134 pub rkey_start: Option<String>,
135 #[serde(rename = "rkeyEnd")]
136 pub rkey_end: Option<String>,
137 pub reverse: Option<bool>,
138}
139#[derive(Serialize)]
140pub struct ListRecordsOutput {
141 pub cursor: Option<String>,
142 pub records: Vec<serde_json::Value>,
143}
144
145pub async fn list_records(
146 State(state): State<AppState>,
147 Query(input): Query<ListRecordsInput>,
148) -> Response {
149 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
150 let user_id_opt = if input.repo.starts_with("did:") {
151 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
152 .fetch_optional(&state.db)
153 .await
154 .map(|opt| opt.map(|r| r.id))
155 } else {
156 let suffix = format!(".{}", hostname);
157 let short_handle = if input.repo.ends_with(&suffix) {
158 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
159 } else {
160 &input.repo
161 };
162 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
163 .fetch_optional(&state.db)
164 .await
165 .map(|opt| opt.map(|r| r.id))
166 };
167 let user_id: uuid::Uuid = match user_id_opt {
168 Ok(Some(id)) => id,
169 Ok(None) => {
170 return (
171 StatusCode::NOT_FOUND,
172 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
173 )
174 .into_response();
175 }
176 Err(_) => {
177 return (
178 StatusCode::INTERNAL_SERVER_ERROR,
179 Json(json!({"error": "InternalError"})),
180 )
181 .into_response();
182 }
183 };
184 let limit = input.limit.unwrap_or(50).clamp(1, 100);
185 let reverse = input.reverse.unwrap_or(false);
186 let limit_i64 = limit as i64;
187 let order = if reverse { "ASC" } else { "DESC" };
188 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
189 let comparator = if reverse { ">" } else { "<" };
190 let query = format!(
191 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
192 comparator, order
193 );
194 sqlx::query_as(&query)
195 .bind(user_id)
196 .bind(&input.collection)
197 .bind(cursor)
198 .bind(limit_i64)
199 .fetch_all(&state.db)
200 .await
201 } else {
202 let mut conditions = vec!["repo_id = $1", "collection = $2"];
203 let mut param_idx = 3;
204 if input.rkey_start.is_some() {
205 conditions.push("rkey > $3");
206 param_idx += 1;
207 }
208 if input.rkey_end.is_some() {
209 conditions.push(if param_idx == 3 {
210 "rkey < $3"
211 } else {
212 "rkey < $4"
213 });
214 param_idx += 1;
215 }
216 let limit_idx = param_idx;
217 let query = format!(
218 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
219 conditions.join(" AND "),
220 order,
221 limit_idx
222 );
223 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
224 .bind(user_id)
225 .bind(&input.collection);
226 if let Some(start) = &input.rkey_start {
227 query_builder = query_builder.bind(start);
228 }
229 if let Some(end) = &input.rkey_end {
230 query_builder = query_builder.bind(end);
231 }
232 query_builder.bind(limit_i64).fetch_all(&state.db).await
233 };
234 let rows = match rows_res {
235 Ok(r) => r,
236 Err(e) => {
237 error!("Error listing records: {:?}", e);
238 return (
239 StatusCode::INTERNAL_SERVER_ERROR,
240 Json(json!({"error": "InternalError"})),
241 )
242 .into_response();
243 }
244 };
245 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
246 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
247 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
248 for (rkey, cid_str) in &rows {
249 if let Ok(cid) = Cid::from_str(cid_str) {
250 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
251 cids.push(cid);
252 }
253 }
254 let blocks = match state.block_store.get_many(&cids).await {
255 Ok(b) => b,
256 Err(e) => {
257 error!("Error fetching blocks: {:?}", e);
258 return (
259 StatusCode::INTERNAL_SERVER_ERROR,
260 Json(json!({"error": "InternalError"})),
261 )
262 .into_response();
263 }
264 };
265 let mut records = Vec::new();
266 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
267 if let Some(block) = block_opt
268 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
269 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
270 records.push(json!({
271 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
272 "cid": cid_str,
273 "value": value
274 }));
275 }
276 }
277 Json(ListRecordsOutput {
278 cursor: last_rkey,
279 records,
280 })
281 .into_response()
282}