// This repository has no description.
1use crate::api::proxy_client::{ 2 is_ssrf_safe, proxy_client, MAX_RESPONSE_SIZE, RESPONSE_HEADERS_TO_FORWARD, 3}; 4use crate::api::ApiError; 5use crate::state::AppState; 6use axum::{ 7 http::{HeaderMap, HeaderValue, StatusCode}, 8 response::{IntoResponse, Response}, 9 Json, 10}; 11use bytes::Bytes; 12use chrono::{DateTime, Utc}; 13use cid::Cid; 14use jacquard_repo::storage::BlockStore; 15use serde::{Deserialize, Serialize}; 16use serde_json::Value; 17use std::collections::HashMap; 18use tracing::{error, info, warn}; 19use uuid::Uuid; 20pub const REPO_REV_HEADER: &str = "atproto-repo-rev"; 21pub const UPSTREAM_LAG_HEADER: &str = "atproto-upstream-lag"; 22#[derive(Debug, Clone, Serialize, Deserialize)] 23#[serde(rename_all = "camelCase")] 24pub struct PostRecord { 25 #[serde(rename = "$type")] 26 pub record_type: Option<String>, 27 pub text: String, 28 pub created_at: String, 29 #[serde(skip_serializing_if = "Option::is_none")] 30 pub reply: Option<Value>, 31 #[serde(skip_serializing_if = "Option::is_none")] 32 pub embed: Option<Value>, 33 #[serde(skip_serializing_if = "Option::is_none")] 34 pub langs: Option<Vec<String>>, 35 #[serde(skip_serializing_if = "Option::is_none")] 36 pub labels: Option<Value>, 37 #[serde(skip_serializing_if = "Option::is_none")] 38 pub tags: Option<Vec<String>>, 39 #[serde(flatten)] 40 pub extra: HashMap<String, Value>, 41} 42#[derive(Debug, Clone, Serialize, Deserialize)] 43#[serde(rename_all = "camelCase")] 44pub struct ProfileRecord { 45 #[serde(rename = "$type")] 46 pub record_type: Option<String>, 47 #[serde(skip_serializing_if = "Option::is_none")] 48 pub display_name: Option<String>, 49 #[serde(skip_serializing_if = "Option::is_none")] 50 pub description: Option<String>, 51 #[serde(skip_serializing_if = "Option::is_none")] 52 pub avatar: Option<Value>, 53 #[serde(skip_serializing_if = "Option::is_none")] 54 pub banner: Option<Value>, 55 #[serde(flatten)] 56 pub extra: HashMap<String, Value>, 57} 58#[derive(Debug, Clone)] 59pub 
struct RecordDescript<T> { 60 pub uri: String, 61 pub cid: String, 62 pub indexed_at: DateTime<Utc>, 63 pub record: T, 64} 65#[derive(Debug, Clone, Serialize, Deserialize)] 66#[serde(rename_all = "camelCase")] 67pub struct LikeRecord { 68 #[serde(rename = "$type")] 69 pub record_type: Option<String>, 70 pub subject: LikeSubject, 71 pub created_at: String, 72 #[serde(flatten)] 73 pub extra: HashMap<String, Value>, 74} 75#[derive(Debug, Clone, Serialize, Deserialize)] 76#[serde(rename_all = "camelCase")] 77pub struct LikeSubject { 78 pub uri: String, 79 pub cid: String, 80} 81#[derive(Debug, Default)] 82pub struct LocalRecords { 83 pub count: usize, 84 pub profile: Option<RecordDescript<ProfileRecord>>, 85 pub posts: Vec<RecordDescript<PostRecord>>, 86 pub likes: Vec<RecordDescript<LikeRecord>>, 87} 88pub async fn get_records_since_rev( 89 state: &AppState, 90 did: &str, 91 rev: &str, 92) -> Result<LocalRecords, String> { 93 let mut result = LocalRecords::default(); 94 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did) 95 .fetch_optional(&state.db) 96 .await 97 .map_err(|e| format!("DB error: {}", e))? 
98 .ok_or_else(|| "User not found".to_string())?; 99 let rows = sqlx::query!( 100 r#" 101 SELECT record_cid, collection, rkey, created_at, repo_rev 102 FROM records 103 WHERE repo_id = $1 AND repo_rev > $2 104 ORDER BY repo_rev ASC 105 LIMIT 10 106 "#, 107 user_id, 108 rev 109 ) 110 .fetch_all(&state.db) 111 .await 112 .map_err(|e| format!("DB error fetching records: {}", e))?; 113 if rows.is_empty() { 114 return Ok(result); 115 } 116 let sanity_check = sqlx::query_scalar!( 117 "SELECT 1 as val FROM records WHERE repo_id = $1 AND repo_rev <= $2 LIMIT 1", 118 user_id, 119 rev 120 ) 121 .fetch_optional(&state.db) 122 .await 123 .map_err(|e| format!("DB error sanity check: {}", e))?; 124 if sanity_check.is_none() { 125 warn!("Sanity check failed: no records found before rev {}", rev); 126 return Ok(result); 127 } 128 struct RowData { 129 cid_str: String, 130 collection: String, 131 rkey: String, 132 created_at: DateTime<Utc>, 133 } 134 let mut row_data: Vec<RowData> = Vec::with_capacity(rows.len()); 135 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len()); 136 for row in &rows { 137 if let Ok(cid) = row.record_cid.parse::<Cid>() { 138 cids.push(cid); 139 row_data.push(RowData { 140 cid_str: row.record_cid.clone(), 141 collection: row.collection.clone(), 142 rkey: row.rkey.clone(), 143 created_at: row.created_at, 144 }); 145 } 146 } 147 let blocks: Vec<Option<Bytes>> = state 148 .block_store 149 .get_many(&cids) 150 .await 151 .map_err(|e| format!("Error fetching blocks: {}", e))?; 152 for (data, block_opt) in row_data.into_iter().zip(blocks.into_iter()) { 153 let block_bytes = match block_opt { 154 Some(b) => b, 155 None => continue, 156 }; 157 result.count += 1; 158 let uri = format!("at://{}/{}/{}", did, data.collection, data.rkey); 159 if data.collection == "app.bsky.actor.profile" && data.rkey == "self" { 160 if let Ok(record) = serde_ipld_dagcbor::from_slice::<ProfileRecord>(&block_bytes) { 161 result.profile = Some(RecordDescript { 162 uri, 163 cid: 
data.cid_str, 164 indexed_at: data.created_at, 165 record, 166 }); 167 } 168 } else if data.collection == "app.bsky.feed.post" { 169 if let Ok(record) = serde_ipld_dagcbor::from_slice::<PostRecord>(&block_bytes) { 170 result.posts.push(RecordDescript { 171 uri, 172 cid: data.cid_str, 173 indexed_at: data.created_at, 174 record, 175 }); 176 } 177 } else if data.collection == "app.bsky.feed.like" { 178 if let Ok(record) = serde_ipld_dagcbor::from_slice::<LikeRecord>(&block_bytes) { 179 result.likes.push(RecordDescript { 180 uri, 181 cid: data.cid_str, 182 indexed_at: data.created_at, 183 record, 184 }); 185 } 186 } 187 } 188 Ok(result) 189} 190pub fn get_local_lag(local: &LocalRecords) -> Option<i64> { 191 let mut oldest: Option<DateTime<Utc>> = local.profile.as_ref().map(|p| p.indexed_at); 192 for post in &local.posts { 193 match oldest { 194 None => oldest = Some(post.indexed_at), 195 Some(o) if post.indexed_at < o => oldest = Some(post.indexed_at), 196 _ => {} 197 } 198 } 199 for like in &local.likes { 200 match oldest { 201 None => oldest = Some(like.indexed_at), 202 Some(o) if like.indexed_at < o => oldest = Some(like.indexed_at), 203 _ => {} 204 } 205 } 206 oldest.map(|o| (Utc::now() - o).num_milliseconds()) 207} 208pub fn extract_repo_rev(headers: &HeaderMap) -> Option<String> { 209 headers 210 .get(REPO_REV_HEADER) 211 .and_then(|h| h.to_str().ok()) 212 .map(|s| s.to_string()) 213} 214#[derive(Debug)] 215pub struct ProxyResponse { 216 pub status: StatusCode, 217 pub headers: HeaderMap, 218 pub body: bytes::Bytes, 219} 220pub async fn proxy_to_appview( 221 method: &str, 222 params: &HashMap<String, String>, 223 auth_header: Option<&str>, 224) -> Result<ProxyResponse, Response> { 225 let appview_url = std::env::var("APPVIEW_URL").map_err(|_| { 226 ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()).into_response() 227 })?; 228 if let Err(e) = is_ssrf_safe(&appview_url) { 229 error!("SSRF check failed for appview URL: {}", e); 230 return 
Err(ApiError::UpstreamUnavailable(format!("Invalid upstream URL: {}", e)) 231 .into_response()); 232 } 233 let target_url = format!("{}/xrpc/{}", appview_url, method); 234 info!(target = %target_url, "Proxying request to appview"); 235 let client = proxy_client(); 236 let mut request_builder = client.get(&target_url).query(params); 237 if let Some(auth) = auth_header { 238 request_builder = request_builder.header("Authorization", auth); 239 } 240 match request_builder.send().await { 241 Ok(resp) => { 242 let status = 243 StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY); 244 let headers: HeaderMap = resp 245 .headers() 246 .iter() 247 .filter(|(k, _)| { 248 RESPONSE_HEADERS_TO_FORWARD 249 .iter() 250 .any(|h| k.as_str().eq_ignore_ascii_case(h)) 251 }) 252 .filter_map(|(k, v)| { 253 let name = axum::http::HeaderName::try_from(k.as_str()).ok()?; 254 let value = HeaderValue::from_bytes(v.as_bytes()).ok()?; 255 Some((name, value)) 256 }) 257 .collect(); 258 let content_length = resp 259 .content_length() 260 .unwrap_or(0); 261 if content_length > MAX_RESPONSE_SIZE { 262 error!( 263 content_length, 264 max = MAX_RESPONSE_SIZE, 265 "Upstream response too large" 266 ); 267 return Err(ApiError::UpstreamFailure.into_response()); 268 } 269 let body = resp.bytes().await.map_err(|e| { 270 error!(error = ?e, "Error reading proxy response body"); 271 ApiError::UpstreamFailure.into_response() 272 })?; 273 if body.len() as u64 > MAX_RESPONSE_SIZE { 274 error!( 275 len = body.len(), 276 max = MAX_RESPONSE_SIZE, 277 "Upstream response body exceeded size limit" 278 ); 279 return Err(ApiError::UpstreamFailure.into_response()); 280 } 281 Ok(ProxyResponse { 282 status, 283 headers, 284 body, 285 }) 286 } 287 Err(e) => { 288 error!(error = ?e, "Error sending proxy request"); 289 if e.is_timeout() { 290 Err(ApiError::UpstreamTimeout.into_response()) 291 } else if e.is_connect() { 292 Err(ApiError::UpstreamUnavailable("Failed to connect to 
upstream".to_string()) 293 .into_response()) 294 } else { 295 Err(ApiError::UpstreamFailure.into_response()) 296 } 297 } 298 } 299} 300pub fn format_munged_response<T: Serialize>(data: T, lag: Option<i64>) -> Response { 301 let mut response = (StatusCode::OK, Json(data)).into_response(); 302 if let Some(lag_ms) = lag { 303 if let Ok(header_val) = HeaderValue::from_str(&lag_ms.to_string()) { 304 response 305 .headers_mut() 306 .insert(UPSTREAM_LAG_HEADER, header_val); 307 } 308 } 309 response 310} 311#[derive(Debug, Clone, Serialize, Deserialize)] 312#[serde(rename_all = "camelCase")] 313pub struct AuthorView { 314 pub did: String, 315 pub handle: String, 316 #[serde(skip_serializing_if = "Option::is_none")] 317 pub display_name: Option<String>, 318 #[serde(skip_serializing_if = "Option::is_none")] 319 pub avatar: Option<String>, 320 #[serde(flatten)] 321 pub extra: HashMap<String, Value>, 322} 323#[derive(Debug, Clone, Serialize, Deserialize)] 324#[serde(rename_all = "camelCase")] 325pub struct PostView { 326 pub uri: String, 327 pub cid: String, 328 pub author: AuthorView, 329 pub record: Value, 330 pub indexed_at: String, 331 #[serde(skip_serializing_if = "Option::is_none")] 332 pub embed: Option<Value>, 333 #[serde(default)] 334 pub reply_count: i64, 335 #[serde(default)] 336 pub repost_count: i64, 337 #[serde(default)] 338 pub like_count: i64, 339 #[serde(default)] 340 pub quote_count: i64, 341 #[serde(flatten)] 342 pub extra: HashMap<String, Value>, 343} 344#[derive(Debug, Clone, Serialize, Deserialize)] 345#[serde(rename_all = "camelCase")] 346pub struct FeedViewPost { 347 pub post: PostView, 348 #[serde(skip_serializing_if = "Option::is_none")] 349 pub reply: Option<Value>, 350 #[serde(skip_serializing_if = "Option::is_none")] 351 pub reason: Option<Value>, 352 #[serde(skip_serializing_if = "Option::is_none")] 353 pub feed_context: Option<String>, 354 #[serde(flatten)] 355 pub extra: HashMap<String, Value>, 356} 357#[derive(Debug, Clone, Serialize, 
Deserialize)] 358pub struct FeedOutput { 359 pub feed: Vec<FeedViewPost>, 360 #[serde(skip_serializing_if = "Option::is_none")] 361 pub cursor: Option<String>, 362} 363pub fn format_local_post( 364 descript: &RecordDescript<PostRecord>, 365 author_did: &str, 366 author_handle: &str, 367 profile: Option<&RecordDescript<ProfileRecord>>, 368) -> PostView { 369 let display_name = profile.and_then(|p| p.record.display_name.clone()); 370 PostView { 371 uri: descript.uri.clone(), 372 cid: descript.cid.clone(), 373 author: AuthorView { 374 did: author_did.to_string(), 375 handle: author_handle.to_string(), 376 display_name, 377 avatar: None, 378 extra: HashMap::new(), 379 }, 380 record: serde_json::to_value(&descript.record).unwrap_or(Value::Null), 381 indexed_at: descript.indexed_at.to_rfc3339(), 382 embed: descript.record.embed.clone(), 383 reply_count: 0, 384 repost_count: 0, 385 like_count: 0, 386 quote_count: 0, 387 extra: HashMap::new(), 388 } 389} 390pub fn insert_posts_into_feed(feed: &mut Vec<FeedViewPost>, posts: Vec<PostView>) { 391 if posts.is_empty() { 392 return; 393 } 394 let new_items: Vec<FeedViewPost> = posts 395 .into_iter() 396 .map(|post| FeedViewPost { 397 post, 398 reply: None, 399 reason: None, 400 feed_context: None, 401 extra: HashMap::new(), 402 }) 403 .collect(); 404 feed.extend(new_items); 405 feed.sort_by(|a, b| b.post.indexed_at.cmp(&a.post.indexed_at)); 406}