this repo has no description
1use crate::api::read_after_write::{ 2 FeedOutput, FeedViewPost, ProfileRecord, RecordDescript, extract_repo_rev, format_local_post, 3 format_munged_response, get_local_lag, get_records_since_rev, insert_posts_into_feed, 4 proxy_to_appview_via_registry, 5}; 6use crate::state::AppState; 7use axum::{ 8 Json, 9 extract::{Query, State}, 10 http::StatusCode, 11 response::{IntoResponse, Response}, 12}; 13use serde::Deserialize; 14use std::collections::HashMap; 15use tracing::warn; 16 17#[derive(Deserialize)] 18pub struct GetAuthorFeedParams { 19 pub actor: String, 20 pub limit: Option<u32>, 21 pub cursor: Option<String>, 22 pub filter: Option<String>, 23 #[serde(rename = "includePins")] 24 pub include_pins: Option<bool>, 25} 26 27fn update_author_profile_in_feed( 28 feed: &mut [FeedViewPost], 29 author_did: &str, 30 local_profile: &RecordDescript<ProfileRecord>, 31) { 32 for item in feed.iter_mut() { 33 if item.post.author.did == author_did 34 && let Some(ref display_name) = local_profile.record.display_name { 35 item.post.author.display_name = Some(display_name.clone()); 36 } 37 } 38} 39 40pub async fn get_author_feed( 41 State(state): State<AppState>, 42 headers: axum::http::HeaderMap, 43 Query(params): Query<GetAuthorFeedParams>, 44) -> Response { 45 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 46 let auth_user = if let Some(h) = auth_header { 47 if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) { 48 crate::auth::validate_bearer_token(&state.db, &token) 49 .await 50 .ok() 51 } else { 52 None 53 } 54 } else { 55 None 56 }; 57 let auth_did = auth_user.as_ref().map(|u| u.did.clone()); 58 let auth_key_bytes = auth_user.as_ref().and_then(|u| u.key_bytes.clone()); 59 let mut query_params = HashMap::new(); 60 query_params.insert("actor".to_string(), params.actor.clone()); 61 if let Some(limit) = params.limit { 62 query_params.insert("limit".to_string(), limit.to_string()); 63 } 64 if let Some(cursor) = &params.cursor { 65 query_params.insert("cursor".to_string(), cursor.clone()); 66 } 67 if let Some(filter) = &params.filter { 68 query_params.insert("filter".to_string(), filter.clone()); 69 } 70 if let Some(include_pins) = params.include_pins { 71 query_params.insert("includePins".to_string(), include_pins.to_string()); 72 } 73 let proxy_result = match proxy_to_appview_via_registry( 74 &state, 75 "app.bsky.feed.getAuthorFeed", 76 &query_params, 77 auth_did.as_deref().unwrap_or(""), 78 auth_key_bytes.as_deref(), 79 ) 80 .await 81 { 82 Ok(r) => r, 83 Err(e) => return e, 84 }; 85 if !proxy_result.status.is_success() { 86 return proxy_result.into_response(); 87 } 88 let rev = match extract_repo_rev(&proxy_result.headers) { 89 Some(r) => r, 90 None => return proxy_result.into_response(), 91 }; 92 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 93 Ok(f) => f, 94 Err(e) => { 95 warn!("Failed to parse author feed response: {:?}", e); 96 return proxy_result.into_response(); 97 } 98 }; 99 let requester_did = match &auth_did { 100 Some(d) => d.clone(), 101 None => return (StatusCode::OK, Json(feed_output)).into_response(), 102 }; 103 let actor_did = if params.actor.starts_with("did:") { 104 params.actor.clone() 105 } else { 106 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 107 let suffix = format!(".{}", hostname); 108 let short_handle = if params.actor.ends_with(&suffix) { 109 params.actor.strip_suffix(&suffix).unwrap_or(&params.actor) 110 } else { 111 &params.actor 112 }; 113 match sqlx::query_scalar!("SELECT did FROM users WHERE handle = $1", short_handle) 114 .fetch_optional(&state.db) 115 .await 116 { 117 Ok(Some(did)) => did, 118 Ok(None) => return (StatusCode::OK, Json(feed_output)).into_response(), 119 Err(e) => { 120 warn!("Database error resolving actor handle: {:?}", e); 121 return proxy_result.into_response(); 122 } 123 } 124 }; 125 if actor_did != requester_did { 126 return (StatusCode::OK, Json(feed_output)).into_response(); 127 } 128 let local_records = match get_records_since_rev(&state, &requester_did, &rev).await { 129 Ok(r) => r, 130 Err(e) => { 131 warn!("Failed to get local records: {}", e); 132 return proxy_result.into_response(); 133 } 134 }; 135 if local_records.count == 0 { 136 return (StatusCode::OK, Json(feed_output)).into_response(); 137 } 138 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", requester_did) 139 .fetch_optional(&state.db) 140 .await 141 { 142 Ok(Some(h)) => h, 143 Ok(None) => requester_did.clone(), 144 Err(e) => { 145 warn!("Database error fetching handle: {:?}", e); 146 requester_did.clone() 147 } 148 }; 149 if let Some(ref local_profile) = local_records.profile { 150 update_author_profile_in_feed(&mut feed_output.feed, &requester_did, local_profile); 151 } 152 let local_posts: Vec<_> = local_records 153 .posts 154 .iter() 155 .map(|p| format_local_post(p, &requester_did, &handle, local_records.profile.as_ref())) 156 .collect(); 157 insert_posts_into_feed(&mut feed_output.feed, local_posts); 158 let lag = get_local_lag(&local_records); 159 format_munged_response(feed_output, lag) 160}