Server tools to backfill, tail, mirror, and verify PLC logs.
1use crate::{CLIENT, Dt, ExportPage, Op}; 2use async_compression::tokio::bufread::GzipDecoder; 3use async_compression::tokio::write::GzipEncoder; 4use core::pin::pin; 5use reqwest::Url; 6use std::future::Future; 7use std::ops::{Bound, RangeBounds}; 8use std::path::PathBuf; 9use tokio::{ 10 fs::File, 11 io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader}, 12 sync::mpsc, 13}; 14use tokio_stream::wrappers::LinesStream; 15use tokio_util::compat::FuturesAsyncReadCompatExt; 16 17const WEEK_IN_SECONDS: i64 = 7 * 86_400; 18 19#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] 20pub struct Week(i64); 21 22impl Week { 23 pub const fn from_n(n: i64) -> Self { 24 Self(n) 25 } 26 pub fn range(r: impl RangeBounds<Week>) -> Vec<Self> { 27 let first = match r.start_bound() { 28 Bound::Included(week) => *week, 29 Bound::Excluded(week) => week.next(), 30 Bound::Unbounded => panic!("week range must have a defined start bound"), 31 }; 32 let last = match r.end_bound() { 33 Bound::Included(week) => *week, 34 Bound::Excluded(week) => week.prev(), 35 Bound::Unbounded => Self(Self::nullification_cutoff()).prev(), 36 }; 37 let mut out = Vec::new(); 38 let mut current = first; 39 while current <= last { 40 out.push(current); 41 current = current.next(); 42 } 43 out 44 } 45 pub fn n_ago(&self) -> i64 { 46 let now = chrono::Utc::now().timestamp(); 47 (now - self.0) / WEEK_IN_SECONDS 48 } 49 pub fn n_until(&self, other: Week) -> i64 { 50 let Self(until) = other; 51 (until - self.0) / WEEK_IN_SECONDS 52 } 53 pub fn next(&self) -> Week { 54 Self(self.0 + WEEK_IN_SECONDS) 55 } 56 pub fn prev(&self) -> Week { 57 Self(self.0 - WEEK_IN_SECONDS) 58 } 59 /// whether the plc log for this week outside the 72h nullification window 60 /// 61 /// plus one hour for safety (week must have ended > 73 hours ago) 62 pub fn is_immutable(&self) -> bool { 63 self.next().0 <= Self::nullification_cutoff() 64 } 65 fn nullification_cutoff() -> i64 { 66 const HOUR_IN_SECONDS: i64 = 3600; 67 let now = 
chrono::Utc::now().timestamp(); 68 now - (73 * HOUR_IN_SECONDS) 69 } 70} 71 72impl From<Dt> for Week { 73 fn from(dt: Dt) -> Self { 74 let ts = dt.timestamp(); 75 let truncated = (ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS; 76 Week(truncated) 77 } 78} 79 80impl From<Week> for Dt { 81 fn from(week: Week) -> Dt { 82 let Week(ts) = week; 83 Dt::from_timestamp(ts, 0).expect("the week to be in valid range") 84 } 85} 86 87pub trait BundleSource: Clone { 88 fn reader_for( 89 &self, 90 week: Week, 91 ) -> impl Future<Output = anyhow::Result<impl AsyncRead + Send>> + Send; 92} 93 94#[derive(Debug, Clone)] 95pub struct FolderSource(pub PathBuf); 96impl BundleSource for FolderSource { 97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 98 let FolderSource(dir) = self; 99 let path = dir.join(format!("{}.jsonl.gz", week.0)); 100 log::debug!("opening folder source: {path:?}"); 101 let file = File::open(path) 102 .await 103 .inspect_err(|e| log::error!("failed to open file: {e}"))?; 104 Ok(file) 105 } 106} 107 108#[derive(Debug, Clone)] 109pub struct HttpSource(pub Url); 110impl BundleSource for HttpSource { 111 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 112 use futures::TryStreamExt; 113 let HttpSource(base) = self; 114 let url = base.join(&format!("{}.jsonl.gz", week.0))?; 115 Ok(CLIENT 116 .get(url) 117 .send() 118 .await? 119 .error_for_status()? 
120 .bytes_stream() 121 .map_err(futures::io::Error::other) 122 .into_async_read() 123 .compat()) 124 } 125} 126 127pub async fn pages_to_weeks( 128 mut rx: mpsc::Receiver<ExportPage>, 129 dir: PathBuf, 130 clobber: bool, 131) -> anyhow::Result<()> { 132 pub use std::time::Instant; 133 134 // ...there is certainly a nicer way to write this 135 let mut current_week: Option<Week> = None; 136 let dummy_file = File::create(dir.join("_dummy")).await?; 137 let mut encoder = GzipEncoder::new(dummy_file); 138 139 let mut total_ops = 0; 140 let total_t0 = Instant::now(); 141 let mut week_ops = 0; 142 let mut week_t0 = total_t0; 143 144 while let Some(page) = rx.recv().await { 145 for op in page.ops { 146 let op_week = op.created_at.into(); 147 if current_week.map(|w| w != op_week).unwrap_or(true) { 148 encoder.shutdown().await?; 149 let now = Instant::now(); 150 151 log::info!( 152 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 153 current_week.map(|w| -w.n_ago()).unwrap_or(0), 154 current_week.unwrap_or(Week(0)).0, 155 (week_ops as f64) / (now - week_t0).as_secs_f64(), 156 total_ops / 1000, 157 (total_ops as f64) / (now - total_t0).as_secs_f64(), 158 ); 159 let path = dir.join(format!("{}.jsonl.gz", op_week.0)); 160 let file = if clobber { 161 File::create(path).await? 162 } else { 163 File::create_new(path).await? 
164 }; 165 encoder = GzipEncoder::with_quality(file, async_compression::Level::Best); 166 current_week = Some(op_week); 167 week_ops = 0; 168 week_t0 = now; 169 } 170 log::trace!("writing: {op:?}"); 171 encoder 172 .write_all(serde_json::to_string(&op)?.as_bytes()) 173 .await?; 174 total_ops += 1; 175 week_ops += 1; 176 } 177 } 178 179 // don't forget the final file 180 encoder.shutdown().await?; 181 let now = Instant::now(); 182 log::info!( 183 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 184 current_week.map(|w| -w.n_ago()).unwrap_or(0), 185 current_week.unwrap_or(Week(0)).0, 186 (week_ops as f64) / (now - week_t0).as_secs_f64(), 187 total_ops / 1000, 188 (total_ops as f64) / (now - total_t0).as_secs_f64(), 189 ); 190 191 Ok(()) 192} 193 194pub async fn week_to_pages( 195 source: impl BundleSource, 196 week: Week, 197 dest: mpsc::Sender<ExportPage>, 198) -> anyhow::Result<()> { 199 use futures::TryStreamExt; 200 let reader = source 201 .reader_for(week) 202 .await 203 .inspect_err(|e| log::error!("week_to_pages reader failed: {e}"))?; 204 let decoder = GzipDecoder::new(BufReader::new(reader)); 205 let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000)); 206 207 while let Some(chunk) = chunks 208 .try_next() 209 .await 210 .inspect_err(|e| log::error!("failed to get next chunk: {e}"))? 211 { 212 let ops: Vec<Op> = chunk 213 .into_iter() 214 .filter_map(|s| { 215 serde_json::from_str::<Op>(&s) 216 .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 217 .ok() 218 }) 219 .collect(); 220 let page = ExportPage { ops }; 221 dest.send(page) 222 .await 223 .inspect_err(|e| log::error!("failed to send page: {e}"))?; 224 } 225 Ok(()) 226}