this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use cid::Cid;
9use jacquard_repo::storage::BlockStore;
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::collections::HashMap;
13use std::str::FromStr;
14use tracing::error;
15#[derive(Deserialize)]
16pub struct GetRecordInput {
17 pub repo: String,
18 pub collection: String,
19 pub rkey: String,
20 pub cid: Option<String>,
21}
22pub async fn get_record(
23 State(state): State<AppState>,
24 Query(input): Query<GetRecordInput>,
25) -> Response {
26 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
27 let user_id_opt = if input.repo.starts_with("did:") {
28 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
29 .fetch_optional(&state.db)
30 .await
31 .map(|opt| opt.map(|r| r.id))
32 } else {
33 let suffix = format!(".{}", hostname);
34 let short_handle = if input.repo.ends_with(&suffix) {
35 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
36 } else {
37 &input.repo
38 };
39 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
40 .fetch_optional(&state.db)
41 .await
42 .map(|opt| opt.map(|r| r.id))
43 };
44 let user_id: uuid::Uuid = match user_id_opt {
45 Ok(Some(id)) => id,
46 _ => {
47 return (
48 StatusCode::NOT_FOUND,
49 Json(json!({"error": "NotFound", "message": "Repo not found"})),
50 )
51 .into_response();
52 }
53 };
54 let record_row = sqlx::query!(
55 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
56 user_id,
57 input.collection,
58 input.rkey
59 )
60 .fetch_optional(&state.db)
61 .await;
62 let record_cid_str: String = match record_row {
63 Ok(Some(row)) => row.record_cid,
64 _ => {
65 return (
66 StatusCode::NOT_FOUND,
67 Json(json!({"error": "NotFound", "message": "Record not found"})),
68 )
69 .into_response();
70 }
71 };
72 if let Some(expected_cid) = &input.cid {
73 if &record_cid_str != expected_cid {
74 return (
75 StatusCode::NOT_FOUND,
76 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
77 )
78 .into_response();
79 }
80 }
81 let cid = match Cid::from_str(&record_cid_str) {
82 Ok(c) => c,
83 Err(_) => {
84 return (
85 StatusCode::INTERNAL_SERVER_ERROR,
86 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
87 )
88 .into_response();
89 }
90 };
91 let block = match state.block_store.get(&cid).await {
92 Ok(Some(b)) => b,
93 _ => {
94 return (
95 StatusCode::INTERNAL_SERVER_ERROR,
96 Json(json!({"error": "InternalError", "message": "Record block not found"})),
97 )
98 .into_response();
99 }
100 };
101 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
102 Ok(v) => v,
103 Err(e) => {
104 error!("Failed to deserialize record: {:?}", e);
105 return (
106 StatusCode::INTERNAL_SERVER_ERROR,
107 Json(json!({"error": "InternalError"})),
108 )
109 .into_response();
110 }
111 };
112 Json(json!({
113 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
114 "cid": record_cid_str,
115 "value": value
116 }))
117 .into_response()
118}
119#[derive(Deserialize)]
120pub struct ListRecordsInput {
121 pub repo: String,
122 pub collection: String,
123 pub limit: Option<i32>,
124 pub cursor: Option<String>,
125 #[serde(rename = "rkeyStart")]
126 pub rkey_start: Option<String>,
127 #[serde(rename = "rkeyEnd")]
128 pub rkey_end: Option<String>,
129 pub reverse: Option<bool>,
130}
131#[derive(Serialize)]
132pub struct ListRecordsOutput {
133 pub cursor: Option<String>,
134 pub records: Vec<serde_json::Value>,
135}
136pub async fn list_records(
137 State(state): State<AppState>,
138 Query(input): Query<ListRecordsInput>,
139) -> Response {
140 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
141 let user_id_opt = if input.repo.starts_with("did:") {
142 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
143 .fetch_optional(&state.db)
144 .await
145 .map(|opt| opt.map(|r| r.id))
146 } else {
147 let suffix = format!(".{}", hostname);
148 let short_handle = if input.repo.ends_with(&suffix) {
149 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
150 } else {
151 &input.repo
152 };
153 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
154 .fetch_optional(&state.db)
155 .await
156 .map(|opt| opt.map(|r| r.id))
157 };
158 let user_id: uuid::Uuid = match user_id_opt {
159 Ok(Some(id)) => id,
160 _ => {
161 return (
162 StatusCode::NOT_FOUND,
163 Json(json!({"error": "NotFound", "message": "Repo not found"})),
164 )
165 .into_response();
166 }
167 };
168 let limit = input.limit.unwrap_or(50).clamp(1, 100);
169 let reverse = input.reverse.unwrap_or(false);
170 let limit_i64 = limit as i64;
171 let order = if reverse { "ASC" } else { "DESC" };
172 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
173 let comparator = if reverse { ">" } else { "<" };
174 let query = format!(
175 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
176 comparator, order
177 );
178 sqlx::query_as(&query)
179 .bind(user_id)
180 .bind(&input.collection)
181 .bind(cursor)
182 .bind(limit_i64)
183 .fetch_all(&state.db)
184 .await
185 } else {
186 let mut conditions = vec!["repo_id = $1", "collection = $2"];
187 let mut param_idx = 3;
188 if input.rkey_start.is_some() {
189 conditions.push("rkey > $3");
190 param_idx += 1;
191 }
192 if input.rkey_end.is_some() {
193 conditions.push(if param_idx == 3 { "rkey < $3" } else { "rkey < $4" });
194 param_idx += 1;
195 }
196 let limit_idx = param_idx;
197 let query = format!(
198 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
199 conditions.join(" AND "),
200 order,
201 limit_idx
202 );
203 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
204 .bind(user_id)
205 .bind(&input.collection);
206 if let Some(start) = &input.rkey_start {
207 query_builder = query_builder.bind(start);
208 }
209 if let Some(end) = &input.rkey_end {
210 query_builder = query_builder.bind(end);
211 }
212 query_builder.bind(limit_i64).fetch_all(&state.db).await
213 };
214 let rows = match rows_res {
215 Ok(r) => r,
216 Err(e) => {
217 error!("Error listing records: {:?}", e);
218 return (
219 StatusCode::INTERNAL_SERVER_ERROR,
220 Json(json!({"error": "InternalError"})),
221 )
222 .into_response();
223 }
224 };
225 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
226 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
227 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
228 for (rkey, cid_str) in &rows {
229 if let Ok(cid) = Cid::from_str(cid_str) {
230 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
231 cids.push(cid);
232 }
233 }
234 let blocks = match state.block_store.get_many(&cids).await {
235 Ok(b) => b,
236 Err(e) => {
237 error!("Error fetching blocks: {:?}", e);
238 return (
239 StatusCode::INTERNAL_SERVER_ERROR,
240 Json(json!({"error": "InternalError"})),
241 )
242 .into_response();
243 }
244 };
245 let mut records = Vec::new();
246 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
247 if let Some(block) = block_opt {
248 if let Some((rkey, cid_str)) = cid_to_rkey.get(cid) {
249 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
250 records.push(json!({
251 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
252 "cid": cid_str,
253 "value": value
254 }));
255 }
256 }
257 }
258 }
259 Json(ListRecordsOutput {
260 cursor: last_rkey,
261 records,
262 })
263 .into_response()
264}