this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use cid::Cid;
9use jacquard_repo::storage::BlockStore;
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::collections::HashMap;
13use std::str::FromStr;
14use tracing::error;
15
16#[derive(Deserialize)]
17pub struct GetRecordInput {
18 pub repo: String,
19 pub collection: String,
20 pub rkey: String,
21 pub cid: Option<String>,
22}
23
24pub async fn get_record(
25 State(state): State<AppState>,
26 Query(input): Query<GetRecordInput>,
27) -> Response {
28 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
29 let user_id_opt = if input.repo.starts_with("did:") {
30 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
31 .fetch_optional(&state.db)
32 .await
33 .map(|opt| opt.map(|r| r.id))
34 } else {
35 let suffix = format!(".{}", hostname);
36 let short_handle = if input.repo.ends_with(&suffix) {
37 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
38 } else {
39 &input.repo
40 };
41 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
42 .fetch_optional(&state.db)
43 .await
44 .map(|opt| opt.map(|r| r.id))
45 };
46 let user_id: uuid::Uuid = match user_id_opt {
47 Ok(Some(id)) => id,
48 _ => {
49 return (
50 StatusCode::NOT_FOUND,
51 Json(json!({"error": "NotFound", "message": "Repo not found"})),
52 )
53 .into_response();
54 }
55 };
56 let record_row = sqlx::query!(
57 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
58 user_id,
59 input.collection,
60 input.rkey
61 )
62 .fetch_optional(&state.db)
63 .await;
64 let record_cid_str: String = match record_row {
65 Ok(Some(row)) => row.record_cid,
66 _ => {
67 return (
68 StatusCode::NOT_FOUND,
69 Json(json!({"error": "NotFound", "message": "Record not found"})),
70 )
71 .into_response();
72 }
73 };
74 if let Some(expected_cid) = &input.cid {
75 if &record_cid_str != expected_cid {
76 return (
77 StatusCode::NOT_FOUND,
78 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
79 )
80 .into_response();
81 }
82 }
83 let cid = match Cid::from_str(&record_cid_str) {
84 Ok(c) => c,
85 Err(_) => {
86 return (
87 StatusCode::INTERNAL_SERVER_ERROR,
88 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
89 )
90 .into_response();
91 }
92 };
93 let block = match state.block_store.get(&cid).await {
94 Ok(Some(b)) => b,
95 _ => {
96 return (
97 StatusCode::INTERNAL_SERVER_ERROR,
98 Json(json!({"error": "InternalError", "message": "Record block not found"})),
99 )
100 .into_response();
101 }
102 };
103 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
104 Ok(v) => v,
105 Err(e) => {
106 error!("Failed to deserialize record: {:?}", e);
107 return (
108 StatusCode::INTERNAL_SERVER_ERROR,
109 Json(json!({"error": "InternalError"})),
110 )
111 .into_response();
112 }
113 };
114 Json(json!({
115 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
116 "cid": record_cid_str,
117 "value": value
118 }))
119 .into_response()
120}
121#[derive(Deserialize)]
122pub struct ListRecordsInput {
123 pub repo: String,
124 pub collection: String,
125 pub limit: Option<i32>,
126 pub cursor: Option<String>,
127 #[serde(rename = "rkeyStart")]
128 pub rkey_start: Option<String>,
129 #[serde(rename = "rkeyEnd")]
130 pub rkey_end: Option<String>,
131 pub reverse: Option<bool>,
132}
133#[derive(Serialize)]
134pub struct ListRecordsOutput {
135 pub cursor: Option<String>,
136 pub records: Vec<serde_json::Value>,
137}
138pub async fn list_records(
139 State(state): State<AppState>,
140 Query(input): Query<ListRecordsInput>,
141) -> Response {
142 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
143 let user_id_opt = if input.repo.starts_with("did:") {
144 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
145 .fetch_optional(&state.db)
146 .await
147 .map(|opt| opt.map(|r| r.id))
148 } else {
149 let suffix = format!(".{}", hostname);
150 let short_handle = if input.repo.ends_with(&suffix) {
151 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
152 } else {
153 &input.repo
154 };
155 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
156 .fetch_optional(&state.db)
157 .await
158 .map(|opt| opt.map(|r| r.id))
159 };
160 let user_id: uuid::Uuid = match user_id_opt {
161 Ok(Some(id)) => id,
162 _ => {
163 return (
164 StatusCode::NOT_FOUND,
165 Json(json!({"error": "NotFound", "message": "Repo not found"})),
166 )
167 .into_response();
168 }
169 };
170 let limit = input.limit.unwrap_or(50).clamp(1, 100);
171 let reverse = input.reverse.unwrap_or(false);
172 let limit_i64 = limit as i64;
173 let order = if reverse { "ASC" } else { "DESC" };
174 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
175 let comparator = if reverse { ">" } else { "<" };
176 let query = format!(
177 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
178 comparator, order
179 );
180 sqlx::query_as(&query)
181 .bind(user_id)
182 .bind(&input.collection)
183 .bind(cursor)
184 .bind(limit_i64)
185 .fetch_all(&state.db)
186 .await
187 } else {
188 let mut conditions = vec!["repo_id = $1", "collection = $2"];
189 let mut param_idx = 3;
190 if input.rkey_start.is_some() {
191 conditions.push("rkey > $3");
192 param_idx += 1;
193 }
194 if input.rkey_end.is_some() {
195 conditions.push(if param_idx == 3 { "rkey < $3" } else { "rkey < $4" });
196 param_idx += 1;
197 }
198 let limit_idx = param_idx;
199 let query = format!(
200 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
201 conditions.join(" AND "),
202 order,
203 limit_idx
204 );
205 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
206 .bind(user_id)
207 .bind(&input.collection);
208 if let Some(start) = &input.rkey_start {
209 query_builder = query_builder.bind(start);
210 }
211 if let Some(end) = &input.rkey_end {
212 query_builder = query_builder.bind(end);
213 }
214 query_builder.bind(limit_i64).fetch_all(&state.db).await
215 };
216 let rows = match rows_res {
217 Ok(r) => r,
218 Err(e) => {
219 error!("Error listing records: {:?}", e);
220 return (
221 StatusCode::INTERNAL_SERVER_ERROR,
222 Json(json!({"error": "InternalError"})),
223 )
224 .into_response();
225 }
226 };
227 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
228 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
229 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
230 for (rkey, cid_str) in &rows {
231 if let Ok(cid) = Cid::from_str(cid_str) {
232 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
233 cids.push(cid);
234 }
235 }
236 let blocks = match state.block_store.get_many(&cids).await {
237 Ok(b) => b,
238 Err(e) => {
239 error!("Error fetching blocks: {:?}", e);
240 return (
241 StatusCode::INTERNAL_SERVER_ERROR,
242 Json(json!({"error": "InternalError"})),
243 )
244 .into_response();
245 }
246 };
247 let mut records = Vec::new();
248 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
249 if let Some(block) = block_opt {
250 if let Some((rkey, cid_str)) = cid_to_rkey.get(cid) {
251 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
252 records.push(json!({
253 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
254 "cid": cid_str,
255 "value": value
256 }));
257 }
258 }
259 }
260 }
261 Json(ListRecordsOutput {
262 cursor: last_rkey,
263 records,
264 })
265 .into_response()
266}