this repo has no description
1use crate::api::error::ApiError;
2use crate::state::AppState;
3use crate::types::{AtIdentifier, Nsid, Rkey};
4use axum::{
5 Json,
6 extract::{Query, State},
7 http::HeaderMap,
8 response::{IntoResponse, Response},
9};
10use base64::Engine;
11use cid::Cid;
12use ipld_core::ipld::Ipld;
13use jacquard_repo::storage::BlockStore;
14use serde::{Deserialize, Serialize};
15use serde_json::{Map, Value, json};
16use std::collections::HashMap;
17use std::str::FromStr;
18use tracing::error;
19
20fn ipld_to_json(ipld: Ipld) -> Value {
21 match ipld {
22 Ipld::Null => Value::Null,
23 Ipld::Bool(b) => Value::Bool(b),
24 Ipld::Integer(i) => {
25 if let Ok(n) = i64::try_from(i) {
26 Value::Number(n.into())
27 } else {
28 Value::String(i.to_string())
29 }
30 }
31 Ipld::Float(f) => serde_json::Number::from_f64(f)
32 .map(Value::Number)
33 .unwrap_or(Value::Null),
34 Ipld::String(s) => Value::String(s),
35 Ipld::Bytes(b) => {
36 let encoded = base64::engine::general_purpose::STANDARD.encode(&b);
37 json!({ "$bytes": encoded })
38 }
39 Ipld::List(arr) => Value::Array(arr.into_iter().map(ipld_to_json).collect()),
40 Ipld::Map(map) => {
41 let obj: Map<String, Value> =
42 map.into_iter().map(|(k, v)| (k, ipld_to_json(v))).collect();
43 Value::Object(obj)
44 }
45 Ipld::Link(cid) => json!({ "$link": cid.to_string() }),
46 }
47}
48
49#[derive(Deserialize)]
50pub struct GetRecordInput {
51 pub repo: AtIdentifier,
52 pub collection: Nsid,
53 pub rkey: Rkey,
54 pub cid: Option<String>,
55}
56
57pub async fn get_record(
58 State(state): State<AppState>,
59 _headers: HeaderMap,
60 Query(input): Query<GetRecordInput>,
61) -> Response {
62 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
63 let user_id_opt = if input.repo.is_did() {
64 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo.as_str())
65 .fetch_optional(&state.db)
66 .await
67 .map(|opt| opt.map(|r| r.id))
68 } else {
69 let repo_str = input.repo.as_str();
70 let handle = if !repo_str.contains('.') {
71 format!("{}.{}", repo_str, hostname)
72 } else {
73 repo_str.to_string()
74 };
75 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
76 .fetch_optional(&state.db)
77 .await
78 .map(|opt| opt.map(|r| r.id))
79 };
80 let user_id: uuid::Uuid = match user_id_opt {
81 Ok(Some(id)) => id,
82 Ok(None) => {
83 return ApiError::RepoNotFound(Some("Repo not found".into())).into_response();
84 }
85 Err(_) => {
86 return ApiError::InternalError(None).into_response();
87 }
88 };
89 let record_row = sqlx::query!(
90 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
91 user_id,
92 input.collection.as_str(),
93 input.rkey.as_str()
94 )
95 .fetch_optional(&state.db)
96 .await;
97 let record_cid_str: String = match record_row {
98 Ok(Some(row)) => row.record_cid,
99 _ => {
100 return ApiError::RecordNotFound.into_response();
101 }
102 };
103 if let Some(expected_cid) = &input.cid
104 && &record_cid_str != expected_cid
105 {
106 return ApiError::RecordNotFound.into_response();
107 }
108 let Ok(cid) = Cid::from_str(&record_cid_str) else {
109 return ApiError::InternalError(Some("Invalid CID in DB".into())).into_response();
110 };
111 let block = match state.block_store.get(&cid).await {
112 Ok(Some(b)) => b,
113 _ => {
114 return ApiError::InternalError(Some("Record block not found".into())).into_response();
115 }
116 };
117 let ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block) {
118 Ok(v) => v,
119 Err(e) => {
120 error!("Failed to deserialize record: {:?}", e);
121 return ApiError::InternalError(None).into_response();
122 }
123 };
124 let value = ipld_to_json(ipld);
125 Json(json!({
126 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
127 "cid": record_cid_str,
128 "value": value
129 }))
130 .into_response()
131}
132#[derive(Deserialize)]
133pub struct ListRecordsInput {
134 pub repo: AtIdentifier,
135 pub collection: Nsid,
136 pub limit: Option<i32>,
137 pub cursor: Option<String>,
138 #[serde(rename = "rkeyStart")]
139 pub rkey_start: Option<Rkey>,
140 #[serde(rename = "rkeyEnd")]
141 pub rkey_end: Option<Rkey>,
142 pub reverse: Option<bool>,
143}
144#[derive(Serialize)]
145pub struct ListRecordsOutput {
146 #[serde(skip_serializing_if = "Option::is_none")]
147 pub cursor: Option<String>,
148 pub records: Vec<serde_json::Value>,
149}
150
151pub async fn list_records(
152 State(state): State<AppState>,
153 Query(input): Query<ListRecordsInput>,
154) -> Response {
155 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
156 let user_id_opt = if input.repo.is_did() {
157 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo.as_str())
158 .fetch_optional(&state.db)
159 .await
160 .map(|opt| opt.map(|r| r.id))
161 } else {
162 let repo_str = input.repo.as_str();
163 let handle = if !repo_str.contains('.') {
164 format!("{}.{}", repo_str, hostname)
165 } else {
166 repo_str.to_string()
167 };
168 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle)
169 .fetch_optional(&state.db)
170 .await
171 .map(|opt| opt.map(|r| r.id))
172 };
173 let user_id: uuid::Uuid = match user_id_opt {
174 Ok(Some(id)) => id,
175 Ok(None) => {
176 return ApiError::RepoNotFound(Some("Repo not found".into())).into_response();
177 }
178 Err(_) => {
179 return ApiError::InternalError(None).into_response();
180 }
181 };
182 let limit = input.limit.unwrap_or(50).clamp(1, 100);
183 let reverse = input.reverse.unwrap_or(false);
184 let limit_i64 = limit as i64;
185 let order = if reverse { "ASC" } else { "DESC" };
186 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor {
187 let comparator = if reverse { ">" } else { "<" };
188 let query = format!(
189 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4",
190 comparator, order
191 );
192 sqlx::query_as(&query)
193 .bind(user_id)
194 .bind(input.collection.as_str())
195 .bind(cursor)
196 .bind(limit_i64)
197 .fetch_all(&state.db)
198 .await
199 } else {
200 let mut conditions = vec!["repo_id = $1", "collection = $2"];
201 let mut param_idx = 3;
202 if input.rkey_start.is_some() {
203 conditions.push("rkey > $3");
204 param_idx += 1;
205 }
206 if input.rkey_end.is_some() {
207 conditions.push(if param_idx == 3 {
208 "rkey < $3"
209 } else {
210 "rkey < $4"
211 });
212 param_idx += 1;
213 }
214 let limit_idx = param_idx;
215 let query = format!(
216 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}",
217 conditions.join(" AND "),
218 order,
219 limit_idx
220 );
221 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query)
222 .bind(user_id)
223 .bind(input.collection.as_str());
224 if let Some(start) = &input.rkey_start {
225 query_builder = query_builder.bind(start.as_str());
226 }
227 if let Some(end) = &input.rkey_end {
228 query_builder = query_builder.bind(end.as_str());
229 }
230 query_builder.bind(limit_i64).fetch_all(&state.db).await
231 };
232 let rows = match rows_res {
233 Ok(r) => r,
234 Err(e) => {
235 error!("Error listing records: {:?}", e);
236 return ApiError::InternalError(None).into_response();
237 }
238 };
239 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone());
240 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new();
241 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
242 for (rkey, cid_str) in &rows {
243 if let Ok(cid) = Cid::from_str(cid_str) {
244 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone()));
245 cids.push(cid);
246 }
247 }
248 let blocks = match state.block_store.get_many(&cids).await {
249 Ok(b) => b,
250 Err(e) => {
251 error!("Error fetching blocks: {:?}", e);
252 return ApiError::InternalError(None).into_response();
253 }
254 };
255 let mut records = Vec::new();
256 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) {
257 if let Some(block) = block_opt
258 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid)
259 && let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block)
260 {
261 let value = ipld_to_json(ipld);
262 records.push(json!({
263 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
264 "cid": cid_str,
265 "value": value
266 }));
267 }
268 }
269 Json(ListRecordsOutput {
270 cursor: last_rkey,
271 records,
272 })
273 .into_response()
274}