//! Forked from microcosm.blue/Allegedly
//!
//! Server tools to backfill, tail, mirror, and verify PLC logs.
1use allegedly::{
2 Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs,
3 bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream,
4};
5use clap::Parser;
6use reqwest::Url;
7use std::path::PathBuf;
8use tokio::{
9 sync::{mpsc, oneshot},
10 task::JoinSet,
11};
12
/// Default remote URL prefix for weekly PLC export bundles
/// (presumably an object-storage mirror of plc.directory — confirm).
/// Overridable at the CLI with `--http`.
pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/";
14
15#[derive(Debug, clap::Args)]
16pub struct Args {
17 /// Remote URL prefix to fetch bundles from
18 #[arg(long)]
19 #[clap(default_value = DEFAULT_HTTP)]
20 http: Url,
21 /// Local folder to fetch bundles from (overrides `http`)
22 #[arg(long)]
23 dir: Option<PathBuf>,
24 /// Don't do weekly bulk-loading at all.
25 ///
26 /// overrides `http` and `dir`, makes catch_up redundant
27 #[arg(long, action)]
28 no_bulk: bool,
29 /// Parallel bundle fetchers
30 ///
31 /// Default: 4 for http fetches, 1 for local folder
32 #[arg(long)]
33 source_workers: Option<usize>,
34 /// Bulk load into did-method-plc-compatible postgres instead of stdout
35 ///
36 /// Pass a postgres connection url like "postgresql://localhost:5432"
37 #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")]
38 to_postgres: Option<Url>,
39 /// Cert for postgres (if needed)
40 #[arg(long)]
41 postgres_cert: Option<PathBuf>,
42 /// Delete all operations from the postgres db before starting
43 ///
44 /// only used if `--to-postgres` is present
45 #[arg(long, action)]
46 postgres_reset: bool,
47 /// Stop at the week ending before this date
48 #[arg(long)]
49 until: Option<Dt>,
50 /// After the weekly imports, poll upstream until we're caught up
51 #[arg(long, action)]
52 catch_up: bool,
53}
54
/// Run a backfill: bulk-load weekly bundles from a source (remote http or
/// local folder) into a sink (postgres or stdout), then optionally poll the
/// upstream `/export` endpoint until caught up with the live log.
///
/// All stages run as tasks in a `JoinSet` wired together with channels; the
/// first task to fail (or panic) aborts the whole run with its error.
pub async fn run(
    GlobalArgs { upstream }: GlobalArgs,
    Args {
        http,
        dir,
        no_bulk,
        source_workers,
        to_postgres,
        postgres_cert,
        postgres_reset,
        until,
        catch_up,
    }: Args,
) -> anyhow::Result<()> {
    // Each task resolves to its name on success so completions can be traced
    // in the join loop below.
    let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new();

    let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages

    // a bulk sink can notify us as soon as the very last op's time is known
    // so we can start catching up while the sink might restore indexes and such
    let (found_last_tx, found_last_out) = if catch_up {
        let (tx, rx) = oneshot::channel();
        (Some(tx), Some(rx))
    } else {
        (None, None)
    };

    let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages
    let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter

    // set up sources
    if no_bulk {
        // simple mode, just poll upstream from the beginning
        // bulk-only flags are meaningless here; warn instead of erroring.
        if http != DEFAULT_HTTP.parse()? {
            log::warn!("ignoring non-default bulk http setting since --no-bulk was set");
        }
        if let Some(d) = dir {
            log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set.");
        }
        if let Some(u) = until {
            log::warn!(
                "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)"
            );
        }
        let mut upstream = upstream;
        upstream.set_path("/export");
        // pipeline: poll upstream -> filter to full pages -> stdout
        tasks.spawn(poll_upstream(None, upstream, poll_tx));
        tasks.spawn(full_pages(poll_out, full_tx));
        tasks.spawn(pages_to_stdout(full_out, None));
    } else {
        // fun mode

        // set up bulk sources
        if let Some(dir) = dir {
            // `dir` overrides `http`, so a customized http source alongside it
            // is almost certainly a user mistake: refuse rather than warn.
            if http != DEFAULT_HTTP.parse()? {
                anyhow::bail!(
                    "non-default bulk http setting can't be used with bulk dir setting ({dir:?})"
                );
            }
            tasks.spawn(backfill(
                FolderSource(dir),
                bulk_tx,
                source_workers.unwrap_or(1), // local disk: 1 worker by default
                until,
            ));
        } else {
            tasks.spawn(backfill(
                HttpSource(http),
                bulk_tx,
                source_workers.unwrap_or(4), // remote fetches: 4 workers by default
                until,
            ));
        }

        // and the catch-up source...
        if let Some(last) = found_last_out {
            // starts polling only once the bulk sink reports the timestamp of
            // the last bulk-loaded op (via the oneshot above).
            tasks.spawn(async move {
                let mut upstream = upstream;
                upstream.set_path("/export");
                poll_upstream(last.await?, upstream, poll_tx).await
            });
        }

        // set up sinks
        if let Some(pg_url) = to_postgres {
            log::trace!("connecting to postgres...");
            let db = Db::new(pg_url.as_str(), postgres_cert).await?;
            log::trace!("connected to postgres");

            tasks.spawn(backfill_to_pg(
                db.clone(),
                postgres_reset,
                bulk_out,
                found_last_tx,
            ));
            if catch_up {
                tasks.spawn(pages_to_pg(db, full_out));
            }
        } else {
            tasks.spawn(pages_to_stdout(bulk_out, found_last_tx));
            if catch_up {
                tasks.spawn(pages_to_stdout(full_out, None));
            }
        }
    }

    // Drain the JoinSet: bail on the first panic, join error, or task error;
    // otherwise just trace each successful completion by name.
    while let Some(next) = tasks.join_next().await {
        match next {
            Err(e) if e.is_panic() => {
                log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
                return Err(e.into());
            }
            Err(e) => {
                log::error!("a joinset task failed to join: {e}");
                return Err(e.into());
            }
            Ok(Err(e)) => {
                log::error!("a joinset task completed with error: {e}");
                return Err(e);
            }
            Ok(Ok(name)) => {
                log::trace!("a task completed: {name:?}. {} left", tasks.len());
            }
        }
    }

    Ok(())
}
183
// Top-level CLI parser for the standalone `backfill` binary: the options
// shared by all bins plus this command's own flags, flattened together.
// (Deliberately no `///` doc here — it would become clap's about-text.)
#[derive(Debug, Parser)]
struct CliArgs {
    // options common to every binary (e.g. the upstream URL)
    #[command(flatten)]
    globals: GlobalArgs,
    // backfill-specific flags
    #[command(flatten)]
    args: Args,
}
191
192#[allow(dead_code)]
193#[tokio::main]
194async fn main() -> anyhow::Result<()> {
195 let args = CliArgs::parse();
196 bin_init("backfill");
197 run(args.globals, args.args).await?;
198 Ok(())
199}