this repo has no description
1use crate::appview::DidResolver; 2use crate::cache::{Cache, DistributedRateLimiter, create_cache}; 3use crate::circuit_breaker::CircuitBreakers; 4use crate::config::AuthConfig; 5use crate::rate_limit::RateLimiters; 6use crate::repo::PostgresBlockStore; 7use crate::storage::{BlobStorage, S3BlobStorage}; 8use crate::sync::firehose::SequencedEvent; 9use sqlx::PgPool; 10use std::sync::Arc; 11use tokio::sync::broadcast; 12 13#[derive(Clone)] 14pub struct AppState { 15 pub db: PgPool, 16 pub block_store: PostgresBlockStore, 17 pub blob_store: Arc<dyn BlobStorage>, 18 pub firehose_tx: broadcast::Sender<SequencedEvent>, 19 pub rate_limiters: Arc<RateLimiters>, 20 pub circuit_breakers: Arc<CircuitBreakers>, 21 pub cache: Arc<dyn Cache>, 22 pub distributed_rate_limiter: Arc<dyn DistributedRateLimiter>, 23 pub did_resolver: Arc<DidResolver>, 24} 25 26pub enum RateLimitKind { 27 Login, 28 AccountCreation, 29 PasswordReset, 30 ResetPassword, 31 RefreshSession, 32 OAuthToken, 33 OAuthAuthorize, 34 OAuthPar, 35 OAuthIntrospect, 36 AppPassword, 37 EmailUpdate, 38 TotpVerify, 39} 40 41impl RateLimitKind { 42 fn key_prefix(&self) -> &'static str { 43 match self { 44 Self::Login => "login", 45 Self::AccountCreation => "account_creation", 46 Self::PasswordReset => "password_reset", 47 Self::ResetPassword => "reset_password", 48 Self::RefreshSession => "refresh_session", 49 Self::OAuthToken => "oauth_token", 50 Self::OAuthAuthorize => "oauth_authorize", 51 Self::OAuthPar => "oauth_par", 52 Self::OAuthIntrospect => "oauth_introspect", 53 Self::AppPassword => "app_password", 54 Self::EmailUpdate => "email_update", 55 Self::TotpVerify => "totp_verify", 56 } 57 } 58 59 fn limit_and_window_ms(&self) -> (u32, u64) { 60 match self { 61 Self::Login => (10, 60_000), 62 Self::AccountCreation => (10, 3_600_000), 63 Self::PasswordReset => (5, 3_600_000), 64 Self::ResetPassword => (10, 60_000), 65 Self::RefreshSession => (60, 60_000), 66 Self::OAuthToken => (30, 60_000), 67 Self::OAuthAuthorize => (10, 60_000), 68 Self::OAuthPar => (30, 60_000), 69 Self::OAuthIntrospect => (30, 60_000), 70 Self::AppPassword => (10, 60_000), 71 Self::EmailUpdate => (5, 3_600_000), 72 Self::TotpVerify => (5, 300_000), 73 } 74 } 75} 76 77impl AppState { 78 pub async fn new(db: PgPool) -> Self { 79 AuthConfig::init(); 80 81 let block_store = PostgresBlockStore::new(db.clone()); 82 let blob_store = S3BlobStorage::new().await; 83 84 let firehose_buffer_size: usize = std::env::var("FIREHOSE_BUFFER_SIZE") 85 .ok() 86 .and_then(|v| v.parse().ok()) 87 .unwrap_or(10000); 88 89 let (firehose_tx, _) = broadcast::channel(firehose_buffer_size); 90 let rate_limiters = Arc::new(RateLimiters::new()); 91 let circuit_breakers = Arc::new(CircuitBreakers::new()); 92 let (cache, distributed_rate_limiter) = create_cache().await; 93 let did_resolver = Arc::new(DidResolver::new()); 94 95 Self { 96 db, 97 block_store, 98 blob_store: Arc::new(blob_store), 99 firehose_tx, 100 rate_limiters, 101 circuit_breakers, 102 cache, 103 distributed_rate_limiter, 104 did_resolver, 105 } 106 } 107 108 pub fn with_rate_limiters(mut self, rate_limiters: RateLimiters) -> Self { 109 self.rate_limiters = Arc::new(rate_limiters); 110 self 111 } 112 113 pub fn with_circuit_breakers(mut self, circuit_breakers: CircuitBreakers) -> Self { 114 self.circuit_breakers = Arc::new(circuit_breakers); 115 self 116 } 117 118 pub async fn check_rate_limit(&self, kind: RateLimitKind, client_ip: &str) -> bool { 119 if std::env::var("DISABLE_RATE_LIMITING").is_ok() { 120 return true; 121 } 122 123 let key = format!("{}:{}", kind.key_prefix(), client_ip); 124 let limiter_name = kind.key_prefix(); 125 let (limit, window_ms) = kind.limit_and_window_ms(); 126 127 if !self 128 .distributed_rate_limiter 129 .check_rate_limit(&key, limit, window_ms) 130 .await 131 { 132 crate::metrics::record_rate_limit_rejection(limiter_name); 133 return false; 134 } 135 136 let limiter = match kind { 137 RateLimitKind::Login => &self.rate_limiters.login, 138 RateLimitKind::AccountCreation => &self.rate_limiters.account_creation, 139 RateLimitKind::PasswordReset => &self.rate_limiters.password_reset, 140 RateLimitKind::ResetPassword => &self.rate_limiters.reset_password, 141 RateLimitKind::RefreshSession => &self.rate_limiters.refresh_session, 142 RateLimitKind::OAuthToken => &self.rate_limiters.oauth_token, 143 RateLimitKind::OAuthAuthorize => &self.rate_limiters.oauth_authorize, 144 RateLimitKind::OAuthPar => &self.rate_limiters.oauth_par, 145 RateLimitKind::OAuthIntrospect => &self.rate_limiters.oauth_introspect, 146 RateLimitKind::AppPassword => &self.rate_limiters.app_password, 147 RateLimitKind::EmailUpdate => &self.rate_limiters.email_update, 148 RateLimitKind::TotpVerify => &self.rate_limiters.totp_verify, 149 }; 150 151 let ok = limiter.check_key(&client_ip.to_string()).is_ok(); 152 if !ok { 153 crate::metrics::record_rate_limit_rejection(limiter_name); 154 } 155 ok 156 } 157}