Server tools to backfill, tail, mirror, and verify PLC logs

show latest at in health endpoint

track upstream lag sorta

+99 -17
+1 -1
src/bin/mirror.rs
··· 90 90 91 91 tasks.spawn(poll_upstream(Some(latest), poll_url, send_page)); 92 92 tasks.spawn(pages_to_pg(db.clone(), recv_page)); 93 - tasks.spawn(serve(upstream, wrap, listen_conf)); 93 + tasks.spawn(serve(upstream, wrap, listen_conf, db.clone())); 94 94 95 95 while let Some(next) = tasks.join_next().await { 96 96 match next {
+58
src/cached_value.rs
··· 1 + use std::error::Error; 2 + use std::sync::Arc; 3 + use std::time::{Duration, Instant}; 4 + use tokio::sync::Mutex; 5 + 6 + pub trait Fetcher<T> { 7 + fn fetch(&self) -> impl Future<Output = Result<T, Box<dyn Error>>>; 8 + } 9 + 10 + #[derive(Debug)] 11 + struct ExpiringValue<T: Clone> { 12 + value: T, 13 + expires: Instant, 14 + } 15 + 16 + impl<T: Clone> ExpiringValue<T> { 17 + fn get(&self, now: Instant) -> Option<T> { 18 + if now <= self.expires { 19 + Some(self.value.clone()) 20 + } else { 21 + None 22 + } 23 + } 24 + } 25 + 26 + // TODO: generic over the fetcher's actual error type 27 + #[derive(Clone)] 28 + pub struct CachedValue<T: Clone, F: Fetcher<T>> { 29 + latest: Arc<Mutex<Option<ExpiringValue<T>>>>, 30 + fetcher: F, 31 + validitiy: Duration, 32 + } 33 + 34 + impl<T: Clone, F: Fetcher<T>> CachedValue<T, F> { 35 + pub fn new(f: F, validitiy: Duration) -> Self { 36 + Self { 37 + latest: Default::default(), 38 + fetcher: f, 39 + validitiy, 40 + } 41 + } 42 + pub async fn get(&self) -> Result<T, Box<dyn Error>> { 43 + let now = Instant::now(); 44 + return self.get_impl(now).await; 45 + } 46 + async fn get_impl(&self, now: Instant) -> Result<T, Box<dyn Error>> { 47 + let mut val = self.latest.lock().await; 48 + if let Some(v) = val.as_ref().and_then(|v| v.get(now)) { 49 + return Ok(v); 50 + } 51 + let new = self.fetcher.fetch().await?; 52 + *val = Some(ExpiringValue { 53 + value: new.clone(), 54 + expires: now + self.validitiy, 55 + }); 56 + Ok(new) 57 + } 58 + }
+2
src/lib.rs
··· 2 2 use tokio::sync::{mpsc, oneshot}; 3 3 4 4 mod backfill; 5 + mod cached_value; 5 6 mod client; 6 7 mod mirror; 7 8 mod plc_pg; ··· 12 13 pub mod bin; 13 14 14 15 pub use backfill::backfill; 16 + pub use cached_value::{CachedValue, Fetcher}; 15 17 pub use client::{CLIENT, UA}; 16 18 pub use mirror::{ListenConf, serve}; 17 19 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
+38 -16
src/mirror.rs
··· 1 - use crate::{GovernorMiddleware, UA, logo}; 1 + use crate::{CachedValue, Db, Dt, Fetcher, GovernorMiddleware, UA, logo}; 2 2 use futures::TryStreamExt; 3 3 use governor::Quota; 4 4 use poem::{ ··· 12 12 use reqwest::{Client, Url}; 13 13 use std::{net::SocketAddr, path::PathBuf, time::Duration}; 14 14 15 - #[derive(Debug, Clone)] 15 + #[derive(Clone)] 16 16 struct State { 17 17 client: Client, 18 18 plc: Url, 19 19 upstream: Url, 20 + latest_at: CachedValue<Dt, LatestAt>, 21 + } 22 + 23 + #[derive(Clone)] 24 + struct LatestAt(Db); 25 + impl Fetcher<Dt> for LatestAt { 26 + async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> { 27 + let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!( 28 + "expected to find at least one thing in the db" 29 + ))?; 30 + Ok(now) 31 + } 20 32 } 21 33 22 34 #[handler] ··· 119 131 plc, 120 132 client, 121 133 upstream, 134 + latest_at, 122 135 }): Data<&State>, 123 136 ) -> impl IntoResponse { 124 137 let mut overall_status = StatusCode::OK; ··· 130 143 if !ok { 131 144 overall_status = StatusCode::BAD_GATEWAY; 132 145 } 146 + let latest = latest_at.get().await.ok(); 133 147 ( 134 148 overall_status, 135 149 Json(serde_json::json!({ ··· 137 151 "version": env!("CARGO_PKG_VERSION"), 138 152 "wrapped_plc": wrapped_status, 139 153 "upstream_plc": upstream_status, 154 + "latest_at": latest, 140 155 })), 141 156 ) 142 157 } ··· 203 218 Bind(SocketAddr), 204 219 } 205 220 206 - pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> { 221 + pub async fn serve( 222 + upstream: Url, 223 + plc: Url, 224 + listen: ListenConf, 225 + db: Db, 226 + ) -> anyhow::Result<&'static str> { 207 227 log::info!("starting server..."); 208 228 209 229 // not using crate CLIENT: don't want the retries etc ··· 213 233 .build() 214 234 .expect("reqwest client to build"); 215 235 236 + let latest_at = CachedValue::new(LatestAt(db), Duration::from_secs(1)); 237 + 216 238 let state = State { 217 239 client, 218 240 plc, 219 241 upstream: upstream.clone(), 242 + latest_at, 220 243 }; 221 244 222 245 let app = Route::new() ··· 252 275 } 253 276 let auto_cert = auto_cert.build().expect("acme config to build"); 254 277 255 - let notice_task = tokio::task::spawn(run_insecure_notice()); 256 - let listener = TcpListener::bind("0.0.0.0:443"); 257 - let app_res = if ipv6 { 258 - let listener = listener.combine(TcpListener::bind("[::]:443")); 259 - run(app, listener.acme(auto_cert)).await 260 - } else { 261 - run(app, listener.acme(auto_cert)).await 262 - }; 278 + let notice_task = tokio::task::spawn(run_insecure_notice(ipv6)); 279 + let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" }); 280 + let app_res = run(app, listener.acme(auto_cert)).await; 263 281 log::warn!("server task ended, aborting insecure server task..."); 264 282 notice_task.abort(); 265 283 app_res?; ··· 283 301 } 284 302 285 303 /// kick off a tiny little server on a tokio task to tell people to use 443 286 - async fn run_insecure_notice() -> Result<(), std::io::Error> { 304 + async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> { 287 305 #[handler] 288 306 fn oop_plz_be_secure() -> (StatusCode, String) { 289 307 ( ··· 302 320 .at("/", get(oop_plz_be_secure)) 303 321 .at("/favicon.ico", get(favicon)) 304 322 .with(Tracing); 305 - Server::new(TcpListener::bind("0.0.0.0:80")) 306 - .name("allegedly (mirror:80 helper)") 307 - .run(app) 308 - .await 323 + Server::new(TcpListener::bind(if ipv6 { 324 + "[::]:80" 325 + } else { 326 + "0.0.0.0:80" 327 + })) 328 + .name("allegedly (mirror:80 helper)") 329 + .run(app) 330 + .await 309 331 }