this repo has no description
1use crate::api::proxy_client::proxy_client;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::{HeaderMap, StatusCode},
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use std::collections::HashMap;
14use std::str::FromStr;
15use tracing::{error, info};
16
17#[derive(Deserialize)]
18pub struct GetRecordInput {
19 pub repo: String,
20 pub collection: String,
21 pub rkey: String,
22 pub cid: Option<String>,
23}
24
25pub async fn get_record(
26 State(state): State<AppState>,
27 headers: HeaderMap,
28 Query(input): Query<GetRecordInput>,
29) -> Response {
30 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
31 let user_id_opt = if input.repo.starts_with("did:") {
32 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
33 .fetch_optional(&state.db)
34 .await
35 .map(|opt| opt.map(|r| r.id))
36 } else {
37 let handle = if !input.repo.contains('.') {
38 format!("{}.{}", input.repo, hostname)
39 } else {
40 input.repo.clone()
41 };
42 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
43 .fetch_optional(&state.db)
44 .await
45 .map(|opt| opt.map(|r| r.id))
46 };
47 let user_id: uuid::Uuid = match user_id_opt {
48 Ok(Some(id)) => id,
49 Ok(None) => {
50 if let Some(proxy_header) = headers.get("atproto-proxy").and_then(|h| h.to_str().ok()) {
51 let did = proxy_header.split('#').next().unwrap_or(proxy_header);
52 if let Some(resolved) = state.did_resolver.resolve_did(did).await {
53 let mut url = format!(
54 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}",
55 resolved.url.trim_end_matches('/'),
56 urlencoding::encode(&input.repo),
57 urlencoding::encode(&input.collection),
58 urlencoding::encode(&input.rkey)
59 );
60 if let Some(cid) = &input.cid {
61 url.push_str(&format!("&cid={}", urlencoding::encode(cid)));
62 }
63 info!("Proxying getRecord to {}: {}", did, url);
64 match proxy_client().get(&url).send().await {
65 Ok(resp) => {
66 let status = resp.status();
67 let body = match resp.bytes().await {
68 Ok(b) => b,
69 Err(e) => {
70 error!("Error reading proxy response: {:?}", e);
71 return (
72 StatusCode::BAD_GATEWAY,
73 Json(json!({"error": "UpstreamFailure", "message": "Error reading upstream response"})),
74 )
75 .into_response();
76 }
77 };
78 return Response::builder()
79 .status(status)
80 .header("content-type", "application/json")
81 .body(axum::body::Body::from(body))
82 .unwrap_or_else(|_| {
83 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error")
84 .into_response()
85 });
86 }
87 Err(e) => {
88 error!("Error proxying request: {:?}", e);
89 return (
90 StatusCode::BAD_GATEWAY,
91 Json(json!({"error": "UpstreamFailure", "message": "Failed to reach upstream service"})),
92 )
93 .into_response();
94 }
95 }
96 } else {
97 error!("Could not resolve DID from atproto-proxy header: {}", did);
98 return (
99 StatusCode::BAD_GATEWAY,
100 Json(json!({"error": "UpstreamFailure", "message": "Could not resolve proxy DID"})),
101 )
102 .into_response();
103 }
104 }
105 return (
106 StatusCode::NOT_FOUND,
107 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
108 )
109 .into_response();
110 }
111 Err(_) => {
112 return (
113 StatusCode::INTERNAL_SERVER_ERROR,
114 Json(json!({"error": "InternalError"})),
115 )
116 .into_response();
117 }
118 };
119 let record_row = sqlx::query!(
120 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
121 user_id,
122 input.collection,
123 input.rkey
124 )
125 .fetch_optional(&state.db)
126 .await;
127 let record_cid_str: String = match record_row {
128 Ok(Some(row)) => row.record_cid,
129 _ => {
130 return (
131 StatusCode::NOT_FOUND,
132 Json(json!({"error": "NotFound", "message": "Record not found"})),
133 )
134 .into_response();
135 }
136 };
137 if let Some(expected_cid) = &input.cid
138 && &record_cid_str != expected_cid
139 {
140 return (
141 StatusCode::NOT_FOUND,
142 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
143 )
144 .into_response();
145 }
146 let cid = match Cid::from_str(&record_cid_str) {
147 Ok(c) => c,
148 Err(_) => {
149 return (
150 StatusCode::INTERNAL_SERVER_ERROR,
151 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
152 )
153 .into_response();
154 }
155 };
156 let block = match state.block_store.get(&cid).await {
157 Ok(Some(b)) => b,
158 _ => {
159 return (
160 StatusCode::INTERNAL_SERVER_ERROR,
161 Json(json!({"error": "InternalError", "message": "Record block not found"})),
162 )
163 .into_response();
164 }
165 };
166 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
167 Ok(v) => v,
168 Err(e) => {
169 error!("Failed to deserialize record: {:?}", e);
170 return (
171 StatusCode::INTERNAL_SERVER_ERROR,
172 Json(json!({"error": "InternalError"})),
173 )
174 .into_response();
175 }
176 };
177 Json(json!({
178 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
179 "cid": record_cid_str,
180 "value": value
181 }))
182 .into_response()
183}
184#[derive(Deserialize)]
185pub struct ListRecordsInput {
186 pub repo: String,
187 pub collection: String,
188 pub limit: Option<i32>,
189 pub cursor: Option<String>,
190 #[serde(rename = "rkeyStart")]
191 pub rkey_start: Option<String>,
192 #[serde(rename = "rkeyEnd")]
193 pub rkey_end: Option<String>,
194 pub reverse: Option<bool>,
195}
196#[derive(Serialize)]
197pub struct ListRecordsOutput {
198 #[serde(skip_serializing_if = "Option::is_none")]
199 pub cursor: Option<String>,
200 pub records: Vec<serde_json::Value>,
201}
202
203pub async fn list_records(
204 State(state): State<AppState>,
205 Query(input): Query<ListRecordsInput>,
206) -> Response {
207 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
208 let user_id_opt = if input.repo.starts_with("did:") {
209 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
210 .fetch_optional(&state.db)
211 .await
212 .map(|opt| opt.map(|r| r.id))
213 } else {
214 let handle = if !input.repo.contains('.') {
215 format!("{}.{}", input.repo, hostname)
216 } else {
217 input.repo.clone()
218 };
219 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
220 .fetch_optional(&state.db)
221 .await
222 .map(|opt| opt.map(|r| r.id))
223 };
224 let user_id: uuid::Uuid = match user_id_opt {
225 Ok(Some(id)) => id,
226 Ok(None) => {
227 return (
228 StatusCode::NOT_FOUND,
229 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
230 )
231 .into_response();
232 }
233 Err(_) => {
234 return (
235 StatusCode::INTERNAL_SERVER_ERROR,
236 Json(json!({"error": "InternalError"})),
237 )
238 .into_response();
239 }
240 };
241 let limit = input.limit.unwrap_or(50).clamp(1, 100);
242 let reverse = input.reverse.unwrap_or(false);
243 let limit_i64 = limit as i64;
244 let order = if reverse { "ASC" } else { "DESC" };
245 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
246 let comparator = if reverse { ">" } else { "<" };
247 let query = format!(
248 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
249 comparator, order
250 );
251 sqlx::query_as(&query)
252 .bind(user_id)
253 .bind(&input.collection)
254 .bind(cursor)
255 .bind(limit_i64)
256 .fetch_all(&state.db)
257 .await
258 } else {
259 let mut conditions = vec!["repo_id = $1", "collection = $2"];
260 let mut param_idx = 3;
261 if input.rkey_start.is_some() {
262 conditions.push("rkey > $3");
263 param_idx += 1;
264 }
265 if input.rkey_end.is_some() {
266 conditions.push(if param_idx == 3 {
267 "rkey < $3"
268 } else {
269 "rkey < $4"
270 });
271 param_idx += 1;
272 }
273 let limit_idx = param_idx;
274 let query = format!(
275 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
276 conditions.join(" AND "),
277 order,
278 limit_idx
279 );
280 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
281 .bind(user_id)
282 .bind(&input.collection);
283 if let Some(start) = &input.rkey_start {
284 query_builder = query_builder.bind(start);
285 }
286 if let Some(end) = &input.rkey_end {
287 query_builder = query_builder.bind(end);
288 }
289 query_builder.bind(limit_i64).fetch_all(&state.db).await
290 };
291 let rows = match rows_res {
292 Ok(r) => r,
293 Err(e) => {
294 error!("Error listing records: {:?}", e);
295 return (
296 StatusCode::INTERNAL_SERVER_ERROR,
297 Json(json!({"error": "InternalError"})),
298 )
299 .into_response();
300 }
301 };
302 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
303 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
304 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
305 for (rkey, cid_str) in &rows {
306 if let Ok(cid) = Cid::from_str(cid_str) {
307 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
308 cids.push(cid);
309 }
310 }
311 let blocks = match state.block_store.get_many(&cids).await {
312 Ok(b) => b,
313 Err(e) => {
314 error!("Error fetching blocks: {:?}", e);
315 return (
316 StatusCode::INTERNAL_SERVER_ERROR,
317 Json(json!({"error": "InternalError"})),
318 )
319 .into_response();
320 }
321 };
322 let mut records = Vec::new();
323 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
324 if let Some(block) = block_opt
325 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
326 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block)
327 {
328 records.push(json!({
329 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
330 "cid": cid_str,
331 "value": value
332 }));
333 }
334 }
335 Json(ListRecordsOutput {
336 cursor: last_rkey,
337 records,
338 })
339 .into_response()
340}