this repo has no description
1use crate::appview::DidResolver;
2use crate::cache::{Cache, DistributedRateLimiter, create_cache};
3use crate::circuit_breaker::CircuitBreakers;
4use crate::config::AuthConfig;
5use crate::rate_limit::RateLimiters;
6use crate::repo::PostgresBlockStore;
7use crate::storage::{BlobStorage, S3BlobStorage};
8use crate::sync::firehose::SequencedEvent;
9use sqlx::PgPool;
10use std::error::Error;
11use std::sync::Arc;
12use tokio::sync::broadcast;
13
14#[derive(Clone)]
15pub struct AppState {
16 pub db: PgPool,
17 pub block_store: PostgresBlockStore,
18 pub blob_store: Arc<dyn BlobStorage>,
19 pub firehose_tx: broadcast::Sender<SequencedEvent>,
20 pub rate_limiters: Arc<RateLimiters>,
21 pub circuit_breakers: Arc<CircuitBreakers>,
22 pub cache: Arc<dyn Cache>,
23 pub distributed_rate_limiter: Arc<dyn DistributedRateLimiter>,
24 pub did_resolver: Arc<DidResolver>,
25}
26
27pub enum RateLimitKind {
28 Login,
29 AccountCreation,
30 PasswordReset,
31 ResetPassword,
32 RefreshSession,
33 OAuthToken,
34 OAuthAuthorize,
35 OAuthPar,
36 OAuthIntrospect,
37 AppPassword,
38 EmailUpdate,
39 TotpVerify,
40 HandleUpdate,
41 HandleUpdateDaily,
42}
43
44impl RateLimitKind {
45 fn key_prefix(&self) -> &'static str {
46 match self {
47 Self::Login => "login",
48 Self::AccountCreation => "account_creation",
49 Self::PasswordReset => "password_reset",
50 Self::ResetPassword => "reset_password",
51 Self::RefreshSession => "refresh_session",
52 Self::OAuthToken => "oauth_token",
53 Self::OAuthAuthorize => "oauth_authorize",
54 Self::OAuthPar => "oauth_par",
55 Self::OAuthIntrospect => "oauth_introspect",
56 Self::AppPassword => "app_password",
57 Self::EmailUpdate => "email_update",
58 Self::TotpVerify => "totp_verify",
59 Self::HandleUpdate => "handle_update",
60 Self::HandleUpdateDaily => "handle_update_daily",
61 }
62 }
63
64 fn limit_and_window_ms(&self) -> (u32, u64) {
65 match self {
66 Self::Login => (10, 60_000),
67 Self::AccountCreation => (10, 3_600_000),
68 Self::PasswordReset => (5, 3_600_000),
69 Self::ResetPassword => (10, 60_000),
70 Self::RefreshSession => (60, 60_000),
71 Self::OAuthToken => (30, 60_000),
72 Self::OAuthAuthorize => (10, 60_000),
73 Self::OAuthPar => (30, 60_000),
74 Self::OAuthIntrospect => (30, 60_000),
75 Self::AppPassword => (10, 60_000),
76 Self::EmailUpdate => (5, 3_600_000),
77 Self::TotpVerify => (5, 300_000),
78 Self::HandleUpdate => (10, 300_000),
79 Self::HandleUpdateDaily => (50, 86_400_000),
80 }
81 }
82}
83
84impl AppState {
85 pub async fn new() -> Result<Self, Box<dyn Error>> {
86 let database_url = std::env::var("DATABASE_URL")
87 .map_err(|_| "DATABASE_URL environment variable must be set")?;
88
89 let max_connections: u32 = std::env::var("DATABASE_MAX_CONNECTIONS")
90 .ok()
91 .and_then(|v| v.parse().ok())
92 .unwrap_or(100);
93
94 let min_connections: u32 = std::env::var("DATABASE_MIN_CONNECTIONS")
95 .ok()
96 .and_then(|v| v.parse().ok())
97 .unwrap_or(10);
98
99 let acquire_timeout_secs: u64 = std::env::var("DATABASE_ACQUIRE_TIMEOUT_SECS")
100 .ok()
101 .and_then(|v| v.parse().ok())
102 .unwrap_or(10);
103
104 tracing::info!(
105 "Configuring database pool: max={}, min={}, acquire_timeout={}s",
106 max_connections,
107 min_connections,
108 acquire_timeout_secs
109 );
110
111 let db = sqlx::postgres::PgPoolOptions::new()
112 .max_connections(max_connections)
113 .min_connections(min_connections)
114 .acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs))
115 .idle_timeout(std::time::Duration::from_secs(300))
116 .max_lifetime(std::time::Duration::from_secs(1800))
117 .connect(&database_url)
118 .await
119 .map_err(|e| format!("Failed to connect to Postgres: {}", e))?;
120
121 sqlx::migrate!("./migrations")
122 .run(&db)
123 .await
124 .map_err(|e| format!("Failed to run migrations: {}", e))?;
125
126 Ok(Self::from_db(db).await)
127 }
128
129 pub async fn from_db(db: PgPool) -> Self {
130 AuthConfig::init();
131
132 let block_store = PostgresBlockStore::new(db.clone());
133 let blob_store = S3BlobStorage::new().await;
134
135 let firehose_buffer_size: usize = std::env::var("FIREHOSE_BUFFER_SIZE")
136 .ok()
137 .and_then(|v| v.parse().ok())
138 .unwrap_or(10000);
139
140 let (firehose_tx, _) = broadcast::channel(firehose_buffer_size);
141 let rate_limiters = Arc::new(RateLimiters::new());
142 let circuit_breakers = Arc::new(CircuitBreakers::new());
143 let (cache, distributed_rate_limiter) = create_cache().await;
144 let did_resolver = Arc::new(DidResolver::new());
145
146 Self {
147 db,
148 block_store,
149 blob_store: Arc::new(blob_store),
150 firehose_tx,
151 rate_limiters,
152 circuit_breakers,
153 cache,
154 distributed_rate_limiter,
155 did_resolver,
156 }
157 }
158
159 pub fn with_rate_limiters(mut self, rate_limiters: RateLimiters) -> Self {
160 self.rate_limiters = Arc::new(rate_limiters);
161 self
162 }
163
164 pub fn with_circuit_breakers(mut self, circuit_breakers: CircuitBreakers) -> Self {
165 self.circuit_breakers = Arc::new(circuit_breakers);
166 self
167 }
168
169 pub async fn check_rate_limit(&self, kind: RateLimitKind, client_ip: &str) -> bool {
170 if std::env::var("DISABLE_RATE_LIMITING").is_ok() {
171 return true;
172 }
173
174 let key = format!("{}:{}", kind.key_prefix(), client_ip);
175 let limiter_name = kind.key_prefix();
176 let (limit, window_ms) = kind.limit_and_window_ms();
177
178 if !self
179 .distributed_rate_limiter
180 .check_rate_limit(&key, limit, window_ms)
181 .await
182 {
183 crate::metrics::record_rate_limit_rejection(limiter_name);
184 return false;
185 }
186
187 let limiter = match kind {
188 RateLimitKind::Login => &self.rate_limiters.login,
189 RateLimitKind::AccountCreation => &self.rate_limiters.account_creation,
190 RateLimitKind::PasswordReset => &self.rate_limiters.password_reset,
191 RateLimitKind::ResetPassword => &self.rate_limiters.reset_password,
192 RateLimitKind::RefreshSession => &self.rate_limiters.refresh_session,
193 RateLimitKind::OAuthToken => &self.rate_limiters.oauth_token,
194 RateLimitKind::OAuthAuthorize => &self.rate_limiters.oauth_authorize,
195 RateLimitKind::OAuthPar => &self.rate_limiters.oauth_par,
196 RateLimitKind::OAuthIntrospect => &self.rate_limiters.oauth_introspect,
197 RateLimitKind::AppPassword => &self.rate_limiters.app_password,
198 RateLimitKind::EmailUpdate => &self.rate_limiters.email_update,
199 RateLimitKind::TotpVerify => &self.rate_limiters.totp_verify,
200 RateLimitKind::HandleUpdate => &self.rate_limiters.handle_update,
201 RateLimitKind::HandleUpdateDaily => &self.rate_limiters.handle_update_daily,
202 };
203
204 let ok = limiter.check_key(&client_ip.to_string()).is_ok();
205 if !ok {
206 crate::metrics::record_rate_limit_rejection(limiter_name);
207 }
208 ok
209 }
210}