// Noreposts Feed
// at main 336 lines 11 kB view raw
1use anyhow::Result; 2use axum::{ 3 extract::{Query, State}, 4 http::{HeaderMap, StatusCode}, 5 response::{IntoResponse, Json, Response}, 6 routing::get, 7 Router, 8}; 9use clap::Parser; 10use sqlx::Row; 11use std::sync::Arc; 12use tokio::net::TcpListener; 13use tower_http::cors::CorsLayer; 14use tracing::{info, warn}; 15 16mod admin_socket; 17mod auth; 18mod backfill; 19mod cleanup; 20mod database; 21mod feed_algorithm; 22mod jetstream_consumer; 23mod publish; 24mod types; 25 26use crate::{ 27 admin_socket::AdminSocket, auth::validate_jwt, database::Database, 28 feed_algorithm::FollowingNoRepostsFeed, jetstream_consumer::JetstreamEventHandler, types::*, 29}; 30 31#[derive(Parser)] 32#[command(name = "following-no-reposts-feed")] 33#[command(about = "A Bluesky feed generator for following without reposts")] 34struct Args { 35 #[command(subcommand)] 36 command: Option<Command>, 37 38 #[arg(long, env = "DATABASE_URL", default_value = "sqlite:./feed.db")] 39 database_url: String, 40 41 #[arg(long, env = "PORT", default_value = "3000")] 42 port: u16, 43 44 #[arg(long, env = "FEEDGEN_HOSTNAME")] 45 hostname: Option<String>, 46 47 #[arg(long, env = "FEEDGEN_SERVICE_DID")] 48 service_did: Option<String>, 49 50 #[arg( 51 long, 52 env = "JETSTREAM_HOSTNAME", 53 default_value = "jetstream1.us-east.bsky.network" 54 )] 55 jetstream_hostname: String, 56 57 #[arg( 58 long, 59 env = "ADMIN_SOCKET", 60 default_value = "/var/run/noreposts-feed.sock" 61 )] 62 admin_socket: String, 63} 64 65#[derive(Parser)] 66enum Command { 67 /// Publish the feed to Bluesky 68 Publish, 69 /// Run the feed generator server (default) 70 Serve, 71} 72 73#[derive(Clone)] 74struct AppState { 75 db: Arc<Database>, 76 service_did: String, 77} 78 79#[tokio::main] 80async fn main() -> Result<()> { 81 tracing_subscriber::fmt::init(); 82 dotenvy::dotenv().ok(); 83 84 let args = Args::parse(); 85 86 // Handle publish command 87 if matches!(args.command, Some(Command::Publish)) { 88 return 
publish::publish_feed().await; 89 } 90 91 // Default to serve mode 92 let service_did = args 93 .service_did 94 .or_else(|| args.hostname.clone().map(|h| format!("did:web:{}", h))) 95 .expect("FEEDGEN_SERVICE_DID or FEEDGEN_HOSTNAME must be set"); 96 97 // Initialize database 98 let db = Arc::new(Database::new(&args.database_url).await?); 99 db.migrate().await?; 100 101 let app_state = AppState { 102 db: Arc::clone(&db), 103 service_did: service_did.clone(), 104 }; 105 106 // Start admin socket 107 let admin_socket = AdminSocket::new(Arc::clone(&db), args.admin_socket.clone()); 108 tokio::spawn(async move { 109 if let Err(e) = admin_socket.start().await { 110 warn!("Admin socket error: {}", e); 111 } 112 }); 113 114 // Start cleanup task - runs every 5 minutes 115 let db_cleanup = Arc::clone(&db); 116 tokio::spawn(async move { 117 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Every 5 minutes 118 loop { 119 interval.tick().await; 120 121 // Clean up old posts (older than 48 hours) 122 if let Err(e) = db_cleanup.cleanup_old_posts(48).await { 123 warn!("Failed to cleanup old posts: {}", e); 124 } 125 126 // Verify follows for active users (accessed feed in last 7 days) 127 // This removes follows that no longer exist in the user's actual follow list 128 if let Err(e) = cleanup::verify_active_user_follows(Arc::clone(&db_cleanup)).await { 129 warn!("Failed to verify active user follows: {}", e); 130 } 131 132 // Clean up follows for users who haven't accessed the feed 133 // This removes all follow data for users not in the active_users table 134 if let Err(e) = cleanup::cleanup_inactive_user_follows(Arc::clone(&db_cleanup)).await { 135 warn!("Failed to cleanup inactive user follows: {}", e); 136 } 137 } 138 }); 139 140 // Start Jetstream consumer with automatic reconnection 141 let event_handler = JetstreamEventHandler::new(Arc::clone(&db)); 142 let jetstream_hostname = args.jetstream_hostname.clone(); 143 tokio::spawn(async move { 
144 loop { 145 info!("Starting Jetstream consumer..."); 146 if let Err(e) = event_handler.start(jetstream_hostname.clone()).await { 147 warn!( 148 "Jetstream consumer error: {}. Reconnecting in 5 seconds...", 149 e 150 ); 151 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 152 } else { 153 // Consumer stopped without error, wait before restarting 154 warn!("Jetstream consumer stopped unexpectedly. Reconnecting in 5 seconds..."); 155 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 156 } 157 } 158 }); 159 160 // Setup web server 161 let app = Router::new() 162 .route("/", get(root)) 163 .route("/.well-known/did.json", get(did_document)) 164 .route( 165 "/xrpc/app.bsky.feed.getFeedSkeleton", 166 get(get_feed_skeleton), 167 ) 168 .layer(CorsLayer::permissive()) 169 .with_state(app_state); 170 171 let listener = TcpListener::bind(format!("0.0.0.0:{}", args.port)).await?; 172 info!("Feed generator listening on port {}", args.port); 173 174 axum::serve(listener, app).await?; 175 Ok(()) 176} 177 178async fn root() -> &'static str { 179 "Following No Reposts Feed Generator" 180} 181 182async fn did_document(State(state): State<AppState>) -> Json<DidDocument> { 183 Json(DidDocument { 184 context: vec!["https://www.w3.org/ns/did/v1".to_string()], 185 id: state.service_did.clone(), 186 service: vec![ServiceEndpoint { 187 id: "#bsky_fg".to_string(), 188 service_type: "BskyFeedGenerator".to_string(), 189 service_endpoint: format!( 190 "https://{}", 191 std::env::var("FEEDGEN_HOSTNAME").unwrap_or_default() 192 ), 193 }], 194 }) 195} 196 197async fn get_feed_skeleton( 198 headers: HeaderMap, 199 Query(params): Query<FeedSkeletonParams>, 200 State(state): State<AppState>, 201) -> Response { 202 info!("Received feed skeleton request for feed: {}", params.feed); 203 204 // This feed requires authentication since it's personalized 205 let auth_header = match headers.get("authorization") { 206 Some(h) => h, 207 None => { 208 warn!("Missing Authorization 
header - this feed requires authentication"); 209 return ( 210 StatusCode::UNAUTHORIZED, 211 Json(types::ErrorResponse { 212 error: "AuthenticationRequired".to_string(), 213 message: 214 "This feed shows posts from accounts you follow and requires authentication" 215 .to_string(), 216 }), 217 ) 218 .into_response(); 219 } 220 }; 221 222 let auth_str = match auth_header.to_str() { 223 Ok(s) => s, 224 Err(_) => { 225 warn!("Invalid authorization header format"); 226 return ( 227 StatusCode::UNAUTHORIZED, 228 Json(types::ErrorResponse { 229 error: "AuthenticationRequired".to_string(), 230 message: "Invalid authorization header format".to_string(), 231 }), 232 ) 233 .into_response(); 234 } 235 }; 236 237 // Remove "Bearer " prefix if present 238 let token = auth_str.strip_prefix("Bearer ").unwrap_or(auth_str); 239 240 info!("Validating JWT for request"); 241 let requester_did = match validate_jwt(token, &state.service_did).await { 242 Ok(claims) => { 243 info!("Authenticated request from DID: {}", claims.iss); 244 claims.iss 245 } 246 Err(e) => { 247 warn!("JWT validation failed: {}", e); 248 return ( 249 StatusCode::UNAUTHORIZED, 250 Json(types::ErrorResponse { 251 error: "AuthenticationRequired".to_string(), 252 message: format!("JWT validation failed: {}", e), 253 }), 254 ) 255 .into_response(); 256 } 257 }; 258 259 // Check if user has any follows, if not, backfill them and their posts 260 let db_for_backfill = Arc::clone(&state.db); 261 let requester_did_clone = requester_did.clone(); 262 tokio::spawn(async move { 263 // Check if we have any follows for this user 264 let has_follows = 265 sqlx::query("SELECT COUNT(*) as count FROM follows WHERE follower_did = ?") 266 .bind(&requester_did_clone) 267 .fetch_one(&db_for_backfill.pool) 268 .await 269 .ok() 270 .and_then(|row| row.try_get::<i64, _>("count").ok()) 271 .unwrap_or(0); 272 273 if has_follows == 0 { 274 info!( 275 "No follows found for {}, triggering backfill", 276 requester_did_clone 277 ); 278 279 // 
First backfill follows 280 if let Err(e) = 281 backfill::backfill_follows(Arc::clone(&db_for_backfill), &requester_did_clone).await 282 { 283 warn!("Follow backfill failed for {}: {}", requester_did_clone, e); 284 return; 285 } 286 287 // Then backfill recent posts from each follow (10 posts per user) 288 info!("Starting post backfill for {}", requester_did_clone); 289 if let Err(e) = backfill::backfill_posts_for_follows( 290 Arc::clone(&db_for_backfill), 291 &requester_did_clone, 292 10, 293 ) 294 .await 295 { 296 warn!("Post backfill failed for {}: {}", requester_did_clone, e); 297 } 298 } 299 }); 300 301 // Record that this user accessed the feed 302 if let Err(e) = state.db.record_feed_request(&requester_did).await { 303 warn!("Failed to record feed request for {}: {}", requester_did, e); 304 } 305 306 let feed_algorithm = FollowingNoRepostsFeed::new(Arc::clone(&state.db)); 307 308 info!( 309 "Generating feed for requester: {}, limit: {:?}, cursor: {:?}", 310 requester_did, params.limit, params.cursor 311 ); 312 313 match feed_algorithm 314 .generate_feed(Some(requester_did.clone()), params.limit, params.cursor) 315 .await 316 { 317 Ok(response) => { 318 info!( 319 "Successfully generated feed with {} posts", 320 response.feed.len() 321 ); 322 Json(response).into_response() 323 } 324 Err(e) => { 325 warn!("Feed generation error: {}", e); 326 ( 327 StatusCode::INTERNAL_SERVER_ERROR, 328 Json(types::ErrorResponse { 329 error: "InternalServerError".to_string(), 330 message: format!("Failed to generate feed: {}", e), 331 }), 332 ) 333 .into_response() 334 } 335 } 336}