Noreposts Feed
at main 195 lines 6.7 kB view raw
1use anyhow::Result; 2use sqlx::Row; 3use std::sync::Arc; 4use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 5use tokio::net::{UnixListener, UnixStream}; 6use tracing::{error, info, warn}; 7 8use crate::{backfill, database::Database}; 9 10pub struct AdminSocket { 11 db: Arc<Database>, 12 socket_path: String, 13} 14 15impl AdminSocket { 16 pub fn new(db: Arc<Database>, socket_path: String) -> Self { 17 Self { db, socket_path } 18 } 19 20 pub async fn start(&self) -> Result<()> { 21 // Remove old socket if it exists 22 let _ = std::fs::remove_file(&self.socket_path); 23 24 let listener = UnixListener::bind(&self.socket_path)?; 25 info!("Admin socket listening on {}", self.socket_path); 26 27 // Set socket permissions so anyone can connect 28 #[cfg(unix)] 29 { 30 use std::os::unix::fs::PermissionsExt; 31 let mut perms = std::fs::metadata(&self.socket_path)?.permissions(); 32 perms.set_mode(0o666); 33 std::fs::set_permissions(&self.socket_path, perms)?; 34 } 35 36 loop { 37 match listener.accept().await { 38 Ok((stream, _)) => { 39 let db = Arc::clone(&self.db); 40 tokio::spawn(async move { 41 if let Err(e) = handle_connection(stream, db).await { 42 error!("Error handling admin connection: {}", e); 43 } 44 }); 45 } 46 Err(e) => { 47 warn!("Failed to accept admin connection: {}", e); 48 } 49 } 50 } 51 } 52} 53 54async fn handle_connection(stream: UnixStream, db: Arc<Database>) -> Result<()> { 55 let (reader, mut writer) = stream.into_split(); 56 let mut reader = BufReader::new(reader); 57 let mut line = String::new(); 58 59 writer.write_all(b"Feed Generator Admin Console\n").await?; 60 writer 61 .write_all(b"Commands: backfill <did>, stats, help, quit\n> ") 62 .await?; 63 writer.flush().await?; 64 65 loop { 66 line.clear(); 67 let bytes_read = reader.read_line(&mut line).await?; 68 69 if bytes_read == 0 { 70 break; // Connection closed 71 } 72 73 let command = line.trim(); 74 if command.is_empty() { 75 writer.write_all(b"> ").await?; 76 writer.flush().await?; 77 continue; 78 } 79 80 let parts: Vec<&str> = command.split_whitespace().collect(); 81 82 match parts.first().copied() { 83 Some("backfill") => { 84 if let Some(did) = parts.get(1) { 85 writer 86 .write_all(format!("Starting backfill for {}...\n", did).as_bytes()) 87 .await?; 88 writer.flush().await?; 89 90 // First backfill follows 91 match backfill::backfill_follows(Arc::clone(&db), did).await { 92 Ok(_) => { 93 writer 94 .write_all(b"Follows backfilled successfully\n") 95 .await?; 96 } 97 Err(e) => { 98 writer 99 .write_all(format!("Follow backfill failed: {}\n", e).as_bytes()) 100 .await?; 101 writer.write_all(b"> ").await?; 102 writer.flush().await?; 103 continue; 104 } 105 } 106 107 // Then backfill posts 108 writer.write_all(b"Starting post backfill...\n").await?; 109 writer.flush().await?; 110 111 match backfill::backfill_posts_for_follows(Arc::clone(&db), did, 10).await { 112 Ok(_) => { 113 writer.write_all(b"Posts backfilled successfully\n").await?; 114 } 115 Err(e) => { 116 writer 117 .write_all(format!("Post backfill failed: {}\n", e).as_bytes()) 118 .await?; 119 } 120 } 121 } else { 122 writer.write_all(b"Usage: backfill <did>\n").await?; 123 } 124 } 125 Some("stats") => match get_stats(&db).await { 126 Ok(stats) => { 127 writer.write_all(stats.as_bytes()).await?; 128 } 129 Err(e) => { 130 writer 131 .write_all(format!("Failed to get stats: {}\n", e).as_bytes()) 132 .await?; 133 } 134 }, 135 Some("help") => { 136 writer.write_all(b"Available commands:\n").await?; 137 writer 138 .write_all(b" backfill <did> - Backfill follows and posts for a user\n") 139 .await?; 140 writer 141 .write_all(b" stats - Show database statistics\n") 142 .await?; 143 writer 144 .write_all(b" help - Show this help message\n") 145 .await?; 146 writer 147 .write_all(b" quit - Close connection\n") 148 .await?; 149 } 150 Some("quit") | Some("exit") => { 151 writer.write_all(b"Goodbye!\n").await?; 152 writer.flush().await?; 153 break; 154 } 155 _ => { 156 writer 157 .write_all( 158 format!( 159 "Unknown command: {}. Type 'help' for available commands.\n", 160 command 161 ) 162 .as_bytes(), 163 ) 164 .await?; 165 } 166 } 167 168 writer.write_all(b"> ").await?; 169 writer.flush().await?; 170 } 171 172 Ok(()) 173} 174 175async fn get_stats(db: &Database) -> Result<String> { 176 let post_count: i64 = sqlx::query("SELECT COUNT(*) as count FROM posts") 177 .fetch_one(&db.pool) 178 .await? 179 .try_get("count")?; 180 181 let follow_count: i64 = sqlx::query("SELECT COUNT(*) as count FROM follows") 182 .fetch_one(&db.pool) 183 .await? 184 .try_get("count")?; 185 186 let user_count: i64 = sqlx::query("SELECT COUNT(DISTINCT follower_did) as count FROM follows") 187 .fetch_one(&db.pool) 188 .await? 189 .try_get("count")?; 190 191 Ok(format!( 192 "Database Statistics:\n Posts: {}\n Follows: {}\n Users: {}\n", 193 post_count, follow_count, user_count 194 )) 195}