forked from
microcosm.blue/Allegedly
Server tools to backfill, tail, mirror, and verify PLC logs
1use allegedly::{
2 FjallDb, audit_fjall,
3 bin::{GlobalArgs, InstrumentationArgs, bin_init},
4 file_to_invalid_ops, fix_ops_fjall, invalid_ops_to_stdout, logo,
5};
6use clap::Parser;
7use std::path::PathBuf;
8use tokio::task::JoinSet;
9
10#[derive(Debug, clap::Args)]
11pub struct Args {
12 /// path to a local fjall database directory
13 #[arg(long, env = "ALLEGEDLY_FJALL")]
14 fjall: Option<PathBuf>,
15 /// path to a file containing invalid ops to fix using upstream
16 #[arg(long, env = "ALLEGEDLY_FIX")]
17 fix: Option<PathBuf>,
18 /// drop invalid ops instead of trying to fix them from upstream
19 #[arg(long, env = "ALLEGEDLY_DROP")]
20 drop: bool,
21}
22
23pub async fn run(globals: GlobalArgs, Args { fjall, fix, drop }: Args) -> anyhow::Result<()> {
24 let mut tasks = JoinSet::new();
25
26 if let Some(fjall) = fjall {
27 let (invalid_ops_tx, invalid_ops_rx) = tokio::sync::mpsc::channel(128);
28 let db = FjallDb::open(&fjall)?;
29
30 if let Some(fix) = fix {
31 tasks.spawn(file_to_invalid_ops(fix, invalid_ops_tx));
32 tasks.spawn(fix_ops_fjall(db, globals.upstream, drop, invalid_ops_rx));
33 } else {
34 tasks.spawn(audit_fjall(db, invalid_ops_tx));
35 tasks.spawn(invalid_ops_to_stdout(invalid_ops_rx));
36 }
37 } else {
38 anyhow::bail!("no audit target provided");
39 }
40
41 while let Some(next) = tasks.join_next().await {
42 match next {
43 Err(e) if e.is_panic() => {
44 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
45 return Err(e.into());
46 }
47 Err(e) => {
48 log::error!("a joinset task failed to join: {e}");
49 return Err(e.into());
50 }
51 Ok(Err(e)) => {
52 log::error!("a joinset task completed with error: {e}");
53 return Err(e);
54 }
55 Ok(Ok(name)) => {
56 log::trace!("a task completed: {name:?}. {} left", tasks.len());
57 }
58 }
59 }
60
61 Ok(())
62}
63
64#[derive(Debug, Parser)]
65struct CliArgs {
66 #[command(flatten)]
67 globals: GlobalArgs,
68 #[command(flatten)]
69 instrumentation: InstrumentationArgs,
70 #[command(flatten)]
71 args: Args,
72}
73
74#[allow(dead_code)]
75#[tokio::main]
76async fn main() -> anyhow::Result<()> {
77 let args = CliArgs::parse();
78 bin_init(args.instrumentation.enable_opentelemetry);
79 log::info!("{}", logo("audit"));
80 run(args.globals, args.args).await?;
81 Ok(())
82}