this repo has no description
1use crate::cache::{Cache, DistributedRateLimiter, create_cache};
2use crate::circuit_breaker::CircuitBreakers;
3use crate::config::AuthConfig;
4use crate::rate_limit::RateLimiters;
5use crate::repo::PostgresBlockStore;
6use crate::storage::{BlobStorage, S3BlobStorage};
7use crate::sync::firehose::SequencedEvent;
8use sqlx::PgPool;
9use std::sync::Arc;
10use tokio::sync::broadcast;
11
12#[derive(Clone)]
13pub struct AppState {
14 pub db: PgPool,
15 pub block_store: PostgresBlockStore,
16 pub blob_store: Arc<dyn BlobStorage>,
17 pub firehose_tx: broadcast::Sender<SequencedEvent>,
18 pub rate_limiters: Arc<RateLimiters>,
19 pub circuit_breakers: Arc<CircuitBreakers>,
20 pub cache: Arc<dyn Cache>,
21 pub distributed_rate_limiter: Arc<dyn DistributedRateLimiter>,
22}
23
/// The categories of request that are rate limited independently.
///
/// Each variant maps to its own key prefix and its own `(limit, window)`
/// pair, so exhausting one category does not affect the others.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RateLimitKind {
    Login,
    AccountCreation,
    PasswordReset,
    ResetPassword,
    RefreshSession,
    OAuthToken,
    OAuthAuthorize,
    OAuthPar,
    OAuthIntrospect,
    AppPassword,
    EmailUpdate,
}

impl RateLimitKind {
    /// Returns the stable snake_case name of this kind.
    ///
    /// The name is used both as the prefix of the distributed rate-limit key
    /// and as the label passed to the rejection metric.
    fn key_prefix(&self) -> &'static str {
        match self {
            Self::Login => "login",
            Self::AccountCreation => "account_creation",
            Self::PasswordReset => "password_reset",
            Self::ResetPassword => "reset_password",
            Self::RefreshSession => "refresh_session",
            Self::OAuthToken => "oauth_token",
            Self::OAuthAuthorize => "oauth_authorize",
            Self::OAuthPar => "oauth_par",
            Self::OAuthIntrospect => "oauth_introspect",
            Self::AppPassword => "app_password",
            Self::EmailUpdate => "email_update",
        }
    }

    /// Returns the `(limit, window_ms)` pair handed to the distributed rate
    /// limiter for this kind.
    ///
    /// NOTE(review): presumably `limit` is the maximum number of requests
    /// allowed per window of `window_ms` milliseconds; confirm against the
    /// `DistributedRateLimiter` implementation.
    fn limit_and_window_ms(&self) -> (u32, u64) {
        const MINUTE_MS: u64 = 60_000;
        const HOUR_MS: u64 = 3_600_000;

        match self {
            Self::Login => (10, MINUTE_MS),
            Self::AccountCreation => (10, HOUR_MS),
            Self::PasswordReset => (5, HOUR_MS),
            Self::ResetPassword => (10, MINUTE_MS),
            Self::RefreshSession => (60, MINUTE_MS),
            Self::OAuthToken => (30, MINUTE_MS),
            Self::OAuthAuthorize => (10, MINUTE_MS),
            Self::OAuthPar => (30, MINUTE_MS),
            Self::OAuthIntrospect => (30, MINUTE_MS),
            Self::AppPassword => (10, MINUTE_MS),
            Self::EmailUpdate => (5, HOUR_MS),
        }
    }
}
71
72impl AppState {
73 pub async fn new(db: PgPool) -> Self {
74 AuthConfig::init();
75
76 let block_store = PostgresBlockStore::new(db.clone());
77 let blob_store = S3BlobStorage::new().await;
78
79 let firehose_buffer_size: usize = std::env::var("FIREHOSE_BUFFER_SIZE")
80 .ok()
81 .and_then(|v| v.parse().ok())
82 .unwrap_or(10000);
83
84 let (firehose_tx, _) = broadcast::channel(firehose_buffer_size);
85 let rate_limiters = Arc::new(RateLimiters::new());
86 let circuit_breakers = Arc::new(CircuitBreakers::new());
87 let (cache, distributed_rate_limiter) = create_cache().await;
88
89 Self {
90 db,
91 block_store,
92 blob_store: Arc::new(blob_store),
93 firehose_tx,
94 rate_limiters,
95 circuit_breakers,
96 cache,
97 distributed_rate_limiter,
98 }
99 }
100
101 pub fn with_rate_limiters(mut self, rate_limiters: RateLimiters) -> Self {
102 self.rate_limiters = Arc::new(rate_limiters);
103 self
104 }
105
106 pub fn with_circuit_breakers(mut self, circuit_breakers: CircuitBreakers) -> Self {
107 self.circuit_breakers = Arc::new(circuit_breakers);
108 self
109 }
110
111 pub async fn check_rate_limit(&self, kind: RateLimitKind, client_ip: &str) -> bool {
112 if std::env::var("DISABLE_RATE_LIMITING").is_ok() {
113 return true;
114 }
115
116 let key = format!("{}:{}", kind.key_prefix(), client_ip);
117 let limiter_name = kind.key_prefix();
118 let (limit, window_ms) = kind.limit_and_window_ms();
119
120 if !self
121 .distributed_rate_limiter
122 .check_rate_limit(&key, limit, window_ms)
123 .await
124 {
125 crate::metrics::record_rate_limit_rejection(limiter_name);
126 return false;
127 }
128
129 let limiter = match kind {
130 RateLimitKind::Login => &self.rate_limiters.login,
131 RateLimitKind::AccountCreation => &self.rate_limiters.account_creation,
132 RateLimitKind::PasswordReset => &self.rate_limiters.password_reset,
133 RateLimitKind::ResetPassword => &self.rate_limiters.reset_password,
134 RateLimitKind::RefreshSession => &self.rate_limiters.refresh_session,
135 RateLimitKind::OAuthToken => &self.rate_limiters.oauth_token,
136 RateLimitKind::OAuthAuthorize => &self.rate_limiters.oauth_authorize,
137 RateLimitKind::OAuthPar => &self.rate_limiters.oauth_par,
138 RateLimitKind::OAuthIntrospect => &self.rate_limiters.oauth_introspect,
139 RateLimitKind::AppPassword => &self.rate_limiters.app_password,
140 RateLimitKind::EmailUpdate => &self.rate_limiters.email_update,
141 };
142
143 let ok = limiter.check_key(&client_ip.to_string()).is_ok();
144 if !ok {
145 crate::metrics::record_rate_limit_rejection(limiter_name);
146 }
147 ok
148 }
149}