// Noreposts Feed
// at main · 248 lines · 7.6 kB · view raw
1use anyhow::Result; 2use chrono::{DateTime, Utc}; 3use sqlx::{Row, SqlitePool}; 4 5use crate::types::{Follow, Post}; 6 7pub struct Database { 8 pub pool: SqlitePool, 9} 10 11impl Database { 12 pub async fn new(database_url: &str) -> Result<Self> { 13 let pool = SqlitePool::connect(database_url).await?; 14 15 // Enable WAL mode for better concurrency 16 sqlx::query("PRAGMA journal_mode=WAL;") 17 .execute(&pool) 18 .await?; 19 20 // Set busy timeout to 5 seconds 21 sqlx::query("PRAGMA busy_timeout=5000;") 22 .execute(&pool) 23 .await?; 24 25 Ok(Self { pool }) 26 } 27 28 pub async fn migrate(&self) -> Result<()> { 29 sqlx::migrate!("./migrations").run(&self.pool).await?; 30 Ok(()) 31 } 32 33 // Post operations 34 pub async fn insert_post(&self, post: &Post) -> Result<()> { 35 sqlx::query( 36 r#" 37 INSERT OR REPLACE INTO posts (uri, cid, author_did, text, created_at, indexed_at) 38 VALUES (?, ?, ?, ?, ?, ?) 39 "#, 40 ) 41 .bind(&post.uri) 42 .bind(&post.cid) 43 .bind(&post.author_did) 44 .bind(&post.text) 45 .bind(post.created_at.to_rfc3339()) 46 .bind(post.indexed_at.to_rfc3339()) 47 .execute(&self.pool) 48 .await?; 49 Ok(()) 50 } 51 52 pub async fn delete_post(&self, uri: &str) -> Result<()> { 53 sqlx::query("DELETE FROM posts WHERE uri = ?") 54 .bind(uri) 55 .execute(&self.pool) 56 .await?; 57 Ok(()) 58 } 59 60 // Follow operations 61 pub async fn insert_follow(&self, follow: &Follow) -> Result<()> { 62 sqlx::query( 63 r#" 64 INSERT OR REPLACE INTO follows (uri, follower_did, target_did, created_at, indexed_at) 65 VALUES (?, ?, ?, ?, ?) 
66 "#, 67 ) 68 .bind(&follow.uri) 69 .bind(&follow.follower_did) 70 .bind(&follow.target_did) 71 .bind(follow.created_at.to_rfc3339()) 72 .bind(follow.indexed_at.to_rfc3339()) 73 .execute(&self.pool) 74 .await?; 75 Ok(()) 76 } 77 78 pub async fn delete_follow(&self, uri: &str) -> Result<()> { 79 sqlx::query("DELETE FROM follows WHERE uri = ?") 80 .bind(uri) 81 .execute(&self.pool) 82 .await?; 83 Ok(()) 84 } 85 86 // Feed generation queries 87 pub async fn get_following_posts( 88 &self, 89 follower_did: &str, 90 limit: i32, 91 cursor: Option<&str>, 92 ) -> Result<Vec<Post>> { 93 let cursor_time = cursor 94 .and_then(|c| DateTime::parse_from_rfc3339(c).ok()) 95 .map(|dt| dt.with_timezone(&Utc)) 96 .unwrap_or_else(Utc::now); 97 98 let rows = sqlx::query( 99 r#" 100 SELECT p.uri, p.cid, p.author_did, p.text, p.created_at, p.indexed_at 101 FROM posts p 102 INNER JOIN follows f ON f.target_did = p.author_did 103 WHERE f.follower_did = ? 104 AND p.created_at < ? 105 ORDER BY p.created_at DESC 106 LIMIT ? 
107 "#, 108 ) 109 .bind(follower_did) 110 .bind(cursor_time.to_rfc3339()) 111 .bind(limit) 112 .fetch_all(&self.pool) 113 .await?; 114 115 let mut posts = Vec::new(); 116 for row in rows { 117 let uri: String = row.try_get("uri")?; 118 let cid: String = row.try_get("cid")?; 119 let author_did: String = row.try_get("author_did")?; 120 let text: String = row.try_get("text")?; 121 let created_at_str: String = row.try_get("created_at")?; 122 let indexed_at_str: String = row.try_get("indexed_at")?; 123 124 posts.push(Post { 125 uri, 126 cid, 127 author_did, 128 text, 129 created_at: DateTime::parse_from_rfc3339(&created_at_str)?.with_timezone(&Utc), 130 indexed_at: DateTime::parse_from_rfc3339(&indexed_at_str)?.with_timezone(&Utc), 131 }); 132 } 133 134 Ok(posts) 135 } 136 137 pub async fn cleanup_old_posts(&self, hours: i64) -> Result<()> { 138 let cutoff = Utc::now() - chrono::Duration::hours(hours); 139 let result = sqlx::query("DELETE FROM posts WHERE indexed_at < ?") 140 .bind(cutoff.to_rfc3339()) 141 .execute(&self.pool) 142 .await?; 143 144 let deleted = result.rows_affected(); 145 if deleted > 0 { 146 tracing::info!("Cleaned up {} posts older than {} hours", deleted, hours); 147 } 148 Ok(()) 149 } 150 151 pub async fn record_feed_request(&self, user_did: &str) -> Result<()> { 152 sqlx::query( 153 r#" 154 INSERT INTO active_users (did, last_feed_request) 155 VALUES (?, ?) 156 ON CONFLICT(did) DO UPDATE SET last_feed_request = excluded.last_feed_request 157 "#, 158 ) 159 .bind(user_did) 160 .bind(Utc::now().to_rfc3339()) 161 .execute(&self.pool) 162 .await?; 163 Ok(()) 164 } 165 166 pub async fn get_active_users(&self, days: i64) -> Result<Vec<String>> { 167 let cutoff = Utc::now() - chrono::Duration::days(days); 168 let rows = sqlx::query( 169 "SELECT did FROM active_users WHERE last_feed_request > ? 
ORDER BY last_feed_request DESC" 170 ) 171 .bind(cutoff.to_rfc3339()) 172 .fetch_all(&self.pool) 173 .await?; 174 175 let dids: Vec<String> = rows 176 .into_iter() 177 .filter_map(|row| row.try_get("did").ok()) 178 .collect(); 179 180 Ok(dids) 181 } 182 183 pub async fn update_follow_sync(&self, user_did: &str) -> Result<()> { 184 sqlx::query("UPDATE active_users SET last_follow_sync = ? WHERE did = ?") 185 .bind(Utc::now().to_rfc3339()) 186 .bind(user_did) 187 .execute(&self.pool) 188 .await?; 189 Ok(()) 190 } 191 192 pub async fn sync_follows_for_user( 193 &self, 194 user_did: &str, 195 current_target_dids: Vec<String>, 196 ) -> Result<()> { 197 // Get all follows for this user in our database 198 let rows = sqlx::query("SELECT target_did FROM follows WHERE follower_did = ?") 199 .bind(user_did) 200 .fetch_all(&self.pool) 201 .await?; 202 203 let db_target_dids: Vec<String> = rows 204 .into_iter() 205 .filter_map(|row| row.try_get("target_did").ok()) 206 .collect(); 207 208 // Find follows in database that no longer exist in current follows 209 let mut removed_count = 0; 210 for db_target in &db_target_dids { 211 if !current_target_dids.contains(db_target) { 212 // This follow no longer exists, remove it 213 sqlx::query("DELETE FROM follows WHERE follower_did = ? AND target_did = ?") 214 .bind(user_did) 215 .bind(db_target) 216 .execute(&self.pool) 217 .await?; 218 removed_count += 1; 219 tracing::info!("Removed stale follow: {} -> {}", user_did, db_target); 220 } 221 } 222 223 if removed_count > 0 { 224 tracing::info!( 225 "Cleaned up {} stale follows for {}", 226 removed_count, 227 user_did 228 ); 229 } 230 231 Ok(()) 232 } 233 234 // Unused but kept for potential future use 235 #[allow(dead_code)] 236 pub async fn is_following(&self, follower_did: &str, target_did: &str) -> Result<bool> { 237 let row = sqlx::query( 238 "SELECT COUNT(*) as count FROM follows WHERE follower_did = ? 
AND target_did = ?", 239 ) 240 .bind(follower_did) 241 .bind(target_did) 242 .fetch_one(&self.pool) 243 .await?; 244 245 let count: i64 = row.try_get("count")?; 246 Ok(count > 0) 247 } 248}