forked from
microcosm.blue/Allegedly
Server tools to backfill, tail, mirror, and verify PLC logs
1use allegedly::{Db, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve};
2use clap::Parser;
3use reqwest::Url;
4use std::{net::SocketAddr, path::PathBuf};
5use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet};
6
7#[derive(Debug, clap::Args)]
8pub struct Args {
9 /// the wrapped did-method-plc server
10 #[arg(long, env = "ALLEGEDLY_WRAP")]
11 wrap: Url,
12 /// the wrapped did-method-plc server's database (write access required)
13 #[arg(long, env = "ALLEGEDLY_WRAP_PG")]
14 wrap_pg: Url,
15 /// path to tls cert for the wrapped postgres db, if needed
16 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")]
17 wrap_pg_cert: Option<PathBuf>,
18 /// wrapping server listen address
19 #[arg(short, long, env = "ALLEGEDLY_BIND")]
20 #[clap(default_value = "127.0.0.1:8000")]
21 bind: SocketAddr,
22 /// obtain a certificate from letsencrypt
23 ///
24 /// for now this will force listening on all interfaces at :80 and :443
25 /// (:80 will serve an "https required" error, *will not* redirect)
26 #[arg(
27 long,
28 conflicts_with("bind"),
29 requires("acme_cache_path"),
30 env = "ALLEGEDLY_ACME_DOMAIN"
31 )]
32 acme_domain: Vec<String>,
33 /// which local directory to keep the letsencrypt certs in
34 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
35 acme_cache_path: Option<PathBuf>,
36 /// which public acme directory to use
37 ///
38 /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
39 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
40 #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
41 acme_directory_url: Url,
42}
43
44pub async fn run(
45 GlobalArgs { upstream }: GlobalArgs,
46 Args {
47 wrap,
48 wrap_pg,
49 wrap_pg_cert,
50 bind,
51 acme_domain,
52 acme_cache_path,
53 acme_directory_url,
54 }: Args,
55) -> anyhow::Result<()> {
56 let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?;
57
58 // TODO: allow starting up with polling backfill from beginning?
59 log::debug!("getting the latest op from the db...");
60 let latest = db
61 .get_latest()
62 .await?
63 .expect("there to be at least one op in the db. did you backfill?");
64
65 let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
66 (_, false, Some(cache_path)) => {
67 log::info!("configuring acme for https at {acme_domain:?}...");
68 create_dir_all(&cache_path).await?;
69 ListenConf::Acme {
70 domains: acme_domain,
71 cache_path,
72 directory_url: acme_directory_url.to_string(),
73 }
74 }
75 (bind, true, None) => ListenConf::Bind(bind),
76 (_, _, _) => unreachable!(),
77 };
78
79 let mut tasks = JoinSet::new();
80
81 let (send_page, recv_page) = mpsc::channel(8);
82
83 let mut poll_url = upstream.clone();
84 poll_url.set_path("/export");
85
86 tasks.spawn(poll_upstream(Some(latest), poll_url, send_page));
87 tasks.spawn(pages_to_pg(db.clone(), recv_page));
88 tasks.spawn(serve(upstream, wrap, listen_conf));
89
90 while let Some(next) = tasks.join_next().await {
91 match next {
92 Err(e) if e.is_panic() => {
93 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
94 return Err(e.into());
95 }
96 Err(e) => {
97 log::error!("a joinset task failed to join: {e}");
98 return Err(e.into());
99 }
100 Ok(Err(e)) => {
101 log::error!("a joinset task completed with error: {e}");
102 return Err(e);
103 }
104 Ok(Ok(name)) => {
105 log::trace!("a task completed: {name:?}. {} left", tasks.len());
106 }
107 }
108 }
109
110 Ok(())
111}
112
113#[derive(Debug, Parser)]
114struct CliArgs {
115 #[command(flatten)]
116 globals: GlobalArgs,
117 #[command(flatten)]
118 args: Args,
119}
120
121#[allow(dead_code)]
122#[tokio::main]
123async fn main() -> anyhow::Result<()> {
124 let args = CliArgs::parse();
125 bin_init("mirror");
126 run(args.globals, args.args).await?;
127 Ok(())
128}