//! Noreposts Feed: admin console served over a Unix domain socket.
1use anyhow::Result;
2use sqlx::Row;
3use std::sync::Arc;
4use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
5use tokio::net::{UnixListener, UnixStream};
6use tracing::{error, info, warn};
7
8use crate::{backfill, database::Database};
9
10pub struct AdminSocket {
11 db: Arc<Database>,
12 socket_path: String,
13}
14
15impl AdminSocket {
16 pub fn new(db: Arc<Database>, socket_path: String) -> Self {
17 Self { db, socket_path }
18 }
19
20 pub async fn start(&self) -> Result<()> {
21 // Remove old socket if it exists
22 let _ = std::fs::remove_file(&self.socket_path);
23
24 let listener = UnixListener::bind(&self.socket_path)?;
25 info!("Admin socket listening on {}", self.socket_path);
26
27 // Set socket permissions so anyone can connect
28 #[cfg(unix)]
29 {
30 use std::os::unix::fs::PermissionsExt;
31 let mut perms = std::fs::metadata(&self.socket_path)?.permissions();
32 perms.set_mode(0o666);
33 std::fs::set_permissions(&self.socket_path, perms)?;
34 }
35
36 loop {
37 match listener.accept().await {
38 Ok((stream, _)) => {
39 let db = Arc::clone(&self.db);
40 tokio::spawn(async move {
41 if let Err(e) = handle_connection(stream, db).await {
42 error!("Error handling admin connection: {}", e);
43 }
44 });
45 }
46 Err(e) => {
47 warn!("Failed to accept admin connection: {}", e);
48 }
49 }
50 }
51 }
52}
53
54async fn handle_connection(stream: UnixStream, db: Arc<Database>) -> Result<()> {
55 let (reader, mut writer) = stream.into_split();
56 let mut reader = BufReader::new(reader);
57 let mut line = String::new();
58
59 writer.write_all(b"Feed Generator Admin Console\n").await?;
60 writer
61 .write_all(b"Commands: backfill <did>, stats, help, quit\n> ")
62 .await?;
63 writer.flush().await?;
64
65 loop {
66 line.clear();
67 let bytes_read = reader.read_line(&mut line).await?;
68
69 if bytes_read == 0 {
70 break; // Connection closed
71 }
72
73 let command = line.trim();
74 if command.is_empty() {
75 writer.write_all(b"> ").await?;
76 writer.flush().await?;
77 continue;
78 }
79
80 let parts: Vec<&str> = command.split_whitespace().collect();
81
82 match parts.first().copied() {
83 Some("backfill") => {
84 if let Some(did) = parts.get(1) {
85 writer
86 .write_all(format!("Starting backfill for {}...\n", did).as_bytes())
87 .await?;
88 writer.flush().await?;
89
90 // First backfill follows
91 match backfill::backfill_follows(Arc::clone(&db), did).await {
92 Ok(_) => {
93 writer
94 .write_all(b"Follows backfilled successfully\n")
95 .await?;
96 }
97 Err(e) => {
98 writer
99 .write_all(format!("Follow backfill failed: {}\n", e).as_bytes())
100 .await?;
101 writer.write_all(b"> ").await?;
102 writer.flush().await?;
103 continue;
104 }
105 }
106
107 // Then backfill posts
108 writer.write_all(b"Starting post backfill...\n").await?;
109 writer.flush().await?;
110
111 match backfill::backfill_posts_for_follows(Arc::clone(&db), did, 10).await {
112 Ok(_) => {
113 writer.write_all(b"Posts backfilled successfully\n").await?;
114 }
115 Err(e) => {
116 writer
117 .write_all(format!("Post backfill failed: {}\n", e).as_bytes())
118 .await?;
119 }
120 }
121 } else {
122 writer.write_all(b"Usage: backfill <did>\n").await?;
123 }
124 }
125 Some("stats") => match get_stats(&db).await {
126 Ok(stats) => {
127 writer.write_all(stats.as_bytes()).await?;
128 }
129 Err(e) => {
130 writer
131 .write_all(format!("Failed to get stats: {}\n", e).as_bytes())
132 .await?;
133 }
134 },
135 Some("help") => {
136 writer.write_all(b"Available commands:\n").await?;
137 writer
138 .write_all(b" backfill <did> - Backfill follows and posts for a user\n")
139 .await?;
140 writer
141 .write_all(b" stats - Show database statistics\n")
142 .await?;
143 writer
144 .write_all(b" help - Show this help message\n")
145 .await?;
146 writer
147 .write_all(b" quit - Close connection\n")
148 .await?;
149 }
150 Some("quit") | Some("exit") => {
151 writer.write_all(b"Goodbye!\n").await?;
152 writer.flush().await?;
153 break;
154 }
155 _ => {
156 writer
157 .write_all(
158 format!(
159 "Unknown command: {}. Type 'help' for available commands.\n",
160 command
161 )
162 .as_bytes(),
163 )
164 .await?;
165 }
166 }
167
168 writer.write_all(b"> ").await?;
169 writer.flush().await?;
170 }
171
172 Ok(())
173}
174
175async fn get_stats(db: &Database) -> Result<String> {
176 let post_count: i64 = sqlx::query("SELECT COUNT(*) as count FROM posts")
177 .fetch_one(&db.pool)
178 .await?
179 .try_get("count")?;
180
181 let follow_count: i64 = sqlx::query("SELECT COUNT(*) as count FROM follows")
182 .fetch_one(&db.pool)
183 .await?
184 .try_get("count")?;
185
186 let user_count: i64 = sqlx::query("SELECT COUNT(DISTINCT follower_did) as count FROM follows")
187 .fetch_one(&db.pool)
188 .await?
189 .try_get("count")?;
190
191 Ok(format!(
192 "Database Statistics:\n Posts: {}\n Follows: {}\n Users: {}\n",
193 post_count, follow_count, user_count
194 ))
195}