Noreposts Feed

Add periodic follow verification and cleanup

- Run cleanup task every 5 minutes (instead of hourly)
- Verify all follows against Bluesky API and remove stale ones
- Clean up posts and follows older than 48 hours
- Add logging for cleanup operations

This ensures unfollows are properly reflected even if Jetstream
events are missed or delayed.

+164 -3
+74
src/cleanup.rs
··· 1 + use anyhow::Result; 2 + use std::sync::Arc; 3 + use tracing::{info, warn}; 4 + 5 + use crate::database::Database; 6 + 7 + pub async fn verify_all_follows(db: Arc<Database>) -> Result<()> { 8 + info!("Starting follow verification cleanup"); 9 + 10 + let follower_dids = db.get_all_follower_dids().await?; 11 + info!("Verifying follows for {} users", follower_dids.len()); 12 + 13 + let client = reqwest::Client::new(); 14 + 15 + for follower_did in follower_dids { 16 + match verify_follows_for_user(&client, Arc::clone(&db), &follower_did).await { 17 + Ok(_) => {} 18 + Err(e) => { 19 + warn!("Failed to verify follows for {}: {}", follower_did, e); 20 + } 21 + } 22 + } 23 + 24 + info!("Follow verification cleanup completed"); 25 + Ok(()) 26 + } 27 + 28 + async fn verify_follows_for_user( 29 + client: &reqwest::Client, 30 + db: Arc<Database>, 31 + user_did: &str, 32 + ) -> Result<()> { 33 + let mut cursor: Option<String> = None; 34 + let mut current_follows = Vec::new(); 35 + 36 + loop { 37 + let mut url = format!( 38 + "https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows?actor={}&limit=100", 39 + user_did 40 + ); 41 + if let Some(ref c) = cursor { 42 + url.push_str(&format!("&cursor={}", c)); 43 + } 44 + 45 + let response: serde_json::Value = match client.get(&url).send().await { 46 + Ok(r) => r.json().await?, 47 + Err(e) => { 48 + warn!("Failed to fetch follows for {}: {}", user_did, e); 49 + return Err(e.into()); 50 + } 51 + }; 52 + 53 + let follows = response["follows"].as_array(); 54 + if follows.is_none() { 55 + break; 56 + } 57 + 58 + for follow in follows.unwrap() { 59 + if let Some(target_did) = follow["did"].as_str() { 60 + current_follows.push(target_did.to_string()); 61 + } 62 + } 63 + 64 + cursor = response["cursor"].as_str().map(|s| s.to_string()); 65 + if cursor.is_none() { 66 + break; 67 + } 68 + } 69 + 70 + // Sync the database with current follows 71 + db.sync_follows_for_user(user_did, current_follows).await?; 72 + 73 + Ok(()) 74 + }
+75 -1
src/database.rs
··· 125 125 126 126 pub async fn cleanup_old_posts(&self, hours: i64) -> Result<()> { 127 127 let cutoff = Utc::now() - chrono::Duration::hours(hours); 128 - sqlx::query("DELETE FROM posts WHERE indexed_at < ?") 128 + let result = sqlx::query("DELETE FROM posts WHERE indexed_at < ?") 129 + .bind(cutoff.to_rfc3339()) 130 + .execute(&self.pool) 131 + .await?; 132 + 133 + let deleted = result.rows_affected(); 134 + if deleted > 0 { 135 + tracing::info!("Cleaned up {} posts older than {} hours", deleted, hours); 136 + } 137 + Ok(()) 138 + } 139 + 140 + pub async fn cleanup_old_follows(&self, hours: i64) -> Result<()> { 141 + let cutoff = Utc::now() - chrono::Duration::hours(hours); 142 + let result = sqlx::query("DELETE FROM follows WHERE indexed_at < ?") 129 143 .bind(cutoff.to_rfc3339()) 130 144 .execute(&self.pool) 131 145 .await?; 146 + 147 + let deleted = result.rows_affected(); 148 + if deleted > 0 { 149 + tracing::info!("Cleaned up {} follows older than {} hours", deleted, hours); 150 + } 151 + Ok(()) 152 + } 153 + 154 + pub async fn get_all_follower_dids(&self) -> Result<Vec<String>> { 155 + let rows = sqlx::query("SELECT DISTINCT follower_did FROM follows") 156 + .fetch_all(&self.pool) 157 + .await?; 158 + 159 + let dids: Vec<String> = rows 160 + .into_iter() 161 + .filter_map(|row| row.try_get("follower_did").ok()) 162 + .collect(); 163 + 164 + Ok(dids) 165 + } 166 + 167 + pub async fn sync_follows_for_user( 168 + &self, 169 + user_did: &str, 170 + current_target_dids: Vec<String>, 171 + ) -> Result<()> { 172 + // Get all follows for this user in our database 173 + let rows = sqlx::query("SELECT target_did FROM follows WHERE follower_did = ?") 174 + .bind(user_did) 175 + .fetch_all(&self.pool) 176 + .await?; 177 + 178 + let db_target_dids: Vec<String> = rows 179 + .into_iter() 180 + .filter_map(|row| row.try_get("target_did").ok()) 181 + .collect(); 182 + 183 + // Find follows in database that no longer exist in current follows 184 + let mut removed_count = 0; 185 + for db_target in &db_target_dids { 186 + if !current_target_dids.contains(db_target) { 187 + // This follow no longer exists, remove it 188 + sqlx::query("DELETE FROM follows WHERE follower_did = ? AND target_did = ?") 189 + .bind(user_did) 190 + .bind(db_target) 191 + .execute(&self.pool) 192 + .await?; 193 + removed_count += 1; 194 + tracing::info!("Removed stale follow: {} -> {}", user_did, db_target); 195 + } 196 + } 197 + 198 + if removed_count > 0 { 199 + tracing::info!( 200 + "Cleaned up {} stale follows for {}", 201 + removed_count, 202 + user_did 203 + ); 204 + } 205 + 132 206 Ok(()) 133 207 } 134 208
+15 -2
src/main.rs
··· 16 16 mod admin_socket; 17 17 mod auth; 18 18 mod backfill; 19 + mod cleanup; 19 20 mod database; 20 21 mod feed_algorithm; 21 22 mod jetstream_consumer; ··· 110 111 } 111 112 }); 112 113 113 - // Start cleanup task 114 + // Start cleanup task - runs every 5 minutes 114 115 let db_cleanup = Arc::clone(&db); 115 116 tokio::spawn(async move { 116 - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(3600)); // Every hour 117 + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); // Every 5 minutes 117 118 loop { 118 119 interval.tick().await; 120 + 121 + // Clean up old posts (older than 48 hours) 119 122 if let Err(e) = db_cleanup.cleanup_old_posts(48).await { 120 123 warn!("Failed to cleanup old posts: {}", e); 124 + } 125 + 126 + // Clean up old follows (older than 48 hours) 127 + if let Err(e) = db_cleanup.cleanup_old_follows(48).await { 128 + warn!("Failed to cleanup old follows: {}", e); 129 + } 130 + 131 + // Verify follows against API and remove stale ones 132 + if let Err(e) = cleanup::verify_all_follows(Arc::clone(&db_cleanup)).await { 133 + warn!("Failed to verify follows: {}", e); 121 134 } 122 135 } 123 136 });