this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use cid::Cid; 9use jacquard_repo::storage::BlockStore; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use std::collections::HashMap; 13use std::str::FromStr; 14use tracing::error; 15 16#[derive(Deserialize)] 17pub struct GetRecordInput { 18 pub repo: String, 19 pub collection: String, 20 pub rkey: String, 21 pub cid: Option<String>, 22} 23 24pub async fn get_record( 25 State(state): State<AppState>, 26 Query(input): Query<GetRecordInput>, 27) -> Response { 28 let user_id_opt = if input.repo.starts_with("did:") { 29 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 30 .fetch_optional(&state.db) 31 .await 32 .map(|opt| opt.map(|r| r.id)) 33 } else { 34 sqlx::query!("SELECT id FROM users WHERE handle = $1", input.repo) 35 .fetch_optional(&state.db) 36 .await 37 .map(|opt| opt.map(|r| r.id)) 38 }; 39 40 let user_id: uuid::Uuid = match user_id_opt { 41 Ok(Some(id)) => id, 42 _ => { 43 return ( 44 StatusCode::NOT_FOUND, 45 Json(json!({"error": "NotFound", "message": "Repo not found"})), 46 ) 47 .into_response(); 48 } 49 }; 50 51 let record_row = sqlx::query!( 52 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 53 user_id, 54 input.collection, 55 input.rkey 56 ) 57 .fetch_optional(&state.db) 58 .await; 59 60 let record_cid_str: String = match record_row { 61 Ok(Some(row)) => row.record_cid, 62 _ => { 63 return ( 64 StatusCode::NOT_FOUND, 65 Json(json!({"error": "NotFound", "message": "Record not found"})), 66 ) 67 .into_response(); 68 } 69 }; 70 71 if let Some(expected_cid) = &input.cid { 72 if &record_cid_str != expected_cid { 73 return ( 74 StatusCode::NOT_FOUND, 75 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 76 ) 77 .into_response(); 78 } 79 } 80 81 let cid = match Cid::from_str(&record_cid_str) { 82 Ok(c) => c, 83 Err(_) => { 84 return ( 85 StatusCode::INTERNAL_SERVER_ERROR, 86 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 87 ) 88 .into_response(); 89 } 90 }; 91 92 let block = match state.block_store.get(&cid).await { 93 Ok(Some(b)) => b, 94 _ => { 95 return ( 96 StatusCode::INTERNAL_SERVER_ERROR, 97 Json(json!({"error": "InternalError", "message": "Record block not found"})), 98 ) 99 .into_response(); 100 } 101 }; 102 103 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 104 Ok(v) => v, 105 Err(e) => { 106 error!("Failed to deserialize record: {:?}", e); 107 return ( 108 StatusCode::INTERNAL_SERVER_ERROR, 109 Json(json!({"error": "InternalError"})), 110 ) 111 .into_response(); 112 } 113 }; 114 115 Json(json!({ 116 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 117 "cid": record_cid_str, 118 "value": value 119 })) 120 .into_response() 121} 122 123#[derive(Deserialize)] 124pub struct ListRecordsInput { 125 pub repo: String, 126 pub collection: String, 127 pub limit: Option<i32>, 128 pub cursor: Option<String>, 129 #[serde(rename = "rkeyStart")] 130 pub rkey_start: Option<String>, 131 #[serde(rename = "rkeyEnd")] 132 pub rkey_end: Option<String>, 133 pub reverse: Option<bool>, 134} 135 136#[derive(Serialize)] 137pub struct ListRecordsOutput { 138 pub cursor: Option<String>, 139 pub records: Vec<serde_json::Value>, 140} 141 142pub async fn list_records( 143 State(state): State<AppState>, 144 Query(input): Query<ListRecordsInput>, 145) -> Response { 146 let user_id_opt = if input.repo.starts_with("did:") { 147 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 148 .fetch_optional(&state.db) 149 .await 150 .map(|opt| opt.map(|r| r.id)) 151 } else { 152 sqlx::query!("SELECT id FROM users WHERE handle = $1", input.repo) 153 .fetch_optional(&state.db) 154 .await 155 .map(|opt| opt.map(|r| r.id)) 156 }; 157 158 let user_id: uuid::Uuid = match user_id_opt { 159 Ok(Some(id)) => id, 160 _ => { 161 return ( 162 StatusCode::NOT_FOUND, 163 Json(json!({"error": "NotFound", "message": "Repo not found"})), 164 ) 165 .into_response(); 166 } 167 }; 168 169 let limit = input.limit.unwrap_or(50).clamp(1, 100); 170 let reverse = input.reverse.unwrap_or(false); 171 let limit_i64 = limit as i64; 172 let order = if reverse { "ASC" } else { "DESC" }; 173 174 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 175 let comparator = if reverse { ">" } else { "<" }; 176 let query = format!( 177 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 178 comparator, order 179 ); 180 sqlx::query_as(&query) 181 .bind(user_id) 182 .bind(&input.collection) 183 .bind(cursor) 184 .bind(limit_i64) 185 .fetch_all(&state.db) 186 .await 187 } else { 188 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 189 let mut param_idx = 3; 190 191 if input.rkey_start.is_some() { 192 conditions.push("rkey > $3"); 193 param_idx += 1; 194 } 195 196 if input.rkey_end.is_some() { 197 conditions.push(if param_idx == 3 { "rkey < $3" } else { "rkey < $4" }); 198 param_idx += 1; 199 } 200 201 let limit_idx = param_idx; 202 203 let query = format!( 204 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 205 conditions.join(" AND "), 206 order, 207 limit_idx 208 ); 209 210 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 211 .bind(user_id) 212 .bind(&input.collection); 213 214 if let Some(start) = &input.rkey_start { 215 query_builder = query_builder.bind(start); 216 } 217 if let Some(end) = &input.rkey_end { 218 query_builder = query_builder.bind(end); 219 } 220 221 query_builder.bind(limit_i64).fetch_all(&state.db).await 222 }; 223 224 let rows = match rows_res { 225 Ok(r) => r, 226 Err(e) => { 227 error!("Error listing records: {:?}", e); 228 return ( 229 StatusCode::INTERNAL_SERVER_ERROR, 230 Json(json!({"error": "InternalError"})), 231 ) 232 .into_response(); 233 } 234 }; 235 236 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 237 238 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 239 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 240 241 for (rkey, cid_str) in &rows { 242 if let Ok(cid) = Cid::from_str(cid_str) { 243 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 244 cids.push(cid); 245 } 246 } 247 248 let blocks = match state.block_store.get_many(&cids).await { 249 Ok(b) => b, 250 Err(e) => { 251 error!("Error fetching blocks: {:?}", e); 252 return ( 253 StatusCode::INTERNAL_SERVER_ERROR, 254 Json(json!({"error": "InternalError"})), 255 ) 256 .into_response(); 257 } 258 }; 259 260 let mut records = Vec::new(); 261 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 262 if let Some(block) = block_opt { 263 if let Some((rkey, cid_str)) = cid_to_rkey.get(cid) { 264 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 265 records.push(json!({ 266 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 267 "cid": cid_str, 268 "value": value 269 })); 270 } 271 } 272 } 273 } 274 275 Json(ListRecordsOutput { 276 cursor: last_rkey, 277 records, 278 }) 279 .into_response() 280}