//! Read-after-write support: surfaces locally-committed records and merges
//! them into responses proxied from the upstream AppView.
1use crate::api::proxy_client::{
2 is_ssrf_safe, proxy_client, MAX_RESPONSE_SIZE, RESPONSE_HEADERS_TO_FORWARD,
3};
4use crate::api::ApiError;
5use crate::state::AppState;
6use axum::{
7 http::{HeaderMap, HeaderValue, StatusCode},
8 response::{IntoResponse, Response},
9 Json,
10};
11use bytes::Bytes;
12use chrono::{DateTime, Utc};
13use cid::Cid;
14use jacquard_repo::storage::BlockStore;
15use serde::{Deserialize, Serialize};
16use serde_json::Value;
17use std::collections::HashMap;
18use tracing::{error, info, warn};
19use uuid::Uuid;
/// Response header carrying the repo revision a response was computed against.
pub const REPO_REV_HEADER: &str = "atproto-repo-rev";
/// Response header reporting, in milliseconds, how far the upstream AppView
/// lags behind local writes (see `get_local_lag` / `format_munged_response`).
pub const UPSTREAM_LAG_HEADER: &str = "atproto-upstream-lag";
/// Partial view of an `app.bsky.feed.post` record.
///
/// Only the fields this module inspects are typed; anything else is captured
/// losslessly in the flattened `extra` map so re-serialization round-trips
/// unknown fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostRecord {
    /// Lexicon type tag, serialized as `$type`.
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    /// Post body text.
    pub text: String,
    /// Author-supplied creation timestamp (serialized as `createdAt`).
    pub created_at: String,
    /// Reply reference, kept as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    /// Embed payload, kept as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    /// Declared language tags, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub langs: Option<Vec<String>>,
    /// Self-applied labels, kept as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub labels: Option<Value>,
    /// Additional tags, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<Vec<String>>,
    /// Any fields not modeled above.
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
/// Partial view of an `app.bsky.actor.profile` record.
///
/// Unknown fields are preserved round-trip via the flattened `extra` map.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProfileRecord {
    /// Lexicon type tag, serialized as `$type`.
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    /// Serialized as `displayName`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Avatar blob reference, kept as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<Value>,
    /// Banner blob reference, kept as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub banner: Option<Value>,
    /// Any fields not modeled above.
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
/// A decoded record together with its addressing metadata.
#[derive(Debug, Clone)]
pub struct RecordDescript<T> {
    /// Full `at://` URI of the record.
    pub uri: String,
    /// CID of the record block, stringified.
    pub cid: String,
    /// Local write time (taken from the DB row's `created_at`).
    pub indexed_at: DateTime<Utc>,
    /// The decoded record value.
    pub record: T,
}
/// Partial view of an `app.bsky.feed.like` record; unknown fields are
/// preserved in `extra`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeRecord {
    /// Lexicon type tag, serialized as `$type`.
    #[serde(rename = "$type")]
    pub record_type: Option<String>,
    /// The post being liked.
    pub subject: LikeSubject,
    /// Author-supplied creation timestamp (serialized as `createdAt`).
    pub created_at: String,
    /// Any fields not modeled above.
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
/// Strong reference (URI + CID) to the record a like points at.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeSubject {
    pub uri: String,
    pub cid: String,
}
/// Records written locally after a given repo rev, bucketed by collection.
#[derive(Debug, Default)]
pub struct LocalRecords {
    /// Number of records for which a block was found (including ones that
    /// failed to decode or belong to collections not bucketed below).
    pub count: usize,
    /// Latest local `app.bsky.actor.profile/self`, if written since the rev.
    pub profile: Option<RecordDescript<ProfileRecord>>,
    /// Local `app.bsky.feed.post` records written since the rev.
    pub posts: Vec<RecordDescript<PostRecord>>,
    /// Local `app.bsky.feed.like` records written since the rev.
    pub likes: Vec<RecordDescript<LikeRecord>>,
}
88pub async fn get_records_since_rev(
89 state: &AppState,
90 did: &str,
91 rev: &str,
92) -> Result<LocalRecords, String> {
93 let mut result = LocalRecords::default();
94 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
95 .fetch_optional(&state.db)
96 .await
97 .map_err(|e| format!("DB error: {}", e))?
98 .ok_or_else(|| "User not found".to_string())?;
99 let rows = sqlx::query!(
100 r#"
101 SELECT record_cid, collection, rkey, created_at, repo_rev
102 FROM records
103 WHERE repo_id = $1 AND repo_rev > $2
104 ORDER BY repo_rev ASC
105 LIMIT 10
106 "#,
107 user_id,
108 rev
109 )
110 .fetch_all(&state.db)
111 .await
112 .map_err(|e| format!("DB error fetching records: {}", e))?;
113 if rows.is_empty() {
114 return Ok(result);
115 }
116 let sanity_check = sqlx::query_scalar!(
117 "SELECT 1 as val FROM records WHERE repo_id = $1 AND repo_rev <= $2 LIMIT 1",
118 user_id,
119 rev
120 )
121 .fetch_optional(&state.db)
122 .await
123 .map_err(|e| format!("DB error sanity check: {}", e))?;
124 if sanity_check.is_none() {
125 warn!("Sanity check failed: no records found before rev {}", rev);
126 return Ok(result);
127 }
128 struct RowData {
129 cid_str: String,
130 collection: String,
131 rkey: String,
132 created_at: DateTime<Utc>,
133 }
134 let mut row_data: Vec<RowData> = Vec::with_capacity(rows.len());
135 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
136 for row in &rows {
137 if let Ok(cid) = row.record_cid.parse::<Cid>() {
138 cids.push(cid);
139 row_data.push(RowData {
140 cid_str: row.record_cid.clone(),
141 collection: row.collection.clone(),
142 rkey: row.rkey.clone(),
143 created_at: row.created_at,
144 });
145 }
146 }
147 let blocks: Vec<Option<Bytes>> = state
148 .block_store
149 .get_many(&cids)
150 .await
151 .map_err(|e| format!("Error fetching blocks: {}", e))?;
152 for (data, block_opt) in row_data.into_iter().zip(blocks.into_iter()) {
153 let block_bytes = match block_opt {
154 Some(b) => b,
155 None => continue,
156 };
157 result.count += 1;
158 let uri = format!("at://{}/{}/{}", did, data.collection, data.rkey);
159 if data.collection == "app.bsky.actor.profile" && data.rkey == "self" {
160 if let Ok(record) = serde_ipld_dagcbor::from_slice::<ProfileRecord>(&block_bytes) {
161 result.profile = Some(RecordDescript {
162 uri,
163 cid: data.cid_str,
164 indexed_at: data.created_at,
165 record,
166 });
167 }
168 } else if data.collection == "app.bsky.feed.post" {
169 if let Ok(record) = serde_ipld_dagcbor::from_slice::<PostRecord>(&block_bytes) {
170 result.posts.push(RecordDescript {
171 uri,
172 cid: data.cid_str,
173 indexed_at: data.created_at,
174 record,
175 });
176 }
177 } else if data.collection == "app.bsky.feed.like" {
178 if let Ok(record) = serde_ipld_dagcbor::from_slice::<LikeRecord>(&block_bytes) {
179 result.likes.push(RecordDescript {
180 uri,
181 cid: data.cid_str,
182 indexed_at: data.created_at,
183 record,
184 });
185 }
186 }
187 }
188 Ok(result)
189}
190pub fn get_local_lag(local: &LocalRecords) -> Option<i64> {
191 let mut oldest: Option<DateTime<Utc>> = local.profile.as_ref().map(|p| p.indexed_at);
192 for post in &local.posts {
193 match oldest {
194 None => oldest = Some(post.indexed_at),
195 Some(o) if post.indexed_at < o => oldest = Some(post.indexed_at),
196 _ => {}
197 }
198 }
199 for like in &local.likes {
200 match oldest {
201 None => oldest = Some(like.indexed_at),
202 Some(o) if like.indexed_at < o => oldest = Some(like.indexed_at),
203 _ => {}
204 }
205 }
206 oldest.map(|o| (Utc::now() - o).num_milliseconds())
207}
208pub fn extract_repo_rev(headers: &HeaderMap) -> Option<String> {
209 headers
210 .get(REPO_REV_HEADER)
211 .and_then(|h| h.to_str().ok())
212 .map(|s| s.to_string())
213}
/// Slimmed-down upstream response: status, forwardable headers, raw body.
#[derive(Debug)]
pub struct ProxyResponse {
    pub status: StatusCode,
    /// Only headers on the `RESPONSE_HEADERS_TO_FORWARD` allow-list.
    pub headers: HeaderMap,
    pub body: bytes::Bytes,
}
220pub async fn proxy_to_appview(
221 method: &str,
222 params: &HashMap<String, String>,
223 auth_header: Option<&str>,
224) -> Result<ProxyResponse, Response> {
225 let appview_url = std::env::var("APPVIEW_URL").map_err(|_| {
226 ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()).into_response()
227 })?;
228 if let Err(e) = is_ssrf_safe(&appview_url) {
229 error!("SSRF check failed for appview URL: {}", e);
230 return Err(ApiError::UpstreamUnavailable(format!("Invalid upstream URL: {}", e))
231 .into_response());
232 }
233 let target_url = format!("{}/xrpc/{}", appview_url, method);
234 info!(target = %target_url, "Proxying request to appview");
235 let client = proxy_client();
236 let mut request_builder = client.get(&target_url).query(params);
237 if let Some(auth) = auth_header {
238 request_builder = request_builder.header("Authorization", auth);
239 }
240 match request_builder.send().await {
241 Ok(resp) => {
242 let status =
243 StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
244 let headers: HeaderMap = resp
245 .headers()
246 .iter()
247 .filter(|(k, _)| {
248 RESPONSE_HEADERS_TO_FORWARD
249 .iter()
250 .any(|h| k.as_str().eq_ignore_ascii_case(h))
251 })
252 .filter_map(|(k, v)| {
253 let name = axum::http::HeaderName::try_from(k.as_str()).ok()?;
254 let value = HeaderValue::from_bytes(v.as_bytes()).ok()?;
255 Some((name, value))
256 })
257 .collect();
258 let content_length = resp
259 .content_length()
260 .unwrap_or(0);
261 if content_length > MAX_RESPONSE_SIZE {
262 error!(
263 content_length,
264 max = MAX_RESPONSE_SIZE,
265 "Upstream response too large"
266 );
267 return Err(ApiError::UpstreamFailure.into_response());
268 }
269 let body = resp.bytes().await.map_err(|e| {
270 error!(error = ?e, "Error reading proxy response body");
271 ApiError::UpstreamFailure.into_response()
272 })?;
273 if body.len() as u64 > MAX_RESPONSE_SIZE {
274 error!(
275 len = body.len(),
276 max = MAX_RESPONSE_SIZE,
277 "Upstream response body exceeded size limit"
278 );
279 return Err(ApiError::UpstreamFailure.into_response());
280 }
281 Ok(ProxyResponse {
282 status,
283 headers,
284 body,
285 })
286 }
287 Err(e) => {
288 error!(error = ?e, "Error sending proxy request");
289 if e.is_timeout() {
290 Err(ApiError::UpstreamTimeout.into_response())
291 } else if e.is_connect() {
292 Err(ApiError::UpstreamUnavailable("Failed to connect to upstream".to_string())
293 .into_response())
294 } else {
295 Err(ApiError::UpstreamFailure.into_response())
296 }
297 }
298 }
299}
300pub fn format_munged_response<T: Serialize>(data: T, lag: Option<i64>) -> Response {
301 let mut response = (StatusCode::OK, Json(data)).into_response();
302 if let Some(lag_ms) = lag {
303 if let Ok(header_val) = HeaderValue::from_str(&lag_ms.to_string()) {
304 response
305 .headers_mut()
306 .insert(UPSTREAM_LAG_HEADER, header_val);
307 }
308 }
309 response
310}
/// Minimal author view embedded in a `PostView`; unknown fields are
/// preserved in `extra`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthorView {
    pub did: String,
    pub handle: String,
    /// Serialized as `displayName`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    /// Avatar URL, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<String>,
    /// Any fields not modeled above.
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
/// Hydrated view of a post, mirroring the AppView's post-view shape; unknown
/// fields are preserved in `extra`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostView {
    pub uri: String,
    pub cid: String,
    pub author: AuthorView,
    /// The underlying post record, as raw JSON.
    pub record: Value,
    /// RFC3339 timestamp string (serialized as `indexedAt`).
    pub indexed_at: String,
    /// Hydrated embed view, kept as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    // Engagement counters default to 0 when absent in the wire payload.
    #[serde(default)]
    pub reply_count: i64,
    #[serde(default)]
    pub repost_count: i64,
    #[serde(default)]
    pub like_count: i64,
    #[serde(default)]
    pub quote_count: i64,
    /// Any fields not modeled above.
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
/// One feed item: a post plus optional reply/repost context; unknown fields
/// are preserved in `extra`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeedViewPost {
    pub post: PostView,
    /// Reply context, kept as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    /// Reason the item appears in the feed (e.g. repost), kept as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<Value>,
    /// Serialized as `feedContext`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub feed_context: Option<String>,
    /// Any fields not modeled above.
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
/// Output shape of a feed endpoint: the items plus an optional pagination
/// cursor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedOutput {
    pub feed: Vec<FeedViewPost>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}
363pub fn format_local_post(
364 descript: &RecordDescript<PostRecord>,
365 author_did: &str,
366 author_handle: &str,
367 profile: Option<&RecordDescript<ProfileRecord>>,
368) -> PostView {
369 let display_name = profile.and_then(|p| p.record.display_name.clone());
370 PostView {
371 uri: descript.uri.clone(),
372 cid: descript.cid.clone(),
373 author: AuthorView {
374 did: author_did.to_string(),
375 handle: author_handle.to_string(),
376 display_name,
377 avatar: None,
378 extra: HashMap::new(),
379 },
380 record: serde_json::to_value(&descript.record).unwrap_or(Value::Null),
381 indexed_at: descript.indexed_at.to_rfc3339(),
382 embed: descript.record.embed.clone(),
383 reply_count: 0,
384 repost_count: 0,
385 like_count: 0,
386 quote_count: 0,
387 extra: HashMap::new(),
388 }
389}
390pub fn insert_posts_into_feed(feed: &mut Vec<FeedViewPost>, posts: Vec<PostView>) {
391 if posts.is_empty() {
392 return;
393 }
394 let new_items: Vec<FeedViewPost> = posts
395 .into_iter()
396 .map(|post| FeedViewPost {
397 post,
398 reply: None,
399 reason: None,
400 feed_context: None,
401 extra: HashMap::new(),
402 })
403 .collect();
404 feed.extend(new_items);
405 feed.sort_by(|a, b| b.post.indexed_at.cmp(&a.post.indexed_at));
406}