Server tools to backfill, tail, mirror, and verify PLC logs
at main 64 lines 2.1 kB view raw
1use crate::{BundleSource, Dt, ExportPage, Week, week_to_pages}; 2use std::sync::Arc; 3use std::time::Instant; 4use tokio::{ 5 sync::{Mutex, mpsc}, 6 task::JoinSet, 7}; 8 9const FIRST_WEEK: Week = Week::from_n(1668643200); 10 11pub async fn backfill( 12 source: impl BundleSource + Send + 'static, 13 dest: mpsc::Sender<ExportPage>, 14 source_workers: usize, 15 until: Option<Dt>, 16) -> anyhow::Result<&'static str> { 17 // queue up the week bundles that should be available 18 let weeks = Arc::new(Mutex::new( 19 until 20 .map(|u| Week::range(FIRST_WEEK..u.into())) 21 .unwrap_or(Week::range(FIRST_WEEK..)), 22 )); 23 weeks.lock().await.reverse(); 24 25 let mut workers: JoinSet<anyhow::Result<()>> = JoinSet::new(); 26 27 let t_step = Instant::now(); 28 log::info!( 29 "fetching backfill for {} weeks with {source_workers} workers...", 30 weeks.lock().await.len() 31 ); 32 33 // spin up the fetchers to work in parallel 34 for w in 0..source_workers { 35 let weeks = weeks.clone(); 36 let dest = dest.clone(); 37 let source = source.clone(); 38 workers.spawn(async move { 39 while let Some(week) = weeks.lock().await.pop() { 40 let when = Into::<Dt>::into(week).to_rfc3339(); 41 log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago()); 42 week_to_pages(source.clone(), week, dest.clone()) 43 .await 44 .inspect_err(|e| log::error!("failing week_to_pages: {e}"))?; 45 } 46 log::info!("done with the weeks ig"); 47 Ok(()) 48 }); 49 } 50 51 // TODO: handle missing/failed weeks 52 53 // wait for the big backfill to finish 54 while let Some(res) = workers.join_next().await { 55 res.inspect_err(|e| log::error!("problem joining source workers: {e}"))? 56 .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?; 57 } 58 log::info!( 59 "finished fetching backfill in {:?}. senders remaining: {}", 60 t_step.elapsed(), 61 dest.strong_count() 62 ); 63 Ok("backfill") 64}