this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use cid::Cid;
9use jacquard_repo::storage::BlockStore;
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use std::str::FromStr;
13use tracing::error;
14
15#[derive(Deserialize)]
16pub struct GetRecordInput {
17 pub repo: String,
18 pub collection: String,
19 pub rkey: String,
20 pub cid: Option<String>,
21}
22
23pub async fn get_record(
24 State(state): State<AppState>,
25 Query(input): Query<GetRecordInput>,
26) -> Response {
27 let user_id_opt = if input.repo.starts_with("did:") {
28 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
29 .fetch_optional(&state.db)
30 .await
31 .map(|opt| opt.map(|r| r.id))
32 } else {
33 sqlx::query!("SELECT id FROM users WHERE handle = $1", input.repo)
34 .fetch_optional(&state.db)
35 .await
36 .map(|opt| opt.map(|r| r.id))
37 };
38
39 let user_id: uuid::Uuid = match user_id_opt {
40 Ok(Some(id)) => id,
41 _ => {
42 return (
43 StatusCode::NOT_FOUND,
44 Json(json!({"error": "NotFound", "message": "Repo not found"})),
45 )
46 .into_response();
47 }
48 };
49
50 let record_row = sqlx::query!(
51 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
52 user_id,
53 input.collection,
54 input.rkey
55 )
56 .fetch_optional(&state.db)
57 .await;
58
59 let record_cid_str: String = match record_row {
60 Ok(Some(row)) => row.record_cid,
61 _ => {
62 return (
63 StatusCode::NOT_FOUND,
64 Json(json!({"error": "NotFound", "message": "Record not found"})),
65 )
66 .into_response();
67 }
68 };
69
70 if let Some(expected_cid) = &input.cid {
71 if &record_cid_str != expected_cid {
72 return (
73 StatusCode::NOT_FOUND,
74 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
75 )
76 .into_response();
77 }
78 }
79
80 let cid = match Cid::from_str(&record_cid_str) {
81 Ok(c) => c,
82 Err(_) => {
83 return (
84 StatusCode::INTERNAL_SERVER_ERROR,
85 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
86 )
87 .into_response();
88 }
89 };
90
91 let block = match state.block_store.get(&cid).await {
92 Ok(Some(b)) => b,
93 _ => {
94 return (
95 StatusCode::INTERNAL_SERVER_ERROR,
96 Json(json!({"error": "InternalError", "message": "Record block not found"})),
97 )
98 .into_response();
99 }
100 };
101
102 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
103 Ok(v) => v,
104 Err(e) => {
105 error!("Failed to deserialize record: {:?}", e);
106 return (
107 StatusCode::INTERNAL_SERVER_ERROR,
108 Json(json!({"error": "InternalError"})),
109 )
110 .into_response();
111 }
112 };
113
114 Json(json!({
115 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
116 "cid": record_cid_str,
117 "value": value
118 }))
119 .into_response()
120}
121
122#[derive(Deserialize)]
123pub struct ListRecordsInput {
124 pub repo: String,
125 pub collection: String,
126 pub limit: Option<i32>,
127 pub cursor: Option<String>,
128 #[serde(rename = "rkeyStart")]
129 pub rkey_start: Option<String>,
130 #[serde(rename = "rkeyEnd")]
131 pub rkey_end: Option<String>,
132 pub reverse: Option<bool>,
133}
134
135#[derive(Serialize)]
136pub struct ListRecordsOutput {
137 pub cursor: Option<String>,
138 pub records: Vec<serde_json::Value>,
139}
140
141pub async fn list_records(
142 State(state): State<AppState>,
143 Query(input): Query<ListRecordsInput>,
144) -> Response {
145 let user_id_opt = if input.repo.starts_with("did:") {
146 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo)
147 .fetch_optional(&state.db)
148 .await
149 .map(|opt| opt.map(|r| r.id))
150 } else {
151 sqlx::query!("SELECT id FROM users WHERE handle = $1", input.repo)
152 .fetch_optional(&state.db)
153 .await
154 .map(|opt| opt.map(|r| r.id))
155 };
156
157 let user_id: uuid::Uuid = match user_id_opt {
158 Ok(Some(id)) => id,
159 _ => {
160 return (
161 StatusCode::NOT_FOUND,
162 Json(json!({"error": "NotFound", "message": "Repo not found"})),
163 )
164 .into_response();
165 }
166 };
167
168 let limit = input.limit.unwrap_or(50).clamp(1, 100);
169 let reverse = input.reverse.unwrap_or(false);
170
171 // Simplistic query construction - no sophisticated cursor handling or rkey ranges for now, just basic pagination
172 // TODO: Implement rkeyStart/End and correct cursor logic
173
174 let limit_i64 = limit as i64;
175 let rows_res = if let Some(cursor) = &input.cursor {
176 if reverse {
177 sqlx::query!(
178 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey < $3 ORDER BY rkey DESC LIMIT $4",
179 user_id,
180 input.collection,
181 cursor,
182 limit_i64
183 )
184 .fetch_all(&state.db)
185 .await
186 .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>())
187 } else {
188 sqlx::query!(
189 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey > $3 ORDER BY rkey ASC LIMIT $4",
190 user_id,
191 input.collection,
192 cursor,
193 limit_i64
194 )
195 .fetch_all(&state.db)
196 .await
197 .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>())
198 }
199 } else {
200 if reverse {
201 sqlx::query!(
202 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 ORDER BY rkey DESC LIMIT $3",
203 user_id,
204 input.collection,
205 limit_i64
206 )
207 .fetch_all(&state.db)
208 .await
209 .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>())
210 } else {
211 sqlx::query!(
212 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 ORDER BY rkey ASC LIMIT $3",
213 user_id,
214 input.collection,
215 limit_i64
216 )
217 .fetch_all(&state.db)
218 .await
219 .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>())
220 }
221 };
222
223 let rows = match rows_res {
224 Ok(r) => r,
225 Err(e) => {
226 error!("Error listing records: {:?}", e);
227 return (
228 StatusCode::INTERNAL_SERVER_ERROR,
229 Json(json!({"error": "InternalError"})),
230 )
231 .into_response();
232 }
233 };
234
235 let mut records = Vec::new();
236 let mut last_rkey = None;
237
238 for (rkey, cid_str) in rows {
239 last_rkey = Some(rkey.clone());
240
241 if let Ok(cid) = Cid::from_str(&cid_str) {
242 if let Ok(Some(block)) = state.block_store.get(&cid).await {
243 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
244 records.push(json!({
245 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
246 "cid": cid_str,
247 "value": value
248 }));
249 }
250 }
251 }
252 }
253
254 Json(ListRecordsOutput {
255 cursor: last_rkey,
256 records,
257 })
258 .into_response()
259}