Highly ambitious ATProtocol AppView service and sdks
1//! Redis cache implementation for distributed caching.
2//!
3//! Provides a production-ready Redis cache with connection pooling,
4//! error handling, and fallback behavior when Redis is unavailable.
5
6use super::Cache;
7use anyhow::Result;
8use async_trait::async_trait;
9use redis::aio::ConnectionManager;
10use redis::{AsyncCommands, Client};
11use serde::{Deserialize, Serialize};
12use tracing::{debug, error, warn};
13
14/// Redis cache implementation using ConnectionManager for automatic reconnection.
15///
16/// Designed to be resilient - cache operations fail gracefully when Redis is
17/// unavailable rather than breaking the application.
18pub struct RedisCache {
19 conn: ConnectionManager,
20 default_ttl_seconds: u64,
21}
22
23impl RedisCache {
24 /// Create a new Redis cache instance.
25 ///
26 /// # Arguments
27 /// * `redis_url` - Redis connection URL (e.g., "redis://localhost:6379")
28 /// * `default_ttl_seconds` - Default expiration time for cache entries (default: 3600)
29 ///
30 /// # Returns
31 /// Error if cannot establish initial connection to Redis
32 pub async fn new(redis_url: &str, default_ttl_seconds: Option<u64>) -> Result<Self> {
33 let client = Client::open(redis_url)?;
34 let conn = ConnectionManager::new(client).await?;
35
36 Ok(Self {
37 conn,
38 default_ttl_seconds: default_ttl_seconds.unwrap_or(3600),
39 })
40 }
41
42 /// Get a value from Redis cache.
43 ///
44 /// Returns None if key doesn't exist, is expired, or cannot be deserialized.
45 /// Redis errors are logged but don't fail - returns None for cache miss.
46 pub async fn get_value<T>(&mut self, key: &str) -> Result<Option<T>>
47 where
48 T: for<'de> Deserialize<'de>,
49 {
50 match self.conn.get::<_, Option<String>>(key).await {
51 Ok(Some(value)) => match serde_json::from_str::<T>(&value) {
52 Ok(parsed) => Ok(Some(parsed)),
53 Err(e) => {
54 error!(
55 error = ?e,
56 cache_key = %key,
57 "Failed to deserialize cached value"
58 );
59 let _ = self.conn.del::<_, ()>(key).await;
60 Ok(None)
61 }
62 },
63 Ok(None) => Ok(None),
64 Err(e) => {
65 error!(
66 error = ?e,
67 cache_key = %key,
68 "Redis error during get"
69 );
70 Ok(None)
71 }
72 }
73 }
74
75 /// Set a value in Redis cache with TTL.
76 ///
77 /// Uses SETEX for atomic set-with-expiration.
78 /// Failures are logged but don't return errors - cache is optional.
79 pub async fn set_value<T>(
80 &mut self,
81 key: &str,
82 value: &T,
83 ttl_seconds: Option<u64>,
84 ) -> Result<()>
85 where
86 T: Serialize,
87 {
88 let ttl = ttl_seconds.unwrap_or(self.default_ttl_seconds);
89
90 match serde_json::to_string(value) {
91 Ok(serialized) => match self.conn.set_ex::<_, _, ()>(key, serialized, ttl).await {
92 Ok(_) => {
93 debug!(
94 cache_key = %key,
95 ttl_seconds = ttl,
96 "Cached value in Redis"
97 );
98 Ok(())
99 }
100 Err(e) => {
101 error!(
102 error = ?e,
103 cache_key = %key,
104 "Failed to cache value in Redis"
105 );
106 Ok(())
107 }
108 },
109 Err(e) => {
110 error!(
111 error = ?e,
112 cache_key = %key,
113 "Failed to serialize value for caching"
114 );
115 Ok(())
116 }
117 }
118 }
119
120 /// Check if a key exists in Redis.
121 pub async fn key_exists(&mut self, key: &str) -> Result<bool> {
122 match self.conn.exists(key).await {
123 Ok(exists) => {
124 debug!(cache_key = %key, exists = exists, "Redis exists check");
125 Ok(exists)
126 }
127 Err(e) => {
128 error!(
129 error = ?e,
130 cache_key = %key,
131 "Redis error during exists check"
132 );
133 Ok(false)
134 }
135 }
136 }
137
138 /// Delete a key from Redis cache.
139 pub async fn delete_key(&mut self, key: &str) -> Result<()> {
140 match self.conn.del::<_, ()>(key).await {
141 Ok(_) => {
142 debug!(cache_key = %key, "Deleted key from Redis cache");
143 Ok(())
144 }
145 Err(e) => {
146 error!(
147 error = ?e,
148 cache_key = %key,
149 "Failed to delete key from Redis cache"
150 );
151 Ok(())
152 }
153 }
154 }
155
156 /// Set multiple key-value pairs using pipelined commands for efficiency.
157 ///
158 /// Much faster than individual SET commands for bulk operations.
159 pub async fn set_multiple_values<T>(
160 &mut self,
161 items: Vec<(&str, &T, Option<u64>)>,
162 ) -> Result<()>
163 where
164 T: Serialize,
165 {
166 if items.is_empty() {
167 return Ok(());
168 }
169
170 let mut pipe = redis::pipe();
171 let mut serialization_errors = 0;
172
173 for (key, value, ttl) in &items {
174 match serde_json::to_string(value) {
175 Ok(serialized) => {
176 let ttl_to_use = ttl.unwrap_or(self.default_ttl_seconds);
177 pipe.set_ex(key, serialized, ttl_to_use);
178 }
179 Err(e) => {
180 error!(
181 error = ?e,
182 cache_key = %key,
183 "Failed to serialize value for bulk caching"
184 );
185 serialization_errors += 1;
186 }
187 }
188 }
189
190 match pipe.query_async::<()>(&mut self.conn).await {
191 Ok(_) => {
192 debug!(
193 items_count = items.len() - serialization_errors,
194 serialization_errors = serialization_errors,
195 "Successfully bulk cached items in Redis"
196 );
197 Ok(())
198 }
199 Err(e) => {
200 error!(
201 error = ?e,
202 items_count = items.len(),
203 "Failed to bulk cache items in Redis"
204 );
205 Ok(())
206 }
207 }
208 }
209
210 /// Test Redis connection health.
211 pub async fn ping(&mut self) -> Result<bool> {
212 match self.conn.ping::<String>().await {
213 Ok(response) => Ok(response == "PONG"),
214 Err(e) => {
215 error!(error = ?e, "Redis ping failed");
216 Ok(false)
217 }
218 }
219 }
220
221 /// Get Redis memory statistics for monitoring.
222 pub async fn get_info(&mut self) -> Result<String> {
223 match redis::cmd("INFO")
224 .arg("memory")
225 .query_async::<String>(&mut self.conn)
226 .await
227 {
228 Ok(info) => Ok(info),
229 Err(e) => {
230 warn!(error = ?e, "Failed to get Redis info");
231 Ok("Redis info unavailable".to_string())
232 }
233 }
234 }
235}
236
237#[async_trait]
238impl Cache for RedisCache {
239 async fn get<T>(&mut self, key: &str) -> Result<Option<T>>
240 where
241 T: for<'de> Deserialize<'de> + Send,
242 {
243 self.get_value(key).await
244 }
245
246 async fn set<T>(&mut self, key: &str, value: &T, ttl_seconds: Option<u64>) -> Result<()>
247 where
248 T: Serialize + Send + Sync,
249 {
250 self.set_value(key, value, ttl_seconds).await
251 }
252
253 async fn delete(&mut self, key: &str) -> Result<()> {
254 self.delete_key(key).await
255 }
256
257 async fn set_multiple<T>(&mut self, items: Vec<(&str, &T, Option<u64>)>) -> Result<()>
258 where
259 T: Serialize + Send + Sync,
260 {
261 self.set_multiple_values(items).await
262 }
263
264 async fn ping(&mut self) -> Result<bool> {
265 RedisCache::ping(self).await
266 }
267
268 async fn get_info(&mut self) -> Result<String> {
269 RedisCache::get_info(self).await
270 }
271}