this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use cid::Cid; 9use jacquard_repo::storage::BlockStore; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use sqlx::Row; 13use std::str::FromStr; 14use tracing::error; 15 16#[derive(Deserialize)] 17pub struct GetRecordInput { 18 pub repo: String, 19 pub collection: String, 20 pub rkey: String, 21 pub cid: Option<String>, 22} 23 24pub async fn get_record( 25 State(state): State<AppState>, 26 Query(input): Query<GetRecordInput>, 27) -> Response { 28 let user_row = if input.repo.starts_with("did:") { 29 sqlx::query("SELECT id FROM users WHERE did = $1") 30 .bind(&input.repo) 31 .fetch_optional(&state.db) 32 .await 33 } else { 34 sqlx::query("SELECT id FROM users WHERE handle = $1") 35 .bind(&input.repo) 36 .fetch_optional(&state.db) 37 .await 38 }; 39 40 let user_id: uuid::Uuid = match user_row { 41 Ok(Some(row)) => row.get("id"), 42 _ => { 43 return ( 44 StatusCode::NOT_FOUND, 45 Json(json!({"error": "NotFound", "message": "Repo not found"})), 46 ) 47 .into_response(); 48 } 49 }; 50 51 let record_row = sqlx::query( 52 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 53 ) 54 .bind(user_id) 55 .bind(&input.collection) 56 .bind(&input.rkey) 57 .fetch_optional(&state.db) 58 .await; 59 60 let record_cid_str: String = match record_row { 61 Ok(Some(row)) => row.get("record_cid"), 62 _ => { 63 return ( 64 StatusCode::NOT_FOUND, 65 Json(json!({"error": "NotFound", "message": "Record not found"})), 66 ) 67 .into_response(); 68 } 69 }; 70 71 if let Some(expected_cid) = &input.cid { 72 if &record_cid_str != expected_cid { 73 return ( 74 StatusCode::NOT_FOUND, 75 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 76 ) 77 .into_response(); 78 } 79 } 80 81 let cid = match Cid::from_str(&record_cid_str) { 82 Ok(c) => c, 83 Err(_) => { 84 return ( 85 StatusCode::INTERNAL_SERVER_ERROR, 86 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 87 ) 88 .into_response(); 89 } 90 }; 91 92 let block = match state.block_store.get(&cid).await { 93 Ok(Some(b)) => b, 94 _ => { 95 return ( 96 StatusCode::INTERNAL_SERVER_ERROR, 97 Json(json!({"error": "InternalError", "message": "Record block not found"})), 98 ) 99 .into_response(); 100 } 101 }; 102 103 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 104 Ok(v) => v, 105 Err(e) => { 106 error!("Failed to deserialize record: {:?}", e); 107 return ( 108 StatusCode::INTERNAL_SERVER_ERROR, 109 Json(json!({"error": "InternalError"})), 110 ) 111 .into_response(); 112 } 113 }; 114 115 Json(json!({ 116 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 117 "cid": record_cid_str, 118 "value": value 119 })) 120 .into_response() 121} 122 123#[derive(Deserialize)] 124pub struct ListRecordsInput { 125 pub repo: String, 126 pub collection: String, 127 pub limit: Option<i32>, 128 pub cursor: Option<String>, 129 #[serde(rename = "rkeyStart")] 130 pub rkey_start: Option<String>, 131 #[serde(rename = "rkeyEnd")] 132 pub rkey_end: Option<String>, 133 pub reverse: Option<bool>, 134} 135 136#[derive(Serialize)] 137pub struct ListRecordsOutput { 138 pub cursor: Option<String>, 139 pub records: Vec<serde_json::Value>, 140} 141 142pub async fn list_records( 143 State(state): State<AppState>, 144 Query(input): Query<ListRecordsInput>, 145) -> Response { 146 let user_row = if input.repo.starts_with("did:") { 147 sqlx::query("SELECT id FROM users WHERE did = $1") 148 .bind(&input.repo) 149 .fetch_optional(&state.db) 150 .await 151 } else { 152 sqlx::query("SELECT id FROM users WHERE handle = $1") 153 .bind(&input.repo) 154 .fetch_optional(&state.db) 155 .await 156 }; 157 158 let user_id: uuid::Uuid = match user_row { 159 Ok(Some(row)) => row.get("id"), 160 _ => { 161 return ( 162 StatusCode::NOT_FOUND, 163 Json(json!({"error": "NotFound", "message": "Repo not found"})), 164 ) 165 .into_response(); 166 } 167 }; 168 169 let limit = input.limit.unwrap_or(50).clamp(1, 100); 170 let reverse = input.reverse.unwrap_or(false); 171 172 // Simplistic query construction - no sophisticated cursor handling or rkey ranges for now, just basic pagination 173 // TODO: Implement rkeyStart/End and correct cursor logic 174 175 let query_str = format!( 176 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 {} ORDER BY rkey {} LIMIT {}", 177 if let Some(_c) = &input.cursor { 178 if reverse { 179 "AND rkey < $3" 180 } else { 181 "AND rkey > $3" 182 } 183 } else { 184 "" 185 }, 186 if reverse { "DESC" } else { "ASC" }, 187 limit 188 ); 189 190 let mut query = sqlx::query(&query_str) 191 .bind(user_id) 192 .bind(&input.collection); 193 194 if let Some(c) = &input.cursor { 195 query = query.bind(c); 196 } 197 198 let rows = match query.fetch_all(&state.db).await { 199 Ok(r) => r, 200 Err(e) => { 201 error!("Error listing records: {:?}", e); 202 return ( 203 StatusCode::INTERNAL_SERVER_ERROR, 204 Json(json!({"error": "InternalError"})), 205 ) 206 .into_response(); 207 } 208 }; 209 210 let mut records = Vec::new(); 211 let mut last_rkey = None; 212 213 for row in rows { 214 let rkey: String = row.get("rkey"); 215 let cid_str: String = row.get("record_cid"); 216 last_rkey = Some(rkey.clone()); 217 218 if let Ok(cid) = Cid::from_str(&cid_str) { 219 if let Ok(Some(block)) = state.block_store.get(&cid).await { 220 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 221 records.push(json!({ 222 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 223 "cid": cid_str, 224 "value": value 225 })); 226 } 227 } 228 } 229 } 230 231 Json(ListRecordsOutput { 232 cursor: last_rkey, 233 records, 234 }) 235 .into_response() 236}