Server tools to backfill, tail, mirror, and verify PLC logs

parallel backfiller

+227 -56
+14
Cargo.lock
··· 46 46 "thiserror 2.0.16", 47 47 "tokio", 48 48 "tokio-postgres", 49 + "tokio-stream", 50 + "tokio-util", 49 51 "url", 50 52 ] 51 53 ··· 2035 2037 ] 2036 2038 2037 2039 [[package]] 2040 + name = "tokio-stream" 2041 + version = "0.1.17" 2042 + source = "registry+https://github.com/rust-lang/crates.io-index" 2043 + checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" 2044 + dependencies = [ 2045 + "futures-core", 2046 + "pin-project-lite", 2047 + "tokio", 2048 + ] 2049 + 2050 + [[package]] 2038 2051 name = "tokio-util" 2039 2052 version = "0.7.16" 2040 2053 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2042 2055 dependencies = [ 2043 2056 "bytes", 2044 2057 "futures-core", 2058 + "futures-io", 2045 2059 "futures-sink", 2046 2060 "pin-project-lite", 2047 2061 "tokio",
+2
Cargo.toml
··· 23 23 thiserror = "2.0.16" 24 24 tokio = { version = "1.47.1", features = ["full"] } 25 25 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] } 26 + tokio-stream = { version = "0.1.17", features = ["io-util"] } 27 + tokio-util = { version = "0.7.16", features = ["compat"] } 26 28 url = "2.5.7"
+1
readme.md
··· 6 6 7 7 - Tail PLC ops to stdout: `allegedly tail | jq` 8 8 - Export PLC ops to weekly gzipped bundles: `allegedly bundle --dest ./some-folder` 9 + - Dump bundled ops to stdout FAST: `allegedly backfill --source-workers 6 | pv -l > /ops-unordered.jsonl` 9 10 10 11 (add `--help` to any command for more info about it) 11 12
+37 -19
src/backfill.rs
··· 1 - use crate::{CLIENT, ExportPage}; 2 - use url::Url; 1 + use crate::{BundleSource, Dt, ExportPage, Week, week_to_pages}; 2 + use tokio::task::JoinSet; 3 3 4 - use async_compression::futures::bufread::GzipDecoder; 5 - use futures::{AsyncBufReadExt, StreamExt, TryStreamExt, io}; 4 + const FIRST_WEEK: Week = Week::from_n(1668643200); 6 5 7 - pub async fn week_to_pages(url: Url, dest: flume::Sender<ExportPage>) -> anyhow::Result<()> { 8 - let reader = CLIENT 9 - .get(url) 10 - .send() 11 - .await? 12 - .error_for_status()? 13 - .bytes_stream() 14 - .map_err(io::Error::other) 15 - .into_async_read(); 6 + pub async fn backfill( 7 + source: impl BundleSource + Send + 'static, 8 + dest: flume::Sender<ExportPage>, 9 + source_workers: usize, 10 + ) -> anyhow::Result<()> { 11 + // queue up the week bundles that should be available 12 + let (week_tx, week_rx) = flume::bounded(1024); // work queue 13 + let mut week = FIRST_WEEK; 14 + while week.is_immutable() { 15 + week_tx.try_send(week)?; // if this fails, something has gone really wrong or we're farrrr in the future 16 + week = week.next(); 17 + } 16 18 17 - let decoder = GzipDecoder::new(io::BufReader::new(reader)); 19 + let mut workers: JoinSet<anyhow::Result<()>> = JoinSet::new(); 18 20 19 - let mut chunks = io::BufReader::new(decoder).lines().chunks(1000); 21 + // spin up the fetchers to work in parallel 22 + for w in 0..source_workers { 23 + let weeks = week_rx.clone(); 24 + let dest = dest.clone(); 25 + let source = source.clone(); 26 + workers.spawn(async move { 27 + while let Ok(week) = weeks.recv_async().await { 28 + log::info!( 29 + "worker {w}: fetching week {} (-{})", 30 + Into::<Dt>::into(week).to_rfc3339(), 31 + week.n_ago(), 32 + ); 33 + week_to_pages(source.clone(), week, dest.clone()).await?; 34 + } 35 + Ok(()) 36 + }); 37 + } 20 38 21 - while let Some(chunk) = chunks.next().await { 22 - let ops = chunk.into_iter().collect::<Result<Vec<_>, io::Error>>()?; 23 - let page = ExportPage { ops }; 24 - 
dest.send_async(page).await?; 39 + // wait for them to finish 40 + while let Some(res) = workers.join_next().await { 41 + res??; 25 42 } 43 + 26 44 Ok(()) 27 45 }
+40 -1
src/bin/allegedly.rs
··· 1 - use allegedly::{Dt, bin_init, pages_to_weeks, poll_upstream}; 1 + use allegedly::{Dt, FolderSource, HttpSource, backfill, bin_init, pages_to_weeks, poll_upstream}; 2 2 use clap::{Parser, Subcommand}; 3 3 use std::path::PathBuf; 4 4 use url::Url; ··· 15 15 16 16 #[derive(Debug, Subcommand)] 17 17 enum Commands { 18 + /// Use weekly bundled ops to get a complete directory mirror FAST 19 + Backfill { 20 + /// Remote URL prefix to fetch bundles from 21 + #[arg(long)] 22 + #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")] 23 + http: Url, 24 + /// Local folder to fetch bundles from (overrides `http`) 25 + #[arg(long)] 26 + dir: Option<PathBuf>, 27 + /// Parallel bundle fetchers 28 + #[arg(long)] 29 + #[clap(default_value = "4")] 30 + source_workers: usize, 31 + }, 18 32 /// Scrape a PLC server, collecting ops into weekly bundles 19 33 /// 20 34 /// Bundles are gzipped files named `<WEEK>.jsonl.gz` where WEEK is a unix ··· 51 65 let args = Cli::parse(); 52 66 53 67 match args.command { 68 + Commands::Backfill { 69 + http, 70 + dir, 71 + source_workers, 72 + } => { 73 + let (tx, rx) = flume::bounded(1024); // big pages 74 + tokio::task::spawn(async move { 75 + if let Some(dir) = dir { 76 + log::info!("Reading weekly bundles from local folder {dir:?}"); 77 + backfill(FolderSource(dir), tx, source_workers) 78 + .await 79 + .unwrap(); 80 + } else { 81 + log::info!("Fetching weekly bundles from {http}"); 82 + backfill(HttpSource(http), tx, source_workers) 83 + .await 84 + .unwrap(); 85 + } 86 + }); 87 + loop { 88 + for op in rx.recv_async().await.unwrap().ops { 89 + println!("{op}") 90 + } 91 + } 92 + } 54 93 Commands::Bundle { 55 94 dest, 56 95 after,
+13 -12
src/bin/backfill.rs
··· 2 2 use std::time::Duration; 3 3 use url::Url; 4 4 5 - use allegedly::{Db, Dt, ExportPage, Op, bin_init, poll_upstream, week_to_pages}; 5 + use allegedly::{Db, Dt, ExportPage, Op, bin_init, poll_upstream}; 6 6 7 7 const EXPORT_PAGE_QUEUE_SIZE: usize = 0; // rendezvous for now 8 8 const WEEK_IN_SECONDS: u64 = 7 * 86400; ··· 40 40 postgres: String, 41 41 } 42 42 43 - async fn bulk_backfill((upstream, epoch): (Url, u64), tx: flume::Sender<ExportPage>) { 43 + async fn bulk_backfill((_upstream, epoch): (Url, u64), _tx: flume::Sender<ExportPage>) { 44 44 let immutable_cutoff = std::time::SystemTime::now() - Duration::from_secs((7 + 4) * 86400); 45 45 let immutable_ts = (immutable_cutoff.duration_since(std::time::SystemTime::UNIX_EPOCH)) 46 46 .unwrap() 47 47 .as_secs(); 48 - let immutable_week = (immutable_ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS; 49 - let mut week = epoch; 50 - let mut week_n = 0; 51 - while week < immutable_week { 52 - log::info!("backfilling week {week_n} ({week})"); 53 - let url = upstream.join(&format!("{week}.jsonl.gz")).unwrap(); 54 - week_to_pages(url, tx.clone()).await.unwrap(); 55 - week_n += 1; 56 - week += WEEK_IN_SECONDS; 57 - } 48 + let _immutable_week = (immutable_ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS; 49 + let _week = epoch; 50 + let _week_n = 0; 51 + todo!(); 52 + // while week < immutable_week { 53 + // log::info!("backfilling week {week_n} ({week})"); 54 + // let url = upstream.join(&format!("{week}.jsonl.gz")).unwrap(); 55 + // week_to_pages(url, tx.clone()).await.unwrap(); 56 + // week_n += 1; 57 + // week += WEEK_IN_SECONDS; 58 + // } 58 59 } 59 60 60 61 async fn export_upstream(
+39 -17
src/bin/get_backfill_chunk_adsf.rs
··· 1 - use allegedly::CLIENT; 2 - use async_compression::futures::bufread::GzipDecoder; 3 - use futures::{AsyncBufReadExt, StreamExt, TryStreamExt, io}; 1 + use allegedly::{HttpSource, Week, week_to_pages}; 2 + use std::io::Write; 4 3 5 4 #[tokio::main] 6 5 async fn main() { 7 - let reader = CLIENT 8 - .get("https://plc.t3.storage.dev/plc.directory/1699488000.jsonl.gz") 9 - // .get("https://plc.t3.storage.dev/plc.directory/1669248000.jsonl.gz") 10 - .send() 11 - .await 12 - .unwrap() 13 - .error_for_status() 14 - .unwrap() 15 - .bytes_stream() 16 - .map_err(io::Error::other) 17 - .into_async_read(); 6 + let url: url::Url = "https://plc.t3.storage.dev/plc.directory/".parse().unwrap(); 7 + let source = HttpSource(url); 8 + // let source = FolderSource("./weekly/".into()); 9 + let week = Week::from_n(1699488000); 18 10 19 - let decoder = GzipDecoder::new(io::BufReader::new(reader)); 20 - let mut chunks = io::BufReader::new(decoder).lines().chunks(1000); 21 - while let Some(ref _chunk) = chunks.next().await { 11 + let (tx, rx) = flume::bounded(32); 12 + 13 + tokio::task::spawn(async move { 14 + week_to_pages(source, week, tx).await.unwrap(); 15 + }); 16 + 17 + let mut n = 0; 18 + 19 + print!("receiving"); 20 + while let Ok(page) = rx.recv_async().await { 22 21 print!("."); 22 + std::io::stdout().flush().unwrap(); 23 + n += page.ops.len(); 23 24 } 24 25 println!(); 26 + 27 + println!("bye ({n})"); 28 + 29 + // let reader = CLIENT 30 + // .get("https://plc.t3.storage.dev/plc.directory/1699488000.jsonl.gz") 31 + // // .get("https://plc.t3.storage.dev/plc.directory/1669248000.jsonl.gz") 32 + // .send() 33 + // .await 34 + // .unwrap() 35 + // .error_for_status() 36 + // .unwrap() 37 + // .bytes_stream() 38 + // .map_err(io::Error::other) 39 + // .into_async_read(); 40 + 41 + // let decoder = GzipDecoder::new(io::BufReader::new(reader)); 42 + // let mut chunks = io::BufReader::new(decoder).lines().chunks(1000); 43 + // while let Some(ref _chunk) = chunks.next().await { 44 
+ // print!("."); 45 + // } 46 + // println!(); 25 47 }
+3 -3
src/lib.rs
··· 6 6 mod poll; 7 7 mod weekly; 8 8 9 - pub use backfill::week_to_pages; 9 + pub use backfill::backfill; 10 10 pub use client::CLIENT; 11 11 pub use plc_pg::Db; 12 12 pub use poll::{get_page, poll_upstream}; 13 - pub use weekly::{Week, pages_to_weeks}; 13 + pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 14 14 15 15 pub type Dt = chrono::DateTime<chrono::Utc>; 16 16 17 17 /// One page of PLC export 18 18 /// 19 - /// Expected to have up to around 1000 lines of raw json ops 19 + /// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page. 20 20 #[derive(Debug)] 21 21 pub struct ExportPage { 22 22 pub ops: Vec<String>,
+78 -4
src/weekly.rs
··· 1 - use crate::{Dt, ExportPage, Op}; 1 + use crate::{CLIENT, Dt, ExportPage, Op}; 2 + use async_compression::tokio::bufread::GzipDecoder; 2 3 use async_compression::tokio::write::GzipEncoder; 4 + use core::pin::pin; 5 + use std::future::Future; 3 6 use std::path::PathBuf; 4 - use tokio::{fs::File, io::AsyncWriteExt}; 7 + use tokio::{ 8 + fs::File, 9 + io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader}, 10 + }; 11 + use tokio_stream::wrappers::LinesStream; 12 + use tokio_util::compat::FuturesAsyncReadCompatExt; 13 + use url::Url; 5 14 6 - const WEEK_IN_SECONDS: i64 = 7 * 86400; 15 + const WEEK_IN_SECONDS: i64 = 7 * 86_400; 7 16 8 17 #[derive(Debug, Clone, Copy, PartialEq)] 9 18 pub struct Week(i64); 10 19 11 20 impl Week { 12 - pub fn from_n(n: i64) -> Self { 21 + pub const fn from_n(n: i64) -> Self { 13 22 Self(n) 14 23 } 15 24 pub fn n_ago(&self) -> i64 { ··· 17 26 let Self(cur) = chrono::Utc::now().into(); 18 27 (cur - us) / WEEK_IN_SECONDS 19 28 } 29 + pub fn next(&self) -> Week { 30 + Self(self.0 + WEEK_IN_SECONDS) 31 + } 32 + /// is the plc log for this week entirely outside the 72h nullification window 33 + /// 34 + /// plus one hour for safety (week must have ended > 73 hours ago) 35 + pub fn is_immutable(&self) -> bool { 36 + const HOUR_IN_SECONDS: i64 = 3600; 37 + let now = chrono::Utc::now().timestamp(); 38 + let nullification_cutoff = now - (73 * HOUR_IN_SECONDS); 39 + self.next().0 <= nullification_cutoff 40 + } 20 41 } 21 42 22 43 impl From<Dt> for Week { ··· 34 55 } 35 56 } 36 57 58 + pub trait BundleSource: Clone { 59 + fn reader_for( 60 + &self, 61 + week: Week, 62 + ) -> impl Future<Output = anyhow::Result<impl AsyncRead + Send>> + Send; 63 + } 64 + 65 + #[derive(Debug, Clone)] 66 + pub struct FolderSource(pub PathBuf); 67 + impl BundleSource for FolderSource { 68 + async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 69 + let FolderSource(dir) = self; 70 + let path = dir.join(format!("{}.jsonl.gz", week.0)); 71 + 
Ok(File::open(path).await?) 72 + } 73 + } 74 + 75 + #[derive(Debug, Clone)] 76 + pub struct HttpSource(pub Url); 77 + impl BundleSource for HttpSource { 78 + async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 79 + use futures::TryStreamExt; 80 + let HttpSource(base) = self; 81 + let url = base.join(&format!("{}.jsonl.gz", week.0))?; 82 + Ok(CLIENT 83 + .get(url) 84 + .send() 85 + .await? 86 + .error_for_status()? 87 + .bytes_stream() 88 + .map_err(futures::io::Error::other) 89 + .into_async_read() 90 + .compat()) 91 + } 92 + } 93 + 37 94 pub async fn pages_to_weeks( 38 95 rx: flume::Receiver<ExportPage>, 39 96 dir: PathBuf, ··· 104 161 105 162 Ok(()) 106 163 } 164 + 165 + pub async fn week_to_pages( 166 + source: impl BundleSource, 167 + week: Week, 168 + dest: flume::Sender<ExportPage>, 169 + ) -> anyhow::Result<()> { 170 + use futures::TryStreamExt; 171 + let decoder = GzipDecoder::new(BufReader::new(source.reader_for(week).await?)); 172 + let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000)); 173 + 174 + while let Some(chunk) = chunks.try_next().await? { 175 + let ops: Vec<String> = chunk.into_iter().collect(); 176 + let page = ExportPage { ops }; 177 + dest.send_async(page).await?; 178 + } 179 + Ok(()) 180 + }