//! Server tools to backfill, tail, mirror, and verify PLC logs
1use allegedly::{
2 Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg,
3 bin::{GlobalArgs, bin_init},
4 full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream,
5};
6use clap::Parser;
7use reqwest::Url;
8use std::{path::PathBuf, time::Duration};
9use tokio::{
10 sync::{mpsc, oneshot},
11 task::JoinSet,
12};
13
/// Default remote URL prefix for bulk weekly PLC export bundles.
pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/";
15
16#[derive(Debug, clap::Args)]
17pub struct Args {
18 /// Remote URL prefix to fetch bundles from
19 #[arg(long)]
20 #[clap(default_value = DEFAULT_HTTP)]
21 http: Url,
22 /// Local folder to fetch bundles from (overrides `http`)
23 #[arg(long)]
24 dir: Option<PathBuf>,
25 /// Don't do weekly bulk-loading at all.
26 ///
27 /// overrides `http` and `dir`, makes catch_up redundant
28 #[arg(long, action)]
29 no_bulk: bool,
30 /// Parallel bundle fetchers
31 ///
32 /// Default: 4 for http fetches, 1 for local folder
33 #[arg(long)]
34 source_workers: Option<usize>,
35 /// Bulk load into did-method-plc-compatible postgres instead of stdout
36 ///
37 /// Pass a postgres connection url like "postgresql://localhost:5432"
38 #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")]
39 to_postgres: Option<Url>,
40 /// Cert for postgres (if needed)
41 #[arg(long)]
42 postgres_cert: Option<PathBuf>,
43 /// Delete all operations from the postgres db before starting
44 ///
45 /// only used if `--to-postgres` is present
46 #[arg(long, action)]
47 postgres_reset: bool,
48 /// Stop at the week ending before this date
49 #[arg(long)]
50 until: Option<Dt>,
51 /// After the weekly imports, poll upstream until we're caught up
52 #[arg(long, action)]
53 catch_up: bool,
54}
55
56pub async fn run(
57 GlobalArgs {
58 upstream,
59 upstream_throttle_ms,
60 }: GlobalArgs,
61 Args {
62 http,
63 dir,
64 no_bulk,
65 source_workers,
66 to_postgres,
67 postgres_cert,
68 postgres_reset,
69 until,
70 catch_up,
71 }: Args,
72) -> anyhow::Result<()> {
73 let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new();
74
75 let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages
76
77 // a bulk sink can notify us as soon as the very last op's time is known
78 // so we can start catching up while the sink might restore indexes and such
79 let (found_last_tx, found_last_out) = if catch_up {
80 let (tx, rx) = oneshot::channel();
81 (Some(tx), Some(rx))
82 } else {
83 (None, None)
84 };
85
86 let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages
87 let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter
88
89 // set up sources
90 if no_bulk {
91 // simple mode, just poll upstream from teh beginning
92 if http != DEFAULT_HTTP.parse()? {
93 log::warn!("ignoring non-default bulk http setting since --no-bulk was set");
94 }
95 if let Some(d) = dir {
96 log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set.");
97 }
98 if let Some(u) = until {
99 log::warn!(
100 "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)"
101 );
102 }
103 let mut upstream = upstream;
104 upstream.set_path("/export");
105 let throttle = Duration::from_millis(upstream_throttle_ms);
106 tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx));
107 tasks.spawn(full_pages(poll_out, full_tx));
108 tasks.spawn(pages_to_stdout(full_out, None));
109 } else {
110 // fun mode
111
112 // set up bulk sources
113 if let Some(dir) = dir {
114 if http != DEFAULT_HTTP.parse()? {
115 anyhow::bail!(
116 "non-default bulk http setting can't be used with bulk dir setting ({dir:?})"
117 );
118 }
119 tasks.spawn(backfill(
120 FolderSource(dir),
121 bulk_tx,
122 source_workers.unwrap_or(1),
123 until,
124 ));
125 } else {
126 tasks.spawn(backfill(
127 HttpSource(http),
128 bulk_tx,
129 source_workers.unwrap_or(4),
130 until,
131 ));
132 }
133
134 // and the catch-up source...
135 if let Some(last) = found_last_out {
136 let throttle = Duration::from_millis(upstream_throttle_ms);
137 tasks.spawn(async move {
138 let mut upstream = upstream;
139 upstream.set_path("/export");
140
141 poll_upstream(last.await?, upstream, throttle, poll_tx).await
142 });
143 }
144
145 // set up sinks
146 if let Some(pg_url) = to_postgres {
147 log::trace!("connecting to postgres...");
148 let db = Db::new(pg_url.as_str(), postgres_cert).await?;
149 log::trace!("connected to postgres");
150
151 tasks.spawn(backfill_to_pg(
152 db.clone(),
153 postgres_reset,
154 bulk_out,
155 found_last_tx,
156 ));
157 if catch_up {
158 tasks.spawn(pages_to_pg(db, full_out));
159 }
160 } else {
161 tasks.spawn(pages_to_stdout(bulk_out, found_last_tx));
162 if catch_up {
163 tasks.spawn(pages_to_stdout(full_out, None));
164 }
165 }
166 }
167
168 while let Some(next) = tasks.join_next().await {
169 match next {
170 Err(e) if e.is_panic() => {
171 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
172 return Err(e.into());
173 }
174 Err(e) => {
175 log::error!("a joinset task failed to join: {e}");
176 return Err(e.into());
177 }
178 Ok(Err(e)) => {
179 log::error!("a joinset task completed with error: {e}");
180 return Err(e);
181 }
182 Ok(Ok(name)) => {
183 log::trace!("a task completed: {name:?}. {} left", tasks.len());
184 }
185 }
186 }
187
188 Ok(())
189}
190
/// Full argument set when this tool runs as a standalone binary:
/// the shared global flags flattened together with the backfill flags.
#[derive(Debug, Parser)]
struct CliArgs {
    // shared flags (upstream url, upstream throttle) — see `GlobalArgs`
    #[command(flatten)]
    globals: GlobalArgs,
    // backfill-specific flags — see `Args` above
    #[command(flatten)]
    args: Args,
}
198
199#[allow(dead_code)]
200#[tokio::main]
201async fn main() -> anyhow::Result<()> {
202 let args = CliArgs::parse();
203 bin_init(false);
204 log::info!("{}", logo("backfill"));
205 run(args.globals, args.args).await?;
206 Ok(())
207}