this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use cid::Cid; 9use jacquard_repo::storage::BlockStore; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use std::collections::HashMap; 13use std::str::FromStr; 14use tracing::error; 15#[derive(Deserialize)] 16pub struct GetRecordInput { 17 pub repo: String, 18 pub collection: String, 19 pub rkey: String, 20 pub cid: Option<String>, 21} 22pub async fn get_record( 23 State(state): State<AppState>, 24 Query(input): Query<GetRecordInput>, 25) -> Response { 26 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 27 let user_id_opt = if input.repo.starts_with("did:") { 28 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 29 .fetch_optional(&state.db) 30 .await 31 .map(|opt| opt.map(|r| r.id)) 32 } else { 33 let suffix = format!(".{}", hostname); 34 let short_handle = if input.repo.ends_with(&suffix) { 35 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 36 } else { 37 &input.repo 38 }; 39 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 40 .fetch_optional(&state.db) 41 .await 42 .map(|opt| opt.map(|r| r.id)) 43 }; 44 let user_id: uuid::Uuid = match user_id_opt { 45 Ok(Some(id)) => id, 46 _ => { 47 return ( 48 StatusCode::NOT_FOUND, 49 Json(json!({"error": "NotFound", "message": "Repo not found"})), 50 ) 51 .into_response(); 52 } 53 }; 54 let record_row = sqlx::query!( 55 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 56 user_id, 57 input.collection, 58 input.rkey 59 ) 60 .fetch_optional(&state.db) 61 .await; 62 let record_cid_str: String = match record_row { 63 Ok(Some(row)) => row.record_cid, 64 _ => { 65 return ( 66 StatusCode::NOT_FOUND, 67 Json(json!({"error": "NotFound", "message": "Record not found"})), 68 ) 69 .into_response(); 70 } 71 }; 72 if let Some(expected_cid) = &input.cid { 73 if &record_cid_str != expected_cid { 74 return ( 75 StatusCode::NOT_FOUND, 76 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 77 ) 78 .into_response(); 79 } 80 } 81 let cid = match Cid::from_str(&record_cid_str) { 82 Ok(c) => c, 83 Err(_) => { 84 return ( 85 StatusCode::INTERNAL_SERVER_ERROR, 86 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 87 ) 88 .into_response(); 89 } 90 }; 91 let block = match state.block_store.get(&cid).await { 92 Ok(Some(b)) => b, 93 _ => { 94 return ( 95 StatusCode::INTERNAL_SERVER_ERROR, 96 Json(json!({"error": "InternalError", "message": "Record block not found"})), 97 ) 98 .into_response(); 99 } 100 }; 101 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 102 Ok(v) => v, 103 Err(e) => { 104 error!("Failed to deserialize record: {:?}", e); 105 return ( 106 StatusCode::INTERNAL_SERVER_ERROR, 107 Json(json!({"error": "InternalError"})), 108 ) 109 .into_response(); 110 } 111 }; 112 Json(json!({ 113 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 114 "cid": record_cid_str, 115 "value": value 116 })) 117 .into_response() 118} 119#[derive(Deserialize)] 120pub struct ListRecordsInput { 121 pub repo: String, 122 pub collection: String, 123 pub limit: Option<i32>, 124 pub cursor: Option<String>, 125 #[serde(rename = "rkeyStart")] 126 pub rkey_start: Option<String>, 127 #[serde(rename = "rkeyEnd")] 128 pub rkey_end: Option<String>, 129 pub reverse: Option<bool>, 130} 131#[derive(Serialize)] 132pub struct ListRecordsOutput { 133 pub cursor: Option<String>, 134 pub records: Vec<serde_json::Value>, 135} 136pub async fn list_records( 137 State(state): State<AppState>, 138 Query(input): Query<ListRecordsInput>, 139) -> Response { 140 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 141 let user_id_opt = if input.repo.starts_with("did:") { 142 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 143 .fetch_optional(&state.db) 144 .await 145 .map(|opt| opt.map(|r| r.id)) 146 } else { 147 let suffix = format!(".{}", hostname); 148 let short_handle = if input.repo.ends_with(&suffix) { 149 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 150 } else { 151 &input.repo 152 }; 153 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 154 .fetch_optional(&state.db) 155 .await 156 .map(|opt| opt.map(|r| r.id)) 157 }; 158 let user_id: uuid::Uuid = match user_id_opt { 159 Ok(Some(id)) => id, 160 _ => { 161 return ( 162 StatusCode::NOT_FOUND, 163 Json(json!({"error": "NotFound", "message": "Repo not found"})), 164 ) 165 .into_response(); 166 } 167 }; 168 let limit = input.limit.unwrap_or(50).clamp(1, 100); 169 let reverse = input.reverse.unwrap_or(false); 170 let limit_i64 = limit as i64; 171 let order = if reverse { "ASC" } else { "DESC" }; 172 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 173 let comparator = if reverse { ">" } else { "<" }; 174 let query = format!( 175 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 176 comparator, order 177 ); 178 sqlx::query_as(&query) 179 .bind(user_id) 180 .bind(&input.collection) 181 .bind(cursor) 182 .bind(limit_i64) 183 .fetch_all(&state.db) 184 .await 185 } else { 186 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 187 let mut param_idx = 3; 188 if input.rkey_start.is_some() { 189 conditions.push("rkey > $3"); 190 param_idx += 1; 191 } 192 if input.rkey_end.is_some() { 193 conditions.push(if param_idx == 3 { "rkey < $3" } else { "rkey < $4" }); 194 param_idx += 1; 195 } 196 let limit_idx = param_idx; 197 let query = format!( 198 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 199 conditions.join(" AND "), 200 order, 201 limit_idx 202 ); 203 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 204 .bind(user_id) 205 .bind(&input.collection); 206 if let Some(start) = &input.rkey_start { 207 query_builder = query_builder.bind(start); 208 } 209 if let Some(end) = &input.rkey_end { 210 query_builder = query_builder.bind(end); 211 } 212 query_builder.bind(limit_i64).fetch_all(&state.db).await 213 }; 214 let rows = match rows_res { 215 Ok(r) => r, 216 Err(e) => { 217 error!("Error listing records: {:?}", e); 218 return ( 219 StatusCode::INTERNAL_SERVER_ERROR, 220 Json(json!({"error": "InternalError"})), 221 ) 222 .into_response(); 223 } 224 }; 225 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 226 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 227 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 228 for (rkey, cid_str) in &rows { 229 if let Ok(cid) = Cid::from_str(cid_str) { 230 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 231 cids.push(cid); 232 } 233 } 234 let blocks = match state.block_store.get_many(&cids).await { 235 Ok(b) => b, 236 Err(e) => { 237 error!("Error fetching blocks: {:?}", e); 238 return ( 239 StatusCode::INTERNAL_SERVER_ERROR, 240 Json(json!({"error": "InternalError"})), 241 ) 242 .into_response(); 243 } 244 }; 245 let mut records = Vec::new(); 246 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 247 if let Some(block) = block_opt { 248 if let Some((rkey, cid_str)) = cid_to_rkey.get(cid) { 249 if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 250 records.push(json!({ 251 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 252 "cid": cid_str, 253 "value": value 254 })); 255 } 256 } 257 } 258 } 259 Json(ListRecordsOutput { 260 cursor: last_rkey, 261 records, 262 }) 263 .into_response() 264}