this repo has no description
1use crate::api::proxy_client::proxy_client; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::{HeaderMap, StatusCode}, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use std::collections::HashMap; 14use std::str::FromStr; 15use tracing::{error, info}; 16 17#[derive(Deserialize)] 18pub struct GetRecordInput { 19 pub repo: String, 20 pub collection: String, 21 pub rkey: String, 22 pub cid: Option<String>, 23} 24 25pub async fn get_record( 26 State(state): State<AppState>, 27 headers: HeaderMap, 28 Query(input): Query<GetRecordInput>, 29) -> Response { 30 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 31 let user_id_opt = if input.repo.starts_with("did:") { 32 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 33 .fetch_optional(&state.db) 34 .await 35 .map(|opt| opt.map(|r| r.id)) 36 } else { 37 let handle = if !input.repo.contains('.') { 38 format!("{}.{}", input.repo, hostname) 39 } else { 40 input.repo.clone() 41 }; 42 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 43 .fetch_optional(&state.db) 44 .await 45 .map(|opt| opt.map(|r| r.id)) 46 }; 47 let user_id: uuid::Uuid = match user_id_opt { 48 Ok(Some(id)) => id, 49 Ok(None) => { 50 if let Some(proxy_header) = headers.get("atproto-proxy").and_then(|h| h.to_str().ok()) { 51 let did = proxy_header.split('#').next().unwrap_or(proxy_header); 52 if let Some(resolved) = state.did_resolver.resolve_did(did).await { 53 let mut url = format!( 54 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}", 55 resolved.url.trim_end_matches('/'), 56 urlencoding::encode(&input.repo), 57 urlencoding::encode(&input.collection), 58 urlencoding::encode(&input.rkey) 59 ); 60 if let Some(cid) = &input.cid { 61 url.push_str(&format!("&cid={}", urlencoding::encode(cid))); 62 } 63 info!("Proxying getRecord to {}: {}", did, url); 64 match proxy_client().get(&url).send().await { 65 Ok(resp) => { 66 let status = resp.status(); 67 let body = match resp.bytes().await { 68 Ok(b) => b, 69 Err(e) => { 70 error!("Error reading proxy response: {:?}", e); 71 return ( 72 StatusCode::BAD_GATEWAY, 73 Json(json!({"error": "UpstreamFailure", "message": "Error reading upstream response"})), 74 ) 75 .into_response(); 76 } 77 }; 78 return Response::builder() 79 .status(status) 80 .header("content-type", "application/json") 81 .body(axum::body::Body::from(body)) 82 .unwrap_or_else(|_| { 83 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error") 84 .into_response() 85 }); 86 } 87 Err(e) => { 88 error!("Error proxying request: {:?}", e); 89 return ( 90 StatusCode::BAD_GATEWAY, 91 Json(json!({"error": "UpstreamFailure", "message": "Failed to reach upstream service"})), 92 ) 93 .into_response(); 94 } 95 } 96 } else { 97 error!("Could not resolve DID from atproto-proxy header: {}", did); 98 return ( 99 StatusCode::BAD_GATEWAY, 100 Json(json!({"error": "UpstreamFailure", "message": "Could not resolve proxy DID"})), 101 ) 102 .into_response(); 103 } 104 } 105 return ( 106 StatusCode::NOT_FOUND, 107 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 108 ) 109 .into_response(); 110 } 111 Err(_) => { 112 return ( 113 StatusCode::INTERNAL_SERVER_ERROR, 114 Json(json!({"error": "InternalError"})), 115 ) 116 .into_response(); 117 } 118 }; 119 let record_row = sqlx::query!( 120 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 121 user_id, 122 input.collection, 123 input.rkey 124 ) 125 .fetch_optional(&state.db) 126 .await; 127 let record_cid_str: String = match record_row { 128 Ok(Some(row)) => row.record_cid, 129 _ => { 130 return ( 131 StatusCode::NOT_FOUND, 132 Json(json!({"error": "NotFound", "message": "Record not found"})), 133 ) 134 .into_response(); 135 } 136 }; 137 if let Some(expected_cid) = &input.cid 138 && &record_cid_str != expected_cid 139 { 140 return ( 141 StatusCode::NOT_FOUND, 142 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 143 ) 144 .into_response(); 145 } 146 let cid = match Cid::from_str(&record_cid_str) { 147 Ok(c) => c, 148 Err(_) => { 149 return ( 150 StatusCode::INTERNAL_SERVER_ERROR, 151 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 152 ) 153 .into_response(); 154 } 155 }; 156 let block = match state.block_store.get(&cid).await { 157 Ok(Some(b)) => b, 158 _ => { 159 return ( 160 StatusCode::INTERNAL_SERVER_ERROR, 161 Json(json!({"error": "InternalError", "message": "Record block not found"})), 162 ) 163 .into_response(); 164 } 165 }; 166 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 167 Ok(v) => v, 168 Err(e) => { 169 error!("Failed to deserialize record: {:?}", e); 170 return ( 171 StatusCode::INTERNAL_SERVER_ERROR, 172 Json(json!({"error": "InternalError"})), 173 ) 174 .into_response(); 175 } 176 }; 177 Json(json!({ 178 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 179 "cid": record_cid_str, 180 "value": value 181 })) 182 .into_response() 183} 184#[derive(Deserialize)] 185pub struct ListRecordsInput { 186 pub repo: String, 187 pub collection: String, 188 pub limit: Option<i32>, 189 pub cursor: Option<String>, 190 #[serde(rename = "rkeyStart")] 191 pub rkey_start: Option<String>, 192 #[serde(rename = "rkeyEnd")] 193 pub rkey_end: Option<String>, 194 pub reverse: Option<bool>, 195} 196#[derive(Serialize)] 197pub struct ListRecordsOutput { 198 #[serde(skip_serializing_if = "Option::is_none")] 199 pub cursor: Option<String>, 200 pub records: Vec<serde_json::Value>, 201} 202 203pub async fn list_records( 204 State(state): State<AppState>, 205 Query(input): Query<ListRecordsInput>, 206) -> Response { 207 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 208 let user_id_opt = if input.repo.starts_with("did:") { 209 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 210 .fetch_optional(&state.db) 211 .await 212 .map(|opt| opt.map(|r| r.id)) 213 } else { 214 let handle = if !input.repo.contains('.') { 215 format!("{}.{}", input.repo, hostname) 216 } else { 217 input.repo.clone() 218 }; 219 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 220 .fetch_optional(&state.db) 221 .await 222 .map(|opt| opt.map(|r| r.id)) 223 }; 224 let user_id: uuid::Uuid = match user_id_opt { 225 Ok(Some(id)) => id, 226 Ok(None) => { 227 return ( 228 StatusCode::NOT_FOUND, 229 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 230 ) 231 .into_response(); 232 } 233 Err(_) => { 234 return ( 235 StatusCode::INTERNAL_SERVER_ERROR, 236 Json(json!({"error": "InternalError"})), 237 ) 238 .into_response(); 239 } 240 }; 241 let limit = input.limit.unwrap_or(50).clamp(1, 100); 242 let reverse = input.reverse.unwrap_or(false); 243 let limit_i64 = limit as i64; 244 let order = if reverse { "ASC" } else { "DESC" }; 245 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 246 let comparator = if reverse { ">" } else { "<" }; 247 let query = format!( 248 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 249 comparator, order 250 ); 251 sqlx::query_as(&query) 252 .bind(user_id) 253 .bind(&input.collection) 254 .bind(cursor) 255 .bind(limit_i64) 256 .fetch_all(&state.db) 257 .await 258 } else { 259 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 260 let mut param_idx = 3; 261 if input.rkey_start.is_some() { 262 conditions.push("rkey > $3"); 263 param_idx += 1; 264 } 265 if input.rkey_end.is_some() { 266 conditions.push(if param_idx == 3 { 267 "rkey < $3" 268 } else { 269 "rkey < $4" 270 }); 271 param_idx += 1; 272 } 273 let limit_idx = param_idx; 274 let query = format!( 275 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 276 conditions.join(" AND "), 277 order, 278 limit_idx 279 ); 280 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 281 .bind(user_id) 282 .bind(&input.collection); 283 if let Some(start) = &input.rkey_start { 284 query_builder = query_builder.bind(start); 285 } 286 if let Some(end) = &input.rkey_end { 287 query_builder = query_builder.bind(end); 288 } 289 query_builder.bind(limit_i64).fetch_all(&state.db).await 290 }; 291 let rows = match rows_res { 292 Ok(r) => r, 293 Err(e) => { 294 error!("Error listing records: {:?}", e); 295 return ( 296 StatusCode::INTERNAL_SERVER_ERROR, 297 Json(json!({"error": "InternalError"})), 298 ) 299 .into_response(); 300 } 301 }; 302 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 303 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 304 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 305 for (rkey, cid_str) in &rows { 306 if let Ok(cid) = Cid::from_str(cid_str) { 307 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 308 cids.push(cid); 309 } 310 } 311 let blocks = match state.block_store.get_many(&cids).await { 312 Ok(b) => b, 313 Err(e) => { 314 error!("Error fetching blocks: {:?}", e); 315 return ( 316 StatusCode::INTERNAL_SERVER_ERROR, 317 Json(json!({"error": "InternalError"})), 318 ) 319 .into_response(); 320 } 321 }; 322 let mut records = Vec::new(); 323 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 324 if let Some(block) = block_opt 325 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid) 326 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) 327 { 328 records.push(json!({ 329 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 330 "cid": cid_str, 331 "value": value 332 })); 333 } 334 } 335 Json(ListRecordsOutput { 336 cursor: last_rkey, 337 records, 338 }) 339 .into_response() 340}