A Rust application to showcase badge awards in the AT Protocol ecosystem.
1use atproto_identity::resolve::{IdentityResolver, InnerIdentityResolver, create_resolver};
2use atproto_jetstream::{CancellationToken, Consumer as JetstreamConsumer, ConsumerTaskConfig};
3use showcase::errors::Result;
4use showcase::http::AppEngine;
5#[cfg(feature = "s3")]
6use showcase::storage::S3FileStorage;
7#[cfg(feature = "s3")]
8use showcase::storage::file_storage::parse_s3_url;
9#[cfg(feature = "postgres")]
10use showcase::storage::{PostgresStorage, PostgresStorageDidDocumentStorage};
11#[cfg(feature = "sqlite")]
12use showcase::storage::{SqliteStorage, SqliteStorageDidDocumentStorage};
13use showcase::{
14 config::Config,
15 consumer::Consumer,
16 http::{AppState, create_router},
17 process::BadgeProcessor,
18 storage::{FileStorage, LocalFileStorage, Storage},
19};
20#[cfg(feature = "sqlite")]
21use sqlx::SqlitePool;
22#[cfg(feature = "postgres")]
23use sqlx::postgres::PgPool;
24use std::{env, sync::Arc};
25use tokio::net::TcpListener;
26use tokio::signal;
27use tokio_util::task::TaskTracker;
28use tracing::{error, info};
29use tracing_subscriber::prelude::*;
30
31#[cfg(feature = "embed")]
32use showcase::templates::build_env;
33
34#[cfg(feature = "reload")]
35use showcase::templates::build_env;
36
37/// Create the appropriate FileStorage implementation based on the storage configuration
38fn create_file_storage(storage_config: &str) -> Result<Arc<dyn FileStorage>> {
39 if storage_config.starts_with("s3://") {
40 #[cfg(feature = "s3")]
41 {
42 tracing::warn!("object storage used");
43
44 let (endpoint, access_key, secret_key, bucket, prefix) = parse_s3_url(storage_config)?;
45 let s3_storage = S3FileStorage::new(endpoint, access_key, secret_key, bucket, prefix)?;
46 Ok(Arc::new(s3_storage))
47 }
48 #[cfg(not(feature = "s3"))]
49 {
50 Err(showcase::errors::ShowcaseError::ConfigFeatureNotEnabled {
51 feature: "S3 storage requested but s3 feature is not enabled".to_string(),
52 })
53 }
54 } else {
55 tracing::warn!("file storage used");
56 // Use local file storage for non-S3 configurations
57 Ok(Arc::new(LocalFileStorage::new(storage_config.to_string())))
58 }
59}
60
61#[tokio::main]
62async fn main() -> Result<()> {
63 // Initialize logging
64 tracing_subscriber::registry()
65 .with(tracing_subscriber::EnvFilter::new(
66 std::env::var("RUST_LOG").unwrap_or_else(|_| "showcase=info,info".into()),
67 ))
68 .with(tracing_subscriber::fmt::layer().pretty())
69 .init();
70
71 // Handle version flag
72 env::args().for_each(|arg| {
73 if arg == "--version" {
74 println!("showcase {}", env!("CARGO_PKG_VERSION"));
75 std::process::exit(0);
76 }
77 });
78
79 // Load configuration
80 let config = Arc::new(Config::from_env()?);
81 info!("Starting Showcase with config");
82
83 // Setup HTTP client
84 let mut client_builder = reqwest::Client::builder();
85 for ca_certificate in &config.certificate_bundles {
86 info!("Loading CA certificate: {:?}", ca_certificate);
87 let cert = std::fs::read(ca_certificate)?;
88 let cert = reqwest::Certificate::from_pem(&cert)?;
89 client_builder = client_builder.add_root_certificate(cert);
90 }
91
92 let http_client = client_builder
93 .user_agent(config.user_agent.clone())
94 .timeout(config.http_client_timeout)
95 .build()?;
96
97 // Setup database based on the database URL and available features
98 let (storage, document_storage): (
99 Arc<dyn Storage>,
100 Arc<dyn atproto_identity::storage::DidDocumentStorage + Send + Sync>,
101 ) = {
102 #[cfg(all(feature = "sqlite", not(feature = "postgres")))]
103 {
104 let pool = SqlitePool::connect(&config.database_url).await?;
105 let storage = Arc::new(SqliteStorage::new(pool));
106 let document_storage = Arc::new(SqliteStorageDidDocumentStorage::new(storage.clone()));
107 (storage, document_storage)
108 }
109
110 #[cfg(all(feature = "postgres", not(feature = "sqlite")))]
111 {
112 let pool = PgPool::connect(&config.database_url).await?;
113 let storage = Arc::new(PostgresStorage::new(pool));
114 let document_storage =
115 Arc::new(PostgresStorageDidDocumentStorage::new(storage.clone()));
116 (storage, document_storage)
117 }
118
119 #[cfg(all(feature = "sqlite", feature = "postgres"))]
120 {
121 // When both features are enabled, determine based on the database URL
122 if config.database_url.starts_with("postgres://")
123 || config.database_url.starts_with("postgresql://")
124 {
125 let pool = PgPool::connect(&config.database_url).await?;
126 let storage = Arc::new(PostgresStorage::new(pool));
127 let document_storage =
128 Arc::new(PostgresStorageDidDocumentStorage::new(storage.clone()));
129 (storage, document_storage)
130 } else {
131 let pool = SqlitePool::connect(&config.database_url).await?;
132 let storage = Arc::new(SqliteStorage::new(pool));
133 let document_storage =
134 Arc::new(SqliteStorageDidDocumentStorage::new(storage.clone()));
135 (storage, document_storage)
136 }
137 }
138 };
139
140 // Run migrations
141 storage.migrate().await?;
142 info!("Database migrations completed");
143
144 // Initialize DNS resolver
145 let nameserver_ips: Vec<std::net::IpAddr> = config
146 .dns_nameservers
147 .iter()
148 .filter_map(|s| s.parse().ok())
149 .collect();
150 let dns_resolver = create_resolver(&nameserver_ips);
151
152 // Initialize identity resolver
153 let identity_resolver = IdentityResolver(Arc::new(InnerIdentityResolver {
154 dns_resolver,
155 http_client: http_client.clone(),
156 plc_hostname: config.plc_hostname.clone(),
157 }));
158
159 // Setup template engine
160 let template_env = {
161 #[cfg(feature = "embed")]
162 {
163 AppEngine::from(build_env(
164 config.external_base.clone(),
165 env!("CARGO_PKG_VERSION").to_string(),
166 ))
167 }
168
169 #[cfg(feature = "reload")]
170 {
171 AppEngine::from(build_env())
172 }
173
174 #[cfg(not(any(feature = "reload", feature = "embed")))]
175 {
176 use minijinja::Environment;
177 let mut env = Environment::new();
178 // Add a simple template for the minimal case
179 env.add_template(
180 "index.html",
181 "<!DOCTYPE html><html><body>Showcase</body></html>",
182 )
183 .unwrap();
184 env.add_template(
185 "identity.html",
186 "<!DOCTYPE html><html><body>Identity</body></html>",
187 )
188 .unwrap();
189 AppEngine::from(env)
190 }
191 };
192
193 // Create file storage for badge images
194 let file_storage = create_file_storage(&config.badge_image_storage)?;
195
196 // Create application state for HTTP server
197 let app_state = AppState {
198 storage: storage.clone(),
199 config: config.clone(),
200 document_storage: document_storage.clone(),
201 identity_resolver: identity_resolver.clone(),
202 template_env,
203 file_storage: file_storage.clone(),
204 };
205
206 // Create HTTP router
207 let app = create_router(app_state);
208
209 // Setup task tracking and cancellation
210 let tracker = TaskTracker::new();
211 let token = CancellationToken::new();
212
213 // Setup signal handling
214 {
215 let tracker = tracker.clone();
216 let inner_token = token.clone();
217
218 let ctrl_c = async {
219 signal::ctrl_c()
220 .await
221 .expect("failed to install Ctrl+C handler");
222 };
223
224 #[cfg(unix)]
225 let terminate = async {
226 signal::unix::signal(signal::unix::SignalKind::terminate())
227 .expect("failed to install signal handler")
228 .recv()
229 .await;
230 };
231
232 #[cfg(not(unix))]
233 let terminate = std::future::pending::<()>();
234
235 tokio::spawn(async move {
236 tokio::select! {
237 () = inner_token.cancelled() => { },
238 _ = terminate => {
239 info!("Received SIGTERM, shutting down");
240 },
241 _ = ctrl_c => {
242 info!("Received Ctrl+C, shutting down");
243 },
244 }
245
246 tracker.close();
247 inner_token.cancel();
248 });
249 }
250
251 // Start HTTP server
252 {
253 let inner_config = config.clone();
254 let http_port = inner_config.http.port;
255 let inner_token = token.clone();
256 tracker.spawn(async move {
257 let bind_address = format!("0.0.0.0:{}", http_port);
258 info!("Starting HTTP server on {}", bind_address);
259 let listener = TcpListener::bind(&bind_address).await.unwrap();
260
261 let shutdown_token = inner_token.clone();
262 let result = axum::serve(listener, app)
263 .with_graceful_shutdown(async move {
264 tokio::select! {
265 () = shutdown_token.cancelled() => { }
266 }
267 info!("HTTP server graceful shutdown complete");
268 })
269 .await;
270
271 if let Err(err) = result {
272 error!("error-showcase-runtime-1 HTTP server task failed: {}", err);
273 }
274
275 inner_token.cancel();
276 });
277 }
278
279 // Start badge processor
280 {
281 let consumer = Consumer {};
282 let (badge_handler, event_receiver) = consumer.create_badge_handler();
283
284 let badge_processor = BadgeProcessor::new(
285 storage.clone(),
286 config.clone(),
287 identity_resolver.clone(),
288 document_storage.clone(),
289 http_client.clone(),
290 file_storage.clone(),
291 );
292
293 let inner_token = token.clone();
294 tracker.spawn(async move {
295 tokio::select! {
296 result = badge_processor.start_processing(event_receiver) => {
297 if let Err(err) = result {
298 error!("error-showcase-runtime-2 Badge processor failed: {}", err);
299 }
300 }
301 () = inner_token.cancelled() => {
302 info!("Badge processor cancelled");
303 }
304 }
305 });
306
307 // Read cursor from file if configured
308 let cursor = if let Some(cursor_path) = &config.jetstream_cursor_path {
309 match tokio::fs::read_to_string(cursor_path).await {
310 Ok(contents) => match contents.trim().parse::<u64>() {
311 Ok(cursor_value) if cursor_value > 1 => {
312 info!("Loaded cursor from {}: {}", cursor_path, cursor_value);
313 Some(cursor_value as i64)
314 }
315 _ => {
316 info!(
317 "Invalid or low cursor value in {}, starting fresh",
318 cursor_path
319 );
320 None
321 }
322 },
323 Err(err) => {
324 info!(
325 "Could not read cursor file {}: {}, starting fresh",
326 cursor_path, err
327 );
328 None
329 }
330 }
331 } else {
332 None
333 };
334
335 // Start Jetstream consumer with reconnect logic
336 let inner_token = token.clone();
337 let inner_config = config.clone();
338 tracker.spawn(async move {
339 let mut disconnect_times = Vec::new();
340 let disconnect_window = std::time::Duration::from_secs(60); // 1 minute window
341 let max_disconnects_per_minute = 1;
342 let reconnect_delay = std::time::Duration::from_secs(5);
343
344 loop {
345 // Create new consumer for each connection attempt
346 let jetstream_config = ConsumerTaskConfig {
347 user_agent: inner_config.user_agent.clone(),
348 compression: false,
349 zstd_dictionary_location: String::new(),
350 jetstream_hostname: "jetstream2.us-east.bsky.network".to_string(),
351 collections: vec!["community.lexicon.badge.award".to_string()],
352 dids: vec![],
353 max_message_size_bytes: Some(10 * 1024 * 1024), // 10MB
354 cursor,
355 require_hello: true,
356 };
357
358 let jetstream_consumer = JetstreamConsumer::new(jetstream_config);
359
360 // Register badge handler
361 if let Err(err) = jetstream_consumer.register_handler(badge_handler.clone()).await {
362 error!("Failed to register badge handler: {}", err);
363 inner_token.cancel();
364 break;
365 }
366
367 // Register cursor writer if configured
368 if let Some(cursor_path) = inner_config.jetstream_cursor_path.clone() {
369 let cursor_writer = consumer.create_cursor_writer_handler(cursor_path);
370 if let Err(err) = jetstream_consumer.register_handler(cursor_writer).await {
371 error!("Failed to register cursor writer: {}", err);
372 inner_token.cancel();
373 break;
374 }
375 }
376
377 tokio::select! {
378 result = jetstream_consumer.run_background(inner_token.clone()) => {
379 if let Err(err) = result {
380 let now = std::time::Instant::now();
381 disconnect_times.push(now);
382
383 // Remove disconnect times older than the window
384 disconnect_times.retain(|&t| now.duration_since(t) <= disconnect_window);
385
386 if disconnect_times.len() > max_disconnects_per_minute {
387 error!(
388 "error-showcase-consumer-3 Jetstream disconnect rate exceeded: {} disconnects in 1 minute, exiting",
389 disconnect_times.len()
390 );
391 inner_token.cancel();
392 break;
393 }
394
395 error!("error-showcase-consumer-2 Jetstream disconnected: {}, reconnecting in {:?}", err, reconnect_delay);
396
397 // Wait before reconnecting
398 tokio::select! {
399 () = tokio::time::sleep(reconnect_delay) => {},
400 () = inner_token.cancelled() => {
401 info!("Jetstream consumer cancelled during reconnect delay");
402 break;
403 }
404 }
405
406 // Continue the loop to reconnect
407 continue;
408 }
409 }
410 () = inner_token.cancelled() => {
411 info!("Jetstream consumer cancelled");
412 break;
413 }
414 }
415
416 // If we reach here, the consumer exited without error (unlikely)
417 info!("Jetstream consumer exited normally");
418 break;
419 }
420 });
421 }
422
423 info!("All services started successfully");
424
425 // Wait for all tasks to complete
426 tracker.wait().await;
427
428 info!("Showcase shutting down");
429 Ok(())
430}