this repo has no description
1use crate::api::read_after_write::{
2 FeedOutput, FeedViewPost, ProfileRecord, RecordDescript, extract_repo_rev, format_local_post,
3 format_munged_response, get_local_lag, get_records_since_rev, insert_posts_into_feed,
4 proxy_to_appview_via_registry,
5};
6use crate::state::AppState;
7use axum::{
8 Json,
9 extract::{Query, State},
10 http::StatusCode,
11 response::{IntoResponse, Response},
12};
13use serde::Deserialize;
14use std::collections::HashMap;
15use tracing::warn;
16
17#[derive(Deserialize)]
18pub struct GetAuthorFeedParams {
19 pub actor: String,
20 pub limit: Option<u32>,
21 pub cursor: Option<String>,
22 pub filter: Option<String>,
23 #[serde(rename = "includePins")]
24 pub include_pins: Option<bool>,
25}
26
27fn update_author_profile_in_feed(
28 feed: &mut [FeedViewPost],
29 author_did: &str,
30 local_profile: &RecordDescript<ProfileRecord>,
31) {
32 for item in feed.iter_mut() {
33 if item.post.author.did == author_did
34 && let Some(ref display_name) = local_profile.record.display_name {
35 item.post.author.display_name = Some(display_name.clone());
36 }
37 }
38}
39
40pub async fn get_author_feed(
41 State(state): State<AppState>,
42 headers: axum::http::HeaderMap,
43 Query(params): Query<GetAuthorFeedParams>,
44) -> Response {
45 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
46 let auth_user = if let Some(h) = auth_header {
47 if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) {
48 crate::auth::validate_bearer_token(&state.db, &token)
49 .await
50 .ok()
51 } else {
52 None
53 }
54 } else {
55 None
56 };
57 let auth_did = auth_user.as_ref().map(|u| u.did.clone());
58 let auth_key_bytes = auth_user.as_ref().and_then(|u| u.key_bytes.clone());
59 let mut query_params = HashMap::new();
60 query_params.insert("actor".to_string(), params.actor.clone());
61 if let Some(limit) = params.limit {
62 query_params.insert("limit".to_string(), limit.to_string());
63 }
64 if let Some(cursor) = ¶ms.cursor {
65 query_params.insert("cursor".to_string(), cursor.clone());
66 }
67 if let Some(filter) = ¶ms.filter {
68 query_params.insert("filter".to_string(), filter.clone());
69 }
70 if let Some(include_pins) = params.include_pins {
71 query_params.insert("includePins".to_string(), include_pins.to_string());
72 }
73 let proxy_result = match proxy_to_appview_via_registry(
74 &state,
75 "app.bsky.feed.getAuthorFeed",
76 &query_params,
77 auth_did.as_deref().unwrap_or(""),
78 auth_key_bytes.as_deref(),
79 )
80 .await
81 {
82 Ok(r) => r,
83 Err(e) => return e,
84 };
85 if !proxy_result.status.is_success() {
86 return proxy_result.into_response();
87 }
88 let rev = match extract_repo_rev(&proxy_result.headers) {
89 Some(r) => r,
90 None => return proxy_result.into_response(),
91 };
92 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) {
93 Ok(f) => f,
94 Err(e) => {
95 warn!("Failed to parse author feed response: {:?}", e);
96 return proxy_result.into_response();
97 }
98 };
99 let requester_did = match &auth_did {
100 Some(d) => d.clone(),
101 None => return (StatusCode::OK, Json(feed_output)).into_response(),
102 };
103 let actor_did = if params.actor.starts_with("did:") {
104 params.actor.clone()
105 } else {
106 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
107 let suffix = format!(".{}", hostname);
108 let short_handle = if params.actor.ends_with(&suffix) {
109 params.actor.strip_suffix(&suffix).unwrap_or(¶ms.actor)
110 } else {
111 ¶ms.actor
112 };
113 match sqlx::query_scalar!("SELECT did FROM users WHERE handle = $1", short_handle)
114 .fetch_optional(&state.db)
115 .await
116 {
117 Ok(Some(did)) => did,
118 Ok(None) => return (StatusCode::OK, Json(feed_output)).into_response(),
119 Err(e) => {
120 warn!("Database error resolving actor handle: {:?}", e);
121 return proxy_result.into_response();
122 }
123 }
124 };
125 if actor_did != requester_did {
126 return (StatusCode::OK, Json(feed_output)).into_response();
127 }
128 let local_records = match get_records_since_rev(&state, &requester_did, &rev).await {
129 Ok(r) => r,
130 Err(e) => {
131 warn!("Failed to get local records: {}", e);
132 return proxy_result.into_response();
133 }
134 };
135 if local_records.count == 0 {
136 return (StatusCode::OK, Json(feed_output)).into_response();
137 }
138 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", requester_did)
139 .fetch_optional(&state.db)
140 .await
141 {
142 Ok(Some(h)) => h,
143 Ok(None) => requester_did.clone(),
144 Err(e) => {
145 warn!("Database error fetching handle: {:?}", e);
146 requester_did.clone()
147 }
148 };
149 if let Some(ref local_profile) = local_records.profile {
150 update_author_profile_in_feed(&mut feed_output.feed, &requester_did, local_profile);
151 }
152 let local_posts: Vec<_> = local_records
153 .posts
154 .iter()
155 .map(|p| format_local_post(p, &requester_did, &handle, local_records.profile.as_ref()))
156 .collect();
157 insert_posts_into_feed(&mut feed_output.feed, local_posts);
158 let lag = get_local_lag(&local_records);
159 format_munged_response(feed_output, lag)
160}