Server tools to backfill, tail, mirror, and verify PLC logs
1use allegedly::{
2 Db, ExperimentalConf, ListenConf,
3 bin::{GlobalArgs, InstrumentationArgs, bin_init},
4 logo, pages_to_pg, poll_upstream, serve,
5};
6use clap::Parser;
7use reqwest::Url;
8use std::{net::SocketAddr, path::PathBuf, time::Duration};
9use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet};
10
11#[derive(Debug, clap::Args)]
12pub struct Args {
13 /// the wrapped did-method-plc server
14 #[arg(long, env = "ALLEGEDLY_WRAP")]
15 wrap: Url,
16 /// the wrapped did-method-plc server's database (write access required)
17 #[arg(long, env = "ALLEGEDLY_WRAP_PG")]
18 wrap_pg: Option<Url>,
19 /// path to tls cert for the wrapped postgres db, if needed
20 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")]
21 wrap_pg_cert: Option<PathBuf>,
22 /// wrapping server listen address
23 #[arg(short, long, env = "ALLEGEDLY_BIND")]
24 #[clap(default_value = "127.0.0.1:8000")]
25 bind: SocketAddr,
26 /// obtain a certificate from letsencrypt
27 ///
28 /// for now this will force listening on all interfaces at :80 and :443
29 /// (:80 will serve an "https required" error, *will not* redirect)
30 #[arg(
31 long,
32 conflicts_with("bind"),
33 requires("acme_cache_path"),
34 env = "ALLEGEDLY_ACME_DOMAIN"
35 )]
36 acme_domain: Vec<String>,
37 /// which local directory to keep the letsencrypt certs in
38 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
39 acme_cache_path: Option<PathBuf>,
40 /// which public acme directory to use
41 ///
42 /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
43 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
44 #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
45 acme_directory_url: Url,
46 /// try to listen for ipv6
47 #[arg(long, action, requires("acme_domain"), env = "ALLEGEDLY_ACME_IPV6")]
48 acme_ipv6: bool,
49 /// only accept experimental requests at this hostname
50 ///
51 /// a cert will be provisioned for it from letsencrypt. if you're not using
52 /// acme (eg., behind a tls-terminating reverse proxy), open a feature request.
53 #[arg(
54 long,
55 requires("acme_domain"),
56 env = "ALLEGEDLY_EXPERIMENTAL_ACME_DOMAIN"
57 )]
58 experimental_acme_domain: Option<String>,
59 /// accept writes! by forwarding them upstream
60 #[arg(long, action, env = "ALLEGEDLY_EXPERIMENTAL_WRITE_UPSTREAM")]
61 experimental_write_upstream: bool,
62}
63
64pub async fn run(
65 GlobalArgs {
66 upstream,
67 upstream_throttle_ms,
68 }: GlobalArgs,
69 Args {
70 wrap,
71 wrap_pg,
72 wrap_pg_cert,
73 bind,
74 acme_domain,
75 acme_cache_path,
76 acme_directory_url,
77 acme_ipv6,
78 experimental_acme_domain,
79 experimental_write_upstream,
80 }: Args,
81 sync: bool,
82) -> anyhow::Result<()> {
83 let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
84 (_, false, Some(cache_path)) => {
85 create_dir_all(&cache_path).await?;
86 let mut domains = acme_domain.clone();
87 if let Some(ref experimental_domain) = experimental_acme_domain {
88 domains.push(experimental_domain.clone())
89 }
90 log::info!("configuring acme for https at {domains:?}...");
91 ListenConf::Acme {
92 domains,
93 cache_path,
94 directory_url: acme_directory_url.to_string(),
95 ipv6: acme_ipv6,
96 }
97 }
98 (bind, true, None) => ListenConf::Bind(bind),
99 (_, _, _) => unreachable!(),
100 };
101
102 let experimental_conf = ExperimentalConf {
103 acme_domain: experimental_acme_domain,
104 write_upstream: experimental_write_upstream,
105 };
106
107 let mut tasks = JoinSet::new();
108
109 let db = if sync {
110 let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!(
111 "a wrapped reference postgres must be provided to sync"
112 ))?;
113 let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?;
114
115 // TODO: allow starting up with polling backfill from beginning?
116 log::debug!("getting the latest op from the db...");
117 let latest = db
118 .get_latest()
119 .await?
120 .expect("there to be at least one op in the db. did you backfill?");
121
122 let (send_page, recv_page) = mpsc::channel(8);
123
124 let mut poll_url = upstream.clone();
125 poll_url.set_path("/export");
126 let throttle = Duration::from_millis(upstream_throttle_ms);
127
128 tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page));
129 tasks.spawn(pages_to_pg(db.clone(), recv_page));
130 Some(db)
131 } else {
132 None
133 };
134
135 tasks.spawn(serve(
136 upstream,
137 wrap,
138 listen_conf,
139 experimental_conf,
140 db.clone(),
141 ));
142
143 while let Some(next) = tasks.join_next().await {
144 match next {
145 Err(e) if e.is_panic() => {
146 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
147 return Err(e.into());
148 }
149 Err(e) => {
150 log::error!("a joinset task failed to join: {e}");
151 return Err(e.into());
152 }
153 Ok(Err(e)) => {
154 log::error!("a joinset task completed with error: {e}");
155 return Err(e);
156 }
157 Ok(Ok(name)) => {
158 log::trace!("a task completed: {name:?}. {} left", tasks.len());
159 }
160 }
161 }
162
163 Ok(())
164}
165
166#[derive(Debug, Parser)]
167struct CliArgs {
168 #[command(flatten)]
169 globals: GlobalArgs,
170 #[command(flatten)]
171 instrumentation: InstrumentationArgs,
172 #[command(flatten)]
173 args: Args,
174 /// Run the mirror in wrap mode, no upstream synchronization (read-only)
175 #[arg(long, action)]
176 wrap_mode: bool,
177}
178
179#[allow(dead_code)]
180#[tokio::main]
181async fn main() -> anyhow::Result<()> {
182 let args = CliArgs::parse();
183 bin_init(args.instrumentation.enable_opentelemetry);
184 log::info!("{}", logo("mirror"));
185 run(args.globals, args.args, !args.wrap_mode).await?;
186 Ok(())
187}