use crate::api::proxy_client::{
    is_ssrf_safe, proxy_client, MAX_RESPONSE_SIZE, RESPONSE_HEADERS_TO_FORWARD,
};
use crate::api::ApiError;
use crate::state::AppState;
use axum::{
    http::{HeaderMap, HeaderValue, StatusCode},
    response::{IntoResponse, Response},
    Json,
};
use chrono::{DateTime, Utc};
use jacquard_repo::storage::BlockStore;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use tracing::{error, info, warn};
use uuid::Uuid;

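// Headers used to coordinate read-after-write with the upstream AppView: the
// upstream reports the repo rev it has indexed, and we report (in milliseconds)
// how far behind it is whenever newer local records are spliced into a response.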
pub const REPO_REV_HEADER: &str = "atproto-repo-rev";
pub const UPSTREAM_LAG_HEADER: &str = "atproto-upstream-lag";

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    pub text: String,
    pub created_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub langs: Option<Vec<String>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub labels: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<Vec<String>>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProfileRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub banner: Option<Value>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

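/// A decoded record together with the identifiers needed to present it: its
/// `at://` URI, CID, and the time it was indexed locally.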
#[derive(Debug, Clone)]
pub struct RecordDescript<T> {
    pub uri: String,
    pub cid: String,
    pub indexed_at: DateTime<Utc>,
    pub record: T,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    pub subject: LikeSubject,
    pub created_at: String,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeSubject {
    pub uri: String,
    pub cid: String,
}

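/// Local writes newer than the repo rev reported by the upstream AppView,
/// grouped by the collections this module knows how to splice into responses.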
#[derive(Debug, Default)]
pub struct LocalRecords {
    pub count: usize,
    pub profile: Option<RecordDescript<ProfileRecord>>,
    pub posts: Vec<RecordDescript<PostRecord>>,
    pub likes: Vec<RecordDescript<LikeRecord>>,
}

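/// Load records written to this PDS after `rev`, decoding profile, post, and
/// like records from the block store. At most 10 rows are considered per call;
/// rows whose blocks are missing or fail to parse are skipped. If no record
/// exists at or before `rev`, the rev is treated as unknown and an empty result
/// is returned rather than replaying the whole repo.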
pub async fn get_records_since_rev(
    state: &AppState,
    did: &str,
    rev: &str,
) -> Result<LocalRecords, String> {
    let mut result = LocalRecords::default();

    let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_optional(&state.db)
        .await
        .map_err(|e| format!("DB error: {}", e))?
        .ok_or_else(|| "User not found".to_string())?;

    let rows = sqlx::query!(
        r#"
        SELECT record_cid, collection, rkey, created_at, repo_rev
        FROM records
        WHERE repo_id = $1 AND repo_rev > $2
        ORDER BY repo_rev ASC
        LIMIT 10
        "#,
        user_id,
        rev
    )
    .fetch_all(&state.db)
    .await
    .map_err(|e| format!("DB error fetching records: {}", e))?;

    if rows.is_empty() {
        return Ok(result);
    }

    let sanity_check = sqlx::query_scalar!(
        "SELECT 1 as val FROM records WHERE repo_id = $1 AND repo_rev <= $2 LIMIT 1",
        user_id,
        rev
    )
    .fetch_optional(&state.db)
    .await
    .map_err(|e| format!("DB error sanity check: {}", e))?;

    if sanity_check.is_none() {
        warn!("Sanity check failed: no records found before rev {}", rev);
        return Ok(result);
    }

    for row in rows {
        result.count += 1;

        let cid: cid::Cid = match row.record_cid.parse() {
            Ok(c) => c,
            Err(_) => continue,
        };

        let block_bytes = match state.block_store.get(&cid).await {
            Ok(Some(b)) => b,
            _ => continue,
        };

        let uri = format!("at://{}/{}/{}", did, row.collection, row.rkey);
        let indexed_at = row.created_at;

        if row.collection == "app.bsky.actor.profile" && row.rkey == "self" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<ProfileRecord>(&block_bytes) {
                result.profile = Some(RecordDescript {
                    uri,
                    cid: row.record_cid,
                    indexed_at,
                    record,
                });
            }
        } else if row.collection == "app.bsky.feed.post" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<PostRecord>(&block_bytes) {
                result.posts.push(RecordDescript {
                    uri,
                    cid: row.record_cid,
                    indexed_at,
                    record,
                });
            }
        } else if row.collection == "app.bsky.feed.like" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<LikeRecord>(&block_bytes) {
                result.likes.push(RecordDescript {
                    uri,
                    cid: row.record_cid,
                    indexed_at,
                    record,
                });
            }
        }
    }

    Ok(result)
}

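/// How far behind the upstream is, measured as milliseconds since the oldest
/// locally indexed record it has not yet reflected. Returns `None` when there
/// are no local records to compare against.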
pub fn get_local_lag(local: &LocalRecords) -> Option<i64> {
    let mut oldest: Option<DateTime<Utc>> = local.profile.as_ref().map(|p| p.indexed_at);

    for post in &local.posts {
        match oldest {
            None => oldest = Some(post.indexed_at),
            Some(o) if post.indexed_at < o => oldest = Some(post.indexed_at),
            _ => {}
        }
    }

    for like in &local.likes {
        match oldest {
            None => oldest = Some(like.indexed_at),
            Some(o) if like.indexed_at < o => oldest = Some(like.indexed_at),
            _ => {}
        }
    }

    oldest.map(|o| (Utc::now() - o).num_milliseconds())
}

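/// Read the repo rev the upstream reports via the `atproto-repo-rev` response header.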
pub fn extract_repo_rev(headers: &HeaderMap) -> Option<String> {
    headers
        .get(REPO_REV_HEADER)
        .and_then(|h| h.to_str().ok())
        .map(|s| s.to_string())
}

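/// An upstream response captured for munging: status, the subset of headers we
/// forward, and the raw body bytes.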
#[derive(Debug)]
pub struct ProxyResponse {
    pub status: StatusCode,
    pub headers: HeaderMap,
    pub body: bytes::Bytes,
}

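/// Forward an XRPC GET to the configured AppView (`APPVIEW_URL`), passing the
/// query parameters and optional Authorization header through. The upstream URL
/// is SSRF-checked, only allow-listed response headers are kept, and responses
/// larger than `MAX_RESPONSE_SIZE` are rejected. Errors come back as
/// ready-to-send `Response`s so handlers can bail out directly.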
pub async fn proxy_to_appview(
    method: &str,
    params: &HashMap<String, String>,
    auth_header: Option<&str>,
) -> Result<ProxyResponse, Response> {
    let appview_url = std::env::var("APPVIEW_URL").map_err(|_| {
        ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()).into_response()
    })?;

    if let Err(e) = is_ssrf_safe(&appview_url) {
        error!("SSRF check failed for appview URL: {}", e);
        return Err(ApiError::UpstreamUnavailable(format!("Invalid upstream URL: {}", e))
            .into_response());
    }

    let target_url = format!("{}/xrpc/{}", appview_url, method);
    info!(target = %target_url, "Proxying request to appview");

    let client = proxy_client();
    let mut request_builder = client.get(&target_url).query(params);

    if let Some(auth) = auth_header {
        request_builder = request_builder.header("Authorization", auth);
    }

    match request_builder.send().await {
        Ok(resp) => {
            let status =
                StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);

            let headers: HeaderMap = resp
                .headers()
                .iter()
                .filter(|(k, _)| {
                    RESPONSE_HEADERS_TO_FORWARD
                        .iter()
                        .any(|h| k.as_str().eq_ignore_ascii_case(h))
                })
                .filter_map(|(k, v)| {
                    let name = axum::http::HeaderName::try_from(k.as_str()).ok()?;
                    let value = HeaderValue::from_bytes(v.as_bytes()).ok()?;
                    Some((name, value))
                })
                .collect();

            let content_length = resp.content_length().unwrap_or(0);
            if content_length > MAX_RESPONSE_SIZE {
                error!(
                    content_length,
                    max = MAX_RESPONSE_SIZE,
                    "Upstream response too large"
                );
                return Err(ApiError::UpstreamFailure.into_response());
            }

            let body = resp.bytes().await.map_err(|e| {
                error!(error = ?e, "Error reading proxy response body");
                ApiError::UpstreamFailure.into_response()
            })?;

            if body.len() as u64 > MAX_RESPONSE_SIZE {
                error!(
                    len = body.len(),
                    max = MAX_RESPONSE_SIZE,
                    "Upstream response body exceeded size limit"
                );
                return Err(ApiError::UpstreamFailure.into_response());
            }

            Ok(ProxyResponse {
                status,
                headers,
                body,
            })
        }
        Err(e) => {
            error!(error = ?e, "Error sending proxy request");
            if e.is_timeout() {
                Err(ApiError::UpstreamTimeout.into_response())
            } else if e.is_connect() {
                Err(ApiError::UpstreamUnavailable("Failed to connect to upstream".to_string())
                    .into_response())
            } else {
                Err(ApiError::UpstreamFailure.into_response())
            }
        }
    }
}

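/// Wrap a munged payload as a 200 JSON response, attaching the
/// `atproto-upstream-lag` header when a lag estimate is available.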
pub fn format_munged_response<T: Serialize>(data: T, lag: Option<i64>) -> Response {
    let mut response = (StatusCode::OK, Json(data)).into_response();

    if let Some(lag_ms) = lag {
        if let Ok(header_val) = HeaderValue::from_str(&lag_ms.to_string()) {
            response
                .headers_mut()
                .insert(UPSTREAM_LAG_HEADER, header_val);
        }
    }

    response
}

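// Partial mirrors of the AppView's feed view objects. Only the fields needed to
// splice local records into responses are modeled; any other fields are carried
// through untouched via the flattened `extra` maps.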
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthorView {
    pub did: String,
    pub handle: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<String>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostView {
    pub uri: String,
    pub cid: String,
    pub author: AuthorView,
    pub record: Value,
    pub indexed_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    #[serde(default)]
    pub reply_count: i64,
    #[serde(default)]
    pub repost_count: i64,
    #[serde(default)]
    pub like_count: i64,
    #[serde(default)]
    pub quote_count: i64,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeedViewPost {
    pub post: PostView,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub feed_context: Option<String>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedOutput {
    pub feed: Vec<FeedViewPost>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}

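/// Build a `PostView` for a post that only exists locally. Aggregate counts are
/// zeroed and the avatar is omitted, since the upstream has not hydrated them
/// yet; the display name is taken from a local profile record when available.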
pub fn format_local_post(
    descript: &RecordDescript<PostRecord>,
    author_did: &str,
    author_handle: &str,
    profile: Option<&RecordDescript<ProfileRecord>>,
) -> PostView {
    let display_name = profile.and_then(|p| p.record.display_name.clone());

    PostView {
        uri: descript.uri.clone(),
        cid: descript.cid.clone(),
        author: AuthorView {
            did: author_did.to_string(),
            handle: author_handle.to_string(),
            display_name,
            avatar: None,
            extra: HashMap::new(),
        },
        record: serde_json::to_value(&descript.record).unwrap_or(Value::Null),
        indexed_at: descript.indexed_at.to_rfc3339(),
        embed: descript.record.embed.clone(),
        reply_count: 0,
        repost_count: 0,
        like_count: 0,
        quote_count: 0,
        extra: HashMap::new(),
    }
}

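/// Wrap locally authored posts as bare feed items, append them to the upstream
/// feed, and re-sort the combined feed newest-first by `indexed_at`.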
pub fn insert_posts_into_feed(feed: &mut Vec<FeedViewPost>, posts: Vec<PostView>) {
    if posts.is_empty() {
        return;
    }

    let new_items: Vec<FeedViewPost> = posts
        .into_iter()
        .map(|post| FeedViewPost {
            post,
            reply: None,
            reason: None,
            feed_context: None,
            extra: HashMap::new(),
        })
        .collect();

    feed.extend(new_items);
    feed.sort_by(|a, b| b.post.indexed_at.cmp(&a.post.indexed_at));
}
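
// A minimal sketch of how these helpers compose for read-after-write on a feed
// endpoint. The handler name, the pre-resolved `did`/`handle` arguments, and the
// choice of `app.bsky.feed.getTimeline` are illustrative assumptions, not part
// of this module's API.
#[allow(dead_code)]
async fn example_read_after_write_timeline(
    state: &AppState,
    did: &str,
    handle: &str,
    params: &HashMap<String, String>,
    auth_header: Option<&str>,
) -> Response {
    // Proxy upstream first; proxy errors are already ready-to-send responses.
    let upstream = match proxy_to_appview("app.bsky.feed.getTimeline", params, auth_header).await {
        Ok(resp) => resp,
        Err(err_response) => return err_response,
    };

    // Without a repo rev from the upstream there is nothing to reconcile.
    let Some(rev) = extract_repo_rev(&upstream.headers) else {
        return (upstream.status, upstream.headers, upstream.body).into_response();
    };

    // Look for local writes the upstream has not indexed yet.
    let local = match get_records_since_rev(state, did, &rev).await {
        Ok(local) if local.count > 0 => local,
        _ => return (upstream.status, upstream.headers, upstream.body).into_response(),
    };

    // Splice local posts into the upstream feed and attach the lag header.
    let mut output: FeedOutput = match serde_json::from_slice(&upstream.body) {
        Ok(output) => output,
        Err(_) => return (upstream.status, upstream.headers, upstream.body).into_response(),
    };
    let local_posts: Vec<PostView> = local
        .posts
        .iter()
        .map(|post| format_local_post(post, did, handle, local.profile.as_ref()))
        .collect();
    insert_posts_into_feed(&mut output.feed, local_posts);
    format_munged_response(output, get_local_lag(&local))
}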