this repo has no description
at main 9.4 kB view raw
1use crate::api::error::ApiError; 2use crate::state::AppState; 3use crate::types::{AtIdentifier, Nsid, Rkey}; 4use axum::{ 5 Json, 6 extract::{Query, State}, 7 http::HeaderMap, 8 response::{IntoResponse, Response}, 9}; 10use base64::Engine; 11use cid::Cid; 12use ipld_core::ipld::Ipld; 13use jacquard_repo::storage::BlockStore; 14use serde::{Deserialize, Serialize}; 15use serde_json::{Map, Value, json}; 16use std::str::FromStr; 17use tracing::error; 18 19fn ipld_to_json(ipld: Ipld) -> Value { 20 match ipld { 21 Ipld::Null => Value::Null, 22 Ipld::Bool(b) => Value::Bool(b), 23 Ipld::Integer(i) => { 24 if let Ok(n) = i64::try_from(i) { 25 Value::Number(n.into()) 26 } else { 27 Value::String(i.to_string()) 28 } 29 } 30 Ipld::Float(f) => serde_json::Number::from_f64(f) 31 .map(Value::Number) 32 .unwrap_or(Value::Null), 33 Ipld::String(s) => Value::String(s), 34 Ipld::Bytes(b) => { 35 let encoded = base64::engine::general_purpose::STANDARD.encode(&b); 36 json!({ "$bytes": encoded }) 37 } 38 Ipld::List(arr) => Value::Array(arr.into_iter().map(ipld_to_json).collect()), 39 Ipld::Map(map) => { 40 let obj: Map<String, Value> = 41 map.into_iter().map(|(k, v)| (k, ipld_to_json(v))).collect(); 42 Value::Object(obj) 43 } 44 Ipld::Link(cid) => json!({ "$link": cid.to_string() }), 45 } 46} 47 48#[derive(Deserialize)] 49pub struct GetRecordInput { 50 pub repo: AtIdentifier, 51 pub collection: Nsid, 52 pub rkey: Rkey, 53 pub cid: Option<String>, 54} 55 56pub async fn get_record( 57 State(state): State<AppState>, 58 _headers: HeaderMap, 59 Query(input): Query<GetRecordInput>, 60) -> Response { 61 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 62 let user_id_opt = if input.repo.is_did() { 63 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo.as_str()) 64 .fetch_optional(&state.db) 65 .await 66 .map(|opt| opt.map(|r| r.id)) 67 } else { 68 let repo_str = input.repo.as_str(); 69 let handle = if !repo_str.contains('.') { 70 format!("{}.{}", repo_str, hostname) 71 } else { 72 repo_str.to_string() 73 }; 74 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 75 .fetch_optional(&state.db) 76 .await 77 .map(|opt| opt.map(|r| r.id)) 78 }; 79 let user_id: uuid::Uuid = match user_id_opt { 80 Ok(Some(id)) => id, 81 Ok(None) => { 82 return ApiError::RepoNotFound(Some("Repo not found".into())).into_response(); 83 } 84 Err(_) => { 85 return ApiError::InternalError(None).into_response(); 86 } 87 }; 88 let record_row = sqlx::query!( 89 "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", 90 user_id, 91 input.collection.as_str(), 92 input.rkey.as_str() 93 ) 94 .fetch_optional(&state.db) 95 .await; 96 let record_cid_str: String = match record_row { 97 Ok(Some(row)) => row.record_cid, 98 _ => { 99 return ApiError::RecordNotFound.into_response(); 100 } 101 }; 102 if let Some(expected_cid) = &input.cid 103 && &record_cid_str != expected_cid 104 { 105 return ApiError::RecordNotFound.into_response(); 106 } 107 let Ok(cid) = Cid::from_str(&record_cid_str) else { 108 return ApiError::InternalError(Some("Invalid CID in DB".into())).into_response(); 109 }; 110 let block = match state.block_store.get(&cid).await { 111 Ok(Some(b)) => b, 112 _ => { 113 return ApiError::InternalError(Some("Record block not found".into())).into_response(); 114 } 115 }; 116 let ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block) { 117 Ok(v) => v, 118 Err(e) => { 119 error!("Failed to deserialize record: {:?}", e); 120 return ApiError::InternalError(None).into_response(); 121 } 122 }; 123 let value = ipld_to_json(ipld); 124 Json(json!({ 125 "uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey), 126 "cid": record_cid_str, 127 "value": value 128 })) 129 .into_response() 130} 131#[derive(Deserialize)] 132pub struct ListRecordsInput { 133 pub repo: AtIdentifier, 134 pub collection: Nsid, 135 pub limit: Option<i32>, 136 pub cursor: Option<String>, 137 #[serde(rename = "rkeyStart")] 138 pub rkey_start: Option<Rkey>, 139 #[serde(rename = "rkeyEnd")] 140 pub rkey_end: Option<Rkey>, 141 pub reverse: Option<bool>, 142} 143#[derive(Serialize)] 144pub struct ListRecordsOutput { 145 #[serde(skip_serializing_if = "Option::is_none")] 146 pub cursor: Option<String>, 147 pub records: Vec<serde_json::Value>, 148} 149 150pub async fn list_records( 151 State(state): State<AppState>, 152 Query(input): Query<ListRecordsInput>, 153) -> Response { 154 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 155 let user_id_opt = if input.repo.is_did() { 156 sqlx::query!("SELECT id FROM users WHERE did = $1", input.repo.as_str()) 157 .fetch_optional(&state.db) 158 .await 159 .map(|opt| opt.map(|r| r.id)) 160 } else { 161 let repo_str = input.repo.as_str(); 162 let handle = if !repo_str.contains('.') { 163 format!("{}.{}", repo_str, hostname) 164 } else { 165 repo_str.to_string() 166 }; 167 sqlx::query!("SELECT id FROM users WHERE handle = $1", handle) 168 .fetch_optional(&state.db) 169 .await 170 .map(|opt| opt.map(|r| r.id)) 171 }; 172 let user_id: uuid::Uuid = match user_id_opt { 173 Ok(Some(id)) => id, 174 Ok(None) => { 175 return ApiError::RepoNotFound(Some("Repo not found".into())).into_response(); 176 } 177 Err(_) => { 178 return ApiError::InternalError(None).into_response(); 179 } 180 }; 181 let limit = input.limit.unwrap_or(50).clamp(1, 100); 182 let reverse = input.reverse.unwrap_or(false); 183 let limit_i64 = limit as i64; 184 let order = if reverse { "ASC" } else { "DESC" }; 185 let rows_res: Result<Vec<(String, String)>, sqlx::Error> = if let Some(cursor) = &input.cursor { 186 let comparator = if reverse { ">" } else { "<" }; 187 let query = format!( 188 "SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey {} $3 ORDER BY rkey {} LIMIT $4", 189 comparator, order 190 ); 191 sqlx::query_as(&query) 192 .bind(user_id) 193 .bind(input.collection.as_str()) 194 .bind(cursor) 195 .bind(limit_i64) 196 .fetch_all(&state.db) 197 .await 198 } else { 199 let mut conditions = vec!["repo_id = $1", "collection = $2"]; 200 let mut param_idx = 3; 201 if input.rkey_start.is_some() { 202 conditions.push("rkey > $3"); 203 param_idx += 1; 204 } 205 if input.rkey_end.is_some() { 206 conditions.push(if param_idx == 3 { 207 "rkey < $3" 208 } else { 209 "rkey < $4" 210 }); 211 param_idx += 1; 212 } 213 let limit_idx = param_idx; 214 let query = format!( 215 "SELECT rkey, record_cid FROM records WHERE {} ORDER BY rkey {} LIMIT ${}", 216 conditions.join(" AND "), 217 order, 218 limit_idx 219 ); 220 let mut query_builder = sqlx::query_as::<_, (String, String)>(&query) 221 .bind(user_id) 222 .bind(input.collection.as_str()); 223 if let Some(start) = &input.rkey_start { 224 query_builder = query_builder.bind(start.as_str()); 225 } 226 if let Some(end) = &input.rkey_end { 227 query_builder = query_builder.bind(end.as_str()); 228 } 229 query_builder.bind(limit_i64).fetch_all(&state.db).await 230 }; 231 let rows = match rows_res { 232 Ok(r) => r, 233 Err(e) => { 234 error!("Error listing records: {:?}", e); 235 return ApiError::InternalError(None).into_response(); 236 } 237 }; 238 let last_rkey = rows.last().map(|(rkey, _)| rkey.clone()); 239 let parsed_rows: Vec<(Cid, String, String)> = rows 240 .iter() 241 .filter_map(|(rkey, cid_str)| { 242 Cid::from_str(cid_str) 243 .ok() 244 .map(|cid| (cid, rkey.clone(), cid_str.clone())) 245 }) 246 .collect(); 247 let cids: Vec<Cid> = parsed_rows.iter().map(|(cid, _, _)| *cid).collect(); 248 let blocks = match state.block_store.get_many(&cids).await { 249 Ok(b) => b, 250 Err(e) => { 251 error!("Error fetching blocks: {:?}", e); 252 return ApiError::InternalError(None).into_response(); 253 } 254 }; 255 let records: Vec<Value> = parsed_rows 256 .iter() 257 .zip(blocks.into_iter()) 258 .filter_map(|((_, rkey, cid_str), block_opt)| { 259 block_opt.and_then(|block| { 260 serde_ipld_dagcbor::from_slice::<Ipld>(&block) 261 .ok() 262 .map(|ipld| { 263 json!({ 264 "uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey), 265 "cid": cid_str, 266 "value": ipld_to_json(ipld) 267 }) 268 }) 269 }) 270 }) 271 .collect(); 272 Json(ListRecordsOutput { 273 cursor: last_rkey, 274 records, 275 }) 276 .into_response() 277}