this repo has no description
1use crate::api::proxy_client::{
2 is_ssrf_safe, proxy_client, MAX_RESPONSE_SIZE, RESPONSE_HEADERS_TO_FORWARD,
3};
4use crate::api::ApiError;
5use crate::state::AppState;
6use axum::{
7 http::{HeaderMap, HeaderValue, StatusCode},
8 response::{IntoResponse, Response},
9 Json,
10};
11use bytes::Bytes;
12use chrono::{DateTime, Utc};
13use cid::Cid;
14use jacquard_repo::storage::BlockStore;
15use serde::{Deserialize, Serialize};
16use serde_json::Value;
17use std::collections::HashMap;
18use tracing::{error, info, warn};
19use uuid::Uuid;
20
/// Name of the header carrying a repository revision; read by [`extract_repo_rev`].
/// Presumably set by the upstream AppView to the repo revision it has indexed — TODO confirm.
pub const REPO_REV_HEADER: &str = "atproto-repo-rev";
/// Name of the header [`format_munged_response`] sets to the local lag (an `i64` millisecond
/// count, as produced by [`get_local_lag`]) when locally-written records were merged in.
pub const UPSTREAM_LAG_HEADER: &str = "atproto-upstream-lag";
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[serde(rename_all = "camelCase")]
26pub struct PostRecord {
27 #[serde(rename = "$type")]
28 pub record_type: Option<String>,
29 pub text: String,
30 pub created_at: String,
31 #[serde(skip_serializing_if = "Option::is_none")]
32 pub reply: Option<Value>,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub embed: Option<Value>,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub langs: Option<Vec<String>>,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub labels: Option<Value>,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub tags: Option<Vec<String>>,
41 #[serde(flatten)]
42 pub extra: HashMap<String, Value>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46#[serde(rename_all = "camelCase")]
47pub struct ProfileRecord {
48 #[serde(rename = "$type")]
49 pub record_type: Option<String>,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub display_name: Option<String>,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub description: Option<String>,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 pub avatar: Option<Value>,
56 #[serde(skip_serializing_if = "Option::is_none")]
57 pub banner: Option<Value>,
58 #[serde(flatten)]
59 pub extra: HashMap<String, Value>,
60}
61
/// A decoded record together with where it lives and when it was stored locally.
#[derive(Debug, Clone)]
pub struct RecordDescript<T> {
    /// `at://{did}/{collection}/{rkey}` URI of the record.
    pub uri: String,
    /// The record's CID in string form.
    pub cid: String,
    /// When the record was stored locally (the `records.created_at` column when produced by
    /// `get_records_since_rev`).
    pub indexed_at: DateTime<Utc>,
    /// The decoded record body.
    pub record: T,
}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
71#[serde(rename_all = "camelCase")]
72pub struct LikeRecord {
73 #[serde(rename = "$type")]
74 pub record_type: Option<String>,
75 pub subject: LikeSubject,
76 pub created_at: String,
77 #[serde(flatten)]
78 pub extra: HashMap<String, Value>,
79}
80
/// Reference to the record a like points at (presumably a strong ref: URI plus CID).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LikeSubject {
    /// URI of the liked record.
    pub uri: String,
    /// CID of the liked record, as a string.
    pub cid: String,
}
87
/// Records written to a repository after a given revision (see `get_records_since_rev`),
/// split by record kind. `Default` yields the empty set used when nothing is newer.
#[derive(Debug, Default)]
pub struct LocalRecords {
    /// Number of newer records whose blocks were found, including records from collections
    /// that are not decoded into the fields below and records that failed to decode.
    pub count: usize,
    /// The most recent `app.bsky.actor.profile/self` record among them, if any.
    pub profile: Option<RecordDescript<ProfileRecord>>,
    /// Decoded `app.bsky.feed.post` records, in ascending revision order.
    pub posts: Vec<RecordDescript<PostRecord>>,
    /// Decoded `app.bsky.feed.like` records, in ascending revision order.
    pub likes: Vec<RecordDescript<LikeRecord>>,
}
95
96pub async fn get_records_since_rev(
97 state: &AppState,
98 did: &str,
99 rev: &str,
100) -> Result<LocalRecords, String> {
101 let mut result = LocalRecords::default();
102 let user_id: Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
103 .fetch_optional(&state.db)
104 .await
105 .map_err(|e| format!("DB error: {}", e))?
106 .ok_or_else(|| "User not found".to_string())?;
107 let rows = sqlx::query!(
108 r#"
109 SELECT record_cid, collection, rkey, created_at, repo_rev
110 FROM records
111 WHERE repo_id = $1 AND repo_rev > $2
112 ORDER BY repo_rev ASC
113 LIMIT 10
114 "#,
115 user_id,
116 rev
117 )
118 .fetch_all(&state.db)
119 .await
120 .map_err(|e| format!("DB error fetching records: {}", e))?;
121 if rows.is_empty() {
122 return Ok(result);
123 }
124 let sanity_check = sqlx::query_scalar!(
125 "SELECT 1 as val FROM records WHERE repo_id = $1 AND repo_rev <= $2 LIMIT 1",
126 user_id,
127 rev
128 )
129 .fetch_optional(&state.db)
130 .await
131 .map_err(|e| format!("DB error sanity check: {}", e))?;
132 if sanity_check.is_none() {
133 warn!("Sanity check failed: no records found before rev {}", rev);
134 return Ok(result);
135 }
136 struct RowData {
137 cid_str: String,
138 collection: String,
139 rkey: String,
140 created_at: DateTime<Utc>,
141 }
142 let mut row_data: Vec<RowData> = Vec::with_capacity(rows.len());
143 let mut cids: Vec<Cid> = Vec::with_capacity(rows.len());
144 for row in &rows {
145 if let Ok(cid) = row.record_cid.parse::<Cid>() {
146 cids.push(cid);
147 row_data.push(RowData {
148 cid_str: row.record_cid.clone(),
149 collection: row.collection.clone(),
150 rkey: row.rkey.clone(),
151 created_at: row.created_at,
152 });
153 }
154 }
155 let blocks: Vec<Option<Bytes>> = state
156 .block_store
157 .get_many(&cids)
158 .await
159 .map_err(|e| format!("Error fetching blocks: {}", e))?;
160 for (data, block_opt) in row_data.into_iter().zip(blocks.into_iter()) {
161 let block_bytes = match block_opt {
162 Some(b) => b,
163 None => continue,
164 };
165 result.count += 1;
166 let uri = format!("at://{}/{}/{}", did, data.collection, data.rkey);
167 if data.collection == "app.bsky.actor.profile" && data.rkey == "self" {
168 if let Ok(record) = serde_ipld_dagcbor::from_slice::<ProfileRecord>(&block_bytes) {
169 result.profile = Some(RecordDescript {
170 uri,
171 cid: data.cid_str,
172 indexed_at: data.created_at,
173 record,
174 });
175 }
176 } else if data.collection == "app.bsky.feed.post" {
177 if let Ok(record) = serde_ipld_dagcbor::from_slice::<PostRecord>(&block_bytes) {
178 result.posts.push(RecordDescript {
179 uri,
180 cid: data.cid_str,
181 indexed_at: data.created_at,
182 record,
183 });
184 }
185 } else if data.collection == "app.bsky.feed.like" {
186 if let Ok(record) = serde_ipld_dagcbor::from_slice::<LikeRecord>(&block_bytes) {
187 result.likes.push(RecordDescript {
188 uri,
189 cid: data.cid_str,
190 indexed_at: data.created_at,
191 record,
192 });
193 }
194 }
195 }
196 Ok(result)
197}
198
199pub fn get_local_lag(local: &LocalRecords) -> Option<i64> {
200 let mut oldest: Option<DateTime<Utc>> = local.profile.as_ref().map(|p| p.indexed_at);
201 for post in &local.posts {
202 match oldest {
203 None => oldest = Some(post.indexed_at),
204 Some(o) if post.indexed_at < o => oldest = Some(post.indexed_at),
205 _ => {}
206 }
207 }
208 for like in &local.likes {
209 match oldest {
210 None => oldest = Some(like.indexed_at),
211 Some(o) if like.indexed_at < o => oldest = Some(like.indexed_at),
212 _ => {}
213 }
214 }
215 oldest.map(|o| (Utc::now() - o).num_milliseconds())
216}
217
218pub fn extract_repo_rev(headers: &HeaderMap) -> Option<String> {
219 headers
220 .get(REPO_REV_HEADER)
221 .and_then(|h| h.to_str().ok())
222 .map(|s| s.to_string())
223}
224
/// A fully-buffered upstream response, as returned by `proxy_to_appview`.
#[derive(Debug)]
pub struct ProxyResponse {
    /// Upstream HTTP status (`BAD_GATEWAY` if the upstream code could not be represented).
    pub status: StatusCode,
    /// Upstream headers, restricted to those named in `RESPONSE_HEADERS_TO_FORWARD`.
    pub headers: HeaderMap,
    /// Complete response body, no larger than `MAX_RESPONSE_SIZE` bytes.
    pub body: bytes::Bytes,
}
231
232impl ProxyResponse {
233 pub fn into_response(self) -> Response {
234 let mut response = Response::builder().status(self.status);
235 for (key, value) in self.headers.iter() {
236 response = response.header(key, value);
237 }
238 response.body(axum::body::Body::from(self.body)).unwrap()
239 }
240}
241
242pub async fn proxy_to_appview(
243 method: &str,
244 params: &HashMap<String, String>,
245 auth_did: &str,
246 auth_key_bytes: Option<&[u8]>,
247) -> Result<ProxyResponse, Response> {
248 let appview_url = std::env::var("APPVIEW_URL").map_err(|_| {
249 ApiError::UpstreamUnavailable("No upstream AppView configured".to_string()).into_response()
250 })?;
251 if let Err(e) = is_ssrf_safe(&appview_url) {
252 error!("SSRF check failed for appview URL: {}", e);
253 return Err(ApiError::UpstreamUnavailable(format!("Invalid upstream URL: {}", e))
254 .into_response());
255 }
256 let target_url = format!("{}/xrpc/{}", appview_url, method);
257 info!(target = %target_url, "Proxying request to appview");
258 let client = proxy_client();
259 let mut request_builder = client.get(&target_url).query(params);
260 if let Some(key_bytes) = auth_key_bytes {
261 let appview_did = std::env::var("APPVIEW_DID").unwrap_or_else(|_| "did:web:api.bsky.app".to_string());
262 match crate::auth::create_service_token(auth_did, &appview_did, method, key_bytes) {
263 Ok(service_token) => {
264 request_builder = request_builder.header("Authorization", format!("Bearer {}", service_token));
265 }
266 Err(e) => {
267 error!(error = ?e, "Failed to create service token");
268 return Err(ApiError::InternalError.into_response());
269 }
270 }
271 }
272 match request_builder.send().await {
273 Ok(resp) => {
274 let status =
275 StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
276 let headers: HeaderMap = resp
277 .headers()
278 .iter()
279 .filter(|(k, _)| {
280 RESPONSE_HEADERS_TO_FORWARD
281 .iter()
282 .any(|h| k.as_str().eq_ignore_ascii_case(h))
283 })
284 .filter_map(|(k, v)| {
285 let name = axum::http::HeaderName::try_from(k.as_str()).ok()?;
286 let value = HeaderValue::from_bytes(v.as_bytes()).ok()?;
287 Some((name, value))
288 })
289 .collect();
290 let content_length = resp
291 .content_length()
292 .unwrap_or(0);
293 if content_length > MAX_RESPONSE_SIZE {
294 error!(
295 content_length,
296 max = MAX_RESPONSE_SIZE,
297 "Upstream response too large"
298 );
299 return Err(ApiError::UpstreamFailure.into_response());
300 }
301 let body = resp.bytes().await.map_err(|e| {
302 error!(error = ?e, "Error reading proxy response body");
303 ApiError::UpstreamFailure.into_response()
304 })?;
305 if body.len() as u64 > MAX_RESPONSE_SIZE {
306 error!(
307 len = body.len(),
308 max = MAX_RESPONSE_SIZE,
309 "Upstream response body exceeded size limit"
310 );
311 return Err(ApiError::UpstreamFailure.into_response());
312 }
313 Ok(ProxyResponse {
314 status,
315 headers,
316 body,
317 })
318 }
319 Err(e) => {
320 error!(error = ?e, "Error sending proxy request");
321 if e.is_timeout() {
322 Err(ApiError::UpstreamTimeout.into_response())
323 } else if e.is_connect() {
324 Err(ApiError::UpstreamUnavailable("Failed to connect to upstream".to_string())
325 .into_response())
326 } else {
327 Err(ApiError::UpstreamFailure.into_response())
328 }
329 }
330 }
331}
332
333pub fn format_munged_response<T: Serialize>(data: T, lag: Option<i64>) -> Response {
334 let mut response = (StatusCode::OK, Json(data)).into_response();
335 if let Some(lag_ms) = lag {
336 if let Ok(header_val) = HeaderValue::from_str(&lag_ms.to_string()) {
337 response
338 .headers_mut()
339 .insert(UPSTREAM_LAG_HEADER, header_val);
340 }
341 }
342 response
343}
344
/// Author summary embedded in a [`PostView`] (camelCase JSON).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AuthorView {
    /// Author DID.
    pub did: String,
    /// Author handle.
    pub handle: String,
    /// Display name, omitted from JSON when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    /// Avatar, presumably an image URL; omitted from JSON when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avatar: Option<String>,
    /// Any other fields received from upstream, preserved and re-emitted as-is.
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
357
/// A post as presented in feed responses (camelCase JSON).
/// Presumably mirrors `app.bsky.feed.defs#postView` — TODO confirm against the lexicon.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostView {
    /// Post URI.
    pub uri: String,
    /// Post CID.
    pub cid: String,
    /// Author summary.
    pub author: AuthorView,
    /// The raw post record as JSON.
    pub record: Value,
    /// Indexing timestamp string; used for ordering by `insert_posts_into_feed`.
    pub indexed_at: String,
    /// Embed, omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embed: Option<Value>,
    /// Engagement counters; each defaults to 0 when missing from incoming JSON.
    #[serde(default)]
    pub reply_count: i64,
    #[serde(default)]
    pub repost_count: i64,
    #[serde(default)]
    pub like_count: i64,
    #[serde(default)]
    pub quote_count: i64,
    /// Any other fields received from upstream, preserved and re-emitted as-is.
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
379
/// One feed entry: a post plus optional reply context and inclusion reason (camelCase JSON).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeedViewPost {
    /// The post being shown.
    pub post: PostView,
    /// Reply context, untyped; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reply: Option<Value>,
    /// Why the item is in the feed (presumably e.g. a repost), untyped; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<Value>,
    /// Opaque feed context string; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub feed_context: Option<String>,
    /// Any other fields received from upstream, preserved and re-emitted as-is.
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}
393
/// A page of feed items plus the pagination cursor for the next page.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedOutput {
    /// Feed entries, in the order they are presented.
    pub feed: Vec<FeedViewPost>,
    /// Cursor for the next page; omitted from JSON when there is none.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}
400
401pub fn format_local_post(
402 descript: &RecordDescript<PostRecord>,
403 author_did: &str,
404 author_handle: &str,
405 profile: Option<&RecordDescript<ProfileRecord>>,
406) -> PostView {
407 let display_name = profile.and_then(|p| p.record.display_name.clone());
408 PostView {
409 uri: descript.uri.clone(),
410 cid: descript.cid.clone(),
411 author: AuthorView {
412 did: author_did.to_string(),
413 handle: author_handle.to_string(),
414 display_name,
415 avatar: None,
416 extra: HashMap::new(),
417 },
418 record: serde_json::to_value(&descript.record).unwrap_or(Value::Null),
419 indexed_at: descript.indexed_at.to_rfc3339(),
420 embed: descript.record.embed.clone(),
421 reply_count: 0,
422 repost_count: 0,
423 like_count: 0,
424 quote_count: 0,
425 extra: HashMap::new(),
426 }
427}
428
429pub fn insert_posts_into_feed(feed: &mut Vec<FeedViewPost>, posts: Vec<PostView>) {
430 if posts.is_empty() {
431 return;
432 }
433 let new_items: Vec<FeedViewPost> = posts
434 .into_iter()
435 .map(|post| FeedViewPost {
436 post,
437 reply: None,
438 reason: None,
439 feed_context: None,
440 extra: HashMap::new(),
441 })
442 .collect();
443 feed.extend(new_items);
444 feed.sort_by(|a, b| b.post.indexed_at.cmp(&a.post.indexed_at));
445}