Server tools to backfill, tail, mirror, and verify PLC logs
at debug 199 lines 6.1 kB view raw
1use allegedly::{ 2 Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs, 3 bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream, 4}; 5use clap::Parser; 6use reqwest::Url; 7use std::path::PathBuf; 8use tokio::{ 9 sync::{mpsc, oneshot}, 10 task::JoinSet, 11}; 12 13pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/"; 14 15#[derive(Debug, clap::Args)] 16pub struct Args { 17 /// Remote URL prefix to fetch bundles from 18 #[arg(long)] 19 #[clap(default_value = DEFAULT_HTTP)] 20 http: Url, 21 /// Local folder to fetch bundles from (overrides `http`) 22 #[arg(long)] 23 dir: Option<PathBuf>, 24 /// Don't do weekly bulk-loading at all. 25 /// 26 /// overrides `http` and `dir`, makes catch_up redundant 27 #[arg(long, action)] 28 no_bulk: bool, 29 /// Parallel bundle fetchers 30 /// 31 /// Default: 4 for http fetches, 1 for local folder 32 #[arg(long)] 33 source_workers: Option<usize>, 34 /// Bulk load into did-method-plc-compatible postgres instead of stdout 35 /// 36 /// Pass a postgres connection url like "postgresql://localhost:5432" 37 #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")] 38 to_postgres: Option<Url>, 39 /// Cert for postgres (if needed) 40 #[arg(long)] 41 postgres_cert: Option<PathBuf>, 42 /// Delete all operations from the postgres db before starting 43 /// 44 /// only used if `--to-postgres` is present 45 #[arg(long, action)] 46 postgres_reset: bool, 47 /// Stop at the week ending before this date 48 #[arg(long)] 49 until: Option<Dt>, 50 /// After the weekly imports, poll upstream until we're caught up 51 #[arg(long, action)] 52 catch_up: bool, 53} 54 55pub async fn run( 56 GlobalArgs { upstream }: GlobalArgs, 57 Args { 58 http, 59 dir, 60 no_bulk, 61 source_workers, 62 to_postgres, 63 postgres_cert, 64 postgres_reset, 65 until, 66 catch_up, 67 }: Args, 68) -> anyhow::Result<()> { 69 let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new(); 70 71 let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages 72 73 // a bulk sink can notify us as soon as the very last op's time is known 74 // so we can start catching up while the sink might restore indexes and such 75 let (found_last_tx, found_last_out) = if catch_up { 76 let (tx, rx) = oneshot::channel(); 77 (Some(tx), Some(rx)) 78 } else { 79 (None, None) 80 }; 81 82 let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages 83 let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter 84 85 // set up sources 86 if no_bulk { 87 // simple mode, just poll upstream from teh beginning 88 if http != DEFAULT_HTTP.parse()? { 89 log::warn!("ignoring non-default bulk http setting since --no-bulk was set"); 90 } 91 if let Some(d) = dir { 92 log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set."); 93 } 94 if let Some(u) = until { 95 log::warn!( 96 "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)" 97 ); 98 } 99 let mut upstream = upstream; 100 upstream.set_path("/export"); 101 tasks.spawn(poll_upstream(None, upstream, poll_tx)); 102 tasks.spawn(full_pages(poll_out, full_tx)); 103 tasks.spawn(pages_to_stdout(full_out, None)); 104 } else { 105 // fun mode 106 107 // set up bulk sources 108 if let Some(dir) = dir { 109 if http != DEFAULT_HTTP.parse()? { 110 anyhow::bail!( 111 "non-default bulk http setting can't be used with bulk dir setting ({dir:?})" 112 ); 113 } 114 tasks.spawn(backfill( 115 FolderSource(dir), 116 bulk_tx, 117 source_workers.unwrap_or(1), 118 until, 119 )); 120 } else { 121 tasks.spawn(backfill( 122 HttpSource(http), 123 bulk_tx, 124 source_workers.unwrap_or(4), 125 until, 126 )); 127 } 128 129 // and the catch-up source... 130 if let Some(last) = found_last_out { 131 tasks.spawn(async move { 132 let mut upstream = upstream; 133 upstream.set_path("/export"); 134 poll_upstream(last.await?, upstream, poll_tx).await 135 }); 136 } 137 138 // set up sinks 139 if let Some(pg_url) = to_postgres { 140 log::trace!("connecting to postgres..."); 141 let db = Db::new(pg_url.as_str(), postgres_cert).await?; 142 log::trace!("connected to postgres"); 143 144 tasks.spawn(backfill_to_pg( 145 db.clone(), 146 postgres_reset, 147 bulk_out, 148 found_last_tx, 149 )); 150 if catch_up { 151 tasks.spawn(pages_to_pg(db, full_out)); 152 } 153 } else { 154 tasks.spawn(pages_to_stdout(bulk_out, found_last_tx)); 155 if catch_up { 156 tasks.spawn(pages_to_stdout(full_out, None)); 157 } 158 } 159 } 160 161 while let Some(next) = tasks.join_next().await { 162 match next { 163 Err(e) if e.is_panic() => { 164 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 165 return Err(e.into()); 166 } 167 Err(e) => { 168 log::error!("a joinset task failed to join: {e}"); 169 return Err(e.into()); 170 } 171 Ok(Err(e)) => { 172 log::error!("a joinset task completed with error: {e}"); 173 return Err(e); 174 } 175 Ok(Ok(name)) => { 176 log::trace!("a task completed: {name:?}. {} left", tasks.len()); 177 } 178 } 179 } 180 181 Ok(()) 182} 183 184#[derive(Debug, Parser)] 185struct CliArgs { 186 #[command(flatten)] 187 globals: GlobalArgs, 188 #[command(flatten)] 189 args: Args, 190} 191 192#[allow(dead_code)] 193#[tokio::main] 194async fn main() -> anyhow::Result<()> { 195 let args = CliArgs::parse(); 196 bin_init("backfill"); 197 run(args.globals, args.args).await?; 198 Ok(()) 199}