use atproto_identity::resolve::{IdentityResolver, InnerIdentityResolver, create_resolver}; use atproto_jetstream::{CancellationToken, Consumer as JetstreamConsumer, ConsumerTaskConfig}; use showcase::errors::Result; use showcase::http::AppEngine; #[cfg(feature = "s3")] use showcase::storage::S3FileStorage; #[cfg(feature = "s3")] use showcase::storage::file_storage::parse_s3_url; #[cfg(feature = "postgres")] use showcase::storage::{PostgresStorage, PostgresStorageDidDocumentStorage}; #[cfg(feature = "sqlite")] use showcase::storage::{SqliteStorage, SqliteStorageDidDocumentStorage}; use showcase::{ config::Config, consumer::Consumer, http::{AppState, create_router}, process::BadgeProcessor, storage::{FileStorage, LocalFileStorage, Storage}, }; #[cfg(feature = "sqlite")] use sqlx::SqlitePool; #[cfg(feature = "postgres")] use sqlx::postgres::PgPool; use std::{env, sync::Arc}; use tokio::net::TcpListener; use tokio::signal; use tokio_util::task::TaskTracker; use tracing::{error, info}; use tracing_subscriber::prelude::*; #[cfg(feature = "embed")] use showcase::templates::build_env; #[cfg(feature = "reload")] use showcase::templates::build_env; /// Create the appropriate FileStorage implementation based on the storage configuration fn create_file_storage(storage_config: &str) -> Result> { if storage_config.starts_with("s3://") { #[cfg(feature = "s3")] { tracing::warn!("object storage used"); let (endpoint, access_key, secret_key, bucket, prefix) = parse_s3_url(storage_config)?; let s3_storage = S3FileStorage::new(endpoint, access_key, secret_key, bucket, prefix)?; Ok(Arc::new(s3_storage)) } #[cfg(not(feature = "s3"))] { Err(showcase::errors::ShowcaseError::ConfigFeatureNotEnabled { feature: "S3 storage requested but s3 feature is not enabled".to_string(), }) } } else { tracing::warn!("file storage used"); // Use local file storage for non-S3 configurations Ok(Arc::new(LocalFileStorage::new(storage_config.to_string()))) } } #[tokio::main] async fn main() -> Result<()> { // Initialize logging tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( std::env::var("RUST_LOG").unwrap_or_else(|_| "showcase=info,info".into()), )) .with(tracing_subscriber::fmt::layer().pretty()) .init(); // Handle version flag env::args().for_each(|arg| { if arg == "--version" { println!("showcase {}", env!("CARGO_PKG_VERSION")); std::process::exit(0); } }); // Load configuration let config = Arc::new(Config::from_env()?); info!("Starting Showcase with config"); // Setup HTTP client let mut client_builder = reqwest::Client::builder(); for ca_certificate in &config.certificate_bundles { info!("Loading CA certificate: {:?}", ca_certificate); let cert = std::fs::read(ca_certificate)?; let cert = reqwest::Certificate::from_pem(&cert)?; client_builder = client_builder.add_root_certificate(cert); } let http_client = client_builder .user_agent(config.user_agent.clone()) .timeout(config.http_client_timeout) .build()?; // Setup database based on the database URL and available features let (storage, document_storage): ( Arc, Arc, ) = { #[cfg(all(feature = "sqlite", not(feature = "postgres")))] { let pool = SqlitePool::connect(&config.database_url).await?; let storage = Arc::new(SqliteStorage::new(pool)); let document_storage = Arc::new(SqliteStorageDidDocumentStorage::new(storage.clone())); (storage, document_storage) } #[cfg(all(feature = "postgres", not(feature = "sqlite")))] { let pool = PgPool::connect(&config.database_url).await?; let storage = Arc::new(PostgresStorage::new(pool)); let document_storage = Arc::new(PostgresStorageDidDocumentStorage::new(storage.clone())); (storage, document_storage) } #[cfg(all(feature = "sqlite", feature = "postgres"))] { // When both features are enabled, determine based on the database URL if config.database_url.starts_with("postgres://") || config.database_url.starts_with("postgresql://") { let pool = PgPool::connect(&config.database_url).await?; let storage = Arc::new(PostgresStorage::new(pool)); let document_storage = Arc::new(PostgresStorageDidDocumentStorage::new(storage.clone())); (storage, document_storage) } else { let pool = SqlitePool::connect(&config.database_url).await?; let storage = Arc::new(SqliteStorage::new(pool)); let document_storage = Arc::new(SqliteStorageDidDocumentStorage::new(storage.clone())); (storage, document_storage) } } }; // Run migrations storage.migrate().await?; info!("Database migrations completed"); // Initialize DNS resolver let nameserver_ips: Vec = config .dns_nameservers .iter() .filter_map(|s| s.parse().ok()) .collect(); let dns_resolver = create_resolver(&nameserver_ips); // Initialize identity resolver let identity_resolver = IdentityResolver(Arc::new(InnerIdentityResolver { dns_resolver, http_client: http_client.clone(), plc_hostname: config.plc_hostname.clone(), })); // Setup template engine let template_env = { #[cfg(feature = "embed")] { AppEngine::from(build_env( config.external_base.clone(), env!("CARGO_PKG_VERSION").to_string(), )) } #[cfg(feature = "reload")] { AppEngine::from(build_env()) } #[cfg(not(any(feature = "reload", feature = "embed")))] { use minijinja::Environment; let mut env = Environment::new(); // Add a simple template for the minimal case env.add_template( "index.html", "Showcase", ) .unwrap(); env.add_template( "identity.html", "Identity", ) .unwrap(); AppEngine::from(env) } }; // Create file storage for badge images let file_storage = create_file_storage(&config.badge_image_storage)?; // Create application state for HTTP server let app_state = AppState { storage: storage.clone(), config: config.clone(), document_storage: document_storage.clone(), identity_resolver: identity_resolver.clone(), template_env, file_storage: file_storage.clone(), }; // Create HTTP router let app = create_router(app_state); // Setup task tracking and cancellation let tracker = TaskTracker::new(); let token = CancellationToken::new(); // Setup signal handling { let tracker = tracker.clone(); let inner_token = token.clone(); let ctrl_c = async { signal::ctrl_c() .await .expect("failed to install Ctrl+C handler"); }; #[cfg(unix)] let terminate = async { signal::unix::signal(signal::unix::SignalKind::terminate()) .expect("failed to install signal handler") .recv() .await; }; #[cfg(not(unix))] let terminate = std::future::pending::<()>(); tokio::spawn(async move { tokio::select! { () = inner_token.cancelled() => { }, _ = terminate => { info!("Received SIGTERM, shutting down"); }, _ = ctrl_c => { info!("Received Ctrl+C, shutting down"); }, } tracker.close(); inner_token.cancel(); }); } // Start HTTP server { let inner_config = config.clone(); let http_port = inner_config.http.port; let inner_token = token.clone(); tracker.spawn(async move { let bind_address = format!("0.0.0.0:{}", http_port); info!("Starting HTTP server on {}", bind_address); let listener = TcpListener::bind(&bind_address).await.unwrap(); let shutdown_token = inner_token.clone(); let result = axum::serve(listener, app) .with_graceful_shutdown(async move { tokio::select! { () = shutdown_token.cancelled() => { } } info!("HTTP server graceful shutdown complete"); }) .await; if let Err(err) = result { error!("error-showcase-runtime-1 HTTP server task failed: {}", err); } inner_token.cancel(); }); } // Start badge processor { let consumer = Consumer {}; let (badge_handler, event_receiver) = consumer.create_badge_handler(); let badge_processor = BadgeProcessor::new( storage.clone(), config.clone(), identity_resolver.clone(), document_storage.clone(), http_client.clone(), file_storage.clone(), ); let inner_token = token.clone(); tracker.spawn(async move { tokio::select! { result = badge_processor.start_processing(event_receiver) => { if let Err(err) = result { error!("error-showcase-runtime-2 Badge processor failed: {}", err); } } () = inner_token.cancelled() => { info!("Badge processor cancelled"); } } }); // Read cursor from file if configured let cursor = if let Some(cursor_path) = &config.jetstream_cursor_path { match tokio::fs::read_to_string(cursor_path).await { Ok(contents) => match contents.trim().parse::() { Ok(cursor_value) if cursor_value > 1 => { info!("Loaded cursor from {}: {}", cursor_path, cursor_value); Some(cursor_value as i64) } _ => { info!( "Invalid or low cursor value in {}, starting fresh", cursor_path ); None } }, Err(err) => { info!( "Could not read cursor file {}: {}, starting fresh", cursor_path, err ); None } } } else { None }; // Start Jetstream consumer with reconnect logic let inner_token = token.clone(); let inner_config = config.clone(); tracker.spawn(async move { let mut disconnect_times = Vec::new(); let disconnect_window = std::time::Duration::from_secs(60); // 1 minute window let max_disconnects_per_minute = 1; let reconnect_delay = std::time::Duration::from_secs(5); loop { // Create new consumer for each connection attempt let jetstream_config = ConsumerTaskConfig { user_agent: inner_config.user_agent.clone(), compression: false, zstd_dictionary_location: String::new(), jetstream_hostname: "jetstream2.us-east.bsky.network".to_string(), collections: vec!["community.lexicon.badge.award".to_string()], dids: vec![], max_message_size_bytes: Some(10 * 1024 * 1024), // 10MB cursor, require_hello: true, }; let jetstream_consumer = JetstreamConsumer::new(jetstream_config); // Register badge handler if let Err(err) = jetstream_consumer.register_handler(badge_handler.clone()).await { error!("Failed to register badge handler: {}", err); inner_token.cancel(); break; } // Register cursor writer if configured if let Some(cursor_path) = inner_config.jetstream_cursor_path.clone() { let cursor_writer = consumer.create_cursor_writer_handler(cursor_path); if let Err(err) = jetstream_consumer.register_handler(cursor_writer).await { error!("Failed to register cursor writer: {}", err); inner_token.cancel(); break; } } tokio::select! { result = jetstream_consumer.run_background(inner_token.clone()) => { if let Err(err) = result { let now = std::time::Instant::now(); disconnect_times.push(now); // Remove disconnect times older than the window disconnect_times.retain(|&t| now.duration_since(t) <= disconnect_window); if disconnect_times.len() > max_disconnects_per_minute { error!( "error-showcase-consumer-3 Jetstream disconnect rate exceeded: {} disconnects in 1 minute, exiting", disconnect_times.len() ); inner_token.cancel(); break; } error!("error-showcase-consumer-2 Jetstream disconnected: {}, reconnecting in {:?}", err, reconnect_delay); // Wait before reconnecting tokio::select! { () = tokio::time::sleep(reconnect_delay) => {}, () = inner_token.cancelled() => { info!("Jetstream consumer cancelled during reconnect delay"); break; } } // Continue the loop to reconnect continue; } } () = inner_token.cancelled() => { info!("Jetstream consumer cancelled"); break; } } // If we reach here, the consumer exited without error (unlikely) info!("Jetstream consumer exited normally"); break; } }); } info!("All services started successfully"); // Wait for all tasks to complete tracker.wait().await; info!("Showcase shutting down"); Ok(()) }