this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use cid::Cid; 9use jacquard_repo::storage::BlockStore; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use std::collections::HashMap; 13use std::str::FromStr; 14use tracing::error; 15 16#[derive(Deserialize)] 17pub struct GetRecordInput { 18 pub repo: String, 19 pub collection: String, 20 pub rkey: String, 21 pub cid: Option<String>, 22} 23 24pub async fn get_record( 25 State(state): State<AppState>, 26 Query(input): Query<GetRecordInput>, 27) -> Response { 28 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 29 let user_id_opt = if input.repo.starts_with("did:") { 30 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 31 .fetch_optional(&state.db) 32 .await 33 .map(|opt| opt.map(|r| r.id)) 34 } else { 35 let suffix = format!(".{}", hostname); 36 let short_handle = if input.repo.ends_with(&suffix) { 37 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 38 } else { 39 &input.repo 40 }; 41 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 42 .fetch_optional(&state.db) 43 .await 44 .map(|opt| opt.map(|r| r.id)) 45 }; 46 let user_id: uuid::Uuid = match user_id_opt { 47 Ok(Some(id)) => id, 48 Ok(None) => { 49 return ( 50 StatusCode::NOT_FOUND, 51 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 52 ) 53 .into_response(); 54 } 55 Err(_) => { 56 return ( 57 StatusCode::INTERNAL_SERVER_ERROR, 58 Json(json!({"error": "InternalError"})), 59 ) 60 .into_response(); 61 } 62 }; 63 let record_row = sqlx::query!( 64 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 65 user_id, 66 input.collection, 67 input.rkey 68 ) 69 .fetch_optional(&state.db) 70 .await; 71 let record_cid_str: String = match record_row { 72 Ok(Some(row)) => row.record_cid, 73 _ => { 74 return ( 75 StatusCode::NOT_FOUND, 76 Json(json!({"error": "NotFound", "message": "Record not found"})), 77 ) 78 .into_response(); 79 } 80 }; 81 if let Some(expected_cid) = &input.cid 82 && &record_cid_str != expected_cid { 83 return ( 84 StatusCode::NOT_FOUND, 85 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 86 ) 87 .into_response(); 88 } 89 let cid = match Cid::from_str(&record_cid_str) { 90 Ok(c) => c, 91 Err(_) => { 92 return ( 93 StatusCode::INTERNAL_SERVER_ERROR, 94 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 95 ) 96 .into_response(); 97 } 98 }; 99 let block = match state.block_store.get(&cid).await { 100 Ok(Some(b)) => b, 101 _ => { 102 return ( 103 StatusCode::INTERNAL_SERVER_ERROR, 104 Json(json!({"error": "InternalError", "message": "Record block not found"})), 105 ) 106 .into_response(); 107 } 108 }; 109 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 110 Ok(v) => v, 111 Err(e) => { 112 error!("Failed to deserialize record: {:?}", e); 113 return ( 114 StatusCode::INTERNAL_SERVER_ERROR, 115 Json(json!({"error": "InternalError"})), 116 ) 117 .into_response(); 118 } 119 }; 120 Json(json!({ 121 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 122 "cid": record_cid_str, 123 "value": value 124 })) 125 .into_response() 126} 127#[derive(Deserialize)] 128pub struct ListRecordsInput { 129 pub repo: String, 130 pub collection: String, 131 pub limit: Option<i32>, 132 pub cursor: Option<String>, 133 #[serde(rename = "rkeyStart")] 134 pub rkey_start: Option<String>, 135 #[serde(rename = "rkeyEnd")] 136 pub rkey_end: Option<String>, 137 pub reverse: Option<bool>, 138} 139#[derive(Serialize)] 140pub struct ListRecordsOutput { 141 pub cursor: Option<String>, 142 pub records: Vec<serde_json::Value>, 143} 144 145pub async fn list_records( 146 State(state): State<AppState>, 147 Query(input): Query<ListRecordsInput>, 148) -> Response { 149 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 150 let user_id_opt = if input.repo.starts_with("did:") { 151 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 152 .fetch_optional(&state.db) 153 .await 154 .map(|opt| opt.map(|r| r.id)) 155 } else { 156 let suffix = format!(".{}", hostname); 157 let short_handle = if input.repo.ends_with(&suffix) { 158 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 159 } else { 160 &input.repo 161 }; 162 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 163 .fetch_optional(&state.db) 164 .await 165 .map(|opt| opt.map(|r| r.id)) 166 }; 167 let user_id: uuid::Uuid = match user_id_opt { 168 Ok(Some(id)) => id, 169 Ok(None) => { 170 return ( 171 StatusCode::NOT_FOUND, 172 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 173 ) 174 .into_response(); 175 } 176 Err(_) => { 177 return ( 178 StatusCode::INTERNAL_SERVER_ERROR, 179 Json(json!({"error": "InternalError"})), 180 ) 181 .into_response(); 182 } 183 }; 184 let limit = input.limit.unwrap_or(50).clamp(1, 100); 185 let reverse = input.reverse.unwrap_or(false); 186 let limit_i64 = limit as i64; 187 let order = if reverse { "ASC" } else { "DESC" }; 188 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 189 let comparator = if reverse { ">" } else { "<" }; 190 let query = format!( 191 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 192 comparator, order 193 ); 194 sqlx::query_as(&query) 195 .bind(user_id) 196 .bind(&input.collection) 197 .bind(cursor) 198 .bind(limit_i64) 199 .fetch_all(&state.db) 200 .await 201 } else { 202 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 203 let mut param_idx = 3; 204 if input.rkey_start.is_some() { 205 conditions.push("rkey > $3"); 206 param_idx += 1; 207 } 208 if input.rkey_end.is_some() { 209 conditions.push(if param_idx == 3 { 210 "rkey < $3" 211 } else { 212 "rkey < $4" 213 }); 214 param_idx += 1; 215 } 216 let limit_idx = param_idx; 217 let query = format!( 218 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 219 conditions.join(" AND "), 220 order, 221 limit_idx 222 ); 223 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 224 .bind(user_id) 225 .bind(&input.collection); 226 if let Some(start) = &input.rkey_start { 227 query_builder = query_builder.bind(start); 228 } 229 if let Some(end) = &input.rkey_end { 230 query_builder = query_builder.bind(end); 231 } 232 query_builder.bind(limit_i64).fetch_all(&state.db).await 233 }; 234 let rows = match rows_res { 235 Ok(r) => r, 236 Err(e) => { 237 error!("Error listing records: {:?}", e); 238 return ( 239 StatusCode::INTERNAL_SERVER_ERROR, 240 Json(json!({"error": "InternalError"})), 241 ) 242 .into_response(); 243 } 244 }; 245 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 246 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 247 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 248 for (rkey, cid_str) in &rows { 249 if let Ok(cid) = Cid::from_str(cid_str) { 250 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 251 cids.push(cid); 252 } 253 } 254 let blocks = match state.block_store.get_many(&cids).await { 255 Ok(b) => b, 256 Err(e) => { 257 error!("Error fetching blocks: {:?}", e); 258 return ( 259 StatusCode::INTERNAL_SERVER_ERROR, 260 Json(json!({"error": "InternalError"})), 261 ) 262 .into_response(); 263 } 264 }; 265 let mut records = Vec::new(); 266 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 267 if let Some(block) = block_opt 268 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid) 269 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 270 records.push(json!({ 271 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 272 "cid": cid_str, 273 "value": value 274 })); 275 } 276 } 277 Json(ListRecordsOutput { 278 cursor: last_rkey, 279 records, 280 }) 281 .into_response() 282}