this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use cid::Cid; 9use jacquard_repo::storage::BlockStore; 10use serde::{Deserialize, Serialize}; 11use serde_json::json; 12use std::collections::HashMap; 13use std::str::FromStr; 14use tracing::error; 15 16#[derive(Deserialize)] 17pub struct GetRecordInput { 18 pub repo: String, 19 pub collection: String, 20 pub rkey: String, 21 pub cid: Option<String>, 22} 23 24pub async fn get_record( 25 State(state): State<AppState>, 26 Query(input): Query<GetRecordInput>, 27) -> Response { 28 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 29 let user_id_opt = if input.repo.starts_with("did:") { 30 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 31 .fetch_optional(&state.db) 32 .await 33 .map(|opt| opt.map(|r| r.id)) 34 } else { 35 let suffix = format!(".{}", hostname); 36 let short_handle = if input.repo.ends_with(&suffix) { 37 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 38 } else { 39 &input.repo 40 }; 41 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 42 .fetch_optional(&state.db) 43 .await 44 .map(|opt| opt.map(|r| r.id)) 45 }; 46 let user_id: uuid::Uuid = match user_id_opt { 47 Ok(Some(id)) => id, 48 _ => { 49 return ( 50 StatusCode::NOT_FOUND, 51 Json(json!({"error": "NotFound", "message": "Repo not found"})), 52 ) 53 .into_response(); 54 } 55 }; 56 let record_row = sqlx::query!( 57 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 58 user_id, 59 input.collection, 60 input.rkey 61 ) 62 .fetch_optional(&state.db) 63 .await; 64 let record_cid_str: String = match record_row { 65 Ok(Some(row)) => row.record_cid, 66 _ => { 67 return ( 68 StatusCode::NOT_FOUND, 69 Json(json!({"error": "NotFound", "message": "Record not found"})), 70 ) 71 .into_response(); 72 } 73 }; 74 if let Some(expected_cid) = &input.cid 75 && &record_cid_str != expected_cid { 76 return ( 77 StatusCode::NOT_FOUND, 78 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 79 ) 80 .into_response(); 81 } 82 let cid = match Cid::from_str(&record_cid_str) { 83 Ok(c) => c, 84 Err(_) => { 85 return ( 86 StatusCode::INTERNAL_SERVER_ERROR, 87 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 88 ) 89 .into_response(); 90 } 91 }; 92 let block = match state.block_store.get(&cid).await { 93 Ok(Some(b)) => b, 94 _ => { 95 return ( 96 StatusCode::INTERNAL_SERVER_ERROR, 97 Json(json!({"error": "InternalError", "message": "Record block not found"})), 98 ) 99 .into_response(); 100 } 101 }; 102 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 103 Ok(v) => v, 104 Err(e) => { 105 error!("Failed to deserialize record: {:?}", e); 106 return ( 107 StatusCode::INTERNAL_SERVER_ERROR, 108 Json(json!({"error": "InternalError"})), 109 ) 110 .into_response(); 111 } 112 }; 113 Json(json!({ 114 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 115 "cid": record_cid_str, 116 "value": value 117 })) 118 .into_response() 119} 120#[derive(Deserialize)] 121pub struct ListRecordsInput { 122 pub repo: String, 123 pub collection: String, 124 pub limit: Option<i32>, 125 pub cursor: Option<String>, 126 #[serde(rename = "rkeyStart")] 127 pub rkey_start: Option<String>, 128 #[serde(rename = "rkeyEnd")] 129 pub rkey_end: Option<String>, 130 pub reverse: Option<bool>, 131} 132#[derive(Serialize)] 133pub struct ListRecordsOutput { 134 pub cursor: Option<String>, 135 pub records: Vec<serde_json::Value>, 136} 137pub async fn list_records( 138 State(state): State<AppState>, 139 Query(input): Query<ListRecordsInput>, 140) -> Response { 141 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 142 let user_id_opt = if input.repo.starts_with("did:") { 143 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 144 .fetch_optional(&state.db) 145 .await 146 .map(|opt| opt.map(|r| r.id)) 147 } else { 148 let suffix = format!(".{}", hostname); 149 let short_handle = if input.repo.ends_with(&suffix) { 150 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 151 } else { 152 &input.repo 153 }; 154 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 155 .fetch_optional(&state.db) 156 .await 157 .map(|opt| opt.map(|r| r.id)) 158 }; 159 let user_id: uuid::Uuid = match user_id_opt { 160 Ok(Some(id)) => id, 161 _ => { 162 return ( 163 StatusCode::NOT_FOUND, 164 Json(json!({"error": "NotFound", "message": "Repo not found"})), 165 ) 166 .into_response(); 167 } 168 }; 169 let limit = input.limit.unwrap_or(50).clamp(1, 100); 170 let reverse = input.reverse.unwrap_or(false); 171 let limit_i64 = limit as i64; 172 let order = if reverse { "ASC" } else { "DESC" }; 173 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 174 let comparator = if reverse { ">" } else { "<" }; 175 let query = format!( 176 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 177 comparator, order 178 ); 179 sqlx::query_as(&query) 180 .bind(user_id) 181 .bind(&input.collection) 182 .bind(cursor) 183 .bind(limit_i64) 184 .fetch_all(&state.db) 185 .await 186 } else { 187 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 188 let mut param_idx = 3; 189 if input.rkey_start.is_some() { 190 conditions.push("rkey > $3"); 191 param_idx += 1; 192 } 193 if input.rkey_end.is_some() { 194 conditions.push(if param_idx == 3 { 195 "rkey < $3" 196 } else { 197 "rkey < $4" 198 }); 199 param_idx += 1; 200 } 201 let limit_idx = param_idx; 202 let query = format!( 203 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 204 conditions.join(" AND "), 205 order, 206 limit_idx 207 ); 208 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 209 .bind(user_id) 210 .bind(&input.collection); 211 if let Some(start) = &input.rkey_start { 212 query_builder = query_builder.bind(start); 213 } 214 if let Some(end) = &input.rkey_end { 215 query_builder = query_builder.bind(end); 216 } 217 query_builder.bind(limit_i64).fetch_all(&state.db).await 218 }; 219 let rows = match rows_res { 220 Ok(r) => r, 221 Err(e) => { 222 error!("Error listing records: {:?}", e); 223 return ( 224 StatusCode::INTERNAL_SERVER_ERROR, 225 Json(json!({"error": "InternalError"})), 226 ) 227 .into_response(); 228 } 229 }; 230 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 231 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 232 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 233 for (rkey, cid_str) in &rows { 234 if let Ok(cid) = Cid::from_str(cid_str) { 235 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 236 cids.push(cid); 237 } 238 } 239 let blocks = match state.block_store.get_many(&cids).await { 240 Ok(b) => b, 241 Err(e) => { 242 error!("Error fetching blocks: {:?}", e); 243 return ( 244 StatusCode::INTERNAL_SERVER_ERROR, 245 Json(json!({"error": "InternalError"})), 246 ) 247 .into_response(); 248 } 249 }; 250 let mut records = Vec::new(); 251 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 252 if let Some(block) = block_opt 253 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid) 254 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 255 records.push(json!({ 256 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 257 "cid": cid_str, 258 "value": value 259 })); 260 } 261 } 262 Json(ListRecordsOutput { 263 cursor: last_rkey, 264 records, 265 }) 266 .into_response() 267}