···2 Db, Dt, ExportPage, FjallDb, FolderSource, HttpSource, backfill, backfill_to_fjall,
3 backfill_to_pg,
4 bin::{GlobalArgs, bin_init},
5- full_pages, logo, pages_to_fjall, pages_to_pg, pages_to_stdout, poll_upstream,
6};
7use clap::Parser;
8use reqwest::Url;
···23 /// Local folder to fetch bundles from (overrides `http`)
24 #[arg(long)]
25 dir: Option<PathBuf>,
00026 /// Don't do weekly bulk-loading at all.
27 ///
28 /// overrides `http` and `dir`, makes catch_up redundant
···72 Args {
73 http,
74 dir,
075 no_bulk,
76 source_workers,
77 to_postgres,
···131 // fun mode
132133 // set up bulk sources
134- if let Some(dir) = dir {
00000135 if http != DEFAULT_HTTP.parse()? {
136 anyhow::bail!(
137 "non-default bulk http setting can't be used with bulk dir setting ({dir:?})"
···2 Db, Dt, ExportPage, FjallDb, FolderSource, HttpSource, backfill, backfill_to_fjall,
3 backfill_to_pg,
4 bin::{GlobalArgs, bin_init},
5+ fjall_to_pages, full_pages, logo, pages_to_fjall, pages_to_pg, pages_to_stdout, poll_upstream,
6};
7use clap::Parser;
8use reqwest::Url;
···23 /// Local folder to fetch bundles from (overrides `http`)
24 #[arg(long)]
25 dir: Option<PathBuf>,
 26+    /// Local fjall database to fetch raw ops from (overrides `http`; conflicts with `dir`)
27+ #[arg(long, conflicts_with_all = ["dir"])]
28+ from_fjall: Option<PathBuf>,
29 /// Don't do weekly bulk-loading at all.
30 ///
31 /// overrides `http` and `dir`, makes catch_up redundant
···75 Args {
76 http,
77 dir,
78+ from_fjall,
79 no_bulk,
80 source_workers,
81 to_postgres,
···135 // fun mode
136137 // set up bulk sources
138+ if let Some(fjall_path) = from_fjall {
139+ log::trace!("opening source fjall db at {fjall_path:?}...");
140+ let db = FjallDb::open(&fjall_path)?;
141+ log::trace!("opened source fjall db");
142+ tasks.spawn(fjall_to_pages(db, bulk_tx, until));
143+ } else if let Some(dir) = dir {
144 if http != DEFAULT_HTTP.parse()? {
145 anyhow::bail!(
146 "non-default bulk http setting can't be used with bulk dir setting ({dir:?})"
+1-1
src/lib.rs
···19pub use cached_value::{CachedValue, Fetcher};
20pub use client::{CLIENT, UA};
21pub use mirror::{ExperimentalConf, ListenConf, serve, serve_fjall};
22-pub use plc_fjall::{FjallDb, backfill_to_fjall, pages_to_fjall};
23pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
24pub use poll::{PageBoundaryState, get_page, poll_upstream};
25pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
···19pub use cached_value::{CachedValue, Fetcher};
20pub use client::{CLIENT, UA};
21pub use mirror::{ExperimentalConf, ListenConf, serve, serve_fjall};
22+pub use plc_fjall::{FjallDb, backfill_to_fjall, fjall_to_pages, pages_to_fjall};
23pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
24pub use poll::{PageBoundaryState, get_page, poll_upstream};
25pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
···1use crate::{Dt, ExportPage, Op as CommonOp, PageBoundaryState};
2use anyhow::Context;
3use data_encoding::{BASE32_NOPAD, BASE64URL_NOPAD};
4+use fjall::{
5+ Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode,
6+ config::BlockSizePolicy,
7+};
8use serde::{Deserialize, Serialize};
9use std::collections::BTreeMap;
10use std::fmt;
···697698impl FjallDb {
699 pub fn open(path: impl AsRef<Path>) -> fjall::Result<Self> {
700+ const fn kb(kb: u32) -> u32 {
701+ kb * 1_024
702+ }
703+ const fn mb(mb: u32) -> u64 {
704+ kb(mb) as u64 * 1_024
705+ }
706+707 let db = Database::builder(path)
708+ // 32mb is too low we can afford more
709+ // this should be configurable though!
710+ .cache_size(mb(256))
711 .open()?;
712 let opts = KeyspaceCreateOptions::default;
713 let ops = db.keyspace("ops", || {
714+ opts()
715+ // this is mainly for when backfilling
716+ .max_memtable_size(mb(192))
717+ // this wont compress terribly well since its a bunch of CIDs and signatures and did:keys
718+ // and we want to keep reads fast since we'll be reading a lot...
719+ .data_block_size_policy(BlockSizePolicy::new([kb(4), kb(8), kb(32)]))
720+ // this has no downsides, since the only point reads that might miss we do is on by_did
721+ .expect_point_read_hits(true)
722 })?;
723 let by_did = db.keyspace("by_did", || {
724+ opts()
725+ .max_memtable_size(mb(64))
726+ // this isn't gonna compress well anyway, since its just keys (did + timestamp + cid)
727+ // and dids dont have many operations in the first place, so we can use small blocks
728+ .data_block_size_policy(BlockSizePolicy::all(kb(2)))
729 })?;
730 Ok(Self {
731 inner: Arc::new(FjallInner { db, ops, by_did }),
···969 t0.elapsed()
970 );
971 Ok("pages_to_fjall")
972+}
973+974+pub async fn fjall_to_pages(
975+ db: FjallDb,
976+ dest: mpsc::Sender<ExportPage>,
977+ until: Option<Dt>,
978+) -> anyhow::Result<&'static str> {
979+ log::info!("starting fjall_to_pages backfill source...");
980+981+ let t0 = Instant::now();
982+983+ let dest_clone = dest.clone();
984+ let ops_sent = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
985+ let iter = db.export_ops(None, usize::MAX)?;
986+ let mut current_page = Vec::with_capacity(1000);
987+ let mut count = 0;
988+989+ for op_res in iter {
990+ let op = op_res?;
991+992+ if let Some(u) = until {
993+ if op.created_at >= u {
994+ break;
995+ }
996+ }
997+998+ let operation_str = serde_json::to_string(&op.operation)?;
999+ let common_op = crate::Op {
1000+ did: op.did,
1001+ cid: op.cid,
1002+ created_at: op.created_at,
1003+ nullified: op.nullified,
1004+ operation: serde_json::value::RawValue::from_string(operation_str)?,
1005+ };
1006+1007+ current_page.push(common_op);
1008+ count += 1;
1009+1010+ if current_page.len() >= 1000 {
1011+ let page = ExportPage {
1012+ ops: std::mem::take(&mut current_page),
1013+ };
1014+ if dest_clone.blocking_send(page).is_err() {
1015+ break;
1016+ }
1017+ }
1018+ }
1019+1020+ if !current_page.is_empty() {
1021+ let page = ExportPage { ops: current_page };
1022+ let _ = dest_clone.blocking_send(page);
1023+ }
1024+1025+ Ok(count)
1026+ })
1027+ .await??;
1028+1029+ log::info!(
1030+ "finished sending {ops_sent} ops from fjall in {:?}",
1031+ t0.elapsed()
1032+ );
1033+ Ok("fjall_to_pages")
1034}
10351036#[cfg(test)]