···11[package]
22name = "allegedly"
33-description = "public ledger tools and services (for the PLC)"
33+description = "public ledger server tools and services (for the PLC)"
44license = "MIT OR Apache-2.0"
55version = "0.1.0"
66edition = "2024"
···1111async-compression = { version = "0.4.30", features = ["futures-io", "tokio", "gzip"] }
1212chrono = { version = "0.4.42", features = ["serde"] }
1313clap = { version = "4.5.47", features = ["derive", "env"] }
1414-env_logger = "0.11.8"
1514futures = "0.3.31"
1615log = "0.4.28"
1616+poem = { version = "3.1.12", features = ["compression"] }
1717reqwest = { version = "0.12.23", features = ["stream"] }
1818reqwest-middleware = "0.4.2"
1919reqwest-retry = "0.7.0"
···2424tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
2525tokio-stream = { version = "0.1.17", features = ["io-util"] }
2626tokio-util = { version = "0.7.16", features = ["compat"] }
2727+tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+9-1
readme.md
···11# Allegedly
2233-Some [public ledger](https://github.com/did-method-plc/did-method-plc) tools and services
33+Some [public ledger](https://github.com/did-method-plc/did-method-plc) server tools and services
4455Allegedly can
6677- Tail PLC ops to stdout: `allegedly tail | jq`
88- Export PLC ops to weekly gzipped bundles: `allegdly bundle --dest ./some-folder`
99- Dump bundled ops to stdout FAST: `allegedly backfill --source-workers 6 | pv -l > /ops-unordered.jsonl`
1010+- Wrap the reference PLC server and run it as a mirror:
1111+1212+ ```bash
1313+ allegedly mirror \
1414+ --bind 0.0.0.0:8000 \
1515+ --wrap http://127.0.0.1:3000 \
1616+ --wrap-pg "postgresql://postgres:postgres@localhost:5432/postgres"
1717+ ```
10181119(add `--help` to any command for more info about it)
1220
+50-5
src/bin/allegedly.rs
···11use allegedly::{
22 Db, Dt, ExportPage, FolderSource, HttpSource, PageBoundaryState, backfill, backfill_to_pg,
33- bin_init, pages_to_pg, pages_to_weeks, poll_upstream,
33+ bin_init, pages_to_pg, pages_to_weeks, poll_upstream, serve,
44};
55-use clap::{Parser, Subcommand};
55+use clap::{CommandFactory, Parser, Subcommand};
66use reqwest::Url;
77-use std::{path::PathBuf, time::Instant};
77+use std::{net::SocketAddr, path::PathBuf, time::Instant};
88use tokio::sync::{mpsc, oneshot};
991010#[derive(Debug, Parser)]
···7171 #[arg(long, action)]
7272 clobber: bool,
7373 },
7474+ /// Wrap a did-method-plc server, syncing upstream and blocking op submits
7575+ Mirror {
7676+ /// the wrapped did-method-plc server
7777+ #[arg(long, env)]
7878+ wrap: Url,
7979+ /// the wrapped did-method-plc server's database (write access required)
8080+ #[arg(long, env)]
8181+ wrap_pg: Url,
8282+ /// wrapping server listen address
8383+ #[arg(short, long, env)]
8484+ #[clap(default_value = "127.0.0.1:8000")]
8585+ bind: SocketAddr,
8686+ },
7487 /// Poll an upstream PLC server and log new ops to stdout
7588 Tail {
7689 /// Begin tailing from a specific timestamp for replay or wait-until
···121134122135#[tokio::main]
123136async fn main() {
124124- bin_init("main");
125125-126137 let args = Cli::parse();
138138+ let matches = Cli::command().get_matches();
139139+ let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
140140+ bin_init(name);
127141128142 let t0 = Instant::now();
129143 match args.command {
···205219 log::trace!("ensuring output directory exists");
206220 std::fs::create_dir_all(&dest).unwrap();
207221 pages_to_weeks(rx, dest, clobber).await.unwrap();
222222+ }
223223+ Commands::Mirror {
224224+ wrap,
225225+ wrap_pg,
226226+ bind,
227227+ } => {
228228+ let db = Db::new(wrap_pg.as_str()).await.unwrap();
229229+ let latest = db
230230+ .get_latest()
231231+ .await
232232+ .unwrap()
233233+ .expect("there to be at least one op in the db. did you backfill?");
234234+235235+ let (tx, rx) = mpsc::channel(2);
236236+ // upstream poller
237237+ tokio::task::spawn(async move {
238238+ log::info!("starting poll reader...");
239239+ let mut url = args.upstream;
240240+ url.set_path("/export");
241241+ tokio::task::spawn(
242242+ async move { poll_upstream(Some(latest), url, tx).await.unwrap() },
243243+ );
244244+ });
245245+ // db writer
246246+ let poll_db = db.clone();
247247+ tokio::task::spawn(async move {
248248+ log::info!("starting db writer...");
249249+ pages_to_pg(poll_db, rx).await.unwrap();
250250+ });
251251+252252+ serve(wrap, bind).await.unwrap();
208253 }
209254 Commands::Tail { after } => {
210255 let mut url = args.upstream;