···11use allegedly::{
22- Db, Dt, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs, bin_init,
33- full_pages, pages_to_pg, pages_to_stdout, poll_upstream,
22+ Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs,
33+ bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream,
44};
55use clap::Parser;
66use reqwest::Url;
77use std::path::PathBuf;
88-use tokio::sync::{mpsc, oneshot};
88+use tokio::{
99+ sync::{mpsc, oneshot},
1010+ task::JoinSet,
1111+};
1212+1313+pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/";
9141015#[derive(Debug, clap::Args)]
1116pub struct Args {
1217 /// Remote URL prefix to fetch bundles from
1318 #[arg(long)]
1414- #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")]
1919+ #[clap(default_value = DEFAULT_HTTP)]
1520 http: Url,
1621 /// Local folder to fetch bundles from (overrides `http`)
1722 #[arg(long)]
1823 dir: Option<PathBuf>,
2424+ /// Don't do weekly bulk-loading at all.
2525+ ///
2626+ /// overrides `http` and `dir`, makes catch_up redundant
2727+ #[arg(long, action)]
2828+ no_bulk: bool,
1929 /// Parallel bundle fetchers
2030 ///
2131 /// Default: 4 for http fetches, 1 for local folder
···4757 Args {
4858 http,
4959 dir,
6060+ no_bulk,
5061 source_workers,
5162 to_postgres,
5263 postgres_cert,
···5566 catch_up,
5667 }: Args,
5768) -> anyhow::Result<()> {
5858- let (tx, rx) = mpsc::channel(32); // these are big pages
5959- tokio::task::spawn(async move {
6060- if let Some(dir) = dir {
6161- log::info!("Reading weekly bundles from local folder {dir:?}");
6262- backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until)
6363- .await
6464- .inspect_err(|e| log::error!("backfill from folder problem: {e}"))
6565- .expect("to source bundles from a folder");
6666- } else {
6767- log::info!("Fetching weekly bundles from from {http}");
6868- backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until)
6969- .await
7070- .expect("to source bundles from http");
7171- }
7272- });
6969+ let mut tasks = JoinSet::new();
7070+7171+ let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages
73727474- // postgres writer will notify us as soon as the very last op's time is known
7575- // so we can start catching up while pg is restoring indexes and stuff
7676- let (notify_last_at, rx_last) = if catch_up {
7373+ // a bulk sink can notify us as soon as the very last op's time is known
7474+ // so we can start catching up while the sink might restore indexes and such
7575+ let (found_last_tx, found_last_out) = if catch_up {
7776 let (tx, rx) = oneshot::channel();
7877 (Some(tx), Some(rx))
7978 } else {
8079 (None, None)
8180 };
82818383- let to_postgres_url_bulk = to_postgres.clone();
8484- let pg_cert = postgres_cert.clone();
8585- let bulk_out_write = tokio::task::spawn(async move {
8686- if let Some(ref url) = to_postgres_url_bulk {
8787- let db = Db::new(url.as_str(), pg_cert)
8888- .await
8989- .expect("to get db for bulk out write");
9090- backfill_to_pg(db, postgres_reset, rx, notify_last_at)
9191- .await
9292- .expect("to backfill to pg");
8282+ let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages
8383+ let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter
8484+8585+ // set up sources
8686+ if no_bulk {
8787+ // simple mode, just poll upstream from the beginning
8888+ if http != DEFAULT_HTTP.parse()? {
8989+ log::warn!("ignoring non-default bulk http setting since --no-bulk was set");
9090+ }
9191+ if let Some(d) = dir {
9292+ log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set.");
9393+ }
9494+ if let Some(u) = until {
9595+ log::warn!(
9696+ "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)"
9797+ );
9898+ }
9999+ let mut upstream = upstream;
100100+ upstream.set_path("/export");
101101+ tasks.spawn(poll_upstream(None, upstream, poll_tx));
102102+ tasks.spawn(full_pages(poll_out, full_tx));
103103+ tasks.spawn(pages_to_stdout(full_out, None));
104104+ } else {
105105+ // fun mode
106106+107107+ // set up bulk sources
108108+ if let Some(dir) = dir {
109109+ if http != DEFAULT_HTTP.parse()? {
110110+ anyhow::bail!(
111111+ "non-default bulk http setting can't be used with bulk dir setting ({dir:?})"
112112+ );
113113+ }
114114+ tasks.spawn(backfill(
115115+ FolderSource(dir),
116116+ bulk_tx,
117117+ source_workers.unwrap_or(1),
118118+ until,
119119+ ));
93120 } else {
9494- pages_to_stdout(rx, notify_last_at)
9595- .await
9696- .expect("to backfill to stdout");
121121+ tasks.spawn(backfill(
122122+ HttpSource(http),
123123+ bulk_tx,
124124+ source_workers.unwrap_or(4),
125125+ until,
126126+ ));
97127 }
9898- });
99128100100- if let Some(rx_last) = rx_last {
101101- let mut upstream = upstream;
102102- upstream.set_path("/export");
103103- // wait until the time for `after` is known
104104- let last_at = rx_last.await.expect("to get the last log's createdAt");
105105- log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff");
106106- let (tx, rx) = mpsc::channel(256); // these are small pages
107107- tokio::task::spawn(async move {
108108- poll_upstream(last_at, upstream, tx)
109109- .await
110110- .expect("polling upstream to work")
111111- });
112112- bulk_out_write.await.expect("to wait for bulk_out_write");
113113- log::info!("writing catch-up pages");
114114- let full_pages = full_pages(rx);
115115- if let Some(url) = to_postgres {
116116- let db = Db::new(url.as_str(), postgres_cert)
117117- .await
118118- .expect("to connect pg for catchup");
119119- pages_to_pg(db, full_pages)
120120- .await
121121- .expect("to write catch-up pages to pg");
129129+ // and the catch-up source...
130130+ if let Some(last) = found_last_out {
131131+ tasks.spawn(async move {
132132+ let mut upstream = upstream;
133133+ upstream.set_path("/export");
134134+ poll_upstream(last.await?, upstream, poll_tx).await
135135+ });
136136+ }
137137+138138+ // set up sinks
139139+ if let Some(pg_url) = to_postgres {
140140+ log::trace!("connecting to postgres...");
141141+ let db = Db::new(pg_url.as_str(), postgres_cert).await?;
142142+ log::trace!("connected to postgres");
143143+144144+ tasks.spawn(backfill_to_pg(
145145+ db.clone(),
146146+ postgres_reset,
147147+ bulk_out,
148148+ found_last_tx,
149149+ ));
150150+ tasks.spawn(pages_to_pg(db, full_out));
122151 } else {
123123- pages_to_stdout(full_pages, None)
124124- .await
125125- .expect("to write catch-up pages to stdout");
152152+ tasks.spawn(pages_to_stdout(bulk_out, found_last_tx));
153153+ tasks.spawn(pages_to_stdout(full_out, None));
126154 }
127155 }
156156+157157+ while let Some(next) = tasks.join_next().await {
158158+ match next {
159159+ Err(e) if e.is_panic() => {
160160+ log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
161161+ return Err(e.into());
162162+ }
163163+ Err(e) => {
164164+ log::error!("a joinset task failed to join: {e}");
165165+ return Err(e.into());
166166+ }
167167+ Ok(Err(e)) => {
168168+ log::error!("a joinset task completed with error: {e}");
169169+ return Err(e);
170170+ }
171171+ _ => {}
172172+ }
173173+ }
174174+128175 Ok(())
129176}
130177
+24-9
src/lib.rs
···8080///
8181/// PLC will return up to 1000 ops on a page, and returns full pages until it
8282/// has caught up, so this is a (hacky?) way to stop polling once we're up.
8383-pub fn full_pages(mut rx: mpsc::Receiver<ExportPage>) -> mpsc::Receiver<ExportPage> {
8484- let (tx, fwd) = mpsc::channel(1);
8585- tokio::task::spawn(async move {
8686- while let Some(page) = rx.recv().await
8787- && page.ops.len() > 900
8888- {
8989- tx.send(page).await.expect("to be able to forward a page");
8383+pub async fn full_pages(
8484+ mut rx: mpsc::Receiver<ExportPage>,
8585+ tx: mpsc::Sender<ExportPage>,
8686+) -> anyhow::Result<()> {
8787+ while let Some(page) = rx.recv().await {
8888+ let n = page.ops.len();
8989+ if n < 900 {
9090+ let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
9191+ let Some(age) = last_age else {
9292+ log::info!("full_pages done, empty final page");
9393+ return Ok(());
9494+ };
9595+ if age <= chrono::TimeDelta::hours(6) {
9696+ log::info!("full_pages done, final page of {n} ops");
9797+ } else {
9898+ log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
9999+ }
100100+ return Ok(());
90101 }
9191- });
9292- fwd
102102+ log::trace!("full_pages: continuing with page of {n} ops");
103103+ tx.send(page).await?;
104104+ }
105105+ Err(anyhow::anyhow!(
106106+ "full_pages ran out of source material, sender closed"
107107+ ))
93108}
9410995110pub async fn pages_to_stdout(