this repo has no description
1use crate::api::proxy_client::proxy_client; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::{HeaderMap, StatusCode}, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use std::collections::HashMap; 14use std::str::FromStr; 15use tracing::{error, info}; 16 17#[derive(Deserialize)] 18pub struct GetRecordInput { 19 pub repo: String, 20 pub collection: String, 21 pub rkey: String, 22 pub cid: Option<String>, 23} 24 25pub async fn get_record( 26 State(state): State<AppState>, 27 headers: HeaderMap, 28 Query(input): Query<GetRecordInput>, 29) -> Response { 30 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 31 let user_id_opt = if input.repo.starts_with("did:") { 32 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 33 .fetch_optional(&state.db) 34 .await 35 .map(|opt| opt.map(|r| r.id)) 36 } else { 37 let suffix = format!(".{}", hostname); 38 let short_handle = if input.repo.ends_with(&suffix) { 39 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 40 } else { 41 &input.repo 42 }; 43 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 44 .fetch_optional(&state.db) 45 .await 46 .map(|opt| opt.map(|r| r.id)) 47 }; 48 let user_id: uuid::Uuid = match user_id_opt { 49 Ok(Some(id)) => id, 50 Ok(None) => { 51 if let Some(proxy_header) = headers.get("atproto-proxy").and_then(|h| h.to_str().ok()) { 52 let did = proxy_header.split('#').next().unwrap_or(proxy_header); 53 if let Some(resolved) = state.did_resolver.resolve_did(did).await { 54 let mut url = format!( 55 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}", 56 resolved.url.trim_end_matches('/'), 57 urlencoding::encode(&input.repo), 58 urlencoding::encode(&input.collection), 59 urlencoding::encode(&input.rkey) 60 ); 61 if let Some(cid) = &input.cid { 62 url.push_str(&format!("&cid={}", urlencoding::encode(cid))); 63 } 64 info!("Proxying getRecord to {}: {}", did, url); 65 match proxy_client().get(&url).send().await { 66 Ok(resp) => { 67 let status = resp.status(); 68 let body = match resp.bytes().await { 69 Ok(b) => b, 70 Err(e) => { 71 error!("Error reading proxy response: {:?}", e); 72 return ( 73 StatusCode::BAD_GATEWAY, 74 Json(json!({"error": "UpstreamFailure", "message": "Error reading upstream response"})), 75 ) 76 .into_response(); 77 } 78 }; 79 return Response::builder() 80 .status(status) 81 .header("content-type", "application/json") 82 .body(axum::body::Body::from(body)) 83 .unwrap_or_else(|_| { 84 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error") 85 .into_response() 86 }); 87 } 88 Err(e) => { 89 error!("Error proxying request: {:?}", e); 90 return ( 91 StatusCode::BAD_GATEWAY, 92 Json(json!({"error": "UpstreamFailure", "message": "Failed to reach upstream service"})), 93 ) 94 .into_response(); 95 } 96 } 97 } else { 98 error!("Could not resolve DID from atproto-proxy header: {}", did); 99 return ( 100 StatusCode::BAD_GATEWAY, 101 Json(json!({"error": "UpstreamFailure", "message": "Could not resolve proxy DID"})), 102 ) 103 .into_response(); 104 } 105 } 106 return ( 107 StatusCode::NOT_FOUND, 108 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 109 ) 110 .into_response(); 111 } 112 Err(_) => { 113 return ( 114 StatusCode::INTERNAL_SERVER_ERROR, 115 Json(json!({"error": "InternalError"})), 116 ) 117 .into_response(); 118 } 119 }; 120 let record_row = sqlx::query!( 121 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 122 user_id, 123 input.collection, 124 input.rkey 125 ) 126 .fetch_optional(&state.db) 127 .await; 128 let record_cid_str: String = match record_row { 129 Ok(Some(row)) => row.record_cid, 130 _ => { 131 return ( 132 StatusCode::NOT_FOUND, 133 Json(json!({"error": "NotFound", "message": "Record not found"})), 134 ) 135 .into_response(); 136 } 137 }; 138 if let Some(expected_cid) = &input.cid 139 && &record_cid_str != expected_cid 140 { 141 return ( 142 StatusCode::NOT_FOUND, 143 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 144 ) 145 .into_response(); 146 } 147 let cid = match Cid::from_str(&record_cid_str) { 148 Ok(c) => c, 149 Err(_) => { 150 return ( 151 StatusCode::INTERNAL_SERVER_ERROR, 152 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 153 ) 154 .into_response(); 155 } 156 }; 157 let block = match state.block_store.get(&cid).await { 158 Ok(Some(b)) => b, 159 _ => { 160 return ( 161 StatusCode::INTERNAL_SERVER_ERROR, 162 Json(json!({"error": "InternalError", "message": "Record block not found"})), 163 ) 164 .into_response(); 165 } 166 }; 167 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 168 Ok(v) => v, 169 Err(e) => { 170 error!("Failed to deserialize record: {:?}", e); 171 return ( 172 StatusCode::INTERNAL_SERVER_ERROR, 173 Json(json!({"error": "InternalError"})), 174 ) 175 .into_response(); 176 } 177 }; 178 Json(json!({ 179 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 180 "cid": record_cid_str, 181 "value": value 182 })) 183 .into_response() 184} 185#[derive(Deserialize)] 186pub struct ListRecordsInput { 187 pub repo: String, 188 pub collection: String, 189 pub limit: Option<i32>, 190 pub cursor: Option<String>, 191 #[serde(rename = "rkeyStart")] 192 pub rkey_start: Option<String>, 193 #[serde(rename = "rkeyEnd")] 194 pub rkey_end: Option<String>, 195 pub reverse: Option<bool>, 196} 197#[derive(Serialize)] 198pub struct ListRecordsOutput { 199 #[serde(skip_serializing_if = "Option::is_none")] 200 pub cursor: Option<String>, 201 pub records: Vec<serde_json::Value>, 202} 203 204pub async fn list_records( 205 State(state): State<AppState>, 206 Query(input): Query<ListRecordsInput>, 207) -> Response { 208 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 209 let user_id_opt = if input.repo.starts_with("did:") { 210 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 211 .fetch_optional(&state.db) 212 .await 213 .map(|opt| opt.map(|r| r.id)) 214 } else { 215 let suffix = format!(".{}", hostname); 216 let short_handle = if input.repo.ends_with(&suffix) { 217 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 218 } else { 219 &input.repo 220 }; 221 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 222 .fetch_optional(&state.db) 223 .await 224 .map(|opt| opt.map(|r| r.id)) 225 }; 226 let user_id: uuid::Uuid = match user_id_opt { 227 Ok(Some(id)) => id, 228 Ok(None) => { 229 return ( 230 StatusCode::NOT_FOUND, 231 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 232 ) 233 .into_response(); 234 } 235 Err(_) => { 236 return ( 237 StatusCode::INTERNAL_SERVER_ERROR, 238 Json(json!({"error": "InternalError"})), 239 ) 240 .into_response(); 241 } 242 }; 243 let limit = input.limit.unwrap_or(50).clamp(1, 100); 244 let reverse = input.reverse.unwrap_or(false); 245 let limit_i64 = limit as i64; 246 let order = if reverse { "ASC" } else { "DESC" }; 247 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 248 let comparator = if reverse { ">" } else { "<" }; 249 let query = format!( 250 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 251 comparator, order 252 ); 253 sqlx::query_as(&query) 254 .bind(user_id) 255 .bind(&input.collection) 256 .bind(cursor) 257 .bind(limit_i64) 258 .fetch_all(&state.db) 259 .await 260 } else { 261 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 262 let mut param_idx = 3; 263 if input.rkey_start.is_some() { 264 conditions.push("rkey > $3"); 265 param_idx += 1; 266 } 267 if input.rkey_end.is_some() { 268 conditions.push(if param_idx == 3 { 269 "rkey < $3" 270 } else { 271 "rkey < $4" 272 }); 273 param_idx += 1; 274 } 275 let limit_idx = param_idx; 276 let query = format!( 277 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 278 conditions.join(" AND "), 279 order, 280 limit_idx 281 ); 282 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 283 .bind(user_id) 284 .bind(&input.collection); 285 if let Some(start) = &input.rkey_start { 286 query_builder = query_builder.bind(start); 287 } 288 if let Some(end) = &input.rkey_end { 289 query_builder = query_builder.bind(end); 290 } 291 query_builder.bind(limit_i64).fetch_all(&state.db).await 292 }; 293 let rows = match rows_res { 294 Ok(r) => r, 295 Err(e) => { 296 error!("Error listing records: {:?}", e); 297 return ( 298 StatusCode::INTERNAL_SERVER_ERROR, 299 Json(json!({"error": "InternalError"})), 300 ) 301 .into_response(); 302 } 303 }; 304 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 305 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 306 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 307 for (rkey, cid_str) in &rows { 308 if let Ok(cid) = Cid::from_str(cid_str) { 309 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 310 cids.push(cid); 311 } 312 } 313 let blocks = match state.block_store.get_many(&cids).await { 314 Ok(b) => b, 315 Err(e) => { 316 error!("Error fetching blocks: {:?}", e); 317 return ( 318 StatusCode::INTERNAL_SERVER_ERROR, 319 Json(json!({"error": "InternalError"})), 320 ) 321 .into_response(); 322 } 323 }; 324 let mut records = Vec::new(); 325 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 326 if let Some(block) = block_opt 327 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid) 328 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) 329 { 330 records.push(json!({ 331 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 332 "cid": cid_str, 333 "value": value 334 })); 335 } 336 } 337 Json(ListRecordsOutput { 338 cursor: last_rkey, 339 records, 340 }) 341 .into_response() 342}