this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use cid::Cid;
9use jacquard_repo::storage::BlockStore;
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::collections::HashMap;
13use std::str::FromStr;
14use tracing::error;
15
16#[derive(Deserialize)]
17pub struct GetRecordInput {
18 pub repo: String,
19 pub collection: String,
20 pub rkey: String,
21 pub cid: Option<String>,
22}
23
24pub async fn get_record(
25 State(state): State<AppState>,
26 Query(input): Query<GetRecordInput>,
27) -> Response {
28 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
29 let user_id_opt = if input.repo.starts_with("did:") {
30 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
31 .fetch_optional(&state.db)
32 .await
33 .map(|opt| opt.map(|r| r.id))
34 } else {
35 let suffix = format!(".{}", hostname);
36 let short_handle = if input.repo.ends_with(&suffix) {
37 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
38 } else {
39 &input.repo
40 };
41 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
42 .fetch_optional(&state.db)
43 .await
44 .map(|opt| opt.map(|r| r.id))
45 };
46 let user_id: uuid::Uuid = match user_id_opt {
47 Ok(Some(id)) => id,
48 _ => {
49 return (
50 StatusCode::NOT_FOUND,
51 Json(json!({"error": "NotFound", "message": "Repo not found"})),
52 )
53 .into_response();
54 }
55 };
56 let record_row = sqlx::query!(
57 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
58 user_id,
59 input.collection,
60 input.rkey
61 )
62 .fetch_optional(&state.db)
63 .await;
64 let record_cid_str: String = match record_row {
65 Ok(Some(row)) => row.record_cid,
66 _ => {
67 return (
68 StatusCode::NOT_FOUND,
69 Json(json!({"error": "NotFound", "message": "Record not found"})),
70 )
71 .into_response();
72 }
73 };
74 if let Some(expected_cid) = &input.cid
75 && &record_cid_str != expected_cid {
76 return (
77 StatusCode::NOT_FOUND,
78 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
79 )
80 .into_response();
81 }
82 let cid = match Cid::from_str(&record_cid_str) {
83 Ok(c) => c,
84 Err(_) => {
85 return (
86 StatusCode::INTERNAL_SERVER_ERROR,
87 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
88 )
89 .into_response();
90 }
91 };
92 let block = match state.block_store.get(&cid).await {
93 Ok(Some(b)) => b,
94 _ => {
95 return (
96 StatusCode::INTERNAL_SERVER_ERROR,
97 Json(json!({"error": "InternalError", "message": "Record block not found"})),
98 )
99 .into_response();
100 }
101 };
102 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
103 Ok(v) => v,
104 Err(e) => {
105 error!("Failed to deserialize record: {:?}", e);
106 return (
107 StatusCode::INTERNAL_SERVER_ERROR,
108 Json(json!({"error": "InternalError"})),
109 )
110 .into_response();
111 }
112 };
113 Json(json!({
114 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
115 "cid": record_cid_str,
116 "value": value
117 }))
118 .into_response()
119}
120#[derive(Deserialize)]
121pub struct ListRecordsInput {
122 pub repo: String,
123 pub collection: String,
124 pub limit: Option<i32>,
125 pub cursor: Option<String>,
126 #[serde(rename = "rkeyStart")]
127 pub rkey_start: Option<String>,
128 #[serde(rename = "rkeyEnd")]
129 pub rkey_end: Option<String>,
130 pub reverse: Option<bool>,
131}
132#[derive(Serialize)]
133pub struct ListRecordsOutput {
134 pub cursor: Option<String>,
135 pub records: Vec<serde_json::Value>,
136}
137pub async fn list_records(
138 State(state): State<AppState>,
139 Query(input): Query<ListRecordsInput>,
140) -> Response {
141 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
142 let user_id_opt = if input.repo.starts_with("did:") {
143 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
144 .fetch_optional(&state.db)
145 .await
146 .map(|opt| opt.map(|r| r.id))
147 } else {
148 let suffix = format!(".{}", hostname);
149 let short_handle = if input.repo.ends_with(&suffix) {
150 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
151 } else {
152 &input.repo
153 };
154 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
155 .fetch_optional(&state.db)
156 .await
157 .map(|opt| opt.map(|r| r.id))
158 };
159 let user_id: uuid::Uuid = match user_id_opt {
160 Ok(Some(id)) => id,
161 _ => {
162 return (
163 StatusCode::NOT_FOUND,
164 Json(json!({"error": "NotFound", "message": "Repo not found"})),
165 )
166 .into_response();
167 }
168 };
169 let limit = input.limit.unwrap_or(50).clamp(1, 100);
170 let reverse = input.reverse.unwrap_or(false);
171 let limit_i64 = limit as i64;
172 let order = if reverse { "ASC" } else { "DESC" };
173 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
174 let comparator = if reverse { ">" } else { "<" };
175 let query = format!(
176 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
177 comparator, order
178 );
179 sqlx::query_as(&query)
180 .bind(user_id)
181 .bind(&input.collection)
182 .bind(cursor)
183 .bind(limit_i64)
184 .fetch_all(&state.db)
185 .await
186 } else {
187 let mut conditions = vec!["repo_id = $1", "collection = $2"];
188 let mut param_idx = 3;
189 if input.rkey_start.is_some() {
190 conditions.push("rkey > $3");
191 param_idx += 1;
192 }
193 if input.rkey_end.is_some() {
194 conditions.push(if param_idx == 3 {
195 "rkey < $3"
196 } else {
197 "rkey < $4"
198 });
199 param_idx += 1;
200 }
201 let limit_idx = param_idx;
202 let query = format!(
203 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
204 conditions.join(" AND "),
205 order,
206 limit_idx
207 );
208 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
209 .bind(user_id)
210 .bind(&input.collection);
211 if let Some(start) = &input.rkey_start {
212 query_builder = query_builder.bind(start);
213 }
214 if let Some(end) = &input.rkey_end {
215 query_builder = query_builder.bind(end);
216 }
217 query_builder.bind(limit_i64).fetch_all(&state.db).await
218 };
219 let rows = match rows_res {
220 Ok(r) => r,
221 Err(e) => {
222 error!("Error listing records: {:?}", e);
223 return (
224 StatusCode::INTERNAL_SERVER_ERROR,
225 Json(json!({"error": "InternalError"})),
226 )
227 .into_response();
228 }
229 };
230 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
231 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
232 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
233 for (rkey, cid_str) in &rows {
234 if let Ok(cid) = Cid::from_str(cid_str) {
235 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
236 cids.push(cid);
237 }
238 }
239 let blocks = match state.block_store.get_many(&cids).await {
240 Ok(b) => b,
241 Err(e) => {
242 error!("Error fetching blocks: {:?}", e);
243 return (
244 StatusCode::INTERNAL_SERVER_ERROR,
245 Json(json!({"error": "InternalError"})),
246 )
247 .into_response();
248 }
249 };
250 let mut records = Vec::new();
251 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
252 if let Some(block) = block_opt
253 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
254 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
255 records.push(json!({
256 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
257 "cid": cid_str,
258 "value": value
259 }));
260 }
261 }
262 Json(ListRecordsOutput {
263 cursor: last_rkey,
264 records,
265 })
266 .into_response()
267}