this repo has no description
1use crate::appview::DidResolver;
2use crate::cache::{Cache, DistributedRateLimiter, create_cache};
3use crate::circuit_breaker::CircuitBreakers;
4use crate::config::AuthConfig;
5use crate::rate_limit::RateLimiters;
6use crate::repo::PostgresBlockStore;
7use crate::storage::{BlobStorage, S3BlobStorage};
8use crate::sync::firehose::SequencedEvent;
9use sqlx::PgPool;
10use std::sync::Arc;
11use tokio::sync::broadcast;
12
/// Shared application state bundling the handles the service works with.
///
/// The struct is `Clone`; the `Arc<…>` fields are reference-counted, so cloning
/// the state shares those values instead of duplicating them.
#[derive(Clone)]
pub struct AppState {
    /// Postgres connection pool (`sqlx::PgPool`) handed to `AppState::new`.
    pub db: PgPool,
    /// Block store; `AppState::new` builds it from a clone of `db`.
    pub block_store: PostgresBlockStore,
    /// Blob storage behind a trait object; `AppState::new` installs an `S3BlobStorage`.
    pub blob_store: Arc<dyn BlobStorage>,
    /// Sending half of the broadcast channel that carries `SequencedEvent`s.
    /// Its capacity is chosen in `AppState::new` from `FIREHOSE_BUFFER_SIZE`.
    pub firehose_tx: broadcast::Sender<SequencedEvent>,
    /// Per-kind limiters consulted by `check_rate_limit` after the distributed
    /// limiter; replaceable via `with_rate_limiters`.
    pub rate_limiters: Arc<RateLimiters>,
    /// Circuit breakers; replaceable via `with_circuit_breakers`.
    pub circuit_breakers: Arc<CircuitBreakers>,
    /// Cache handle, the first value returned by `create_cache`.
    pub cache: Arc<dyn Cache>,
    /// Rate limiter returned alongside the cache by `create_cache`; it is the
    /// first limiter consulted in `check_rate_limit`.
    pub distributed_rate_limiter: Arc<dyn DistributedRateLimiter>,
    /// DID resolver created with `DidResolver::new()`.
    pub did_resolver: Arc<DidResolver>,
}
25
/// Identifies which rate-limit bucket a request is counted against.
///
/// Each variant is mapped to a key prefix and a `(limit, window_ms)` pair by
/// the inherent methods on this type, and to one field of `RateLimiters` in
/// `AppState::check_rate_limit`.
///
/// The enum carries no data, so it is cheap to copy, compare, hash and print.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RateLimitKind {
    /// Key prefix `login`.
    Login,
    /// Key prefix `account_creation`.
    AccountCreation,
    /// Key prefix `password_reset`. A separate bucket from
    /// [`Self::ResetPassword`]; presumably the "request a reset" step —
    /// confirm against callers.
    PasswordReset,
    /// Key prefix `reset_password`. A separate bucket from
    /// [`Self::PasswordReset`]; presumably the "complete a reset" step —
    /// confirm against callers.
    ResetPassword,
    /// Key prefix `refresh_session`.
    RefreshSession,
    /// Key prefix `oauth_token`.
    OAuthToken,
    /// Key prefix `oauth_authorize`.
    OAuthAuthorize,
    /// Key prefix `oauth_par`.
    OAuthPar,
    /// Key prefix `oauth_introspect`.
    OAuthIntrospect,
    /// Key prefix `app_password`.
    AppPassword,
    /// Key prefix `email_update`.
    EmailUpdate,
    /// Key prefix `totp_verify`.
    TotpVerify,
}
40
41impl RateLimitKind {
42 fn key_prefix(&self) -> &'static str {
43 match self {
44 Self::Login => "login",
45 Self::AccountCreation => "account_creation",
46 Self::PasswordReset => "password_reset",
47 Self::ResetPassword => "reset_password",
48 Self::RefreshSession => "refresh_session",
49 Self::OAuthToken => "oauth_token",
50 Self::OAuthAuthorize => "oauth_authorize",
51 Self::OAuthPar => "oauth_par",
52 Self::OAuthIntrospect => "oauth_introspect",
53 Self::AppPassword => "app_password",
54 Self::EmailUpdate => "email_update",
55 Self::TotpVerify => "totp_verify",
56 }
57 }
58
59 fn limit_and_window_ms(&self) -> (u32, u64) {
60 match self {
61 Self::Login => (10, 60_000),
62 Self::AccountCreation => (10, 3_600_000),
63 Self::PasswordReset => (5, 3_600_000),
64 Self::ResetPassword => (10, 60_000),
65 Self::RefreshSession => (60, 60_000),
66 Self::OAuthToken => (30, 60_000),
67 Self::OAuthAuthorize => (10, 60_000),
68 Self::OAuthPar => (30, 60_000),
69 Self::OAuthIntrospect => (30, 60_000),
70 Self::AppPassword => (10, 60_000),
71 Self::EmailUpdate => (5, 3_600_000),
72 Self::TotpVerify => (5, 300_000),
73 }
74 }
75}
76
77impl AppState {
78 pub async fn new(db: PgPool) -> Self {
79 AuthConfig::init();
80
81 let block_store = PostgresBlockStore::new(db.clone());
82 let blob_store = S3BlobStorage::new().await;
83
84 let firehose_buffer_size: usize = std::env::var("FIREHOSE_BUFFER_SIZE")
85 .ok()
86 .and_then(|v| v.parse().ok())
87 .unwrap_or(10000);
88
89 let (firehose_tx, _) = broadcast::channel(firehose_buffer_size);
90 let rate_limiters = Arc::new(RateLimiters::new());
91 let circuit_breakers = Arc::new(CircuitBreakers::new());
92 let (cache, distributed_rate_limiter) = create_cache().await;
93 let did_resolver = Arc::new(DidResolver::new());
94
95 Self {
96 db,
97 block_store,
98 blob_store: Arc::new(blob_store),
99 firehose_tx,
100 rate_limiters,
101 circuit_breakers,
102 cache,
103 distributed_rate_limiter,
104 did_resolver,
105 }
106 }
107
108 pub fn with_rate_limiters(mut self, rate_limiters: RateLimiters) -> Self {
109 self.rate_limiters = Arc::new(rate_limiters);
110 self
111 }
112
113 pub fn with_circuit_breakers(mut self, circuit_breakers: CircuitBreakers) -> Self {
114 self.circuit_breakers = Arc::new(circuit_breakers);
115 self
116 }
117
118 pub async fn check_rate_limit(&self, kind: RateLimitKind, client_ip: &str) -> bool {
119 if std::env::var("DISABLE_RATE_LIMITING").is_ok() {
120 return true;
121 }
122
123 let key = format!("{}:{}", kind.key_prefix(), client_ip);
124 let limiter_name = kind.key_prefix();
125 let (limit, window_ms) = kind.limit_and_window_ms();
126
127 if !self
128 .distributed_rate_limiter
129 .check_rate_limit(&key, limit, window_ms)
130 .await
131 {
132 crate::metrics::record_rate_limit_rejection(limiter_name);
133 return false;
134 }
135
136 let limiter = match kind {
137 RateLimitKind::Login => &self.rate_limiters.login,
138 RateLimitKind::AccountCreation => &self.rate_limiters.account_creation,
139 RateLimitKind::PasswordReset => &self.rate_limiters.password_reset,
140 RateLimitKind::ResetPassword => &self.rate_limiters.reset_password,
141 RateLimitKind::RefreshSession => &self.rate_limiters.refresh_session,
142 RateLimitKind::OAuthToken => &self.rate_limiters.oauth_token,
143 RateLimitKind::OAuthAuthorize => &self.rate_limiters.oauth_authorize,
144 RateLimitKind::OAuthPar => &self.rate_limiters.oauth_par,
145 RateLimitKind::OAuthIntrospect => &self.rate_limiters.oauth_introspect,
146 RateLimitKind::AppPassword => &self.rate_limiters.app_password,
147 RateLimitKind::EmailUpdate => &self.rate_limiters.email_update,
148 RateLimitKind::TotpVerify => &self.rate_limiters.totp_verify,
149 };
150
151 let ok = limiter.check_key(&client_ip.to_string()).is_ok();
152 if !ok {
153 crate::metrics::record_rate_limit_rejection(limiter_name);
154 }
155 ok
156 }
157}