this repo has no description
1use crate::api::proxy_client::proxy_client; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::{Query, RawQuery, State}, 6 http::StatusCode, 7 response::{IntoResponse, Response}, 8}; 9use cid::Cid; 10use jacquard_repo::storage::BlockStore; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use std::collections::HashMap; 14use std::str::FromStr; 15use tracing::{error, info}; 16 17#[derive(Deserialize)] 18pub struct GetRecordInput { 19 pub repo: String, 20 pub collection: String, 21 pub rkey: String, 22 pub cid: Option<String>, 23} 24 25async fn proxy_get_record_to_appview(state: &AppState, raw_query: Option<&str>) -> Response { 26 let resolved = match state.appview_registry.get_appview_for_method("com.atproto.repo.getRecord").await { 27 Some(r) => r, 28 None => { 29 return ( 30 StatusCode::NOT_FOUND, 31 Json(json!({"error": "NotFound", "message": "Repo not found"})), 32 ) 33 .into_response(); 34 } 35 }; 36 let target_url = match raw_query { 37 Some(q) => format!("{}/xrpc/com.atproto.repo.getRecord?{}", resolved.url, q), 38 None => format!("{}/xrpc/com.atproto.repo.getRecord", resolved.url), 39 }; 40 info!("Proxying getRecord to AppView: {}", target_url); 41 let client = proxy_client(); 42 match client.get(&target_url).send().await { 43 Ok(resp) => { 44 let status = 45 StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY); 46 let content_type = resp 47 .headers() 48 .get("content-type") 49 .and_then(|v| v.to_str().ok()) 50 .map(|s| s.to_string()); 51 match resp.bytes().await { 52 Ok(body) => { 53 let mut builder = Response::builder().status(status); 54 if let Some(ct) = content_type { 55 builder = builder.header("content-type", ct); 56 } 57 builder 58 .body(axum::body::Body::from(body)) 59 .unwrap_or_else(|_| { 60 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response() 61 }) 62 } 63 Err(e) => { 64 error!("Error reading AppView response: {:?}", e); 65 ( 66 StatusCode::BAD_GATEWAY, 67 Json(json!({"error": "UpstreamError"})), 68 ) 69 .into_response() 70 } 71 } 72 } 73 Err(e) => { 74 error!("Error proxying to AppView: {:?}", e); 75 ( 76 StatusCode::BAD_GATEWAY, 77 Json(json!({"error": "UpstreamError"})), 78 ) 79 .into_response() 80 } 81 } 82} 83 84pub async fn get_record( 85 State(state): State<AppState>, 86 Query(input): Query<GetRecordInput>, 87 RawQuery(raw_query): RawQuery, 88) -> Response { 89 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 90 let user_id_opt = if input.repo.starts_with("did:") { 91 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 92 .fetch_optional(&state.db) 93 .await 94 .map(|opt| opt.map(|r| r.id)) 95 } else { 96 let suffix = format!(".{}", hostname); 97 let short_handle = if input.repo.ends_with(&suffix) { 98 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 99 } else { 100 &input.repo 101 }; 102 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 103 .fetch_optional(&state.db) 104 .await 105 .map(|opt| opt.map(|r| r.id)) 106 }; 107 let user_id: uuid::Uuid = match user_id_opt { 108 Ok(Some(id)) => id, 109 _ => { 110 return proxy_get_record_to_appview(&state, raw_query.as_deref()).await; 111 } 112 }; 113 let record_row = sqlx::query!( 114 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 115 user_id, 116 input.collection, 117 input.rkey 118 ) 119 .fetch_optional(&state.db) 120 .await; 121 let record_cid_str: String = match record_row { 122 Ok(Some(row)) => row.record_cid, 123 _ => { 124 return ( 125 StatusCode::NOT_FOUND, 126 Json(json!({"error": "NotFound", "message": "Record not found"})), 127 ) 128 .into_response(); 129 } 130 }; 131 if let Some(expected_cid) = &input.cid 132 && &record_cid_str != expected_cid { 133 return ( 134 StatusCode::NOT_FOUND, 135 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 136 ) 137 .into_response(); 138 } 139 let cid = match Cid::from_str(&record_cid_str) { 140 Ok(c) => c, 141 Err(_) => { 142 return ( 143 StatusCode::INTERNAL_SERVER_ERROR, 144 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 145 ) 146 .into_response(); 147 } 148 }; 149 let block = match state.block_store.get(&cid).await { 150 Ok(Some(b)) => b, 151 _ => { 152 return ( 153 StatusCode::INTERNAL_SERVER_ERROR, 154 Json(json!({"error": "InternalError", "message": "Record block not found"})), 155 ) 156 .into_response(); 157 } 158 }; 159 let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) { 160 Ok(v) => v, 161 Err(e) => { 162 error!("Failed to deserialize record: {:?}", e); 163 return ( 164 StatusCode::INTERNAL_SERVER_ERROR, 165 Json(json!({"error": "InternalError"})), 166 ) 167 .into_response(); 168 } 169 }; 170 Json(json!({ 171 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 172 "cid": record_cid_str, 173 "value": value 174 })) 175 .into_response() 176} 177#[derive(Deserialize)] 178pub struct ListRecordsInput { 179 pub repo: String, 180 pub collection: String, 181 pub limit: Option<i32>, 182 pub cursor: Option<String>, 183 #[serde(rename = "rkeyStart")] 184 pub rkey_start: Option<String>, 185 #[serde(rename = "rkeyEnd")] 186 pub rkey_end: Option<String>, 187 pub reverse: Option<bool>, 188} 189#[derive(Serialize)] 190pub struct ListRecordsOutput { 191 pub cursor: Option<String>, 192 pub records: Vec<serde_json::Value>, 193} 194 195async fn proxy_list_records_to_appview(state: &AppState, raw_query: Option<&str>) -> Response { 196 let resolved = match state.appview_registry.get_appview_for_method("com.atproto.repo.listRecords").await { 197 Some(r) => r, 198 None => { 199 return ( 200 StatusCode::NOT_FOUND, 201 Json(json!({"error": "NotFound", "message": "Repo not found"})), 202 ) 203 .into_response(); 204 } 205 }; 206 let target_url = match raw_query { 207 Some(q) => format!("{}/xrpc/com.atproto.repo.listRecords?{}", resolved.url, q), 208 None => format!("{}/xrpc/com.atproto.repo.listRecords", resolved.url), 209 }; 210 info!("Proxying listRecords to AppView: {}", target_url); 211 let client = proxy_client(); 212 match client.get(&target_url).send().await { 213 Ok(resp) => { 214 let status = 215 StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY); 216 let content_type = resp 217 .headers() 218 .get("content-type") 219 .and_then(|v| v.to_str().ok()) 220 .map(|s| s.to_string()); 221 match resp.bytes().await { 222 Ok(body) => { 223 let mut builder = Response::builder().status(status); 224 if let Some(ct) = content_type { 225 builder = builder.header("content-type", ct); 226 } 227 builder 228 .body(axum::body::Body::from(body)) 229 .unwrap_or_else(|_| { 230 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response() 231 }) 232 } 233 Err(e) => { 234 error!("Error reading AppView response: {:?}", e); 235 (StatusCode::BAD_GATEWAY, Json(json!({"error": "UpstreamError"}))).into_response() 236 } 237 } 238 } 239 Err(e) => { 240 error!("Error proxying to AppView: {:?}", e); 241 (StatusCode::BAD_GATEWAY, Json(json!({"error": "UpstreamError"}))).into_response() 242 } 243 } 244} 245 246pub async fn list_records( 247 State(state): State<AppState>, 248 Query(input): Query<ListRecordsInput>, 249 RawQuery(raw_query): RawQuery, 250) -> Response { 251 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 252 let user_id_opt = if input.repo.starts_with("did:") { 253 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 254 .fetch_optional(&state.db) 255 .await 256 .map(|opt| opt.map(|r| r.id)) 257 } else { 258 let suffix = format!(".{}", hostname); 259 let short_handle = if input.repo.ends_with(&suffix) { 260 input.repo.strip_suffix(&suffix).unwrap_or(&input.repo) 261 } else { 262 &input.repo 263 }; 264 sqlx::query!("SELECT id FROM users WHERE handle = $1", short_handle) 265 .fetch_optional(&state.db) 266 .await 267 .map(|opt| opt.map(|r| r.id)) 268 }; 269 let user_id: uuid::Uuid = match user_id_opt { 270 Ok(Some(id)) => id, 271 _ => { 272 return proxy_list_records_to_appview(&state, raw_query.as_deref()).await; 273 } 274 }; 275 let limit = input.limit.unwrap_or(50).clamp(1, 100); 276 let reverse = input.reverse.unwrap_or(false); 277 let limit_i64 = limit as i64; 278 let order = if reverse { "ASC" } else { "DESC" }; 279 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 280 let comparator = if reverse { ">" } else { "<" }; 281 let query = format!( 282 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 283 comparator, order 284 ); 285 sqlx::query_as(&query) 286 .bind(user_id) 287 .bind(&input.collection) 288 .bind(cursor) 289 .bind(limit_i64) 290 .fetch_all(&state.db) 291 .await 292 } else { 293 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 294 let mut param_idx = 3; 295 if input.rkey_start.is_some() { 296 conditions.push("rkey > $3"); 297 param_idx += 1; 298 } 299 if input.rkey_end.is_some() { 300 conditions.push(if param_idx == 3 { 301 "rkey < $3" 302 } else { 303 "rkey < $4" 304 }); 305 param_idx += 1; 306 } 307 let limit_idx = param_idx; 308 let query = format!( 309 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 310 conditions.join(" AND "), 311 order, 312 limit_idx 313 ); 314 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 315 .bind(user_id) 316 .bind(&input.collection); 317 if let Some(start) = &input.rkey_start { 318 query_builder = query_builder.bind(start); 319 } 320 if let Some(end) = &input.rkey_end { 321 query_builder = query_builder.bind(end); 322 } 323 query_builder.bind(limit_i64).fetch_all(&state.db).await 324 }; 325 let rows = match rows_res { 326 Ok(r) => r, 327 Err(e) => { 328 error!("Error listing records: {:?}", e); 329 return ( 330 StatusCode::INTERNAL_SERVER_ERROR, 331 Json(json!({"error": "InternalError"})), 332 ) 333 .into_response(); 334 } 335 }; 336 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 337 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 338 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 339 for (rkey, cid_str) in &rows { 340 if let Ok(cid) = Cid::from_str(cid_str) { 341 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 342 cids.push(cid); 343 } 344 } 345 let blocks = match state.block_store.get_many(&cids).await { 346 Ok(b) => b, 347 Err(e) => { 348 error!("Error fetching blocks: {:?}", e); 349 return ( 350 StatusCode::INTERNAL_SERVER_ERROR, 351 Json(json!({"error": "InternalError"})), 352 ) 353 .into_response(); 354 } 355 }; 356 let mut records = Vec::new(); 357 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 358 if let Some(block) = block_opt 359 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid) 360 && let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) { 361 records.push(json!({ 362 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 363 "cid": cid_str, 364 "value": value 365 })); 366 } 367 } 368 Json(ListRecordsOutput { 369 cursor: last_rkey, 370 records, 371 }) 372 .into_response() 373}