The smokesignal.events web application
1use deadpool_redis::redis::AsyncCommands;
2use serde::{Deserialize, Serialize};
3use sqlx::Row;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use thiserror::Error;
7use tokio::sync::RwLock;
8
9use crate::storage::types::{CachePool, StoragePool};
10
11const CACHE_KEY: &str = "stats:network";
12const CACHE_TTL_SECONDS: u64 = 600; // 10 minutes
13
14#[derive(Debug, Error)]
15#[allow(clippy::enum_variant_names)]
16pub(crate) enum StatsError {
17 #[error("error-smokesignal-stats-1 Database query failed: {0}")]
18 DatabaseError(String),
19
20 #[error("error-smokesignal-stats-2 Redis error: {0}")]
21 RedisError(String),
22
23 #[error("error-smokesignal-stats-3 JSON serialization error: {0}")]
24 SerializationError(String),
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub(crate) struct NetworkStats {
29 pub event_count: i64,
30 pub rsvp_count: i64,
31 pub lfg_identities_count: i64,
32 pub lfg_locations_count: i64,
33}
34
35impl NetworkStats {
36 /// Format a number with thousands separator for human-friendly display
37 /// Examples: 1234 -> "1,234", 1234567 -> "1,234,567"
38 pub fn format_number(num: i64) -> String {
39 let num_str = num.to_string();
40 let mut result = String::new();
41 let chars: Vec<char> = num_str.chars().collect();
42
43 for (i, ch) in chars.iter().enumerate() {
44 if i > 0 && (chars.len() - i).is_multiple_of(3) {
45 result.push(',');
46 }
47 result.push(*ch);
48 }
49
50 result
51 }
52}
53
54/// In-memory cache for stats when Redis is unavailable
55struct InMemoryCache {
56 stats: Option<NetworkStats>,
57 cached_at: Option<Instant>,
58}
59
60impl InMemoryCache {
61 fn new() -> Self {
62 Self {
63 stats: None,
64 cached_at: None,
65 }
66 }
67
68 fn get(&self) -> Option<NetworkStats> {
69 if let (Some(stats), Some(cached_at)) = (&self.stats, &self.cached_at)
70 && cached_at.elapsed() < Duration::from_secs(CACHE_TTL_SECONDS)
71 {
72 return Some(stats.clone());
73 }
74 None
75 }
76
77 fn set(&mut self, stats: NetworkStats) {
78 self.stats = Some(stats);
79 self.cached_at = Some(Instant::now());
80 }
81}
82
83/// Global in-memory cache fallback
84static IN_MEMORY_CACHE: once_cell::sync::Lazy<Arc<RwLock<InMemoryCache>>> =
85 once_cell::sync::Lazy::new(|| Arc::new(RwLock::new(InMemoryCache::new())));
86
87/// Query database for event, RSVP, and LFG counts
88async fn query_stats(pool: &StoragePool) -> Result<NetworkStats, StatsError> {
89 let row = sqlx::query(
90 r#"
91 SELECT
92 (SELECT COUNT(*) FROM events) as event_count,
93 (SELECT COUNT(*) FROM rsvps) as rsvp_count,
94 (SELECT COUNT(DISTINCT did) FROM atproto_records
95 WHERE collection = 'events.smokesignal.lfg'
96 AND (record->>'active')::boolean = true) as lfg_identities_count,
97 (SELECT COUNT(DISTINCT record->'location'->>'value') FROM atproto_records
98 WHERE collection = 'events.smokesignal.lfg'
99 AND (record->>'active')::boolean = true) as lfg_locations_count
100 "#,
101 )
102 .fetch_one(pool)
103 .await
104 .map_err(|e| StatsError::DatabaseError(e.to_string()))?;
105
106 Ok(NetworkStats {
107 event_count: row
108 .try_get("event_count")
109 .map_err(|e| StatsError::DatabaseError(e.to_string()))?,
110 rsvp_count: row
111 .try_get("rsvp_count")
112 .map_err(|e| StatsError::DatabaseError(e.to_string()))?,
113 lfg_identities_count: row
114 .try_get("lfg_identities_count")
115 .map_err(|e| StatsError::DatabaseError(e.to_string()))?,
116 lfg_locations_count: row
117 .try_get("lfg_locations_count")
118 .map_err(|e| StatsError::DatabaseError(e.to_string()))?,
119 })
120}
121
122/// Get stats from Redis cache
123async fn get_cached_stats(cache_pool: &CachePool) -> Result<Option<NetworkStats>, StatsError> {
124 let mut conn = cache_pool
125 .get()
126 .await
127 .map_err(|e| StatsError::RedisError(e.to_string()))?;
128
129 let cached: Option<String> = conn
130 .get(CACHE_KEY)
131 .await
132 .map_err(|e| StatsError::RedisError(e.to_string()))?;
133
134 match cached {
135 Some(json) => {
136 let stats: NetworkStats = serde_json::from_str(&json)
137 .map_err(|e| StatsError::SerializationError(e.to_string()))?;
138 Ok(Some(stats))
139 }
140 None => Ok(None),
141 }
142}
143
144/// Set stats in Redis cache with TTL
145async fn set_cached_stats(cache_pool: &CachePool, stats: &NetworkStats) -> Result<(), StatsError> {
146 let json =
147 serde_json::to_string(stats).map_err(|e| StatsError::SerializationError(e.to_string()))?;
148
149 let mut conn = cache_pool
150 .get()
151 .await
152 .map_err(|e| StatsError::RedisError(e.to_string()))?;
153
154 let _: () = conn
155 .set_ex(CACHE_KEY, json, CACHE_TTL_SECONDS)
156 .await
157 .map_err(|e| StatsError::RedisError(e.to_string()))?;
158
159 Ok(())
160}
161
162/// Get network statistics with caching
163///
164/// This function retrieves event and RSVP counts with the following strategy:
165/// 1. Try to get from Redis cache
166/// 2. If Redis unavailable, try in-memory cache
167/// 3. If cache miss or expired, query database and update all caches
168///
169/// Cache expires after 10 minutes.
170pub(crate) async fn get_network_stats(
171 pool: &StoragePool,
172 cache_pool: &CachePool,
173) -> Result<NetworkStats, StatsError> {
174 // Try Redis cache first
175 match get_cached_stats(cache_pool).await {
176 Ok(Some(stats)) => {
177 tracing::debug!("Cache hit for network stats (Redis)");
178 return Ok(stats);
179 }
180 Ok(None) => {
181 tracing::debug!("Cache miss for network stats (Redis)");
182 }
183 Err(e) => {
184 tracing::warn!("Redis unavailable for stats cache: {}", e);
185
186 // Try in-memory cache as fallback
187 let cache = IN_MEMORY_CACHE.read().await;
188 if let Some(stats) = cache.get() {
189 tracing::debug!("Cache hit for network stats (in-memory fallback)");
190 return Ok(stats);
191 }
192 tracing::debug!("Cache miss for network stats (in-memory fallback)");
193 }
194 }
195
196 // Cache miss or expired - query database
197 tracing::debug!("Querying database for network stats");
198 let stats = query_stats(pool).await?;
199
200 // Update Redis cache (best effort)
201 if let Err(e) = set_cached_stats(cache_pool, &stats).await {
202 tracing::warn!("Failed to update Redis cache for stats: {}", e);
203
204 // Update in-memory cache as fallback
205 let mut cache = IN_MEMORY_CACHE.write().await;
206 cache.set(stats.clone());
207 tracing::debug!("Updated in-memory cache for network stats");
208 } else {
209 tracing::debug!("Updated Redis cache for network stats");
210
211 // Also update in-memory cache for future Redis failures
212 let mut cache = IN_MEMORY_CACHE.write().await;
213 cache.set(stats.clone());
214 }
215
216 Ok(stats)
217}
218
219#[cfg(test)]
220mod tests {
221 use super::*;
222
223 #[test]
224 fn test_format_number() {
225 assert_eq!(NetworkStats::format_number(0), "0");
226 assert_eq!(NetworkStats::format_number(1), "1");
227 assert_eq!(NetworkStats::format_number(12), "12");
228 assert_eq!(NetworkStats::format_number(123), "123");
229 assert_eq!(NetworkStats::format_number(1234), "1,234");
230 assert_eq!(NetworkStats::format_number(12345), "12,345");
231 assert_eq!(NetworkStats::format_number(123456), "123,456");
232 assert_eq!(NetworkStats::format_number(1234567), "1,234,567");
233 assert_eq!(NetworkStats::format_number(12345678), "12,345,678");
234 }
235}