this repo has no description
1use crate::api::proxy_client::proxy_client;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::{HeaderMap, StatusCode},
7 response::{IntoResponse, Response},
8};
9use base64::Engine;
10use cid::Cid;
11use ipld_core::ipld::Ipld;
12use jacquard_repo::storage::BlockStore;
13use serde::{Deserialize, Serialize};
14use serde_json::{Map, Value, json};
15use std::collections::HashMap;
16use std::str::FromStr;
17use tracing::{error, info};
18
19fn ipld_to_json(ipld: Ipld) -> Value {
20 match ipld {
21 Ipld::Null => Value::Null,
22 Ipld::Bool(b) => Value::Bool(b),
23 Ipld::Integer(i) => {
24 if let Ok(n) = i64::try_from(i) {
25 Value::Number(n.into())
26 } else {
27 Value::String(i.to_string())
28 }
29 }
30 Ipld::Float(f) => serde_json::Number::from_f64(f)
31 .map(Value::Number)
32 .unwrap_or(Value::Null),
33 Ipld::String(s) => Value::String(s),
34 Ipld::Bytes(b) => {
35 let encoded = base64::engine::general_purpose::STANDARD.encode(&b);
36 json!({ "$bytes": encoded })
37 }
38 Ipld::List(arr) => Value::Array(arr.into_iter().map(ipld_to_json).collect()),
39 Ipld::Map(map) => {
40 let obj: Map<String, Value> =
41 map.into_iter().map(|(k, v)| (k, ipld_to_json(v))).collect();
42 Value::Object(obj)
43 }
44 Ipld::Link(cid) => json!({ "$link": cid.to_string() }),
45 }
46}
47
48#[derive(Deserialize)]
49pub struct GetRecordInput {
50 pub repo: String,
51 pub collection: String,
52 pub rkey: String,
53 pub cid: Option<String>,
54}
55
56pub async fn get_record(
57 State(state): State<AppState>,
58 headers: HeaderMap,
59 Query(input): Query<GetRecordInput>,
60) -> Response {
61 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
62 let user_id_opt = if input.repo.starts_with("did:") {
63 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
64 .fetch_optional(&state.db)
65 .await
66 .map(|opt| opt.map(|r| r.id))
67 } else {
68 let handle = if !input.repo.contains('.') {
69 format!("{}.{}", input.repo, hostname)
70 } else {
71 input.repo.clone()
72 };
73 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
74 .fetch_optional(&state.db)
75 .await
76 .map(|opt| opt.map(|r| r.id))
77 };
78 let user_id: uuid::Uuid = match user_id_opt {
79 Ok(Some(id)) => id,
80 Ok(None) => {
81 if let Some(proxy_header) = headers.get("atproto-proxy").and_then(|h| h.to_str().ok()) {
82 let did = proxy_header.split('#').next().unwrap_or(proxy_header);
83 if let Some(resolved) = state.did_resolver.resolve_did(did).await {
84 let mut url = format!(
85 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}",
86 resolved.url.trim_end_matches('/'),
87 urlencoding::encode(&input.repo),
88 urlencoding::encode(&input.collection),
89 urlencoding::encode(&input.rkey)
90 );
91 if let Some(cid) = &input.cid {
92 url.push_str(&format!("&cid={}", urlencoding::encode(cid)));
93 }
94 info!("Proxying getRecord to {}: {}", did, url);
95 match proxy_client().get(&url).send().await {
96 Ok(resp) => {
97 let status = resp.status();
98 let body = match resp.bytes().await {
99 Ok(b) => b,
100 Err(e) => {
101 error!("Error reading proxy response: {:?}", e);
102 return (
103 StatusCode::BAD_GATEWAY,
104 Json(json!({"error": "UpstreamFailure", "message": "Error reading upstream response"})),
105 )
106 .into_response();
107 }
108 };
109 return Response::builder()
110 .status(status)
111 .header("content-type", "application/json")
112 .body(axum::body::Body::from(body))
113 .unwrap_or_else(|_| {
114 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error")
115 .into_response()
116 });
117 }
118 Err(e) => {
119 error!("Error proxying request: {:?}", e);
120 return (
121 StatusCode::BAD_GATEWAY,
122 Json(json!({"error": "UpstreamFailure", "message": "Failed to reach upstream service"})),
123 )
124 .into_response();
125 }
126 }
127 } else {
128 error!("Could not resolve DID from atproto-proxy header: {}", did);
129 return (
130 StatusCode::BAD_GATEWAY,
131 Json(json!({"error": "UpstreamFailure", "message": "Could not resolve proxy DID"})),
132 )
133 .into_response();
134 }
135 }
136 return (
137 StatusCode::NOT_FOUND,
138 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
139 )
140 .into_response();
141 }
142 Err(_) => {
143 return (
144 StatusCode::INTERNAL_SERVER_ERROR,
145 Json(json!({"error": "InternalError"})),
146 )
147 .into_response();
148 }
149 };
150 let record_row = sqlx::query!(
151 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
152 user_id,
153 input.collection,
154 input.rkey
155 )
156 .fetch_optional(&state.db)
157 .await;
158 let record_cid_str: String = match record_row {
159 Ok(Some(row)) => row.record_cid,
160 _ => {
161 return (
162 StatusCode::NOT_FOUND,
163 Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
164 )
165 .into_response();
166 }
167 };
168 if let Some(expected_cid) = &input.cid
169 && &record_cid_str != expected_cid
170 {
171 return (
172 StatusCode::NOT_FOUND,
173 Json(json!({"error": "RecordNotFound", "message": "Record CID mismatch"})),
174 )
175 .into_response();
176 }
177 let cid = match Cid::from_str(&record_cid_str) {
178 Ok(c) => c,
179 Err(_) => {
180 return (
181 StatusCode::INTERNAL_SERVER_ERROR,
182 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
183 )
184 .into_response();
185 }
186 };
187 let block = match state.block_store.get(&cid).await {
188 Ok(Some(b)) => b,
189 _ => {
190 return (
191 StatusCode::INTERNAL_SERVER_ERROR,
192 Json(json!({"error": "InternalError", "message": "Record block not found"})),
193 )
194 .into_response();
195 }
196 };
197 let ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block) {
198 Ok(v) => v,
199 Err(e) => {
200 error!("Failed to deserialize record: {:?}", e);
201 return (
202 StatusCode::INTERNAL_SERVER_ERROR,
203 Json(json!({"error": "InternalError"})),
204 )
205 .into_response();
206 }
207 };
208 let value = ipld_to_json(ipld);
209 Json(json!({
210 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
211 "cid": record_cid_str,
212 "value": value
213 }))
214 .into_response()
215}
216#[derive(Deserialize)]
217pub struct ListRecordsInput {
218 pub repo: String,
219 pub collection: String,
220 pub limit: Option<i32>,
221 pub cursor: Option<String>,
222 #[serde(rename = "rkeyStart")]
223 pub rkey_start: Option<String>,
224 #[serde(rename = "rkeyEnd")]
225 pub rkey_end: Option<String>,
226 pub reverse: Option<bool>,
227}
228#[derive(Serialize)]
229pub struct ListRecordsOutput {
230 #[serde(skip_serializing_if = "Option::is_none")]
231 pub cursor: Option<String>,
232 pub records: Vec<serde_json::Value>,
233}
234
235pub async fn list_records(
236 State(state): State<AppState>,
237 Query(input): Query<ListRecordsInput>,
238) -> Response {
239 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
240 let user_id_opt = if input.repo.starts_with("did:") {
241 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
242 .fetch_optional(&state.db)
243 .await
244 .map(|opt| opt.map(|r| r.id))
245 } else {
246 let handle = if !input.repo.contains('.') {
247 format!("{}.{}", input.repo, hostname)
248 } else {
249 input.repo.clone()
250 };
251 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
252 .fetch_optional(&state.db)
253 .await
254 .map(|opt| opt.map(|r| r.id))
255 };
256 let user_id: uuid::Uuid = match user_id_opt {
257 Ok(Some(id)) => id,
258 Ok(None) => {
259 return (
260 StatusCode::NOT_FOUND,
261 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
262 )
263 .into_response();
264 }
265 Err(_) => {
266 return (
267 StatusCode::INTERNAL_SERVER_ERROR,
268 Json(json!({"error": "InternalError"})),
269 )
270 .into_response();
271 }
272 };
273 let limit = input.limit.unwrap_or(50).clamp(1, 100);
274 let reverse = input.reverse.unwrap_or(false);
275 let limit_i64 = limit as i64;
276 let order = if reverse { "ASC" } else { "DESC" };
277 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
278 let comparator = if reverse { ">" } else { "<" };
279 let query = format!(
280 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
281 comparator, order
282 );
283 sqlx::query_as(&query)
284 .bind(user_id)
285 .bind(&input.collection)
286 .bind(cursor)
287 .bind(limit_i64)
288 .fetch_all(&state.db)
289 .await
290 } else {
291 let mut conditions = vec!["repo_id = $1", "collection = $2"];
292 let mut param_idx = 3;
293 if input.rkey_start.is_some() {
294 conditions.push("rkey > $3");
295 param_idx += 1;
296 }
297 if input.rkey_end.is_some() {
298 conditions.push(if param_idx == 3 {
299 "rkey < $3"
300 } else {
301 "rkey < $4"
302 });
303 param_idx += 1;
304 }
305 let limit_idx = param_idx;
306 let query = format!(
307 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
308 conditions.join(" AND "),
309 order,
310 limit_idx
311 );
312 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
313 .bind(user_id)
314 .bind(&input.collection);
315 if let Some(start) = &input.rkey_start {
316 query_builder = query_builder.bind(start);
317 }
318 if let Some(end) = &input.rkey_end {
319 query_builder = query_builder.bind(end);
320 }
321 query_builder.bind(limit_i64).fetch_all(&state.db).await
322 };
323 let rows = match rows_res {
324 Ok(r) => r,
325 Err(e) => {
326 error!("Error listing records: {:?}", e);
327 return (
328 StatusCode::INTERNAL_SERVER_ERROR,
329 Json(json!({"error": "InternalError"})),
330 )
331 .into_response();
332 }
333 };
334 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
335 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
336 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
337 for (rkey, cid_str) in &rows {
338 if let Ok(cid) = Cid::from_str(cid_str) {
339 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
340 cids.push(cid);
341 }
342 }
343 let blocks = match state.block_store.get_many(&cids).await {
344 Ok(b) => b,
345 Err(e) => {
346 error!("Error fetching blocks: {:?}", e);
347 return (
348 StatusCode::INTERNAL_SERVER_ERROR,
349 Json(json!({"error": "InternalError"})),
350 )
351 .into_response();
352 }
353 };
354 let mut records = Vec::new();
355 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
356 if let Some(block) = block_opt
357 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
358 && let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block)
359 {
360 let value = ipld_to_json(ipld);
361 records.push(json!({
362 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
363 "cid": cid_str,
364 "value": value
365 }));
366 }
367 }
368 Json(ListRecordsOutput {
369 cursor: last_rkey,
370 records,
371 })
372 .into_response()
373}