// Server tools to backfill, tail, mirror, and verify PLC logs
// at main 207 lines 6.4 kB view raw
1use allegedly::{ 2 Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, 3 bin::{GlobalArgs, bin_init}, 4 full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream, 5}; 6use clap::Parser; 7use reqwest::Url; 8use std::{path::PathBuf, time::Duration}; 9use tokio::{ 10 sync::{mpsc, oneshot}, 11 task::JoinSet, 12}; 13 14pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/"; 15 16#[derive(Debug, clap::Args)] 17pub struct Args { 18 /// Remote URL prefix to fetch bundles from 19 #[arg(long)] 20 #[clap(default_value = DEFAULT_HTTP)] 21 http: Url, 22 /// Local folder to fetch bundles from (overrides `http`) 23 #[arg(long)] 24 dir: Option<PathBuf>, 25 /// Don't do weekly bulk-loading at all. 26 /// 27 /// overrides `http` and `dir`, makes catch_up redundant 28 #[arg(long, action)] 29 no_bulk: bool, 30 /// Parallel bundle fetchers 31 /// 32 /// Default: 4 for http fetches, 1 for local folder 33 #[arg(long)] 34 source_workers: Option<usize>, 35 /// Bulk load into did-method-plc-compatible postgres instead of stdout 36 /// 37 /// Pass a postgres connection url like "postgresql://localhost:5432" 38 #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")] 39 to_postgres: Option<Url>, 40 /// Cert for postgres (if needed) 41 #[arg(long)] 42 postgres_cert: Option<PathBuf>, 43 /// Delete all operations from the postgres db before starting 44 /// 45 /// only used if `--to-postgres` is present 46 #[arg(long, action)] 47 postgres_reset: bool, 48 /// Stop at the week ending before this date 49 #[arg(long)] 50 until: Option<Dt>, 51 /// After the weekly imports, poll upstream until we're caught up 52 #[arg(long, action)] 53 catch_up: bool, 54} 55 56pub async fn run( 57 GlobalArgs { 58 upstream, 59 upstream_throttle_ms, 60 }: GlobalArgs, 61 Args { 62 http, 63 dir, 64 no_bulk, 65 source_workers, 66 to_postgres, 67 postgres_cert, 68 postgres_reset, 69 until, 70 catch_up, 71 }: Args, 72) -> anyhow::Result<()> { 73 let mut tasks = JoinSet::<anyhow::Result<&'static 
str>>::new(); 74 75 let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages 76 77 // a bulk sink can notify us as soon as the very last op's time is known 78 // so we can start catching up while the sink might restore indexes and such 79 let (found_last_tx, found_last_out) = if catch_up { 80 let (tx, rx) = oneshot::channel(); 81 (Some(tx), Some(rx)) 82 } else { 83 (None, None) 84 }; 85 86 let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages 87 let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter 88 89 // set up sources 90 if no_bulk { 91 // simple mode, just poll upstream from teh beginning 92 if http != DEFAULT_HTTP.parse()? { 93 log::warn!("ignoring non-default bulk http setting since --no-bulk was set"); 94 } 95 if let Some(d) = dir { 96 log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set."); 97 } 98 if let Some(u) = until { 99 log::warn!( 100 "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)" 101 ); 102 } 103 let mut upstream = upstream; 104 upstream.set_path("/export"); 105 let throttle = Duration::from_millis(upstream_throttle_ms); 106 tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx)); 107 tasks.spawn(full_pages(poll_out, full_tx)); 108 tasks.spawn(pages_to_stdout(full_out, None)); 109 } else { 110 // fun mode 111 112 // set up bulk sources 113 if let Some(dir) = dir { 114 if http != DEFAULT_HTTP.parse()? { 115 anyhow::bail!( 116 "non-default bulk http setting can't be used with bulk dir setting ({dir:?})" 117 ); 118 } 119 tasks.spawn(backfill( 120 FolderSource(dir), 121 bulk_tx, 122 source_workers.unwrap_or(1), 123 until, 124 )); 125 } else { 126 tasks.spawn(backfill( 127 HttpSource(http), 128 bulk_tx, 129 source_workers.unwrap_or(4), 130 until, 131 )); 132 } 133 134 // and the catch-up source... 
135 if let Some(last) = found_last_out { 136 let throttle = Duration::from_millis(upstream_throttle_ms); 137 tasks.spawn(async move { 138 let mut upstream = upstream; 139 upstream.set_path("/export"); 140 141 poll_upstream(last.await?, upstream, throttle, poll_tx).await 142 }); 143 } 144 145 // set up sinks 146 if let Some(pg_url) = to_postgres { 147 log::trace!("connecting to postgres..."); 148 let db = Db::new(pg_url.as_str(), postgres_cert).await?; 149 log::trace!("connected to postgres"); 150 151 tasks.spawn(backfill_to_pg( 152 db.clone(), 153 postgres_reset, 154 bulk_out, 155 found_last_tx, 156 )); 157 if catch_up { 158 tasks.spawn(pages_to_pg(db, full_out)); 159 } 160 } else { 161 tasks.spawn(pages_to_stdout(bulk_out, found_last_tx)); 162 if catch_up { 163 tasks.spawn(pages_to_stdout(full_out, None)); 164 } 165 } 166 } 167 168 while let Some(next) = tasks.join_next().await { 169 match next { 170 Err(e) if e.is_panic() => { 171 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 172 return Err(e.into()); 173 } 174 Err(e) => { 175 log::error!("a joinset task failed to join: {e}"); 176 return Err(e.into()); 177 } 178 Ok(Err(e)) => { 179 log::error!("a joinset task completed with error: {e}"); 180 return Err(e); 181 } 182 Ok(Ok(name)) => { 183 log::trace!("a task completed: {name:?}. {} left", tasks.len()); 184 } 185 } 186 } 187 188 Ok(()) 189} 190 191#[derive(Debug, Parser)] 192struct CliArgs { 193 #[command(flatten)] 194 globals: GlobalArgs, 195 #[command(flatten)] 196 args: Args, 197} 198 199#[allow(dead_code)] 200#[tokio::main] 201async fn main() -> anyhow::Result<()> { 202 let args = CliArgs::parse(); 203 bin_init(false); 204 log::info!("{}", logo("backfill")); 205 run(args.globals, args.args).await?; 206 Ok(()) 207}