this repo has no description
1use crate::cache::{Cache, DistributedRateLimiter, create_cache};
2use crate::circuit_breaker::CircuitBreakers;
3use crate::config::AuthConfig;
4use crate::rate_limit::RateLimiters;
5use crate::repo::PostgresBlockStore;
6use crate::storage::{BlobStorage, S3BlobStorage};
7use crate::sync::firehose::SequencedEvent;
8use sqlx::PgPool;
9use std::sync::Arc;
10use tokio::sync::broadcast;
11
12#[derive(Clone)]
13pub struct AppState {
14 pub db: PgPool,
15 pub block_store: PostgresBlockStore,
16 pub blob_store: Arc<dyn BlobStorage>,
17 pub firehose_tx: broadcast::Sender<SequencedEvent>,
18 pub rate_limiters: Arc<RateLimiters>,
19 pub circuit_breakers: Arc<CircuitBreakers>,
20 pub cache: Arc<dyn Cache>,
21 pub distributed_rate_limiter: Arc<dyn DistributedRateLimiter>,
22}
23
/// Identifies which rate-limit bucket a request is counted against.
///
/// Each variant maps to a key prefix and to a `(limit, window)` pair in
/// `impl RateLimitKind`, and to one limiter field of `RateLimiters` in
/// `AppState::check_rate_limit`.
///
/// The enum is a plain tag with no payload, so it derives the common value
/// traits: it can be logged with `{:?}`, compared, copied freely and used as a
/// map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RateLimitKind {
    Login,
    AccountCreation,
    // NOTE(review): `PasswordReset` and `ResetPassword` are separate buckets
    // with different quotas; presumably one covers requesting a reset and the
    // other completing it - confirm against the call sites.
    PasswordReset,
    ResetPassword,
    RefreshSession,
    OAuthToken,
    OAuthAuthorize,
    OAuthPar,
    OAuthIntrospect,
    AppPassword,
    EmailUpdate,
}
37
38impl RateLimitKind {
39 fn key_prefix(&self) -> &'static str {
40 match self {
41 Self::Login => "login",
42 Self::AccountCreation => "account_creation",
43 Self::PasswordReset => "password_reset",
44 Self::ResetPassword => "reset_password",
45 Self::RefreshSession => "refresh_session",
46 Self::OAuthToken => "oauth_token",
47 Self::OAuthAuthorize => "oauth_authorize",
48 Self::OAuthPar => "oauth_par",
49 Self::OAuthIntrospect => "oauth_introspect",
50 Self::AppPassword => "app_password",
51 Self::EmailUpdate => "email_update",
52 }
53 }
54
55 fn limit_and_window_ms(&self) -> (u32, u64) {
56 match self {
57 Self::Login => (10, 60_000),
58 Self::AccountCreation => (10, 3_600_000),
59 Self::PasswordReset => (5, 3_600_000),
60 Self::ResetPassword => (10, 60_000),
61 Self::RefreshSession => (60, 60_000),
62 Self::OAuthToken => (30, 60_000),
63 Self::OAuthAuthorize => (10, 60_000),
64 Self::OAuthPar => (30, 60_000),
65 Self::OAuthIntrospect => (30, 60_000),
66 Self::AppPassword => (10, 60_000),
67 Self::EmailUpdate => (5, 3_600_000),
68 }
69 }
70}
71
72impl AppState {
73 pub async fn new(db: PgPool) -> Self {
74 AuthConfig::init();
75
76 let block_store = PostgresBlockStore::new(db.clone());
77 let blob_store = S3BlobStorage::new().await;
78 let firehose_buffer_size: usize = std::env::var("FIREHOSE_BUFFER_SIZE")
79 .ok()
80 .and_then(|v| v.parse().ok())
81 .unwrap_or(10000);
82 let (firehose_tx, _) = broadcast::channel(firehose_buffer_size);
83 let rate_limiters = Arc::new(RateLimiters::new());
84 let circuit_breakers = Arc::new(CircuitBreakers::new());
85 let (cache, distributed_rate_limiter) = create_cache().await;
86 Self {
87 db,
88 block_store,
89 blob_store: Arc::new(blob_store),
90 firehose_tx,
91 rate_limiters,
92 circuit_breakers,
93 cache,
94 distributed_rate_limiter,
95 }
96 }
97
98 pub fn with_rate_limiters(mut self, rate_limiters: RateLimiters) -> Self {
99 self.rate_limiters = Arc::new(rate_limiters);
100 self
101 }
102
103 pub fn with_circuit_breakers(mut self, circuit_breakers: CircuitBreakers) -> Self {
104 self.circuit_breakers = Arc::new(circuit_breakers);
105 self
106 }
107
108 pub async fn check_rate_limit(&self, kind: RateLimitKind, client_ip: &str) -> bool {
109 if std::env::var("DISABLE_RATE_LIMITING").is_ok() {
110 return true;
111 }
112
113 let key = format!("{}:{}", kind.key_prefix(), client_ip);
114 let limiter_name = kind.key_prefix();
115 let (limit, window_ms) = kind.limit_and_window_ms();
116
117 if !self.distributed_rate_limiter.check_rate_limit(&key, limit, window_ms).await {
118 crate::metrics::record_rate_limit_rejection(limiter_name);
119 return false;
120 }
121
122 let limiter = match kind {
123 RateLimitKind::Login => &self.rate_limiters.login,
124 RateLimitKind::AccountCreation => &self.rate_limiters.account_creation,
125 RateLimitKind::PasswordReset => &self.rate_limiters.password_reset,
126 RateLimitKind::ResetPassword => &self.rate_limiters.reset_password,
127 RateLimitKind::RefreshSession => &self.rate_limiters.refresh_session,
128 RateLimitKind::OAuthToken => &self.rate_limiters.oauth_token,
129 RateLimitKind::OAuthAuthorize => &self.rate_limiters.oauth_authorize,
130 RateLimitKind::OAuthPar => &self.rate_limiters.oauth_par,
131 RateLimitKind::OAuthIntrospect => &self.rate_limiters.oauth_introspect,
132 RateLimitKind::AppPassword => &self.rate_limiters.app_password,
133 RateLimitKind::EmailUpdate => &self.rate_limiters.email_update,
134 };
135
136 let ok = limiter.check_key(&client_ip.to_string()).is_ok();
137 if !ok {
138 crate::metrics::record_rate_limit_rejection(limiter_name);
139 }
140 ok
141 }
142}