this repo has no description
1use crate::cache::{Cache, DistributedRateLimiter, create_cache};
2use crate::circuit_breaker::CircuitBreakers;
3use crate::config::AuthConfig;
4use crate::rate_limit::RateLimiters;
5use crate::repo::PostgresBlockStore;
6use crate::storage::{BlobStorage, S3BlobStorage};
7use crate::sync::firehose::SequencedEvent;
8use sqlx::PgPool;
9use std::sync::Arc;
10use tokio::sync::broadcast;
11#[derive(Clone)]
12pub struct AppState {
13 pub db: PgPool,
14 pub block_store: PostgresBlockStore,
15 pub blob_store: Arc<dyn BlobStorage>,
16 pub firehose_tx: broadcast::Sender<SequencedEvent>,
17 pub rate_limiters: Arc<RateLimiters>,
18 pub circuit_breakers: Arc<CircuitBreakers>,
19 pub cache: Arc<dyn Cache>,
20 pub distributed_rate_limiter: Arc<dyn DistributedRateLimiter>,
21}
/// Identifies which rate-limit policy applies to a request.
///
/// Each variant maps to a key prefix and a `(limit, window)` pair in the
/// `impl RateLimitKind` block, and to one field of `RateLimiters` inside
/// `AppState::check_rate_limit`.
///
/// The enum is a plain tag with no payload, so it derives the common value
/// traits: it can be logged with `{:?}`, copied freely, compared, and used as
/// a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RateLimitKind {
    Login,
    AccountCreation,
    // NOTE(review): `PasswordReset` and `ResetPassword` are distinct policies
    // with different limits; presumably one covers requesting a reset and the
    // other completing it. Confirm against the call sites.
    PasswordReset,
    ResetPassword,
    RefreshSession,
    OAuthToken,
    OAuthAuthorize,
    OAuthPar,
    OAuthIntrospect,
    AppPassword,
    EmailUpdate,
}
35impl RateLimitKind {
36 fn key_prefix(&self) -> &'static str {
37 match self {
38 Self::Login => "login",
39 Self::AccountCreation => "account_creation",
40 Self::PasswordReset => "password_reset",
41 Self::ResetPassword => "reset_password",
42 Self::RefreshSession => "refresh_session",
43 Self::OAuthToken => "oauth_token",
44 Self::OAuthAuthorize => "oauth_authorize",
45 Self::OAuthPar => "oauth_par",
46 Self::OAuthIntrospect => "oauth_introspect",
47 Self::AppPassword => "app_password",
48 Self::EmailUpdate => "email_update",
49 }
50 }
51 fn limit_and_window_ms(&self) -> (u32, u64) {
52 match self {
53 Self::Login => (10, 60_000),
54 Self::AccountCreation => (10, 3_600_000),
55 Self::PasswordReset => (5, 3_600_000),
56 Self::ResetPassword => (10, 60_000),
57 Self::RefreshSession => (60, 60_000),
58 Self::OAuthToken => (30, 60_000),
59 Self::OAuthAuthorize => (10, 60_000),
60 Self::OAuthPar => (30, 60_000),
61 Self::OAuthIntrospect => (30, 60_000),
62 Self::AppPassword => (10, 60_000),
63 Self::EmailUpdate => (5, 3_600_000),
64 }
65 }
66}
67impl AppState {
68 pub async fn new(db: PgPool) -> Self {
69 AuthConfig::init();
70 let block_store = PostgresBlockStore::new(db.clone());
71 let blob_store = S3BlobStorage::new().await;
72 let firehose_buffer_size: usize = std::env::var("FIREHOSE_BUFFER_SIZE")
73 .ok()
74 .and_then(|v| v.parse().ok())
75 .unwrap_or(10000);
76 let (firehose_tx, _) = broadcast::channel(firehose_buffer_size);
77 let rate_limiters = Arc::new(RateLimiters::new());
78 let circuit_breakers = Arc::new(CircuitBreakers::new());
79 let (cache, distributed_rate_limiter) = create_cache().await;
80 Self {
81 db,
82 block_store,
83 blob_store: Arc::new(blob_store),
84 firehose_tx,
85 rate_limiters,
86 circuit_breakers,
87 cache,
88 distributed_rate_limiter,
89 }
90 }
91 pub fn with_rate_limiters(mut self, rate_limiters: RateLimiters) -> Self {
92 self.rate_limiters = Arc::new(rate_limiters);
93 self
94 }
95 pub fn with_circuit_breakers(mut self, circuit_breakers: CircuitBreakers) -> Self {
96 self.circuit_breakers = Arc::new(circuit_breakers);
97 self
98 }
99 pub async fn check_rate_limit(&self, kind: RateLimitKind, client_ip: &str) -> bool {
100 if std::env::var("DISABLE_RATE_LIMITING").is_ok() {
101 return true;
102 }
103 let key = format!("{}:{}", kind.key_prefix(), client_ip);
104 let limiter_name = kind.key_prefix();
105 let (limit, window_ms) = kind.limit_and_window_ms();
106 if !self.distributed_rate_limiter.check_rate_limit(&key, limit, window_ms).await {
107 crate::metrics::record_rate_limit_rejection(limiter_name);
108 return false;
109 }
110 let limiter = match kind {
111 RateLimitKind::Login => &self.rate_limiters.login,
112 RateLimitKind::AccountCreation => &self.rate_limiters.account_creation,
113 RateLimitKind::PasswordReset => &self.rate_limiters.password_reset,
114 RateLimitKind::ResetPassword => &self.rate_limiters.reset_password,
115 RateLimitKind::RefreshSession => &self.rate_limiters.refresh_session,
116 RateLimitKind::OAuthToken => &self.rate_limiters.oauth_token,
117 RateLimitKind::OAuthAuthorize => &self.rate_limiters.oauth_authorize,
118 RateLimitKind::OAuthPar => &self.rate_limiters.oauth_par,
119 RateLimitKind::OAuthIntrospect => &self.rate_limiters.oauth_introspect,
120 RateLimitKind::AppPassword => &self.rate_limiters.app_password,
121 RateLimitKind::EmailUpdate => &self.rate_limiters.email_update,
122 };
123 let ok = limiter.check_key(&client_ip.to_string()).is_ok();
124 if !ok {
125 crate::metrics::record_rate_limit_rejection(limiter_name);
126 }
127 ok
128 }
129}