this repo has no description
1use crate::api::read_after_write::{ 2 extract_repo_rev, format_local_post, format_munged_response, get_local_lag, 3 get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost, 4 ProfileRecord, RecordDescript, 5}; 6use crate::state::AppState; 7use axum::{ 8 extract::{Query, State}, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11 Json, 12}; 13use serde::Deserialize; 14use std::collections::HashMap; 15use tracing::warn; 16#[derive(Deserialize)] 17pub struct GetAuthorFeedParams { 18 pub actor: String, 19 pub limit: Option<u32>, 20 pub cursor: Option<String>, 21 pub filter: Option<String>, 22 #[serde(rename = "includePins")] 23 pub include_pins: Option<bool>, 24} 25fn update_author_profile_in_feed( 26 feed: &mut [FeedViewPost], 27 author_did: &str, 28 local_profile: &RecordDescript<ProfileRecord>, 29) { 30 for item in feed.iter_mut() { 31 if item.post.author.did == author_did { 32 if let Some(ref display_name) = local_profile.record.display_name { 33 item.post.author.display_name = Some(display_name.clone()); 34 } 35 } 36 } 37} 38pub async fn get_author_feed( 39 State(state): State<AppState>, 40 headers: axum::http::HeaderMap, 41 Query(params): Query<GetAuthorFeedParams>, 42) -> Response { 43 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 44 let auth_did = if let Some(h) = auth_header { 45 if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) { 46 match crate::auth::validate_bearer_token(&state.db, &token).await { 47 Ok(user) => Some(user.did), 48 Err(_) => None, 49 } 50 } else { 51 None 52 } 53 } else { 54 None 55 }; 56 let mut query_params = HashMap::new(); 57 query_params.insert("actor".to_string(), params.actor.clone()); 58 if let Some(limit) = params.limit { 59 query_params.insert("limit".to_string(), limit.to_string()); 60 } 61 if let Some(cursor) = &params.cursor { 62 query_params.insert("cursor".to_string(), cursor.clone()); 63 } 64 if let Some(filter) = &params.filter { 65 query_params.insert("filter".to_string(), filter.clone()); 66 } 67 if let Some(include_pins) = params.include_pins { 68 query_params.insert("includePins".to_string(), include_pins.to_string()); 69 } 70 let proxy_result = 71 match proxy_to_appview("app.bsky.feed.getAuthorFeed", &query_params, auth_header).await { 72 Ok(r) => r, 73 Err(e) => return e, 74 }; 75 if !proxy_result.status.is_success() { 76 return (proxy_result.status, proxy_result.body).into_response(); 77 } 78 let rev = match extract_repo_rev(&proxy_result.headers) { 79 Some(r) => r, 80 None => return (proxy_result.status, proxy_result.body).into_response(), 81 }; 82 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 83 Ok(f) => f, 84 Err(e) => { 85 warn!("Failed to parse author feed response: {:?}", e); 86 return (proxy_result.status, proxy_result.body).into_response(); 87 } 88 }; 89 let requester_did = match auth_did { 90 Some(d) => d, 91 None => return (StatusCode::OK, Json(feed_output)).into_response(), 92 }; 93 let actor_did = if params.actor.starts_with("did:") { 94 params.actor.clone() 95 } else { 96 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 97 let suffix = format!(".{}", hostname); 98 let short_handle = if params.actor.ends_with(&suffix) { 99 params.actor.strip_suffix(&suffix).unwrap_or(&params.actor) 100 } else { 101 &params.actor 102 }; 103 match sqlx::query_scalar!("SELECT did FROM users WHERE handle = $1", short_handle) 104 .fetch_optional(&state.db) 105 .await 106 { 107 Ok(Some(did)) => did, 108 Ok(None) => return (StatusCode::OK, Json(feed_output)).into_response(), 109 Err(e) => { 110 warn!("Database error resolving actor handle: {:?}", e); 111 return (proxy_result.status, proxy_result.body).into_response(); 112 } 113 } 114 }; 115 if actor_did != requester_did { 116 return (StatusCode::OK, Json(feed_output)).into_response(); 117 } 118 let local_records = match get_records_since_rev(&state, &requester_did, &rev).await { 119 Ok(r) => r, 120 Err(e) => { 121 warn!("Failed to get local records: {}", e); 122 return (proxy_result.status, proxy_result.body).into_response(); 123 } 124 }; 125 if local_records.count == 0 { 126 return (StatusCode::OK, Json(feed_output)).into_response(); 127 } 128 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", requester_did) 129 .fetch_optional(&state.db) 130 .await 131 { 132 Ok(Some(h)) => h, 133 Ok(None) => requester_did.clone(), 134 Err(e) => { 135 warn!("Database error fetching handle: {:?}", e); 136 requester_did.clone() 137 } 138 }; 139 if let Some(ref local_profile) = local_records.profile { 140 update_author_profile_in_feed(&mut feed_output.feed, &requester_did, local_profile); 141 } 142 let local_posts: Vec<_> = local_records 143 .posts 144 .iter() 145 .map(|p| { 146 format_local_post( 147 p, 148 &requester_did, 149 &handle, 150 local_records.profile.as_ref(), 151 ) 152 }) 153 .collect(); 154 insert_posts_into_feed(&mut feed_output.feed, local_posts); 155 let lag = get_local_lag(&local_records); 156 format_munged_response(feed_output, lag) 157}