this repo has no description
1use crate::api::proxy_client::proxy_client;
2use crate::state::AppState;
3use axum::{
4 Json,
5 extract::{Query, State},
6 http::{HeaderMap, StatusCode},
7 response::{IntoResponse, Response},
8};
9use base64::Engine;
10use cid::Cid;
11use ipld_core::ipld::Ipld;
12use jacquard_repo::storage::BlockStore;
13use serde::{Deserialize, Serialize};
14use serde_json::{json, Map, Value};
15use std::collections::HashMap;
16use std::str::FromStr;
17use tracing::{error, info};
18
19fn ipld_to_json(ipld: Ipld) -> Value {
20 match ipld {
21 Ipld::Null => Value::Null,
22 Ipld::Bool(b) => Value::Bool(b),
23 Ipld::Integer(i) => {
24 if let Ok(n) = i64::try_from(i) {
25 Value::Number(n.into())
26 } else {
27 Value::String(i.to_string())
28 }
29 }
30 Ipld::Float(f) => serde_json::Number::from_f64(f)
31 .map(Value::Number)
32 .unwrap_or(Value::Null),
33 Ipld::String(s) => Value::String(s),
34 Ipld::Bytes(b) => {
35 let encoded = base64::engine::general_purpose::STANDARD.encode(&b);
36 json!({ "$bytes": encoded })
37 }
38 Ipld::List(arr) => Value::Array(arr.into_iter().map(ipld_to_json).collect()),
39 Ipld::Map(map) => {
40 let obj: Map<String, Value> = map
41 .into_iter()
42 .map(|(k, v)| (k, ipld_to_json(v)))
43 .collect();
44 Value::Object(obj)
45 }
46 Ipld::Link(cid) => json!({ "$link": cid.to_string() }),
47 }
48}
49
50#[derive(Deserialize)]
51pub struct GetRecordInput {
52 pub repo: String,
53 pub collection: String,
54 pub rkey: String,
55 pub cid: Option<String>,
56}
57
58pub async fn get_record(
59 State(state): State<AppState>,
60 headers: HeaderMap,
61 Query(input): Query<GetRecordInput>,
62) -> Response {
63 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
64 let user_id_opt = if input.repo.starts_with("did:") {
65 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
66 .fetch_optional(&state.db)
67 .await
68 .map(|opt| opt.map(|r| r.id))
69 } else {
70 let handle = if !input.repo.contains('.') {
71 format!("{}.{}", input.repo, hostname)
72 } else {
73 input.repo.clone()
74 };
75 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
76 .fetch_optional(&state.db)
77 .await
78 .map(|opt| opt.map(|r| r.id))
79 };
80 let user_id: uuid::Uuid = match user_id_opt {
81 Ok(Some(id)) => id,
82 Ok(None) => {
83 if let Some(proxy_header) = headers.get("atproto-proxy").and_then(|h| h.to_str().ok()) {
84 let did = proxy_header.split('#').next().unwrap_or(proxy_header);
85 if let Some(resolved) = state.did_resolver.resolve_did(did).await {
86 let mut url = format!(
87 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}",
88 resolved.url.trim_end_matches('/'),
89 urlencoding::encode(&input.repo),
90 urlencoding::encode(&input.collection),
91 urlencoding::encode(&input.rkey)
92 );
93 if let Some(cid) = &input.cid {
94 url.push_str(&format!("&cid={}", urlencoding::encode(cid)));
95 }
96 info!("Proxying getRecord to {}: {}", did, url);
97 match proxy_client().get(&url).send().await {
98 Ok(resp) => {
99 let status = resp.status();
100 let body = match resp.bytes().await {
101 Ok(b) => b,
102 Err(e) => {
103 error!("Error reading proxy response: {:?}", e);
104 return (
105 StatusCode::BAD_GATEWAY,
106 Json(json!({"error": "UpstreamFailure", "message": "Error reading upstream response"})),
107 )
108 .into_response();
109 }
110 };
111 return Response::builder()
112 .status(status)
113 .header("content-type", "application/json")
114 .body(axum::body::Body::from(body))
115 .unwrap_or_else(|_| {
116 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error")
117 .into_response()
118 });
119 }
120 Err(e) => {
121 error!("Error proxying request: {:?}", e);
122 return (
123 StatusCode::BAD_GATEWAY,
124 Json(json!({"error": "UpstreamFailure", "message": "Failed to reach upstream service"})),
125 )
126 .into_response();
127 }
128 }
129 } else {
130 error!("Could not resolve DID from atproto-proxy header: {}", did);
131 return (
132 StatusCode::BAD_GATEWAY,
133 Json(json!({"error": "UpstreamFailure", "message": "Could not resolve proxy DID"})),
134 )
135 .into_response();
136 }
137 }
138 return (
139 StatusCode::NOT_FOUND,
140 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
141 )
142 .into_response();
143 }
144 Err(_) => {
145 return (
146 StatusCode::INTERNAL_SERVER_ERROR,
147 Json(json!({"error": "InternalError"})),
148 )
149 .into_response();
150 }
151 };
152 let record_row = sqlx::query!(
153 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
154 user_id,
155 input.collection,
156 input.rkey
157 )
158 .fetch_optional(&state.db)
159 .await;
160 let record_cid_str: String = match record_row {
161 Ok(Some(row)) => row.record_cid,
162 _ => {
163 return (
164 StatusCode::NOT_FOUND,
165 Json(json!({"error": "NotFound", "message": "Record not found"})),
166 )
167 .into_response();
168 }
169 };
170 if let Some(expected_cid) = &input.cid
171 && &record_cid_str != expected_cid
172 {
173 return (
174 StatusCode::NOT_FOUND,
175 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
176 )
177 .into_response();
178 }
179 let cid = match Cid::from_str(&record_cid_str) {
180 Ok(c) => c,
181 Err(_) => {
182 return (
183 StatusCode::INTERNAL_SERVER_ERROR,
184 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
185 )
186 .into_response();
187 }
188 };
189 let block = match state.block_store.get(&cid).await {
190 Ok(Some(b)) => b,
191 _ => {
192 return (
193 StatusCode::INTERNAL_SERVER_ERROR,
194 Json(json!({"error": "InternalError", "message": "Record block not found"})),
195 )
196 .into_response();
197 }
198 };
199 let ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block) {
200 Ok(v) => v,
201 Err(e) => {
202 error!("Failed to deserialize record: {:?}", e);
203 return (
204 StatusCode::INTERNAL_SERVER_ERROR,
205 Json(json!({"error": "InternalError"})),
206 )
207 .into_response();
208 }
209 };
210 let value = ipld_to_json(ipld);
211 Json(json!({
212 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
213 "cid": record_cid_str,
214 "value": value
215 }))
216 .into_response()
217}
218#[derive(Deserialize)]
219pub struct ListRecordsInput {
220 pub repo: String,
221 pub collection: String,
222 pub limit: Option<i32>,
223 pub cursor: Option<String>,
224 #[serde(rename = "rkeyStart")]
225 pub rkey_start: Option<String>,
226 #[serde(rename = "rkeyEnd")]
227 pub rkey_end: Option<String>,
228 pub reverse: Option<bool>,
229}
230#[derive(Serialize)]
231pub struct ListRecordsOutput {
232 #[serde(skip_serializing_if = "Option::is_none")]
233 pub cursor: Option<String>,
234 pub records: Vec<serde_json::Value>,
235}
236
237pub async fn list_records(
238 State(state): State<AppState>,
239 Query(input): Query<ListRecordsInput>,
240) -> Response {
241 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
242 let user_id_opt = if input.repo.starts_with("did:") {
243 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
244 .fetch_optional(&state.db)
245 .await
246 .map(|opt| opt.map(|r| r.id))
247 } else {
248 let handle = if !input.repo.contains('.') {
249 format!("{}.{}", input.repo, hostname)
250 } else {
251 input.repo.clone()
252 };
253 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
254 .fetch_optional(&state.db)
255 .await
256 .map(|opt| opt.map(|r| r.id))
257 };
258 let user_id: uuid::Uuid = match user_id_opt {
259 Ok(Some(id)) => id,
260 Ok(None) => {
261 return (
262 StatusCode::NOT_FOUND,
263 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})),
264 )
265 .into_response();
266 }
267 Err(_) => {
268 return (
269 StatusCode::INTERNAL_SERVER_ERROR,
270 Json(json!({"error": "InternalError"})),
271 )
272 .into_response();
273 }
274 };
275 let limit = input.limit.unwrap_or(50).clamp(1, 100);
276 let reverse = input.reverse.unwrap_or(false);
277 let limit_i64 = limit as i64;
278 let order = if reverse { "ASC" } else { "DESC" };
279 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
280 let comparator = if reverse { ">" } else { "<" };
281 let query = format!(
282 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
283 comparator, order
284 );
285 sqlx::query_as(&query)
286 .bind(user_id)
287 .bind(&input.collection)
288 .bind(cursor)
289 .bind(limit_i64)
290 .fetch_all(&state.db)
291 .await
292 } else {
293 let mut conditions = vec!["repo_id = $1", "collection = $2"];
294 let mut param_idx = 3;
295 if input.rkey_start.is_some() {
296 conditions.push("rkey > $3");
297 param_idx += 1;
298 }
299 if input.rkey_end.is_some() {
300 conditions.push(if param_idx == 3 {
301 "rkey < $3"
302 } else {
303 "rkey < $4"
304 });
305 param_idx += 1;
306 }
307 let limit_idx = param_idx;
308 let query = format!(
309 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
310 conditions.join(" AND "),
311 order,
312 limit_idx
313 );
314 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
315 .bind(user_id)
316 .bind(&input.collection);
317 if let Some(start) = &input.rkey_start {
318 query_builder = query_builder.bind(start);
319 }
320 if let Some(end) = &input.rkey_end {
321 query_builder = query_builder.bind(end);
322 }
323 query_builder.bind(limit_i64).fetch_all(&state.db).await
324 };
325 let rows = match rows_res {
326 Ok(r) => r,
327 Err(e) => {
328 error!("Error listing records: {:?}", e);
329 return (
330 StatusCode::INTERNAL_SERVER_ERROR,
331 Json(json!({"error": "InternalError"})),
332 )
333 .into_response();
334 }
335 };
336 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
337 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
338 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
339 for (rkey, cid_str) in &rows {
340 if let Ok(cid) = Cid::from_str(cid_str) {
341 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
342 cids.push(cid);
343 }
344 }
345 let blocks = match state.block_store.get_many(&cids).await {
346 Ok(b) => b,
347 Err(e) => {
348 error!("Error fetching blocks: {:?}", e);
349 return (
350 StatusCode::INTERNAL_SERVER_ERROR,
351 Json(json!({"error": "InternalError"})),
352 )
353 .into_response();
354 }
355 };
356 let mut records = Vec::new();
357 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
358 if let Some(block) = block_opt
359 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
360 && let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block)
361 {
362 let value = ipld_to_json(ipld);
363 records.push(json!({
364 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
365 "cid": cid_str,
366 "value": value
367 }));
368 }
369 }
370 Json(ListRecordsOutput {
371 cursor: last_rkey,
372 records,
373 })
374 .into_response()
375}