Server tools to backfill, tail, mirror, and verify PLC logs
use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init};
use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream};
use anyhow::Context;
use clap::{CommandFactory, FromArgMatches, Parser, Subcommand};
use std::{path::PathBuf, time::Duration, time::Instant};
use tokio::fs::create_dir_all;
use tokio::sync::mpsc;
7
8mod backfill;
9mod mirror;
10
// Top-level command line: options defined by `GlobalArgs` plus exactly one
// subcommand. Plain `//` comments are used deliberately. On a clap derive,
// `///` doc comments turn into `--help` text and would change the CLI output.
#[derive(Debug, Parser)]
struct Cli {
    // Options from `GlobalArgs`, flattened into this top-level command.
    // Every subcommand arm in `main` consumes them (e.g. the upstream URL and
    // the upstream throttle for `bundle` / `tail`).
    #[command(flatten)]
    globals: GlobalArgs,

    // The selected subcommand; `main` dispatches on it.
    #[command(subcommand)]
    command: Commands,
}
19
20#[derive(Debug, Subcommand)]
21enum Commands {
22 /// Use weekly bundled ops to get a complete directory mirror FAST
23 Backfill {
24 #[command(flatten)]
25 args: backfill::Args,
26 },
27 /// Scrape a PLC server, collecting ops into weekly bundles
28 ///
29 /// Bundles are gzipped files named `<WEEK>.jsonl.gz` where WEEK is a unix
30 /// timestamp rounded down to a multiple of 604,800 (one week in seconds).
31 ///
32 /// Will stop by default at floor((now - 73hrs) / one week) * one week. PLC
33 /// operations can be invalidated within 72 hrs, so stopping before that
34 /// time ensures that the bundles are (hopefully) immutable.
35 Bundle {
36 /// Where to save the bundled files
37 #[arg(short, long)]
38 #[clap(default_value = "./weekly/")]
39 dest: PathBuf,
40 /// Start the export from this time. Should be a week boundary.
41 #[arg(short, long)]
42 #[clap(default_value = "2022-11-17T00:00:00Z")]
43 after: Dt,
44 /// Overwrite existing files, if present
45 #[arg(long, action)]
46 clobber: bool,
47 },
48 /// Wrap a did-method-plc server, syncing upstream and blocking op submits
49 Mirror {
50 #[command(flatten)]
51 args: mirror::Args,
52 #[command(flatten)]
53 instrumentation: InstrumentationArgs,
54 },
55 /// Wrap any did-method-plc server, without syncing upstream (read-only)
56 Wrap {
57 #[command(flatten)]
58 args: mirror::Args,
59 #[command(flatten)]
60 instrumentation: InstrumentationArgs,
61 },
62 /// Poll an upstream PLC server and log new ops to stdout
63 Tail {
64 /// Begin tailing from a specific timestamp for replay or wait-until
65 #[arg(short, long)]
66 after: Option<Dt>,
67 },
68}
69
70impl Commands {
71 fn enable_otel(&self) -> bool {
72 match self {
73 Commands::Mirror {
74 instrumentation, ..
75 }
76 | Commands::Wrap {
77 instrumentation, ..
78 } => instrumentation.enable_opentelemetry,
79 _ => false,
80 }
81 }
82}
83
84#[tokio::main]
85async fn main() -> anyhow::Result<()> {
86 let args = Cli::parse();
87 let matches = Cli::command().get_matches();
88 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
89 bin_init(args.command.enable_otel());
90 log::info!("{}", logo(name));
91
92 let globals = args.globals.clone();
93
94 let t0 = Instant::now();
95 match args.command {
96 Commands::Backfill { args } => backfill::run(globals, args).await?,
97 Commands::Bundle {
98 dest,
99 after,
100 clobber,
101 } => {
102 let mut url = globals.upstream;
103 url.set_path("/export");
104 let throttle = Duration::from_millis(globals.upstream_throttle_ms);
105 let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
106 tokio::task::spawn(async move {
107 poll_upstream(Some(after), url, throttle, tx)
108 .await
109 .expect("to poll upstream")
110 });
111 log::trace!("ensuring output directory exists");
112 create_dir_all(&dest)
113 .await
114 .expect("to ensure output dir exists");
115 pages_to_weeks(rx, dest, clobber)
116 .await
117 .expect("to write bundles to output files");
118 }
119 Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?,
120 Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?,
121 Commands::Tail { after } => {
122 let mut url = globals.upstream;
123 url.set_path("/export");
124 let start_at = after.or_else(|| Some(chrono::Utc::now()));
125 let throttle = Duration::from_millis(globals.upstream_throttle_ms);
126 let (tx, rx) = mpsc::channel(1);
127 tokio::task::spawn(async move {
128 poll_upstream(start_at, url, throttle, tx)
129 .await
130 .expect("to poll upstream")
131 });
132 pages_to_stdout(rx, None)
133 .await
134 .expect("to write pages to stdout");
135 }
136 }
137 log::info!("whew, {:?}. goodbye!", t0.elapsed());
138 Ok(())
139}