use crate::api::proxy_client::{
    is_ssrf_safe, proxy_client, MAX_RESPONSE_SIZE, RESPONSE_HEADERS_TO_FORWARD,
};
use crate::api::ApiError;
use crate::state::AppState;
use axum::{
    http::{HeaderMap, HeaderValue, StatusCode},
    response::{IntoResponse, Response},
    Json,
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use cid::Cid;
use jacquard_repo::storage::BlockStore;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use tracing::{error, info, warn};
use uuid::Uuid;

pub const REPO_REV_HEADER: &str = "atproto-repo-rev";
pub const UPSTREAM_LAG_HEADER: &str = "atproto-upstream-lag";

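/// Partial `app.bsky.feed.post` record; fields not modeled here are preserved in `extra`.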
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    pub text: String,
    pub created_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub langs: Option<Vec<String>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub labels: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<Vec<String>>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

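/// Partial `app.bsky.actor.profile` record; fields not modeled here are preserved in `extra`.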
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProfileRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub banner: Option<Value>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

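/// A locally stored record together with its AT URI, CID, and index time.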
#[derive(Debug, Clone)]
pub struct RecordDescript<T> {
    pub uri: String,
    pub cid: String,
    pub indexed_at: DateTime<Utc>,
    pub record: T,
}

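/// Partial `app.bsky.feed.like` record pointing at the liked post.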
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeRecord {
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    pub subject: LikeSubject,
    pub created_at: String,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

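/// Reference (URI plus CID) to the subject of a like.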
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeSubject {
    pub uri: String,
    pub cid: String,
}

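/// Records written locally after a given repo rev, grouped by collection.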
#[derive(Debug, Default)]
pub struct LocalRecords {
    pub count: usize,
    pub profile: Option<RecordDescript<ProfileRecord>>,
    pub posts: Vec<RecordDescript<PostRecord>>,
    pub likes: Vec<RecordDescript<LikeRecord>>,
}

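/// Loads records written to this repo after `rev` (up to 10), decoding posts,
/// likes, and the profile record from their DAG-CBOR blocks.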
pub async fn get_records_since_rev(
    state: &AppState,
    did: &str,
    rev: &str,
) -> Result<LocalRecords, String> {
    let mut result = LocalRecords::default();

    let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
        .fetch_optional(&state.db)
        .await
        .map_err(|e| format!("DB error: {}", e))?
        .ok_or_else(|| "User not found".to_string())?;

    let rows = sqlx::query!(
        r#"
        SELECT record_cid, collection, rkey, created_at, repo_rev
        FROM records
        WHERE repo_id = $1 AND repo_rev > $2
        ORDER BY repo_rev ASC
        LIMIT 10
        "#,
        user_id,
        rev
    )
    .fetch_all(&state.db)
    .await
    .map_err(|e| format!("DB error fetching records: {}", e))?;

    if rows.is_empty() {
        return Ok(result);
    }

    // Sanity check: at least one record must exist at or before the requested rev;
    // otherwise the supplied rev is not one this repo has seen and local results are skipped.
    let sanity_check = sqlx::query_scalar!(
        "SELECT 1 as val FROM records WHERE repo_id = $1 AND repo_rev <= $2 LIMIT 1",
        user_id,
        rev
    )
    .fetch_optional(&state.db)
    .await
    .map_err(|e| format!("DB error sanity check: {}", e))?;

    if sanity_check.is_none() {
        warn!("Sanity check failed: no records found before rev {}", rev);
        return Ok(result);
    }

    struct RowData {
        cid_str: String,
        collection: String,
        rkey: String,
        created_at: DateTime<Utc>,
    }

    let mut row_data: Vec<RowData> = Vec::with_capacity(rows.len());
    let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());

    for row in &rows {
        if let Ok(cid) = row.record_cid.parse::<Cid>() {
            cids.push(cid);
            row_data.push(RowData {
                cid_str: row.record_cid.clone(),
                collection: row.collection.clone(),
                rkey: row.rkey.clone(),
                created_at: row.created_at,
            });
        }
    }

    // Fetch all blocks in one call; results line up positionally with `cids`,
    // so they can be zipped with `row_data`. Missing blocks come back as None
    // and are skipped below.
    let blocks: Vec<Option<Bytes>> = state
        .block_store
        .get_many(&cids)
        .await
        .map_err(|e| format!("Error fetching blocks: {}", e))?;

    for (data, block_opt) in row_data.into_iter().zip(blocks.into_iter()) {
        let block_bytes = match block_opt {
            Some(b) => b,
            None => continue,
        };

        result.count += 1;
        let uri = format!("at://{}/{}/{}", did, data.collection, data.rkey);

        if data.collection == "app.bsky.actor.profile" && data.rkey == "self" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<ProfileRecord>(&block_bytes) {
                result.profile = Some(RecordDescript {
                    uri,
                    cid: data.cid_str,
                    indexed_at: data.created_at,
                    record,
                });
            }
        } else if data.collection == "app.bsky.feed.post" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<PostRecord>(&block_bytes) {
                result.posts.push(RecordDescript {
                    uri,
                    cid: data.cid_str,
                    indexed_at: data.created_at,
                    record,
                });
            }
        } else if data.collection == "app.bsky.feed.like" {
            if let Ok(record) = serde_ipld_dagcbor::from_slice::<LikeRecord>(&block_bytes) {
                result.likes.push(RecordDescript {
                    uri,
                    cid: data.cid_str,
                    indexed_at: data.created_at,
                    record,
                });
            }
        }
    }

    Ok(result)
}

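/// Milliseconds between now and the oldest local record, i.e. how far the
/// upstream view may lag behind local writes. Returns `None` if there are no
/// local records.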
pub fn get_local_lag(local: &LocalRecords) -> Option<i64> {
    let mut oldest: Option<DateTime<Utc>> = local.profile.as_ref().map(|p| p.indexed_at);

    for post in &local.posts {
        match oldest {
            None => oldest = Some(post.indexed_at),
            Some(o) if post.indexed_at < o => oldest = Some(post.indexed_at),
            _ => {}
        }
    }

    for like in &local.likes {
        match oldest {
            None => oldest = Some(like.indexed_at),
            Some(o) if like.indexed_at < o => oldest = Some(like.indexed_at),
            _ => {}
        }
    }

    oldest.map(|o| (Utc::now() - o).num_milliseconds())
}

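/// Reads the `atproto-repo-rev` header from an upstream response, if present.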
pub fn extract_repo_rev(headers: &HeaderMap) -> Option<String> {
    headers
        .get(REPO_REV_HEADER)
        .and_then(|h| h.to_str().ok())
        .map(|s| s.to_string())
}

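/// Status, forwarded headers, and raw body of a proxied upstream response.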
#[derive(Debug)]
pub struct ProxyResponse {
    pub status: StatusCode,
    pub headers: HeaderMap,
    pub body: bytes::Bytes,
}

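/// Proxies an XRPC GET to the configured AppView (`APPVIEW_URL`), forwarding
/// query parameters and the caller's Authorization header, and enforcing an
/// SSRF check plus a response size limit.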
pub async fn proxy_to_appview(
    method: &str,
    params: &HashMap<String, String>,
    auth_header: Option<&str>,
) -> Result<ProxyResponse, Response> {
    let appview_url = std::env::var("APPVIEW_URL").map_err(|_| {
        ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()).into_response()
    })?;

    if let Err(e) = is_ssrf_safe(&appview_url) {
        error!("SSRF check failed for appview URL: {}", e);
        return Err(ApiError::UpstreamUnavailable(format!("Invalid upstream URL: {}", e))
            .into_response());
    }

    let target_url = format!("{}/xrpc/{}", appview_url, method);
    info!(target = %target_url, "Proxying request to appview");

    let client = proxy_client();
    let mut request_builder = client.get(&target_url).query(params);

    if let Some(auth) = auth_header {
        request_builder = request_builder.header("Authorization", auth);
    }

    match request_builder.send().await {
        Ok(resp) => {
            let status =
                StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);

            // Forward only the allow-listed response headers.
            let headers: HeaderMap = resp
                .headers()
                .iter()
                .filter(|(k, _)| {
                    RESPONSE_HEADERS_TO_FORWARD
                        .iter()
                        .any(|h| k.as_str().eq_ignore_ascii_case(h))
                })
                .filter_map(|(k, v)| {
                    let name = axum::http::HeaderName::try_from(k.as_str()).ok()?;
                    let value = HeaderValue::from_bytes(v.as_bytes()).ok()?;
                    Some((name, value))
                })
                .collect();

            // Reject oversized responses both by the declared Content-Length and
            // by the actual body size, since the header may be absent or wrong.
            let content_length = resp.content_length().unwrap_or(0);
            if content_length > MAX_RESPONSE_SIZE {
                error!(
                    content_length,
                    max = MAX_RESPONSE_SIZE,
                    "Upstream response too large"
                );
                return Err(ApiError::UpstreamFailure.into_response());
            }

            let body = resp.bytes().await.map_err(|e| {
                error!(error = ?e, "Error reading proxy response body");
                ApiError::UpstreamFailure.into_response()
            })?;

            if body.len() as u64 > MAX_RESPONSE_SIZE {
                error!(
                    len = body.len(),
                    max = MAX_RESPONSE_SIZE,
                    "Upstream response body exceeded size limit"
                );
                return Err(ApiError::UpstreamFailure.into_response());
            }

            Ok(ProxyResponse {
                status,
                headers,
                body,
            })
        }
        Err(e) => {
            error!(error = ?e, "Error sending proxy request");
            if e.is_timeout() {
                Err(ApiError::UpstreamTimeout.into_response())
            } else if e.is_connect() {
                Err(ApiError::UpstreamUnavailable("Failed to connect to upstream".to_string())
                    .into_response())
            } else {
                Err(ApiError::UpstreamFailure.into_response())
            }
        }
    }
}

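/// Serializes a munged response as JSON and, when known, reports local lag via
/// the `atproto-upstream-lag` header.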
pub fn format_munged_response<T: Serialize>(data: T, lag: Option<i64>) -> Response {
    let mut response = (StatusCode::OK, Json(data)).into_response();

    if let Some(lag_ms) = lag {
        if let Ok(header_val) = HeaderValue::from_str(&lag_ms.to_string()) {
            response
                .headers_mut()
                .insert(UPSTREAM_LAG_HEADER, header_val);
        }
    }

    response
}

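/// Minimal author view used when synthesizing post views for local records.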
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthorView {
    pub did: String,
    pub handle: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<String>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

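/// Post view with author, record, and engagement counts; counts default to zero
/// and unknown fields are preserved in `extra`.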
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostView {
    pub uri: String,
    pub cid: String,
    pub author: AuthorView,
    pub record: Value,
    pub indexed_at: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    #[serde(default)]
    pub reply_count: i64,
    #[serde(default)]
    pub repost_count: i64,
    #[serde(default)]
    pub like_count: i64,
    #[serde(default)]
    pub quote_count: i64,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

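/// A feed item wrapping a `PostView`, optionally carrying reply context or a reason.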
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeedViewPost {
    pub post: PostView,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub feed_context: Option<String>,
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

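/// Feed page returned to the client: items plus an optional pagination cursor.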
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedOutput {
    pub feed: Vec<FeedViewPost>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}

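/// Builds a `PostView` for a locally written post, filling in the author from
/// the local profile record (if any) and zeroed engagement counts.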
pub fn format_local_post(
    descript: &RecordDescript<PostRecord>,
    author_did: &str,
    author_handle: &str,
    profile: Option<&RecordDescript<ProfileRecord>>,
) -> PostView {
    let display_name = profile.and_then(|p| p.record.display_name.clone());

    PostView {
        uri: descript.uri.clone(),
        cid: descript.cid.clone(),
        author: AuthorView {
            did: author_did.to_string(),
            handle: author_handle.to_string(),
            display_name,
            avatar: None,
            extra: HashMap::new(),
        },
        record: serde_json::to_value(&descript.record).unwrap_or(Value::Null),
        indexed_at: descript.indexed_at.to_rfc3339(),
        embed: descript.record.embed.clone(),
        reply_count: 0,
        repost_count: 0,
        like_count: 0,
        quote_count: 0,
        extra: HashMap::new(),
    }
}

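/// Appends locally written posts to a feed and sorts it by `indexed_at` descending.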
pub fn insert_posts_into_feed(feed: &mut Vec<FeedViewPost>, posts: Vec<PostView>) {
    if posts.is_empty() {
        return;
    }

    let new_items: Vec<FeedViewPost> = posts
        .into_iter()
        .map(|post| FeedViewPost {
            post,
            reply: None,
            reason: None,
            feed_context: None,
            extra: HashMap::new(),
        })
        .collect();

    feed.extend(new_items);
    feed.sort_by(|a, b| b.post.indexed_at.cmp(&a.post.indexed_at));
}