Noreposts Feed
at main 124 lines 3.8 kB view raw
1use anyhow::Result; 2use sqlx::Row; 3use std::sync::Arc; 4use tracing::{info, warn}; 5 6use crate::database::Database; 7 8pub async fn verify_active_user_follows(db: Arc<Database>) -> Result<()> { 9 info!("Starting follow verification for active users"); 10 11 // Only verify follows for users who have accessed the feed in the last 7 days 12 let active_users = db.get_active_users(7).await?; 13 info!("Verifying follows for {} active users", active_users.len()); 14 15 let client = reqwest::Client::new(); 16 17 for user_did in active_users { 18 match verify_follows_for_user(&client, Arc::clone(&db), &user_did).await { 19 Ok(_) => { 20 // Record that we synced this user's follows 21 if let Err(e) = db.update_follow_sync(&user_did).await { 22 warn!( 23 "Failed to update follow sync timestamp for {}: {}", 24 user_did, e 25 ); 26 } 27 } 28 Err(e) => { 29 warn!("Failed to verify follows for {}: {}", user_did, e); 30 } 31 } 32 } 33 34 info!("Follow verification completed"); 35 Ok(()) 36} 37 38pub async fn cleanup_inactive_user_follows(db: Arc<Database>) -> Result<()> { 39 info!("Starting cleanup of follows for inactive users"); 40 41 // Get all unique follower DIDs from the follows table 42 let all_follower_dids: Vec<String> = sqlx::query("SELECT DISTINCT follower_did FROM follows") 43 .fetch_all(&db.pool) 44 .await? 45 .into_iter() 46 .filter_map(|row| row.try_get("follower_did").ok()) 47 .collect(); 48 49 info!("Found {} unique users with follows", all_follower_dids.len()); 50 51 // Get active users (accessed feed in last 7 days) 52 let active_users = db.get_active_users(7).await?; 53 let active_user_set: std::collections::HashSet<String> = 54 active_users.into_iter().collect(); 55 56 // Delete follows for users who are not active 57 let mut deleted_count = 0; 58 for follower_did in all_follower_dids { 59 if !active_user_set.contains(&follower_did) { 60 let result = sqlx::query("DELETE FROM follows WHERE follower_did = ?") 61 .bind(&follower_did) 62 .execute(&db.pool) 63 .await?; 64 65 deleted_count += result.rows_affected(); 66 } 67 } 68 69 if deleted_count > 0 { 70 info!("Cleaned up {} follows from inactive users", deleted_count); 71 } else { 72 info!("No inactive user follows to clean up"); 73 } 74 75 Ok(()) 76} 77 78async fn verify_follows_for_user( 79 client: &reqwest::Client, 80 db: Arc<Database>, 81 user_did: &str, 82) -> Result<()> { 83 let mut cursor: Option<String> = None; 84 let mut current_follows = Vec::new(); 85 86 loop { 87 let mut url = format!( 88 "https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows?actor={}&limit=100", 89 user_did 90 ); 91 if let Some(ref c) = cursor { 92 url.push_str(&format!("&cursor={}", c)); 93 } 94 95 let response: serde_json::Value = match client.get(&url).send().await { 96 Ok(r) => r.json().await?, 97 Err(e) => { 98 warn!("Failed to fetch follows for {}: {}", user_did, e); 99 return Err(e.into()); 100 } 101 }; 102 103 let follows = response["follows"].as_array(); 104 if follows.is_none() { 105 break; 106 } 107 108 for follow in follows.unwrap() { 109 if let Some(target_did) = follow["did"].as_str() { 110 current_follows.push(target_did.to_string()); 111 } 112 } 113 114 cursor = response["cursor"].as_str().map(|s| s.to_string()); 115 if cursor.is_none() { 116 break; 117 } 118 } 119 120 // Sync the database with current follows 121 db.sync_follows_for_user(user_did, current_follows).await?; 122 123 Ok(()) 124}