The smokesignal.events web application
at main 235 lines 7.7 kB view raw
1use deadpool_redis::redis::AsyncCommands; 2use serde::{Deserialize, Serialize}; 3use sqlx::Row; 4use std::sync::Arc; 5use std::time::{Duration, Instant}; 6use thiserror::Error; 7use tokio::sync::RwLock; 8 9use crate::storage::types::{CachePool, StoragePool}; 10 11const CACHE_KEY: &str = "stats:network"; 12const CACHE_TTL_SECONDS: u64 = 600; // 10 minutes 13 14#[derive(Debug, Error)] 15#[allow(clippy::enum_variant_names)] 16pub(crate) enum StatsError { 17 #[error("error-smokesignal-stats-1 Database query failed: {0}")] 18 DatabaseError(String), 19 20 #[error("error-smokesignal-stats-2 Redis error: {0}")] 21 RedisError(String), 22 23 #[error("error-smokesignal-stats-3 JSON serialization error: {0}")] 24 SerializationError(String), 25} 26 27#[derive(Debug, Clone, Serialize, Deserialize)] 28pub(crate) struct NetworkStats { 29 pub event_count: i64, 30 pub rsvp_count: i64, 31 pub lfg_identities_count: i64, 32 pub lfg_locations_count: i64, 33} 34 35impl NetworkStats { 36 /// Format a number with thousands separator for human-friendly display 37 /// Examples: 1234 -> "1,234", 1234567 -> "1,234,567" 38 pub fn format_number(num: i64) -> String { 39 let num_str = num.to_string(); 40 let mut result = String::new(); 41 let chars: Vec<char> = num_str.chars().collect(); 42 43 for (i, ch) in chars.iter().enumerate() { 44 if i > 0 && (chars.len() - i).is_multiple_of(3) { 45 result.push(','); 46 } 47 result.push(*ch); 48 } 49 50 result 51 } 52} 53 54/// In-memory cache for stats when Redis is unavailable 55struct InMemoryCache { 56 stats: Option<NetworkStats>, 57 cached_at: Option<Instant>, 58} 59 60impl InMemoryCache { 61 fn new() -> Self { 62 Self { 63 stats: None, 64 cached_at: None, 65 } 66 } 67 68 fn get(&self) -> Option<NetworkStats> { 69 if let (Some(stats), Some(cached_at)) = (&self.stats, &self.cached_at) 70 && cached_at.elapsed() < Duration::from_secs(CACHE_TTL_SECONDS) 71 { 72 return Some(stats.clone()); 73 } 74 None 75 } 76 77 fn set(&mut self, stats: NetworkStats) { 78 self.stats = Some(stats); 79 self.cached_at = Some(Instant::now()); 80 } 81} 82 83/// Global in-memory cache fallback 84static IN_MEMORY_CACHE: once_cell::sync::Lazy<Arc<RwLock<InMemoryCache>>> = 85 once_cell::sync::Lazy::new(|| Arc::new(RwLock::new(InMemoryCache::new()))); 86 87/// Query database for event, RSVP, and LFG counts 88async fn query_stats(pool: &StoragePool) -> Result<NetworkStats, StatsError> { 89 let row = sqlx::query( 90 r#" 91 SELECT 92 (SELECT COUNT(*) FROM events) as event_count, 93 (SELECT COUNT(*) FROM rsvps) as rsvp_count, 94 (SELECT COUNT(DISTINCT did) FROM atproto_records 95 WHERE collection = 'events.smokesignal.lfg' 96 AND (record->>'active')::boolean = true) as lfg_identities_count, 97 (SELECT COUNT(DISTINCT record->'location'->>'value') FROM atproto_records 98 WHERE collection = 'events.smokesignal.lfg' 99 AND (record->>'active')::boolean = true) as lfg_locations_count 100 "#, 101 ) 102 .fetch_one(pool) 103 .await 104 .map_err(|e| StatsError::DatabaseError(e.to_string()))?; 105 106 Ok(NetworkStats { 107 event_count: row 108 .try_get("event_count") 109 .map_err(|e| StatsError::DatabaseError(e.to_string()))?, 110 rsvp_count: row 111 .try_get("rsvp_count") 112 .map_err(|e| StatsError::DatabaseError(e.to_string()))?, 113 lfg_identities_count: row 114 .try_get("lfg_identities_count") 115 .map_err(|e| StatsError::DatabaseError(e.to_string()))?, 116 lfg_locations_count: row 117 .try_get("lfg_locations_count") 118 .map_err(|e| StatsError::DatabaseError(e.to_string()))?, 119 }) 120} 121 122/// Get stats from Redis cache 123async fn get_cached_stats(cache_pool: &CachePool) -> Result<Option<NetworkStats>, StatsError> { 124 let mut conn = cache_pool 125 .get() 126 .await 127 .map_err(|e| StatsError::RedisError(e.to_string()))?; 128 129 let cached: Option<String> = conn 130 .get(CACHE_KEY) 131 .await 132 .map_err(|e| StatsError::RedisError(e.to_string()))?; 133 134 match cached { 135 Some(json) => { 136 let stats: NetworkStats = serde_json::from_str(&json) 137 .map_err(|e| StatsError::SerializationError(e.to_string()))?; 138 Ok(Some(stats)) 139 } 140 None => Ok(None), 141 } 142} 143 144/// Set stats in Redis cache with TTL 145async fn set_cached_stats(cache_pool: &CachePool, stats: &NetworkStats) -> Result<(), StatsError> { 146 let json = 147 serde_json::to_string(stats).map_err(|e| StatsError::SerializationError(e.to_string()))?; 148 149 let mut conn = cache_pool 150 .get() 151 .await 152 .map_err(|e| StatsError::RedisError(e.to_string()))?; 153 154 let _: () = conn 155 .set_ex(CACHE_KEY, json, CACHE_TTL_SECONDS) 156 .await 157 .map_err(|e| StatsError::RedisError(e.to_string()))?; 158 159 Ok(()) 160} 161 162/// Get network statistics with caching 163/// 164/// This function retrieves event and RSVP counts with the following strategy: 165/// 1. Try to get from Redis cache 166/// 2. If Redis unavailable, try in-memory cache 167/// 3. If cache miss or expired, query database and update all caches 168/// 169/// Cache expires after 10 minutes. 170pub(crate) async fn get_network_stats( 171 pool: &StoragePool, 172 cache_pool: &CachePool, 173) -> Result<NetworkStats, StatsError> { 174 // Try Redis cache first 175 match get_cached_stats(cache_pool).await { 176 Ok(Some(stats)) => { 177 tracing::debug!("Cache hit for network stats (Redis)"); 178 return Ok(stats); 179 } 180 Ok(None) => { 181 tracing::debug!("Cache miss for network stats (Redis)"); 182 } 183 Err(e) => { 184 tracing::warn!("Redis unavailable for stats cache: {}", e); 185 186 // Try in-memory cache as fallback 187 let cache = IN_MEMORY_CACHE.read().await; 188 if let Some(stats) = cache.get() { 189 tracing::debug!("Cache hit for network stats (in-memory fallback)"); 190 return Ok(stats); 191 } 192 tracing::debug!("Cache miss for network stats (in-memory fallback)"); 193 } 194 } 195 196 // Cache miss or expired - query database 197 tracing::debug!("Querying database for network stats"); 198 let stats = query_stats(pool).await?; 199 200 // Update Redis cache (best effort) 201 if let Err(e) = set_cached_stats(cache_pool, &stats).await { 202 tracing::warn!("Failed to update Redis cache for stats: {}", e); 203 204 // Update in-memory cache as fallback 205 let mut cache = IN_MEMORY_CACHE.write().await; 206 cache.set(stats.clone()); 207 tracing::debug!("Updated in-memory cache for network stats"); 208 } else { 209 tracing::debug!("Updated Redis cache for network stats"); 210 211 // Also update in-memory cache for future Redis failures 212 let mut cache = IN_MEMORY_CACHE.write().await; 213 cache.set(stats.clone()); 214 } 215 216 Ok(stats) 217} 218 219#[cfg(test)] 220mod tests { 221 use super::*; 222 223 #[test] 224 fn test_format_number() { 225 assert_eq!(NetworkStats::format_number(0), "0"); 226 assert_eq!(NetworkStats::format_number(1), "1"); 227 assert_eq!(NetworkStats::format_number(12), "12"); 228 assert_eq!(NetworkStats::format_number(123), "123"); 229 assert_eq!(NetworkStats::format_number(1234), "1,234"); 230 assert_eq!(NetworkStats::format_number(12345), "12,345"); 231 assert_eq!(NetworkStats::format_number(123456), "123,456"); 232 assert_eq!(NetworkStats::format_number(1234567), "1,234,567"); 233 assert_eq!(NetworkStats::format_number(12345678), "12,345,678"); 234 } 235}