···22 Db, Dt, ExportPage, FjallDb, FolderSource, HttpSource, backfill, backfill_to_fjall,
33 backfill_to_pg,
44 bin::{GlobalArgs, bin_init},
55- fjall_to_pages, full_pages, logo, pages_to_fjall, pages_to_pg, pages_to_stdout, poll_upstream,
55+ full_pages, logo, pages_to_fjall, pages_to_pg, pages_to_stdout, poll_upstream,
66};
77use clap::Parser;
88use reqwest::Url;
···139139 log::trace!("opening source fjall db at {fjall_path:?}...");
140140 let db = FjallDb::open(&fjall_path)?;
141141 log::trace!("opened source fjall db");
142142- tasks.spawn(fjall_to_pages(db, bulk_tx, until));
142142+ tasks.spawn(backfill(db, bulk_tx, source_workers.unwrap_or(4), until));
143143 } else if let Some(dir) = dir {
144144 if http != DEFAULT_HTTP.parse()? {
145145 anyhow::bail!(
+1-1
src/lib.rs
···1919pub use cached_value::{CachedValue, Fetcher};
2020pub use client::{CLIENT, UA};
2121pub use mirror::{ExperimentalConf, ListenConf, serve, serve_fjall};
2222-pub use plc_fjall::{FjallDb, backfill_to_fjall, fjall_to_pages, pages_to_fjall};
2222+pub use plc_fjall::{FjallDb, backfill_to_fjall, pages_to_fjall};
2323pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
2424pub use poll::{PageBoundaryState, get_page, poll_upstream};
2525pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
+2-2
src/mirror/fjall.rs
···275275 let db = fjall.clone();
276276277277 let ops = tokio::task::spawn_blocking(move || {
278278- let iter = db.export_ops(after, limit)?;
279279- iter.collect::<anyhow::Result<Vec<_>>>()
278278+ let iter = db.export_ops(after.unwrap_or(Dt::UNIX_EPOCH)..)?;
279279+ iter.take(limit).collect::<anyhow::Result<Vec<_>>>()
280280 })
281281 .await
282282 .map_err(|e| Error::from_string(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?
+67-70
src/plc_fjall.rs
···11+use crate::{BundleSource, Week};
12use crate::{Dt, ExportPage, Op as CommonOp, PageBoundaryState};
23use anyhow::Context;
34use data_encoding::{BASE32_NOPAD, BASE64URL_NOPAD};
···56 Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode,
67 config::BlockSizePolicy,
78};
99+use futures::Future;
810use serde::{Deserialize, Serialize};
911use std::collections::BTreeMap;
1012use std::fmt;
1113use std::path::Path;
1214use std::sync::Arc;
1315use std::time::Instant;
1616+use tokio::io::{AsyncRead, AsyncWriteExt};
1417use tokio::sync::{mpsc, oneshot};
15181619const SEP: u8 = 0;
···1022102510231026 pub fn export_ops(
10241027 &self,
10251025- after: Option<Dt>,
10261026- limit: usize,
10281028+ range: impl std::ops::RangeBounds<Dt>,
10271029 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Op>> + '_> {
10281028- let iter = if let Some(after) = after {
10291029- let start = (after.timestamp_micros() as u64).to_be_bytes();
10301030- self.inner.ops.range(start..)
10311031- } else {
10321032- self.inner.ops.iter()
10301030+ use std::ops::Bound;
10311031+ let map_bound = |b: Bound<&Dt>| -> Bound<[u8; 8]> {
10321032+ match b {
10331033+ Bound::Included(dt) => Bound::Included(dt.timestamp_micros().to_be_bytes()),
10341034+ Bound::Excluded(dt) => Bound::Excluded(dt.timestamp_micros().to_be_bytes()),
10351035+ Bound::Unbounded => Bound::Unbounded,
10361036+ }
10331037 };
10381038+ let range = (map_bound(range.start_bound()), map_bound(range.end_bound()));
1034103910351035- Ok(iter.take(limit).map(|item| {
10401040+ let iter = self.inner.ops.range(range);
10411041+10421042+ Ok(iter.map(|item| {
10361043 let (key, value) = item
10371044 .into_inner()
10381045 .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?;
···10601067 })
10611068 }))
10621069 }
10701070+10711071+ pub fn export_ops_week(
10721072+ &self,
10731073+ week: Week,
10741074+ ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Op>> + '_> {
10751075+ let after: Dt = week.into();
10761076+ let before: Dt = week.next().into();
10771077+10781078+ self.export_ops(after..before)
10791079+ }
10801080+}
10811081+10821082+impl BundleSource for FjallDb {
10831083+ fn reader_for(
10841084+ &self,
10851085+ week: Week,
10861086+ ) -> impl Future<Output = anyhow::Result<impl AsyncRead + Send>> + Send {
10871087+ let db = self.clone();
10881088+10891089+ async move {
10901090+ let (mut tx, rx) = tokio::io::duplex(1024 * 1024 * 64);
10911091+10921092+ tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
10931093+ let iter = db.export_ops_week(week)?;
10941094+10951095+ let rt = tokio::runtime::Handle::current();
10961096+10971097+ for op_res in iter {
10981098+ let op = op_res?;
10991099+ let operation_str = serde_json::to_string(&op.operation)?;
11001100+ let common_op = crate::Op {
11011101+ did: op.did,
11021102+ cid: op.cid,
11031103+ created_at: op.created_at,
11041104+ nullified: op.nullified,
11051105+ operation: serde_json::value::RawValue::from_string(operation_str)?,
11061106+ };
11071107+11081108+ let mut json_bytes = serde_json::to_vec(&common_op)?;
11091109+ json_bytes.push(b'\n');
11101110+11111111+ if rt.block_on(tx.write_all(&json_bytes)).is_err() {
11121112+ break;
11131113+ }
11141114+ }
11151115+11161116+ Ok(())
11171117+ });
11181118+11191119+ Ok(rx)
11201120+ }
11211121+ }
10631122}
1064112310651124pub async fn backfill_to_fjall(
···11491208 t0.elapsed()
11501209 );
11511210 Ok("pages_to_fjall")
11521152-}
11531153-11541154-pub async fn fjall_to_pages(
11551155- db: FjallDb,
11561156- dest: mpsc::Sender<ExportPage>,
11571157- until: Option<Dt>,
11581158-) -> anyhow::Result<&'static str> {
11591159- log::info!("starting fjall_to_pages backfill source...");
11601160-11611161- let t0 = Instant::now();
11621162-11631163- let dest_clone = dest.clone();
11641164- let ops_sent = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
11651165- let iter = db.export_ops(None, usize::MAX)?;
11661166- let mut current_page = Vec::with_capacity(1000);
11671167- let mut count = 0;
11681168-11691169- for op_res in iter {
11701170- let op = op_res?;
11711171-11721172- if let Some(u) = until {
11731173- if op.created_at >= u {
11741174- break;
11751175- }
11761176- }
11771177-11781178- let operation_str = serde_json::to_string(&op.operation)?;
11791179- let common_op = crate::Op {
11801180- did: op.did,
11811181- cid: op.cid,
11821182- created_at: op.created_at,
11831183- nullified: op.nullified,
11841184- operation: serde_json::value::RawValue::from_string(operation_str)?,
11851185- };
11861186-11871187- current_page.push(common_op);
11881188- count += 1;
11891189-11901190- if current_page.len() >= 1000 {
11911191- let page = ExportPage {
11921192- ops: std::mem::take(&mut current_page),
11931193- };
11941194- if dest_clone.blocking_send(page).is_err() {
11951195- break;
11961196- }
11971197- }
11981198- }
11991199-12001200- if !current_page.is_empty() {
12011201- let page = ExportPage { ops: current_page };
12021202- let _ = dest_clone.blocking_send(page);
12031203- }
12041204-12051205- Ok(count)
12061206- })
12071207- .await??;
12081208-12091209- log::info!(
12101210- "finished sending {ops_sent} ops from fjall in {:?}",
12111211- t0.elapsed()
12121212- );
12131213- Ok("fjall_to_pages")
12141211}
1215121212161213#[cfg(test)]
+7-5
src/weekly.rs
···101101 let file = File::open(path)
102102 .await
103103 .inspect_err(|e| log::error!("failed to open file: {e}"))?;
104104- Ok(file)
104104+ let decoder = GzipDecoder::new(BufReader::new(file));
105105+ Ok(decoder)
105106 }
106107}
107108···112113 use futures::TryStreamExt;
113114 let HttpSource(base) = self;
114115 let url = base.join(&format!("{}.jsonl.gz", week.0))?;
115115- Ok(CLIENT
116116+ let stream = CLIENT
116117 .get(url)
117118 .send()
118119 .await?
···120121 .bytes_stream()
121122 .map_err(futures::io::Error::other)
122123 .into_async_read()
123123- .compat())
124124+ .compat();
125125+ let decoder = GzipDecoder::new(BufReader::new(stream));
126126+ Ok(decoder)
124127 }
125128}
126129···213216 }
214217 };
215218216216- let decoder = GzipDecoder::new(BufReader::new(reader));
217217- let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000));
219219+ let mut chunks = pin!(LinesStream::new(BufReader::new(reader).lines()).try_chunks(10000));
218220 let mut success = true;
219221220222 while let Some(chunk) = match chunks.as_mut().try_next().await {