this repo has no description
1use crate::appview::DidResolver; 2use crate::cache::{Cache, DistributedRateLimiter, create_cache}; 3use crate::circuit_breaker::CircuitBreakers; 4use crate::config::AuthConfig; 5use crate::rate_limit::RateLimiters; 6use crate::repo::PostgresBlockStore; 7use crate::storage::{BackupStorage, BlobStorage, S3BlobStorage}; 8use crate::sync::firehose::SequencedEvent; 9use sqlx::PgPool; 10use std::error::Error; 11use std::sync::Arc; 12use tokio::sync::broadcast; 13 14#[derive(Clone)] 15pub struct AppState { 16 pub db: PgPool, 17 pub block_store: PostgresBlockStore, 18 pub blob_store: Arc<dyn BlobStorage>, 19 pub backup_storage: Option<Arc<BackupStorage>>, 20 pub firehose_tx: broadcast::Sender<SequencedEvent>, 21 pub rate_limiters: Arc<RateLimiters>, 22 pub circuit_breakers: Arc<CircuitBreakers>, 23 pub cache: Arc<dyn Cache>, 24 pub distributed_rate_limiter: Arc<dyn DistributedRateLimiter>, 25 pub did_resolver: Arc<DidResolver>, 26} 27 28pub enum RateLimitKind { 29 Login, 30 AccountCreation, 31 PasswordReset, 32 ResetPassword, 33 RefreshSession, 34 OAuthToken, 35 OAuthAuthorize, 36 OAuthPar, 37 OAuthIntrospect, 38 AppPassword, 39 EmailUpdate, 40 TotpVerify, 41 HandleUpdate, 42 HandleUpdateDaily, 43 VerificationCheck, 44} 45 46impl RateLimitKind { 47 fn key_prefix(&self) -> &'static str { 48 match self { 49 Self::Login => "login", 50 Self::AccountCreation => "account_creation", 51 Self::PasswordReset => "password_reset", 52 Self::ResetPassword => "reset_password", 53 Self::RefreshSession => "refresh_session", 54 Self::OAuthToken => "oauth_token", 55 Self::OAuthAuthorize => "oauth_authorize", 56 Self::OAuthPar => "oauth_par", 57 Self::OAuthIntrospect => "oauth_introspect", 58 Self::AppPassword => "app_password", 59 Self::EmailUpdate => "email_update", 60 Self::TotpVerify => "totp_verify", 61 Self::HandleUpdate => "handle_update", 62 Self::HandleUpdateDaily => "handle_update_daily", 63 Self::VerificationCheck => "verification_check", 64 } 65 } 66 67 fn limit_and_window_ms(&self) -> (u32, u64) { 68 match self { 69 Self::Login => (10, 60_000), 70 Self::AccountCreation => (10, 3_600_000), 71 Self::PasswordReset => (5, 3_600_000), 72 Self::ResetPassword => (10, 60_000), 73 Self::RefreshSession => (60, 60_000), 74 Self::OAuthToken => (30, 60_000), 75 Self::OAuthAuthorize => (10, 60_000), 76 Self::OAuthPar => (30, 60_000), 77 Self::OAuthIntrospect => (30, 60_000), 78 Self::AppPassword => (10, 60_000), 79 Self::EmailUpdate => (5, 3_600_000), 80 Self::TotpVerify => (5, 300_000), 81 Self::HandleUpdate => (10, 300_000), 82 Self::HandleUpdateDaily => (50, 86_400_000), 83 Self::VerificationCheck => (60, 60_000), 84 } 85 } 86} 87 88impl AppState { 89 pub async fn new() -> Result<Self, Box<dyn Error>> { 90 let database_url = std::env::var("DATABASE_URL") 91 .map_err(|_| "DATABASE_URL environment variable must be set")?; 92 93 let max_connections: u32 = std::env::var("DATABASE_MAX_CONNECTIONS") 94 .ok() 95 .and_then(|v| v.parse().ok()) 96 .unwrap_or(100); 97 98 let min_connections: u32 = std::env::var("DATABASE_MIN_CONNECTIONS") 99 .ok() 100 .and_then(|v| v.parse().ok()) 101 .unwrap_or(10); 102 103 let acquire_timeout_secs: u64 = std::env::var("DATABASE_ACQUIRE_TIMEOUT_SECS") 104 .ok() 105 .and_then(|v| v.parse().ok()) 106 .unwrap_or(10); 107 108 tracing::info!( 109 "Configuring database pool: max={}, min={}, acquire_timeout={}s", 110 max_connections, 111 min_connections, 112 acquire_timeout_secs 113 ); 114 115 let db = sqlx::postgres::PgPoolOptions::new() 116 .max_connections(max_connections) 117 .min_connections(min_connections) 118 .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs)) 119 .idle_timeout(std::time::Duration::from_secs(300)) 120 .max_lifetime(std::time::Duration::from_secs(1800)) 121 .connect(&database_url) 122 .await 123 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?; 124 125 sqlx::migrate!("./migrations") 126 .run(&db) 127 .await 128 .map_err(|e| format!("Failed to run migrations: {}", e))?; 129 130 Ok(Self::from_db(db).await) 131 } 132 133 pub async fn from_db(db: PgPool) -> Self { 134 AuthConfig::init(); 135 136 let block_store = PostgresBlockStore::new(db.clone()); 137 let blob_store = S3BlobStorage::new().await; 138 let backup_storage = BackupStorage::new().await.map(Arc::new); 139 140 let firehose_buffer_size: usize = std::env::var("FIREHOSE_BUFFER_SIZE") 141 .ok() 142 .and_then(|v| v.parse().ok()) 143 .unwrap_or(10000); 144 145 let (firehose_tx, _) = broadcast::channel(firehose_buffer_size); 146 let rate_limiters = Arc::new(RateLimiters::new()); 147 let circuit_breakers = Arc::new(CircuitBreakers::new()); 148 let (cache, distributed_rate_limiter) = create_cache().await; 149 let did_resolver = Arc::new(DidResolver::new()); 150 151 Self { 152 db, 153 block_store, 154 blob_store: Arc::new(blob_store), 155 backup_storage, 156 firehose_tx, 157 rate_limiters, 158 circuit_breakers, 159 cache, 160 distributed_rate_limiter, 161 did_resolver, 162 } 163 } 164 165 pub fn with_rate_limiters(mut self, rate_limiters: RateLimiters) -> Self { 166 self.rate_limiters = Arc::new(rate_limiters); 167 self 168 } 169 170 pub fn with_circuit_breakers(mut self, circuit_breakers: CircuitBreakers) -> Self { 171 self.circuit_breakers = Arc::new(circuit_breakers); 172 self 173 } 174 175 pub async fn check_rate_limit(&self, kind: RateLimitKind, client_ip: &str) -> bool { 176 if std::env::var("DISABLE_RATE_LIMITING").is_ok() { 177 return true; 178 } 179 180 let key = format!("{}:{}", kind.key_prefix(), client_ip); 181 let limiter_name = kind.key_prefix(); 182 let (limit, window_ms) = kind.limit_and_window_ms(); 183 184 if !self 185 .distributed_rate_limiter 186 .check_rate_limit(&key, limit, window_ms) 187 .await 188 { 189 crate::metrics::record_rate_limit_rejection(limiter_name); 190 return false; 191 } 192 193 let limiter = match kind { 194 RateLimitKind::Login => &self.rate_limiters.login, 195 RateLimitKind::AccountCreation => &self.rate_limiters.account_creation, 196 RateLimitKind::PasswordReset => &self.rate_limiters.password_reset, 197 RateLimitKind::ResetPassword => &self.rate_limiters.reset_password, 198 RateLimitKind::RefreshSession => &self.rate_limiters.refresh_session, 199 RateLimitKind::OAuthToken => &self.rate_limiters.oauth_token, 200 RateLimitKind::OAuthAuthorize => &self.rate_limiters.oauth_authorize, 201 RateLimitKind::OAuthPar => &self.rate_limiters.oauth_par, 202 RateLimitKind::OAuthIntrospect => &self.rate_limiters.oauth_introspect, 203 RateLimitKind::AppPassword => &self.rate_limiters.app_password, 204 RateLimitKind::EmailUpdate => &self.rate_limiters.email_update, 205 RateLimitKind::TotpVerify => &self.rate_limiters.totp_verify, 206 RateLimitKind::HandleUpdate => &self.rate_limiters.handle_update, 207 RateLimitKind::HandleUpdateDaily => &self.rate_limiters.handle_update_daily, 208 RateLimitKind::VerificationCheck => &self.rate_limiters.verification_check, 209 }; 210 211 let ok = limiter.check_key(&client_ip.to_string()).is_ok(); 212 if !ok { 213 crate::metrics::record_rate_limit_rejection(limiter_name); 214 } 215 ok 216 } 217}