use crate::api::proxy_client::{
    is_ssrf_safe, proxy_client, MAX_RESPONSE_SIZE, RESPONSE_HEADERS_TO_FORWARD,
};
use crate::api::ApiError;
use crate::state::AppState;
use axum::{
    http::{HeaderMap, HeaderValue, StatusCode},
    response::{IntoResponse, Response},
    Json,
};
use chrono::{DateTime, Utc};
use jacquard_repo::storage::BlockStore;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use tracing::{error, info, warn};
use uuid::Uuid;

pub const REPO_REV_HEADER: &str = "atproto-repo-rev";
pub const UPSTREAM_LAG_HEADER: &str = "atproto-upstream-lag";

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    pub text: String,
    pub created_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub langs: Option<Vec<String>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub labels: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<Vec<String>>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProfileRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub banner: Option<Value>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone)]
pub struct RecordDescript<T> {
    pub uri: String,
    pub cid: String,
    pub indexed_at: DateTime<Utc>,
    pub record: T,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    pub subject: LikeSubject,
    pub created_at: String,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeSubject {
    pub uri: String,
    pub cid: String,
}

#[derive(Debug, Default)]
pub struct LocalRecords {
    pub count: usize,
    pub profile: Option<RecordDescript<ProfileRecord>>,
    pub posts: Vec<RecordDescript<PostRecord>>,
    pub likes: Vec<RecordDescript<LikeRecord>>,
}
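/// Load records written locally after `rev` that the upstream AppView may not
/// have indexed yet. Resolves the user's repo by DID, fetches the newer rows,
/// and decodes each block from the block store into profile, post, or like
/// records; rows that fail to parse or whose blocks are missing are skipped.
/// Returns an empty set when no record exists at or before `rev`.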
pub async fn get_records_since_rev(
    state: &AppState,
    did: &str,
    rev: &str,
) -> Result<LocalRecords, String> {
    let mut result = LocalRecords::default();

    let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_optional(&state.db)
        .await
        .map_err(|e| format!("DB error: {}", e))?
        .ok_or_else(|| "User not found".to_string())?;

    let rows = sqlx::query!(
        r#"
        SELECT record_cid, collection, rkey, created_at, repo_rev
        FROM records
        WHERE repo_id = $1 AND repo_rev > $2
        ORDER BY repo_rev ASC
        LIMIT 10
        "#,
        user_id,
        rev
    )
    .fetch_all(&state.db)
    .await
    .map_err(|e| format!("DB error fetching records: {}", e))?;

    if rows.is_empty() {
        return Ok(result);
    }

    let sanity_check = sqlx::query_scalar!(
        "SELECT 1 as val FROM records WHERE repo_id = $1 AND repo_rev <= $2 LIMIT 1",
        user_id,
        rev
    )
    .fetch_optional(&state.db)
    .await
    .map_err(|e| format!("DB error sanity check: {}", e))?;

    if sanity_check.is_none() {
        warn!("Sanity check failed: no records found before rev {}", rev);
        return Ok(result);
    }

    for row in rows {
        result.count += 1;

        let cid: cid::Cid = match row.record_cid.parse() {
            Ok(c) => c,
            Err(_) => continue,
        };

        let block_bytes = match state.block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => continue,
        };

        let uri = format!("at://{}/{}/{}", did, row.collection, row.rkey);
        let indexed_at = row.created_at;

        if row.collection == "app.bsky.actor.profile" && row.rkey == "self" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<ProfileRecord>(&block_bytes) {
                result.profile = Some(RecordDescript {
                    uri,
                    cid: row.record_cid,
                    indexed_at,
                    record,
                });
            }
        } else if row.collection == "app.bsky.feed.post" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<PostRecord>(&block_bytes) {
                result.posts.push(RecordDescript {
                    uri,
                    cid: row.record_cid,
                    indexed_at,
                    record,
                });
            }
        } else if row.collection == "app.bsky.feed.like" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<LikeRecord>(&block_bytes) {
                result.likes.push(RecordDescript {
                    uri,
                    cid: row.record_cid,
                    indexed_at,
                    record,
                });
            }
        }
    }

    Ok(result)
}

pub fn get_local_lag(local: &LocalRecords) -> Option<i64> {
    let mut oldest: Option<DateTime<Utc>> = local.profile.as_ref().map(|p| p.indexed_at);

    for post in &local.posts {
        match oldest {
            None => oldest = Some(post.indexed_at),
            Some(o) if post.indexed_at < o => oldest = Some(post.indexed_at),
            _ => {}
        }
    }

    for like in &local.likes {
        match oldest {
            None => oldest = Some(like.indexed_at),
            Some(o) if like.indexed_at < o => oldest = Some(like.indexed_at),
            _ => {}
        }
    }

    oldest.map(|o| (Utc::now() - o).num_milliseconds())
}

pub fn extract_repo_rev(headers: &HeaderMap) -> Option<String> {
    headers
        .get(REPO_REV_HEADER)
        .and_then(|h| h.to_str().ok())
        .map(|s| s.to_string())
}

#[derive(Debug)]
pub struct ProxyResponse {
    pub status: StatusCode,
    pub headers: HeaderMap,
    pub body: bytes::Bytes,
}
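/// Forward an XRPC GET to the upstream AppView named by the `APPVIEW_URL`
/// environment variable. The target is checked with `is_ssrf_safe` first,
/// only headers listed in `RESPONSE_HEADERS_TO_FORWARD` are copied back to
/// the caller, and responses larger than `MAX_RESPONSE_SIZE` are rejected.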
pub async fn proxy_to_appview(
    method: &str,
    params: &HashMap<String, String>,
    auth_header: Option<&str>,
) -> Result<ProxyResponse, Response> {
    let appview_url = std::env::var("APPVIEW_URL").map_err(|_| {
        ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()).into_response()
    })?;

    if let Err(e) = is_ssrf_safe(&appview_url) {
        error!("SSRF check failed for appview URL: {}", e);
        return Err(ApiError::UpstreamUnavailable(format!("Invalid upstream URL: {}", e))
            .into_response());
    }

    let target_url = format!("{}/xrpc/{}", appview_url, method);
    info!(target = %target_url, "Proxying request to appview");

    let client = proxy_client();
    let mut request_builder = client.get(&target_url).query(params);

    if let Some(auth) = auth_header {
        request_builder = request_builder.header("Authorization", auth);
    }

    match request_builder.send().await {
        Ok(resp) => {
            let status =
                StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);

            let headers: HeaderMap = resp
                .headers()
                .iter()
                .filter(|(k, _)| {
                    RESPONSE_HEADERS_TO_FORWARD
                        .iter()
                        .any(|h| k.as_str().eq_ignore_ascii_case(h))
                })
                .filter_map(|(k, v)| {
                    let name = axum::http::HeaderName::try_from(k.as_str()).ok()?;
                    let value = HeaderValue::from_bytes(v.as_bytes()).ok()?;
                    Some((name, value))
                })
                .collect();

            let content_length = resp.content_length().unwrap_or(0);
            if content_length > MAX_RESPONSE_SIZE {
                error!(
                    content_length,
                    max = MAX_RESPONSE_SIZE,
                    "Upstream response too large"
                );
                return Err(ApiError::UpstreamFailure.into_response());
            }

            let body = resp.bytes().await.map_err(|e| {
                error!(error = ?e, "Error reading proxy response body");
                ApiError::UpstreamFailure.into_response()
            })?;

            if body.len() as u64 > MAX_RESPONSE_SIZE {
                error!(
                    len = body.len(),
                    max = MAX_RESPONSE_SIZE,
                    "Upstream response body exceeded size limit"
                );
                return Err(ApiError::UpstreamFailure.into_response());
            }

            Ok(ProxyResponse {
                status,
                headers,
                body,
            })
        }
        Err(e) => {
            error!(error = ?e, "Error sending proxy request");
            if e.is_timeout() {
                Err(ApiError::UpstreamTimeout.into_response())
            } else if e.is_connect() {
                Err(ApiError::UpstreamUnavailable("Failed to connect to upstream".to_string())
                    .into_response())
            } else {
                Err(ApiError::UpstreamFailure.into_response())
            }
        }
    }
}

pub fn format_munged_response<T: Serialize>(data: T, lag: Option<i64>) -> Response {
    let mut response = (StatusCode::OK, Json(data)).into_response();

    if let Some(lag_ms) = lag {
        if let Ok(header_val) = HeaderValue::from_str(&lag_ms.to_string()) {
            response
                .headers_mut()
                .insert(UPSTREAM_LAG_HEADER, header_val);
        }
    }

    response
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthorView {
    pub did: String,
    pub handle: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<String>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostView {
    pub uri: String,
    pub cid: String,
    pub author: AuthorView,
    pub record: Value,
    pub indexed_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    #[serde(default)]
    pub reply_count: i64,
    #[serde(default)]
    pub repost_count: i64,
    #[serde(default)]
    pub like_count: i64,
    #[serde(default)]
    pub quote_count: i64,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeedViewPost {
    pub post: PostView,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub feed_context: Option<String>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedOutput {
    pub feed: Vec<FeedViewPost>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}
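/// Build a `PostView` for a post that so far exists only locally: engagement
/// counts start at zero and the display name is taken from the local profile
/// record when one is available.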
pub fn format_local_post(
    descript: &RecordDescript<PostRecord>,
    author_did: &str,
    author_handle: &str,
    profile: Option<&RecordDescript<ProfileRecord>>,
) -> PostView {
    let display_name = profile.and_then(|p| p.record.display_name.clone());

    PostView {
        uri: descript.uri.clone(),
        cid: descript.cid.clone(),
        author: AuthorView {
            did: author_did.to_string(),
            handle: author_handle.to_string(),
            display_name,
            avatar: None,
            extra: HashMap::new(),
        },
        record: serde_json::to_value(&descript.record).unwrap_or(Value::Null),
        indexed_at: descript.indexed_at.to_rfc3339(),
        embed: descript.record.embed.clone(),
        reply_count: 0,
        repost_count: 0,
        like_count: 0,
        quote_count: 0,
        extra: HashMap::new(),
    }
}

pub fn insert_posts_into_feed(feed: &mut Vec<FeedViewPost>, posts: Vec<PostView>) {
    if posts.is_empty() {
        return;
    }

    let new_items: Vec<FeedViewPost> = posts
        .into_iter()
        .map(|post| FeedViewPost {
            post,
            reply: None,
            reason: None,
            feed_context: None,
            extra: HashMap::new(),
        })
        .collect();

    feed.extend(new_items);
    feed.sort_by(|a, b| b.post.indexed_at.cmp(&a.post.indexed_at));
}
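
#[cfg(test)]
mod tests {
    // Illustrative tests sketching how the feed-merge helpers behave. The
    // DIDs, handles, CIDs, and timestamps below are made-up fixture values,
    // not data from a real PDS or AppView.
    use super::*;

    fn sample_post_view(uri: &str, indexed_at: &str) -> PostView {
        PostView {
            uri: uri.to_string(),
            cid: "test-cid".to_string(), // placeholder, not a real CID
            author: AuthorView {
                did: "did:plc:example".to_string(),
                handle: "example.test".to_string(),
                display_name: None,
                avatar: None,
                extra: HashMap::new(),
            },
            record: Value::Null,
            indexed_at: indexed_at.to_string(),
            embed: None,
            reply_count: 0,
            repost_count: 0,
            like_count: 0,
            quote_count: 0,
            extra: HashMap::new(),
        }
    }

    #[test]
    fn local_posts_are_merged_newest_first() {
        // Start from a feed holding one older upstream post.
        let mut feed = vec![FeedViewPost {
            post: sample_post_view(
                "at://did:plc:example/app.bsky.feed.post/old",
                "2024-01-01T00:00:00Z",
            ),
            reply: None,
            reason: None,
            feed_context: None,
            extra: HashMap::new(),
        }];

        // Insert a newer local-only post; the helper re-sorts by indexed_at descending.
        insert_posts_into_feed(
            &mut feed,
            vec![sample_post_view(
                "at://did:plc:example/app.bsky.feed.post/new",
                "2024-06-01T00:00:00Z",
            )],
        );

        assert_eq!(feed.len(), 2);
        assert_eq!(feed[0].post.uri, "at://did:plc:example/app.bsky.feed.post/new");
    }

    #[test]
    fn lag_is_measured_from_the_oldest_local_record() {
        // A single local post indexed five seconds ago should report at least
        // five seconds of upstream lag.
        let records = LocalRecords {
            count: 1,
            profile: None,
            posts: vec![RecordDescript {
                uri: "at://did:plc:example/app.bsky.feed.post/old".to_string(),
                cid: "test-cid".to_string(),
                indexed_at: Utc::now() - chrono::Duration::seconds(5),
                record: PostRecord {
                    record_type: Some("app.bsky.feed.post".to_string()),
                    text: "hello".to_string(),
                    created_at: "2024-01-01T00:00:00Z".to_string(),
                    reply: None,
                    embed: None,
                    langs: None,
                    labels: None,
                    tags: None,
                    extra: HashMap::new(),
                },
            }],
            likes: vec![],
        };

        let lag_ms = get_local_lag(&records).expect("lag should be present");
        assert!(lag_ms >= 5_000);
    }
}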