this repo has no description
1pub use tranquil_infra::{Cache, CacheError, DistributedRateLimiter};
2
3use async_trait::async_trait;
4use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
5use std::sync::Arc;
6use std::time::Duration;
7
8#[derive(Clone)]
9pub struct ValkeyCache {
10 conn: redis::aio::ConnectionManager,
11}
12
13impl ValkeyCache {
14 pub async fn new(url: &str) -> Result<Self, CacheError> {
15 let client = redis::Client::open(url).map_err(|e| CacheError::Connection(e.to_string()))?;
16 let manager = client
17 .get_connection_manager()
18 .await
19 .map_err(|e| CacheError::Connection(e.to_string()))?;
20 Ok(Self { conn: manager })
21 }
22
23 pub fn connection(&self) -> redis::aio::ConnectionManager {
24 self.conn.clone()
25 }
26}
27
28#[async_trait]
29impl Cache for ValkeyCache {
30 async fn get(&self, key: &str) -> Option<String> {
31 let mut conn = self.conn.clone();
32 redis::cmd("GET")
33 .arg(key)
34 .query_async::<Option<String>>(&mut conn)
35 .await
36 .ok()
37 .flatten()
38 }
39
40 async fn set(&self, key: &str, value: &str, ttl: Duration) -> Result<(), CacheError> {
41 let mut conn = self.conn.clone();
42 redis::cmd("SET")
43 .arg(key)
44 .arg(value)
45 .arg("EX")
46 .arg(ttl.as_secs() as i64)
47 .query_async::<()>(&mut conn)
48 .await
49 .map_err(|e| CacheError::Connection(e.to_string()))
50 }
51
52 async fn delete(&self, key: &str) -> Result<(), CacheError> {
53 let mut conn = self.conn.clone();
54 redis::cmd("DEL")
55 .arg(key)
56 .query_async::<()>(&mut conn)
57 .await
58 .map_err(|e| CacheError::Connection(e.to_string()))
59 }
60
61 async fn get_bytes(&self, key: &str) -> Option<Vec<u8>> {
62 self.get(key).await.and_then(|s| BASE64.decode(&s).ok())
63 }
64
65 async fn set_bytes(&self, key: &str, value: &[u8], ttl: Duration) -> Result<(), CacheError> {
66 let encoded = BASE64.encode(value);
67 self.set(key, &encoded, ttl).await
68 }
69}
70
71pub struct NoOpCache;
72
73#[async_trait]
74impl Cache for NoOpCache {
75 async fn get(&self, _key: &str) -> Option<String> {
76 None
77 }
78
79 async fn set(&self, _key: &str, _value: &str, _ttl: Duration) -> Result<(), CacheError> {
80 Ok(())
81 }
82
83 async fn delete(&self, _key: &str) -> Result<(), CacheError> {
84 Ok(())
85 }
86
87 async fn get_bytes(&self, _key: &str) -> Option<Vec<u8>> {
88 None
89 }
90
91 async fn set_bytes(&self, _key: &str, _value: &[u8], _ttl: Duration) -> Result<(), CacheError> {
92 Ok(())
93 }
94}
95
96#[derive(Clone)]
97pub struct RedisRateLimiter {
98 conn: redis::aio::ConnectionManager,
99}
100
101impl RedisRateLimiter {
102 pub fn new(conn: redis::aio::ConnectionManager) -> Self {
103 Self { conn }
104 }
105}
106
107#[async_trait]
108impl DistributedRateLimiter for RedisRateLimiter {
109 async fn check_rate_limit(&self, key: &str, limit: u32, window_ms: u64) -> bool {
110 let mut conn = self.conn.clone();
111 let full_key = format!("rl:{}", key);
112 let window_secs = window_ms.div_ceil(1000).max(1) as i64;
113 let count: Result<i64, _> = redis::cmd("INCR")
114 .arg(&full_key)
115 .query_async(&mut conn)
116 .await;
117 let count = match count {
118 Ok(c) => c,
119 Err(e) => {
120 tracing::warn!("Redis rate limit INCR failed: {}. Allowing request.", e);
121 return true;
122 }
123 };
124 if count == 1 {
125 let _: Result<bool, redis::RedisError> = redis::cmd("EXPIRE")
126 .arg(&full_key)
127 .arg(window_secs)
128 .query_async(&mut conn)
129 .await;
130 }
131 count <= limit as i64
132 }
133}
134
135pub struct NoOpRateLimiter;
136
137#[async_trait]
138impl DistributedRateLimiter for NoOpRateLimiter {
139 async fn check_rate_limit(&self, _key: &str, _limit: u32, _window_ms: u64) -> bool {
140 true
141 }
142}
143
144pub async fn create_cache() -> (Arc<dyn Cache>, Arc<dyn DistributedRateLimiter>) {
145 match std::env::var("VALKEY_URL") {
146 Ok(url) => match ValkeyCache::new(&url).await {
147 Ok(cache) => {
148 tracing::info!("Connected to Valkey cache at {}", url);
149 let rate_limiter = Arc::new(RedisRateLimiter::new(cache.connection()));
150 (Arc::new(cache), rate_limiter)
151 }
152 Err(e) => {
153 tracing::warn!("Failed to connect to Valkey: {}. Running without cache.", e);
154 (Arc::new(NoOpCache), Arc::new(NoOpRateLimiter))
155 }
156 },
157 Err(_) => {
158 tracing::info!("VALKEY_URL not set. Running without cache.");
159 (Arc::new(NoOpCache), Arc::new(NoOpRateLimiter))
160 }
161 }
162}