···11+mod consumer;
22+pub mod error;
33+mod firehose_cache;
44+mod record;
55+66+pub use consumer::consume;
77+pub use firehose_cache::firehose_cache;
88+pub use record::CachedRecord;
+107
slingshot/src/main.rs
···11+// use foyer::HybridCache;
22+// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
33+use metrics_exporter_prometheus::PrometheusBuilder;
44+use slingshot::{consume, error::MainTaskError, firehose_cache};
55+66+use clap::Parser;
77+use tokio_util::sync::CancellationToken;
88+99+1010+/// Slingshot record edge cache
1111+#[derive(Parser, Debug, Clone)]
1212+#[command(version, about, long_about = None)]
1313+struct Args {
1414+ /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
1515+ /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
1616+ #[arg(long)]
1717+ jetstream: String,
1818+ /// don't request zstd-compressed jetstream events
1919+ ///
2020+ /// reduces CPU at the expense of more ingress bandwidth
2121+ #[arg(long, action)]
2222+ jetstream_no_zstd: bool,
2323+}
2424+2525+#[tokio::main]
2626+async fn main() -> Result<(), String> {
2727+ env_logger::init();
2828+2929+ let shutdown = CancellationToken::new();
3030+3131+ let ctrlc_shutdown = shutdown.clone();
3232+ ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler");
3333+3434+ let args = Args::parse();
3535+3636+ if let Err(e) = install_metrics_server() {
3737+ log::error!("failed to install metrics server: {e:?}");
3838+ } else {
3939+ log::info!("metrics listening at http://0.0.0.0:8765");
4040+ }
4141+4242+ log::info!("setting up firehose cache...");
4343+ let cache = firehose_cache("./foyer").await?;
4444+ log::info!("firehose cache ready.");
4545+4646+ let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();
4747+4848+ let consumer_shutdown = shutdown.clone();
4949+ tasks.spawn(async move {
5050+ consume(
5151+ args.jetstream,
5252+ None,
5353+ args.jetstream_no_zstd,
5454+ consumer_shutdown,
5555+ cache,
5656+ )
5757+ .await?;
5858+ Ok(())
5959+ });
6060+6161+6262+ tokio::select! {
6363+ _ = shutdown.cancelled() => log::warn!("shutdown requested"),
6464+ Some(r) = tasks.join_next() => {
6565+ log::warn!("a task exited, shutting down: {r:?}");
6666+ shutdown.cancel();
6767+ }
6868+ }
6969+7070+ tokio::select! {
7171+ _ = async {
7272+ while let Some(completed) = tasks.join_next().await {
7373+ log::info!("shutdown: task completed: {completed:?}");
7474+ }
7575+ } => {},
7676+ _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
7777+ log::info!("shutdown: not all tasks completed on time. aborting...");
7878+ tasks.shutdown().await;
7979+ },
8080+ }
8181+8282+ log::info!("bye!");
8383+8484+ Ok(())
8585+}
8686+8787+fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
8888+ log::info!("installing metrics server...");
8989+ let host = [0, 0, 0, 0];
9090+ let port = 8765;
9191+ PrometheusBuilder::new()
9292+ .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
9393+ .set_bucket_duration(std::time::Duration::from_secs(300))?
9494+ .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
9595+ .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
9696+ .with_http_listener((host, port))
9797+ .install()?;
9898+ log::info!(
9999+ "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
100100+ host[0],
101101+ host[1],
102102+ host[2],
103103+ host[3]
104104+ );
105105+ Ok(())
106106+}
107107+
+24
slingshot/src/record.rs
···11+use serde_json::value::RawValue;
22+use serde::{Serialize, Deserialize};
33+44+#[derive(Debug, Serialize, Deserialize)]
55+pub struct RawRecord(String);
66+77+impl From<Box<RawValue>> for RawRecord {
88+ fn from(rv: Box<RawValue>) -> Self {
99+ Self(rv.get().to_string())
1010+ }
1111+}
1212+1313+/// only for use with stored (validated) values, not general strings
1414+impl From<RawRecord> for Box<RawValue> {
1515+ fn from(RawRecord(s): RawRecord) -> Self {
1616+ RawValue::from_string(s).expect("stored string from RawValue to be valid")
1717+ }
1818+}
1919+2020+#[derive(Debug, Serialize, Deserialize)]
2121+pub enum CachedRecord {
2222+ Found(RawRecord),
2323+ Deleted,
2424+}