this repo has no description
1use crate::api::proxy_client::proxy_client;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::{HeaderMap, StatusCode},
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use std::collections::HashMap;
14use std::str::FromStr;
15use tracing::{error, info};
16
17#[derive(Deserialize)]
18pub struct GetRecordInput {
19 pub repo: String,
20 pub collection: String,
21 pub rkey: String,
22 pub cid: Option<String>,
23}
24
25pub async fn get_record(
26 State(state): State<AppState>,
27 headers: HeaderMap,
28 Query(input): Query<GetRecordInput>,
29) -> Response {
30 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
31 let user_id_opt = if input.repo.starts_with("did:") {
32 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
33 .fetch_optional(&state.db)
34 .await
35 .map(|opt| opt.map(|r| r.id))
36 } else {
37 let suffix = format!(".{}", hostname);
38 let short_handle = if input.repo.ends_with(&suffix) {
39 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
40 } else {
41 &input.repo
42 };
43 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
44 .fetch_optional(&state.db)
45 .await
46 .map(|opt| opt.map(|r| r.id))
47 };
48 let user_id: uuid::Uuid = match user_id_opt {
49 Ok(Some(id)) => id,
50 Ok(None) => {
51 if let Some(proxy_header) = headers.get("atproto-proxy").and_then(|h| h.to_str().ok()) {
52 let did = proxy_header.split('#').next().unwrap_or(proxy_header);
53 if let Some(resolved) = state.did_resolver.resolve_did(did).await {
54 let mut url = format!(
55 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}",
56 resolved.url.trim_end_matches('/'),
57 urlencoding::encode(&input.repo),
58 urlencoding::encode(&input.collection),
59 urlencoding::encode(&input.rkey)
60 );
61 if let Some(cid) = &input.cid {
62 url.push_str(&format!("&cid={}", urlencoding::encode(cid)));
63 }
64 info!("Proxying getRecord to {}: {}", did, url);
65 match proxy_client().get(&url).send().await {
66 Ok(resp) => {
67 let status = resp.status();
68 let body = match resp.bytes().await {
69 Ok(b) => b,
70 Err(e) => {
71 error!("Error reading proxy response: {:?}", e);
72 return (
73 StatusCode::BAD_GATEWAY,
74 Json(json!({"error": "UpstreamFailure", "message": "Error reading upstream response"})),
75 )
76 .into_response();
77 }
78 };
79 return Response::builder()
80 .status(status)
81 .header("content-type", "application/json")
82 .body(axum::body::Body::from(body))
83 .unwrap_or_else(|_| {
84 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error")
85 .into_response()
86 });
87 }
88 Err(e) => {
89 error!("Error proxying request: {:?}", e);
90 return (
91 StatusCode::BAD_GATEWAY,
92 Json(json!({"error": "UpstreamFailure", "message": "Failed to reach upstream service"})),
93 )
94 .into_response();
95 }
96 }
97 } else {
98 error!("Could not resolve DID from atproto-proxy header: {}", did);
99 return (
100 StatusCode::BAD_GATEWAY,
101 Json(json!({"error": "UpstreamFailure", "message": "Could not resolve proxy DID"})),
102 )
103 .into_response();
104 }
105 }
106 return (
107 StatusCode::NOT_FOUND,
108 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
109 )
110 .into_response();
111 }
112 Err(_) => {
113 return (
114 StatusCode::INTERNAL_SERVER_ERROR,
115 Json(json!({"error": "InternalError"})),
116 )
117 .into_response();
118 }
119 };
120 let record_row = sqlx::query!(
121 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
122 user_id,
123 input.collection,
124 input.rkey
125 )
126 .fetch_optional(&state.db)
127 .await;
128 let record_cid_str: String = match record_row {
129 Ok(Some(row)) => row.record_cid,
130 _ => {
131 return (
132 StatusCode::NOT_FOUND,
133 Json(json!({"error": "NotFound", "message": "Record not found"})),
134 )
135 .into_response();
136 }
137 };
138 if let Some(expected_cid) = &input.cid
139 && &record_cid_str != expected_cid
140 {
141 return (
142 StatusCode::NOT_FOUND,
143 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
144 )
145 .into_response();
146 }
147 let cid = match Cid::from_str(&record_cid_str) {
148 Ok(c) => c,
149 Err(_) => {
150 return (
151 StatusCode::INTERNAL_SERVER_ERROR,
152 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
153 )
154 .into_response();
155 }
156 };
157 let block = match state.block_store.get(&cid).await {
158 Ok(Some(b)) => b,
159 _ => {
160 return (
161 StatusCode::INTERNAL_SERVER_ERROR,
162 Json(json!({"error": "InternalError", "message": "Record block not found"})),
163 )
164 .into_response();
165 }
166 };
167 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
168 Ok(v) => v,
169 Err(e) => {
170 error!("Failed to deserialize record: {:?}", e);
171 return (
172 StatusCode::INTERNAL_SERVER_ERROR,
173 Json(json!({"error": "InternalError"})),
174 )
175 .into_response();
176 }
177 };
178 Json(json!({
179 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
180 "cid": record_cid_str,
181 "value": value
182 }))
183 .into_response()
184}
185#[derive(Deserialize)]
186pub struct ListRecordsInput {
187 pub repo: String,
188 pub collection: String,
189 pub limit: Option<i32>,
190 pub cursor: Option<String>,
191 #[serde(rename = "rkeyStart")]
192 pub rkey_start: Option<String>,
193 #[serde(rename = "rkeyEnd")]
194 pub rkey_end: Option<String>,
195 pub reverse: Option<bool>,
196}
197#[derive(Serialize)]
198pub struct ListRecordsOutput {
199 #[serde(skip_serializing_if = "Option::is_none")]
200 pub cursor: Option<String>,
201 pub records: Vec<serde_json::Value>,
202}
203
204pub async fn list_records(
205 State(state): State<AppState>,
206 Query(input): Query<ListRecordsInput>,
207) -> Response {
208 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
209 let user_id_opt = if input.repo.starts_with("did:") {
210 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
211 .fetch_optional(&state.db)
212 .await
213 .map(|opt| opt.map(|r| r.id))
214 } else {
215 let suffix = format!(".{}", hostname);
216 let short_handle = if input.repo.ends_with(&suffix) {
217 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
218 } else {
219 &input.repo
220 };
221 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
222 .fetch_optional(&state.db)
223 .await
224 .map(|opt| opt.map(|r| r.id))
225 };
226 let user_id: uuid::Uuid = match user_id_opt {
227 Ok(Some(id)) => id,
228 Ok(None) => {
229 return (
230 StatusCode::NOT_FOUND,
231 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
232 )
233 .into_response();
234 }
235 Err(_) => {
236 return (
237 StatusCode::INTERNAL_SERVER_ERROR,
238 Json(json!({"error": "InternalError"})),
239 )
240 .into_response();
241 }
242 };
243 let limit = input.limit.unwrap_or(50).clamp(1, 100);
244 let reverse = input.reverse.unwrap_or(false);
245 let limit_i64 = limit as i64;
246 let order = if reverse { "ASC" } else { "DESC" };
247 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
248 let comparator = if reverse { ">" } else { "<" };
249 let query = format!(
250 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
251 comparator, order
252 );
253 sqlx::query_as(&query)
254 .bind(user_id)
255 .bind(&input.collection)
256 .bind(cursor)
257 .bind(limit_i64)
258 .fetch_all(&state.db)
259 .await
260 } else {
261 let mut conditions = vec!["repo_id = $1", "collection = $2"];
262 let mut param_idx = 3;
263 if input.rkey_start.is_some() {
264 conditions.push("rkey > $3");
265 param_idx += 1;
266 }
267 if input.rkey_end.is_some() {
268 conditions.push(if param_idx == 3 {
269 "rkey < $3"
270 } else {
271 "rkey < $4"
272 });
273 param_idx += 1;
274 }
275 let limit_idx = param_idx;
276 let query = format!(
277 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
278 conditions.join(" AND "),
279 order,
280 limit_idx
281 );
282 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
283 .bind(user_id)
284 .bind(&input.collection);
285 if let Some(start) = &input.rkey_start {
286 query_builder = query_builder.bind(start);
287 }
288 if let Some(end) = &input.rkey_end {
289 query_builder = query_builder.bind(end);
290 }
291 query_builder.bind(limit_i64).fetch_all(&state.db).await
292 };
293 let rows = match rows_res {
294 Ok(r) => r,
295 Err(e) => {
296 error!("Error listing records: {:?}", e);
297 return (
298 StatusCode::INTERNAL_SERVER_ERROR,
299 Json(json!({"error": "InternalError"})),
300 )
301 .into_response();
302 }
303 };
304 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
305 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
306 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
307 for (rkey, cid_str) in &rows {
308 if let Ok(cid) = Cid::from_str(cid_str) {
309 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
310 cids.push(cid);
311 }
312 }
313 let blocks = match state.block_store.get_many(&cids).await {
314 Ok(b) => b,
315 Err(e) => {
316 error!("Error fetching blocks: {:?}", e);
317 return (
318 StatusCode::INTERNAL_SERVER_ERROR,
319 Json(json!({"error": "InternalError"})),
320 )
321 .into_response();
322 }
323 };
324 let mut records = Vec::new();
325 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
326 if let Some(block) = block_opt
327 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
328 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block)
329 {
330 records.push(json!({
331 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
332 "cid": cid_str,
333 "value": value
334 }));
335 }
336 }
337 Json(ListRecordsOutput {
338 cursor: last_rkey,
339 records,
340 })
341 .into_response()
342}