this repo has no description
at main 4.8 kB view raw
1pub use tranquil_infra::{Cache, CacheError, DistributedRateLimiter}; 2 3use async_trait::async_trait; 4use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64}; 5use std::sync::Arc; 6use std::time::Duration; 7 8#[derive(Clone)] 9pub struct ValkeyCache { 10 conn: redis::aio::ConnectionManager, 11} 12 13impl ValkeyCache { 14 pub async fn new(url: &str) -> Result<Self, CacheError> { 15 let client = redis::Client::open(url).map_err(|e| CacheError::Connection(e.to_string()))?; 16 let manager = client 17 .get_connection_manager() 18 .await 19 .map_err(|e| CacheError::Connection(e.to_string()))?; 20 Ok(Self { conn: manager }) 21 } 22 23 pub fn connection(&self) -> redis::aio::ConnectionManager { 24 self.conn.clone() 25 } 26} 27 28#[async_trait] 29impl Cache for ValkeyCache { 30 async fn get(&self, key: &str) -> Option<String> { 31 let mut conn = self.conn.clone(); 32 redis::cmd("GET") 33 .arg(key) 34 .query_async::<Option<String>>(&mut conn) 35 .await 36 .ok() 37 .flatten() 38 } 39 40 async fn set(&self, key: &str, value: &str, ttl: Duration) -> Result<(), CacheError> { 41 let mut conn = self.conn.clone(); 42 redis::cmd("SET") 43 .arg(key) 44 .arg(value) 45 .arg("EX") 46 .arg(ttl.as_secs() as i64) 47 .query_async::<()>(&mut conn) 48 .await 49 .map_err(|e| CacheError::Connection(e.to_string())) 50 } 51 52 async fn delete(&self, key: &str) -> Result<(), CacheError> { 53 let mut conn = self.conn.clone(); 54 redis::cmd("DEL") 55 .arg(key) 56 .query_async::<()>(&mut conn) 57 .await 58 .map_err(|e| CacheError::Connection(e.to_string())) 59 } 60 61 async fn get_bytes(&self, key: &str) -> Option<Vec<u8>> { 62 self.get(key).await.and_then(|s| BASE64.decode(&s).ok()) 63 } 64 65 async fn set_bytes(&self, key: &str, value: &[u8], ttl: Duration) -> Result<(), CacheError> { 66 let encoded = BASE64.encode(value); 67 self.set(key, &encoded, ttl).await 68 } 69} 70 71pub struct NoOpCache; 72 73#[async_trait] 74impl Cache for NoOpCache { 75 async fn get(&self, _key: &str) -> Option<String> { 76 None 77 } 78 79 async fn set(&self, _key: &str, _value: &str, _ttl: Duration) -> Result<(), CacheError> { 80 Ok(()) 81 } 82 83 async fn delete(&self, _key: &str) -> Result<(), CacheError> { 84 Ok(()) 85 } 86 87 async fn get_bytes(&self, _key: &str) -> Option<Vec<u8>> { 88 None 89 } 90 91 async fn set_bytes(&self, _key: &str, _value: &[u8], _ttl: Duration) -> Result<(), CacheError> { 92 Ok(()) 93 } 94} 95 96#[derive(Clone)] 97pub struct RedisRateLimiter { 98 conn: redis::aio::ConnectionManager, 99} 100 101impl RedisRateLimiter { 102 pub fn new(conn: redis::aio::ConnectionManager) -> Self { 103 Self { conn } 104 } 105} 106 107#[async_trait] 108impl DistributedRateLimiter for RedisRateLimiter { 109 async fn check_rate_limit(&self, key: &str, limit: u32, window_ms: u64) -> bool { 110 let mut conn = self.conn.clone(); 111 let full_key = format!("rl:{}", key); 112 let window_secs = window_ms.div_ceil(1000).max(1) as i64; 113 let count: Result<i64, _> = redis::cmd("INCR") 114 .arg(&full_key) 115 .query_async(&mut conn) 116 .await; 117 let count = match count { 118 Ok(c) => c, 119 Err(e) => { 120 tracing::warn!("Redis rate limit INCR failed: {}. Allowing request.", e); 121 return true; 122 } 123 }; 124 if count == 1 { 125 let _: Result<bool, redis::RedisError> = redis::cmd("EXPIRE") 126 .arg(&full_key) 127 .arg(window_secs) 128 .query_async(&mut conn) 129 .await; 130 } 131 count <= limit as i64 132 } 133} 134 135pub struct NoOpRateLimiter; 136 137#[async_trait] 138impl DistributedRateLimiter for NoOpRateLimiter { 139 async fn check_rate_limit(&self, _key: &str, _limit: u32, _window_ms: u64) -> bool { 140 true 141 } 142} 143 144pub async fn create_cache() -> (Arc<dyn Cache>, Arc<dyn DistributedRateLimiter>) { 145 match std::env::var("VALKEY_URL") { 146 Ok(url) => match ValkeyCache::new(&url).await { 147 Ok(cache) => { 148 tracing::info!("Connected to Valkey cache at {}", url); 149 let rate_limiter = Arc::new(RedisRateLimiter::new(cache.connection())); 150 (Arc::new(cache), rate_limiter) 151 } 152 Err(e) => { 153 tracing::warn!("Failed to connect to Valkey: {}. Running without cache.", e); 154 (Arc::new(NoOpCache), Arc::new(NoOpRateLimiter)) 155 } 156 }, 157 Err(_) => { 158 tracing::info!("VALKEY_URL not set. Running without cache."); 159 (Arc::new(NoOpCache), Arc::new(NoOpRateLimiter)) 160 } 161 } 162}