this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use cid::Cid; 9use jacquard_repo::storage::BlockStore; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use std::collections::HashMap; 13use std::str::FromStr; 14use tracing::error; 15 16#[derive(Deserialize)] 17pub struct GetRecordInput { 18 pub repo: String, 19 pub collection: String, 20 pub rkey: String, 21 pub cid: Option<String>, 22} 23 24pub async fn get_record( 25 State(state): State<AppState>, 26 Query(input): Query<GetRecordInput>, 27) -> Response { 28 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 29 let user_id_opt = if input.repo.starts_with("did:") { 30 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 31 .fetch_optional(&state.db) 32 .await 33 .map(|opt| opt.map(|r| r.id)) 34 } else { 35 let suffix = format!(".{}", hostname); 36 let short_handle = if input.repo.ends_with(&suffix) { 37 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 38 } else { 39 &input.repo 40 }; 41 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 42 .fetch_optional(&state.db) 43 .await 44 .map(|opt| opt.map(|r| r.id)) 45 }; 46 let user_id: uuid::Uuid = match user_id_opt { 47 Ok(Some(id)) => id, 48 _ => { 49 return ( 50 StatusCode::NOT_FOUND, 51 Json(json!({"error": "NotFound", "message": "Repo not found"})), 52 ) 53 .into_response(); 54 } 55 }; 56 let record_row = sqlx::query!( 57 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 58 user_id, 59 input.collection, 60 input.rkey 61 ) 62 .fetch_optional(&state.db) 63 .await; 64 let record_cid_str: String = match record_row { 65 Ok(Some(row)) => row.record_cid, 66 _ => { 67 return ( 68 StatusCode::NOT_FOUND, 69 Json(json!({"error": "NotFound", "message": "Record not found"})), 70 ) 71 .into_response(); 72 } 73 }; 74 if let Some(expected_cid) = &input.cid { 75 if &record_cid_str != expected_cid { 76 return ( 77 StatusCode::NOT_FOUND, 78 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 79 ) 80 .into_response(); 81 } 82 } 83 let cid = match Cid::from_str(&record_cid_str) { 84 Ok(c) => c, 85 Err(_) => { 86 return ( 87 StatusCode::INTERNAL_SERVER_ERROR, 88 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 89 ) 90 .into_response(); 91 } 92 }; 93 let block = match state.block_store.get(&cid).await { 94 Ok(Some(b)) => b, 95 _ => { 96 return ( 97 StatusCode::INTERNAL_SERVER_ERROR, 98 Json(json!({"error": "InternalError", "message": "Record block not found"})), 99 ) 100 .into_response(); 101 } 102 }; 103 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 104 Ok(v) => v, 105 Err(e) => { 106 error!("Failed to deserialize record: {:?}", e); 107 return ( 108 StatusCode::INTERNAL_SERVER_ERROR, 109 Json(json!({"error": "InternalError"})), 110 ) 111 .into_response(); 112 } 113 }; 114 Json(json!({ 115 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 116 "cid": record_cid_str, 117 "value": value 118 })) 119 .into_response() 120} 121#[derive(Deserialize)] 122pub struct ListRecordsInput { 123 pub repo: String, 124 pub collection: String, 125 pub limit: Option<i32>, 126 pub cursor: Option<String>, 127 #[serde(rename = "rkeyStart")] 128 pub rkey_start: Option<String>, 129 #[serde(rename = "rkeyEnd")] 130 pub rkey_end: Option<String>, 131 pub reverse: Option<bool>, 132} 133#[derive(Serialize)] 134pub struct ListRecordsOutput { 135 pub cursor: Option<String>, 136 pub records: Vec<serde_json::Value>, 137} 138pub async fn list_records( 139 State(state): State<AppState>, 140 Query(input): Query<ListRecordsInput>, 141) -> Response { 142 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 143 let user_id_opt = if input.repo.starts_with("did:") { 144 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 145 .fetch_optional(&state.db) 146 .await 147 .map(|opt| opt.map(|r| r.id)) 148 } else { 149 let suffix = format!(".{}", hostname); 150 let short_handle = if input.repo.ends_with(&suffix) { 151 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 152 } else { 153 &input.repo 154 }; 155 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 156 .fetch_optional(&state.db) 157 .await 158 .map(|opt| opt.map(|r| r.id)) 159 }; 160 let user_id: uuid::Uuid = match user_id_opt { 161 Ok(Some(id)) => id, 162 _ => { 163 return ( 164 StatusCode::NOT_FOUND, 165 Json(json!({"error": "NotFound", "message": "Repo not found"})), 166 ) 167 .into_response(); 168 } 169 }; 170 let limit = input.limit.unwrap_or(50).clamp(1, 100); 171 let reverse = input.reverse.unwrap_or(false); 172 let limit_i64 = limit as i64; 173 let order = if reverse { "ASC" } else { "DESC" }; 174 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 175 let comparator = if reverse { ">" } else { "<" }; 176 let query = format!( 177 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 178 comparator, order 179 ); 180 sqlx::query_as(&query) 181 .bind(user_id) 182 .bind(&input.collection) 183 .bind(cursor) 184 .bind(limit_i64) 185 .fetch_all(&state.db) 186 .await 187 } else { 188 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 189 let mut param_idx = 3; 190 if input.rkey_start.is_some() { 191 conditions.push("rkey > $3"); 192 param_idx += 1; 193 } 194 if input.rkey_end.is_some() { 195 conditions.push(if param_idx == 3 { "rkey < $3" } else { "rkey < $4" }); 196 param_idx += 1; 197 } 198 let limit_idx = param_idx; 199 let query = format!( 200 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 201 conditions.join(" AND "), 202 order, 203 limit_idx 204 ); 205 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 206 .bind(user_id) 207 .bind(&input.collection); 208 if let Some(start) = &input.rkey_start { 209 query_builder = query_builder.bind(start); 210 } 211 if let Some(end) = &input.rkey_end { 212 query_builder = query_builder.bind(end); 213 } 214 query_builder.bind(limit_i64).fetch_all(&state.db).await 215 }; 216 let rows = match rows_res { 217 Ok(r) => r, 218 Err(e) => { 219 error!("Error listing records: {:?}", e); 220 return ( 221 StatusCode::INTERNAL_SERVER_ERROR, 222 Json(json!({"error": "InternalError"})), 223 ) 224 .into_response(); 225 } 226 }; 227 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 228 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 229 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 230 for (rkey, cid_str) in &rows { 231 if let Ok(cid) = Cid::from_str(cid_str) { 232 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 233 cids.push(cid); 234 } 235 } 236 let blocks = match state.block_store.get_many(&cids).await { 237 Ok(b) => b, 238 Err(e) => { 239 error!("Error fetching blocks: {:?}", e); 240 return ( 241 StatusCode::INTERNAL_SERVER_ERROR, 242 Json(json!({"error": "InternalError"})), 243 ) 244 .into_response(); 245 } 246 }; 247 let mut records = Vec::new(); 248 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 249 if let Some(block) = block_opt { 250 if let Some((rkey, cid_str)) = cid_to_rkey.get(cid) { 251 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 252 records.push(json!({ 253 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 254 "cid": cid_str, 255 "value": value 256 })); 257 } 258 } 259 } 260 } 261 Json(ListRecordsOutput { 262 cursor: last_rkey, 263 records, 264 }) 265 .into_response() 266}