this repo has no description
1use crate::api::read_after_write::{ 2 extract_repo_rev, format_local_post, format_munged_response, get_local_lag, 3 get_records_since_rev, insert_posts_into_feed, proxy_to_appview, FeedOutput, FeedViewPost, 4 ProfileRecord, RecordDescript, 5}; 6use crate::state::AppState; 7use axum::{ 8 extract::{Query, State}, 9 http::StatusCode, 10 response::{IntoResponse, Response}, 11 Json, 12}; 13use serde::Deserialize; 14use std::collections::HashMap; 15use tracing::warn; 16 17#[derive(Deserialize)] 18pub struct GetAuthorFeedParams { 19 pub actor: String, 20 pub limit: Option<u32>, 21 pub cursor: Option<String>, 22 pub filter: Option<String>, 23 #[serde(rename = "includePins")] 24 pub include_pins: Option<bool>, 25} 26 27fn update_author_profile_in_feed( 28 feed: &mut [FeedViewPost], 29 author_did: &str, 30 local_profile: &RecordDescript<ProfileRecord>, 31) { 32 for item in feed.iter_mut() { 33 if item.post.author.did == author_did { 34 if let Some(ref display_name) = local_profile.record.display_name { 35 item.post.author.display_name = Some(display_name.clone()); 36 } 37 } 38 } 39} 40 41pub async fn get_author_feed( 42 State(state): State<AppState>, 43 headers: axum::http::HeaderMap, 44 Query(params): Query<GetAuthorFeedParams>, 45) -> Response { 46 let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok()); 47 48 let auth_did = if let Some(h) = auth_header { 49 if let Some(token) = crate::auth::extract_bearer_token_from_header(Some(h)) { 50 match crate::auth::validate_bearer_token(&state.db, &token).await { 51 Ok(user) => Some(user.did), 52 Err(_) => None, 53 } 54 } else { 55 None 56 } 57 } else { 58 None 59 }; 60 61 let mut query_params = HashMap::new(); 62 query_params.insert("actor".to_string(), params.actor.clone()); 63 if let Some(limit) = params.limit { 64 query_params.insert("limit".to_string(), limit.to_string()); 65 } 66 if let Some(cursor) = &params.cursor { 67 query_params.insert("cursor".to_string(), cursor.clone()); 68 } 69 if let Some(filter) = &params.filter { 70 query_params.insert("filter".to_string(), filter.clone()); 71 } 72 if let Some(include_pins) = params.include_pins { 73 query_params.insert("includePins".to_string(), include_pins.to_string()); 74 } 75 76 let proxy_result = 77 match proxy_to_appview("app.bsky.feed.getAuthorFeed", &query_params, auth_header).await { 78 Ok(r) => r, 79 Err(e) => return e, 80 }; 81 82 if !proxy_result.status.is_success() { 83 return (proxy_result.status, proxy_result.body).into_response(); 84 } 85 86 let rev = match extract_repo_rev(&proxy_result.headers) { 87 Some(r) => r, 88 None => return (proxy_result.status, proxy_result.body).into_response(), 89 }; 90 91 let mut feed_output: FeedOutput = match serde_json::from_slice(&proxy_result.body) { 92 Ok(f) => f, 93 Err(e) => { 94 warn!("Failed to parse author feed response: {:?}", e); 95 return (proxy_result.status, proxy_result.body).into_response(); 96 } 97 }; 98 99 let requester_did = match auth_did { 100 Some(d) => d, 101 None => return (StatusCode::OK, Json(feed_output)).into_response(), 102 }; 103 104 let actor_did = if params.actor.starts_with("did:") { 105 params.actor.clone() 106 } else { 107 let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); 108 let suffix = format!(".{}", hostname); 109 let short_handle = if params.actor.ends_with(&suffix) { 110 params.actor.strip_suffix(&suffix).unwrap_or(&params.actor) 111 } else { 112 &params.actor 113 }; 114 match sqlx::query_scalar!("SELECT did FROM users WHERE handle = $1", short_handle) 115 .fetch_optional(&state.db) 116 .await 117 { 118 Ok(Some(did)) => did, 119 Ok(None) => return (StatusCode::OK, Json(feed_output)).into_response(), 120 Err(e) => { 121 warn!("Database error resolving actor handle: {:?}", e); 122 return (proxy_result.status, proxy_result.body).into_response(); 123 } 124 } 125 }; 126 127 if actor_did != requester_did { 128 return (StatusCode::OK, Json(feed_output)).into_response(); 129 } 130 131 let local_records = match get_records_since_rev(&state, &requester_did, &rev).await { 132 Ok(r) => r, 133 Err(e) => { 134 warn!("Failed to get local records: {}", e); 135 return (proxy_result.status, proxy_result.body).into_response(); 136 } 137 }; 138 139 if local_records.count == 0 { 140 return (StatusCode::OK, Json(feed_output)).into_response(); 141 } 142 143 let handle = match sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", requester_did) 144 .fetch_optional(&state.db) 145 .await 146 { 147 Ok(Some(h)) => h, 148 Ok(None) => requester_did.clone(), 149 Err(e) => { 150 warn!("Database error fetching handle: {:?}", e); 151 requester_did.clone() 152 } 153 }; 154 155 if let Some(ref local_profile) = local_records.profile { 156 update_author_profile_in_feed(&mut feed_output.feed, &requester_did, local_profile); 157 } 158 159 let local_posts: Vec<_> = local_records 160 .posts 161 .iter() 162 .map(|p| { 163 format_local_post( 164 p, 165 &requester_did, 166 &handle, 167 local_records.profile.as_ref(), 168 ) 169 }) 170 .collect(); 171 172 insert_posts_into_feed(&mut feed_output.feed, local_posts); 173 174 let lag = get_local_lag(&local_records); 175 format_munged_response(feed_output, lag) 176}