this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use cid::Cid; 9use jacquard_repo::storage::BlockStore; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use std::str::FromStr; 13use tracing::error; 14 15#[derive(Deserialize)] 16pub struct GetRecordInput { 17 pub repo: String, 18 pub collection: String, 19 pub rkey: String, 20 pub cid: Option<String>, 21} 22 23pub async fn get_record( 24 State(state): State<AppState>, 25 Query(input): Query<GetRecordInput>, 26) -> Response { 27 let user_id_opt = if input.repo.starts_with("did:") { 28 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 29 .fetch_optional(&state.db) 30 .await 31 .map(|opt| opt.map(|r| r.id)) 32 } else { 33 sqlx::query!("SELECT id FROM users WHERE handle = $1", input.repo) 34 .fetch_optional(&state.db) 35 .await 36 .map(|opt| opt.map(|r| r.id)) 37 }; 38 39 let user_id: uuid::Uuid = match user_id_opt { 40 Ok(Some(id)) => id, 41 _ => { 42 return ( 43 StatusCode::NOT_FOUND, 44 Json(json!({"error": "NotFound", "message": "Repo not found"})), 45 ) 46 .into_response(); 47 } 48 }; 49 50 let record_row = sqlx::query!( 51 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 52 user_id, 53 input.collection, 54 input.rkey 55 ) 56 .fetch_optional(&state.db) 57 .await; 58 59 let record_cid_str: String = match record_row { 60 Ok(Some(row)) => row.record_cid, 61 _ => { 62 return ( 63 StatusCode::NOT_FOUND, 64 Json(json!({"error": "NotFound", "message": "Record not found"})), 65 ) 66 .into_response(); 67 } 68 }; 69 70 if let Some(expected_cid) = &input.cid { 71 if &record_cid_str != expected_cid { 72 return ( 73 StatusCode::NOT_FOUND, 74 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 75 ) 76 .into_response(); 77 } 78 } 79 80 let cid = match Cid::from_str(&record_cid_str) { 81 Ok(c) => c, 82 Err(_) => { 83 return ( 84 StatusCode::INTERNAL_SERVER_ERROR, 85 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 86 ) 87 .into_response(); 88 } 89 }; 90 91 let block = match state.block_store.get(&cid).await { 92 Ok(Some(b)) => b, 93 _ => { 94 return ( 95 StatusCode::INTERNAL_SERVER_ERROR, 96 Json(json!({"error": "InternalError", "message": "Record block not found"})), 97 ) 98 .into_response(); 99 } 100 }; 101 102 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 103 Ok(v) => v, 104 Err(e) => { 105 error!("Failed to deserialize record: {:?}", e); 106 return ( 107 StatusCode::INTERNAL_SERVER_ERROR, 108 Json(json!({"error": "InternalError"})), 109 ) 110 .into_response(); 111 } 112 }; 113 114 Json(json!({ 115 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 116 "cid": record_cid_str, 117 "value": value 118 })) 119 .into_response() 120} 121 122#[derive(Deserialize)] 123pub struct ListRecordsInput { 124 pub repo: String, 125 pub collection: String, 126 pub limit: Option<i32>, 127 pub cursor: Option<String>, 128 #[serde(rename = "rkeyStart")] 129 pub rkey_start: Option<String>, 130 #[serde(rename = "rkeyEnd")] 131 pub rkey_end: Option<String>, 132 pub reverse: Option<bool>, 133} 134 135#[derive(Serialize)] 136pub struct ListRecordsOutput { 137 pub cursor: Option<String>, 138 pub records: Vec<serde_json::Value>, 139} 140 141pub async fn list_records( 142 State(state): State<AppState>, 143 Query(input): Query<ListRecordsInput>, 144) -> Response { 145 let user_id_opt = if input.repo.starts_with("did:") { 146 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 147 .fetch_optional(&state.db) 148 .await 149 .map(|opt| opt.map(|r| r.id)) 150 } else { 151 sqlx::query!("SELECT id FROM users WHERE handle = $1", input.repo) 152 .fetch_optional(&state.db) 153 .await 154 .map(|opt| opt.map(|r| r.id)) 155 }; 156 157 let user_id: uuid::Uuid = match user_id_opt { 158 Ok(Some(id)) => id, 159 _ => { 160 return ( 161 StatusCode::NOT_FOUND, 162 Json(json!({"error": "NotFound", "message": "Repo not found"})), 163 ) 164 .into_response(); 165 } 166 }; 167 168 let limit = input.limit.unwrap_or(50).clamp(1, 100); 169 let reverse = input.reverse.unwrap_or(false); 170 171 // Simplistic query construction - no sophisticated cursor handling or rkey ranges for now, just basic pagination 172 // TODO: Implement rkeyStart/End and correct cursor logic 173 174 let limit_i64 = limit as i64; 175 let rows_res = if let Some(cursor) = &input.cursor { 176 if reverse { 177 sqlx::query!( 178 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey < $3 ORDER BY rkey DESC LIMIT $4", 179 user_id, 180 input.collection, 181 cursor, 182 limit_i64 183 ) 184 .fetch_all(&state.db) 185 .await 186 .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>()) 187 } else { 188 sqlx::query!( 189 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey > $3 ORDER BY rkey ASC LIMIT $4", 190 user_id, 191 input.collection, 192 cursor, 193 limit_i64 194 ) 195 .fetch_all(&state.db) 196 .await 197 .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>()) 198 } 199 } else { 200 if reverse { 201 sqlx::query!( 202 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 ORDER BY rkey DESC LIMIT $3", 203 user_id, 204 input.collection, 205 limit_i64 206 ) 207 .fetch_all(&state.db) 208 .await 209 .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>()) 210 } else { 211 sqlx::query!( 212 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 ORDER BY rkey ASC LIMIT $3", 213 user_id, 214 input.collection, 215 limit_i64 216 ) 217 .fetch_all(&state.db) 218 .await 219 .map(|rows| rows.into_iter().map(|r| (r.rkey, r.record_cid)).collect::<Vec<_>>()) 220 } 221 }; 222 223 let rows = match rows_res { 224 Ok(r) => r, 225 Err(e) => { 226 error!("Error listing records: {:?}", e); 227 return ( 228 StatusCode::INTERNAL_SERVER_ERROR, 229 Json(json!({"error": "InternalError"})), 230 ) 231 .into_response(); 232 } 233 }; 234 235 let mut records = Vec::new(); 236 let mut last_rkey = None; 237 238 for (rkey, cid_str) in rows { 239 last_rkey = Some(rkey.clone()); 240 241 if let Ok(cid) = Cid::from_str(&cid_str) { 242 if let Ok(Some(block)) = state.block_store.get(&cid).await { 243 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 244 records.push(json!({ 245 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 246 "cid": cid_str, 247 "value": value 248 })); 249 } 250 } 251 } 252 } 253 254 Json(ListRecordsOutput { 255 cursor: last_rkey, 256 records, 257 }) 258 .into_response() 259}