Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

slingshot: configurable cache sizes #3

closed opened by nekomimi.pet targeting main from nekomimi.pet/microcosm-rs: main

I noticed defaults were hardcoded

./slingshot --jetstream us-east-1 --cache-dir ./foyer \
    --cache-memory-mb 128 \
    --cache-disk-gb 2
Labels

None yet.

Participants 2
AT URI
at://did:plc:ttdrpj45ibqunmfhdsb4zdwq/sh.tangled.repo.pull/3m5zrhmq67x22
+33 -8
Diff #1
+4 -2
slingshot/src/firehose_cache.rs
··· 4 5 pub async fn firehose_cache( 6 cache_dir: impl AsRef<Path>, 7 ) -> Result<HybridCache<String, CachedRecord>, String> { 8 let cache = HybridCacheBuilder::new() 9 .with_name("firehose") 10 - .memory(64 * 2_usize.pow(20)) 11 .with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v)) 12 .storage(Engine::large()) 13 .with_device_options( 14 DirectFsDeviceOptions::new(cache_dir) 15 - .with_capacity(2_usize.pow(30)) // TODO: configurable (1GB to have something) 16 .with_file_size(16 * 2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records 17 ) 18 .build()
··· 4 5 pub async fn firehose_cache( 6 cache_dir: impl AsRef<Path>, 7 + memory_mb: usize, 8 + disk_gb: usize, 9 ) -> Result<HybridCache<String, CachedRecord>, String> { 10 let cache = HybridCacheBuilder::new() 11 .with_name("firehose") 12 + .memory(memory_mb * 2_usize.pow(20)) 13 .with_weighter(|k: &String, v| k.len() + std::mem::size_of_val(v)) 14 .storage(Engine::large()) 15 .with_device_options( 16 DirectFsDeviceOptions::new(cache_dir) 17 + .with_capacity(disk_gb * 2_usize.pow(30)) 18 .with_file_size(16 * 2_usize.pow(20)), // note: this does limit the max cached item size, warning jumbo records 19 ) 20 .build()
+22 -5
slingshot/src/main.rs
··· 25 /// where to keep disk caches 26 #[arg(long)] 27 cache_dir: PathBuf, 28 /// the domain pointing to this server 29 /// 30 /// if present: ··· 62 63 let args = Args::parse(); 64 65 - if let Err(e) = install_metrics_server() { 66 log::error!("failed to install metrics server: {e:?}"); 67 } else { 68 - log::info!("metrics listening at http://0.0.0.0:8765"); 69 } 70 71 std::fs::create_dir_all(&args.cache_dir).map_err(|e| { ··· 83 log::info!("cache dir ready at at {cache_dir:?}."); 84 85 log::info!("setting up firehose cache..."); 86 - let cache = firehose_cache(cache_dir.join("./firehose")).await?; 87 log::info!("firehose cache ready."); 88 89 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); ··· 112 args.domain, 113 args.acme_contact, 114 args.certs, 115 server_shutdown, 116 ) 117 .await?; ··· 172 Ok(()) 173 } 174 175 - fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> { 176 log::info!("installing metrics server..."); 177 let host = [0, 0, 0, 0]; 178 - let port = 8765; 179 PrometheusBuilder::new() 180 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 181 .set_bucket_duration(std::time::Duration::from_secs(300))?
··· 25 /// where to keep disk caches 26 #[arg(long)] 27 cache_dir: PathBuf, 28 + /// memory cache size in MB 29 + #[arg(long, default_value_t = 64)] 30 + cache_memory_mb: usize, 31 + /// disk cache size in GB 32 + #[arg(long, default_value_t = 1)] 33 + cache_disk_gb: usize, 34 + /// port for HTTP server (when not using --domain) 35 + #[arg(long, default_value_t = 3000)] 36 + port: u16, 37 + /// port for metrics/prometheus server 38 + #[arg(long, default_value_t = 8765)] 39 + metrics_port: u16, 40 /// the domain pointing to this server 41 /// 42 /// if present: ··· 74 75 let args = Args::parse(); 76 77 + if let Err(e) = install_metrics_server(args.metrics_port) { 78 log::error!("failed to install metrics server: {e:?}"); 79 } else { 80 + log::info!("metrics listening at http://0.0.0.0:{}", args.metrics_port); 81 } 82 83 std::fs::create_dir_all(&args.cache_dir).map_err(|e| { ··· 95 log::info!("cache dir ready at at {cache_dir:?}."); 96 97 log::info!("setting up firehose cache..."); 98 + let cache = firehose_cache( 99 + cache_dir.join("./firehose"), 100 + args.cache_memory_mb, 101 + args.cache_disk_gb, 102 + ) 103 + .await?; 104 log::info!("firehose cache ready."); 105 106 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); ··· 129 args.domain, 130 args.acme_contact, 131 args.certs, 132 + args.port, 133 server_shutdown, 134 ) 135 .await?; ··· 190 Ok(()) 191 } 192 193 + fn install_metrics_server(port: u16) -> Result<(), metrics_exporter_prometheus::BuildError> { 194 log::info!("installing metrics server..."); 195 let host = [0, 0, 0, 0]; 196 PrometheusBuilder::new() 197 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 198 .set_bucket_duration(std::time::Duration::from_secs(300))?
+7 -1
slingshot/src/server.rs
··· 694 domain: Option<String>, 695 acme_contact: Option<String>, 696 certs: Option<PathBuf>, 697 shutdown: CancellationToken, 698 ) -> Result<(), ServerError> { 699 let repo = Arc::new(repo); ··· 752 ) 753 .await 754 } else { 755 - run(TcpListener::bind("127.0.0.1:3000"), app, shutdown).await 756 } 757 } 758
··· 694 domain: Option<String>, 695 acme_contact: Option<String>, 696 certs: Option<PathBuf>, 697 + port: u16, 698 shutdown: CancellationToken, 699 ) -> Result<(), ServerError> { 700 let repo = Arc::new(repo); ··· 753 ) 754 .await 755 } else { 756 + run( 757 + TcpListener::bind(format!("127.0.0.1:{port}")), 758 + app, 759 + shutdown, 760 + ) 761 + .await 762 } 763 } 764

History

4 rounds 4 comments
sign up or login to add to the discussion
1 commit
expand
slingshot: add configurable cache sizes, host, and ports
expand 4 comments

sorry about the messy history, i ordered the git reset soft and amend wrong

thank you!!!

the cache size config is perfect, host and port close: i've started moving to a bind arg and giving clap the SocketAddr type directly so it gets parsed out early + one less thing to pass around.

bind stuff came in through a different pr that i just picked up to get merged, so i'm going to get these cache sizes in by picking this up and extracting that for merging

thanks for getting it started!

closed without merging
1 commit
expand
slingshot: add configurable cache sizes, host, and ports
expand 0 comments
2 commits
expand
slingshot: configurable cache sizes
slingshot: add configurable port and host to slingshot, configurable port to metrics
expand 0 comments
1 commit
expand
slingshot: configurable cache sizes
expand 0 comments