this repo has no description
1use crate::api::error::ApiError; 2use crate::state::AppState; 3use crate::types::{AtIdentifier, Nsid, Rkey}; 4use axum::{ 5 Json, 6 extract::{Query, State}, 7 http::HeaderMap, 8 response::{IntoResponse, Response}, 9}; 10use base64::Engine; 11use cid::Cid; 12use ipld_core::ipld::Ipld; 13use jacquard_repo::storage::BlockStore; 14use serde::{Deserialize, Serialize}; 15use serde_json::{Map, Value, json}; 16use std::collections::HashMap; 17use std::str::FromStr; 18use tracing::error; 19 20fn ipld_to_json(ipld: Ipld) -> Value { 21 match ipld { 22 Ipld::Null => Value::Null, 23 Ipld::Bool(b) => Value::Bool(b), 24 Ipld::Integer(i) => { 25 if let Ok(n) = i64::try_from(i) { 26 Value::Number(n.into()) 27 } else { 28 Value::String(i.to_string()) 29 } 30 } 31 Ipld::Float(f) => serde_json::Number::from_f64(f) 32 .map(Value::Number) 33 .unwrap_or(Value::Null), 34 Ipld::String(s) => Value::String(s), 35 Ipld::Bytes(b) => { 36 let encoded = base64::engine::general_purpose::STANDARD.encode(&b); 37 json!({ "$bytes": encoded }) 38 } 39 Ipld::List(arr) => Value::Array(arr.into_iter().map(ipld_to_json).collect()), 40 Ipld::Map(map) => { 41 let obj: Map<String, Value> = 42 map.into_iter().map(|(k, v)| (k, ipld_to_json(v))).collect(); 43 Value::Object(obj) 44 } 45 Ipld::Link(cid) => json!({ "$link": cid.to_string() }), 46 } 47} 48 49#[derive(Deserialize)] 50pub struct GetRecordInput { 51 pub repo: AtIdentifier, 52 pub collection: Nsid, 53 pub rkey: Rkey, 54 pub cid: Option<String>, 55} 56 57pub async fn get_record( 58 State(state): State<AppState>, 59 _headers: HeaderMap, 60 Query(input): Query<GetRecordInput>, 61) -> Response { 62 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 63 let user_id_opt = if input.repo.is_did() { 64 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo.as_str()) 65 .fetch_optional(&state.db) 66 .await 67 .map(|opt| opt.map(|r| r.id)) 68 } else { 69 let repo_str = input.repo.as_str(); 70 let handle = if !repo_str.contains('.') { 71 format!("{}.{}", repo_str, hostname) 72 } else { 73 repo_str.to_string() 74 }; 75 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 76 .fetch_optional(&state.db) 77 .await 78 .map(|opt| opt.map(|r| r.id)) 79 }; 80 let user_id: uuid::Uuid = match user_id_opt { 81 Ok(Some(id)) => id, 82 Ok(None) => { 83 return ApiError::RepoNotFound(Some("Repo not found".into())).into_response(); 84 } 85 Err(_) => { 86 return ApiError::InternalError(None).into_response(); 87 } 88 }; 89 let record_row = sqlx::query!( 90 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 91 user_id, 92 input.collection.as_str(), 93 input.rkey.as_str() 94 ) 95 .fetch_optional(&state.db) 96 .await; 97 let record_cid_str: String = match record_row { 98 Ok(Some(row)) => row.record_cid, 99 _ => { 100 return ApiError::RecordNotFound.into_response(); 101 } 102 }; 103 if let Some(expected_cid) = &input.cid 104 && &record_cid_str != expected_cid 105 { 106 return ApiError::RecordNotFound.into_response(); 107 } 108 let Ok(cid) = Cid::from_str(&record_cid_str) else { 109 return ApiError::InternalError(Some("Invalid CID in DB".into())).into_response(); 110 }; 111 let block = match state.block_store.get(&cid).await { 112 Ok(Some(b)) => b, 113 _ => { 114 return ApiError::InternalError(Some("Record block not found".into())).into_response(); 115 } 116 }; 117 let ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block) { 118 Ok(v) => v, 119 Err(e) => { 120 error!("Failed to deserialize record: {:?}", e); 121 return ApiError::InternalError(None).into_response(); 122 } 123 }; 124 let value = ipld_to_json(ipld); 125 Json(json!({ 126 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 127 "cid": record_cid_str, 128 "value": value 129 })) 130 .into_response() 131} 132#[derive(Deserialize)] 133pub struct ListRecordsInput { 134 pub repo: AtIdentifier, 135 pub collection: Nsid, 136 pub limit: Option<i32>, 137 pub cursor: Option<String>, 138 #[serde(rename = "rkeyStart")] 139 pub rkey_start: Option<Rkey>, 140 #[serde(rename = "rkeyEnd")] 141 pub rkey_end: Option<Rkey>, 142 pub reverse: Option<bool>, 143} 144#[derive(Serialize)] 145pub struct ListRecordsOutput { 146 #[serde(skip_serializing_if = "Option::is_none")] 147 pub cursor: Option<String>, 148 pub records: Vec<serde_json::Value>, 149} 150 151pub async fn list_records( 152 State(state): State<AppState>, 153 Query(input): Query<ListRecordsInput>, 154) -> Response { 155 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 156 let user_id_opt = if input.repo.is_did() { 157 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo.as_str()) 158 .fetch_optional(&state.db) 159 .await 160 .map(|opt| opt.map(|r| r.id)) 161 } else { 162 let repo_str = input.repo.as_str(); 163 let handle = if !repo_str.contains('.') { 164 format!("{}.{}", repo_str, hostname) 165 } else { 166 repo_str.to_string() 167 }; 168 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 169 .fetch_optional(&state.db) 170 .await 171 .map(|opt| opt.map(|r| r.id)) 172 }; 173 let user_id: uuid::Uuid = match user_id_opt { 174 Ok(Some(id)) => id, 175 Ok(None) => { 176 return ApiError::RepoNotFound(Some("Repo not found".into())).into_response(); 177 } 178 Err(_) => { 179 return ApiError::InternalError(None).into_response(); 180 } 181 }; 182 let limit = input.limit.unwrap_or(50).clamp(1, 100); 183 let reverse = input.reverse.unwrap_or(false); 184 let limit_i64 = limit as i64; 185 let order = if reverse { "ASC" } else { "DESC" }; 186 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 187 let comparator = if reverse { ">" } else { "<" }; 188 let query = format!( 189 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 190 comparator, order 191 ); 192 sqlx::query_as(&query) 193 .bind(user_id) 194 .bind(input.collection.as_str()) 195 .bind(cursor) 196 .bind(limit_i64) 197 .fetch_all(&state.db) 198 .await 199 } else { 200 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 201 let mut param_idx = 3; 202 if input.rkey_start.is_some() { 203 conditions.push("rkey > $3"); 204 param_idx += 1; 205 } 206 if input.rkey_end.is_some() { 207 conditions.push(if param_idx == 3 { 208 "rkey < $3" 209 } else { 210 "rkey < $4" 211 }); 212 param_idx += 1; 213 } 214 let limit_idx = param_idx; 215 let query = format!( 216 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 217 conditions.join(" AND "), 218 order, 219 limit_idx 220 ); 221 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 222 .bind(user_id) 223 .bind(input.collection.as_str()); 224 if let Some(start) = &input.rkey_start { 225 query_builder = query_builder.bind(start.as_str()); 226 } 227 if let Some(end) = &input.rkey_end { 228 query_builder = query_builder.bind(end.as_str()); 229 } 230 query_builder.bind(limit_i64).fetch_all(&state.db).await 231 }; 232 let rows = match rows_res { 233 Ok(r) => r, 234 Err(e) => { 235 error!("Error listing records: {:?}", e); 236 return ApiError::InternalError(None).into_response(); 237 } 238 }; 239 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 240 let mut cid_to_rkey: HashMap<Cid, (String, String)> = HashMap::new(); 241 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 242 for (rkey, cid_str) in &rows { 243 if let Ok(cid) = Cid::from_str(cid_str) { 244 cid_to_rkey.insert(cid, (rkey.clone(), cid_str.clone())); 245 cids.push(cid); 246 } 247 } 248 let blocks = match state.block_store.get_many(&cids).await { 249 Ok(b) => b, 250 Err(e) => { 251 error!("Error fetching blocks: {:?}", e); 252 return ApiError::InternalError(None).into_response(); 253 } 254 }; 255 let mut records = Vec::new(); 256 for (cid, block_opt) in cids.iter().zip(blocks.into_iter()) { 257 if let Some(block) = block_opt 258 && let Some((rkey, cid_str)) = cid_to_rkey.get(cid) 259 && let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) 260 { 261 let value = ipld_to_json(ipld); 262 records.push(json!({ 263 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 264 "cid": cid_str, 265 "value": value 266 })); 267 } 268 } 269 Json(ListRecordsOutput { 270 cursor: last_rkey, 271 records, 272 }) 273 .into_response() 274}