this repo has no description
1use crate::api::proxy_client::proxy_client; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::{HeaderMap, StatusCode}, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use std::collections::HashMap; 14use std::str::FromStr; 15use tracing::{error, info}; 16 17#[derive(Deserialize)] 18pub struct GetRecordInput { 19 pub repo: String, 20 pub collection: String, 21 pub rkey: String, 22 pub cid: Option<String>, 23} 24 25pub async fn get_record( 26 State(state): State<AppState>, 27 headers: HeaderMap, 28 Query(input): Query<GetRecordInput>, 29) -> Response { 30 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 31 let user_id_opt = if input.repo.starts_with("did:") { 32 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 33 .fetch_optional(&state.db) 34 .await 35 .map(|opt| opt.map(|r| r.id)) 36 } else { 37 let suffix = format!(".{}", hostname); 38 let short_handle = if input.repo.ends_with(&suffix) { 39 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 40 } else { 41 &input.repo 42 }; 43 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 44 .fetch_optional(&state.db) 45 .await 46 .map(|opt| opt.map(|r| r.id)) 47 }; 48 let user_id: uuid::Uuid = match user_id_opt { 49 Ok(Some(id)) => id, 50 Ok(None) => { 51 if let Some(proxy_header) = headers 52 .get("atproto-proxy") 53 .and_then(|h| h.to_str().ok()) 54 { 55 let did = proxy_header.split('#').next().unwrap_or(proxy_header); 56 if let Some(resolved) = state.did_resolver.resolve_did(did).await { 57 let mut url = format!( 58 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}", 59 resolved.url.trim_end_matches('/'), 60 urlencoding::encode(&input.repo), 61 urlencoding::encode(&input.collection), 62 urlencoding::encode(&input.rkey) 63 ); 64 if let Some(cid) = &input.cid { 65 url.push_str(&format!("&cid={}", urlencoding::encode(cid))); 66 } 67 info!("Proxying getRecord to {}: {}", did, url); 68 match proxy_client().get(&url).send().await { 69 Ok(resp) => { 70 let status = resp.status(); 71 let body = match resp.bytes().await { 72 Ok(b) => b, 73 Err(e) => { 74 error!("Error reading proxy response: {:?}", e); 75 return ( 76 StatusCode::BAD_GATEWAY, 77 Json(json!({"error": "UpstreamFailure", "message": "Error reading upstream response"})), 78 ) 79 .into_response(); 80 } 81 }; 82 return Response::builder() 83 .status(status) 84 .header("content-type", "application/json") 85 .body(axum::body::Body::from(body)) 86 .unwrap_or_else(|_| { 87 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response() 88 }); 89 } 90 Err(e) => { 91 error!("Error proxying request: {:?}", e); 92 return ( 93 StatusCode::BAD_GATEWAY, 94 Json(json!({"error": "UpstreamFailure", "message": "Failed to reach upstream service"})), 95 ) 96 .into_response(); 97 } 98 } 99 } else { 100 error!("Could not resolve DID from atproto-proxy header: {}", did); 101 return ( 102 StatusCode::BAD_GATEWAY, 103 Json(json!({"error": "UpstreamFailure", "message": "Could not resolve proxy DID"})), 104 ) 105 .into_response(); 106 } 107 } 108 return ( 109 StatusCode::NOT_FOUND, 110 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 111 ) 112 .into_response(); 113 } 114 Err(_) => { 115 return ( 116 StatusCode::INTERNAL_SERVER_ERROR, 117 Json(json!({"error": "InternalError"})), 118 ) 119 .into_response(); 120 } 121 }; 122 let record_row = sqlx::query!( 123 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 124 user_id, 125 input.collection, 126 input.rkey 127 ) 128 .fetch_optional(&state.db) 129 .await; 130 let record_cid_str: String = match record_row { 131 Ok(Some(row)) => row.record_cid, 132 _ => { 133 return ( 134 StatusCode::NOT_FOUND, 135 Json(json!({"error": "NotFound", "message": "Record not found"})), 136 ) 137 .into_response(); 138 } 139 }; 140 if let Some(expected_cid) = &input.cid 141 && &record_cid_str != expected_cid { 142 return ( 143 StatusCode::NOT_FOUND, 144 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 145 ) 146 .into_response(); 147 } 148 let cid = match Cid::from_str(&record_cid_str) { 149 Ok(c) => c, 150 Err(_) => { 151 return ( 152 StatusCode::INTERNAL_SERVER_ERROR, 153 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 154 ) 155 .into_response(); 156 } 157 }; 158 let block = match state.block_store.get(&cid).await { 159 Ok(Some(b)) => b, 160 _ => { 161 return ( 162 StatusCode::INTERNAL_SERVER_ERROR, 163 Json(json!({"error": "InternalError", "message": "Record block not found"})), 164 ) 165 .into_response(); 166 } 167 }; 168 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 169 Ok(v) => v, 170 Err(e) => { 171 error!("Failed to deserialize record: {:?}", e); 172 return ( 173 StatusCode::INTERNAL_SERVER_ERROR, 174 Json(json!({"error": "InternalError"})), 175 ) 176 .into_response(); 177 } 178 }; 179 Json(json!({ 180 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 181 "cid": record_cid_str, 182 "value": value 183 })) 184 .into_response() 185} 186#[derive(Deserialize)] 187pub struct ListRecordsInput { 188 pub repo: String, 189 pub collection: String, 190 pub limit: Option<i32>, 191 pub cursor: Option<String>, 192 #[serde(rename = "rkeyStart")] 193 pub rkey_start: Option<String>, 194 #[serde(rename = "rkeyEnd")] 195 pub rkey_end: Option<String>, 196 pub reverse: Option<bool>, 197} 198#[derive(Serialize)] 199pub struct ListRecordsOutput { 200 #[serde(skip_serializing_if = "Option::is_none")] 201 pub cursor: Option<String>, 202 pub records: Vec<serde_json::Value>, 203} 204 205pub async fn list_records( 206 State(state): State<AppState>, 207 Query(input): Query<ListRecordsInput>, 208) -> Response { 209 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 210 let user_id_opt = if input.repo.starts_with("did:") { 211 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 212 .fetch_optional(&state.db) 213 .await 214 .map(|opt| opt.map(|r| r.id)) 215 } else { 216 let suffix = format!(".{}", hostname); 217 let short_handle = if input.repo.ends_with(&suffix) { 218 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 219 } else { 220 &input.repo 221 }; 222 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 223 .fetch_optional(&state.db) 224 .await 225 .map(|opt| opt.map(|r| r.id)) 226 }; 227 let user_id: uuid::Uuid = match user_id_opt { 228 Ok(Some(id)) => id, 229 Ok(None) => { 230 return ( 231 StatusCode::NOT_FOUND, 232 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 233 ) 234 .into_response(); 235 } 236 Err(_) => { 237 return ( 238 StatusCode::INTERNAL_SERVER_ERROR, 239 Json(json!({"error": "InternalError"})), 240 ) 241 .into_response(); 242 } 243 }; 244 let limit = input.limit.unwrap_or(50).clamp(1, 100); 245 let reverse = input.reverse.unwrap_or(false); 246 let limit_i64 = limit as i64; 247 let order = if reverse { "ASC" } else { "DESC" }; 248 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 249 let comparator = if reverse { ">" } else { "<" }; 250 let query = format!( 251 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 252 comparator, order 253 ); 254 sqlx::query_as(&query) 255 .bind(user_id) 256 .bind(&input.collection) 257 .bind(cursor) 258 .bind(limit_i64) 259 .fetch_all(&state.db) 260 .await 261 } else { 262 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 263 let mut param_idx = 3; 264 if input.rkey_start.is_some() { 265 conditions.push("rkey > $3"); 266 param_idx += 1; 267 } 268 if input.rkey_end.is_some() { 269 conditions.push(if param_idx == 3 { 270 "rkey < $3" 271 } else { 272 "rkey < $4" 273 }); 274 param_idx += 1; 275 } 276 let limit_idx = param_idx; 277 let query = format!( 278 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 279 conditions.join(" AND "), 280 order, 281 limit_idx 282 ); 283 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 284 .bind(user_id) 285 .bind(&input.collection); 286 if let Some(start) = &input.rkey_start { 287 query_builder = query_builder.bind(start); 288 } 289 if let Some(end) = &input.rkey_end { 290 query_builder = query_builder.bind(end); 291 } 292 query_builder.bind(limit_i64).fetch_all(&state.db).await 293 }; 294 let rows = match rows_res { 295 Ok(r) => r, 296 Err(e) => { 297 error!("Error listing records: {:?}", e); 298 return ( 299 StatusCode::INTERNAL_SERVER_ERROR, 300 Json(json!({"error": "InternalError"})), 301 ) 302 .into_response(); 303 } 304 }; 305 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 306 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 307 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 308 for (rkey, cid_str) in &rows { 309 if let Ok(cid) = Cid::from_str(cid_str) { 310 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 311 cids.push(cid); 312 } 313 } 314 let blocks = match state.block_store.get_many(&cids).await { 315 Ok(b) => b, 316 Err(e) => { 317 error!("Error fetching blocks: {:?}", e); 318 return ( 319 StatusCode::INTERNAL_SERVER_ERROR, 320 Json(json!({"error": "InternalError"})), 321 ) 322 .into_response(); 323 } 324 }; 325 let mut records = Vec::new(); 326 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 327 if let Some(block) = block_opt 328 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid) 329 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 330 records.push(json!({ 331 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 332 "cid": cid_str, 333 "value": value 334 })); 335 } 336 } 337 Json(ListRecordsOutput { 338 cursor: last_rkey, 339 records, 340 }) 341 .into_response() 342}