this repo has no description
1use crate::api::error::ApiError;
2use crate::state::AppState;
3use crate::types::{AtIdentifier, Nsid, Rkey};
4use axum::{
5 Json,
6 extract::{Query, State},
7 http::HeaderMap,
8 response::{IntoResponse, Response},
9};
10use base64::Engine;
11use cid::Cid;
12use ipld_core::ipld::Ipld;
13use jacquard_repo::storage::BlockStore;
14use serde::{Deserialize, Serialize};
15use serde_json::{Map, Value, json};
16use std::str::FromStr;
17use tracing::error;
18
19fn ipld_to_json(ipld: Ipld) -> Value {
20 match ipld {
21 Ipld::Null => Value::Null,
22 Ipld::Bool(b) => Value::Bool(b),
23 Ipld::Integer(i) => {
24 if let Ok(n) = i64::try_from(i) {
25 Value::Number(n.into())
26 } else {
27 Value::String(i.to_string())
28 }
29 }
30 Ipld::Float(f) => serde_json::Number::from_f64(f)
31 .map(Value::Number)
32 .unwrap_or(Value::Null),
33 Ipld::String(s) => Value::String(s),
34 Ipld::Bytes(b) => {
35 let encoded = base64::engine::general_purpose::STANDARD.encode(&b);
36 json!({ "$bytes": encoded })
37 }
38 Ipld::List(arr) => Value::Array(arr.into_iter().map(ipld_to_json).collect()),
39 Ipld::Map(map) => {
40 let obj: Map<String, Value> =
41 map.into_iter().map(|(k, v)| (k, ipld_to_json(v))).collect();
42 Value::Object(obj)
43 }
44 Ipld::Link(cid) => json!({ "$link": cid.to_string() }),
45 }
46}
47
48#[derive(Deserialize)]
49pub struct GetRecordInput {
50 pub repo: AtIdentifier,
51 pub collection: Nsid,
52 pub rkey: Rkey,
53 pub cid: Option<String>,
54}
55
56pub async fn get_record(
57 State(state): State<AppState>,
58 _headers: HeaderMap,
59 Query(input): Query<GetRecordInput>,
60) -> Response {
61 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
62 let user_id_opt = if input.repo.is_did() {
63 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo.as_str())
64 .fetch_optional(&state.db)
65 .await
66 .map(|opt| opt.map(|r| r.id))
67 } else {
68 let repo_str = input.repo.as_str();
69 let handle = if !repo_str.contains('.') {
70 format!("{}.{}", repo_str, hostname)
71 } else {
72 repo_str.to_string()
73 };
74 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
75 .fetch_optional(&state.db)
76 .await
77 .map(|opt| opt.map(|r| r.id))
78 };
79 let user_id: uuid::Uuid = match user_id_opt {
80 Ok(Some(id)) => id,
81 Ok(None) => {
82 return ApiError::RepoNotFound(Some("Repo not found".into())).into_response();
83 }
84 Err(_) => {
85 return ApiError::InternalError(None).into_response();
86 }
87 };
88 let record_row = sqlx::query!(
89 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
90 user_id,
91 input.collection.as_str(),
92 input.rkey.as_str()
93 )
94 .fetch_optional(&state.db)
95 .await;
96 let record_cid_str: String = match record_row {
97 Ok(Some(row)) => row.record_cid,
98 _ => {
99 return ApiError::RecordNotFound.into_response();
100 }
101 };
102 if let Some(expected_cid) = &input.cid
103 && &record_cid_str != expected_cid
104 {
105 return ApiError::RecordNotFound.into_response();
106 }
107 let Ok(cid) = Cid::from_str(&record_cid_str) else {
108 return ApiError::InternalError(Some("Invalid CID in DB".into())).into_response();
109 };
110 let block = match state.block_store.get(&cid).await {
111 Ok(Some(b)) => b,
112 _ => {
113 return ApiError::InternalError(Some("Record block not found".into())).into_response();
114 }
115 };
116 let ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block) {
117 Ok(v) => v,
118 Err(e) => {
119 error!("Failed to deserialize record: {:?}", e);
120 return ApiError::InternalError(None).into_response();
121 }
122 };
123 let value = ipld_to_json(ipld);
124 Json(json!({
125 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
126 "cid": record_cid_str,
127 "value": value
128 }))
129 .into_response()
130}
131#[derive(Deserialize)]
132pub struct ListRecordsInput {
133 pub repo: AtIdentifier,
134 pub collection: Nsid,
135 pub limit: Option<i32>,
136 pub cursor: Option<String>,
137 #[serde(rename = "rkeyStart")]
138 pub rkey_start: Option<Rkey>,
139 #[serde(rename = "rkeyEnd")]
140 pub rkey_end: Option<Rkey>,
141 pub reverse: Option<bool>,
142}
143#[derive(Serialize)]
144pub struct ListRecordsOutput {
145 #[serde(skip_serializing_if = "Option::is_none")]
146 pub cursor: Option<String>,
147 pub records: Vec<serde_json::Value>,
148}
149
150pub async fn list_records(
151 State(state): State<AppState>,
152 Query(input): Query<ListRecordsInput>,
153) -> Response {
154 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
155 let user_id_opt = if input.repo.is_did() {
156 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo.as_str())
157 .fetch_optional(&state.db)
158 .await
159 .map(|opt| opt.map(|r| r.id))
160 } else {
161 let repo_str = input.repo.as_str();
162 let handle = if !repo_str.contains('.') {
163 format!("{}.{}", repo_str, hostname)
164 } else {
165 repo_str.to_string()
166 };
167 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
168 .fetch_optional(&state.db)
169 .await
170 .map(|opt| opt.map(|r| r.id))
171 };
172 let user_id: uuid::Uuid = match user_id_opt {
173 Ok(Some(id)) => id,
174 Ok(None) => {
175 return ApiError::RepoNotFound(Some("Repo not found".into())).into_response();
176 }
177 Err(_) => {
178 return ApiError::InternalError(None).into_response();
179 }
180 };
181 let limit = input.limit.unwrap_or(50).clamp(1, 100);
182 let reverse = input.reverse.unwrap_or(false);
183 let limit_i64 = limit as i64;
184 let order = if reverse { "ASC" } else { "DESC" };
185 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
186 let comparator = if reverse { ">" } else { "<" };
187 let query = format!(
188 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
189 comparator, order
190 );
191 sqlx::query_as(&query)
192 .bind(user_id)
193 .bind(input.collection.as_str())
194 .bind(cursor)
195 .bind(limit_i64)
196 .fetch_all(&state.db)
197 .await
198 } else {
199 let mut conditions = vec!["repo_id = $1", "collection = $2"];
200 let mut param_idx = 3;
201 if input.rkey_start.is_some() {
202 conditions.push("rkey > $3");
203 param_idx += 1;
204 }
205 if input.rkey_end.is_some() {
206 conditions.push(if param_idx == 3 {
207 "rkey < $3"
208 } else {
209 "rkey < $4"
210 });
211 param_idx += 1;
212 }
213 let limit_idx = param_idx;
214 let query = format!(
215 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
216 conditions.join(" AND "),
217 order,
218 limit_idx
219 );
220 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
221 .bind(user_id)
222 .bind(input.collection.as_str());
223 if let Some(start) = &input.rkey_start {
224 query_builder = query_builder.bind(start.as_str());
225 }
226 if let Some(end) = &input.rkey_end {
227 query_builder = query_builder.bind(end.as_str());
228 }
229 query_builder.bind(limit_i64).fetch_all(&state.db).await
230 };
231 let rows = match rows_res {
232 Ok(r) => r,
233 Err(e) => {
234 error!("Error listing records: {:?}", e);
235 return ApiError::InternalError(None).into_response();
236 }
237 };
238 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
239 let parsed_rows: Vec<(Cid, String, String)> = rows
240 .iter()
241 .filter_map(|(rkey, cid_str)| {
242 Cid::from_str(cid_str)
243 .ok()
244 .map(|cid| (cid, rkey.clone(), cid_str.clone()))
245 })
246 .collect();
247 let cids: Vec<Cid> = parsed_rows.iter().map(|(cid, _, _)| *cid).collect();
248 let blocks = match state.block_store.get_many(&cids).await {
249 Ok(b) => b,
250 Err(e) => {
251 error!("Error fetching blocks: {:?}", e);
252 return ApiError::InternalError(None).into_response();
253 }
254 };
255 let records: Vec<Value> = parsed_rows
256 .iter()
257 .zip(blocks.into_iter())
258 .filter_map(|((_, rkey, cid_str), block_opt)| {
259 block_opt.and_then(|block| {
260 serde_ipld_dagcbor::from_slice::<Ipld>(&block)
261 .ok()
262 .map(|ipld| {
263 json!({
264 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
265 "cid": cid_str,
266 "value": ipld_to_json(ipld)
267 })
268 })
269 })
270 })
271 .collect();
272 Json(ListRecordsOutput {
273 cursor: last_rkey,
274 records,
275 })
276 .into_response()
277}