this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use cid::Cid; 9use jacquard_repo::storage::BlockStore; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use std::collections::HashMap; 13use std::str::FromStr; 14use tracing::error; 15 16#[derive(Deserialize)] 17pub struct GetRecordInput { 18 pub repo: String, 19 pub collection: String, 20 pub rkey: String, 21 pub cid: Option<String>, 22} 23 24pub async fn get_record( 25 State(state): State<AppState>, 26 Query(input): Query<GetRecordInput>, 27) -> Response { 28 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 29 30 let user_id_opt = if input.repo.starts_with("did:") { 31 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 32 .fetch_optional(&state.db) 33 .await 34 .map(|opt| opt.map(|r| r.id)) 35 } else { 36 let suffix = format!(".{}", hostname); 37 let short_handle = if input.repo.ends_with(&suffix) { 38 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 39 } else { 40 &input.repo 41 }; 42 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 43 .fetch_optional(&state.db) 44 .await 45 .map(|opt| opt.map(|r| r.id)) 46 }; 47 48 let user_id: uuid::Uuid = match user_id_opt { 49 Ok(Some(id)) => id, 50 _ => { 51 return ( 52 StatusCode::NOT_FOUND, 53 Json(json!({"error": "NotFound", "message": "Repo not found"})), 54 ) 55 .into_response(); 56 } 57 }; 58 59 let record_row = sqlx::query!( 60 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 61 user_id, 62 input.collection, 63 input.rkey 64 ) 65 .fetch_optional(&state.db) 66 .await; 67 68 let record_cid_str: String = match record_row { 69 Ok(Some(row)) => row.record_cid, 70 _ => { 71 return ( 72 StatusCode::NOT_FOUND, 73 Json(json!({"error": "NotFound", "message": "Record not found"})), 74 ) 75 .into_response(); 76 } 77 }; 78 79 if let Some(expected_cid) = &input.cid { 80 if &record_cid_str != expected_cid { 81 return ( 82 StatusCode::NOT_FOUND, 83 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 84 ) 85 .into_response(); 86 } 87 } 88 89 let cid = match Cid::from_str(&record_cid_str) { 90 Ok(c) => c, 91 Err(_) => { 92 return ( 93 StatusCode::INTERNAL_SERVER_ERROR, 94 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 95 ) 96 .into_response(); 97 } 98 }; 99 100 let block = match state.block_store.get(&cid).await { 101 Ok(Some(b)) => b, 102 _ => { 103 return ( 104 StatusCode::INTERNAL_SERVER_ERROR, 105 Json(json!({"error": "InternalError", "message": "Record block not found"})), 106 ) 107 .into_response(); 108 } 109 }; 110 111 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 112 Ok(v) => v, 113 Err(e) => { 114 error!("Failed to deserialize record: {:?}", e); 115 return ( 116 StatusCode::INTERNAL_SERVER_ERROR, 117 Json(json!({"error": "InternalError"})), 118 ) 119 .into_response(); 120 } 121 }; 122 123 Json(json!({ 124 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 125 "cid": record_cid_str, 126 "value": value 127 })) 128 .into_response() 129} 130 131#[derive(Deserialize)] 132pub struct ListRecordsInput { 133 pub repo: String, 134 pub collection: String, 135 pub limit: Option<i32>, 136 pub cursor: Option<String>, 137 #[serde(rename = "rkeyStart")] 138 pub rkey_start: Option<String>, 139 #[serde(rename = "rkeyEnd")] 140 pub rkey_end: Option<String>, 141 pub reverse: Option<bool>, 142} 143 144#[derive(Serialize)] 145pub struct ListRecordsOutput { 146 pub cursor: Option<String>, 147 pub records: Vec<serde_json::Value>, 148} 149 150pub async fn list_records( 151 State(state): State<AppState>, 152 Query(input): Query<ListRecordsInput>, 153) -> Response { 154 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 155 156 let user_id_opt = if input.repo.starts_with("did:") { 157 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 158 .fetch_optional(&state.db) 159 .await 160 .map(|opt| opt.map(|r| r.id)) 161 } else { 162 let suffix = format!(".{}", hostname); 163 let short_handle = if input.repo.ends_with(&suffix) { 164 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 165 } else { 166 &input.repo 167 }; 168 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 169 .fetch_optional(&state.db) 170 .await 171 .map(|opt| opt.map(|r| r.id)) 172 }; 173 174 let user_id: uuid::Uuid = match user_id_opt { 175 Ok(Some(id)) => id, 176 _ => { 177 return ( 178 StatusCode::NOT_FOUND, 179 Json(json!({"error": "NotFound", "message": "Repo not found"})), 180 ) 181 .into_response(); 182 } 183 }; 184 185 let limit = input.limit.unwrap_or(50).clamp(1, 100); 186 let reverse = input.reverse.unwrap_or(false); 187 let limit_i64 = limit as i64; 188 let order = if reverse { "ASC" } else { "DESC" }; 189 190 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 191 let comparator = if reverse { ">" } else { "<" }; 192 let query = format!( 193 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 194 comparator, order 195 ); 196 sqlx::query_as(&query) 197 .bind(user_id) 198 .bind(&input.collection) 199 .bind(cursor) 200 .bind(limit_i64) 201 .fetch_all(&state.db) 202 .await 203 } else { 204 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 205 let mut param_idx = 3; 206 207 if input.rkey_start.is_some() { 208 conditions.push("rkey > $3"); 209 param_idx += 1; 210 } 211 212 if input.rkey_end.is_some() { 213 conditions.push(if param_idx == 3 { "rkey < $3" } else { "rkey < $4" }); 214 param_idx += 1; 215 } 216 217 let limit_idx = param_idx; 218 219 let query = format!( 220 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 221 conditions.join(" AND "), 222 order, 223 limit_idx 224 ); 225 226 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 227 .bind(user_id) 228 .bind(&input.collection); 229 230 if let Some(start) = &input.rkey_start { 231 query_builder = query_builder.bind(start); 232 } 233 if let Some(end) = &input.rkey_end { 234 query_builder = query_builder.bind(end); 235 } 236 237 query_builder.bind(limit_i64).fetch_all(&state.db).await 238 }; 239 240 let rows = match rows_res { 241 Ok(r) => r, 242 Err(e) => { 243 error!("Error listing records: {:?}", e); 244 return ( 245 StatusCode::INTERNAL_SERVER_ERROR, 246 Json(json!({"error": "InternalError"})), 247 ) 248 .into_response(); 249 } 250 }; 251 252 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 253 254 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 255 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 256 257 for (rkey, cid_str) in &rows { 258 if let Ok(cid) = Cid::from_str(cid_str) { 259 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 260 cids.push(cid); 261 } 262 } 263 264 let blocks = match state.block_store.get_many(&cids).await { 265 Ok(b) => b, 266 Err(e) => { 267 error!("Error fetching blocks: {:?}", e); 268 return ( 269 StatusCode::INTERNAL_SERVER_ERROR, 270 Json(json!({"error": "InternalError"})), 271 ) 272 .into_response(); 273 } 274 }; 275 276 let mut records = Vec::new(); 277 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 278 if let Some(block) = block_opt { 279 if let Some((rkey, cid_str)) = cid_to_rkey.get(cid) { 280 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 281 records.push(json!({ 282 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 283 "cid": cid_str, 284 "value": value 285 })); 286 } 287 } 288 } 289 } 290 291 Json(ListRecordsOutput { 292 cursor: last_rkey, 293 records, 294 }) 295 .into_response() 296}