this repo has no description
1use crate::api::read_after_write::{
2 extract_repo_rev, format_local_post, format_munged_response, get_local_lag,
3 get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost,
4 ProfileRecord, RecordDescript,
5};
6use crate::state::AppState;
7use axum::{
8 extract::{Query, State},
9 http::StatusCode,
10 response::{IntoResponse, Response},
11 Json,
12};
13use serde::Deserialize;
14use std::collections::HashMap;
15use tracing::warn;
16#[derive(Deserialize)]
17pub struct GetAuthorFeedParams {
18 pub actor: String,
19 pub limit: Option<u32>,
20 pub cursor: Option<String>,
21 pub filter: Option<String>,
22 #[serde(rename = "includePins")]
23 pub include_pins: Option<bool>,
24}
25fn update_author_profile_in_feed(
26 feed: &mut [FeedViewPost],
27 author_did: &str,
28 local_profile: &RecordDescript<ProfileRecord>,
29) {
30 for item in feed.iter_mut() {
31 if item.post.author.did == author_did {
32 if let Some(ref display_name) = local_profile.record.display_name {
33 item.post.author.display_name = Some(display_name.clone());
34 }
35 }
36 }
37}
38pub async fn get_author_feed(
39 State(state): State<AppState>,
40 headers: axum::http::HeaderMap,
41 Query(params): Query<GetAuthorFeedParams>,
42) -> Response {
43 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
44 let auth_did = if let Some(h) = auth_header {
45 if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) {
46 match crate::auth::validate_bearer_token(&state.db, &token).await {
47 Ok(user) => Some(user.did),
48 Err(_) => None,
49 }
50 } else {
51 None
52 }
53 } else {
54 None
55 };
56 let mut query_params = HashMap::new();
57 query_params.insert("actor".to_string(), params.actor.clone());
58 if let Some(limit) = params.limit {
59 query_params.insert("limit".to_string(), limit.to_string());
60 }
61 if let Some(cursor) = ¶ms.cursor {
62 query_params.insert("cursor".to_string(), cursor.clone());
63 }
64 if let Some(filter) = ¶ms.filter {
65 query_params.insert("filter".to_string(), filter.clone());
66 }
67 if let Some(include_pins) = params.include_pins {
68 query_params.insert("includePins".to_string(), include_pins.to_string());
69 }
70 let proxy_result =
71 match proxy_to_appview("app.bsky.feed.getAuthorFeed", &query_params, auth_header).await {
72 Ok(r) => r,
73 Err(e) => return e,
74 };
75 if !proxy_result.status.is_success() {
76 return (proxy_result.status, proxy_result.body).into_response();
77 }
78 let rev = match extract_repo_rev(&proxy_result.headers) {
79 Some(r) => r,
80 None => return (proxy_result.status, proxy_result.body).into_response(),
81 };
82 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) {
83 Ok(f) => f,
84 Err(e) => {
85 warn!("Failed to parse author feed response: {:?}", e);
86 return (proxy_result.status, proxy_result.body).into_response();
87 }
88 };
89 let requester_did = match auth_did {
90 Some(d) => d,
91 None => return (StatusCode::OK, Json(feed_output)).into_response(),
92 };
93 let actor_did = if params.actor.starts_with("did:") {
94 params.actor.clone()
95 } else {
96 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
97 let suffix = format!(".{}", hostname);
98 let short_handle = if params.actor.ends_with(&suffix) {
99 params.actor.strip_suffix(&suffix).unwrap_or(¶ms.actor)
100 } else {
101 ¶ms.actor
102 };
103 match sqlx::query_scalar!("SELECT did FROM users WHERE handle = $1", short_handle)
104 .fetch_optional(&state.db)
105 .await
106 {
107 Ok(Some(did)) => did,
108 Ok(None) => return (StatusCode::OK, Json(feed_output)).into_response(),
109 Err(e) => {
110 warn!("Database error resolving actor handle: {:?}", e);
111 return (proxy_result.status, proxy_result.body).into_response();
112 }
113 }
114 };
115 if actor_did != requester_did {
116 return (StatusCode::OK, Json(feed_output)).into_response();
117 }
118 let local_records = match get_records_since_rev(&state, &requester_did, &rev).await {
119 Ok(r) => r,
120 Err(e) => {
121 warn!("Failed to get local records: {}", e);
122 return (proxy_result.status, proxy_result.body).into_response();
123 }
124 };
125 if local_records.count == 0 {
126 return (StatusCode::OK, Json(feed_output)).into_response();
127 }
128 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", requester_did)
129 .fetch_optional(&state.db)
130 .await
131 {
132 Ok(Some(h)) => h,
133 Ok(None) => requester_did.clone(),
134 Err(e) => {
135 warn!("Database error fetching handle: {:?}", e);
136 requester_did.clone()
137 }
138 };
139 if let Some(ref local_profile) = local_records.profile {
140 update_author_profile_in_feed(&mut feed_output.feed, &requester_did, local_profile);
141 }
142 let local_posts: Vec<_> = local_records
143 .posts
144 .iter()
145 .map(|p| {
146 format_local_post(
147 p,
148 &requester_did,
149 &handle,
150 local_records.profile.as_ref(),
151 )
152 })
153 .collect();
154 insert_posts_into_feed(&mut feed_output.feed, local_posts);
155 let lag = get_local_lag(&local_records);
156 format_munged_response(feed_output, lag)
157}