Server tools to backfill, tail, mirror, and verify PLC logs

fast pg copy in

>250k rows/sec on dev machine (...with pg running directly on host, not in docker)

`unlogged` seems to work in place of ~all the other tweakable knobs to make this as fast as possible

hopefully that's fine since we'll just be copying it back to the main table after

+137 -49
+12 -9
src/backfill.rs
··· 1 1 use crate::{BundleSource, Dt, ExportPage, Week, week_to_pages}; 2 - use tokio::task::JoinSet; 2 + use std::sync::Arc; 3 + use tokio::{sync::Mutex, task::JoinSet}; 3 4 4 5 const FIRST_WEEK: Week = Week::from_n(1668643200); 5 6 ··· 9 10 source_workers: usize, 10 11 ) -> anyhow::Result<()> { 11 12 // queue up the week bundles that should be available 12 - let (week_tx, week_rx) = flume::bounded(1024); // work queue 13 - let mut week = FIRST_WEEK; 14 - while week.is_immutable() { 15 - week_tx.try_send(week)?; // if this fails, something has gone really wrong or we're farrrr in the future 16 - week = week.next(); 17 - } 13 + let weeks = Arc::new(Mutex::new(Week::range(FIRST_WEEK..))); 14 + weeks.lock().await.reverse(); 18 15 19 16 let mut workers: JoinSet<anyhow::Result<()>> = JoinSet::new(); 20 17 21 18 // spin up the fetchers to work in parallel 22 19 for w in 0..source_workers { 23 - let weeks = week_rx.clone(); 20 + let weeks = weeks.clone(); 24 21 let dest = dest.clone(); 25 22 let source = source.clone(); 26 23 workers.spawn(async move { 27 - while let Ok(week) = weeks.recv_async().await { 24 + log::info!("about to get weeks..."); 25 + 26 + while let Some(week) = weeks.lock().await.pop() { 28 27 log::info!( 29 28 "worker {w}: fetching week {} (-{})", 30 29 Into::<Dt>::into(week).to_rfc3339(), 31 30 week.n_ago(), 32 31 ); 33 32 week_to_pages(source.clone(), week, dest.clone()).await?; 33 + log::info!("done a week"); 34 34 } 35 + log::info!("done with the weeks ig"); 35 36 Ok(()) 36 37 }); 37 38 } 39 + 40 + // TODO: handle missing/failed weeks 38 41 39 42 // wait for them to finish 40 43 while let Some(res) = workers.join_next().await {
+15 -10
src/bin/allegedly.rs
··· 1 - use allegedly::{Db, Dt, ExportPage, FolderSource, HttpSource, backfill, bin_init, pages_to_weeks, poll_upstream, write_bulk as pages_to_pg}; 1 + use allegedly::{ 2 + Db, Dt, ExportPage, FolderSource, HttpSource, backfill, bin_init, pages_to_weeks, 3 + poll_upstream, write_bulk as pages_to_pg, 4 + }; 2 5 use clap::{Parser, Subcommand}; 3 6 use std::path::PathBuf; 4 7 use url::Url; ··· 25 28 #[arg(long)] 26 29 dir: Option<PathBuf>, 27 30 /// Parallel bundle fetchers 31 + /// 32 + /// Default: 4 for http fetches, 1 for local folder 28 33 #[arg(long)] 29 - #[clap(default_value = "4")] 30 - source_workers: usize, 34 + source_workers: Option<usize>, 31 35 /// Bulk load into did-method-plc-compatible postgres instead of stdout 32 36 /// 33 37 /// Pass a postgres connection url like "postgresql://localhost:5432" ··· 64 68 } 65 69 66 70 async fn pages_to_stdout(rx: flume::Receiver<ExportPage>) -> Result<(), flume::RecvError> { 67 - loop { 68 - for op in rx.recv_async().await?.ops { 71 + while let Ok(page) = rx.recv_async().await { 72 + for op in page.ops { 69 73 println!("{op}") 70 74 } 71 75 } 76 + Ok(()) 72 77 } 73 78 74 79 #[tokio::main] ··· 84 89 source_workers, 85 90 to_postgres, 86 91 } => { 87 - let (tx, rx) = flume::bounded(1024); // big pages 92 + let (tx, rx) = flume::bounded(32); // big pages 88 93 tokio::task::spawn(async move { 89 94 if let Some(dir) = dir { 90 95 log::info!("Reading weekly bundles from local folder {dir:?}"); 91 - backfill(FolderSource(dir), tx, source_workers) 96 + backfill(FolderSource(dir), tx, source_workers.unwrap_or(1)) 92 97 .await 93 98 .unwrap(); 94 99 } else { 95 100 log::info!("Fetching weekly bundles from {http}"); 96 - backfill(HttpSource(http), tx, source_workers) 101 + backfill(HttpSource(http), tx, source_workers.unwrap_or(4)) 97 102 .await 98 103 .unwrap(); 99 104 } 100 105 }); 101 106 if let Some(url) = to_postgres { 102 - let db = Db::new(url.as_str()); 107 + let db = Db::new(url.as_str()).await.unwrap(); 103
108 pages_to_pg(db, rx).await.unwrap(); 104 109 } else { 105 110 pages_to_stdout(rx).await.unwrap(); ··· 122 127 let mut url = args.upstream; 123 128 url.set_path("/export"); 124 129 let start_at = after.or_else(|| Some(chrono::Utc::now())); 125 - let (tx, rx) = flume::bounded(0); // rendezvous, don't read ahead 130 + let (tx, rx) = flume::bounded(1); 126 131 tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() }); 127 132 pages_to_stdout(rx).await.unwrap(); 128 133 }
+75 -23
src/plc_pg.rs
··· 1 1 use crate::{ExportPage, Op}; 2 - use tokio_postgres::{Client, types::{Type, Json}, Error as PgError, NoTls, connect, binary_copy::BinaryCopyInWriter}; 3 2 use std::pin::pin; 4 - 3 + use std::time::Instant; 4 + use tokio_postgres::{ 5 + Client, Error as PgError, NoTls, 6 + binary_copy::BinaryCopyInWriter, 7 + connect, 8 + types::{Json, Type}, 9 + }; 5 10 6 11 /// a little tokio-postgres helper 7 12 /// ··· 13 18 } 14 19 15 20 impl Db { 16 - pub fn new(pg_uri: &str) -> Self { 17 - Self { 21 + pub async fn new(pg_uri: &str) -> Result<Self, anyhow::Error> { 22 + // we're going to interact with did-method-plc's database, so make sure 23 + // it's what we expect: check for db migrations. 24 + log::trace!("checking migrations..."); 25 + let (client, connection) = connect(pg_uri, NoTls).await?; 26 + let connection_task = tokio::task::spawn(async move { 27 + connection 28 + .await 29 + .inspect_err(|e| log::error!("connection ended with error: {e}")) 30 + .unwrap(); 31 + }); 32 + let migrations: Vec<String> = client 33 + .query("SELECT name FROM kysely_migration ORDER BY name", &[]) 34 + .await? 
35 + .iter() 36 + .map(|row| row.get(0)) 37 + .collect(); 38 + assert_eq!( 39 + &migrations, 40 + &[ 41 + "_20221020T204908820Z", 42 + "_20230223T215019669Z", 43 + "_20230406T174552885Z", 44 + "_20231128T203323431Z", 45 + ] 46 + ); 47 + drop(client); 48 + // make sure the connection worker thing doesn't linger 49 + connection_task.await?; 50 + log::info!("db connection succeeded and plc migrations appear as expected"); 51 + 52 + Ok(Self { 18 53 pg_uri: pg_uri.to_string(), 19 - } 54 + }) 20 55 } 21 56 22 57 pub async fn connect(&self) -> Result<Client, PgError> { ··· 36 71 } 37 72 } 38 73 39 - pub async fn write_bulk( 40 - db: Db, 41 - pages: flume::Receiver<ExportPage>, 42 - ) -> Result<(), PgError> { 74 + pub async fn write_bulk(db: Db, pages: flume::Receiver<ExportPage>) -> Result<(), PgError> { 43 75 let mut client = db.connect().await?; 76 + 77 + // TODO: maybe we want to be more cautious 78 + client 79 + .execute( 80 + r#" 81 + DROP TABLE IF EXISTS allegedly_backfill"#, 82 + &[], 83 + ) 84 + .await?; 85 + 44 86 let tx = client.transaction().await?; 45 87 46 - tx 47 - .execute(r#" 48 - CREATE TABLE backfill ( 88 + tx.execute( 89 + r#" 90 + CREATE UNLOGGED TABLE allegedly_backfill ( 49 91 did text not null, 50 92 cid text not null, 51 93 operation jsonb not null, 52 94 nullified boolean not null, 53 95 createdAt timestamptz not null 54 - )"#, &[]) 55 - .await?; 56 - 96 + )"#, 97 + &[], 98 + ) 99 + .await?; 57 100 58 101 let types = &[ 59 102 Type::TEXT, ··· 62 105 Type::BOOL, 63 106 Type::TIMESTAMPTZ, 64 107 ]; 108 + let t0 = Instant::now(); 65 109 66 - let sync = tx.copy_in("COPY backfill FROM STDIN BINARY").await?; 110 + let sync = tx 111 + .copy_in("COPY allegedly_backfill FROM STDIN BINARY") 112 + .await?; 67 113 let mut writer = pin!(BinaryCopyInWriter::new(sync, types)); 68 114 69 115 while let Ok(page) = pages.recv_async().await { ··· 72 118 log::warn!("ignoring unparseable op: {s:?}"); 73 119 continue; 74 120 }; 75 - writer.as_mut().write(&[ 76 - 
&op.did, 77 - &op.cid, 78 - &Json(op.operation), 79 - &op.nullified, 80 - &op.created_at, 81 - ]).await?; 121 + writer 122 + .as_mut() 123 + .write(&[ 124 + &op.did, 125 + &op.cid, 126 + &Json(op.operation), 127 + &op.nullified, 128 + &op.created_at, 129 + ]) 130 + .await?; 82 131 } 83 132 } 84 133 ··· 86 135 log::info!("copied in {n} rows"); 87 136 88 137 tx.commit().await?; 138 + 139 + let dt = t0.elapsed(); 140 + log::info!("backfill time: {dt:?}"); 89 141 90 142 Ok(()) 91 143 }
+35 -7
src/weekly.rs
··· 3 3 use async_compression::tokio::write::GzipEncoder; 4 4 use core::pin::pin; 5 5 use std::future::Future; 6 + use std::ops::{Bound, RangeBounds}; 6 7 use std::path::PathBuf; 7 8 use tokio::{ 8 9 fs::File, ··· 14 15 15 16 const WEEK_IN_SECONDS: i64 = 7 * 86_400; 16 17 17 - #[derive(Debug, Clone, Copy, PartialEq)] 18 + #[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] 18 19 pub struct Week(i64); 19 20 20 21 impl Week { 21 22 pub const fn from_n(n: i64) -> Self { 22 23 Self(n) 23 24 } 25 + pub fn range(r: impl RangeBounds<Week>) -> Vec<Self> { 26 + let first = match r.start_bound() { 27 + Bound::Included(week) => *week, 28 + Bound::Excluded(week) => week.next(), 29 + Bound::Unbounded => panic!("week range must have a defined start bound"), 30 + }; 31 + let last = match r.end_bound() { 32 + Bound::Included(week) => *week, 33 + Bound::Excluded(week) => week.prev(), 34 + Bound::Unbounded => Self(Self::nullification_cutoff()).prev(), 35 + }; 36 + let mut out = Vec::new(); 37 + let mut current = first; 38 + while current <= last { 39 + out.push(current); 40 + current = current.next(); 41 + } 42 + out 43 + } 24 44 pub fn n_ago(&self) -> i64 { 25 - let Self(us) = self; 26 - let Self(cur) = chrono::Utc::now().into(); 27 - (cur - us) / WEEK_IN_SECONDS 45 + let now = chrono::Utc::now().timestamp(); 46 + (now - self.0) / WEEK_IN_SECONDS 47 + } 48 + pub fn n_until(&self, other: Week) -> i64 { 49 + let Self(until) = other; 50 + (until - self.0) / WEEK_IN_SECONDS 28 51 } 29 52 pub fn next(&self) -> Week { 30 53 Self(self.0 + WEEK_IN_SECONDS) 31 54 } 32 - /// is the plc log for this week entirely outside the 72h nullification window 55 + pub fn prev(&self) -> Week { 56 + Self(self.0 - WEEK_IN_SECONDS) 57 + } 58 + /// whether the plc log for this week is outside the 72h nullification window 33 59 /// 34 60 /// plus one hour for safety (week must have ended > 73 hours ago) 35 61 pub fn is_immutable(&self) -> bool { 62 + self.next().0 <= Self::nullification_cutoff() 63 + } 64 + fn 
nullification_cutoff() -> i64 { 36 65 const HOUR_IN_SECONDS: i64 = 3600; 37 66 let now = chrono::Utc::now().timestamp(); 38 - let nullification_cutoff = now - (73 * HOUR_IN_SECONDS); 39 - self.next().0 <= nullification_cutoff 67 + now - (73 * HOUR_IN_SECONDS) 40 68 } 41 69 } 42 70