Server tools to backfill, tail, mirror, and verify PLC logs

slightly less painful tokio-postgres

+44 -45
+11 -45
src/bin/main.rs
··· 1 1 use clap::Parser; 2 - use serde::Deserialize; 3 2 use std::time::Duration; 4 - use tokio_postgres::NoTls; 5 3 use url::Url; 6 4 7 - use allegedly::{Dt, ExportPage, bin_init, poll_upstream, week_to_pages}; 5 + use allegedly::{Db, Dt, ExportPage, Op, bin_init, poll_upstream, week_to_pages}; 8 6 9 7 const EXPORT_PAGE_QUEUE_SIZE: usize = 0; // rendezvous for now 10 8 const WEEK_IN_SECONDS: u64 = 7 * 86400; ··· 40 38 /// URI string with credentials etc 41 39 #[arg(long, env)] 42 40 postgres: String, 43 - } 44 - 45 - #[derive(Deserialize)] 46 - #[serde(rename_all = "camelCase")] 47 - struct Op<'a> { 48 - pub did: &'a str, 49 - pub cid: &'a str, 50 - pub created_at: Dt, 51 - pub nullified: bool, 52 - #[serde(borrow)] 53 - pub operation: &'a serde_json::value::RawValue, 54 41 } 55 42 56 43 async fn bulk_backfill((upstream, epoch): (Url, u64), tx: flume::Sender<ExportPage>) { ··· 181 168 } 182 169 183 170 #[tokio::main] 184 - async fn main() { 171 + async fn main() -> anyhow::Result<()> { 185 172 bin_init("main"); 186 173 let args = Args::parse(); 187 - 188 - log::trace!("connecting postgres..."); 189 - let (pg_client, connection) = tokio_postgres::connect(&args.postgres, NoTls) 190 - .await 191 - .unwrap(); 192 - 193 - // send the connection away to do the actual communication work 194 - // TODO: error and shutdown handling 195 - let conn_task = tokio::task::spawn(async move { 196 - if let Err(e) = connection.await { 197 - eprintln!("connection error: {e}"); 198 - } 199 - }); 200 - 201 - log::trace!("connecting postgres 2..."); 202 - let (pg_client2, connection2) = tokio_postgres::connect(&args.postgres, NoTls) 203 - .await 204 - .unwrap(); 205 - 206 - // send the connection away to do the actual communication work 207 - // TODO: error and shutdown handling 208 - let conn_task2 = tokio::task::spawn(async move { 209 - if let Err(e) = connection2.await { 210 - eprintln!("connection error: {e}"); 211 - } 212 - }); 213 - 174 + let db = Db::new(&args.postgres); 214 175 let (tx, rx) = flume::bounded(EXPORT_PAGE_QUEUE_SIZE); 215 176 177 + log::trace!("connecting postgres for export task..."); 178 + let pg_client = db.connect().await?; 216 179 let export_task = tokio::task::spawn(export_upstream( 217 180 args.upstream, 218 181 (args.upstream_bulk, args.bulk_epoch), 219 182 tx, 220 - pg_client2, 183 + pg_client, 221 184 )); 185 + 186 + log::trace!("connecting postgres for writer task..."); 187 + let pg_client = db.connect().await?; 222 188 let writer_task = tokio::task::spawn(write_pages(rx, pg_client)); 223 189 224 190 tokio::select! { 225 - z = conn_task => log::warn!("connection task ended: {z:?}"), 226 - z = conn_task2 => log::warn!("connection task ended: {z:?}"), 227 191 z = export_task => log::warn!("export task ended: {z:?}"), 228 192 z = writer_task => log::warn!("writer task ended: {z:?}"), 229 193 }; 230 194 231 195 log::error!("todo: shutdown"); 196 + 197 + Ok(()) 232 198 }
+2
src/lib.rs
··· 2 2 3 3 mod backfill; 4 4 mod client; 5 + mod plc_pg; 5 6 mod poll; 6 7 7 8 pub use backfill::week_to_pages; 8 9 pub use client::CLIENT; 10 + pub use plc_pg::Db; 9 11 pub use poll::poll_upstream; 10 12 11 13 pub type Dt = chrono::DateTime<chrono::Utc>;
+31
src/plc_pg.rs
··· 1 + use tokio_postgres::{Client, Error as PgError, NoTls, connect}; 2 + 3 + /// a little tokio-postgres helper 4 + #[derive(Debug, Clone)] 5 + pub struct Db { 6 + pg_uri: String, 7 + } 8 + 9 + impl Db { 10 + pub fn new(pg_uri: &str) -> Self { 11 + Self { 12 + pg_uri: pg_uri.to_string(), 13 + } 14 + } 15 + 16 + pub async fn connect(&self) -> Result<Client, PgError> { 17 + log::trace!("connecting postgres..."); 18 + let (client, connection) = connect(&self.pg_uri, NoTls).await?; 19 + 20 + // send the connection away to do the actual communication work 21 + // apparently the connection will complete when the client drops 22 + tokio::task::spawn(async move { 23 + connection 24 + .await 25 + .inspect_err(|e| log::error!("connection ended with error: {e}")) 26 + .unwrap(); 27 + }); 28 + 29 + Ok(client) 30 + } 31 + }