···4444 /// Cert for postgres (if needed)
4545 #[arg(long)]
4646 postgres_cert: Option<PathBuf>,
4747- /// Delete all operations from the postgres db before starting
4747+ /// Delete all operations from the db before starting
4848 ///
4949- /// only used if `--to-postgres` is present
4949+ /// only used if `--to-postgres` or `--to-fjall` is present
5050 #[arg(long, action)]
5151- postgres_reset: bool,
5151+ reset: bool,
5252 /// Bulk load into a local fjall embedded database
5353 ///
5454 /// Pass a directory path for the fjall database
5555- #[arg(long, conflicts_with_all = ["to_postgres", "postgres_cert", "postgres_reset"])]
5555+ #[arg(long, conflicts_with_all = ["to_postgres", "postgres_cert"])]
5656 to_fjall: Option<PathBuf>,
5757- /// Delete all operations from the fjall db before starting
5858- ///
5959- /// only used if `--to-fjall` is present
6060- #[arg(long, action, requires = "to_fjall")]
6161- fjall_reset: bool,
6257 /// Stop at the week ending before this date
6358 #[arg(long)]
6459 until: Option<Dt>,
···8075 source_workers,
8176 to_postgres,
8277 postgres_cert,
8383- postgres_reset,
7878+ reset,
8479 to_fjall,
8585- fjall_reset,
8680 until,
8781 catch_up,
8882 }: Args,
···180174181175 tasks.spawn(backfill_to_fjall(
182176 db.clone(),
183183- fjall_reset,
177177+ reset,
184178 bulk_out,
185179 found_last_tx,
186180 ));
···192186 let db = Db::new(pg_url.as_str(), postgres_cert).await?;
193187 log::trace!("connected to postgres");
194188195195- tasks.spawn(backfill_to_pg(
196196- db.clone(),
197197- postgres_reset,
198198- bulk_out,
199199- found_last_tx,
200200- ));
189189+ tasks.spawn(backfill_to_pg(db.clone(), reset, bulk_out, found_last_tx));
201190 if catch_up {
202191 tasks.spawn(pages_to_pg(db, full_out));
203192 }
+37-1
src/lib.rs
···2020pub use cached_value::{CachedValue, Fetcher};
2121pub use client::{CLIENT, UA};
2222pub use mirror::{ExperimentalConf, ListenConf, serve, serve_fjall};
2323-pub use plc_fjall::{FjallDb, backfill_to_fjall, pages_to_fjall};
2323+pub use plc_fjall::{FjallDb, audit as audit_fjall, backfill_to_fjall, pages_to_fjall, drop_invalid_ops as drop_invalid_ops_fjall};
2424pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
2525pub use poll::{PageBoundaryState, get_page, poll_upstream};
2626pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
···136136 };
137137 }
138138 Ok("pages_to_stdout")
139139+}
140140+141141+pub async fn invalid_ops_to_stdout(
142142+ mut rx: mpsc::Receiver<(String, Dt, String)>,
143143+) -> anyhow::Result<&'static str> {
144144+ while let Some((did, at, cid)) = rx.recv().await {
145145+ let val = serde_json::json!({
146146+ "did": did,
147147+ "at": at,
148148+ "cid": cid,
149149+ });
150150+ println!("{val}");
151151+ }
152152+ Ok("invalid_ops_to_stdout")
153153+}
154154+155155+pub async fn file_to_invalid_ops(
156156+ path: impl AsRef<std::path::Path>,
157157+ tx: mpsc::Sender<(String, Dt, String)>,
158158+) -> anyhow::Result<&'static str> {
159159+ let file = tokio::fs::File::open(path).await?;
160160+161161+ use tokio::io::AsyncBufReadExt;
162162+ let mut lines = tokio::io::BufReader::new(file).lines();
163163+ while let Some(line) = lines.next_line().await? {
164164+ #[derive(serde::Deserialize)]
165165+ struct Op {
166166+ did: String,
167167+ at: Dt,
168168+ cid: String,
169169+ }
170170+ let op: Op = serde_json::from_str(&line)?;
171171+ tx.send((op.did, op.at, op.cid)).await?;
172172+ }
173173+174174+ Ok("invalid_ops_to_stdout")
139175}
140176141177pub fn logo(name: &str) -> String {
+270-67
src/plc_fjall.rs
···11use crate::{
22 BundleSource, Dt, ExportPage, Op as CommonOp, PageBoundaryState, Week,
33- crypto::{DidKey, Signature, assure_valid_sig},
33+ crypto::{AssuranceResults, DidKey, Signature, assure_valid_sig},
44};
55use anyhow::Context;
66use data_encoding::BASE32_NOPAD;
···797797 }
798798}
799799800800+fn verify_op_sig(op: &StoredOp, prev: Option<&StoredOp>) -> anyhow::Result<AssuranceResults> {
801801+ let keys: Vec<&DidKey> = match &op.prev {
802802+ None => op.get_keys(),
803803+ Some(_) => match prev {
804804+ None => anyhow::bail!("prev cid exists but the op for that cid is missing"),
805805+ Some(p) => p.get_keys(),
806806+ },
807807+ };
808808+809809+ if keys.is_empty() {
810810+ anyhow::bail!("no keys found for genesis op or prev op");
811811+ }
812812+813813+ let data = {
814814+ let serde_json::Value::Object(mut data) = op.to_json_value() else {
815815+ unreachable!("we know op is valid, because it comes from StoredOp")
816816+ };
817817+ data.remove("sig");
818818+ serde_json::Value::Object(data)
819819+ };
820820+821821+ let results = assure_valid_sig(keys, &op.sig, &data)
822822+ .expect("that our op is an object and we removed sig field");
823823+ Ok(results)
824824+}
825825+800826// this is basically Op, but without the cid and created_at fields
801827// since we have them in the key already
802828#[derive(Debug, Deserialize, Serialize)]
···941967 .transpose()?
942968 .flatten();
943969944944- let keys: Vec<&DidKey> = match &operation.prev {
945945- None => operation.get_keys(),
946946- Some(_) => match &prev_op {
947947- None => {
948948- log::error!(
949949- "op {} {} has prev but the prev op is not found",
950950- op.did,
951951- op.cid
952952- );
970970+ let prev_stored = prev_op.as_ref().map(|(_, _, p)| &p.operation);
971971+972972+ match verify_op_sig(&operation, prev_stored) {
973973+ Ok(results) => {
974974+ if !results.valid {
975975+ let msg = results
976976+ .errors
977977+ .iter()
978978+ .map(|e| e.to_string())
979979+ .collect::<Vec<_>>()
980980+ .join("\n");
981981+ log::warn!("invalid op {} {}:\n{msg}", op.did, op.cid);
953982 return Ok(0);
954983 }
955955- Some((_, _, prev)) => prev.operation.get_keys(),
956956- },
957957- };
958958-959959- if keys.is_empty() {
960960- log::warn!("no keys for op {} {}", op.did, op.cid);
961961- return Ok(0);
962962- }
963963-964964- let data = {
965965- let serde_json::Value::Object(mut data) = operation.to_json_value() else {
966966- unreachable!("we checked if operation is valid already")
967967- };
968968- data.remove("sig");
969969- serde_json::Value::Object(data)
970970- };
971971- let results = assure_valid_sig(keys, &operation.sig, &data)?;
972972- if !results.valid {
973973- for err in results.errors {
974974- log::warn!("invalid signature for op {} {}: {err}", op.did, op.cid);
984984+ }
985985+ Err(e) => {
986986+ log::warn!("invalid op {} {}: {e}", op.did, op.cid);
987987+ return Ok(0);
975988 }
976976- return Ok(0);
977989 }
990990+ log::debug!("verified op {} {}", op.did, op.cid);
978991 }
979992980993 let db_op = DbOp {
···10041017 Ok(1)
10051018 }
1006101910201020+ fn decode_by_did_entry(
10211021+ &self,
10221022+ by_did_key: &[u8],
10231023+ prefix_len: usize,
10241024+ ) -> anyhow::Result<(Dt, PlcCid, DbOp)> {
10251025+ let key_rest = by_did_key
10261026+ .get(prefix_len..)
10271027+ .ok_or_else(|| anyhow::anyhow!("invalid by_did key {by_did_key:?}"))?;
10281028+10291029+ let ts_bytes = key_rest
10301030+ .get(..8)
10311031+ .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?;
10321032+ let cid_suffix = key_rest
10331033+ .get(9..)
10341034+ .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?;
10351035+10361036+ let op_key = [ts_bytes, &[SEP][..], cid_suffix].concat();
10371037+ let ts = decode_timestamp(ts_bytes)?;
10381038+10391039+ let value = self
10401040+ .inner
10411041+ .ops
10421042+ .get(&op_key)?
10431043+ .ok_or_else(|| anyhow::anyhow!("op not found: {op_key:?}"))?;
10441044+10451045+ let op: DbOp = rmp_serde::from_slice(&value)?;
10461046+ let mut full_cid = op.cid_prefix.clone();
10471047+ full_cid.extend_from_slice(cid_suffix);
10481048+10491049+ Ok((ts, PlcCid(full_cid), op))
10501050+ }
10511051+10071052 fn _ops_for_did(
10081053 &self,
10091054 did: &str,
···10151060 let (by_did_key, _) = guard
10161061 .into_inner()
10171062 .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?;
10181018-10191019- let key_rest = by_did_key
10201020- .get(prefix.len()..)
10211021- .ok_or_else(|| anyhow::anyhow!("invalid by_did key {by_did_key:?}"))?;
10221022-10231023- let ts_bytes = key_rest
10241024- .get(..8)
10251025- .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?;
10261026- let cid_suffix = key_rest
10271027- .get(9..)
10281028- .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?;
10291029-10301030- let op_key = [ts_bytes, &[SEP][..], cid_suffix].concat();
10311031- let ts = decode_timestamp(ts_bytes)?;
10321032-10331033- let value = self
10341034- .inner
10351035- .ops
10361036- .get(&op_key)?
10371037- .ok_or_else(|| anyhow::anyhow!("op not found: {op_key:?}"))?;
10381038-10391039- let op: DbOp = rmp_serde::from_slice(&value)?;
10401040- let mut full_cid_bytes = op.cid_prefix.clone();
10411041- full_cid_bytes.extend_from_slice(cid_suffix);
10421042-10431043- let cid = PlcCid(full_cid_bytes);
10441044-10451045- Ok((ts, cid, op))
10631063+ self.decode_by_did_entry(&by_did_key, prefix.len())
10461064 }))
10471065 }
10481066···11111129 }))
11121130 }
1113113111141114- pub fn export_ops_week(
11321132+ pub fn drop_op(&self, did_str: &str, created_at: &Dt, cid: &str) -> anyhow::Result<()> {
11331133+ let cid = decode_cid_str(cid)?;
11341134+ let cid_suffix = &cid[30..];
11351135+11361136+ let op_key = op_key(created_at, cid_suffix);
11371137+ let by_did_key = by_did_key(did_str, created_at, cid_suffix)?;
11381138+11391139+ let mut batch = self.inner.db.batch();
11401140+ batch.remove(&self.inner.ops, op_key);
11411141+ batch.remove(&self.inner.by_did, by_did_key);
11421142+ batch.commit()?;
11431143+11441144+ Ok(())
11451145+ }
11461146+11471147+ pub fn audit(
11151148 &self,
11161116- week: Week,
11171117- ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Op>> + '_> {
11181118- let after: Dt = week.into();
11191119- let before: Dt = week.next().into();
11491149+ invalid_ops_tx: mpsc::Sender<(String, Dt, String)>,
11501150+ ) -> anyhow::Result<(usize, usize)> {
11511151+ use std::sync::mpsc;
11521152+11531153+ let ops = self.inner.by_did.len()?;
11541154+11551155+ let workers = std::thread::available_parallelism()
11561156+ .map(|n| n.get())
11571157+ .unwrap_or(4);
11581158+11591159+ type Batch = (Vec<u8>, Vec<(Dt, PlcCid, DbOp)>);
11601160+ let (result_tx, result_rx) = mpsc::sync_channel::<anyhow::Result<(usize, usize)>>(workers);
11611161+11621162+ let channels: Vec<_> = (0..workers)
11631163+ .map(|_| mpsc::sync_channel::<Batch>(512))
11641164+ .collect();
11651165+ let senders: Vec<_> = channels.iter().map(|(tx, _)| tx.clone()).collect();
11661166+11671167+ std::thread::scope(|s| {
11681168+ for (_, rx) in channels {
11691169+ let result_tx = result_tx.clone();
11701170+ let invalid_ops_tx = invalid_ops_tx.clone();
11711171+ s.spawn(move || {
11721172+ let mut checked: usize = 0;
11731173+ let mut failed: usize = 0;
11741174+ while let Ok((did_prefix, ops)) = rx.recv() {
11751175+ let did = decode_did(&did_prefix[..did_prefix.len() - 1]);
11761176+ for (ts, cid, op) in &ops {
11771177+ checked += 1;
11781178+ let prev_op = op.operation.prev.as_ref().and_then(|expected| {
11791179+ ops.iter().find(|(_, c, _)| c == expected)
11801180+ });
11811181+ let prev_cid_ok = op.operation.prev.is_none() || prev_op.is_some();
11821182+ if !prev_cid_ok {
11831183+ log::error!("audit: op {did} {cid} prev cid mismatch or missing predecessor, is db corrupted?");
11841184+ failed += 1;
11851185+ let _ = invalid_ops_tx.blocking_send((did.clone(), ts.clone(), cid.to_string()));
11861186+ continue;
11871187+ }
11881188+ let prev_stored = prev_op.map(|(_, _, p)| &p.operation);
11891189+ match verify_op_sig(&op.operation, prev_stored) {
11901190+ Ok(results) => {
11911191+ if !results.valid {
11921192+ let msg = results
11931193+ .errors
11941194+ .iter()
11951195+ .map(|e| e.to_string())
11961196+ .collect::<Vec<_>>()
11971197+ .join("\n ");
11981198+ log::warn!("audit: invalid op {} {}:\n {msg}", did, cid);
11991199+ failed += 1;
12001200+ let _ = invalid_ops_tx.blocking_send((did.clone(), ts.clone(), cid.to_string()));
12011201+ }
12021202+ }
12031203+ Err(e) => {
12041204+ log::warn!("audit: invalid op {} {}: {e}", did, cid);
12051205+ failed += 1;
12061206+ let _ = invalid_ops_tx.blocking_send((did.clone(), ts.clone(), cid.to_string()));
12071207+ }
12081208+ }
12091209+ }
12101210+ }
12111211+ let _ = result_tx.send(Ok((checked, failed)));
12121212+ });
12131213+ }
12141214+ drop(result_tx);
12151215+12161216+ // todo: probably dont use a macro...
12171217+ macro_rules! spawn_scan_thread {
12181218+ ($iter_method:ident, $start_idx:expr, $reverse:expr, $limit:expr) => {{
12191219+ let senders = senders.clone();
12201220+ let mut iter = self.inner.by_did.iter();
12211221+12221222+ s.spawn(move || -> anyhow::Result<()> {
12231223+ let mut current_prefix: Option<[u8; 16]> = None;
12241224+ let mut did_ops: Vec<(Dt, PlcCid, DbOp)> = Vec::new();
12251225+ let mut idx = $start_idx;
12261226+ let mut processed_ops: usize = 0;
12271227+12281228+ while let Some(guard) = iter.$iter_method() {
12291229+ let (by_did_key, _) = guard
12301230+ .into_inner()
12311231+ .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?;
12321232+12331233+ let mut prefix_array = [0u8; 16];
12341234+ prefix_array.copy_from_slice(by_did_key.get(..16).ok_or_else(
12351235+ || anyhow::anyhow!("by_did key too short: {by_did_key:?}"),
12361236+ )?);
12371237+12381238+ let op = self.decode_by_did_entry(&by_did_key, 16)?;
12391239+12401240+ if current_prefix.map_or(true, |cp| cp != prefix_array) {
12411241+ // new did, push the ops
12421242+ if let Some(prefix) = current_prefix.take() {
12431243+ if $reverse {
12441244+ did_ops.reverse();
12451245+ }
12461246+ senders[idx % workers]
12471247+ .send((prefix.to_vec(), std::mem::take(&mut did_ops)))
12481248+ .ok();
12491249+ idx += 1;
12501250+12511251+ if processed_ops >= $limit {
12521252+ break;
12531253+ }
12541254+ }
12551255+ current_prefix = Some(prefix_array);
12561256+ }
12571257+12581258+ did_ops.push(op);
12591259+ processed_ops += 1;
12601260+ }
12611261+12621262+ if let Some(prefix) = current_prefix {
12631263+ if $reverse {
12641264+ did_ops.reverse();
12651265+ }
12661266+ senders[idx % workers].send((prefix.to_vec(), did_ops)).ok();
12671267+ }
12681268+12691269+ Ok(())
12701270+ })
12711271+ }};
12721272+ }
12731273+12741274+ // we can start two threads, one for forward iteration and one for reverse iteration
12751275+ // this way we have two scans in parallel which should be faster!
12761276+ let f_handle = spawn_scan_thread!(next, 0, false, ops / 2);
12771277+ let b_handle = spawn_scan_thread!(next_back, workers / 2, true, ops - (ops / 2));
12781278+12791279+ f_handle.join().unwrap()?;
12801280+ b_handle.join().unwrap()?;
12811281+12821282+ drop(senders);
12831283+12841284+ let mut total_checked: usize = 0;
12851285+ let mut total_failed: usize = 0;
12861286+ for res in result_rx {
12871287+ let (c, f) = res?;
12881288+ total_checked += c;
12891289+ total_failed += f;
12901290+ }
1120129111211121- self.export_ops(after..before)
12921292+ Ok((total_checked, total_failed))
12931293+ })
11221294 }
11231295}
11241296···11301302 let db = self.clone();
1131130311321304 async move {
11331133- let (mut tx, rx) = tokio::io::duplex(1024 * 1024 * 64);
13051305+ let (mut tx, rx) = tokio::io::duplex(1024 * 1024 * 16);
1134130611351307 tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
11361136- let iter = db.export_ops_week(week)?;
13081308+ let after: Dt = week.into();
13091309+ let before: Dt = week.next().into();
13101310+13111311+ let iter = db.export_ops(after..before)?;
1137131211381313 let rt = tokio::runtime::Handle::current();
11391314···1184135911851360 loop {
11861361 let pages_finished = pages.is_closed();
13621362+ // we can stop if we have no more pages and all the insert tasks are finished
11871363 if pages_finished && insert_tasks.is_empty() {
11881364 break;
11891365 }
···12661442 t0.elapsed()
12671443 );
12681444 Ok("pages_to_fjall")
14451445+}
14461446+14471447+pub async fn audit(
14481448+ db: FjallDb,
14491449+ invalid_ops_tx: mpsc::Sender<(String, Dt, String)>,
14501450+) -> anyhow::Result<&'static str> {
14511451+ log::info!("starting fjall audit...");
14521452+ let t0 = std::time::Instant::now();
14531453+ let (checked, failed) = tokio::task::spawn_blocking(move || db.audit(invalid_ops_tx)).await??;
14541454+ log::info!(
14551455+ "fjall audit complete in {:?}, {checked} ops checked",
14561456+ t0.elapsed()
14571457+ );
14581458+ if failed > 0 {
14591459+ anyhow::bail!("audit found {failed} invalid operations");
14601460+ }
14611461+ Ok("audit_fjall")
14621462+}
14631463+14641464+pub async fn drop_invalid_ops(
14651465+ db: FjallDb,
14661466+ mut invalid_ops_rx: mpsc::Receiver<(String, Dt, String)>,
14671467+) -> anyhow::Result<&'static str> {
14681468+ while let Some((did, at, cid)) = invalid_ops_rx.recv().await {
14691469+ db.drop_op(&did, &at, &cid)?;
14701470+ }
14711471+ Ok("drop_invalid_ops")
12691472}
1270147312711474#[cfg(test)]