+2
-40
src/main.rs
+2
-40
src/main.rs
···
23
}
24
25
async fn run() -> Result<(), Box<dyn std::error::Error>> {
26
-
let database_url = std::env::var("DATABASE_URL")
27
-
.map_err(|_| "DATABASE_URL environment variable must be set")?;
28
-
29
-
let max_connections: u32 = std::env::var("DATABASE_MAX_CONNECTIONS")
30
-
.ok()
31
-
.and_then(|v| v.parse().ok())
32
-
.unwrap_or(100);
33
-
34
-
let min_connections: u32 = std::env::var("DATABASE_MIN_CONNECTIONS")
35
-
.ok()
36
-
.and_then(|v| v.parse().ok())
37
-
.unwrap_or(10);
38
-
39
-
let acquire_timeout_secs: u64 = std::env::var("DATABASE_ACQUIRE_TIMEOUT_SECS")
40
-
.ok()
41
-
.and_then(|v| v.parse().ok())
42
-
.unwrap_or(10);
43
-
44
-
info!(
45
-
"Configuring database pool: max={}, min={}, acquire_timeout={}s",
46
-
max_connections, min_connections, acquire_timeout_secs
47
-
);
48
-
49
-
let pool = sqlx::postgres::PgPoolOptions::new()
50
-
.max_connections(max_connections)
51
-
.min_connections(min_connections)
52
-
.acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs))
53
-
.idle_timeout(std::time::Duration::from_secs(300))
54
-
.max_lifetime(std::time::Duration::from_secs(1800))
55
-
.connect(&database_url)
56
-
.await
57
-
.map_err(|e| format!("Failed to connect to Postgres: {}", e))?;
58
-
59
-
sqlx::migrate!("./migrations")
60
-
.run(&pool)
61
-
.await
62
-
.map_err(|e| format!("Failed to run migrations: {}", e))?;
63
-
64
-
let state = AppState::new(pool.clone()).await;
65
tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await;
66
67
let (shutdown_tx, shutdown_rx) = watch::channel(false);
68
69
-
let mut comms_service = CommsService::new(pool);
70
71
if let Some(email_sender) = EmailSender::from_env() {
72
info!("Email comms enabled");
···
23
}
24
25
async fn run() -> Result<(), Box<dyn std::error::Error>> {
26
+
let state = AppState::new().await?;
27
tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await;
28
29
let (shutdown_tx, shutdown_rx) = watch::channel(false);
30
31
+
let mut comms_service = CommsService::new(state.db.clone());
32
33
if let Some(email_sender) = EmailSender::from_env() {
34
info!("Email comms enabled");
+46
-1
src/state.rs
+46
-1
src/state.rs
···
7
use crate::storage::{BlobStorage, S3BlobStorage};
8
use crate::sync::firehose::SequencedEvent;
9
use sqlx::PgPool;
10
use std::sync::Arc;
11
use tokio::sync::broadcast;
12
···
75
}
76
77
impl AppState {
78
-
pub async fn new(db: PgPool) -> Self {
79
AuthConfig::init();
80
81
let block_store = PostgresBlockStore::new(db.clone());
···
7
use crate::storage::{BlobStorage, S3BlobStorage};
8
use crate::sync::firehose::SequencedEvent;
9
use sqlx::PgPool;
10
+
use std::error::Error;
11
use std::sync::Arc;
12
use tokio::sync::broadcast;
13
···
76
}
77
78
impl AppState {
79
+
pub async fn new() -> Result<Self, Box<dyn Error>> {
80
+
let database_url = std::env::var("DATABASE_URL")
81
+
.map_err(|_| "DATABASE_URL environment variable must be set")?;
82
+
83
+
let max_connections: u32 = std::env::var("DATABASE_MAX_CONNECTIONS")
84
+
.ok()
85
+
.and_then(|v| v.parse().ok())
86
+
.unwrap_or(100);
87
+
88
+
let min_connections: u32 = std::env::var("DATABASE_MIN_CONNECTIONS")
89
+
.ok()
90
+
.and_then(|v| v.parse().ok())
91
+
.unwrap_or(10);
92
+
93
+
let acquire_timeout_secs: u64 = std::env::var("DATABASE_ACQUIRE_TIMEOUT_SECS")
94
+
.ok()
95
+
.and_then(|v| v.parse().ok())
96
+
.unwrap_or(10);
97
+
98
+
tracing::info!(
99
+
"Configuring database pool: max={}, min={}, acquire_timeout={}s",
100
+
max_connections,
101
+
min_connections,
102
+
acquire_timeout_secs
103
+
);
104
+
105
+
let db = sqlx::postgres::PgPoolOptions::new()
106
+
.max_connections(max_connections)
107
+
.min_connections(min_connections)
108
+
.acquire_timeout(std::time::Duration::from_secs(acquire_timeout_secs))
109
+
.idle_timeout(std::time::Duration::from_secs(300))
110
+
.max_lifetime(std::time::Duration::from_secs(1800))
111
+
.connect(&database_url)
112
+
.await
113
+
.map_err(|e| format!("Failed to connect to Postgres: {}", e))?;
114
+
115
+
sqlx::migrate!("./migrations")
116
+
.run(&db)
117
+
.await
118
+
.map_err(|e| format!("Failed to run migrations: {}", e))?;
119
+
120
+
Ok(Self::from_db(db).await)
121
+
}
122
+
123
+
pub async fn from_db(db: PgPool) -> Self {
124
AuthConfig::init();
125
126
let block_store = PostgresBlockStore::new(db.clone());
+3
-1
tests/common/mod.rs
+3
-1
tests/common/mod.rs
···
258
.with_email_update_limit(10000)
259
.with_oauth_authorize_limit(10000)
260
.with_oauth_token_limit(10000);
261
-
let state = AppState::new(pool).await.with_rate_limiters(rate_limiters);
262
tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await;
263
let app = tranquil_pds::app(state);
264
tokio::spawn(async move {
···
258
.with_email_update_limit(10000)
259
.with_oauth_authorize_limit(10000)
260
.with_oauth_token_limit(10000);
261
+
let state = AppState::from_db(pool)
262
+
.await
263
+
.with_rate_limiters(rate_limiters);
264
tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await;
265
let app = tranquil_pds::app(state);
266
tokio::spawn(async move {