Server tools to backfill, tail, mirror, and verify PLC logs
at main 82 lines 2.5 kB view raw
1use allegedly::{ 2 FjallDb, audit_fjall, 3 bin::{GlobalArgs, InstrumentationArgs, bin_init}, 4 file_to_invalid_ops, fix_ops_fjall, invalid_ops_to_stdout, logo, 5}; 6use clap::Parser; 7use std::path::PathBuf; 8use tokio::task::JoinSet; 9 10#[derive(Debug, clap::Args)] 11pub struct Args { 12 /// path to a local fjall database directory 13 #[arg(long, env = "ALLEGEDLY_FJALL")] 14 fjall: Option<PathBuf>, 15 /// path to a file containing invalid ops to fix using upstream 16 #[arg(long, env = "ALLEGEDLY_FIX")] 17 fix: Option<PathBuf>, 18 /// drop invalid ops instead of trying to fix them from upstream 19 #[arg(long, env = "ALLEGEDLY_DROP")] 20 drop: bool, 21} 22 23pub async fn run(globals: GlobalArgs, Args { fjall, fix, drop }: Args) -> anyhow::Result<()> { 24 let mut tasks = JoinSet::new(); 25 26 if let Some(fjall) = fjall { 27 let (invalid_ops_tx, invalid_ops_rx) = tokio::sync::mpsc::channel(128); 28 let db = FjallDb::open(&fjall)?; 29 30 if let Some(fix) = fix { 31 tasks.spawn(file_to_invalid_ops(fix, invalid_ops_tx)); 32 tasks.spawn(fix_ops_fjall(db, globals.upstream, drop, invalid_ops_rx)); 33 } else { 34 tasks.spawn(audit_fjall(db, invalid_ops_tx)); 35 tasks.spawn(invalid_ops_to_stdout(invalid_ops_rx)); 36 } 37 } else { 38 anyhow::bail!("no audit target provided"); 39 } 40 41 while let Some(next) = tasks.join_next().await { 42 match next { 43 Err(e) if e.is_panic() => { 44 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 45 return Err(e.into()); 46 } 47 Err(e) => { 48 log::error!("a joinset task failed to join: {e}"); 49 return Err(e.into()); 50 } 51 Ok(Err(e)) => { 52 log::error!("a joinset task completed with error: {e}"); 53 return Err(e); 54 } 55 Ok(Ok(name)) => { 56 log::trace!("a task completed: {name:?}. {} left", tasks.len()); 57 } 58 } 59 } 60 61 Ok(()) 62} 63 64#[derive(Debug, Parser)] 65struct CliArgs { 66 #[command(flatten)] 67 globals: GlobalArgs, 68 #[command(flatten)] 69 instrumentation: InstrumentationArgs, 70 #[command(flatten)] 71 args: Args, 72} 73 74#[allow(dead_code)] 75#[tokio::main] 76async fn main() -> anyhow::Result<()> { 77 let args = CliArgs::parse(); 78 bin_init(args.instrumentation.enable_opentelemetry); 79 log::info!("{}", logo("audit")); 80 run(args.globals, args.args).await?; 81 Ok(()) 82}