this repo has no description
1use crate::cache::{Cache, DistributedRateLimiter, create_cache}; 2use crate::circuit_breaker::CircuitBreakers; 3use crate::config::AuthConfig; 4use crate::rate_limit::RateLimiters; 5use crate::repo::PostgresBlockStore; 6use crate::storage::{BlobStorage, S3BlobStorage}; 7use crate::sync::firehose::SequencedEvent; 8use sqlx::PgPool; 9use std::sync::Arc; 10use tokio::sync::broadcast; 11 12#[derive(Clone)] 13pub struct AppState { 14 pub db: PgPool, 15 pub block_store: PostgresBlockStore, 16 pub blob_store: Arc<dyn BlobStorage>, 17 pub firehose_tx: broadcast::Sender<SequencedEvent>, 18 pub rate_limiters: Arc<RateLimiters>, 19 pub circuit_breakers: Arc<CircuitBreakers>, 20 pub cache: Arc<dyn Cache>, 21 pub distributed_rate_limiter: Arc<dyn DistributedRateLimiter>, 22} 23 24pub enum RateLimitKind { 25 Login, 26 AccountCreation, 27 PasswordReset, 28 ResetPassword, 29 RefreshSession, 30 OAuthToken, 31 OAuthAuthorize, 32 OAuthPar, 33 OAuthIntrospect, 34 AppPassword, 35 EmailUpdate, 36} 37 38impl RateLimitKind { 39 fn key_prefix(&self) -> &'static str { 40 match self { 41 Self::Login => "login", 42 Self::AccountCreation => "account_creation", 43 Self::PasswordReset => "password_reset", 44 Self::ResetPassword => "reset_password", 45 Self::RefreshSession => "refresh_session", 46 Self::OAuthToken => "oauth_token", 47 Self::OAuthAuthorize => "oauth_authorize", 48 Self::OAuthPar => "oauth_par", 49 Self::OAuthIntrospect => "oauth_introspect", 50 Self::AppPassword => "app_password", 51 Self::EmailUpdate => "email_update", 52 } 53 } 54 55 fn limit_and_window_ms(&self) -> (u32, u64) { 56 match self { 57 Self::Login => (10, 60_000), 58 Self::AccountCreation => (10, 3_600_000), 59 Self::PasswordReset => (5, 3_600_000), 60 Self::ResetPassword => (10, 60_000), 61 Self::RefreshSession => (60, 60_000), 62 Self::OAuthToken => (30, 60_000), 63 Self::OAuthAuthorize => (10, 60_000), 64 Self::OAuthPar => (30, 60_000), 65 Self::OAuthIntrospect => (30, 60_000), 66 Self::AppPassword => (10, 60_000), 67 Self::EmailUpdate => (5, 3_600_000), 68 } 69 } 70} 71 72impl AppState { 73 pub async fn new(db: PgPool) -> Self { 74 AuthConfig::init(); 75 76 let block_store = PostgresBlockStore::new(db.clone()); 77 let blob_store = S3BlobStorage::new().await; 78 79 let firehose_buffer_size: usize = std::env::var("FIREHOSE_BUFFER_SIZE") 80 .ok() 81 .and_then(|v| v.parse().ok()) 82 .unwrap_or(10000); 83 84 let (firehose_tx, _) = broadcast::channel(firehose_buffer_size); 85 let rate_limiters = Arc::new(RateLimiters::new()); 86 let circuit_breakers = Arc::new(CircuitBreakers::new()); 87 let (cache, distributed_rate_limiter) = create_cache().await; 88 89 Self { 90 db, 91 block_store, 92 blob_store: Arc::new(blob_store), 93 firehose_tx, 94 rate_limiters, 95 circuit_breakers, 96 cache, 97 distributed_rate_limiter, 98 } 99 } 100 101 pub fn with_rate_limiters(mut self, rate_limiters: RateLimiters) -> Self { 102 self.rate_limiters = Arc::new(rate_limiters); 103 self 104 } 105 106 pub fn with_circuit_breakers(mut self, circuit_breakers: CircuitBreakers) -> Self { 107 self.circuit_breakers = Arc::new(circuit_breakers); 108 self 109 } 110 111 pub async fn check_rate_limit(&self, kind: RateLimitKind, client_ip: &str) -> bool { 112 if std::env::var("DISABLE_RATE_LIMITING").is_ok() { 113 return true; 114 } 115 116 let key = format!("{}:{}", kind.key_prefix(), client_ip); 117 let limiter_name = kind.key_prefix(); 118 let (limit, window_ms) = kind.limit_and_window_ms(); 119 120 if !self.distributed_rate_limiter.check_rate_limit(&key, limit, window_ms).await { 121 crate::metrics::record_rate_limit_rejection(limiter_name); 122 return false; 123 } 124 125 let limiter = match kind { 126 RateLimitKind::Login => &self.rate_limiters.login, 127 RateLimitKind::AccountCreation => &self.rate_limiters.account_creation, 128 RateLimitKind::PasswordReset => &self.rate_limiters.password_reset, 129 RateLimitKind::ResetPassword => &self.rate_limiters.reset_password, 130 RateLimitKind::RefreshSession => &self.rate_limiters.refresh_session, 131 RateLimitKind::OAuthToken => &self.rate_limiters.oauth_token, 132 RateLimitKind::OAuthAuthorize => &self.rate_limiters.oauth_authorize, 133 RateLimitKind::OAuthPar => &self.rate_limiters.oauth_par, 134 RateLimitKind::OAuthIntrospect => &self.rate_limiters.oauth_introspect, 135 RateLimitKind::AppPassword => &self.rate_limiters.app_password, 136 RateLimitKind::EmailUpdate => &self.rate_limiters.email_update, 137 }; 138 139 let ok = limiter.check_key(&client_ip.to_string()).is_ok(); 140 if !ok { 141 crate::metrics::record_rate_limit_rejection(limiter_name); 142 } 143 ok 144 } 145}