use crate::api::ApiError;
use crate::api::proxy_client::{
    MAX_RESPONSE_SIZE, RESPONSE_HEADERS_TO_FORWARD, is_ssrf_safe, proxy_client,
};
use crate::state::AppState;
use axum::{
    Json,
    http::{HeaderMap, HeaderValue, StatusCode},
    response::{IntoResponse, Response},
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use cid::Cid;
use jacquard_repo::storage::BlockStore;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use tracing::{error, info, warn};
use uuid::Uuid;

pub const REPO_REV_HEADER: &str = "atproto-repo-rev";
pub const UPSTREAM_LAG_HEADER: &str = "atproto-upstream-lag";

/// A locally stored `app.bsky.feed.post` record.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    pub text: String,
    pub created_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub langs: Option<Vec<String>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub labels: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<Vec<String>>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

/// A locally stored `app.bsky.actor.profile` record.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProfileRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub banner: Option<Value>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

/// A decoded record together with its AT URI, CID, and local index time.
#[derive(Debug, Clone)]
pub struct RecordDescript<T> {
    pub uri: String,
    pub cid: String,
    pub indexed_at: DateTime<Utc>,
    pub record: T,
}

/// A locally stored `app.bsky.feed.like` record.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    pub subject: LikeSubject,
    pub created_at: String,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeSubject {
    pub uri: String,
    pub cid: String,
}

/// Local records written after a given repo rev, grouped by collection.
#[derive(Debug, Default)]
pub struct LocalRecords {
    pub count: usize,
    pub profile: Option<RecordDescript<ProfileRecord>>,
    pub posts: Vec<RecordDescript<PostRecord>>,
    pub likes: Vec<RecordDescript<LikeRecord>>,
}

/// Fetches records this PDS has written for `did` after the given repo `rev`,
/// decoding profile, post, and like records from the block store.
pub async fn get_records_since_rev(
    state: &AppState,
    did: &str,
    rev: &str,
) -> Result<LocalRecords, String> {
    let mut result = LocalRecords::default();
    let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_optional(&state.db)
        .await
        .map_err(|e| format!("DB error: {}", e))?
        .ok_or_else(|| "User not found".to_string())?;
    let rows = sqlx::query!(
        r#"
        SELECT record_cid, collection, rkey, created_at, repo_rev
        FROM records
        WHERE repo_id = $1 AND repo_rev > $2
        ORDER BY repo_rev ASC
        LIMIT 10
        "#,
        user_id,
        rev
    )
    .fetch_all(&state.db)
    .await
    .map_err(|e| format!("DB error fetching records: {}", e))?;
    if rows.is_empty() {
        return Ok(result);
    }
    // Sanity check: if nothing exists at or before the supplied rev, the rev
    // does not belong to this repo, so return no local records.
    let sanity_check = sqlx::query_scalar!(
        "SELECT 1 as val FROM records WHERE repo_id = $1 AND repo_rev <= $2 LIMIT 1",
        user_id,
        rev
    )
    .fetch_optional(&state.db)
    .await
    .map_err(|e| format!("DB error sanity check: {}", e))?;
    if sanity_check.is_none() {
        warn!("Sanity check failed: no records found before rev {}", rev);
        return Ok(result);
    }
    struct RowData {
        cid_str: String,
        collection: String,
        rkey: String,
        created_at: DateTime<Utc>,
    }
    let mut row_data: Vec<RowData> = Vec::with_capacity(rows.len());
    let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
    for row in &rows {
        if let Ok(cid) = row.record_cid.parse::<Cid>() {
            cids.push(cid);
            row_data.push(RowData {
                cid_str: row.record_cid.clone(),
                collection: row.collection.clone(),
                rkey: row.rkey.clone(),
                created_at: row.created_at,
            });
        }
    }
    let blocks: Vec<Option<Bytes>> = state
        .block_store
        .get_many(&cids)
        .await
        .map_err(|e| format!("Error fetching blocks: {}", e))?;
    for (data, block_opt) in row_data.into_iter().zip(blocks.into_iter()) {
        let block_bytes = match block_opt {
            Some(b) => b,
            None => continue,
        };
        result.count += 1;
        let uri = format!("at://{}/{}/{}", did, data.collection, data.rkey);
        if data.collection == "app.bsky.actor.profile" && data.rkey == "self" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<ProfileRecord>(&block_bytes) {
                result.profile = Some(RecordDescript {
                    uri,
                    cid: data.cid_str,
                    indexed_at: data.created_at,
                    record,
                });
            }
        } else if data.collection == "app.bsky.feed.post" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<PostRecord>(&block_bytes) {
                result.posts.push(RecordDescript {
                    uri,
                    cid: data.cid_str,
                    indexed_at: data.created_at,
                    record,
                });
            }
        } else if data.collection == "app.bsky.feed.like"
            && let Ok(record) = serde_ipld_dagcbor::from_slice::<LikeRecord>(&block_bytes)
        {
            result.likes.push(RecordDescript {
                uri,
                cid: data.cid_str,
                indexed_at: data.created_at,
                record,
            });
        }
    }
    Ok(result)
}

/// Returns the age in milliseconds of the oldest local record, if any.
pub fn get_local_lag(local: &LocalRecords) -> Option<i64> {
    let mut oldest: Option<DateTime<Utc>> = local.profile.as_ref().map(|p| p.indexed_at);
    for post in &local.posts {
        match oldest {
            None => oldest = Some(post.indexed_at),
            Some(o) if post.indexed_at < o => oldest = Some(post.indexed_at),
            _ => {}
        }
    }
    for like in &local.likes {
        match oldest {
            None => oldest = Some(like.indexed_at),
            Some(o) if like.indexed_at < o => oldest = Some(like.indexed_at),
            _ => {}
        }
    }
    oldest.map(|o| (Utc::now() - o).num_milliseconds())
}

/// Reads the `atproto-repo-rev` header from an upstream response, if present.
pub fn extract_repo_rev(headers: &HeaderMap) -> Option<String> {
    headers
        .get(REPO_REV_HEADER)
        .and_then(|h| h.to_str().ok())
        .map(|s| s.to_string())
}

#[derive(Debug)]
pub struct ProxyResponse {
    pub status: StatusCode,
    pub headers: HeaderMap,
    pub body: bytes::Bytes,
}

impl ProxyResponse {
    pub fn into_response(self) -> Response {
        let mut response = Response::builder().status(self.status);
        for (key, value) in self.headers.iter() {
            response = response.header(key, value);
        }
        // Status and headers are already-validated types, so the builder
        // cannot fail here.
        response.body(axum::body::Body::from(self.body)).unwrap()
    }
}

/// Resolves the AppView responsible for `method` from the registry and
/// proxies the request to it.
pub async fn proxy_to_appview_via_registry(
    state: &AppState,
    method: &str,
    params: &HashMap<String, String>,
    auth_did: &str,
    auth_key_bytes: Option<&[u8]>,
) -> Result<ProxyResponse, Response> {
    let resolved = state
        .appview_registry
        .get_appview_for_method(method)
        .await
        .ok_or_else(|| {
            ApiError::UpstreamUnavailable(format!("No AppView configured for method: {}", method))
                .into_response()
        })?;
    proxy_to_appview_with_url(
        method,
        params,
        auth_did,
        auth_key_bytes,
        &resolved.url,
        &resolved.did,
    )
    .await
}

/// Proxies an XRPC GET to the given AppView, signing a service token when a
/// key is provided, and enforcing SSRF and response-size limits.
pub async fn proxy_to_appview_with_url(
    method: &str,
    params: &HashMap<String, String>,
    auth_did: &str,
    auth_key_bytes: Option<&[u8]>,
    appview_url: &str,
    appview_did: &str,
) -> Result<ProxyResponse, Response> {
    if let Err(e) = is_ssrf_safe(appview_url) {
        error!("SSRF check failed for appview URL: {}", e);
        return Err(
            ApiError::UpstreamUnavailable(format!("Invalid upstream URL: {}", e)).into_response(),
        );
    }
    let target_url = format!("{}/xrpc/{}", appview_url, method);
    info!(target = %target_url, "Proxying request to appview");
    let client = proxy_client();
    let mut request_builder = client.get(&target_url).query(params);
    if let Some(key_bytes) = auth_key_bytes {
        match crate::auth::create_service_token(auth_did, appview_did, method, key_bytes) {
            Ok(service_token) => {
                request_builder =
                    request_builder.header("Authorization", format!("Bearer {}", service_token));
            }
            Err(e) => {
                error!(error = ?e, "Failed to create service token");
                return Err(ApiError::InternalError.into_response());
            }
        }
    }
    match request_builder.send().await {
        Ok(resp) => {
            let status =
                StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
            // Forward only the allow-listed response headers.
            let headers: HeaderMap = resp
                .headers()
                .iter()
                .filter(|(k, _)| {
                    RESPONSE_HEADERS_TO_FORWARD
                        .iter()
                        .any(|h| k.as_str().eq_ignore_ascii_case(h))
                })
                .filter_map(|(k, v)| {
                    let name = axum::http::HeaderName::try_from(k.as_str()).ok()?;
                    let value = HeaderValue::from_bytes(v.as_bytes()).ok()?;
                    Some((name, value))
                })
                .collect();
            let content_length = resp.content_length().unwrap_or(0);
            if content_length > MAX_RESPONSE_SIZE {
                error!(
                    content_length,
                    max = MAX_RESPONSE_SIZE,
                    "Upstream response too large"
                );
                return Err(ApiError::UpstreamFailure.into_response());
            }
            let body = resp.bytes().await.map_err(|e| {
                error!(error = ?e, "Error reading proxy response body");
                ApiError::UpstreamFailure.into_response()
            })?;
            // Re-check after reading, since Content-Length may be absent or wrong.
            if body.len() as u64 > MAX_RESPONSE_SIZE {
                error!(
                    len = body.len(),
                    max = MAX_RESPONSE_SIZE,
                    "Upstream response body exceeded size limit"
                );
                return Err(ApiError::UpstreamFailure.into_response());
            }
            Ok(ProxyResponse {
                status,
                headers,
                body,
            })
        }
        Err(e) => {
            error!(error = ?e, "Error sending proxy request");
            if e.is_timeout() {
                Err(ApiError::UpstreamTimeout.into_response())
            } else if e.is_connect() {
                Err(
                    ApiError::UpstreamUnavailable("Failed to connect to upstream".to_string())
                        .into_response(),
                )
            } else {
                Err(ApiError::UpstreamFailure.into_response())
            }
        }
    }
}

/// Serializes `data` as JSON and, when local records were merged in, attaches
/// the `atproto-upstream-lag` header with the lag in milliseconds.
pub fn format_munged_response<T: Serialize>(data: T, lag: Option<i64>) -> Response {
    let mut response = (StatusCode::OK, Json(data)).into_response();
    if let Some(lag_ms) = lag
        && let Ok(header_val) = HeaderValue::from_str(&lag_ms.to_string())
    {
        response
            .headers_mut()
            .insert(UPSTREAM_LAG_HEADER, header_val);
    }
    response
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthorView {
    pub did: String,
    pub handle: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<String>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostView {
    pub uri: String,
    pub cid: String,
    pub author: AuthorView,
    pub record: Value,
    pub indexed_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    #[serde(default)]
    pub reply_count: i64,
    #[serde(default)]
    pub repost_count: i64,
    #[serde(default)]
    pub like_count: i64,
    #[serde(default)]
    pub quote_count: i64,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeedViewPost {
    pub post: PostView,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub feed_context: Option<String>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedOutput {
    pub feed: Vec<FeedViewPost>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}

/// Hydrates a locally written post into a `PostView`, using the local profile
/// record (if any) for the author's display name. Counts start at zero since
/// the AppView has not indexed the post yet.
pub fn format_local_post(
    descript: &RecordDescript<PostRecord>,
    author_did: &str,
    author_handle: &str,
    profile: Option<&RecordDescript<ProfileRecord>>,
) -> PostView {
    let display_name = profile.and_then(|p| p.record.display_name.clone());
    PostView {
        uri: descript.uri.clone(),
        cid: descript.cid.clone(),
        author: AuthorView {
            did: author_did.to_string(),
            handle: author_handle.to_string(),
            display_name,
            avatar: None,
            extra: HashMap::new(),
        },
        record: serde_json::to_value(&descript.record).unwrap_or(Value::Null),
        indexed_at: descript.indexed_at.to_rfc3339(),
        embed: descript.record.embed.clone(),
        reply_count: 0,
        repost_count: 0,
        like_count: 0,
        quote_count: 0,
        extra: HashMap::new(),
    }
}

/// Wraps local posts as feed items, appends them to the feed, and re-sorts
/// the feed newest first by `indexed_at`.
pub fn insert_posts_into_feed(feed: &mut Vec<FeedViewPost>, posts: Vec<PostView>) {
    if posts.is_empty() {
        return;
    }
    let new_items: Vec<FeedViewPost> = posts
        .into_iter()
        .map(|post| FeedViewPost {
            post,
            reply: None,
            reason: None,
            feed_context: None,
            extra: HashMap::new(),
        })
        .collect();
    feed.extend(new_items);
    feed.sort_by(|a, b| b.post.indexed_at.cmp(&a.post.indexed_at));
}
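
// A minimal, illustrative sketch of how the pure helpers in this file compose:
// it exercises only `get_local_lag`, `format_local_post`, and
// `insert_posts_into_feed`, with no database, block store, or network access.
// The DID, handle, CID, and rkey values below are made-up example data, not
// values used elsewhere in the codebase.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    // Builds a local post record indexed at the given time (example values only).
    fn sample_post(indexed_at: DateTime<Utc>) -> RecordDescript<PostRecord> {
        RecordDescript {
            uri: "at://did:example:alice/app.bsky.feed.post/3kexamplerkey".to_string(),
            cid: "bafyexamplecid".to_string(),
            indexed_at,
            record: PostRecord {
                record_type: Some("app.bsky.feed.post".to_string()),
                text: "hello".to_string(),
                created_at: indexed_at.to_rfc3339(),
                reply: None,
                embed: None,
                langs: None,
                labels: None,
                tags: None,
                extra: HashMap::new(),
            },
        }
    }

    #[test]
    fn local_lag_uses_oldest_indexed_record() {
        let now = Utc::now();
        let mut local = LocalRecords::default();
        local.posts.push(sample_post(now - Duration::seconds(5)));
        local.posts.push(sample_post(now - Duration::seconds(30)));
        local.count = 2;
        // Lag is measured from the oldest local record, so it should be at
        // least 30 seconds here.
        let lag_ms = get_local_lag(&local).expect("records present");
        assert!(lag_ms >= 30_000);
    }

    #[test]
    fn inserted_posts_sort_feed_newest_first() {
        let now = Utc::now();
        let older = format_local_post(
            &sample_post(now - Duration::seconds(60)),
            "did:example:alice",
            "alice.example.test",
            None,
        );
        let newer = format_local_post(
            &sample_post(now),
            "did:example:alice",
            "alice.example.test",
            None,
        );
        let newest_ts = newer.indexed_at.clone();
        let mut feed: Vec<FeedViewPost> = Vec::new();
        insert_posts_into_feed(&mut feed, vec![older, newer]);
        // The feed is re-sorted descending by indexed_at, so the newer post
        // should come first.
        assert_eq!(feed.len(), 2);
        assert_eq!(feed[0].post.indexed_at, newest_ts);
    }
}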