Server tools to backfill, tail, mirror, and verify PLC logs

wrap mode (reverse proxy)

+131 -53
+14 -1
readme.md
··· 36 36 --experimental-write-upstream 37 37 ``` 38 38 39 + - Reverse-proxy to any PLC server, terminating TLS and forwarding writes upstream 40 + 41 + ```bash 42 + sudo allegedly wrap \ 43 + --wrap "http://127.0.0.1:3000" \ 44 + --acme-ipv6 \ 45 + --acme-cache-path ./acme-cache \ 46 + --acme-domain "plc.wtf" \ 47 + --experimental-acme-domain "experimental.plc.wtf" \ 48 + --experimental-write-upstream \ 49 + --upstream "https://plc.wtf" \ 50 + ``` 51 + 39 52 40 53 add `--help` to any command for more info about it 41 54 ··· 66 79 - monitoring of the various tasks 67 80 - health check pings 68 81 - expose metrics/tracing 69 - - read-only flag for mirror wrapper 82 + - [x] read-only flag for mirror wrapper 70 83 - bundle: write directly to s3-compatible object storage 71 84 - helpers for automating periodic `bundle` runs 72 85
+7 -1
src/bin/allegedly.rs
··· 49 49 #[command(flatten)] 50 50 args: mirror::Args, 51 51 }, 52 + /// Wrap any did-method-plc server, without syncing upstream (read-only) 53 + Wrap { 54 + #[command(flatten)] 55 + args: mirror::Args, 56 + }, 52 57 /// Poll an upstream PLC server and log new ops to stdout 53 58 Tail { 54 59 /// Begin tailing from a specific timestamp for replay or wait-until ··· 91 96 .await 92 97 .expect("to write bundles to output files"); 93 98 } 94 - Commands::Mirror { args } => mirror::run(globals, args).await?, 99 + Commands::Mirror { args } => mirror::run(globals, args, true).await?, 100 + Commands::Wrap { args } => mirror::run(globals, args, false).await?, 95 101 Commands::Tail { after } => { 96 102 let mut url = globals.upstream; 97 103 url.set_path("/export");
+30 -17
src/bin/mirror.rs
··· 13 13 wrap: Url, 14 14 /// the wrapped did-method-plc server's database (write access required) 15 15 #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 16 - wrap_pg: Url, 16 + wrap_pg: Option<Url>, 17 17 /// path to tls cert for the wrapped postgres db, if needed 18 18 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 19 19 wrap_pg_cert: Option<PathBuf>, ··· 76 76 experimental_acme_domain, 77 77 experimental_write_upstream, 78 78 }: Args, 79 + sync: bool, 79 80 ) -> anyhow::Result<()> { 80 - let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 81 - 82 - // TODO: allow starting up with polling backfill from beginning? 83 - log::debug!("getting the latest op from the db..."); 84 - let latest = db 85 - .get_latest() 86 - .await? 87 - .expect("there to be at least one op in the db. did you backfill?"); 88 - 89 81 let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 90 82 (_, false, Some(cache_path)) => { 91 83 create_dir_all(&cache_path).await?; ··· 112 104 113 105 let mut tasks = JoinSet::new(); 114 106 115 - let (send_page, recv_page) = mpsc::channel(8); 107 + let db = if sync { 108 + let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!( 109 + "a wrapped reference postgres must be provided to sync" 110 + ))?; 111 + let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 112 + 113 + // TODO: allow starting up with polling backfill from beginning? 114 + log::debug!("getting the latest op from the db..."); 115 + let latest = db 116 + .get_latest() 117 + .await? 118 + .expect("there to be at least one op in the db. did you backfill?"); 116 119 117 - let mut poll_url = upstream.clone(); 118 - poll_url.set_path("/export"); 119 - let throttle = Duration::from_millis(upstream_throttle_ms); 120 + let (send_page, recv_page) = mpsc::channel(8); 120 121 121 - tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 122 - tasks.spawn(pages_to_pg(db.clone(), recv_page)); 122 + let mut poll_url = upstream.clone(); 123 + poll_url.set_path("/export"); 124 + let throttle = Duration::from_millis(upstream_throttle_ms); 125 + 126 + tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 127 + tasks.spawn(pages_to_pg(db.clone(), recv_page)); 128 + Some(db) 129 + } else { 130 + None 131 + }; 132 + 123 133 tasks.spawn(serve( 124 134 upstream, 125 135 wrap, ··· 157 167 globals: GlobalArgs, 158 168 #[command(flatten)] 159 169 args: Args, 170 + /// Run the mirror in wrap mode, no upstream synchronization (read-only) 171 + #[arg(long, action)] 172 + wrap_mode: bool, 160 173 } 161 174 162 175 #[allow(dead_code)] ··· 164 177 async fn main() -> anyhow::Result<()> { 165 178 let args = CliArgs::parse(); 166 179 bin_init("mirror"); 167 - run(args.globals, args.args).await?; 180 + run(args.globals, args.args, !args.wrap_mode).await?; 168 181 Ok(()) 169 182 }
+80 -34
src/mirror.rs
··· 19 19 client: Client, 20 20 plc: Url, 21 21 upstream: Url, 22 + sync_info: Option<SyncInfo>, 23 + experimental: ExperimentalConf, 24 + } 25 + 26 + /// server info that only applies in mirror (synchronizing) mode 27 + #[derive(Clone)] 28 + struct SyncInfo { 22 29 latest_at: CachedValue<Dt, GetLatestAt>, 23 30 upstream_status: CachedValue<PlcStatus, CheckUpstream>, 24 - experimental: ExperimentalConf, 25 31 } 26 32 27 33 #[handler] 28 34 fn hello( 29 35 Data(State { 36 + sync_info, 30 37 upstream, 31 38 experimental: exp, 32 39 .. 33 40 }): Data<&State>, 34 41 req: &Request, 35 42 ) -> String { 43 + // let mode = if sync_info.is_some() { "mirror" } else { "wrap" }; 44 + let pre_info = if sync_info.is_some() { 45 + format!( 46 + r#" 47 + This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and 48 + synchronizes a local PLC reference server instance[2] (why?[3]). 49 + 50 + 51 + Configured upstream: 52 + 53 + {upstream} 54 + 55 + "# 56 + ) 57 + } else { 58 + format!( 59 + r#" 60 + This is a PLC[1] mirror running Allegedly in wrap mode. Wrap mode reverse- 61 + proxies requests to a PLC server and can terminate TLS, like NGINX or Caddy. 62 + 63 + 64 + Configured upstream (only used if experimental op forwarding is enabled): 65 + 66 + {upstream} 67 + 68 + "# 69 + ) 70 + }; 71 + 36 72 let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) { 37 73 (false, _, _) => " - POST /* Always rejected. This is a mirror.".to_string(), 38 74 (_, None, _) => { ··· 49 85 50 86 format!( 51 87 r#"{} 52 - 53 - This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and 54 - synchronizes a local PLC reference server instance[2] (why?[3]). 55 - 56 - 57 - Configured upstream: 58 - 59 - {upstream} 60 - 88 + {pre_info} 61 89 62 90 Available APIs: 63 91 ··· 176 204 Data(State { 177 205 plc, 178 206 client, 179 - latest_at, 180 - upstream_status, 207 + sync_info, 181 208 .. 182 209 }): Data<&State>, 183 210 ) -> impl IntoResponse { ··· 186 213 if !ok { 187 214 overall_status = StatusCode::BAD_GATEWAY; 188 215 } 189 - let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible"); 190 - if !ok { 191 - overall_status = StatusCode::BAD_GATEWAY; 216 + if let Some(SyncInfo { 217 + latest_at, 218 + upstream_status, 219 + }) = sync_info 220 + { 221 + // mirror mode 222 + let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible"); 223 + if !ok { 224 + overall_status = StatusCode::BAD_GATEWAY; 225 + } 226 + let latest = latest_at.get().await.ok(); 227 + ( 228 + overall_status, 229 + Json(serde_json::json!({ 230 + "server": "allegedly (mirror)", 231 + "version": env!("CARGO_PKG_VERSION"), 232 + "wrapped_plc": wrapped_status, 233 + "upstream_plc": upstream_status, 234 + "latest_at": latest, 235 + })), 236 + ) 237 + } else { 238 + // wrap mode 239 + ( 240 + overall_status, 241 + Json(serde_json::json!({ 242 + "server": "allegedly (mirror)", 243 + "version": env!("CARGO_PKG_VERSION"), 244 + "wrapped_plc": wrapped_status, 245 + })), 246 + ) 192 247 } 193 - let latest = latest_at.get().await.ok(); 194 - ( 195 - overall_status, 196 - Json(serde_json::json!({ 197 - "server": "allegedly (mirror)", 198 - "version": env!("CARGO_PKG_VERSION"), 199 - "wrapped_plc": wrapped_status, 200 - "upstream_plc": upstream_status, 201 - "latest_at": latest, 202 - })), 203 - ) 204 248 } 205 249 206 250 fn proxy_response(res: reqwest::Response) -> Response { ··· 344 388 plc: Url, 345 389 listen: ListenConf, 346 390 experimental: ExperimentalConf, 347 - db: Db, 391 + db: Option<Db>, 348 392 ) -> anyhow::Result<&'static str> { 349 393 log::info!("starting server..."); 350 394 ··· 355 399 .build() 356 400 .expect("reqwest client to build"); 357 401 358 - let latest_at = CachedValue::new(GetLatestAt(db), Duration::from_secs(2)); 359 - let upstream_status = CachedValue::new( 360 - CheckUpstream(upstream.clone(), client.clone()), 361 - Duration::from_secs(6), 362 - ); 402 + // when `db` is None, we're running in wrap mode. no db access, no upstream sync 403 + let sync_info = db.map(|db| SyncInfo { 404 + latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)), 405 + upstream_status: CachedValue::new( 406 + CheckUpstream(upstream.clone(), client.clone()), 407 + Duration::from_secs(6), 408 + ), 409 + }); 363 410 364 411 let state = State { 365 412 client, 366 413 plc, 367 414 upstream: upstream.clone(), 368 - latest_at, 369 - upstream_status, 415 + sync_info, 370 416 experimental: experimental.clone(), 371 417 }; 372 418