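//! Helpers for serving reads that merge locally written records with
//! responses proxied from the upstream AppView: fetching records newer than a
//! given repo revision, computing upstream lag, and formatting local posts
//! into feed views.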
use crate::api::proxy_client::{
    is_ssrf_safe, proxy_client, MAX_RESPONSE_SIZE, RESPONSE_HEADERS_TO_FORWARD,
};
use crate::api::ApiError;
use crate::state::AppState;
use axum::{
    http::{HeaderMap, HeaderValue, StatusCode},
    response::{IntoResponse, Response},
    Json,
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use cid::Cid;
use jacquard_repo::storage::BlockStore;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use tracing::{error, info, warn};
use uuid::Uuid;

pub const REPO_REV_HEADER: &str = "atproto-repo-rev";
pub const UPSTREAM_LAG_HEADER: &str = "atproto-upstream-lag";

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    pub text: String,
    pub created_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub langs: Option<Vec<String>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub labels: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<Vec<String>>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProfileRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub banner: Option<Value>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone)]
pub struct RecordDescript<T> {
    pub uri: String,
    pub cid: String,
    pub indexed_at: DateTime<Utc>,
    pub record: T,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    pub subject: LikeSubject,
    pub created_at: String,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeSubject {
    pub uri: String,
    pub cid: String,
}

#[derive(Debug, Default)]
pub struct LocalRecords {
    pub count: usize,
    pub profile: Option<RecordDescript<ProfileRecord>>,
    pub posts: Vec<RecordDescript<PostRecord>>,
    pub likes: Vec<RecordDescript<LikeRecord>>,
}
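/// Load records this instance has written locally since `rev`, decode their
/// blocks from the block store, and bucket them into profile, post, and like
/// records for merging into proxied responses.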
pub async fn get_records_since_rev(
    state: &AppState,
    did: &str,
    rev: &str,
) -> Result<LocalRecords, String> {
    let mut result = LocalRecords::default();

    let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_optional(&state.db)
        .await
        .map_err(|e| format!("DB error: {}", e))?
        .ok_or_else(|| "User not found".to_string())?;

    // Records written after `rev`, oldest first, bounded so the merge stays small.
    let rows = sqlx::query!(
        r#"
        SELECT record_cid, collection, rkey, created_at, repo_rev
        FROM records
        WHERE repo_id = $1 AND repo_rev > $2
        ORDER BY repo_rev ASC
        LIMIT 10
        "#,
        user_id,
        rev
    )
    .fetch_all(&state.db)
    .await
    .map_err(|e| format!("DB error fetching records: {}", e))?;

    if rows.is_empty() {
        return Ok(result);
    }

    // Sanity check: only merge local records if at least one record exists at
    // or before the requested rev.
    let sanity_check = sqlx::query_scalar!(
        "SELECT 1 as val FROM records WHERE repo_id = $1 AND repo_rev <= $2 LIMIT 1",
        user_id,
        rev
    )
    .fetch_optional(&state.db)
    .await
    .map_err(|e| format!("DB error sanity check: {}", e))?;

    if sanity_check.is_none() {
        warn!("Sanity check failed: no records found before rev {}", rev);
        return Ok(result);
    }

    struct RowData {
        cid_str: String,
        collection: String,
        rkey: String,
        created_at: DateTime<Utc>,
    }

    let mut row_data: Vec<RowData> = Vec::with_capacity(rows.len());
    let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());

    for row in &rows {
        if let Ok(cid) = row.record_cid.parse::<Cid>() {
            cids.push(cid);
            row_data.push(RowData {
                cid_str: row.record_cid.clone(),
                collection: row.collection.clone(),
                rkey: row.rkey.clone(),
                created_at: row.created_at,
            });
        }
    }

    // Fetch all record blocks from the block store in one batch.
    let blocks: Vec<Option<Bytes>> = state
        .block_store
        .get_many(&cids)
        .await
        .map_err(|e| format!("Error fetching blocks: {}", e))?;

    // Decode each DAG-CBOR block into a typed record, bucketed by collection.
    for (data, block_opt) in row_data.into_iter().zip(blocks.into_iter()) {
        let block_bytes = match block_opt {
            Some(b) => b,
            None => continue,
        };

        result.count += 1;
        let uri = format!("at://{}/{}/{}", did, data.collection, data.rkey);

        if data.collection == "app.bsky.actor.profile" && data.rkey == "self" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<ProfileRecord>(&block_bytes) {
                result.profile = Some(RecordDescript {
                    uri,
                    cid: data.cid_str,
                    indexed_at: data.created_at,
                    record,
                });
            }
        } else if data.collection == "app.bsky.feed.post" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<PostRecord>(&block_bytes) {
                result.posts.push(RecordDescript {
                    uri,
                    cid: data.cid_str,
                    indexed_at: data.created_at,
                    record,
                });
            }
        } else if data.collection == "app.bsky.feed.like" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<LikeRecord>(&block_bytes) {
                result.likes.push(RecordDescript {
                    uri,
                    cid: data.cid_str,
                    indexed_at: data.created_at,
                    record,
                });
            }
        }
    }

    Ok(result)
}

/// Milliseconds since the oldest locally written record was indexed, used to
/// populate the `atproto-upstream-lag` response header.
pub fn get_local_lag(local: &LocalRecords) -> Option<i64> {
    let mut oldest: Option<DateTime<Utc>> = local.profile.as_ref().map(|p| p.indexed_at);

    for post in &local.posts {
        match oldest {
            None => oldest = Some(post.indexed_at),
            Some(o) if post.indexed_at < o => oldest = Some(post.indexed_at),
            _ => {}
        }
    }

    for like in &local.likes {
        match oldest {
            None => oldest = Some(like.indexed_at),
            Some(o) if like.indexed_at < o => oldest = Some(like.indexed_at),
            _ => {}
        }
    }

    oldest.map(|o| (Utc::now() - o).num_milliseconds())
}

/// Read the `atproto-repo-rev` header from an upstream response, if present.
pub fn extract_repo_rev(headers: &HeaderMap) -> Option<String> {
    headers
        .get(REPO_REV_HEADER)
        .and_then(|h| h.to_str().ok())
        .map(|s| s.to_string())
}
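/// Status, allow-listed headers, and body captured from an upstream AppView response.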
#[derive(Debug)]
pub struct ProxyResponse {
    pub status: StatusCode,
    pub headers: HeaderMap,
    pub body: Bytes,
}

pub async fn proxy_to_appview(
    method: &str,
    params: &HashMap<String, String>,
    auth_header: Option<&str>,
) -> Result<ProxyResponse, Response> {
    let appview_url = std::env::var("APPVIEW_URL").map_err(|_| {
        ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()).into_response()
    })?;

    if let Err(e) = is_ssrf_safe(&appview_url) {
        error!("SSRF check failed for appview URL: {}", e);
        return Err(
            ApiError::UpstreamUnavailable(format!("Invalid upstream URL: {}", e)).into_response(),
        );
    }

    let target_url = format!("{}/xrpc/{}", appview_url, method);
    info!(target = %target_url, "Proxying request to appview");

    let client = proxy_client();
    let mut request_builder = client.get(&target_url).query(params);

    if let Some(auth) = auth_header {
        request_builder = request_builder.header("Authorization", auth);
    }

    match request_builder.send().await {
        Ok(resp) => {
            let status =
                StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);

            // Forward only the allow-listed response headers.
            let headers: HeaderMap = resp
                .headers()
                .iter()
                .filter(|(k, _)| {
                    RESPONSE_HEADERS_TO_FORWARD
                        .iter()
                        .any(|h| k.as_str().eq_ignore_ascii_case(h))
                })
                .filter_map(|(k, v)| {
                    let name = axum::http::HeaderName::try_from(k.as_str()).ok()?;
                    let value = HeaderValue::from_bytes(v.as_bytes()).ok()?;
                    Some((name, value))
                })
                .collect();

            // Reject oversized responses up front based on Content-Length...
            let content_length = resp.content_length().unwrap_or(0);
            if content_length > MAX_RESPONSE_SIZE {
                error!(
                    content_length,
                    max = MAX_RESPONSE_SIZE,
                    "Upstream response too large"
                );
                return Err(ApiError::UpstreamFailure.into_response());
            }

            let body = resp.bytes().await.map_err(|e| {
                error!(error = ?e, "Error reading proxy response body");
                ApiError::UpstreamFailure.into_response()
            })?;

            // ...and again after reading, since Content-Length may be absent or wrong.
            if body.len() as u64 > MAX_RESPONSE_SIZE {
                error!(
                    len = body.len(),
                    max = MAX_RESPONSE_SIZE,
                    "Upstream response body exceeded size limit"
                );
                return Err(ApiError::UpstreamFailure.into_response());
            }

            Ok(ProxyResponse {
                status,
                headers,
                body,
            })
        }
        Err(e) => {
            error!(error = ?e, "Error sending proxy request");
            if e.is_timeout() {
                Err(ApiError::UpstreamTimeout.into_response())
            } else if e.is_connect() {
                Err(
                    ApiError::UpstreamUnavailable("Failed to connect to upstream".to_string())
                        .into_response(),
                )
            } else {
                Err(ApiError::UpstreamFailure.into_response())
            }
        }
    }
}

pub fn format_munged_response<T: Serialize>(data: T, lag: Option<i64>) -> Response {
    let mut response = (StatusCode::OK, Json(data)).into_response();

    if let Some(lag_ms) = lag {
        if let Ok(header_val) = HeaderValue::from_str(&lag_ms.to_string()) {
            response
                .headers_mut()
                .insert(UPSTREAM_LAG_HEADER, header_val);
        }
    }

    response
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthorView {
    pub did: String,
    pub handle: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<String>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
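/// Post view in the shape returned by the AppView; unrecognized fields are
/// preserved via the flattened `extra` map.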
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostView {
    pub uri: String,
    pub cid: String,
    pub author: AuthorView,
    pub record: Value,
    pub indexed_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    #[serde(default)]
    pub reply_count: i64,
    #[serde(default)]
    pub repost_count: i64,
    #[serde(default)]
    pub like_count: i64,
    #[serde(default)]
    pub quote_count: i64,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeedViewPost {
    pub post: PostView,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub feed_context: Option<String>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedOutput {
    pub feed: Vec<FeedViewPost>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}

pub fn format_local_post(
    descript: &RecordDescript<PostRecord>,
    author_did: &str,
    author_handle: &str,
    profile: Option<&RecordDescript<ProfileRecord>>,
) -> PostView {
    let display_name = profile.and_then(|p| p.record.display_name.clone());

    PostView {
        uri: descript.uri.clone(),
        cid: descript.cid.clone(),
        author: AuthorView {
            did: author_did.to_string(),
            handle: author_handle.to_string(),
            display_name,
            avatar: None,
            extra: HashMap::new(),
        },
        record: serde_json::to_value(&descript.record).unwrap_or(Value::Null),
        indexed_at: descript.indexed_at.to_rfc3339(),
        embed: descript.record.embed.clone(),
        reply_count: 0,
        repost_count: 0,
        like_count: 0,
        quote_count: 0,
        extra: HashMap::new(),
    }
}

pub fn insert_posts_into_feed(feed: &mut Vec<FeedViewPost>, posts: Vec<PostView>) {
    if posts.is_empty() {
        return;
    }

    let new_items: Vec<FeedViewPost> = posts
        .into_iter()
        .map(|post| FeedViewPost {
            post,
            reply: None,
            reason: None,
            feed_context: None,
            extra: HashMap::new(),
        })
        .collect();

    feed.extend(new_items);
    feed.sort_by(|a, b| b.post.indexed_at.cmp(&a.post.indexed_at));
}
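// A minimal usage sketch (not part of the original module): the DIDs, handles,
// and CIDs below are hypothetical placeholder values, used only to show how
// format_local_post, insert_posts_into_feed, and get_local_lag fit together.
#[cfg(test)]
mod tests {
    use super::*;

    fn sample_post(uri: &str, indexed_at: DateTime<Utc>) -> RecordDescript<PostRecord> {
        RecordDescript {
            uri: uri.to_string(),
            cid: "bafyplaceholdercid".to_string(),
            indexed_at,
            record: PostRecord {
                record_type: Some("app.bsky.feed.post".to_string()),
                text: "hello".to_string(),
                created_at: indexed_at.to_rfc3339(),
                reply: None,
                embed: None,
                langs: None,
                labels: None,
                tags: None,
                extra: HashMap::new(),
            },
        }
    }

    #[test]
    fn local_lag_is_measured_from_the_oldest_record() {
        let mut local = LocalRecords::default();
        local.posts.push(sample_post(
            "at://did:example:alice/app.bsky.feed.post/1",
            Utc::now(),
        ));
        // Lag is the age of the oldest local record, so it is never negative.
        assert!(get_local_lag(&local).unwrap_or(0) >= 0);
    }

    #[test]
    fn inserted_posts_are_sorted_newest_first() {
        let older = sample_post(
            "at://did:example:alice/app.bsky.feed.post/older",
            Utc::now() - chrono::Duration::seconds(60),
        );
        let newer = sample_post("at://did:example:alice/app.bsky.feed.post/newer", Utc::now());

        let mut feed = Vec::new();
        insert_posts_into_feed(
            &mut feed,
            vec![
                format_local_post(&older, "did:example:alice", "alice.example.com", None),
                format_local_post(&newer, "did:example:alice", "alice.example.com", None),
            ],
        );

        // insert_posts_into_feed sorts by indexed_at descending.
        assert_eq!(feed.len(), 2);
        assert_eq!(feed[0].post.uri, "at://did:example:alice/app.bsky.feed.post/newer");
    }
}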