this repo has no description
1use crate::api::proxy_client::proxy_client;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::{Query, RawQuery, State},
6 http::StatusCode,
7 response::{IntoResponse, Response},
8};
9use cid::Cid;
10use jacquard_repo::storage::BlockStore;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use std::collections::HashMap;
14use std::str::FromStr;
15use tracing::{error, info};
16
17#[derive(Deserialize)]
18pub struct GetRecordInput {
19 pub repo: String,
20 pub collection: String,
21 pub rkey: String,
22 pub cid: Option<String>,
23}
24
25async fn proxy_get_record_to_appview(state: &AppState, raw_query: Option<&str>) -> Response {
26 let resolved = match state.appview_registry.get_appview_for_method("com.atproto.repo.getRecord").await {
27 Some(r) => r,
28 None => {
29 return (
30 StatusCode::NOT_FOUND,
31 Json(json!({"error": "NotFound", "message": "Repo not found"})),
32 )
33 .into_response();
34 }
35 };
36 let target_url = match raw_query {
37 Some(q) => format!("{}/xrpc/com.atproto.repo.getRecord?{}", resolved.url, q),
38 None => format!("{}/xrpc/com.atproto.repo.getRecord", resolved.url),
39 };
40 info!("Proxying getRecord to AppView: {}", target_url);
41 let client = proxy_client();
42 match client.get(&target_url).send().await {
43 Ok(resp) => {
44 let status =
45 StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
46 let content_type = resp
47 .headers()
48 .get("content-type")
49 .and_then(|v| v.to_str().ok())
50 .map(|s| s.to_string());
51 match resp.bytes().await {
52 Ok(body) => {
53 let mut builder = Response::builder().status(status);
54 if let Some(ct) = content_type {
55 builder = builder.header("content-type", ct);
56 }
57 builder
58 .body(axum::body::Body::from(body))
59 .unwrap_or_else(|_| {
60 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response()
61 })
62 }
63 Err(e) => {
64 error!("Error reading AppView response: {:?}", e);
65 (
66 StatusCode::BAD_GATEWAY,
67 Json(json!({"error": "UpstreamError"})),
68 )
69 .into_response()
70 }
71 }
72 }
73 Err(e) => {
74 error!("Error proxying to AppView: {:?}", e);
75 (
76 StatusCode::BAD_GATEWAY,
77 Json(json!({"error": "UpstreamError"})),
78 )
79 .into_response()
80 }
81 }
82}
83
84pub async fn get_record(
85 State(state): State<AppState>,
86 Query(input): Query<GetRecordInput>,
87 RawQuery(raw_query): RawQuery,
88) -> Response {
89 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
90 let user_id_opt = if input.repo.starts_with("did:") {
91 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
92 .fetch_optional(&state.db)
93 .await
94 .map(|opt| opt.map(|r| r.id))
95 } else {
96 let suffix = format!(".{}", hostname);
97 let short_handle = if input.repo.ends_with(&suffix) {
98 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
99 } else {
100 &input.repo
101 };
102 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
103 .fetch_optional(&state.db)
104 .await
105 .map(|opt| opt.map(|r| r.id))
106 };
107 let user_id: uuid::Uuid = match user_id_opt {
108 Ok(Some(id)) => id,
109 _ => {
110 return proxy_get_record_to_appview(&state, raw_query.as_deref()).await;
111 }
112 };
113 let record_row = sqlx::query!(
114 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
115 user_id,
116 input.collection,
117 input.rkey
118 )
119 .fetch_optional(&state.db)
120 .await;
121 let record_cid_str: String = match record_row {
122 Ok(Some(row)) => row.record_cid,
123 _ => {
124 return (
125 StatusCode::NOT_FOUND,
126 Json(json!({"error": "NotFound", "message": "Record not found"})),
127 )
128 .into_response();
129 }
130 };
131 if let Some(expected_cid) = &input.cid
132 && &record_cid_str != expected_cid {
133 return (
134 StatusCode::NOT_FOUND,
135 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
136 )
137 .into_response();
138 }
139 let cid = match Cid::from_str(&record_cid_str) {
140 Ok(c) => c,
141 Err(_) => {
142 return (
143 StatusCode::INTERNAL_SERVER_ERROR,
144 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
145 )
146 .into_response();
147 }
148 };
149 let block = match state.block_store.get(&cid).await {
150 Ok(Some(b)) => b,
151 _ => {
152 return (
153 StatusCode::INTERNAL_SERVER_ERROR,
154 Json(json!({"error": "InternalError", "message": "Record block not found"})),
155 )
156 .into_response();
157 }
158 };
159 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
160 Ok(v) => v,
161 Err(e) => {
162 error!("Failed to deserialize record: {:?}", e);
163 return (
164 StatusCode::INTERNAL_SERVER_ERROR,
165 Json(json!({"error": "InternalError"})),
166 )
167 .into_response();
168 }
169 };
170 Json(json!({
171 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
172 "cid": record_cid_str,
173 "value": value
174 }))
175 .into_response()
176}
177#[derive(Deserialize)]
178pub struct ListRecordsInput {
179 pub repo: String,
180 pub collection: String,
181 pub limit: Option<i32>,
182 pub cursor: Option<String>,
183 #[serde(rename = "rkeyStart")]
184 pub rkey_start: Option<String>,
185 #[serde(rename = "rkeyEnd")]
186 pub rkey_end: Option<String>,
187 pub reverse: Option<bool>,
188}
189#[derive(Serialize)]
190pub struct ListRecordsOutput {
191 pub cursor: Option<String>,
192 pub records: Vec<serde_json::Value>,
193}
194
195async fn proxy_list_records_to_appview(state: &AppState, raw_query: Option<&str>) -> Response {
196 let resolved = match state.appview_registry.get_appview_for_method("com.atproto.repo.listRecords").await {
197 Some(r) => r,
198 None => {
199 return (
200 StatusCode::NOT_FOUND,
201 Json(json!({"error": "NotFound", "message": "Repo not found"})),
202 )
203 .into_response();
204 }
205 };
206 let target_url = match raw_query {
207 Some(q) => format!("{}/xrpc/com.atproto.repo.listRecords?{}", resolved.url, q),
208 None => format!("{}/xrpc/com.atproto.repo.listRecords", resolved.url),
209 };
210 info!("Proxying listRecords to AppView: {}", target_url);
211 let client = proxy_client();
212 match client.get(&target_url).send().await {
213 Ok(resp) => {
214 let status =
215 StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
216 let content_type = resp
217 .headers()
218 .get("content-type")
219 .and_then(|v| v.to_str().ok())
220 .map(|s| s.to_string());
221 match resp.bytes().await {
222 Ok(body) => {
223 let mut builder = Response::builder().status(status);
224 if let Some(ct) = content_type {
225 builder = builder.header("content-type", ct);
226 }
227 builder
228 .body(axum::body::Body::from(body))
229 .unwrap_or_else(|_| {
230 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response()
231 })
232 }
233 Err(e) => {
234 error!("Error reading AppView response: {:?}", e);
235 (StatusCode::BAD_GATEWAY, Json(json!({"error": "UpstreamError"}))).into_response()
236 }
237 }
238 }
239 Err(e) => {
240 error!("Error proxying to AppView: {:?}", e);
241 (StatusCode::BAD_GATEWAY, Json(json!({"error": "UpstreamError"}))).into_response()
242 }
243 }
244}
245
246pub async fn list_records(
247 State(state): State<AppState>,
248 Query(input): Query<ListRecordsInput>,
249 RawQuery(raw_query): RawQuery,
250) -> Response {
251 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
252 let user_id_opt = if input.repo.starts_with("did:") {
253 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
254 .fetch_optional(&state.db)
255 .await
256 .map(|opt| opt.map(|r| r.id))
257 } else {
258 let suffix = format!(".{}", hostname);
259 let short_handle = if input.repo.ends_with(&suffix) {
260 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo)
261 } else {
262 &input.repo
263 };
264 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle)
265 .fetch_optional(&state.db)
266 .await
267 .map(|opt| opt.map(|r| r.id))
268 };
269 let user_id: uuid::Uuid = match user_id_opt {
270 Ok(Some(id)) => id,
271 _ => {
272 return proxy_list_records_to_appview(&state, raw_query.as_deref()).await;
273 }
274 };
275 let limit = input.limit.unwrap_or(50).clamp(1, 100);
276 let reverse = input.reverse.unwrap_or(false);
277 let limit_i64 = limit as i64;
278 let order = if reverse { "ASC" } else { "DESC" };
279 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
280 let comparator = if reverse { ">" } else { "<" };
281 let query = format!(
282 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
283 comparator, order
284 );
285 sqlx::query_as(&query)
286 .bind(user_id)
287 .bind(&input.collection)
288 .bind(cursor)
289 .bind(limit_i64)
290 .fetch_all(&state.db)
291 .await
292 } else {
293 let mut conditions = vec!["repo_id = $1", "collection = $2"];
294 let mut param_idx = 3;
295 if input.rkey_start.is_some() {
296 conditions.push("rkey > $3");
297 param_idx += 1;
298 }
299 if input.rkey_end.is_some() {
300 conditions.push(if param_idx == 3 {
301 "rkey < $3"
302 } else {
303 "rkey < $4"
304 });
305 param_idx += 1;
306 }
307 let limit_idx = param_idx;
308 let query = format!(
309 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
310 conditions.join(" AND "),
311 order,
312 limit_idx
313 );
314 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
315 .bind(user_id)
316 .bind(&input.collection);
317 if let Some(start) = &input.rkey_start {
318 query_builder = query_builder.bind(start);
319 }
320 if let Some(end) = &input.rkey_end {
321 query_builder = query_builder.bind(end);
322 }
323 query_builder.bind(limit_i64).fetch_all(&state.db).await
324 };
325 let rows = match rows_res {
326 Ok(r) => r,
327 Err(e) => {
328 error!("Error listing records: {:?}", e);
329 return (
330 StatusCode::INTERNAL_SERVER_ERROR,
331 Json(json!({"error": "InternalError"})),
332 )
333 .into_response();
334 }
335 };
336 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
337 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
338 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
339 for (rkey, cid_str) in &rows {
340 if let Ok(cid) = Cid::from_str(cid_str) {
341 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
342 cids.push(cid);
343 }
344 }
345 let blocks = match state.block_store.get_many(&cids).await {
346 Ok(b) => b,
347 Err(e) => {
348 error!("Error fetching blocks: {:?}", e);
349 return (
350 StatusCode::INTERNAL_SERVER_ERROR,
351 Json(json!({"error": "InternalError"})),
352 )
353 .into_response();
354 }
355 };
356 let mut records = Vec::new();
357 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
358 if let Some(block) = block_opt
359 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
360 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
361 records.push(json!({
362 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
363 "cid": cid_str,
364 "value": value
365 }));
366 }
367 }
368 Json(ListRecordsOutput {
369 cursor: last_rkey,
370 records,
371 })
372 .into_response()
373}