A Rust application to showcase badge awards in the AT Protocol ecosystem.
at main 430 lines 16 kB view raw
1use atproto_identity::resolve::{IdentityResolver, InnerIdentityResolver, create_resolver}; 2use atproto_jetstream::{CancellationToken, Consumer as JetstreamConsumer, ConsumerTaskConfig}; 3use showcase::errors::Result; 4use showcase::http::AppEngine; 5#[cfg(feature = "s3")] 6use showcase::storage::S3FileStorage; 7#[cfg(feature = "s3")] 8use showcase::storage::file_storage::parse_s3_url; 9#[cfg(feature = "postgres")] 10use showcase::storage::{PostgresStorage, PostgresStorageDidDocumentStorage}; 11#[cfg(feature = "sqlite")] 12use showcase::storage::{SqliteStorage, SqliteStorageDidDocumentStorage}; 13use showcase::{ 14 config::Config, 15 consumer::Consumer, 16 http::{AppState, create_router}, 17 process::BadgeProcessor, 18 storage::{FileStorage, LocalFileStorage, Storage}, 19}; 20#[cfg(feature = "sqlite")] 21use sqlx::SqlitePool; 22#[cfg(feature = "postgres")] 23use sqlx::postgres::PgPool; 24use std::{env, sync::Arc}; 25use tokio::net::TcpListener; 26use tokio::signal; 27use tokio_util::task::TaskTracker; 28use tracing::{error, info}; 29use tracing_subscriber::prelude::*; 30 31#[cfg(feature = "embed")] 32use showcase::templates::build_env; 33 34#[cfg(feature = "reload")] 35use showcase::templates::build_env; 36 37/// Create the appropriate FileStorage implementation based on the storage configuration 38fn create_file_storage(storage_config: &str) -> Result<Arc<dyn FileStorage>> { 39 if storage_config.starts_with("s3://") { 40 #[cfg(feature = "s3")] 41 { 42 tracing::warn!("object storage used"); 43 44 let (endpoint, access_key, secret_key, bucket, prefix) = parse_s3_url(storage_config)?; 45 let s3_storage = S3FileStorage::new(endpoint, access_key, secret_key, bucket, prefix)?; 46 Ok(Arc::new(s3_storage)) 47 } 48 #[cfg(not(feature = "s3"))] 49 { 50 Err(showcase::errors::ShowcaseError::ConfigFeatureNotEnabled { 51 feature: "S3 storage requested but s3 feature is not enabled".to_string(), 52 }) 53 } 54 } else { 55 tracing::warn!("file storage used"); 56 // Use local file storage for non-S3 configurations 57 Ok(Arc::new(LocalFileStorage::new(storage_config.to_string()))) 58 } 59} 60 61#[tokio::main] 62async fn main() -> Result<()> { 63 // Initialize logging 64 tracing_subscriber::registry() 65 .with(tracing_subscriber::EnvFilter::new( 66 std::env::var("RUST_LOG").unwrap_or_else(|_| "showcase=info,info".into()), 67 )) 68 .with(tracing_subscriber::fmt::layer().pretty()) 69 .init(); 70 71 // Handle version flag 72 env::args().for_each(|arg| { 73 if arg == "--version" { 74 println!("showcase {}", env!("CARGO_PKG_VERSION")); 75 std::process::exit(0); 76 } 77 }); 78 79 // Load configuration 80 let config = Arc::new(Config::from_env()?); 81 info!("Starting Showcase with config"); 82 83 // Setup HTTP client 84 let mut client_builder = reqwest::Client::builder(); 85 for ca_certificate in &config.certificate_bundles { 86 info!("Loading CA certificate: {:?}", ca_certificate); 87 let cert = std::fs::read(ca_certificate)?; 88 let cert = reqwest::Certificate::from_pem(&cert)?; 89 client_builder = client_builder.add_root_certificate(cert); 90 } 91 92 let http_client = client_builder 93 .user_agent(config.user_agent.clone()) 94 .timeout(config.http_client_timeout) 95 .build()?; 96 97 // Setup database based on the database URL and available features 98 let (storage, document_storage): ( 99 Arc<dyn Storage>, 100 Arc<dyn atproto_identity::storage::DidDocumentStorage + Send + Sync>, 101 ) = { 102 #[cfg(all(feature = "sqlite", not(feature = "postgres")))] 103 { 104 let pool = SqlitePool::connect(&config.database_url).await?; 105 let storage = Arc::new(SqliteStorage::new(pool)); 106 let document_storage = Arc::new(SqliteStorageDidDocumentStorage::new(storage.clone())); 107 (storage, document_storage) 108 } 109 110 #[cfg(all(feature = "postgres", not(feature = "sqlite")))] 111 { 112 let pool = PgPool::connect(&config.database_url).await?; 113 let storage = Arc::new(PostgresStorage::new(pool)); 114 let document_storage = 115 Arc::new(PostgresStorageDidDocumentStorage::new(storage.clone())); 116 (storage, document_storage) 117 } 118 119 #[cfg(all(feature = "sqlite", feature = "postgres"))] 120 { 121 // When both features are enabled, determine based on the database URL 122 if config.database_url.starts_with("postgres://") 123 || config.database_url.starts_with("postgresql://") 124 { 125 let pool = PgPool::connect(&config.database_url).await?; 126 let storage = Arc::new(PostgresStorage::new(pool)); 127 let document_storage = 128 Arc::new(PostgresStorageDidDocumentStorage::new(storage.clone())); 129 (storage, document_storage) 130 } else { 131 let pool = SqlitePool::connect(&config.database_url).await?; 132 let storage = Arc::new(SqliteStorage::new(pool)); 133 let document_storage = 134 Arc::new(SqliteStorageDidDocumentStorage::new(storage.clone())); 135 (storage, document_storage) 136 } 137 } 138 }; 139 140 // Run migrations 141 storage.migrate().await?; 142 info!("Database migrations completed"); 143 144 // Initialize DNS resolver 145 let nameserver_ips: Vec<std::net::IpAddr> = config 146 .dns_nameservers 147 .iter() 148 .filter_map(|s| s.parse().ok()) 149 .collect(); 150 let dns_resolver = create_resolver(&nameserver_ips); 151 152 // Initialize identity resolver 153 let identity_resolver = IdentityResolver(Arc::new(InnerIdentityResolver { 154 dns_resolver, 155 http_client: http_client.clone(), 156 plc_hostname: config.plc_hostname.clone(), 157 })); 158 159 // Setup template engine 160 let template_env = { 161 #[cfg(feature = "embed")] 162 { 163 AppEngine::from(build_env( 164 config.external_base.clone(), 165 env!("CARGO_PKG_VERSION").to_string(), 166 )) 167 } 168 169 #[cfg(feature = "reload")] 170 { 171 AppEngine::from(build_env()) 172 } 173 174 #[cfg(not(any(feature = "reload", feature = "embed")))] 175 { 176 use minijinja::Environment; 177 let mut env = Environment::new(); 178 // Add a simple template for the minimal case 179 env.add_template( 180 "index.html", 181 "<!DOCTYPE html><html><body>Showcase</body></html>", 182 ) 183 .unwrap(); 184 env.add_template( 185 "identity.html", 186 "<!DOCTYPE html><html><body>Identity</body></html>", 187 ) 188 .unwrap(); 189 AppEngine::from(env) 190 } 191 }; 192 193 // Create file storage for badge images 194 let file_storage = create_file_storage(&config.badge_image_storage)?; 195 196 // Create application state for HTTP server 197 let app_state = AppState { 198 storage: storage.clone(), 199 config: config.clone(), 200 document_storage: document_storage.clone(), 201 identity_resolver: identity_resolver.clone(), 202 template_env, 203 file_storage: file_storage.clone(), 204 }; 205 206 // Create HTTP router 207 let app = create_router(app_state); 208 209 // Setup task tracking and cancellation 210 let tracker = TaskTracker::new(); 211 let token = CancellationToken::new(); 212 213 // Setup signal handling 214 { 215 let tracker = tracker.clone(); 216 let inner_token = token.clone(); 217 218 let ctrl_c = async { 219 signal::ctrl_c() 220 .await 221 .expect("failed to install Ctrl+C handler"); 222 }; 223 224 #[cfg(unix)] 225 let terminate = async { 226 signal::unix::signal(signal::unix::SignalKind::terminate()) 227 .expect("failed to install signal handler") 228 .recv() 229 .await; 230 }; 231 232 #[cfg(not(unix))] 233 let terminate = std::future::pending::<()>(); 234 235 tokio::spawn(async move { 236 tokio::select! { 237 () = inner_token.cancelled() => { }, 238 _ = terminate => { 239 info!("Received SIGTERM, shutting down"); 240 }, 241 _ = ctrl_c => { 242 info!("Received Ctrl+C, shutting down"); 243 }, 244 } 245 246 tracker.close(); 247 inner_token.cancel(); 248 }); 249 } 250 251 // Start HTTP server 252 { 253 let inner_config = config.clone(); 254 let http_port = inner_config.http.port; 255 let inner_token = token.clone(); 256 tracker.spawn(async move { 257 let bind_address = format!("0.0.0.0:{}", http_port); 258 info!("Starting HTTP server on {}", bind_address); 259 let listener = TcpListener::bind(&bind_address).await.unwrap(); 260 261 let shutdown_token = inner_token.clone(); 262 let result = axum::serve(listener, app) 263 .with_graceful_shutdown(async move { 264 tokio::select! { 265 () = shutdown_token.cancelled() => { } 266 } 267 info!("HTTP server graceful shutdown complete"); 268 }) 269 .await; 270 271 if let Err(err) = result { 272 error!("error-showcase-runtime-1 HTTP server task failed: {}", err); 273 } 274 275 inner_token.cancel(); 276 }); 277 } 278 279 // Start badge processor 280 { 281 let consumer = Consumer {}; 282 let (badge_handler, event_receiver) = consumer.create_badge_handler(); 283 284 let badge_processor = BadgeProcessor::new( 285 storage.clone(), 286 config.clone(), 287 identity_resolver.clone(), 288 document_storage.clone(), 289 http_client.clone(), 290 file_storage.clone(), 291 ); 292 293 let inner_token = token.clone(); 294 tracker.spawn(async move { 295 tokio::select! { 296 result = badge_processor.start_processing(event_receiver) => { 297 if let Err(err) = result { 298 error!("error-showcase-runtime-2 Badge processor failed: {}", err); 299 } 300 } 301 () = inner_token.cancelled() => { 302 info!("Badge processor cancelled"); 303 } 304 } 305 }); 306 307 // Read cursor from file if configured 308 let cursor = if let Some(cursor_path) = &config.jetstream_cursor_path { 309 match tokio::fs::read_to_string(cursor_path).await { 310 Ok(contents) => match contents.trim().parse::<u64>() { 311 Ok(cursor_value) if cursor_value > 1 => { 312 info!("Loaded cursor from {}: {}", cursor_path, cursor_value); 313 Some(cursor_value as i64) 314 } 315 _ => { 316 info!( 317 "Invalid or low cursor value in {}, starting fresh", 318 cursor_path 319 ); 320 None 321 } 322 }, 323 Err(err) => { 324 info!( 325 "Could not read cursor file {}: {}, starting fresh", 326 cursor_path, err 327 ); 328 None 329 } 330 } 331 } else { 332 None 333 }; 334 335 // Start Jetstream consumer with reconnect logic 336 let inner_token = token.clone(); 337 let inner_config = config.clone(); 338 tracker.spawn(async move { 339 let mut disconnect_times = Vec::new(); 340 let disconnect_window = std::time::Duration::from_secs(60); // 1 minute window 341 let max_disconnects_per_minute = 1; 342 let reconnect_delay = std::time::Duration::from_secs(5); 343 344 loop { 345 // Create new consumer for each connection attempt 346 let jetstream_config = ConsumerTaskConfig { 347 user_agent: inner_config.user_agent.clone(), 348 compression: false, 349 zstd_dictionary_location: String::new(), 350 jetstream_hostname: "jetstream2.us-east.bsky.network".to_string(), 351 collections: vec!["community.lexicon.badge.award".to_string()], 352 dids: vec![], 353 max_message_size_bytes: Some(10 * 1024 * 1024), // 10MB 354 cursor, 355 require_hello: true, 356 }; 357 358 let jetstream_consumer = JetstreamConsumer::new(jetstream_config); 359 360 // Register badge handler 361 if let Err(err) = jetstream_consumer.register_handler(badge_handler.clone()).await { 362 error!("Failed to register badge handler: {}", err); 363 inner_token.cancel(); 364 break; 365 } 366 367 // Register cursor writer if configured 368 if let Some(cursor_path) = inner_config.jetstream_cursor_path.clone() { 369 let cursor_writer = consumer.create_cursor_writer_handler(cursor_path); 370 if let Err(err) = jetstream_consumer.register_handler(cursor_writer).await { 371 error!("Failed to register cursor writer: {}", err); 372 inner_token.cancel(); 373 break; 374 } 375 } 376 377 tokio::select! { 378 result = jetstream_consumer.run_background(inner_token.clone()) => { 379 if let Err(err) = result { 380 let now = std::time::Instant::now(); 381 disconnect_times.push(now); 382 383 // Remove disconnect times older than the window 384 disconnect_times.retain(|&t| now.duration_since(t) <= disconnect_window); 385 386 if disconnect_times.len() > max_disconnects_per_minute { 387 error!( 388 "error-showcase-consumer-3 Jetstream disconnect rate exceeded: {} disconnects in 1 minute, exiting", 389 disconnect_times.len() 390 ); 391 inner_token.cancel(); 392 break; 393 } 394 395 error!("error-showcase-consumer-2 Jetstream disconnected: {}, reconnecting in {:?}", err, reconnect_delay); 396 397 // Wait before reconnecting 398 tokio::select! { 399 () = tokio::time::sleep(reconnect_delay) => {}, 400 () = inner_token.cancelled() => { 401 info!("Jetstream consumer cancelled during reconnect delay"); 402 break; 403 } 404 } 405 406 // Continue the loop to reconnect 407 continue; 408 } 409 } 410 () = inner_token.cancelled() => { 411 info!("Jetstream consumer cancelled"); 412 break; 413 } 414 } 415 416 // If we reach here, the consumer exited without error (unlikely) 417 info!("Jetstream consumer exited normally"); 418 break; 419 } 420 }); 421 } 422 423 info!("All services started successfully"); 424 425 // Wait for all tasks to complete 426 tracker.wait().await; 427 428 info!("Showcase shutting down"); 429 Ok(()) 430}