Noreposts Feed
1use anyhow::Result;
2use axum::{
3 extract::{Query, State},
4 http::{HeaderMap, StatusCode},
5 response::{IntoResponse, Json, Response},
6 routing::get,
7 Router,
8};
9use clap::Parser;
10use sqlx::Row;
11use std::sync::Arc;
12use tokio::net::TcpListener;
13use tower_http::cors::CorsLayer;
14use tracing::{info, warn};
15
16mod admin_socket;
17mod auth;
18mod backfill;
19mod cleanup;
20mod database;
21mod feed_algorithm;
22mod jetstream_consumer;
23mod publish;
24mod types;
25
26use crate::{
27 admin_socket::AdminSocket, auth::validate_jwt, database::Database,
28 feed_algorithm::FollowingNoRepostsFeed, jetstream_consumer::JetstreamEventHandler, types::*,
29};
30
31#[derive(Parser)]
32#[command(name = "following-no-reposts-feed")]
33#[command(about = "A Bluesky feed generator for following without reposts")]
34struct Args {
35 #[command(subcommand)]
36 command: Option<Command>,
37
38 #[arg(long, env = "DATABASE_URL", default_value = "sqlite:./feed.db")]
39 database_url: String,
40
41 #[arg(long, env = "PORT", default_value = "3000")]
42 port: u16,
43
44 #[arg(long, env = "FEEDGEN_HOSTNAME")]
45 hostname: Option<String>,
46
47 #[arg(long, env = "FEEDGEN_SERVICE_DID")]
48 service_did: Option<String>,
49
50 #[arg(
51 long,
52 env = "JETSTREAM_HOSTNAME",
53 default_value = "jetstream1.us-east.bsky.network"
54 )]
55 jetstream_hostname: String,
56
57 #[arg(
58 long,
59 env = "ADMIN_SOCKET",
60 default_value = "/var/run/noreposts-feed.sock"
61 )]
62 admin_socket: String,
63}
64
65#[derive(Parser)]
66enum Command {
67 /// Publish the feed to Bluesky
68 Publish,
69 /// Run the feed generator server (default)
70 Serve,
71}
72
73#[derive(Clone)]
74struct AppState {
75 db: Arc<Database>,
76 service_did: String,
77}
78
79#[tokio::main]
80async fn main() -> Result<()> {
81 tracing_subscriber::fmt::init();
82 dotenvy::dotenv().ok();
83
84 let args = Args::parse();
85
86 // Handle publish command
87 if matches!(args.command, Some(Command::Publish)) {
88 return publish::publish_feed().await;
89 }
90
91 // Default to serve mode
92 let service_did = args
93 .service_did
94 .or_else(|| args.hostname.clone().map(|h| format!("did:web:{}", h)))
95 .expect("FEEDGEN_SERVICE_DID or FEEDGEN_HOSTNAME must be set");
96
97 // Initialize database
98 let db = Arc::new(Database::new(&args.database_url).await?);
99 db.migrate().await?;
100
101 let app_state = AppState {
102 db: Arc::clone(&db),
103 service_did: service_did.clone(),
104 };
105
106 // Start admin socket
107 let admin_socket = AdminSocket::new(Arc::clone(&db), args.admin_socket.clone());
108 tokio::spawn(async move {
109 if let Err(e) = admin_socket.start().await {
110 warn!("Admin socket error: {}", e);
111 }
112 });
113
114 // Start cleanup task - runs every 5 minutes
115 let db_cleanup = Arc::clone(&db);
116 tokio::spawn(async move {
117 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Every 5 minutes
118 loop {
119 interval.tick().await;
120
121 // Clean up old posts (older than 48 hours)
122 if let Err(e) = db_cleanup.cleanup_old_posts(48).await {
123 warn!("Failed to cleanup old posts: {}", e);
124 }
125
126 // Verify follows for active users (accessed feed in last 7 days)
127 // This removes follows that no longer exist in the user's actual follow list
128 if let Err(e) = cleanup::verify_active_user_follows(Arc::clone(&db_cleanup)).await {
129 warn!("Failed to verify active user follows: {}", e);
130 }
131
132 // Clean up follows for users who haven't accessed the feed
133 // This removes all follow data for users not in the active_users table
134 if let Err(e) = cleanup::cleanup_inactive_user_follows(Arc::clone(&db_cleanup)).await {
135 warn!("Failed to cleanup inactive user follows: {}", e);
136 }
137 }
138 });
139
140 // Start Jetstream consumer with automatic reconnection
141 let event_handler = JetstreamEventHandler::new(Arc::clone(&db));
142 let jetstream_hostname = args.jetstream_hostname.clone();
143 tokio::spawn(async move {
144 loop {
145 info!("Starting Jetstream consumer...");
146 if let Err(e) = event_handler.start(jetstream_hostname.clone()).await {
147 warn!(
148 "Jetstream consumer error: {}. Reconnecting in 5 seconds...",
149 e
150 );
151 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
152 } else {
153 // Consumer stopped without error, wait before restarting
154 warn!("Jetstream consumer stopped unexpectedly. Reconnecting in 5 seconds...");
155 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
156 }
157 }
158 });
159
160 // Setup web server
161 let app = Router::new()
162 .route("/", get(root))
163 .route("/.well-known/did.json", get(did_document))
164 .route(
165 "/xrpc/app.bsky.feed.getFeedSkeleton",
166 get(get_feed_skeleton),
167 )
168 .layer(CorsLayer::permissive())
169 .with_state(app_state);
170
171 let listener = TcpListener::bind(format!("0.0.0.0:{}", args.port)).await?;
172 info!("Feed generator listening on port {}", args.port);
173
174 axum::serve(listener, app).await?;
175 Ok(())
176}
177
/// Handler for `GET /`: replies with a fixed plain-text banner naming the service.
async fn root() -> &'static str {
    const BANNER: &str = "Following No Reposts Feed Generator";
    BANNER
}
181
182async fn did_document(State(state): State<AppState>) -> Json<DidDocument> {
183 Json(DidDocument {
184 context: vec!["https://www.w3.org/ns/did/v1".to_string()],
185 id: state.service_did.clone(),
186 service: vec![ServiceEndpoint {
187 id: "#bsky_fg".to_string(),
188 service_type: "BskyFeedGenerator".to_string(),
189 service_endpoint: format!(
190 "https://{}",
191 std::env::var("FEEDGEN_HOSTNAME").unwrap_or_default()
192 ),
193 }],
194 })
195}
196
197async fn get_feed_skeleton(
198 headers: HeaderMap,
199 Query(params): Query<FeedSkeletonParams>,
200 State(state): State<AppState>,
201) -> Response {
202 info!("Received feed skeleton request for feed: {}", params.feed);
203
204 // This feed requires authentication since it's personalized
205 let auth_header = match headers.get("authorization") {
206 Some(h) => h,
207 None => {
208 warn!("Missing Authorization header - this feed requires authentication");
209 return (
210 StatusCode::UNAUTHORIZED,
211 Json(types::ErrorResponse {
212 error: "AuthenticationRequired".to_string(),
213 message:
214 "This feed shows posts from accounts you follow and requires authentication"
215 .to_string(),
216 }),
217 )
218 .into_response();
219 }
220 };
221
222 let auth_str = match auth_header.to_str() {
223 Ok(s) => s,
224 Err(_) => {
225 warn!("Invalid authorization header format");
226 return (
227 StatusCode::UNAUTHORIZED,
228 Json(types::ErrorResponse {
229 error: "AuthenticationRequired".to_string(),
230 message: "Invalid authorization header format".to_string(),
231 }),
232 )
233 .into_response();
234 }
235 };
236
237 // Remove "Bearer " prefix if present
238 let token = auth_str.strip_prefix("Bearer ").unwrap_or(auth_str);
239
240 info!("Validating JWT for request");
241 let requester_did = match validate_jwt(token, &state.service_did).await {
242 Ok(claims) => {
243 info!("Authenticated request from DID: {}", claims.iss);
244 claims.iss
245 }
246 Err(e) => {
247 warn!("JWT validation failed: {}", e);
248 return (
249 StatusCode::UNAUTHORIZED,
250 Json(types::ErrorResponse {
251 error: "AuthenticationRequired".to_string(),
252 message: format!("JWT validation failed: {}", e),
253 }),
254 )
255 .into_response();
256 }
257 };
258
259 // Check if user has any follows, if not, backfill them and their posts
260 let db_for_backfill = Arc::clone(&state.db);
261 let requester_did_clone = requester_did.clone();
262 tokio::spawn(async move {
263 // Check if we have any follows for this user
264 let has_follows =
265 sqlx::query("SELECT COUNT(*) as count FROM follows WHERE follower_did = ?")
266 .bind(&requester_did_clone)
267 .fetch_one(&db_for_backfill.pool)
268 .await
269 .ok()
270 .and_then(|row| row.try_get::<i64, _>("count").ok())
271 .unwrap_or(0);
272
273 if has_follows == 0 {
274 info!(
275 "No follows found for {}, triggering backfill",
276 requester_did_clone
277 );
278
279 // First backfill follows
280 if let Err(e) =
281 backfill::backfill_follows(Arc::clone(&db_for_backfill), &requester_did_clone).await
282 {
283 warn!("Follow backfill failed for {}: {}", requester_did_clone, e);
284 return;
285 }
286
287 // Then backfill recent posts from each follow (10 posts per user)
288 info!("Starting post backfill for {}", requester_did_clone);
289 if let Err(e) = backfill::backfill_posts_for_follows(
290 Arc::clone(&db_for_backfill),
291 &requester_did_clone,
292 10,
293 )
294 .await
295 {
296 warn!("Post backfill failed for {}: {}", requester_did_clone, e);
297 }
298 }
299 });
300
301 // Record that this user accessed the feed
302 if let Err(e) = state.db.record_feed_request(&requester_did).await {
303 warn!("Failed to record feed request for {}: {}", requester_did, e);
304 }
305
306 let feed_algorithm = FollowingNoRepostsFeed::new(Arc::clone(&state.db));
307
308 info!(
309 "Generating feed for requester: {}, limit: {:?}, cursor: {:?}",
310 requester_did, params.limit, params.cursor
311 );
312
313 match feed_algorithm
314 .generate_feed(Some(requester_did.clone()), params.limit, params.cursor)
315 .await
316 {
317 Ok(response) => {
318 info!(
319 "Successfully generated feed with {} posts",
320 response.feed.len()
321 );
322 Json(response).into_response()
323 }
324 Err(e) => {
325 warn!("Feed generation error: {}", e);
326 (
327 StatusCode::INTERNAL_SERVER_ERROR,
328 Json(types::ErrorResponse {
329 error: "InternalServerError".to_string(),
330 message: format!("Failed to generate feed: {}", e),
331 }),
332 )
333 .into_response()
334 }
335 }
336}