//! Redis cache implementation for distributed caching. //! //! Provides a production-ready Redis cache with connection pooling, //! error handling, and fallback behavior when Redis is unavailable. use super::Cache; use anyhow::Result; use async_trait::async_trait; use redis::aio::ConnectionManager; use redis::{AsyncCommands, Client}; use serde::{Deserialize, Serialize}; use tracing::{debug, error, warn}; /// Redis cache implementation using ConnectionManager for automatic reconnection. /// /// Designed to be resilient - cache operations fail gracefully when Redis is /// unavailable rather than breaking the application. pub struct RedisCache { conn: ConnectionManager, default_ttl_seconds: u64, } impl RedisCache { /// Create a new Redis cache instance. /// /// # Arguments /// * `redis_url` - Redis connection URL (e.g., "redis://localhost:6379") /// * `default_ttl_seconds` - Default expiration time for cache entries (default: 3600) /// /// # Returns /// Error if cannot establish initial connection to Redis pub async fn new(redis_url: &str, default_ttl_seconds: Option) -> Result { let client = Client::open(redis_url)?; let conn = ConnectionManager::new(client).await?; Ok(Self { conn, default_ttl_seconds: default_ttl_seconds.unwrap_or(3600), }) } /// Get a value from Redis cache. /// /// Returns None if key doesn't exist, is expired, or cannot be deserialized. /// Redis errors are logged but don't fail - returns None for cache miss. pub async fn get_value(&mut self, key: &str) -> Result> where T: for<'de> Deserialize<'de>, { match self.conn.get::<_, Option>(key).await { Ok(Some(value)) => match serde_json::from_str::(&value) { Ok(parsed) => Ok(Some(parsed)), Err(e) => { error!( error = ?e, cache_key = %key, "Failed to deserialize cached value" ); let _ = self.conn.del::<_, ()>(key).await; Ok(None) } }, Ok(None) => Ok(None), Err(e) => { error!( error = ?e, cache_key = %key, "Redis error during get" ); Ok(None) } } } /// Set a value in Redis cache with TTL. /// /// Uses SETEX for atomic set-with-expiration. /// Failures are logged but don't return errors - cache is optional. pub async fn set_value( &mut self, key: &str, value: &T, ttl_seconds: Option, ) -> Result<()> where T: Serialize, { let ttl = ttl_seconds.unwrap_or(self.default_ttl_seconds); match serde_json::to_string(value) { Ok(serialized) => match self.conn.set_ex::<_, _, ()>(key, serialized, ttl).await { Ok(_) => { debug!( cache_key = %key, ttl_seconds = ttl, "Cached value in Redis" ); Ok(()) } Err(e) => { error!( error = ?e, cache_key = %key, "Failed to cache value in Redis" ); Ok(()) } }, Err(e) => { error!( error = ?e, cache_key = %key, "Failed to serialize value for caching" ); Ok(()) } } } /// Check if a key exists in Redis. pub async fn key_exists(&mut self, key: &str) -> Result { match self.conn.exists(key).await { Ok(exists) => { debug!(cache_key = %key, exists = exists, "Redis exists check"); Ok(exists) } Err(e) => { error!( error = ?e, cache_key = %key, "Redis error during exists check" ); Ok(false) } } } /// Delete a key from Redis cache. pub async fn delete_key(&mut self, key: &str) -> Result<()> { match self.conn.del::<_, ()>(key).await { Ok(_) => { debug!(cache_key = %key, "Deleted key from Redis cache"); Ok(()) } Err(e) => { error!( error = ?e, cache_key = %key, "Failed to delete key from Redis cache" ); Ok(()) } } } /// Set multiple key-value pairs using pipelined commands for efficiency. /// /// Much faster than individual SET commands for bulk operations. pub async fn set_multiple_values( &mut self, items: Vec<(&str, &T, Option)>, ) -> Result<()> where T: Serialize, { if items.is_empty() { return Ok(()); } let mut pipe = redis::pipe(); let mut serialization_errors = 0; for (key, value, ttl) in &items { match serde_json::to_string(value) { Ok(serialized) => { let ttl_to_use = ttl.unwrap_or(self.default_ttl_seconds); pipe.set_ex(key, serialized, ttl_to_use); } Err(e) => { error!( error = ?e, cache_key = %key, "Failed to serialize value for bulk caching" ); serialization_errors += 1; } } } match pipe.query_async::<()>(&mut self.conn).await { Ok(_) => { debug!( items_count = items.len() - serialization_errors, serialization_errors = serialization_errors, "Successfully bulk cached items in Redis" ); Ok(()) } Err(e) => { error!( error = ?e, items_count = items.len(), "Failed to bulk cache items in Redis" ); Ok(()) } } } /// Test Redis connection health. pub async fn ping(&mut self) -> Result { match self.conn.ping::().await { Ok(response) => Ok(response == "PONG"), Err(e) => { error!(error = ?e, "Redis ping failed"); Ok(false) } } } /// Get Redis memory statistics for monitoring. pub async fn get_info(&mut self) -> Result { match redis::cmd("INFO") .arg("memory") .query_async::(&mut self.conn) .await { Ok(info) => Ok(info), Err(e) => { warn!(error = ?e, "Failed to get Redis info"); Ok("Redis info unavailable".to_string()) } } } } #[async_trait] impl Cache for RedisCache { async fn get(&mut self, key: &str) -> Result> where T: for<'de> Deserialize<'de> + Send, { self.get_value(key).await } async fn set(&mut self, key: &str, value: &T, ttl_seconds: Option) -> Result<()> where T: Serialize + Send + Sync, { self.set_value(key, value, ttl_seconds).await } async fn delete(&mut self, key: &str) -> Result<()> { self.delete_key(key).await } async fn set_multiple(&mut self, items: Vec<(&str, &T, Option)>) -> Result<()> where T: Serialize + Send + Sync, { self.set_multiple_values(items).await } async fn ping(&mut self) -> Result { RedisCache::ping(self).await } async fn get_info(&mut self) -> Result { RedisCache::get_info(self).await } }