Noreposts Feed
use std::collections::HashSet;

use anyhow::Result;
use chrono::{DateTime, Utc};
use sqlx::{Row, SqlitePool};

use crate::types::{Follow, Post};
6
7pub struct Database {
8 pub pool: SqlitePool,
9}
10
11impl Database {
12 pub async fn new(database_url: &str) -> Result<Self> {
13 let pool = SqlitePool::connect(database_url).await?;
14
15 // Enable WAL mode for better concurrency
16 sqlx::query("PRAGMA journal_mode=WAL;")
17 .execute(&pool)
18 .await?;
19
20 // Set busy timeout to 5 seconds
21 sqlx::query("PRAGMA busy_timeout=5000;")
22 .execute(&pool)
23 .await?;
24
25 Ok(Self { pool })
26 }
27
28 pub async fn migrate(&self) -> Result<()> {
29 sqlx::migrate!("./migrations").run(&self.pool).await?;
30 Ok(())
31 }
32
33 // Post operations
34 pub async fn insert_post(&self, post: &Post) -> Result<()> {
35 sqlx::query(
36 r#"
37 INSERT OR REPLACE INTO posts (uri, cid, author_did, text, created_at, indexed_at)
38 VALUES (?, ?, ?, ?, ?, ?)
39 "#,
40 )
41 .bind(&post.uri)
42 .bind(&post.cid)
43 .bind(&post.author_did)
44 .bind(&post.text)
45 .bind(post.created_at.to_rfc3339())
46 .bind(post.indexed_at.to_rfc3339())
47 .execute(&self.pool)
48 .await?;
49 Ok(())
50 }
51
52 pub async fn delete_post(&self, uri: &str) -> Result<()> {
53 sqlx::query("DELETE FROM posts WHERE uri = ?")
54 .bind(uri)
55 .execute(&self.pool)
56 .await?;
57 Ok(())
58 }
59
60 // Follow operations
61 pub async fn insert_follow(&self, follow: &Follow) -> Result<()> {
62 sqlx::query(
63 r#"
64 INSERT OR REPLACE INTO follows (uri, follower_did, target_did, created_at, indexed_at)
65 VALUES (?, ?, ?, ?, ?)
66 "#,
67 )
68 .bind(&follow.uri)
69 .bind(&follow.follower_did)
70 .bind(&follow.target_did)
71 .bind(follow.created_at.to_rfc3339())
72 .bind(follow.indexed_at.to_rfc3339())
73 .execute(&self.pool)
74 .await?;
75 Ok(())
76 }
77
78 pub async fn delete_follow(&self, uri: &str) -> Result<()> {
79 sqlx::query("DELETE FROM follows WHERE uri = ?")
80 .bind(uri)
81 .execute(&self.pool)
82 .await?;
83 Ok(())
84 }
85
86 // Feed generation queries
87 pub async fn get_following_posts(
88 &self,
89 follower_did: &str,
90 limit: i32,
91 cursor: Option<&str>,
92 ) -> Result<Vec<Post>> {
93 let cursor_time = cursor
94 .and_then(|c| DateTime::parse_from_rfc3339(c).ok())
95 .map(|dt| dt.with_timezone(&Utc))
96 .unwrap_or_else(Utc::now);
97
98 let rows = sqlx::query(
99 r#"
100 SELECT p.uri, p.cid, p.author_did, p.text, p.created_at, p.indexed_at
101 FROM posts p
102 INNER JOIN follows f ON f.target_did = p.author_did
103 WHERE f.follower_did = ?
104 AND p.created_at < ?
105 ORDER BY p.created_at DESC
106 LIMIT ?
107 "#,
108 )
109 .bind(follower_did)
110 .bind(cursor_time.to_rfc3339())
111 .bind(limit)
112 .fetch_all(&self.pool)
113 .await?;
114
115 let mut posts = Vec::new();
116 for row in rows {
117 let uri: String = row.try_get("uri")?;
118 let cid: String = row.try_get("cid")?;
119 let author_did: String = row.try_get("author_did")?;
120 let text: String = row.try_get("text")?;
121 let created_at_str: String = row.try_get("created_at")?;
122 let indexed_at_str: String = row.try_get("indexed_at")?;
123
124 posts.push(Post {
125 uri,
126 cid,
127 author_did,
128 text,
129 created_at: DateTime::parse_from_rfc3339(&created_at_str)?.with_timezone(&Utc),
130 indexed_at: DateTime::parse_from_rfc3339(&indexed_at_str)?.with_timezone(&Utc),
131 });
132 }
133
134 Ok(posts)
135 }
136
137 pub async fn cleanup_old_posts(&self, hours: i64) -> Result<()> {
138 let cutoff = Utc::now() - chrono::Duration::hours(hours);
139 let result = sqlx::query("DELETE FROM posts WHERE indexed_at < ?")
140 .bind(cutoff.to_rfc3339())
141 .execute(&self.pool)
142 .await?;
143
144 let deleted = result.rows_affected();
145 if deleted > 0 {
146 tracing::info!("Cleaned up {} posts older than {} hours", deleted, hours);
147 }
148 Ok(())
149 }
150
151 pub async fn record_feed_request(&self, user_did: &str) -> Result<()> {
152 sqlx::query(
153 r#"
154 INSERT INTO active_users (did, last_feed_request)
155 VALUES (?, ?)
156 ON CONFLICT(did) DO UPDATE SET last_feed_request = excluded.last_feed_request
157 "#,
158 )
159 .bind(user_did)
160 .bind(Utc::now().to_rfc3339())
161 .execute(&self.pool)
162 .await?;
163 Ok(())
164 }
165
166 pub async fn get_active_users(&self, days: i64) -> Result<Vec<String>> {
167 let cutoff = Utc::now() - chrono::Duration::days(days);
168 let rows = sqlx::query(
169 "SELECT did FROM active_users WHERE last_feed_request > ? ORDER BY last_feed_request DESC"
170 )
171 .bind(cutoff.to_rfc3339())
172 .fetch_all(&self.pool)
173 .await?;
174
175 let dids: Vec<String> = rows
176 .into_iter()
177 .filter_map(|row| row.try_get("did").ok())
178 .collect();
179
180 Ok(dids)
181 }
182
183 pub async fn update_follow_sync(&self, user_did: &str) -> Result<()> {
184 sqlx::query("UPDATE active_users SET last_follow_sync = ? WHERE did = ?")
185 .bind(Utc::now().to_rfc3339())
186 .bind(user_did)
187 .execute(&self.pool)
188 .await?;
189 Ok(())
190 }
191
192 pub async fn sync_follows_for_user(
193 &self,
194 user_did: &str,
195 current_target_dids: Vec<String>,
196 ) -> Result<()> {
197 // Get all follows for this user in our database
198 let rows = sqlx::query("SELECT target_did FROM follows WHERE follower_did = ?")
199 .bind(user_did)
200 .fetch_all(&self.pool)
201 .await?;
202
203 let db_target_dids: Vec<String> = rows
204 .into_iter()
205 .filter_map(|row| row.try_get("target_did").ok())
206 .collect();
207
208 // Find follows in database that no longer exist in current follows
209 let mut removed_count = 0;
210 for db_target in &db_target_dids {
211 if !current_target_dids.contains(db_target) {
212 // This follow no longer exists, remove it
213 sqlx::query("DELETE FROM follows WHERE follower_did = ? AND target_did = ?")
214 .bind(user_did)
215 .bind(db_target)
216 .execute(&self.pool)
217 .await?;
218 removed_count += 1;
219 tracing::info!("Removed stale follow: {} -> {}", user_did, db_target);
220 }
221 }
222
223 if removed_count > 0 {
224 tracing::info!(
225 "Cleaned up {} stale follows for {}",
226 removed_count,
227 user_did
228 );
229 }
230
231 Ok(())
232 }
233
234 // Unused but kept for potential future use
235 #[allow(dead_code)]
236 pub async fn is_following(&self, follower_did: &str, target_did: &str) -> Result<bool> {
237 let row = sqlx::query(
238 "SELECT COUNT(*) as count FROM follows WHERE follower_did = ? AND target_did = ?",
239 )
240 .bind(follower_did)
241 .bind(target_did)
242 .fetch_one(&self.pool)
243 .await?;
244
245 let count: i64 = row.try_get("count")?;
246 Ok(count > 0)
247 }
248}