Server tools to backfill, tail, mirror, and verify PLC logs

drop flume

We don't need multiple consumers after all — `tokio::sync::mpsc` covers every use.

+34 -59
-31
Cargo.lock
··· 35 35 "chrono", 36 36 "clap", 37 37 "env_logger", 38 - "flume", 39 38 "futures", 40 39 "log", 41 40 "reqwest", ··· 446 445 dependencies = [ 447 446 "crc32fast", 448 447 "miniz_oxide", 449 - ] 450 - 451 - [[package]] 452 - name = "flume" 453 - version = "0.11.1" 454 - source = "registry+https://github.com/rust-lang/crates.io-index" 455 - checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" 456 - dependencies = [ 457 - "futures-core", 458 - "futures-sink", 459 - "nanorand", 460 - "spin", 461 448 ] 462 449 463 450 [[package]] ··· 1094 1081 ] 1095 1082 1096 1083 [[package]] 1097 - name = "nanorand" 1098 - version = "0.7.0" 1099 - source = "registry+https://github.com/rust-lang/crates.io-index" 1100 - checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" 1101 - dependencies = [ 1102 - "getrandom 0.2.16", 1103 - ] 1104 - 1105 - [[package]] 1106 1084 name = "native-tls" 1107 1085 version = "0.2.14" 1108 1086 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1789 1767 dependencies = [ 1790 1768 "libc", 1791 1769 "windows-sys 0.59.0", 1792 - ] 1793 - 1794 - [[package]] 1795 - name = "spin" 1796 - version = "0.9.8" 1797 - source = "registry+https://github.com/rust-lang/crates.io-index" 1798 - checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 1799 - dependencies = [ 1800 - "lock_api", 1801 1770 ] 1802 1771 1803 1772 [[package]]
-1
Cargo.toml
··· 12 12 chrono = { version = "0.4.42", features = ["serde"] } 13 13 clap = { version = "4.5.47", features = ["derive", "env"] } 14 14 env_logger = "0.11.8" 15 - flume = "0.11.1" 16 15 futures = "0.3.31" 17 16 log = "0.4.28" 18 17 reqwest = { version = "0.12.23", features = ["stream"] }
+5 -2
src/backfill.rs
··· 1 1 use crate::{BundleSource, Dt, ExportPage, Week, week_to_pages}; 2 2 use std::sync::Arc; 3 3 use std::time::Instant; 4 - use tokio::{sync::Mutex, task::JoinSet}; 4 + use tokio::{ 5 + sync::{Mutex, mpsc}, 6 + task::JoinSet, 7 + }; 5 8 6 9 const FIRST_WEEK: Week = Week::from_n(1668643200); 7 10 8 11 pub async fn backfill( 9 12 source: impl BundleSource + Send + 'static, 10 - dest: flume::Sender<ExportPage>, 13 + dest: mpsc::Sender<ExportPage>, 11 14 source_workers: usize, 12 15 until: Option<Dt>, 13 16 ) -> anyhow::Result<()> {
+15 -13
src/bin/allegedly.rs
··· 3 3 bin_init, pages_to_pg, pages_to_weeks, poll_upstream, 4 4 }; 5 5 use clap::{Parser, Subcommand}; 6 - use std::path::PathBuf; 7 - use tokio::sync::oneshot; 6 + use std::{path::PathBuf, time::Instant}; 7 + use tokio::sync::{mpsc, oneshot}; 8 8 use url::Url; 9 9 10 10 #[derive(Debug, Parser)] ··· 80 80 } 81 81 82 82 async fn pages_to_stdout( 83 - rx: flume::Receiver<ExportPage>, 83 + mut rx: mpsc::Receiver<ExportPage>, 84 84 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 85 - ) -> Result<(), flume::RecvError> { 85 + ) -> anyhow::Result<()> { 86 86 let mut last_at = None; 87 - while let Ok(page) = rx.recv_async().await { 87 + while let Some(page) = rx.recv().await { 88 88 for op in &page.ops { 89 89 println!("{op}"); 90 90 } ··· 107 107 /// 108 108 /// PLC will return up to 1000 ops on a page, and returns full pages until it 109 109 /// has caught up, so this is a (hacky?) way to stop polling once we're up. 110 - fn full_pages(rx: flume::Receiver<ExportPage>) -> flume::Receiver<ExportPage> { 111 - let (tx, fwd) = flume::bounded(0); 110 + fn full_pages(mut rx: mpsc::Receiver<ExportPage>) -> mpsc::Receiver<ExportPage> { 111 + let (tx, fwd) = mpsc::channel(1); 112 112 tokio::task::spawn(async move { 113 - while let Ok(page) = rx.recv_async().await 113 + while let Some(page) = rx.recv().await 114 114 && page.ops.len() > 900 115 115 { 116 - tx.send_async(page).await.unwrap(); 116 + tx.send(page).await.unwrap(); 117 117 } 118 118 }); 119 119 fwd ··· 125 125 126 126 let args = Cli::parse(); 127 127 128 + let t0 = Instant::now(); 128 129 match args.command { 129 130 Commands::Backfill { 130 131 http, ··· 135 136 until, 136 137 catch_up, 137 138 } => { 138 - let (tx, rx) = flume::bounded(32); // these are big pages 139 + let (tx, rx) = mpsc::channel(32); // these are big pages 139 140 tokio::task::spawn(async move { 140 141 if let Some(dir) = dir { 141 142 log::info!("Reading weekly bundles from local folder {dir:?}"); ··· 177 178 // wait until the time for `after` is known
178 179 let last_at = rx_last.await.unwrap(); 179 180 log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff"); 180 - let (tx, rx) = flume::bounded(256); 181 + let (tx, rx) = mpsc::channel(256); // these are small pages 181 182 tokio::task::spawn( 182 183 async move { poll_upstream(last_at, upstream, tx).await.unwrap() }, 183 184 ); ··· 199 200 } => { 200 201 let mut url = args.upstream; 201 202 url.set_path("/export"); 202 - let (tx, rx) = flume::bounded(32); // read ahead if gzip stalls for some reason 203 + let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason 203 204 tokio::task::spawn(async move { poll_upstream(Some(after), url, tx).await.unwrap() }); 204 205 log::trace!("ensuring output directory exists"); 205 206 std::fs::create_dir_all(&dest).unwrap(); ··· 209 210 let mut url = args.upstream; 210 211 url.set_path("/export"); 211 212 let start_at = after.or_else(|| Some(chrono::Utc::now())); 212 - let (tx, rx) = flume::bounded(1); 213 + let (tx, rx) = mpsc::channel(1); 213 214 tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() }); 214 215 pages_to_stdout(rx, None).await.unwrap(); 215 216 } 216 217 } 218 + log::info!("whew, {:?}. goodbye!", t0.elapsed()); 217 219 }
+5 -5
src/plc_pg.rs
··· 1 1 use crate::{Dt, ExportPage, Op, PageBoundaryState}; 2 2 use std::pin::pin; 3 3 use std::time::Instant; 4 - use tokio::sync::oneshot; 4 + use tokio::sync::{mpsc, oneshot}; 5 5 use tokio_postgres::{ 6 6 Client, Error as PgError, NoTls, 7 7 binary_copy::BinaryCopyInWriter, ··· 72 72 } 73 73 } 74 74 75 - pub async fn pages_to_pg(db: Db, pages: flume::Receiver<ExportPage>) -> Result<(), PgError> { 75 + pub async fn pages_to_pg(db: Db, mut pages: mpsc::Receiver<ExportPage>) -> Result<(), PgError> { 76 76 let mut client = db.connect().await?; 77 77 78 78 let ops_stmt = client ··· 89 89 let mut ops_inserted = 0; 90 90 let mut dids_inserted = 0; 91 91 92 - while let Ok(page) = pages.recv_async().await { 92 + while let Some(page) = pages.recv().await { 93 93 log::trace!("writing page with {} ops", page.ops.len()); 94 94 let tx = client.transaction().await?; 95 95 for s in page.ops { ··· 137 137 pub async fn backfill_to_pg( 138 138 db: Db, 139 139 reset: bool, 140 - pages: flume::Receiver<ExportPage>, 140 + mut pages: mpsc::Receiver<ExportPage>, 141 141 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 142 142 ) -> Result<(), PgError> { 143 143 let mut client = db.connect().await?; ··· 195 195 .await?; 196 196 let mut writer = pin!(BinaryCopyInWriter::new(sync, types)); 197 197 let mut last_at = None; 198 - while let Ok(page) = pages.recv_async().await { 198 + while let Some(page) = pages.recv().await { 199 199 for s in &page.ops { 200 200 let Ok(op) = serde_json::from_str::<Op>(s) else { 201 201 log::warn!("ignoring unparseable op: {s:?}");
+4 -3
src/poll.rs
··· 1 1 use crate::{CLIENT, Dt, ExportPage, Op, OpKey}; 2 2 use std::time::Duration; 3 3 use thiserror::Error; 4 + use tokio::sync::mpsc; 4 5 use url::Url; 5 6 6 7 // plc.directory ratelimit on /export is 500 per 5 mins ··· 209 210 pub async fn poll_upstream( 210 211 after: Option<Dt>, 211 212 base: Url, 212 - dest: flume::Sender<ExportPage>, 213 + dest: mpsc::Sender<ExportPage>, 213 214 ) -> anyhow::Result<()> { 214 215 let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 215 216 let mut prev_last: Option<LastOp> = after.map(Into::into); ··· 232 233 if !page.is_empty() { 233 234 match dest.try_send(page) { 234 235 Ok(()) => {} 235 - Err(flume::TrySendError::Full(page)) => { 236 + Err(mpsc::error::TrySendError::Full(page)) => { 236 237 log::warn!("export: destination channel full, awaiting..."); 237 - dest.send_async(page).await?; 238 + dest.send(page).await?; 238 239 } 239 240 e => e?, 240 241 };
+5 -4
src/weekly.rs
··· 8 8 use tokio::{ 9 9 fs::File, 10 10 io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader}, 11 + sync::mpsc, 11 12 }; 12 13 use tokio_stream::wrappers::LinesStream; 13 14 use tokio_util::compat::FuturesAsyncReadCompatExt; ··· 120 121 } 121 122 122 123 pub async fn pages_to_weeks( 123 - rx: flume::Receiver<ExportPage>, 124 + mut rx: mpsc::Receiver<ExportPage>, 124 125 dir: PathBuf, 125 126 clobber: bool, 126 127 ) -> anyhow::Result<()> { ··· 136 137 let mut week_ops = 0; 137 138 let mut week_t0 = total_t0; 138 139 139 - while let Ok(page) = rx.recv_async().await { 140 + while let Some(page) = rx.recv().await { 140 141 for mut s in page.ops { 141 142 let Ok(op) = serde_json::from_str::<Op>(&s) 142 143 .inspect_err(|e| log::error!("failed to parse plc op, ignoring: {e}")) ··· 193 194 pub async fn week_to_pages( 194 195 source: impl BundleSource, 195 196 week: Week, 196 - dest: flume::Sender<ExportPage>, 197 + dest: mpsc::Sender<ExportPage>, 197 198 ) -> anyhow::Result<()> { 198 199 use futures::TryStreamExt; 199 200 let decoder = GzipDecoder::new(BufReader::new(source.reader_for(week).await?)); ··· 202 203 while let Some(chunk) = chunks.try_next().await? { 203 204 let ops: Vec<String> = chunk.into_iter().collect(); 204 205 let page = ExportPage { ops }; 205 - dest.send_async(page).await?; 206 + dest.send(page).await?; 206 207 } 207 208 Ok(()) 208 209 }