this repo has no description
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::{HeaderMap, StatusCode}, 6 response::{IntoResponse, Response}, 7}; 8use base64::Engine; 9use cid::Cid; 10use ipld_core::ipld::Ipld; 11use jacquard_repo::storage::BlockStore; 12use serde::{Deserialize, Serialize}; 13use serde_json::{Map, Value, json}; 14use std::collections::HashMap; 15use std::str::FromStr; 16use tracing::error; 17 18fn ipld_to_json(ipld: Ipld) -> Value { 19 match ipld { 20 Ipld::Null => Value::Null, 21 Ipld::Bool(b) => Value::Bool(b), 22 Ipld::Integer(i) => { 23 if let Ok(n) = i64::try_from(i) { 24 Value::Number(n.into()) 25 } else { 26 Value::String(i.to_string()) 27 } 28 } 29 Ipld::Float(f) => serde_json::Number::from_f64(f) 30 .map(Value::Number) 31 .unwrap_or(Value::Null), 32 Ipld::String(s) => Value::String(s), 33 Ipld::Bytes(b) => { 34 let encoded = base64::engine::general_purpose::STANDARD.encode(&b); 35 json!({ "$bytes": encoded }) 36 } 37 Ipld::List(arr) => Value::Array(arr.into_iter().map(ipld_to_json).collect()), 38 Ipld::Map(map) => { 39 let obj: Map<String, Value> = 40 map.into_iter().map(|(k, v)| (k, ipld_to_json(v))).collect(); 41 Value::Object(obj) 42 } 43 Ipld::Link(cid) => json!({ "$link": cid.to_string() }), 44 } 45} 46 47#[derive(Deserialize)] 48pub struct GetRecordInput { 49 pub repo: String, 50 pub collection: String, 51 pub rkey: String, 52 pub cid: Option<String>, 53} 54 55pub async fn get_record( 56 State(state): State<AppState>, 57 headers: HeaderMap, 58 Query(input): Query<GetRecordInput>, 59) -> Response { 60 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 61 let user_id_opt = if input.repo.starts_with("did:") { 62 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 63 .fetch_optional(&state.db) 64 .await 65 .map(|opt| opt.map(|r| r.id)) 66 } else { 67 let handle = if !input.repo.contains('.') { 68 format!("{}.{}", input.repo, hostname) 69 } else { 70 input.repo.clone() 71 }; 72 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 73 .fetch_optional(&state.db) 74 .await 75 .map(|opt| opt.map(|r| r.id)) 76 }; 77 let user_id: uuid::Uuid = match user_id_opt { 78 Ok(Some(id)) => id, 79 Ok(None) => { 80 return ( 81 StatusCode::NOT_FOUND, 82 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 83 ) 84 .into_response(); 85 } 86 Err(_) => { 87 return ( 88 StatusCode::INTERNAL_SERVER_ERROR, 89 Json(json!({"error": "InternalError"})), 90 ) 91 .into_response(); 92 } 93 }; 94 let record_row = sqlx::query!( 95 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 96 user_id, 97 input.collection, 98 input.rkey 99 ) 100 .fetch_optional(&state.db) 101 .await; 102 let record_cid_str: String = match record_row { 103 Ok(Some(row)) => row.record_cid, 104 _ => { 105 return ( 106 StatusCode::NOT_FOUND, 107 Json(json!({"error": "RecordNotFound", "message": "Record not found"})), 108 ) 109 .into_response(); 110 } 111 }; 112 if let Some(expected_cid) = &input.cid 113 && &record_cid_str != expected_cid 114 { 115 return ( 116 StatusCode::NOT_FOUND, 117 Json(json!({"error": "RecordNotFound", "message": "Record CID mismatch"})), 118 ) 119 .into_response(); 120 } 121 let cid = match Cid::from_str(&record_cid_str) { 122 Ok(c) => c, 123 Err(_) => { 124 return ( 125 StatusCode::INTERNAL_SERVER_ERROR, 126 Json(json!({"error": "InternalError", "message": "Invalid CID in DB"})), 127 ) 128 .into_response(); 129 } 130 }; 131 let block = match state.block_store.get(&cid).await { 132 Ok(Some(b)) => b, 133 _ => { 134 return ( 135 StatusCode::INTERNAL_SERVER_ERROR, 136 Json(json!({"error": "InternalError", "message": "Record block not found"})), 137 ) 138 .into_response(); 139 } 140 }; 141 let ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block) { 142 Ok(v) => v, 143 Err(e) => { 144 error!("Failed to deserialize record: {:?}", e); 145 return ( 146 StatusCode::INTERNAL_SERVER_ERROR, 147 Json(json!({"error": "InternalError"})), 148 ) 149 .into_response(); 150 } 151 }; 152 let value = ipld_to_json(ipld); 153 Json(json!({ 154 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 155 "cid": record_cid_str, 156 "value": value 157 })) 158 .into_response() 159} 160#[derive(Deserialize)] 161pub struct ListRecordsInput { 162 pub repo: String, 163 pub collection: String, 164 pub limit: Option<i32>, 165 pub cursor: Option<String>, 166 #[serde(rename = "rkeyStart")] 167 pub rkey_start: Option<String>, 168 #[serde(rename = "rkeyEnd")] 169 pub rkey_end: Option<String>, 170 pub reverse: Option<bool>, 171} 172#[derive(Serialize)] 173pub struct ListRecordsOutput { 174 #[serde(skip_serializing_if = "Option::is_none")] 175 pub cursor: Option<String>, 176 pub records: Vec<serde_json::Value>, 177} 178 179pub async fn list_records( 180 State(state): State<AppState>, 181 Query(input): Query<ListRecordsInput>, 182) -> Response { 183 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 184 let user_id_opt = if input.repo.starts_with("did:") { 185 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo) 186 .fetch_optional(&state.db) 187 .await 188 .map(|opt| opt.map(|r| r.id)) 189 } else { 190 let handle = if !input.repo.contains('.') { 191 format!("{}.{}", input.repo, hostname) 192 } else { 193 input.repo.clone() 194 }; 195 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 196 .fetch_optional(&state.db) 197 .await 198 .map(|opt| opt.map(|r| r.id)) 199 }; 200 let user_id: uuid::Uuid = match user_id_opt { 201 Ok(Some(id)) => id, 202 Ok(None) => { 203 return ( 204 StatusCode::NOT_FOUND, 205 Json(json!({"error": "RepoNotFound", "message": "Repo not found"})), 206 ) 207 .into_response(); 208 } 209 Err(_) => { 210 return ( 211 StatusCode::INTERNAL_SERVER_ERROR, 212 Json(json!({"error": "InternalError"})), 213 ) 214 .into_response(); 215 } 216 }; 217 let limit = input.limit.unwrap_or(50).clamp(1, 100); 218 let reverse = input.reverse.unwrap_or(false); 219 let limit_i64 = limit as i64; 220 let order = if reverse { "ASC" } else { "DESC" }; 221 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 222 let comparator = if reverse { ">" } else { "<" }; 223 let query = format!( 224 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 225 comparator, order 226 ); 227 sqlx::query_as(&query) 228 .bind(user_id) 229 .bind(&input.collection) 230 .bind(cursor) 231 .bind(limit_i64) 232 .fetch_all(&state.db) 233 .await 234 } else { 235 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 236 let mut param_idx = 3; 237 if input.rkey_start.is_some() { 238 conditions.push("rkey > $3"); 239 param_idx += 1; 240 } 241 if input.rkey_end.is_some() { 242 conditions.push(if param_idx == 3 { 243 "rkey < $3" 244 } else { 245 "rkey < $4" 246 }); 247 param_idx += 1; 248 } 249 let limit_idx = param_idx; 250 let query = format!( 251 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 252 conditions.join(" AND "), 253 order, 254 limit_idx 255 ); 256 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 257 .bind(user_id) 258 .bind(&input.collection); 259 if let Some(start) = &input.rkey_start { 260 query_builder = query_builder.bind(start); 261 } 262 if let Some(end) = &input.rkey_end { 263 query_builder = query_builder.bind(end); 264 } 265 query_builder.bind(limit_i64).fetch_all(&state.db).await 266 }; 267 let rows = match rows_res { 268 Ok(r) => r, 269 Err(e) => { 270 error!("Error listing records: {:?}", e); 271 return ( 272 StatusCode::INTERNAL_SERVER_ERROR, 273 Json(json!({"error": "InternalError"})), 274 ) 275 .into_response(); 276 } 277 }; 278 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 279 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 280 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 281 for (rkey, cid_str) in &rows { 282 if let Ok(cid) = Cid::from_str(cid_str) { 283 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 284 cids.push(cid); 285 } 286 } 287 let blocks = match state.block_store.get_many(&cids).await { 288 Ok(b) => b, 289 Err(e) => { 290 error!("Error fetching blocks: {:?}", e); 291 return ( 292 StatusCode::INTERNAL_SERVER_ERROR, 293 Json(json!({"error": "InternalError"})), 294 ) 295 .into_response(); 296 } 297 }; 298 let mut records = Vec::new(); 299 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 300 if let Some(block) = block_opt 301 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid) 302 && let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) 303 { 304 let value = ipld_to_json(ipld); 305 records.push(json!({ 306 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 307 "cid": cid_str, 308 "value": value 309 })); 310 } 311 } 312 Json(ListRecordsOutput { 313 cursor: last_rkey, 314 records, 315 }) 316 .into_response() 317}