this repo has no description
1use crate::api::proxy_client::proxy_client; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::{HeaderMap, StatusCode}, 7 response::{IntoResponse, Response}, 8}; 9use base64::Engine; 10use cid::Cid; 11use ipld_core::ipld::Ipld; 12use jacquard_repo::storage::BlockStore; 13use serde::{Deserialize, Serialize}; 14use serde_json::{json, Map, Value}; 15use std::collections::HashMap; 16use std::str::FromStr; 17use tracing::{error, info}; 18 19fn ipld_to_json(ipld: Ipld) -> Value { 20 match ipld { 21 Ipld::Null => Value::Null, 22 Ipld::Bool(b) => Value::Bool(b), 23 Ipld::Integer(i) => { 24 if let Ok(n) = i64::try_from(i) { 25 Value::Number(n.into()) 26 } else { 27 Value::String(i.to_string()) 28 } 29 } 30 Ipld::Float(f) => serde_json::Number::from_f64(f) 31 .map(Value::Number) 32 .unwrap_or(Value::Null), 33 Ipld::String(s) => Value::String(s), 34 Ipld::Bytes(b) => { 35 let encoded = base64::engine::general_purpose::STANDARD.encode(&b); 36 json!({ "$bytes": encoded }) 37 } 38 Ipld::List(arr) => Value::Array(arr.into_iter().map(ipld_to_json).collect()), 39 Ipld::Map(map) => { 40 let obj: Map<String, Value> = map 41 .into_iter() 42 .map(|(k, v)| (k, ipld_to_json(v))) 43 .collect(); 44 Value::Object(obj) 45 } 46 Ipld::Link(cid) => json!({ "$link": cid.to_string() }), 47 } 48} 49 50#[derive(Deserialize)] 51pub struct GetRecordInput { 52 pub repo: String, 53 pub collection: String, 54 pub rkey: String, 55 pub cid: Option<String>, 56} 57 58pub async fn get_record( 59 State(state): State<AppState>, 60 headers: HeaderMap, 61 Query(input): Query<GetRecordInput>, 62) -> Response { 63 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 64 let user_id_opt = if input.repo.starts_with("did:") { 65 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 66 .fetch_optional(&state.db) 67 .await 68 .map(|opt| opt.map(|r| r.id)) 69 } else { 70 let handle = if !input.repo.contains('.') { 71 format!("{}.{}", input.repo, hostname) 72 } else { 73 input.repo.clone() 74 }; 75 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 76 .fetch_optional(&state.db) 77 .await 78 .map(|opt| opt.map(|r| r.id)) 79 }; 80 let user_id: uuid::Uuid = match user_id_opt { 81 Ok(Some(id)) => id, 82 Ok(None) => { 83 if let Some(proxy_header) = headers.get("atproto-proxy").and_then(|h| h.to_str().ok()) { 84 let did = proxy_header.split('#').next().unwrap_or(proxy_header); 85 if let Some(resolved) = state.did_resolver.resolve_did(did).await { 86 let mut url = format!( 87 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}", 88 resolved.url.trim_end_matches('/'), 89 urlencoding::encode(&input.repo), 90 urlencoding::encode(&input.collection), 91 urlencoding::encode(&input.rkey) 92 ); 93 if let Some(cid) = &input.cid { 94 url.push_str(&format!("&cid={}", urlencoding::encode(cid))); 95 } 96 info!("Proxying getRecord to {}: {}", did, url); 97 match proxy_client().get(&url).send().await { 98 Ok(resp) => { 99 let status = resp.status(); 100 let body = match resp.bytes().await { 101 Ok(b) => b, 102 Err(e) => { 103 error!("Error reading proxy response: {:?}", e); 104 return ( 105 StatusCode::BAD_GATEWAY, 106 Json(json!({"error": "UpstreamFailure", "message": "Error reading upstream response"})), 107 ) 108 .into_response(); 109 } 110 }; 111 return Response::builder() 112 .status(status) 113 .header("content-type", "application/json") 114 .body(axum::body::Body::from(body)) 115 .unwrap_or_else(|_| { 116 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error") 117 .into_response() 118 }); 119 } 120 Err(e) => { 121 error!("Error proxying request: {:?}", e); 122 return ( 123 StatusCode::BAD_GATEWAY, 124 Json(json!({"error": "UpstreamFailure", "message": "Failed to reach upstream service"})), 125 ) 126 .into_response(); 127 } 128 } 129 } else { 130 error!("Could not resolve DID from atproto-proxy header: {}", did); 131 return ( 132 StatusCode::BAD_GATEWAY, 133 Json(json!({"error": "UpstreamFailure", "message": "Could not resolve proxy DID"})), 134 ) 135 .into_response(); 136 } 137 } 138 return ( 139 StatusCode::NOT_FOUND, 140 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 141 ) 142 .into_response(); 143 } 144 Err(_) => { 145 return ( 146 StatusCode::INTERNAL_SERVER_ERROR, 147 Json(json!({"error": "InternalError"})), 148 ) 149 .into_response(); 150 } 151 }; 152 let record_row = sqlx::query!( 153 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 154 user_id, 155 input.collection, 156 input.rkey 157 ) 158 .fetch_optional(&state.db) 159 .await; 160 let record_cid_str: String = match record_row { 161 Ok(Some(row)) => row.record_cid, 162 _ => { 163 return ( 164 StatusCode::NOT_FOUND, 165 Json(json!({"error": "NotFound", "message": "Record not found"})), 166 ) 167 .into_response(); 168 } 169 }; 170 if let Some(expected_cid) = &input.cid 171 && &record_cid_str != expected_cid 172 { 173 return ( 174 StatusCode::NOT_FOUND, 175 Json(json!({"error": "NotFound", "message": "Record CID mismatch"})), 176 ) 177 .into_response(); 178 } 179 let cid = match Cid::from_str(&record_cid_str) { 180 Ok(c) => c, 181 Err(_) => { 182 return ( 183 StatusCode::INTERNAL_SERVER_ERROR, 184 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 185 ) 186 .into_response(); 187 } 188 }; 189 let block = match state.block_store.get(&cid).await { 190 Ok(Some(b)) => b, 191 _ => { 192 return ( 193 StatusCode::INTERNAL_SERVER_ERROR, 194 Json(json!({"error": "InternalError", "message": "Record block not found"})), 195 ) 196 .into_response(); 197 } 198 }; 199 let ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block) { 200 Ok(v) => v, 201 Err(e) => { 202 error!("Failed to deserialize record: {:?}", e); 203 return ( 204 StatusCode::INTERNAL_SERVER_ERROR, 205 Json(json!({"error": "InternalError"})), 206 ) 207 .into_response(); 208 } 209 }; 210 let value = ipld_to_json(ipld); 211 Json(json!({ 212 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 213 "cid": record_cid_str, 214 "value": value 215 })) 216 .into_response() 217} 218#[derive(Deserialize)] 219pub struct ListRecordsInput { 220 pub repo: String, 221 pub collection: String, 222 pub limit: Option<i32>, 223 pub cursor: Option<String>, 224 #[serde(rename = "rkeyStart")] 225 pub rkey_start: Option<String>, 226 #[serde(rename = "rkeyEnd")] 227 pub rkey_end: Option<String>, 228 pub reverse: Option<bool>, 229} 230#[derive(Serialize)] 231pub struct ListRecordsOutput { 232 #[serde(skip_serializing_if = "Option::is_none")] 233 pub cursor: Option<String>, 234 pub records: Vec<serde_json::Value>, 235} 236 237pub async fn list_records( 238 State(state): State<AppState>, 239 Query(input): Query<ListRecordsInput>, 240) -> Response { 241 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 242 let user_id_opt = if input.repo.starts_with("did:") { 243 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 244 .fetch_optional(&state.db) 245 .await 246 .map(|opt| opt.map(|r| r.id)) 247 } else { 248 let handle = if !input.repo.contains('.') { 249 format!("{}.{}", input.repo, hostname) 250 } else { 251 input.repo.clone() 252 }; 253 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 254 .fetch_optional(&state.db) 255 .await 256 .map(|opt| opt.map(|r| r.id)) 257 }; 258 let user_id: uuid::Uuid = match user_id_opt { 259 Ok(Some(id)) => id, 260 Ok(None) => { 261 return ( 262 StatusCode::NOT_FOUND, 263 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 264 ) 265 .into_response(); 266 } 267 Err(_) => { 268 return ( 269 StatusCode::INTERNAL_SERVER_ERROR, 270 Json(json!({"error": "InternalError"})), 271 ) 272 .into_response(); 273 } 274 }; 275 let limit = input.limit.unwrap_or(50).clamp(1, 100); 276 let reverse = input.reverse.unwrap_or(false); 277 let limit_i64 = limit as i64; 278 let order = if reverse { "ASC" } else { "DESC" }; 279 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 280 let comparator = if reverse { ">" } else { "<" }; 281 let query = format!( 282 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 283 comparator, order 284 ); 285 sqlx::query_as(&query) 286 .bind(user_id) 287 .bind(&input.collection) 288 .bind(cursor) 289 .bind(limit_i64) 290 .fetch_all(&state.db) 291 .await 292 } else { 293 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 294 let mut param_idx = 3; 295 if input.rkey_start.is_some() { 296 conditions.push("rkey > $3"); 297 param_idx += 1; 298 } 299 if input.rkey_end.is_some() { 300 conditions.push(if param_idx == 3 { 301 "rkey < $3" 302 } else { 303 "rkey < $4" 304 }); 305 param_idx += 1; 306 } 307 let limit_idx = param_idx; 308 let query = format!( 309 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 310 conditions.join(" AND "), 311 order, 312 limit_idx 313 ); 314 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 315 .bind(user_id) 316 .bind(&input.collection); 317 if let Some(start) = &input.rkey_start { 318 query_builder = query_builder.bind(start); 319 } 320 if let Some(end) = &input.rkey_end { 321 query_builder = query_builder.bind(end); 322 } 323 query_builder.bind(limit_i64).fetch_all(&state.db).await 324 }; 325 let rows = match rows_res { 326 Ok(r) => r, 327 Err(e) => { 328 error!("Error listing records: {:?}", e); 329 return ( 330 StatusCode::INTERNAL_SERVER_ERROR, 331 Json(json!({"error": "InternalError"})), 332 ) 333 .into_response(); 334 } 335 }; 336 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 337 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 338 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 339 for (rkey, cid_str) in &rows { 340 if let Ok(cid) = Cid::from_str(cid_str) { 341 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 342 cids.push(cid); 343 } 344 } 345 let blocks = match state.block_store.get_many(&cids).await { 346 Ok(b) => b, 347 Err(e) => { 348 error!("Error fetching blocks: {:?}", e); 349 return ( 350 StatusCode::INTERNAL_SERVER_ERROR, 351 Json(json!({"error": "InternalError"})), 352 ) 353 .into_response(); 354 } 355 }; 356 let mut records = Vec::new(); 357 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 358 if let Some(block) = block_opt 359 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid) 360 && let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) 361 { 362 let value = ipld_to_json(ipld); 363 records.push(json!({ 364 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 365 "cid": cid_str, 366 "value": value 367 })); 368 } 369 } 370 Json(ListRecordsOutput { 371 cursor: last_rkey, 372 records, 373 }) 374 .into_response() 375}