Server tools to backfill, tail, mirror, and verify PLC logs

manage tasks for db and server

+60 -46
+37 -29
src/bin/mirror.rs
··· 3 3 use reqwest::Url; 4 4 use std::{net::SocketAddr, path::PathBuf}; 5 5 use tokio::sync::mpsc; 6 + use tokio::task::JoinSet; 6 7 7 8 #[derive(Debug, clap::Args)] 8 9 pub struct Args { ··· 53 54 acme_directory_url, 54 55 }: Args, 55 56 ) -> anyhow::Result<()> { 56 - let db = Db::new(wrap_pg.as_str(), wrap_pg_cert) 57 - .await 58 - .expect("to connect to pg for mirroring"); 57 + let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 58 + 59 + // TODO: allow starting up with polling backfill from beginning? 60 + log::debug!("getting the latest op from the db..."); 59 61 let latest = db 60 62 .get_latest() 61 - .await 62 - .expect("to query for last createdAt") 63 + .await? 63 64 .expect("there to be at least one op in the db. did you backfill?"); 64 65 65 - let (tx, rx) = mpsc::channel(2); 66 - // upstream poller 67 - let mut url = upstream.clone(); 68 - tokio::task::spawn(async move { 69 - log::info!("starting poll reader..."); 70 - url.set_path("/export"); 71 - tokio::task::spawn(async move { 72 - poll_upstream(Some(latest), url, tx) 73 - .await 74 - .expect("to poll upstream for mirror sync") 75 - }); 76 - }); 77 - // db writer 78 - let poll_db = db.clone(); 79 - tokio::task::spawn(async move { 80 - log::info!("starting db writer..."); 81 - pages_to_pg(poll_db, rx) 82 - .await 83 - .expect("to write to pg for mirror"); 84 - }); 85 - 86 66 let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 87 67 (_, false, Some(cache_path)) => ListenConf::Acme { 88 68 domains: acme_domain, ··· 93 73 (_, _, _) => unreachable!(), 94 74 }; 95 75 96 - serve(&upstream, wrap, listen_conf) 97 - .await 98 - .expect("to be able to serve the mirror proxy app"); 76 + let mut tasks = JoinSet::new(); 77 + 78 + let (send_page, recv_page) = mpsc::channel(8); 79 + 80 + let mut poll_url = upstream.clone(); 81 + poll_url.set_path("/export"); 82 + 83 + tasks.spawn(poll_upstream(Some(latest), poll_url, send_page)); 84 + tasks.spawn(pages_to_pg(db.clone(), recv_page)); 85 + tasks.spawn(serve(upstream, wrap, listen_conf)); 86 + 87 + while let Some(next) = tasks.join_next().await { 88 + match next { 89 + Err(e) if e.is_panic() => { 90 + log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 91 + return Err(e.into()); 92 + } 93 + Err(e) => { 94 + log::error!("a joinset task failed to join: {e}"); 95 + return Err(e.into()); 96 + } 97 + Ok(Err(e)) => { 98 + log::error!("a joinset task completed with error: {e}"); 99 + return Err(e); 100 + } 101 + Ok(Ok(name)) => { 102 + log::trace!("a task completed: {name:?}. {} left", tasks.len()); 103 + } 104 + } 105 + } 106 + 99 107 Ok(()) 100 108 } 101 109
+17 -12
src/mirror.rs
··· 186 186 Bind(SocketAddr), 187 187 } 188 188 189 - pub async fn serve(upstream: &Url, plc: Url, listen: ListenConf) -> std::io::Result<()> { 189 + pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> { 190 + log::info!("starting server..."); 191 + 190 192 // not using crate CLIENT: don't want the retries etc 191 193 let client = Client::builder() 192 194 .user_agent(UA) ··· 231 233 } 232 234 let auto_cert = auto_cert.build().expect("acme config to build"); 233 235 234 - run_insecure_notice(); 235 - run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await 236 + let notice_task = tokio::task::spawn(run_insecure_notice()); 237 + let app_res = run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await; 238 + log::warn!("server task ended, aborting insecure server task..."); 239 + notice_task.abort(); 240 + app_res?; 241 + notice_task.await??; 236 242 } 237 - ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await, 243 + ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?, 238 244 } 245 + 246 + Ok("server (uh oh?)") 239 247 } 240 248 241 249 async fn run<A, L>(app: A, listener: L) -> std::io::Result<()> ··· 250 258 } 251 259 252 260 /// kick off a tiny little server on a tokio task to tell people to use 443 253 - fn run_insecure_notice() { 261 + async fn run_insecure_notice() -> Result<(), std::io::Error> { 254 262 #[handler] 255 263 fn oop_plz_be_secure() -> (StatusCode, String) { 256 264 ( ··· 266 274 } 267 275 268 276 let app = Route::new().at("/", get(oop_plz_be_secure)).with(Tracing); 269 - let listener = TcpListener::bind("0.0.0.0:80"); 270 - tokio::task::spawn(async move { 271 - Server::new(listener) 272 - .name("allegedly (mirror:80 helper)") 273 - .run(app) 274 - .await 275 - }); 277 + Server::new(TcpListener::bind("0.0.0.0:80")) 278 + .name("allegedly (mirror:80 helper)") 279 + .run(app) 280 + .await 276 281 }
+5 -5
src/plc_pg.rs
··· 5 5 use std::pin::pin; 6 6 use std::time::Instant; 7 7 use tokio::{ 8 - task::{spawn, JoinHandle}, 9 8 sync::{mpsc, oneshot}, 9 + task::{JoinHandle, spawn}, 10 10 }; 11 11 use tokio_postgres::{ 12 - Client, Error as PgError, NoTls, 12 + Client, Error as PgError, NoTls, Socket, 13 13 binary_copy::BinaryCopyInWriter, 14 14 connect, 15 - Socket, 15 + tls::MakeTlsConnect, 16 16 types::{Json, Type}, 17 - tls::MakeTlsConnect 18 17 }; 19 18 20 19 fn get_tls(cert: PathBuf) -> anyhow::Result<MakeTlsConnector> { ··· 86 85 }) 87 86 } 88 87 89 - #[must_use] 90 88 pub async fn connect(&self) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> { 91 89 log::trace!("connecting postgres..."); 92 90 if let Some(ref connector) = self.cert { ··· 117 115 db: Db, 118 116 mut pages: mpsc::Receiver<ExportPage>, 119 117 ) -> anyhow::Result<&'static str> { 118 + log::info!("starting pages_to_pg writer..."); 119 + 120 120 let (mut client, task) = db.connect().await?; 121 121 122 122 let ops_stmt = client
+1
src/poll.rs
··· 156 156 base: Url, 157 157 dest: mpsc::Sender<ExportPage>, 158 158 ) -> anyhow::Result<&'static str> { 159 + log::info!("starting upstream poller after {after:?}"); 159 160 let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 160 161 let mut prev_last: Option<LastOp> = after.map(Into::into); 161 162 let mut boundary_state: Option<PageBoundaryState> = None;