this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use cid::Cid; 9use jacquard_repo::storage::BlockStore; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use std::str::FromStr; 13use tracing::error; 14 15#[derive(Deserialize)] 16pub struct GetRecordInput { 17 pub repo: String, 18 pub collection: String, 19 pub rkey: String, 20 pub cid: Option<String>, 21} 22 23pub async fn get_record( 24 State(state): State<AppState>, 25 Query(input): Query<GetRecordInput>, 26) -> Response { 27 let user_id_opt = if input.repo.starts_with("did:") { 28 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 29 .fetch_optional(&state.db) 30 .await 31 .map(|opt| opt.map(|r| r.id)) 32 } else { 33 sqlx::query!("SELECT id FROM users WHERE handle = $1", input.repo) 34 .fetch_optional(&state.db) 35 .await 36 .map(|opt| opt.map(|r| r.id)) 37 }; 38 39 let user_id: uuid::Uuid = match user_id_opt { 40 Ok(Some(id)) => id, 41 _ => { 42 return ( 43 StatusCode::NOT_FOUND, 44 Json(json!({"error": "NotFound", "message": "Repo not found"})), 45 ) 46 .into_response(); 47 } 48 }; 49 50 let record_row = sqlx::query!( 51 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 52 user_id, 53 input.collection, 54 input.rkey 55 ) 56 .fetch_optional(&state.db) 57 .await; 58 59 let record_cid_str: String = match record_row { 60 Ok(Some(row)) => row.record_cid, 61 _ => { 62 return ( 63 StatusCode::NOT_FOUND, 64 Json(json!({"error": "NotFound", "message": "Record not found"})), 65 ) 66 .into_response(); 67 } 68 }; 69 70 if let Some(expected_cid) = &input.cid { 71 if &record_cid_str != expected_cid { 72 return ( 73 StatusCode::NOT_FOUND, 74 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 75 ) 76 .into_response(); 77 } 78 } 79 80 let cid = match Cid::from_str(&record_cid_str) { 81 Ok(c) => c, 82 Err(_) => { 83 return ( 84 StatusCode::INTERNAL_SERVER_ERROR, 85 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 86 ) 87 .into_response(); 88 } 89 }; 90 91 let block = match state.block_store.get(&cid).await { 92 Ok(Some(b)) => b, 93 _ => { 94 return ( 95 StatusCode::INTERNAL_SERVER_ERROR, 96 Json(json!({"error": "InternalError", "message": "Record block not found"})), 97 ) 98 .into_response(); 99 } 100 }; 101 102 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 103 Ok(v) => v, 104 Err(e) => { 105 error!("Failed to deserialize record: {:?}", e); 106 return ( 107 StatusCode::INTERNAL_SERVER_ERROR, 108 Json(json!({"error": "InternalError"})), 109 ) 110 .into_response(); 111 } 112 }; 113 114 Json(json!({ 115 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 116 "cid": record_cid_str, 117 "value": value 118 })) 119 .into_response() 120} 121 122#[derive(Deserialize)] 123pub struct ListRecordsInput { 124 pub repo: String, 125 pub collection: String, 126 pub limit: Option<i32>, 127 pub cursor: Option<String>, 128 #[serde(rename = "rkeyStart")] 129 pub rkey_start: Option<String>, 130 #[serde(rename = "rkeyEnd")] 131 pub rkey_end: Option<String>, 132 pub reverse: Option<bool>, 133} 134 135#[derive(Serialize)] 136pub struct ListRecordsOutput { 137 pub cursor: Option<String>, 138 pub records: Vec<serde_json::Value>, 139} 140 141pub async fn list_records( 142 State(state): State<AppState>, 143 Query(input): Query<ListRecordsInput>, 144) -> Response { 145 let user_id_opt = if input.repo.starts_with("did:") { 146 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 147 .fetch_optional(&state.db) 148 .await 149 .map(|opt| opt.map(|r| r.id)) 150 } else { 151 sqlx::query!("SELECT id FROM users WHERE handle = $1", input.repo) 152 .fetch_optional(&state.db) 153 .await 154 .map(|opt| opt.map(|r| r.id)) 155 }; 156 157 let user_id: uuid::Uuid = match user_id_opt { 158 Ok(Some(id)) => id, 159 _ => { 160 return ( 161 StatusCode::NOT_FOUND, 162 Json(json!({"error": "NotFound", "message": "Repo not found"})), 163 ) 164 .into_response(); 165 } 166 }; 167 168 let limit = input.limit.unwrap_or(50).clamp(1, 100); 169 let reverse = input.reverse.unwrap_or(false); 170 let limit_i64 = limit as i64; 171 let order = if reverse { "ASC" } else { "DESC" }; 172 173 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 174 let comparator = if reverse { ">" } else { "<" }; 175 let query = format!( 176 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 177 comparator, order 178 ); 179 sqlx::query_as(&query) 180 .bind(user_id) 181 .bind(&input.collection) 182 .bind(cursor) 183 .bind(limit_i64) 184 .fetch_all(&state.db) 185 .await 186 } else { 187 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 188 let mut param_idx = 3; 189 190 if input.rkey_start.is_some() { 191 conditions.push("rkey > $3"); 192 param_idx += 1; 193 } 194 195 if input.rkey_end.is_some() { 196 conditions.push(if param_idx == 3 { "rkey < $3" } else { "rkey < $4" }); 197 param_idx += 1; 198 } 199 200 let limit_idx = param_idx; 201 202 let query = format!( 203 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 204 conditions.join(" AND "), 205 order, 206 limit_idx 207 ); 208 209 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 210 .bind(user_id) 211 .bind(&input.collection); 212 213 if let Some(start) = &input.rkey_start { 214 query_builder = query_builder.bind(start); 215 } 216 if let Some(end) = &input.rkey_end { 217 query_builder = query_builder.bind(end); 218 } 219 220 query_builder.bind(limit_i64).fetch_all(&state.db).await 221 }; 222 223 let rows = match rows_res { 224 Ok(r) => r, 225 Err(e) => { 226 error!("Error listing records: {:?}", e); 227 return ( 228 StatusCode::INTERNAL_SERVER_ERROR, 229 Json(json!({"error": "InternalError"})), 230 ) 231 .into_response(); 232 } 233 }; 234 235 let mut records = Vec::new(); 236 let mut last_rkey = None; 237 238 for (rkey, cid_str) in rows { 239 last_rkey = Some(rkey.clone()); 240 241 if let Ok(cid) = Cid::from_str(&cid_str) { 242 if let Ok(Some(block)) = state.block_store.get(&cid).await { 243 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 244 records.push(json!({ 245 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 246 "cid": cid_str, 247 "value": value 248 })); 249 } 250 } 251 } 252 } 253 254 Json(ListRecordsOutput { 255 cursor: last_rkey, 256 records, 257 }) 258 .into_response() 259}