Highly ambitious ATProtocol AppView service and sdks
at main 271 lines 8.3 kB view raw
1//! Redis cache implementation for distributed caching. 2//! 3//! Provides a production-ready Redis cache with connection pooling, 4//! error handling, and fallback behavior when Redis is unavailable. 5 6use super::Cache; 7use anyhow::Result; 8use async_trait::async_trait; 9use redis::aio::ConnectionManager; 10use redis::{AsyncCommands, Client}; 11use serde::{Deserialize, Serialize}; 12use tracing::{debug, error, warn}; 13 14/// Redis cache implementation using ConnectionManager for automatic reconnection. 15/// 16/// Designed to be resilient - cache operations fail gracefully when Redis is 17/// unavailable rather than breaking the application. 18pub struct RedisCache { 19 conn: ConnectionManager, 20 default_ttl_seconds: u64, 21} 22 23impl RedisCache { 24 /// Create a new Redis cache instance. 25 /// 26 /// # Arguments 27 /// * `redis_url` - Redis connection URL (e.g., "redis://localhost:6379") 28 /// * `default_ttl_seconds` - Default expiration time for cache entries (default: 3600) 29 /// 30 /// # Returns 31 /// Error if cannot establish initial connection to Redis 32 pub async fn new(redis_url: &str, default_ttl_seconds: Option<u64>) -> Result<Self> { 33 let client = Client::open(redis_url)?; 34 let conn = ConnectionManager::new(client).await?; 35 36 Ok(Self { 37 conn, 38 default_ttl_seconds: default_ttl_seconds.unwrap_or(3600), 39 }) 40 } 41 42 /// Get a value from Redis cache. 43 /// 44 /// Returns None if key doesn't exist, is expired, or cannot be deserialized. 45 /// Redis errors are logged but don't fail - returns None for cache miss. 46 pub async fn get_value<T>(&mut self, key: &str) -> Result<Option<T>> 47 where 48 T: for<'de> Deserialize<'de>, 49 { 50 match self.conn.get::<_, Option<String>>(key).await { 51 Ok(Some(value)) => match serde_json::from_str::<T>(&value) { 52 Ok(parsed) => Ok(Some(parsed)), 53 Err(e) => { 54 error!( 55 error = ?e, 56 cache_key = %key, 57 "Failed to deserialize cached value" 58 ); 59 let _ = self.conn.del::<_, ()>(key).await; 60 Ok(None) 61 } 62 }, 63 Ok(None) => Ok(None), 64 Err(e) => { 65 error!( 66 error = ?e, 67 cache_key = %key, 68 "Redis error during get" 69 ); 70 Ok(None) 71 } 72 } 73 } 74 75 /// Set a value in Redis cache with TTL. 76 /// 77 /// Uses SETEX for atomic set-with-expiration. 78 /// Failures are logged but don't return errors - cache is optional. 79 pub async fn set_value<T>( 80 &mut self, 81 key: &str, 82 value: &T, 83 ttl_seconds: Option<u64>, 84 ) -> Result<()> 85 where 86 T: Serialize, 87 { 88 let ttl = ttl_seconds.unwrap_or(self.default_ttl_seconds); 89 90 match serde_json::to_string(value) { 91 Ok(serialized) => match self.conn.set_ex::<_, _, ()>(key, serialized, ttl).await { 92 Ok(_) => { 93 debug!( 94 cache_key = %key, 95 ttl_seconds = ttl, 96 "Cached value in Redis" 97 ); 98 Ok(()) 99 } 100 Err(e) => { 101 error!( 102 error = ?e, 103 cache_key = %key, 104 "Failed to cache value in Redis" 105 ); 106 Ok(()) 107 } 108 }, 109 Err(e) => { 110 error!( 111 error = ?e, 112 cache_key = %key, 113 "Failed to serialize value for caching" 114 ); 115 Ok(()) 116 } 117 } 118 } 119 120 /// Check if a key exists in Redis. 121 pub async fn key_exists(&mut self, key: &str) -> Result<bool> { 122 match self.conn.exists(key).await { 123 Ok(exists) => { 124 debug!(cache_key = %key, exists = exists, "Redis exists check"); 125 Ok(exists) 126 } 127 Err(e) => { 128 error!( 129 error = ?e, 130 cache_key = %key, 131 "Redis error during exists check" 132 ); 133 Ok(false) 134 } 135 } 136 } 137 138 /// Delete a key from Redis cache. 139 pub async fn delete_key(&mut self, key: &str) -> Result<()> { 140 match self.conn.del::<_, ()>(key).await { 141 Ok(_) => { 142 debug!(cache_key = %key, "Deleted key from Redis cache"); 143 Ok(()) 144 } 145 Err(e) => { 146 error!( 147 error = ?e, 148 cache_key = %key, 149 "Failed to delete key from Redis cache" 150 ); 151 Ok(()) 152 } 153 } 154 } 155 156 /// Set multiple key-value pairs using pipelined commands for efficiency. 157 /// 158 /// Much faster than individual SET commands for bulk operations. 159 pub async fn set_multiple_values<T>( 160 &mut self, 161 items: Vec<(&str, &T, Option<u64>)>, 162 ) -> Result<()> 163 where 164 T: Serialize, 165 { 166 if items.is_empty() { 167 return Ok(()); 168 } 169 170 let mut pipe = redis::pipe(); 171 let mut serialization_errors = 0; 172 173 for (key, value, ttl) in &items { 174 match serde_json::to_string(value) { 175 Ok(serialized) => { 176 let ttl_to_use = ttl.unwrap_or(self.default_ttl_seconds); 177 pipe.set_ex(key, serialized, ttl_to_use); 178 } 179 Err(e) => { 180 error!( 181 error = ?e, 182 cache_key = %key, 183 "Failed to serialize value for bulk caching" 184 ); 185 serialization_errors += 1; 186 } 187 } 188 } 189 190 match pipe.query_async::<()>(&mut self.conn).await { 191 Ok(_) => { 192 debug!( 193 items_count = items.len() - serialization_errors, 194 serialization_errors = serialization_errors, 195 "Successfully bulk cached items in Redis" 196 ); 197 Ok(()) 198 } 199 Err(e) => { 200 error!( 201 error = ?e, 202 items_count = items.len(), 203 "Failed to bulk cache items in Redis" 204 ); 205 Ok(()) 206 } 207 } 208 } 209 210 /// Test Redis connection health. 211 pub async fn ping(&mut self) -> Result<bool> { 212 match self.conn.ping::<String>().await { 213 Ok(response) => Ok(response == "PONG"), 214 Err(e) => { 215 error!(error = ?e, "Redis ping failed"); 216 Ok(false) 217 } 218 } 219 } 220 221 /// Get Redis memory statistics for monitoring. 222 pub async fn get_info(&mut self) -> Result<String> { 223 match redis::cmd("INFO") 224 .arg("memory") 225 .query_async::<String>(&mut self.conn) 226 .await 227 { 228 Ok(info) => Ok(info), 229 Err(e) => { 230 warn!(error = ?e, "Failed to get Redis info"); 231 Ok("Redis info unavailable".to_string()) 232 } 233 } 234 } 235} 236 237#[async_trait] 238impl Cache for RedisCache { 239 async fn get<T>(&mut self, key: &str) -> Result<Option<T>> 240 where 241 T: for<'de> Deserialize<'de> + Send, 242 { 243 self.get_value(key).await 244 } 245 246 async fn set<T>(&mut self, key: &str, value: &T, ttl_seconds: Option<u64>) -> Result<()> 247 where 248 T: Serialize + Send + Sync, 249 { 250 self.set_value(key, value, ttl_seconds).await 251 } 252 253 async fn delete(&mut self, key: &str) -> Result<()> { 254 self.delete_key(key).await 255 } 256 257 async fn set_multiple<T>(&mut self, items: Vec<(&str, &T, Option<u64>)>) -> Result<()> 258 where 259 T: Serialize + Send + Sync, 260 { 261 self.set_multiple_values(items).await 262 } 263 264 async fn ping(&mut self) -> Result<bool> { 265 RedisCache::ping(self).await 266 } 267 268 async fn get_info(&mut self) -> Result<String> { 269 RedisCache::get_info(self).await 270 } 271}