this repo has no description
1use crate::api::read_after_write::{ 2 extract_repo_rev, format_local_post, format_munged_response, get_local_lag, 3 get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost, 4 ProfileRecord, RecordDescript, 5}; 6use crate::state::AppState; 7use axum::{ 8 extract::{Query, State}, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11 Json, 12}; 13use serde::Deserialize; 14use std::collections::HashMap; 15use tracing::warn; 16 17#[derive(Deserialize)] 18pub struct GetAuthorFeedParams { 19 pub actor: String, 20 pub limit: Option<u32>, 21 pub cursor: Option<String>, 22 pub filter: Option<String>, 23 #[serde(rename = "includePins")] 24 pub include_pins: Option<bool>, 25} 26 27fn update_author_profile_in_feed( 28 feed: &mut [FeedViewPost], 29 author_did: &str, 30 local_profile: &RecordDescript<ProfileRecord>, 31) { 32 for item in feed.iter_mut() { 33 if item.post.author.did == author_did { 34 if let Some(ref display_name) = local_profile.record.display_name { 35 item.post.author.display_name = Some(display_name.clone()); 36 } 37 } 38 } 39} 40 41pub async fn get_author_feed( 42 State(state): State<AppState>, 43 headers: axum::http::HeaderMap, 44 Query(params): Query<GetAuthorFeedParams>, 45) -> Response { 46 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 47 let auth_user = if let Some(h) = auth_header { 48 if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) { 49 crate::auth::validate_bearer_token(&state.db, &token).await.ok() 50 } else { 51 None 52 } 53 } else { 54 None 55 }; 56 let auth_did = auth_user.as_ref().map(|u| u.did.clone()); 57 let auth_key_bytes = auth_user.as_ref().and_then(|u| u.key_bytes.clone()); 58 let mut query_params = HashMap::new(); 59 query_params.insert("actor".to_string(), params.actor.clone()); 60 if let Some(limit) = params.limit { 61 query_params.insert("limit".to_string(), limit.to_string()); 62 } 63 if let Some(cursor) = &params.cursor { 64 query_params.insert("cursor".to_string(), cursor.clone()); 65 } 66 if let Some(filter) = &params.filter { 67 query_params.insert("filter".to_string(), filter.clone()); 68 } 69 if let Some(include_pins) = params.include_pins { 70 query_params.insert("includePins".to_string(), include_pins.to_string()); 71 } 72 let proxy_result = 73 match proxy_to_appview("app.bsky.feed.getAuthorFeed", &query_params, auth_did.as_deref().unwrap_or(""), auth_key_bytes.as_deref()).await { 74 Ok(r) => r, 75 Err(e) => return e, 76 }; 77 if !proxy_result.status.is_success() { 78 return proxy_result.into_response(); 79 } 80 let rev = match extract_repo_rev(&proxy_result.headers) { 81 Some(r) => r, 82 None => return proxy_result.into_response(), 83 }; 84 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 85 Ok(f) => f, 86 Err(e) => { 87 warn!("Failed to parse author feed response: {:?}", e); 88 return proxy_result.into_response(); 89 } 90 }; 91 let requester_did = match &auth_did { 92 Some(d) => d.clone(), 93 None => return (StatusCode::OK, Json(feed_output)).into_response(), 94 }; 95 let actor_did = if params.actor.starts_with("did:") { 96 params.actor.clone() 97 } else { 98 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 99 let suffix = format!(".{}", hostname); 100 let short_handle = if params.actor.ends_with(&suffix) { 101 params.actor.strip_suffix(&suffix).unwrap_or(&params.actor) 102 } else { 103 &params.actor 104 }; 105 match sqlx::query_scalar!("SELECT did FROM users WHERE handle = $1", short_handle) 106 .fetch_optional(&state.db) 107 .await 108 { 109 Ok(Some(did)) => did, 110 Ok(None) => return (StatusCode::OK, Json(feed_output)).into_response(), 111 Err(e) => { 112 warn!("Database error resolving actor handle: {:?}", e); 113 return proxy_result.into_response(); 114 } 115 } 116 }; 117 if actor_did != requester_did { 118 return (StatusCode::OK, Json(feed_output)).into_response(); 119 } 120 let local_records = match get_records_since_rev(&state, &requester_did, &rev).await { 121 Ok(r) => r, 122 Err(e) => { 123 warn!("Failed to get local records: {}", e); 124 return proxy_result.into_response(); 125 } 126 }; 127 if local_records.count == 0 { 128 return (StatusCode::OK, Json(feed_output)).into_response(); 129 } 130 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", requester_did) 131 .fetch_optional(&state.db) 132 .await 133 { 134 Ok(Some(h)) => h, 135 Ok(None) => requester_did.clone(), 136 Err(e) => { 137 warn!("Database error fetching handle: {:?}", e); 138 requester_did.clone() 139 } 140 }; 141 if let Some(ref local_profile) = local_records.profile { 142 update_author_profile_in_feed(&mut feed_output.feed, &requester_did, local_profile); 143 } 144 let local_posts: Vec<_> = local_records 145 .posts 146 .iter() 147 .map(|p| { 148 format_local_post( 149 p, 150 &requester_did, 151 &handle, 152 local_records.profile.as_ref(), 153 ) 154 }) 155 .collect(); 156 insert_posts_into_feed(&mut feed_output.feed, local_posts); 157 let lag = get_local_lag(&local_records); 158 format_munged_response(feed_output, lag) 159}