//! Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::{CLIENT, Dt, ExportPage, Op};
2use async_compression::tokio::bufread::GzipDecoder;
3use async_compression::tokio::write::GzipEncoder;
4use core::pin::pin;
5use reqwest::Url;
6use std::future::Future;
7use std::ops::{Bound, RangeBounds};
8use std::path::PathBuf;
9use tokio::{
10 fs::File,
11 io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader},
12 sync::mpsc,
13};
14use tokio_stream::wrappers::LinesStream;
15use tokio_util::compat::FuturesAsyncReadCompatExt;
16
17const WEEK_IN_SECONDS: i64 = 7 * 86_400;
18
19#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
20pub struct Week(i64);
21
22impl Week {
23 pub const fn from_n(n: i64) -> Self {
24 Self(n)
25 }
26 pub fn range(r: impl RangeBounds<Week>) -> Vec<Self> {
27 let first = match r.start_bound() {
28 Bound::Included(week) => *week,
29 Bound::Excluded(week) => week.next(),
30 Bound::Unbounded => panic!("week range must have a defined start bound"),
31 };
32 let last = match r.end_bound() {
33 Bound::Included(week) => *week,
34 Bound::Excluded(week) => week.prev(),
35 Bound::Unbounded => Self(Self::nullification_cutoff()).prev(),
36 };
37 let mut out = Vec::new();
38 let mut current = first;
39 while current <= last {
40 out.push(current);
41 current = current.next();
42 }
43 out
44 }
45 pub fn n_ago(&self) -> i64 {
46 let now = chrono::Utc::now().timestamp();
47 (now - self.0) / WEEK_IN_SECONDS
48 }
49 pub fn n_until(&self, other: Week) -> i64 {
50 let Self(until) = other;
51 (until - self.0) / WEEK_IN_SECONDS
52 }
53 pub fn next(&self) -> Week {
54 Self(self.0 + WEEK_IN_SECONDS)
55 }
56 pub fn prev(&self) -> Week {
57 Self(self.0 - WEEK_IN_SECONDS)
58 }
59 /// whether the plc log for this week outside the 72h nullification window
60 ///
61 /// plus one hour for safety (week must have ended > 73 hours ago)
62 pub fn is_immutable(&self) -> bool {
63 self.next().0 <= Self::nullification_cutoff()
64 }
65 fn nullification_cutoff() -> i64 {
66 const HOUR_IN_SECONDS: i64 = 3600;
67 let now = chrono::Utc::now().timestamp();
68 now - (73 * HOUR_IN_SECONDS)
69 }
70}
71
72impl From<Dt> for Week {
73 fn from(dt: Dt) -> Self {
74 let ts = dt.timestamp();
75 let truncated = (ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS;
76 Week(truncated)
77 }
78}
79
80impl From<Week> for Dt {
81 fn from(week: Week) -> Dt {
82 let Week(ts) = week;
83 Dt::from_timestamp(ts, 0).expect("the week to be in valid range")
84 }
85}
86
87pub trait BundleSource: Clone {
88 fn reader_for(
89 &self,
90 week: Week,
91 ) -> impl Future<Output = anyhow::Result<impl AsyncRead + Send>> + Send;
92}
93
94#[derive(Debug, Clone)]
95pub struct FolderSource(pub PathBuf);
96impl BundleSource for FolderSource {
97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> {
98 let FolderSource(dir) = self;
99 let path = dir.join(format!("{}.jsonl.gz", week.0));
100 log::debug!("opening folder source: {path:?}");
101 let file = File::open(path)
102 .await
103 .inspect_err(|e| log::error!("failed to open file: {e}"))?;
104 Ok(file)
105 }
106}
107
108#[derive(Debug, Clone)]
109pub struct HttpSource(pub Url);
110impl BundleSource for HttpSource {
111 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> {
112 use futures::TryStreamExt;
113 let HttpSource(base) = self;
114 let url = base.join(&format!("{}.jsonl.gz", week.0))?;
115 Ok(CLIENT
116 .get(url)
117 .send()
118 .await?
119 .error_for_status()?
120 .bytes_stream()
121 .map_err(futures::io::Error::other)
122 .into_async_read()
123 .compat())
124 }
125}
126
127pub async fn pages_to_weeks(
128 mut rx: mpsc::Receiver<ExportPage>,
129 dir: PathBuf,
130 clobber: bool,
131) -> anyhow::Result<()> {
132 pub use std::time::Instant;
133
134 // ...there is certainly a nicer way to write this
135 let mut current_week: Option<Week> = None;
136 let dummy_file = File::create(dir.join("_dummy")).await?;
137 let mut encoder = GzipEncoder::new(dummy_file);
138
139 let mut total_ops = 0;
140 let total_t0 = Instant::now();
141 let mut week_ops = 0;
142 let mut week_t0 = total_t0;
143
144 while let Some(page) = rx.recv().await {
145 for op in page.ops {
146 let op_week = op.created_at.into();
147 if current_week.map(|w| w != op_week).unwrap_or(true) {
148 encoder.shutdown().await?;
149 let now = Instant::now();
150
151 log::info!(
152 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)",
153 current_week.map(|w| -w.n_ago()).unwrap_or(0),
154 current_week.unwrap_or(Week(0)).0,
155 (week_ops as f64) / (now - week_t0).as_secs_f64(),
156 total_ops / 1000,
157 (total_ops as f64) / (now - total_t0).as_secs_f64(),
158 );
159 let path = dir.join(format!("{}.jsonl.gz", op_week.0));
160 let file = if clobber {
161 File::create(path).await?
162 } else {
163 File::create_new(path).await?
164 };
165 encoder = GzipEncoder::with_quality(file, async_compression::Level::Best);
166 current_week = Some(op_week);
167 week_ops = 0;
168 week_t0 = now;
169 }
170 log::trace!("writing: {op:?}");
171 encoder
172 .write_all(serde_json::to_string(&op)?.as_bytes())
173 .await?;
174 total_ops += 1;
175 week_ops += 1;
176 }
177 }
178
179 // don't forget the final file
180 encoder.shutdown().await?;
181 let now = Instant::now();
182 log::info!(
183 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)",
184 current_week.map(|w| -w.n_ago()).unwrap_or(0),
185 current_week.unwrap_or(Week(0)).0,
186 (week_ops as f64) / (now - week_t0).as_secs_f64(),
187 total_ops / 1000,
188 (total_ops as f64) / (now - total_t0).as_secs_f64(),
189 );
190
191 Ok(())
192}
193
194pub async fn week_to_pages(
195 source: impl BundleSource,
196 week: Week,
197 dest: mpsc::Sender<ExportPage>,
198) -> anyhow::Result<()> {
199 use futures::TryStreamExt;
200 let reader = source
201 .reader_for(week)
202 .await
203 .inspect_err(|e| log::error!("week_to_pages reader failed: {e}"))?;
204 let decoder = GzipDecoder::new(BufReader::new(reader));
205 let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000));
206
207 while let Some(chunk) = chunks
208 .try_next()
209 .await
210 .inspect_err(|e| log::error!("failed to get next chunk: {e}"))?
211 {
212 let ops: Vec<Op> = chunk
213 .into_iter()
214 .filter_map(|s| {
215 serde_json::from_str::<Op>(&s)
216 .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})"))
217 .ok()
218 })
219 .collect();
220 let page = ExportPage { ops };
221 dest.send(page)
222 .await
223 .inspect_err(|e| log::error!("failed to send page: {e}"))?;
224 }
225 Ok(())
226}