// forked from microcosm.blue/microcosm-rs
// Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
1use spacedust::consumer;
2use spacedust::delay;
3use spacedust::error::MainTaskError;
4use spacedust::removable_delay_queue::removable_delay_queue;
5use spacedust::server;
6
7use clap::Parser;
8use metrics_exporter_prometheus::PrometheusBuilder;
9use std::time::Duration;
10use tokio::sync::broadcast;
11use tokio_util::sync::CancellationToken;
12
13/// Aggregate links in the at-mosphere
14#[derive(Parser, Debug, Clone)]
15#[command(version, about, long_about = None)]
16struct Args {
17 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
18 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
19 #[arg(long, env = "SPACEDUST_JETSTREAM")]
20 jetstream: String,
21 /// don't request zstd-compressed jetstream events
22 ///
23 /// reduces CPU at the expense of more ingress bandwidth
24 #[arg(long, action, env = "SPACEDUST_JETSTREAM_NO_ZSTD")]
25 jetstream_no_zstd: bool,
26 /// spacedust server's listen address
27 #[arg(long, env = "SPACEDUST_BIND")]
28 #[clap(default_value = "[::]:8080")]
29 bind: std::net::SocketAddr,
30 /// enable metrics collection and serving
31 #[arg(long, action, env = "SPACEDUST_COLLECT_METRICS")]
32 collect_metrics: bool,
33 /// metrics server's listen address
34 #[arg(long, requires("collect_metrics"), env = "SPACEDUST_BIND_METRICS")]
35 #[clap(default_value = "[::]:8765")]
36 bind_metrics: std::net::SocketAddr,
37}
38
#[tokio::main]
async fn main() -> Result<(), String> {
    env_logger::init();

    // tokio broadcast keeps a single main output queue for all subscribers.
    // each subscriber clones off a copy of an individual value for each recv.
    // since there's no large per-client buffer, we can make this one kind of
    // big and accommodate more slow/bursty clients.
    //
    // in fact, we *could* even keep lagging clients alive, inserting lag-
    // indicating messages to their output.... but for now we'll drop them to
    // avoid accumulating zombies.
    //
    // events on the channel are individual links as they are discovered. a link
    // contains a source and a target. the target is an at-uri, so it's up to
    // ~1KB max; source is a collection + link path, which can be more but in
    // practice the whole link rarely approaches 1KB total.
    //
    // TODO: determine if a pathological case could blow this up (eg 1MB link
    // paths + slow subscriber -> 16GiB queue)
    //
    // `b` carries live links straight from the consumer; `d` carries the
    // delayed re-emissions produced by the delay task below. the server
    // subscribes to both.
    let (b, _) = broadcast::channel(16_384);
    let consumer_sender = b.clone();
    let (d, _) = broadcast::channel(16_384);
    let consumer_delayed_sender = d.clone();

    // consumer pushes links into the delay queue; the delay task pops them
    // back out 21 seconds later and re-broadcasts on `d`.
    // NOTE(review): rationale for the 21s figure isn't visible here — confirm
    // against the delay/removable_delay_queue docs.
    let delay = Duration::from_secs(21);
    let (delay_queue_sender, delay_queue_receiver) = removable_delay_queue(delay);

    // single cancellation token shared by every task; cancelling it anywhere
    // (ctrl-c, or a task exiting) triggers a coordinated shutdown.
    let shutdown = CancellationToken::new();

    let ctrlc_shutdown = shutdown.clone();
    ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler");

    // parse exits the process on --help/--version/bad args.
    let args = Args::parse();

    // metrics failure is non-fatal: log and carry on without metrics.
    if args.collect_metrics {
        log::trace!("installing metrics server...");
        if let Err(e) = install_metrics_server(args.bind_metrics) {
            log::error!("failed to install metrics server: {e:?}");
        };
    }

    // three long-running tasks: HTTP/WS server, jetstream consumer, delay
    // re-broadcaster. each returns Err(MainTaskError) on abnormal exit.
    let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();

    let server_shutdown = shutdown.clone();
    let bind = args.bind;
    tasks.spawn(async move {
        server::serve(b, d, server_shutdown, bind).await?;
        Ok(())
    });

    let consumer_shutdown = shutdown.clone();
    tasks.spawn(async move {
        consumer::consume(
            consumer_sender,
            delay_queue_sender,
            args.jetstream,
            None, // no fixture: consume from the live jetstream server
            args.jetstream_no_zstd,
            consumer_shutdown,
        )
        .await?;
        Ok(())
    });

    let delay_shutdown = shutdown.clone();
    tasks.spawn(async move {
        delay::to_broadcast(
            delay_queue_receiver,
            consumer_delayed_sender,
            delay_shutdown,
        )
        .await?;
        Ok(())
    });

    // phase 1: run until either an external shutdown request arrives or any
    // one task exits (success or failure) — in the latter case, cancel the
    // rest so everything winds down together.
    tokio::select! {
        _ = shutdown.cancelled() => log::warn!("shutdown requested"),
        Some(r) = tasks.join_next() => {
            log::warn!("a task exited, shutting down: {r:?}");
            shutdown.cancel();
        }
    }

    // phase 2: give the remaining tasks a 3s grace period to observe the
    // cancellation and finish cleanly; abort whatever is still running after.
    tokio::select! {
        _ = async {
            while let Some(completed) = tasks.join_next().await {
                log::info!("shutdown: task completed: {completed:?}");
            }
        } => {},
        _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
            log::info!("shutdown: not all tasks completed on time. aborting...");
            tasks.shutdown().await;
        },
    }

    log::info!("bye!");

    Ok(())
}
139
140fn install_metrics_server(
141 bind: std::net::SocketAddr,
142) -> Result<(), metrics_exporter_prometheus::BuildError> {
143 log::info!("installing metrics server...");
144 PrometheusBuilder::new()
145 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
146 .set_bucket_duration(std::time::Duration::from_secs(300))?
147 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
148 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
149 .with_http_listener(bind)
150 .install()?;
151 log::info!("metrics server installed! listening on {bind}");
152 Ok(())
153}