Server tools to backfill, tail, mirror, and verify PLC logs
at debug 128 lines 4.2 kB view raw
1use allegedly::{Db, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve}; 2use clap::Parser; 3use reqwest::Url; 4use std::{net::SocketAddr, path::PathBuf}; 5use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet}; 6 7#[derive(Debug, clap::Args)] 8pub struct Args { 9 /// the wrapped did-method-plc server 10 #[arg(long, env = "ALLEGEDLY_WRAP")] 11 wrap: Url, 12 /// the wrapped did-method-plc server's database (write access required) 13 #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 14 wrap_pg: Url, 15 /// path to tls cert for the wrapped postgres db, if needed 16 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 17 wrap_pg_cert: Option<PathBuf>, 18 /// wrapping server listen address 19 #[arg(short, long, env = "ALLEGEDLY_BIND")] 20 #[clap(default_value = "127.0.0.1:8000")] 21 bind: SocketAddr, 22 /// obtain a certificate from letsencrypt 23 /// 24 /// for now this will force listening on all interfaces at :80 and :443 25 /// (:80 will serve an "https required" error, *will not* redirect) 26 #[arg( 27 long, 28 conflicts_with("bind"), 29 requires("acme_cache_path"), 30 env = "ALLEGEDLY_ACME_DOMAIN" 31 )] 32 acme_domain: Vec<String>, 33 /// which local directory to keep the letsencrypt certs in 34 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")] 35 acme_cache_path: Option<PathBuf>, 36 /// which public acme directory to use 37 /// 38 /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory" 39 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")] 40 #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")] 41 acme_directory_url: Url, 42} 43 44pub async fn run( 45 GlobalArgs { upstream }: GlobalArgs, 46 Args { 47 wrap, 48 wrap_pg, 49 wrap_pg_cert, 50 bind, 51 acme_domain, 52 acme_cache_path, 53 acme_directory_url, 54 }: Args, 55) -> anyhow::Result<()> { 56 let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 57 58 // TODO: allow starting up with polling backfill from beginning? 59 log::debug!("getting the latest op from the db..."); 60 let latest = db 61 .get_latest() 62 .await? 63 .expect("there to be at least one op in the db. did you backfill?"); 64 65 let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 66 (_, false, Some(cache_path)) => { 67 log::info!("configuring acme for https at {acme_domain:?}..."); 68 create_dir_all(&cache_path).await?; 69 ListenConf::Acme { 70 domains: acme_domain, 71 cache_path, 72 directory_url: acme_directory_url.to_string(), 73 } 74 } 75 (bind, true, None) => ListenConf::Bind(bind), 76 (_, _, _) => unreachable!(), 77 }; 78 79 let mut tasks = JoinSet::new(); 80 81 let (send_page, recv_page) = mpsc::channel(8); 82 83 let mut poll_url = upstream.clone(); 84 poll_url.set_path("/export"); 85 86 tasks.spawn(poll_upstream(Some(latest), poll_url, send_page)); 87 tasks.spawn(pages_to_pg(db.clone(), recv_page)); 88 tasks.spawn(serve(upstream, wrap, listen_conf)); 89 90 while let Some(next) = tasks.join_next().await { 91 match next { 92 Err(e) if e.is_panic() => { 93 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 94 return Err(e.into()); 95 } 96 Err(e) => { 97 log::error!("a joinset task failed to join: {e}"); 98 return Err(e.into()); 99 } 100 Ok(Err(e)) => { 101 log::error!("a joinset task completed with error: {e}"); 102 return Err(e); 103 } 104 Ok(Ok(name)) => { 105 log::trace!("a task completed: {name:?}. {} left", tasks.len()); 106 } 107 } 108 } 109 110 Ok(()) 111} 112 113#[derive(Debug, Parser)] 114struct CliArgs { 115 #[command(flatten)] 116 globals: GlobalArgs, 117 #[command(flatten)] 118 args: Args, 119} 120 121#[allow(dead_code)] 122#[tokio::main] 123async fn main() -> anyhow::Result<()> { 124 let args = CliArgs::parse(); 125 bin_init("mirror"); 126 run(args.globals, args.args).await?; 127 Ok(()) 128}