this repo has no description
1use crate::api::proxy_client::proxy_client;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::{HeaderMap, StatusCode},
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use std::collections::HashMap;
14use std::str::FromStr;
15use tracing::{error, info};
16
17#[derive(Deserialize)]
18pub struct GetRecordInput {
19 pub repo: String,
20 pub collection: String,
21 pub rkey: String,
22 pub cid: Option<String>,
23}
24
25pub async fn get_record(
26 State(state): State<AppState>,
27 headers: HeaderMap,
28 Query(input): Query<GetRecordInput>,
29) -> Response {
30 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
31 let user_id_opt = if input.repo.starts_with("did:") {
32 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
33 .fetch_optional(&state.db)
34 .await
35 .map(|opt| opt.map(|r| r.id))
36 } else {
37 let suffix = format!(".{}", hostname);
38 let short_handle = if input.repo.ends_with(&suffix) {
39 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
40 } else {
41 &input.repo
42 };
43 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
44 .fetch_optional(&state.db)
45 .await
46 .map(|opt| opt.map(|r| r.id))
47 };
48 let user_id: uuid::Uuid = match user_id_opt {
49 Ok(Some(id)) => id,
50 Ok(None) => {
51 if let Some(proxy_header) = headers
52 .get("atproto-proxy")
53 .and_then(|h| h.to_str().ok())
54 {
55 let did = proxy_header.split('#').next().unwrap_or(proxy_header);
56 if let Some(resolved) = state.did_resolver.resolve_did(did).await {
57 let mut url = format!(
58 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}",
59 resolved.url.trim_end_matches('/'),
60 urlencoding::encode(&input.repo),
61 urlencoding::encode(&input.collection),
62 urlencoding::encode(&input.rkey)
63 );
64 if let Some(cid) = &input.cid {
65 url.push_str(&format!("&cid={}", urlencoding::encode(cid)));
66 }
67 info!("Proxying getRecord to {}: {}", did, url);
68 match proxy_client().get(&url).send().await {
69 Ok(resp) => {
70 let status = resp.status();
71 let body = match resp.bytes().await {
72 Ok(b) => b,
73 Err(e) => {
74 error!("Error reading proxy response: {:?}", e);
75 return (
76 StatusCode::BAD_GATEWAY,
77 Json(json!({"error": "UpstreamFailure", "message": "Error reading upstream response"})),
78 )
79 .into_response();
80 }
81 };
82 return Response::builder()
83 .status(status)
84 .header("content-type", "application/json")
85 .body(axum::body::Body::from(body))
86 .unwrap_or_else(|_| {
87 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response()
88 });
89 }
90 Err(e) => {
91 error!("Error proxying request: {:?}", e);
92 return (
93 StatusCode::BAD_GATEWAY,
94 Json(json!({"error": "UpstreamFailure", "message": "Failed to reach upstream service"})),
95 )
96 .into_response();
97 }
98 }
99 } else {
100 error!("Could not resolve DID from atproto-proxy header: {}", did);
101 return (
102 StatusCode::BAD_GATEWAY,
103 Json(json!({"error": "UpstreamFailure", "message": "Could not resolve proxy DID"})),
104 )
105 .into_response();
106 }
107 }
108 return (
109 StatusCode::NOT_FOUND,
110 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
111 )
112 .into_response();
113 }
114 Err(_) => {
115 return (
116 StatusCode::INTERNAL_SERVER_ERROR,
117 Json(json!({"error": "InternalError"})),
118 )
119 .into_response();
120 }
121 };
122 let record_row = sqlx::query!(
123 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
124 user_id,
125 input.collection,
126 input.rkey
127 )
128 .fetch_optional(&state.db)
129 .await;
130 let record_cid_str: String = match record_row {
131 Ok(Some(row)) => row.record_cid,
132 _ => {
133 return (
134 StatusCode::NOT_FOUND,
135 Json(json!({"error": "NotFound", "message": "Record not found"})),
136 )
137 .into_response();
138 }
139 };
140 if let Some(expected_cid) = &input.cid
141 && &record_cid_str != expected_cid {
142 return (
143 StatusCode::NOT_FOUND,
144 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
145 )
146 .into_response();
147 }
148 let cid = match Cid::from_str(&record_cid_str) {
149 Ok(c) => c,
150 Err(_) => {
151 return (
152 StatusCode::INTERNAL_SERVER_ERROR,
153 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
154 )
155 .into_response();
156 }
157 };
158 let block = match state.block_store.get(&cid).await {
159 Ok(Some(b)) => b,
160 _ => {
161 return (
162 StatusCode::INTERNAL_SERVER_ERROR,
163 Json(json!({"error": "InternalError", "message": "Record block not found"})),
164 )
165 .into_response();
166 }
167 };
168 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
169 Ok(v) => v,
170 Err(e) => {
171 error!("Failed to deserialize record: {:?}", e);
172 return (
173 StatusCode::INTERNAL_SERVER_ERROR,
174 Json(json!({"error": "InternalError"})),
175 )
176 .into_response();
177 }
178 };
179 Json(json!({
180 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
181 "cid": record_cid_str,
182 "value": value
183 }))
184 .into_response()
185}
186#[derive(Deserialize)]
187pub struct ListRecordsInput {
188 pub repo: String,
189 pub collection: String,
190 pub limit: Option<i32>,
191 pub cursor: Option<String>,
192 #[serde(rename = "rkeyStart")]
193 pub rkey_start: Option<String>,
194 #[serde(rename = "rkeyEnd")]
195 pub rkey_end: Option<String>,
196 pub reverse: Option<bool>,
197}
198#[derive(Serialize)]
199pub struct ListRecordsOutput {
200 #[serde(skip_serializing_if = "Option::is_none")]
201 pub cursor: Option<String>,
202 pub records: Vec<serde_json::Value>,
203}
204
205pub async fn list_records(
206 State(state): State<AppState>,
207 Query(input): Query<ListRecordsInput>,
208) -> Response {
209 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
210 let user_id_opt = if input.repo.starts_with("did:") {
211 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
212 .fetch_optional(&state.db)
213 .await
214 .map(|opt| opt.map(|r| r.id))
215 } else {
216 let suffix = format!(".{}", hostname);
217 let short_handle = if input.repo.ends_with(&suffix) {
218 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
219 } else {
220 &input.repo
221 };
222 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
223 .fetch_optional(&state.db)
224 .await
225 .map(|opt| opt.map(|r| r.id))
226 };
227 let user_id: uuid::Uuid = match user_id_opt {
228 Ok(Some(id)) => id,
229 Ok(None) => {
230 return (
231 StatusCode::NOT_FOUND,
232 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
233 )
234 .into_response();
235 }
236 Err(_) => {
237 return (
238 StatusCode::INTERNAL_SERVER_ERROR,
239 Json(json!({"error": "InternalError"})),
240 )
241 .into_response();
242 }
243 };
244 let limit = input.limit.unwrap_or(50).clamp(1, 100);
245 let reverse = input.reverse.unwrap_or(false);
246 let limit_i64 = limit as i64;
247 let order = if reverse { "ASC" } else { "DESC" };
248 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
249 let comparator = if reverse { ">" } else { "<" };
250 let query = format!(
251 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
252 comparator, order
253 );
254 sqlx::query_as(&query)
255 .bind(user_id)
256 .bind(&input.collection)
257 .bind(cursor)
258 .bind(limit_i64)
259 .fetch_all(&state.db)
260 .await
261 } else {
262 let mut conditions = vec!["repo_id = $1", "collection = $2"];
263 let mut param_idx = 3;
264 if input.rkey_start.is_some() {
265 conditions.push("rkey > $3");
266 param_idx += 1;
267 }
268 if input.rkey_end.is_some() {
269 conditions.push(if param_idx == 3 {
270 "rkey < $3"
271 } else {
272 "rkey < $4"
273 });
274 param_idx += 1;
275 }
276 let limit_idx = param_idx;
277 let query = format!(
278 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
279 conditions.join(" AND "),
280 order,
281 limit_idx
282 );
283 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
284 .bind(user_id)
285 .bind(&input.collection);
286 if let Some(start) = &input.rkey_start {
287 query_builder = query_builder.bind(start);
288 }
289 if let Some(end) = &input.rkey_end {
290 query_builder = query_builder.bind(end);
291 }
292 query_builder.bind(limit_i64).fetch_all(&state.db).await
293 };
294 let rows = match rows_res {
295 Ok(r) => r,
296 Err(e) => {
297 error!("Error listing records: {:?}", e);
298 return (
299 StatusCode::INTERNAL_SERVER_ERROR,
300 Json(json!({"error": "InternalError"})),
301 )
302 .into_response();
303 }
304 };
305 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
306 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
307 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
308 for (rkey, cid_str) in &rows {
309 if let Ok(cid) = Cid::from_str(cid_str) {
310 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
311 cids.push(cid);
312 }
313 }
314 let blocks = match state.block_store.get_many(&cids).await {
315 Ok(b) => b,
316 Err(e) => {
317 error!("Error fetching blocks: {:?}", e);
318 return (
319 StatusCode::INTERNAL_SERVER_ERROR,
320 Json(json!({"error": "InternalError"})),
321 )
322 .into_response();
323 }
324 };
325 let mut records = Vec::new();
326 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
327 if let Some(block) = block_opt
328 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
329 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
330 records.push(json!({
331 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
332 "cid": cid_str,
333 "value": value
334 }));
335 }
336 }
337 Json(ListRecordsOutput {
338 cursor: last_rkey,
339 records,
340 })
341 .into_response()
342}