Server tools to backfill, tail, mirror, and verify PLC logs
at main 139 lines 4.7 kB view raw
1use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init}; 2use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream}; 3use clap::{CommandFactory, Parser, Subcommand}; 4use std::{path::PathBuf, time::Duration, time::Instant}; 5use tokio::fs::create_dir_all; 6use tokio::sync::mpsc; 7 8mod backfill; 9mod mirror; 10 11#[derive(Debug, Parser)] 12struct Cli { 13 #[command(flatten)] 14 globals: GlobalArgs, 15 16 #[command(subcommand)] 17 command: Commands, 18} 19 20#[derive(Debug, Subcommand)] 21enum Commands { 22 /// Use weekly bundled ops to get a complete directory mirror FAST 23 Backfill { 24 #[command(flatten)] 25 args: backfill::Args, 26 }, 27 /// Scrape a PLC server, collecting ops into weekly bundles 28 /// 29 /// Bundles are gzipped files named `<WEEK>.jsonl.gz` where WEEK is a unix 30 /// timestamp rounded down to a multiple of 604,800 (one week in seconds). 31 /// 32 /// Will stop by default at floor((now - 73hrs) / one week) * one week. PLC 33 /// operations can be invalidated within 72 hrs, so stopping before that 34 /// time ensures that the bundles are (hopefully) immutable. 35 Bundle { 36 /// Where to save the bundled files 37 #[arg(short, long)] 38 #[clap(default_value = "./weekly/")] 39 dest: PathBuf, 40 /// Start the export from this time. Should be a week boundary. 41 #[arg(short, long)] 42 #[clap(default_value = "2022-11-17T00:00:00Z")] 43 after: Dt, 44 /// Overwrite existing files, if present 45 #[arg(long, action)] 46 clobber: bool, 47 }, 48 /// Wrap a did-method-plc server, syncing upstream and blocking op submits 49 Mirror { 50 #[command(flatten)] 51 args: mirror::Args, 52 #[command(flatten)] 53 instrumentation: InstrumentationArgs, 54 }, 55 /// Wrap any did-method-plc server, without syncing upstream (read-only) 56 Wrap { 57 #[command(flatten)] 58 args: mirror::Args, 59 #[command(flatten)] 60 instrumentation: InstrumentationArgs, 61 }, 62 /// Poll an upstream PLC server and log new ops to stdout 63 Tail { 64 /// Begin tailing from a specific timestamp for replay or wait-until 65 #[arg(short, long)] 66 after: Option<Dt>, 67 }, 68} 69 70impl Commands { 71 fn enable_otel(&self) -> bool { 72 match self { 73 Commands::Mirror { 74 instrumentation, .. 75 } 76 | Commands::Wrap { 77 instrumentation, .. 78 } => instrumentation.enable_opentelemetry, 79 _ => false, 80 } 81 } 82} 83 84#[tokio::main] 85async fn main() -> anyhow::Result<()> { 86 let args = Cli::parse(); 87 let matches = Cli::command().get_matches(); 88 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???"); 89 bin_init(args.command.enable_otel()); 90 log::info!("{}", logo(name)); 91 92 let globals = args.globals.clone(); 93 94 let t0 = Instant::now(); 95 match args.command { 96 Commands::Backfill { args } => backfill::run(globals, args).await?, 97 Commands::Bundle { 98 dest, 99 after, 100 clobber, 101 } => { 102 let mut url = globals.upstream; 103 url.set_path("/export"); 104 let throttle = Duration::from_millis(globals.upstream_throttle_ms); 105 let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason 106 tokio::task::spawn(async move { 107 poll_upstream(Some(after), url, throttle, tx) 108 .await 109 .expect("to poll upstream") 110 }); 111 log::trace!("ensuring output directory exists"); 112 create_dir_all(&dest) 113 .await 114 .expect("to ensure output dir exists"); 115 pages_to_weeks(rx, dest, clobber) 116 .await 117 .expect("to write bundles to output files"); 118 } 119 Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?, 120 Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?, 121 Commands::Tail { after } => { 122 let mut url = globals.upstream; 123 url.set_path("/export"); 124 let start_at = after.or_else(|| Some(chrono::Utc::now())); 125 let throttle = Duration::from_millis(globals.upstream_throttle_ms); 126 let (tx, rx) = mpsc::channel(1); 127 tokio::task::spawn(async move { 128 poll_upstream(start_at, url, throttle, tx) 129 .await 130 .expect("to poll upstream") 131 }); 132 pages_to_stdout(rx, None) 133 .await 134 .expect("to write pages to stdout"); 135 } 136 } 137 log::info!("whew, {:?}. goodbye!", t0.elapsed()); 138 Ok(()) 139}