···11use crate::{BundleSource, Dt, ExportPage, Week, week_to_pages};
22use std::sync::Arc;
33+use std::time::Instant;
34use tokio::{sync::Mutex, task::JoinSet};
4556const FIRST_WEEK: Week = Week::from_n(1668643200);
···20212122 let mut workers: JoinSet<anyhow::Result<()>> = JoinSet::new();
22232424+ let t_step = Instant::now();
2525+ log::info!(
2626+ "fetching backfill for {} weeks with {source_workers} workers...",
2727+ weeks.lock().await.len()
2828+ );
2929+2330 // spin up the fetchers to work in parallel
2431 for w in 0..source_workers {
2532 let weeks = weeks.clone();
2633 let dest = dest.clone();
2734 let source = source.clone();
2835 workers.spawn(async move {
2929- log::trace!("about to get weeks...");
3030-3136 while let Some(week) = weeks.lock().await.pop() {
3232- log::trace!(
3333- "worker {w}: fetching week {} (-{})",
3434- Into::<Dt>::into(week).to_rfc3339(),
3535- week.n_ago(),
3636- );
3737+ let when = Into::<Dt>::into(week).to_rfc3339();
3838+ log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago());
3739 week_to_pages(source.clone(), week, dest.clone()).await?;
3838- log::trace!("week {}", Into::<Dt>::into(week).to_rfc3339());
3940 }
4041 log::info!("done with the weeks ig");
4142 Ok(())
···44454546 // TODO: handle missing/failed weeks
46474747- // wait for them to finish
4848+ // wait for the big backfill to finish
4849 while let Some(res) = workers.join_next().await {
4950 res??;
5051 }
5151-5252+ log::info!("finished fetching backfill in {:?}", t_step.elapsed());
5253 Ok(())
5354}
+83-11
src/bin/allegedly.rs
···11use allegedly::{
22- Db, Dt, ExportPage, FolderSource, HttpSource, backfill, bin_init, pages_to_weeks,
33- poll_upstream, write_bulk as pages_to_pg,
22+ Db, Dt, ExportPage, FolderSource, HttpSource, PageBoundaryState, backfill, backfill_to_pg,
33+ bin_init, pages_to_pg, pages_to_weeks, poll_upstream,
44};
55use clap::{Parser, Subcommand};
66use std::path::PathBuf;
77+use tokio::sync::oneshot;
78use url::Url;
89910#[derive(Debug, Parser)]
···4546 /// Stop at the week ending before this date
4647 #[arg(long)]
4748 until: Option<Dt>,
4949+ /// After the weekly imports, poll upstream until we're caught up
5050+ #[arg(long, action)]
5151+ catch_up: bool,
4852 },
4953 /// Scrape a PLC server, collecting ops into weekly bundles
5054 ///
···7579 },
7680}
77817878-async fn pages_to_stdout(rx: flume::Receiver<ExportPage>) -> Result<(), flume::RecvError> {
8282+async fn pages_to_stdout(
8383+ rx: flume::Receiver<ExportPage>,
8484+ notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
8585+) -> Result<(), flume::RecvError> {
8686+ let mut last_at = None;
7987 while let Ok(page) = rx.recv_async().await {
8080- for op in page.ops {
8181- println!("{op}")
8888+ for op in &page.ops {
8989+ println!("{op}");
9090+ }
9191+ if notify_last_at.is_some()
9292+ && let Some(s) = PageBoundaryState::new(&page)
9393+ {
9494+ last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
8295 }
8396 }
9797+ if let Some(notify) = notify_last_at {
9898+ log::trace!("notifying last_at: {last_at:?}");
9999+ if notify.send(last_at).is_err() {
100100+ log::error!("receiver for last_at dropped, can't notify");
101101+ };
102102+ }
84103 Ok(())
85104}
86105106106+/// page forwarder who drops its channels on receipt of a small page
107107+///
108108+/// PLC will return up to 1000 ops on a page, and returns full pages until it
109109+/// has caught up, so this is a (hacky?) way to stop polling once we're up.
110110+fn full_pages(rx: flume::Receiver<ExportPage>) -> flume::Receiver<ExportPage> {
111111+ let (tx, fwd) = flume::bounded(0);
112112+ tokio::task::spawn(async move {
113113+ while let Ok(page) = rx.recv_async().await
114114+ && page.ops.len() > 900
115115+ {
116116+ tx.send_async(page).await.unwrap();
117117+ }
118118+ });
119119+ fwd
120120+}
121121+87122#[tokio::main]
88123async fn main() {
89124 bin_init("main");
···98133 to_postgres,
99134 postgres_reset,
100135 until,
136136+ catch_up,
101137 } => {
102102- let (tx, rx) = flume::bounded(32); // big pages
138138+ let (tx, rx) = flume::bounded(32); // these are big pages
103139 tokio::task::spawn(async move {
104140 if let Some(dir) = dir {
105141 log::info!("Reading weekly bundles from local folder {dir:?}");
···113149 .unwrap();
114150 }
115151 });
116116- if let Some(url) = to_postgres {
117117- let db = Db::new(url.as_str()).await.unwrap();
118118- pages_to_pg(db, rx, postgres_reset).await.unwrap();
152152+153153+ // postgres writer will notify us as soon as the very last op's time is known
154154+ // so we can start catching up while pg is restoring indexes and stuff
155155+ let (notify_last_at, rx_last) = if catch_up {
156156+ let (tx, rx) = oneshot::channel();
157157+ (Some(tx), Some(rx))
119158 } else {
120120- pages_to_stdout(rx).await.unwrap();
159159+ (None, None)
160160+ };
161161+162162+ let to_postgres_url_bulk = to_postgres.clone();
163163+ let bulk_out_write = tokio::task::spawn(async move {
164164+ if let Some(ref url) = to_postgres_url_bulk {
165165+ let db = Db::new(url.as_str()).await.unwrap();
166166+ backfill_to_pg(db, postgres_reset, rx, notify_last_at)
167167+ .await
168168+ .unwrap();
169169+ } else {
170170+ pages_to_stdout(rx, notify_last_at).await.unwrap();
171171+ }
172172+ });
173173+174174+ if let Some(rx_last) = rx_last {
175175+ let mut upstream = args.upstream;
176176+ upstream.set_path("/export");
177177+ // wait until the time for `after` is known
178178+ let last_at = rx_last.await.unwrap();
179179+ log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff");
180180+ let (tx, rx) = flume::bounded(256);
181181+ tokio::task::spawn(
182182+ async move { poll_upstream(last_at, upstream, tx).await.unwrap() },
183183+ );
184184+ bulk_out_write.await.unwrap();
185185+ log::info!("writing catch-up pages");
186186+ let full_pages = full_pages(rx);
187187+ if let Some(url) = to_postgres {
188188+ let db = Db::new(url.as_str()).await.unwrap();
189189+ pages_to_pg(db, full_pages).await.unwrap();
190190+ } else {
191191+ pages_to_stdout(full_pages, None).await.unwrap();
192192+ }
121193 }
122194 }
123195 Commands::Bundle {
···139211 let start_at = after.or_else(|| Some(chrono::Utc::now()));
140212 let (tx, rx) = flume::bounded(1);
141213 tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() });
142142- pages_to_stdout(rx).await.unwrap();
214214+ pages_to_stdout(rx, None).await.unwrap();
143215 }
144216 }
145217}
+2-2
src/lib.rs
···8899pub use backfill::backfill;
1010pub use client::CLIENT;
1111-pub use plc_pg::{Db, write_bulk};
1212-pub use poll::{get_page, poll_upstream};
1111+pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
1212+pub use poll::{PageBoundaryState, get_page, poll_upstream};
1313pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages};
14141515pub type Dt = chrono::DateTime<chrono::Utc>;
+72-6
src/plc_pg.rs
···11-use crate::{ExportPage, Op};
11+use crate::{Dt, ExportPage, Op, PageBoundaryState};
22use std::pin::pin;
33use std::time::Instant;
44+use tokio::sync::oneshot;
45use tokio_postgres::{
56 Client, Error as PgError, NoTls,
67 binary_copy::BinaryCopyInWriter,
···7172 }
7273}
73747575+pub async fn pages_to_pg(db: Db, pages: flume::Receiver<ExportPage>) -> Result<(), PgError> {
7676+ let mut client = db.connect().await?;
7777+7878+ let ops_stmt = client
7979+ .prepare(
8080+ r#"INSERT INTO operations (did, operation, cid, nullified, "createdAt")
8181+ VALUES ($1, $2, $3, $4, $5)"#,
8282+ )
8383+ .await?;
8484+ let did_stmt = client
8585+ .prepare(r#"INSERT INTO dids (did) VALUES ($1) ON CONFLICT do nothing"#)
8686+ .await?;
8787+8888+ let t0 = Instant::now();
8989+ let mut ops_inserted = 0;
9090+ let mut dids_inserted = 0;
9191+9292+ while let Ok(page) = pages.recv_async().await {
9393+ log::trace!("writing page with {} ops", page.ops.len());
9494+ let tx = client.transaction().await?;
9595+ for s in page.ops {
9696+ let Ok(op) = serde_json::from_str::<Op>(&s) else {
9797+ log::warn!("ignoring unparseable op {s:?}");
9898+ continue;
9999+ };
100100+ ops_inserted += tx
101101+ .execute(
102102+ &ops_stmt,
103103+ &[
104104+ &op.did,
105105+ &Json(op.operation),
106106+ &op.cid,
107107+ &op.nullified,
108108+ &op.created_at,
109109+ ],
110110+ )
111111+ .await?;
112112+ dids_inserted += tx.execute(&did_stmt, &[&op.did]).await?;
113113+ }
114114+ tx.commit().await?;
115115+ }
116116+117117+ log::info!(
118118+ "no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}",
119119+ t0.elapsed()
120120+ );
121121+ Ok(())
122122+}
123123+74124/// Dump rows into an empty operations table quickly
75125///
76126/// you must run this after initializing the db with kysely migrations from the
···84134/// panics: if the operations or dids tables are not empty, unless reset is true
85135///
86136/// recommended postgres setting: `max_wal_size=4GB` (or more)
8787-pub async fn write_bulk(
137137+pub async fn backfill_to_pg(
88138 db: Db,
139139+ reset: bool,
89140 pages: flume::Receiver<ExportPage>,
9090- reset: bool,
141141+ notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
91142) -> Result<(), PgError> {
92143 let mut client = db.connect().await?;
9314494145 let t0 = Instant::now();
95146 let tx = client.transaction().await?;
9696- tx.execute("SET LOCAL synchronous_commit = off", &[]).await?;
147147+ tx.execute("SET LOCAL synchronous_commit = off", &[])
148148+ .await?;
9714998150 let t_step = Instant::now();
99151 for table in ["operations", "dids"] {
···142194 )
143195 .await?;
144196 let mut writer = pin!(BinaryCopyInWriter::new(sync, types));
197197+ let mut last_at = None;
145198 while let Ok(page) = pages.recv_async().await {
146146- for s in page.ops {
147147- let Ok(op) = serde_json::from_str::<Op>(&s) else {
199199+ for s in &page.ops {
200200+ let Ok(op) = serde_json::from_str::<Op>(s) else {
148201 log::warn!("ignoring unparseable op: {s:?}");
149202 continue;
150203 };
···159212 ])
160213 .await?;
161214 }
215215+ if notify_last_at.is_some()
216216+ && let Some(s) = PageBoundaryState::new(&page)
217217+ {
218218+ last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
219219+ }
162220 }
221221+222222+ if let Some(notify) = notify_last_at {
223223+ log::trace!("notifying last_at: {last_at:?}");
224224+ if notify.send(last_at).is_err() {
225225+ log::error!("receiver for last_at dropped, can't notify");
226226+ };
227227+ }
228228+163229 let n = writer.as_mut().finish().await?;
164230 log::trace!("COPY IN wrote {n} ops: {:?}", t_step.elapsed());
165231
+3-3
src/poll.rs
···45454646/// PLC
4747#[derive(Debug, PartialEq)]
4848-struct PageBoundaryState {
4949- last_at: Dt,
4848+pub struct PageBoundaryState {
4949+ pub last_at: Dt,
5050 keys_at: Vec<OpKey>, // expected to ~always be length one
5151}
5252···6666// should unrefactor to make Op own its data again, parse (and deal with errors)
6767// upfront, and probably greatly simplify everything downstream. simple.
6868impl PageBoundaryState {
6969- fn new(page: &ExportPage) -> Option<Self> {
6969+ pub fn new(page: &ExportPage) -> Option<Self> {
7070 let mut skips = 0;
71717272 // grab the very last op