this repo has no description
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use cid::Cid;
9use jacquard_repo::storage::BlockStore;
10use serde::{Deserialize, Serialize};
11use serde_json::json;
12use sqlx::Row;
13use std::str::FromStr;
14use tracing::error;
15
16#[derive(Deserialize)]
17pub struct GetRecordInput {
18 pub repo: String,
19 pub collection: String,
20 pub rkey: String,
21 pub cid: Option<String>,
22}
23
24pub async fn get_record(
25 State(state): State<AppState>,
26 Query(input): Query<GetRecordInput>,
27) -> Response {
28 let user_row = if input.repo.starts_with("did:") {
29 sqlx::query("SELECT id FROM users WHERE did = $1")
30 .bind(&input.repo)
31 .fetch_optional(&state.db)
32 .await
33 } else {
34 sqlx::query("SELECT id FROM users WHERE handle = $1")
35 .bind(&input.repo)
36 .fetch_optional(&state.db)
37 .await
38 };
39
40 let user_id: uuid::Uuid = match user_row {
41 Ok(Some(row)) => row.get("id"),
42 _ => {
43 return (
44 StatusCode::NOT_FOUND,
45 Json(json!({"error": "NotFound", "message": "Repo not found"})),
46 )
47 .into_response();
48 }
49 };
50
51 let record_row = sqlx::query(
52 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
53 )
54 .bind(user_id)
55 .bind(&input.collection)
56 .bind(&input.rkey)
57 .fetch_optional(&state.db)
58 .await;
59
60 let record_cid_str: String = match record_row {
61 Ok(Some(row)) => row.get("record_cid"),
62 _ => {
63 return (
64 StatusCode::NOT_FOUND,
65 Json(json!({"error": "NotFound", "message": "Record not found"})),
66 )
67 .into_response();
68 }
69 };
70
71 if let Some(expected_cid) = &input.cid {
72 if &record_cid_str != expected_cid {
73 return (
74 StatusCode::NOT_FOUND,
75 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})),
76 )
77 .into_response();
78 }
79 }
80
81 let cid = match Cid::from_str(&record_cid_str) {
82 Ok(c) => c,
83 Err(_) => {
84 return (
85 StatusCode::INTERNAL_SERVER_ERROR,
86 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})),
87 )
88 .into_response();
89 }
90 };
91
92 let block = match state.block_store.get(&cid).await {
93 Ok(Some(b)) => b,
94 _ => {
95 return (
96 StatusCode::INTERNAL_SERVER_ERROR,
97 Json(json!({"error": "InternalError", "message": "Record block not found"})),
98 )
99 .into_response();
100 }
101 };
102
103 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
104 Ok(v) => v,
105 Err(e) => {
106 error!("Failed to deserialize record: {:?}", e);
107 return (
108 StatusCode::INTERNAL_SERVER_ERROR,
109 Json(json!({"error": "InternalError"})),
110 )
111 .into_response();
112 }
113 };
114
115 Json(json!({
116 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
117 "cid": record_cid_str,
118 "value": value
119 }))
120 .into_response()
121}
122
123#[derive(Deserialize)]
124pub struct ListRecordsInput {
125 pub repo: String,
126 pub collection: String,
127 pub limit: Option<i32>,
128 pub cursor: Option<String>,
129 #[serde(rename = "rkeyStart")]
130 pub rkey_start: Option<String>,
131 #[serde(rename = "rkeyEnd")]
132 pub rkey_end: Option<String>,
133 pub reverse: Option<bool>,
134}
135
136#[derive(Serialize)]
137pub struct ListRecordsOutput {
138 pub cursor: Option<String>,
139 pub records: Vec<serde_json::Value>,
140}
141
142pub async fn list_records(
143 State(state): State<AppState>,
144 Query(input): Query<ListRecordsInput>,
145) -> Response {
146 let user_row = if input.repo.starts_with("did:") {
147 sqlx::query("SELECT id FROM users WHERE did = $1")
148 .bind(&input.repo)
149 .fetch_optional(&state.db)
150 .await
151 } else {
152 sqlx::query("SELECT id FROM users WHERE handle = $1")
153 .bind(&input.repo)
154 .fetch_optional(&state.db)
155 .await
156 };
157
158 let user_id: uuid::Uuid = match user_row {
159 Ok(Some(row)) => row.get("id"),
160 _ => {
161 return (
162 StatusCode::NOT_FOUND,
163 Json(json!({"error": "NotFound", "message": "Repo not found"})),
164 )
165 .into_response();
166 }
167 };
168
169 let limit = input.limit.unwrap_or(50).clamp(1, 100);
170 let reverse = input.reverse.unwrap_or(false);
171
172 // Simplistic query construction - no sophisticated cursor handling or rkey ranges for now, just basic pagination
173 // TODO: Implement rkeyStart/End and correct cursor logic
174
175 let query_str = format!(
176 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 {} ORDER BY rkey {} LIMIT {}",
177 if let Some(_c) = &input.cursor {
178 if reverse {
179 "AND rkey < $3"
180 } else {
181 "AND rkey > $3"
182 }
183 } else {
184 ""
185 },
186 if reverse { "DESC" } else { "ASC" },
187 limit
188 );
189
190 let mut query = sqlx::query(&query_str)
191 .bind(user_id)
192 .bind(&input.collection);
193
194 if let Some(c) = &input.cursor {
195 query = query.bind(c);
196 }
197
198 let rows = match query.fetch_all(&state.db).await {
199 Ok(r) => r,
200 Err(e) => {
201 error!("Error listing records: {:?}", e);
202 return (
203 StatusCode::INTERNAL_SERVER_ERROR,
204 Json(json!({"error": "InternalError"})),
205 )
206 .into_response();
207 }
208 };
209
210 let mut records = Vec::new();
211 let mut last_rkey = None;
212
213 for row in rows {
214 let rkey: String = row.get("rkey");
215 let cid_str: String = row.get("record_cid");
216 last_rkey = Some(rkey.clone());
217
218 if let Ok(cid) = Cid::from_str(&cid_str) {
219 if let Ok(Some(block)) = state.block_store.get(&cid).await {
220 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
221 records.push(json!({
222 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
223 "cid": cid_str,
224 "value": value
225 }));
226 }
227 }
228 }
229 }
230
231 Json(ListRecordsOutput {
232 cursor: last_rkey,
233 records,
234 })
235 .into_response()
236}