Server tools to backfill, tail, mirror, and verify PLC logs
at main 187 lines 6.1 kB view raw
1use allegedly::{ 2 Db, ExperimentalConf, ListenConf, 3 bin::{GlobalArgs, InstrumentationArgs, bin_init}, 4 logo, pages_to_pg, poll_upstream, serve, 5}; 6use clap::Parser; 7use reqwest::Url; 8use std::{net::SocketAddr, path::PathBuf, time::Duration}; 9use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet}; 10 11#[derive(Debug, clap::Args)] 12pub struct Args { 13 /// the wrapped did-method-plc server 14 #[arg(long, env = "ALLEGEDLY_WRAP")] 15 wrap: Url, 16 /// the wrapped did-method-plc server's database (write access required) 17 #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 18 wrap_pg: Option<Url>, 19 /// path to tls cert for the wrapped postgres db, if needed 20 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 21 wrap_pg_cert: Option<PathBuf>, 22 /// wrapping server listen address 23 #[arg(short, long, env = "ALLEGEDLY_BIND")] 24 #[clap(default_value = "127.0.0.1:8000")] 25 bind: SocketAddr, 26 /// obtain a certificate from letsencrypt 27 /// 28 /// for now this will force listening on all interfaces at :80 and :443 29 /// (:80 will serve an "https required" error, *will not* redirect) 30 #[arg( 31 long, 32 conflicts_with("bind"), 33 requires("acme_cache_path"), 34 env = "ALLEGEDLY_ACME_DOMAIN" 35 )] 36 acme_domain: Vec<String>, 37 /// which local directory to keep the letsencrypt certs in 38 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")] 39 acme_cache_path: Option<PathBuf>, 40 /// which public acme directory to use 41 /// 42 /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory" 43 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")] 44 #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")] 45 acme_directory_url: Url, 46 /// try to listen for ipv6 47 #[arg(long, action, requires("acme_domain"), env = "ALLEGEDLY_ACME_IPV6")] 48 acme_ipv6: bool, 49 /// only accept experimental requests at this hostname 50 /// 51 /// a cert will be provisioned for it from letsencrypt. if you're not using 52 /// acme (eg., behind a tls-terminating reverse proxy), open a feature request. 53 #[arg( 54 long, 55 requires("acme_domain"), 56 env = "ALLEGEDLY_EXPERIMENTAL_ACME_DOMAIN" 57 )] 58 experimental_acme_domain: Option<String>, 59 /// accept writes! by forwarding them upstream 60 #[arg(long, action, env = "ALLEGEDLY_EXPERIMENTAL_WRITE_UPSTREAM")] 61 experimental_write_upstream: bool, 62} 63 64pub async fn run( 65 GlobalArgs { 66 upstream, 67 upstream_throttle_ms, 68 }: GlobalArgs, 69 Args { 70 wrap, 71 wrap_pg, 72 wrap_pg_cert, 73 bind, 74 acme_domain, 75 acme_cache_path, 76 acme_directory_url, 77 acme_ipv6, 78 experimental_acme_domain, 79 experimental_write_upstream, 80 }: Args, 81 sync: bool, 82) -> anyhow::Result<()> { 83 let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 84 (_, false, Some(cache_path)) => { 85 create_dir_all(&cache_path).await?; 86 let mut domains = acme_domain.clone(); 87 if let Some(ref experimental_domain) = experimental_acme_domain { 88 domains.push(experimental_domain.clone()) 89 } 90 log::info!("configuring acme for https at {domains:?}..."); 91 ListenConf::Acme { 92 domains, 93 cache_path, 94 directory_url: acme_directory_url.to_string(), 95 ipv6: acme_ipv6, 96 } 97 } 98 (bind, true, None) => ListenConf::Bind(bind), 99 (_, _, _) => unreachable!(), 100 }; 101 102 let experimental_conf = ExperimentalConf { 103 acme_domain: experimental_acme_domain, 104 write_upstream: experimental_write_upstream, 105 }; 106 107 let mut tasks = JoinSet::new(); 108 109 let db = if sync { 110 let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!( 111 "a wrapped reference postgres must be provided to sync" 112 ))?; 113 let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 114 115 // TODO: allow starting up with polling backfill from beginning? 116 log::debug!("getting the latest op from the db..."); 117 let latest = db 118 .get_latest() 119 .await? 120 .expect("there to be at least one op in the db. did you backfill?"); 121 122 let (send_page, recv_page) = mpsc::channel(8); 123 124 let mut poll_url = upstream.clone(); 125 poll_url.set_path("/export"); 126 let throttle = Duration::from_millis(upstream_throttle_ms); 127 128 tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 129 tasks.spawn(pages_to_pg(db.clone(), recv_page)); 130 Some(db) 131 } else { 132 None 133 }; 134 135 tasks.spawn(serve( 136 upstream, 137 wrap, 138 listen_conf, 139 experimental_conf, 140 db.clone(), 141 )); 142 143 while let Some(next) = tasks.join_next().await { 144 match next { 145 Err(e) if e.is_panic() => { 146 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 147 return Err(e.into()); 148 } 149 Err(e) => { 150 log::error!("a joinset task failed to join: {e}"); 151 return Err(e.into()); 152 } 153 Ok(Err(e)) => { 154 log::error!("a joinset task completed with error: {e}"); 155 return Err(e); 156 } 157 Ok(Ok(name)) => { 158 log::trace!("a task completed: {name:?}. {} left", tasks.len()); 159 } 160 } 161 } 162 163 Ok(()) 164} 165 166#[derive(Debug, Parser)] 167struct CliArgs { 168 #[command(flatten)] 169 globals: GlobalArgs, 170 #[command(flatten)] 171 instrumentation: InstrumentationArgs, 172 #[command(flatten)] 173 args: Args, 174 /// Run the mirror in wrap mode, no upstream synchronization (read-only) 175 #[arg(long, action)] 176 wrap_mode: bool, 177} 178 179#[allow(dead_code)] 180#[tokio::main] 181async fn main() -> anyhow::Result<()> { 182 let args = CliArgs::parse(); 183 bin_init(args.instrumentation.enable_opentelemetry); 184 log::info!("{}", logo("mirror")); 185 run(args.globals, args.args, !args.wrap_mode).await?; 186 Ok(()) 187}