Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
at main 153 lines 5.4 kB view raw
1use spacedust::consumer; 2use spacedust::delay; 3use spacedust::error::MainTaskError; 4use spacedust::removable_delay_queue::removable_delay_queue; 5use spacedust::server; 6 7use clap::Parser; 8use metrics_exporter_prometheus::PrometheusBuilder; 9use std::time::Duration; 10use tokio::sync::broadcast; 11use tokio_util::sync::CancellationToken; 12 13/// Aggregate links in the at-mosphere 14#[derive(Parser, Debug, Clone)] 15#[command(version, about, long_about = None)] 16struct Args { 17 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 18 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 19 #[arg(long, env = "SPACEDUST_JETSTREAM")] 20 jetstream: String, 21 /// don't request zstd-compressed jetstream events 22 /// 23 /// reduces CPU at the expense of more ingress bandwidth 24 #[arg(long, action, env = "SPACEDUST_JETSTREAM_NO_ZSTD")] 25 jetstream_no_zstd: bool, 26 /// spacedust server's listen address 27 #[arg(long, env = "SPACEDUST_BIND")] 28 #[clap(default_value = "[::]:8080")] 29 bind: std::net::SocketAddr, 30 /// enable metrics collection and serving 31 #[arg(long, action, env = "SPACEDUST_COLLECT_METRICS")] 32 collect_metrics: bool, 33 /// metrics server's listen address 34 #[arg(long, requires("collect_metrics"), env = "SPACEDUST_BIND_METRICS")] 35 #[clap(default_value = "[::]:8765")] 36 bind_metrics: std::net::SocketAddr, 37} 38 39#[tokio::main] 40async fn main() -> Result<(), String> { 41 env_logger::init(); 42 43 // tokio broadcast keeps a single main output queue for all subscribers. 44 // each subscriber clones off a copy of an individual value for each recv. 45 // since there's no large per-client buffer, we can make this one kind of 46 // big and accommodate more slow/bursty clients. 47 // 48 // in fact, we *could* even keep lagging clients alive, inserting lag- 49 // indicating messages to their output.... but for now we'll drop them to 50 // avoid accumulating zombies. 51 // 52 // events on the channel are individual links as they are discovered. a link 53 // contains a source and a target. the target is an at-uri, so it's up to 54 // ~1KB max; source is a collection + link path, which can be more but in 55 // practice the whole link rarely approaches 1KB total. 56 // 57 // TODO: determine if a pathological case could blow this up (eg 1MB link 58 // paths + slow subscriber -> 16GiB queue) 59 let (b, _) = broadcast::channel(16_384); 60 let consumer_sender = b.clone(); 61 let (d, _) = broadcast::channel(16_384); 62 let consumer_delayed_sender = d.clone(); 63 64 let delay = Duration::from_secs(21); 65 let (delay_queue_sender, delay_queue_receiver) = removable_delay_queue(delay); 66 67 let shutdown = CancellationToken::new(); 68 69 let ctrlc_shutdown = shutdown.clone(); 70 ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler"); 71 72 let args = Args::parse(); 73 74 if args.collect_metrics { 75 log::trace!("installing metrics server..."); 76 if let Err(e) = install_metrics_server(args.bind_metrics) { 77 log::error!("failed to install metrics server: {e:?}"); 78 }; 79 } 80 81 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); 82 83 let server_shutdown = shutdown.clone(); 84 let bind = args.bind; 85 tasks.spawn(async move { 86 server::serve(b, d, server_shutdown, bind).await?; 87 Ok(()) 88 }); 89 90 let consumer_shutdown = shutdown.clone(); 91 tasks.spawn(async move { 92 consumer::consume( 93 consumer_sender, 94 delay_queue_sender, 95 args.jetstream, 96 None, 97 args.jetstream_no_zstd, 98 consumer_shutdown, 99 ) 100 .await?; 101 Ok(()) 102 }); 103 104 let delay_shutdown = shutdown.clone(); 105 tasks.spawn(async move { 106 delay::to_broadcast( 107 delay_queue_receiver, 108 consumer_delayed_sender, 109 delay_shutdown, 110 ) 111 .await?; 112 Ok(()) 113 }); 114 115 tokio::select! { 116 _ = shutdown.cancelled() => log::warn!("shutdown requested"), 117 Some(r) = tasks.join_next() => { 118 log::warn!("a task exited, shutting down: {r:?}"); 119 shutdown.cancel(); 120 } 121 } 122 123 tokio::select! { 124 _ = async { 125 while let Some(completed) = tasks.join_next().await { 126 log::info!("shutdown: task completed: {completed:?}"); 127 } 128 } => {}, 129 _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => { 130 log::info!("shutdown: not all tasks completed on time. aborting..."); 131 tasks.shutdown().await; 132 }, 133 } 134 135 log::info!("bye!"); 136 137 Ok(()) 138} 139 140fn install_metrics_server( 141 bind: std::net::SocketAddr, 142) -> Result<(), metrics_exporter_prometheus::BuildError> { 143 log::info!("installing metrics server..."); 144 PrometheusBuilder::new() 145 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 146 .set_bucket_duration(std::time::Duration::from_secs(300))? 147 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 148 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 149 .with_http_listener(bind) 150 .install()?; 151 log::info!("metrics server installed! listening on {bind}"); 152 Ok(()) 153}