Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::{BundleSource, Dt, ExportPage, Week, week_to_pages};
2use std::sync::Arc;
3use std::time::Instant;
4use tokio::{
5 sync::{Mutex, mpsc},
6 task::JoinSet,
7};
8
9const FIRST_WEEK: Week = Week::from_n(1668643200);
10
11pub async fn backfill(
12 source: impl BundleSource + Send + 'static,
13 dest: mpsc::Sender<ExportPage>,
14 source_workers: usize,
15 until: Option<Dt>,
16) -> anyhow::Result<&'static str> {
17 // queue up the week bundles that should be available
18 let weeks = Arc::new(Mutex::new(
19 until
20 .map(|u| Week::range(FIRST_WEEK..u.into()))
21 .unwrap_or(Week::range(FIRST_WEEK..)),
22 ));
23 weeks.lock().await.reverse();
24
25 let mut workers: JoinSet<anyhow::Result<()>> = JoinSet::new();
26
27 let t_step = Instant::now();
28 log::info!(
29 "fetching backfill for {} weeks with {source_workers} workers...",
30 weeks.lock().await.len()
31 );
32
33 // spin up the fetchers to work in parallel
34 for w in 0..source_workers {
35 let weeks = weeks.clone();
36 let dest = dest.clone();
37 let source = source.clone();
38 workers.spawn(async move {
39 while let Some(week) = weeks.lock().await.pop() {
40 let when = Into::<Dt>::into(week).to_rfc3339();
41 log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago());
42 week_to_pages(source.clone(), week, dest.clone())
43 .await
44 .inspect_err(|e| log::error!("failing week_to_pages: {e}"))?;
45 }
46 log::info!("done with the weeks ig");
47 Ok(())
48 });
49 }
50
51 // TODO: handle missing/failed weeks
52
53 // wait for the big backfill to finish
54 while let Some(res) = workers.join_next().await {
55 res.inspect_err(|e| log::error!("problem joining source workers: {e}"))?
56 .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?;
57 }
58 log::info!(
59 "finished fetching backfill in {:?}. senders remaining: {}",
60 t_step.elapsed(),
61 dest.strong_count()
62 );
63 Ok("backfill")
64}