this repo has no description
1use crate::api::proxy_client::proxy_client; 2use crate::state::AppState; 3use axum::{ 4 Json, 5 extract::{Query, State}, 6 http::{HeaderMap, StatusCode}, 7 response::{IntoResponse, Response}, 8}; 9use base64::Engine; 10use cid::Cid; 11use ipld_core::ipld::Ipld; 12use jacquard_repo::storage::BlockStore; 13use serde::{Deserialize, Serialize}; 14use serde_json::{Map, Value, json}; 15use std::collections::HashMap; 16use std::str::FromStr; 17use tracing::{error, info}; 18 19fn ipld_to_json(ipld: Ipld) -> Value { 20 match ipld { 21 Ipld::Null => Value::Null, 22 Ipld::Bool(b) => Value::Bool(b), 23 Ipld::Integer(i) => { 24 if let Ok(n) = i64::try_from(i) { 25 Value::Number(n.into()) 26 } else { 27 Value::String(i.to_string()) 28 } 29 } 30 Ipld::Float(f) => serde_json::Number::from_f64(f) 31 .map(Value::Number) 32 .unwrap_or(Value::Null), 33 Ipld::String(s) => Value::String(s), 34 Ipld::Bytes(b) => { 35 let encoded = base64::engine::general_purpose::STANDARD.encode(&b); 36 json!({ "$bytes": encoded }) 37 } 38 Ipld::List(arr) => Value::Array(arr.into_iter().map(ipld_to_json).collect()), 39 Ipld::Map(map) => { 40 let obj: Map<String, Value> = 41 map.into_iter().map(|(k, v)| (k, ipld_to_json(v))).collect(); 42 Value::Object(obj) 43 } 44 Ipld::Link(cid) => json!({ "$link": cid.to_string() }), 45 } 46} 47 48#[derive(Deserialize)] 49pub struct GetRecordInput { 50 pub repo: String, 51 pub collection: String, 52 pub rkey: String, 53 pub cid: Option<String>, 54} 55 56pub async fn get_record( 57 State(state): State<AppState>, 58 headers: HeaderMap, 59 Query(input): Query<GetRecordInput>, 60) -> Response { 61 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 62 let user_id_opt = if input.repo.starts_with("did:") { 63 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 64 .fetch_optional(&state.db) 65 .await 66 .map(|opt| opt.map(|r| r.id)) 67 } else { 68 let handle = if !input.repo.contains('.') { 69 format!("{}.{}", input.repo, hostname) 70 } else { 71 input.repo.clone() 72 }; 73 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 74 .fetch_optional(&state.db) 75 .await 76 .map(|opt| opt.map(|r| r.id)) 77 }; 78 let user_id: uuid::Uuid = match user_id_opt { 79 Ok(Some(id)) => id, 80 Ok(None) => { 81 if let Some(proxy_header) = headers.get("atproto-proxy").and_then(|h| h.to_str().ok()) { 82 let did = proxy_header.split('#').next().unwrap_or(proxy_header); 83 if let Some(resolved) = state.did_resolver.resolve_did(did).await { 84 let mut url = format!( 85 "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}", 86 resolved.url.trim_end_matches('/'), 87 urlencoding::encode(&input.repo), 88 urlencoding::encode(&input.collection), 89 urlencoding::encode(&input.rkey) 90 ); 91 if let Some(cid) = &input.cid { 92 url.push_str(&format!("&cid={}", urlencoding::encode(cid))); 93 } 94 info!("Proxying getRecord to {}: {}", did, url); 95 match proxy_client().get(&url).send().await { 96 Ok(resp) => { 97 let status = resp.status(); 98 let body = match resp.bytes().await { 99 Ok(b) => b, 100 Err(e) => { 101 error!("Error reading proxy response: {:?}", e); 102 return ( 103 StatusCode::BAD_GATEWAY, 104 Json(json!({"error": "UpstreamFailure", "message": "Error reading upstream response"})), 105 ) 106 .into_response(); 107 } 108 }; 109 return Response::builder() 110 .status(status) 111 .header("content-type", "application/json") 112 .body(axum::body::Body::from(body)) 113 .unwrap_or_else(|_| { 114 (StatusCode::INTERNAL_SERVER_ERROR, "Internal error") 115 .into_response() 116 }); 117 } 118 Err(e) => { 119 error!("Error proxying request: {:?}", e); 120 return ( 121 StatusCode::BAD_GATEWAY, 122 Json(json!({"error": "UpstreamFailure", "message": "Failed to reach upstream service"})), 123 ) 124 .into_response(); 125 } 126 } 127 } else { 128 error!("Could not resolve DID from atproto-proxy header: {}", did); 129 return ( 130 StatusCode::BAD_GATEWAY, 131 Json(json!({"error": "UpstreamFailure", "message": "Could not resolve proxy DID"})), 132 ) 133 .into_response(); 134 } 135 } 136 return ( 137 StatusCode::NOT_FOUND, 138 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 139 ) 140 .into_response(); 141 } 142 Err(_) => { 143 return ( 144 StatusCode::INTERNAL_SERVER_ERROR, 145 Json(json!({"error": "InternalError"})), 146 ) 147 .into_response(); 148 } 149 }; 150 let record_row = sqlx::query!( 151 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 152 user_id, 153 input.collection, 154 input.rkey 155 ) 156 .fetch_optional(&state.db) 157 .await; 158 let record_cid_str: String = match record_row { 159 Ok(Some(row)) => row.record_cid, 160 _ => { 161 return ( 162 StatusCode::NOT_FOUND, 163 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 164 ) 165 .into_response(); 166 } 167 }; 168 if let Some(expected_cid) = &input.cid 169 && &record_cid_str != expected_cid 170 { 171 return ( 172 StatusCode::NOT_FOUND, 173 Json(json!({"error": "RecordNotFound", "message": "Record CID mismatch"})), 174 ) 175 .into_response(); 176 } 177 let cid = match Cid::from_str(&record_cid_str) { 178 Ok(c) => c, 179 Err(_) => { 180 return ( 181 StatusCode::INTERNAL_SERVER_ERROR, 182 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 183 ) 184 .into_response(); 185 } 186 }; 187 let block = match state.block_store.get(&cid).await { 188 Ok(Some(b)) => b, 189 _ => { 190 return ( 191 StatusCode::INTERNAL_SERVER_ERROR, 192 Json(json!({"error": "InternalError", "message": "Record block not found"})), 193 ) 194 .into_response(); 195 } 196 }; 197 let ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block) { 198 Ok(v) => v, 199 Err(e) => { 200 error!("Failed to deserialize record: {:?}", e); 201 return ( 202 StatusCode::INTERNAL_SERVER_ERROR, 203 Json(json!({"error": "InternalError"})), 204 ) 205 .into_response(); 206 } 207 }; 208 let value = ipld_to_json(ipld); 209 Json(json!({ 210 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 211 "cid": record_cid_str, 212 "value": value 213 })) 214 .into_response() 215} 216#[derive(Deserialize)] 217pub struct ListRecordsInput { 218 pub repo: String, 219 pub collection: String, 220 pub limit: Option<i32>, 221 pub cursor: Option<String>, 222 #[serde(rename = "rkeyStart")] 223 pub rkey_start: Option<String>, 224 #[serde(rename = "rkeyEnd")] 225 pub rkey_end: Option<String>, 226 pub reverse: Option<bool>, 227} 228#[derive(Serialize)] 229pub struct ListRecordsOutput { 230 #[serde(skip_serializing_if = "Option::is_none")] 231 pub cursor: Option<String>, 232 pub records: Vec<serde_json::Value>, 233} 234 235pub async fn list_records( 236 State(state): State<AppState>, 237 Query(input): Query<ListRecordsInput>, 238) -> Response { 239 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 240 let user_id_opt = if input.repo.starts_with("did:") { 241 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 242 .fetch_optional(&state.db) 243 .await 244 .map(|opt| opt.map(|r| r.id)) 245 } else { 246 let handle = if !input.repo.contains('.') { 247 format!("{}.{}", input.repo, hostname) 248 } else { 249 input.repo.clone() 250 }; 251 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 252 .fetch_optional(&state.db) 253 .await 254 .map(|opt| opt.map(|r| r.id)) 255 }; 256 let user_id: uuid::Uuid = match user_id_opt { 257 Ok(Some(id)) => id, 258 Ok(None) => { 259 return ( 260 StatusCode::NOT_FOUND, 261 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 262 ) 263 .into_response(); 264 } 265 Err(_) => { 266 return ( 267 StatusCode::INTERNAL_SERVER_ERROR, 268 Json(json!({"error": "InternalError"})), 269 ) 270 .into_response(); 271 } 272 }; 273 let limit = input.limit.unwrap_or(50).clamp(1, 100); 274 let reverse = input.reverse.unwrap_or(false); 275 let limit_i64 = limit as i64; 276 let order = if reverse { "ASC" } else { "DESC" }; 277 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 278 let comparator = if reverse { ">" } else { "<" }; 279 let query = format!( 280 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 281 comparator, order 282 ); 283 sqlx::query_as(&query) 284 .bind(user_id) 285 .bind(&input.collection) 286 .bind(cursor) 287 .bind(limit_i64) 288 .fetch_all(&state.db) 289 .await 290 } else { 291 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 292 let mut param_idx = 3; 293 if input.rkey_start.is_some() { 294 conditions.push("rkey > $3"); 295 param_idx += 1; 296 } 297 if input.rkey_end.is_some() { 298 conditions.push(if param_idx == 3 { 299 "rkey < $3" 300 } else { 301 "rkey < $4" 302 }); 303 param_idx += 1; 304 } 305 let limit_idx = param_idx; 306 let query = format!( 307 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 308 conditions.join(" AND "), 309 order, 310 limit_idx 311 ); 312 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 313 .bind(user_id) 314 .bind(&input.collection); 315 if let Some(start) = &input.rkey_start { 316 query_builder = query_builder.bind(start); 317 } 318 if let Some(end) = &input.rkey_end { 319 query_builder = query_builder.bind(end); 320 } 321 query_builder.bind(limit_i64).fetch_all(&state.db).await 322 }; 323 let rows = match rows_res { 324 Ok(r) => r, 325 Err(e) => { 326 error!("Error listing records: {:?}", e); 327 return ( 328 StatusCode::INTERNAL_SERVER_ERROR, 329 Json(json!({"error": "InternalError"})), 330 ) 331 .into_response(); 332 } 333 }; 334 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 335 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 336 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 337 for (rkey, cid_str) in &rows { 338 if let Ok(cid) = Cid::from_str(cid_str) { 339 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 340 cids.push(cid); 341 } 342 } 343 let blocks = match state.block_store.get_many(&cids).await { 344 Ok(b) => b, 345 Err(e) => { 346 error!("Error fetching blocks: {:?}", e); 347 return ( 348 StatusCode::INTERNAL_SERVER_ERROR, 349 Json(json!({"error": "InternalError"})), 350 ) 351 .into_response(); 352 } 353 }; 354 let mut records = Vec::new(); 355 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 356 if let Some(block) = block_opt 357 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid) 358 && let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) 359 { 360 let value = ipld_to_json(ipld); 361 records.push(json!({ 362 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 363 "cid": cid_str, 364 "value": value 365 })); 366 } 367 } 368 Json(ListRecordsOutput { 369 cursor: last_rkey, 370 records, 371 }) 372 .into_response() 373}