Noreposts Feed
1use anyhow::Result;
2use sqlx::Row;
3use std::sync::Arc;
4use tracing::{info, warn};
5
6use crate::database::Database;
7
8pub async fn verify_active_user_follows(db: Arc<Database>) -> Result<()> {
9 info!("Starting follow verification for active users");
10
11 // Only verify follows for users who have accessed the feed in the last 7 days
12 let active_users = db.get_active_users(7).await?;
13 info!("Verifying follows for {} active users", active_users.len());
14
15 let client = reqwest::Client::new();
16
17 for user_did in active_users {
18 match verify_follows_for_user(&client, Arc::clone(&db), &user_did).await {
19 Ok(_) => {
20 // Record that we synced this user's follows
21 if let Err(e) = db.update_follow_sync(&user_did).await {
22 warn!(
23 "Failed to update follow sync timestamp for {}: {}",
24 user_did, e
25 );
26 }
27 }
28 Err(e) => {
29 warn!("Failed to verify follows for {}: {}", user_did, e);
30 }
31 }
32 }
33
34 info!("Follow verification completed");
35 Ok(())
36}
37
38pub async fn cleanup_inactive_user_follows(db: Arc<Database>) -> Result<()> {
39 info!("Starting cleanup of follows for inactive users");
40
41 // Get all unique follower DIDs from the follows table
42 let all_follower_dids: Vec<String> = sqlx::query("SELECT DISTINCT follower_did FROM follows")
43 .fetch_all(&db.pool)
44 .await?
45 .into_iter()
46 .filter_map(|row| row.try_get("follower_did").ok())
47 .collect();
48
49 info!("Found {} unique users with follows", all_follower_dids.len());
50
51 // Get active users (accessed feed in last 7 days)
52 let active_users = db.get_active_users(7).await?;
53 let active_user_set: std::collections::HashSet<String> =
54 active_users.into_iter().collect();
55
56 // Delete follows for users who are not active
57 let mut deleted_count = 0;
58 for follower_did in all_follower_dids {
59 if !active_user_set.contains(&follower_did) {
60 let result = sqlx::query("DELETE FROM follows WHERE follower_did = ?")
61 .bind(&follower_did)
62 .execute(&db.pool)
63 .await?;
64
65 deleted_count += result.rows_affected();
66 }
67 }
68
69 if deleted_count > 0 {
70 info!("Cleaned up {} follows from inactive users", deleted_count);
71 } else {
72 info!("No inactive user follows to clean up");
73 }
74
75 Ok(())
76}
77
78async fn verify_follows_for_user(
79 client: &reqwest::Client,
80 db: Arc<Database>,
81 user_did: &str,
82) -> Result<()> {
83 let mut cursor: Option<String> = None;
84 let mut current_follows = Vec::new();
85
86 loop {
87 let mut url = format!(
88 "https://public.api.bsky.app/xrpc/app.bsky.graph.getFollows?actor={}&limit=100",
89 user_did
90 );
91 if let Some(ref c) = cursor {
92 url.push_str(&format!("&cursor={}", c));
93 }
94
95 let response: serde_json::Value = match client.get(&url).send().await {
96 Ok(r) => r.json().await?,
97 Err(e) => {
98 warn!("Failed to fetch follows for {}: {}", user_did, e);
99 return Err(e.into());
100 }
101 };
102
103 let follows = response["follows"].as_array();
104 if follows.is_none() {
105 break;
106 }
107
108 for follow in follows.unwrap() {
109 if let Some(target_did) = follow["did"].as_str() {
110 current_follows.push(target_did.to_string());
111 }
112 }
113
114 cursor = response["cursor"].as_str().map(|s| s.to_string());
115 if cursor.is_none() {
116 break;
117 }
118 }
119
120 // Sync the database with current follows
121 db.sync_follows_for_user(user_did, current_follows).await?;
122
123 Ok(())
124}