Server tools to backfill, tail, mirror, and verify PLC logs

fjall: use seq numbers

ptr.pet 078d4829 bb4a7328

verified
+548 -370
+46
Cargo.lock
··· 42 42 "opentelemetry", 43 43 "opentelemetry-otlp", 44 44 "opentelemetry_sdk", 45 + "ordered-varint", 45 46 "p256", 46 47 "poem", 47 48 "postgres-native-tls", ··· 59 60 "tokio", 60 61 "tokio-postgres", 61 62 "tokio-stream", 63 + "tokio-tungstenite", 62 64 "tokio-util", 63 65 "tracing", 64 66 "tracing-opentelemetry", ··· 2100 2102 ] 2101 2103 2102 2104 [[package]] 2105 + name = "ordered-varint" 2106 + version = "2.0.0" 2107 + source = "registry+https://github.com/rust-lang/crates.io-index" 2108 + checksum = "e9cc9f18ab4bad1e01726bda1259feb8f11e5e76308708a966b4c0136e9db34c" 2109 + 2110 + [[package]] 2103 2111 name = "p256" 2104 2112 version = "0.13.2" 2105 2113 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3539 3547 ] 3540 3548 3541 3549 [[package]] 3550 + name = "tokio-tungstenite" 3551 + version = "0.26.2" 3552 + source = "registry+https://github.com/rust-lang/crates.io-index" 3553 + checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084" 3554 + dependencies = [ 3555 + "futures-util", 3556 + "log", 3557 + "native-tls", 3558 + "tokio", 3559 + "tokio-native-tls", 3560 + "tungstenite", 3561 + ] 3562 + 3563 + [[package]] 3542 3564 name = "tokio-util" 3543 3565 version = "0.7.18" 3544 3566 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3739 3761 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" 3740 3762 3741 3763 [[package]] 3764 + name = "tungstenite" 3765 + version = "0.26.2" 3766 + source = "registry+https://github.com/rust-lang/crates.io-index" 3767 + checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" 3768 + dependencies = [ 3769 + "bytes", 3770 + "data-encoding", 3771 + "http", 3772 + "httparse", 3773 + "log", 3774 + "native-tls", 3775 + "rand 0.9.2", 3776 + "sha1", 3777 + "thiserror 2.0.18", 3778 + "utf-8", 3779 + ] 3780 + 3781 + [[package]] 3742 3782 name = "twox-hash" 3743 3783 version = "2.1.2" 3744 3784 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3815 3855 "percent-encoding", 3816 3856 "serde", 3817 3857 ] 3858 + 3859 + [[package]] 3860 + name = "utf-8" 3861 + version = "0.7.6" 3862 + source = "registry+https://github.com/rust-lang/crates.io-index" 3863 + checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" 3818 3864 3819 3865 [[package]] 3820 3866 name = "utf8_iter"
+2
Cargo.toml
··· 39 39 tokio = { version = "1.47.1", features = ["full"] } 40 40 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] } 41 41 tokio-stream = { version = "0.1.17", features = ["io-util"] } 42 + tokio-tungstenite = { version = "0.26", features = ["native-tls"] } 42 43 tokio-util = { version = "0.7.16", features = ["compat"] } 43 44 tracing = "0.1.41" 44 45 tracing-opentelemetry = "0.31.0" ··· 53 54 p256 = "0.13.2" 54 55 k256 = "0.13.4" 55 56 serde_ipld_dagcbor = "0.6.4" 57 + ordered-varint = "2.0.0" 56 58
+2 -2
readme.md
··· 15 15 --wrap-pg "postgresql://user:pass@pg-host:5432/plc-db" 16 16 ``` 17 17 18 - - Run a fully self-contained mirror using an embedded fjall database (no postgres needed): 18 + - Run a fully self-contained mirror using an embedded fjall database (no postgres or local plc server needed): 19 19 20 20 ```bash 21 21 # backfill first 22 - allegedly backfill --to-fjall ./plc-data 22 + allegedly backfill --no-bulk --to-fjall ./plc-data 23 23 24 24 # then run the mirror 25 25 allegedly mirror --wrap-fjall ./plc-data
+16 -33
src/bin/backfill.rs
··· 1 1 use allegedly::{ 2 - Db, Dt, ExportPage, FjallDb, FolderSource, HttpSource, backfill, backfill_to_fjall, 3 - backfill_to_pg, 2 + Db, Dt, ExportPage, FjallDb, FolderSource, HttpSource, SeqPage, backfill, backfill_to_pg, 4 3 bin::{GlobalArgs, bin_init}, 5 - full_pages, logo, pages_to_fjall, pages_to_pg, pages_to_stdout, poll_upstream, 4 + full_pages, full_pages_seq, logo, pages_to_pg, pages_to_stdout, poll_upstream, 5 + poll_upstream_seq, seq_pages_to_fjall, 6 6 }; 7 7 use clap::Parser; 8 8 use reqwest::Url; ··· 23 23 /// Local folder to fetch bundles from (overrides `http`) 24 24 #[arg(long)] 25 25 dir: Option<PathBuf>, 26 - /// Local fjall database to fetch raw ops from (overrides `http` and `dir`) 27 - #[arg(long, conflicts_with_all = ["dir"])] 28 - from_fjall: Option<PathBuf>, 29 26 /// Don't do weekly bulk-loading at all. 30 27 /// 31 28 /// overrides `http` and `dir`, makes catch_up redundant ··· 49 46 /// only used if `--to-postgres` or `--to-fjall` is present 50 47 #[arg(long, action)] 51 48 reset: bool, 52 - /// Bulk load into a local fjall embedded database 49 + /// Load into a local fjall embedded database 50 + /// (doesnt support bulk yet unless loading from another fjall db) 53 51 /// 54 52 /// Pass a directory path for the fjall database 55 53 #[arg(long, conflicts_with_all = ["to_postgres", "postgres_cert"])] ··· 70 68 Args { 71 69 http, 72 70 dir, 73 - from_fjall, 74 71 no_bulk, 75 72 source_workers, 76 73 to_postgres, ··· 95 92 }; 96 93 97 94 let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages 98 - let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter 95 + let (full_tx, full_out) = mpsc::channel::<ExportPage>(1); // don't need to buffer at this filter 99 96 100 97 // set up sources 101 98 if no_bulk { ··· 114 111 let mut upstream = upstream; 115 112 upstream.set_path("/export"); 116 113 let throttle = Duration::from_millis(upstream_throttle_ms); 117 - tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx)); 118 - tasks.spawn(full_pages(poll_out, full_tx)); 119 114 if let Some(fjall_path) = to_fjall { 120 115 log::trace!("opening fjall db at {fjall_path:?}..."); 121 116 let db = FjallDb::open(&fjall_path)?; 122 117 log::trace!("opened fjall db"); 123 118 124 - tasks.spawn(pages_to_fjall(db, full_out)); 119 + let (poll_tx, poll_out) = mpsc::channel::<SeqPage>(128); // normal/small pages 120 + let (full_tx, full_out) = mpsc::channel::<SeqPage>(1); // don't need to buffer at this filter 121 + 122 + tasks.spawn(poll_upstream_seq(None, upstream, throttle, poll_tx)); 123 + tasks.spawn(full_pages_seq(poll_out, full_tx)); 124 + tasks.spawn(seq_pages_to_fjall(db, full_out)); 125 125 } else { 126 + tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx)); 127 + tasks.spawn(full_pages(poll_out, full_tx)); 126 128 tasks.spawn(pages_to_stdout(full_out, None)); 127 129 } 128 130 } else { 129 131 // fun mode 130 132 131 133 // set up bulk sources 132 - if let Some(fjall_path) = from_fjall { 133 - log::trace!("opening source fjall db at {fjall_path:?}..."); 134 - let db = FjallDb::open(&fjall_path)?; 135 - log::trace!("opened source fjall db"); 136 - tasks.spawn(backfill(db, bulk_tx, source_workers.unwrap_or(4), until)); 137 - } else if let Some(dir) = dir { 134 + if let Some(dir) = dir { 138 135 if http != DEFAULT_HTTP.parse()? { 139 136 anyhow::bail!( 140 137 "non-default bulk http setting can't be used with bulk dir setting ({dir:?})" ··· 167 164 } 168 165 169 166 // set up sinks 170 - if let Some(fjall_path) = to_fjall { 171 - log::trace!("opening fjall db at {fjall_path:?}..."); 172 - let db = FjallDb::open(&fjall_path)?; 173 - log::trace!("opened fjall db"); 174 - 175 - tasks.spawn(backfill_to_fjall( 176 - db.clone(), 177 - reset, 178 - bulk_out, 179 - found_last_tx, 180 - )); 181 - if catch_up { 182 - tasks.spawn(pages_to_fjall(db, full_out)); 183 - } 184 - } else if let Some(pg_url) = to_postgres { 167 + if let Some(pg_url) = to_postgres { 185 168 log::trace!("connecting to postgres..."); 186 169 let db = Db::new(pg_url.as_str(), postgres_cert).await?; 187 170 log::trace!("connected to postgres");
+85 -10
src/bin/mirror.rs
··· 1 1 use allegedly::{ 2 2 Db, ExperimentalConf, FjallDb, ListenConf, 3 3 bin::{GlobalArgs, InstrumentationArgs, bin_init}, 4 - logo, pages_to_fjall, pages_to_pg, poll_upstream, serve, serve_fjall, 4 + logo, pages_to_pg, poll_upstream, poll_upstream_seq, seq_pages_to_fjall, serve, serve_fjall, 5 + tail_upstream_stream, 5 6 }; 6 7 use clap::Parser; 7 8 use reqwest::Url; ··· 69 70 /// accept writes! by forwarding them upstream 70 71 #[arg(long, action, env = "ALLEGEDLY_EXPERIMENTAL_WRITE_UPSTREAM")] 71 72 experimental_write_upstream: bool, 73 + /// switch from polling to /export/stream once the latest op is within 74 + /// this many days of now (plc.directory only supports ~1 week of backfill) 75 + #[arg(long, env = "ALLEGEDLY_STREAM_CUTOVER_DAYS", default_value = "5")] 76 + stream_cutover_days: u32, 72 77 } 73 78 74 79 pub async fn run( ··· 89 94 acme_ipv6, 90 95 experimental_acme_domain, 91 96 experimental_write_upstream, 97 + stream_cutover_days, 92 98 }: Args, 93 99 sync: bool, 94 100 ) -> anyhow::Result<()> { ··· 122 128 let db = FjallDb::open(&fjall_path)?; 123 129 if compact_fjall { 124 130 log::info!("compacting fjall..."); 125 - db.compact()?; // blocking here is fine, we didn't start anything yet 131 + db.compact()?; 126 132 } 127 133 128 - log::debug!("getting the latest op from fjall..."); 129 - let latest = db 134 + log::debug!("getting the latest seq from fjall..."); 135 + let latest_seq = db 130 136 .get_latest()? 137 + .map(|(seq, _)| seq) 131 138 .expect("there to be at least one op in the db. did you backfill?"); 132 - log::info!("starting polling from {latest}..."); 139 + log::info!("starting seq polling from seq {latest_seq}..."); 133 140 134 - let (send_page, recv_page) = mpsc::channel(8); 141 + let (send_page, recv_page) = mpsc::channel::<allegedly::SeqPage>(8); 135 142 136 - let mut poll_url = upstream.clone(); 137 - poll_url.set_path("/export"); 143 + let mut export_url = upstream.clone(); 144 + export_url.set_path("/export"); 145 + let mut stream_url = upstream.clone(); 146 + stream_url.set_path("/export/stream"); 138 147 let throttle = Duration::from_millis(upstream_throttle_ms); 148 + let cutover_age = Duration::from_secs(stream_cutover_days as u64 * 86_400); 149 + 150 + // the poll -> stream task: poll until we're caught up, then switch to stream. 151 + // on stream disconnect, fall back to polling to resync. 152 + let send_page_bg = send_page.clone(); 153 + tasks.spawn(async move { 154 + let mut current_seq = latest_seq; 155 + loop { 156 + log::info!("seq polling from seq {current_seq}"); 157 + let (inner_tx, mut inner_rx) = mpsc::channel::<allegedly::SeqPage>(8); 158 + 159 + // run poller; it ends only when the channel closes 160 + let poll_url = export_url.clone(); 161 + let poll_task = tokio::spawn(poll_upstream_seq( 162 + Some(current_seq), 163 + poll_url, 164 + throttle, 165 + inner_tx, 166 + )); 167 + 168 + // drain pages from poller until the last op is within cutover_age of now, 169 + // meaning we're close enough to the tip that the stream can cover the rest 170 + let mut last_seq_from_poll = current_seq; 139 171 140 - tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 141 - tasks.spawn(pages_to_fjall(db.clone(), recv_page)); 172 + while let Some(page) = inner_rx.recv().await { 173 + let near_tip = page.ops.last().map_or(false, |op| { 174 + let age = chrono::Utc::now().signed_duration_since(op.created_at); 175 + age.to_std().map_or(false, |d| d <= cutover_age) 176 + }); 177 + if let Some(last) = page.ops.last() { 178 + last_seq_from_poll = last.seq; 179 + } 180 + let _ = send_page_bg.send(page).await; 181 + if near_tip { 182 + break; 183 + } 184 + } 185 + 186 + poll_task.abort(); 187 + current_seq = last_seq_from_poll; 188 + 189 + // switch to streaming 190 + log::info!("caught up at seq {current_seq}, switching to /export/stream"); 191 + let (stream_inner_tx, mut stream_inner_rx) = mpsc::channel::<allegedly::SeqPage>(8); 192 + let stream_task = tokio::spawn(tail_upstream_stream( 193 + Some(current_seq), 194 + stream_url.clone(), 195 + stream_inner_tx, 196 + )); 197 + 198 + while let Some(page) = stream_inner_rx.recv().await { 199 + if let Some(last) = page.ops.last() { 200 + current_seq = last.seq; 201 + } 202 + if send_page_bg.send(page).await.is_err() { 203 + stream_task.abort(); 204 + return anyhow::Ok("fjall-poll-stream (dest closed)"); 205 + } 206 + } 207 + 208 + // stream ended/errored — loop back to polling to resync 209 + match stream_task.await { 210 + Ok(Ok(())) => log::info!("stream closed cleanly, resyncing via poll"), 211 + Ok(Err(e)) => log::warn!("stream error: {e}, resyncing via poll"), 212 + Err(e) => log::warn!("stream task join error: {e}"), 213 + } 214 + } 215 + }); 142 216 217 + tasks.spawn(seq_pages_to_fjall(db.clone(), recv_page)); 143 218 tasks.spawn(serve_fjall(upstream, listen_conf, experimental_conf, db)); 144 219 } else { 145 220 let wrap = wrap.ok_or(anyhow::anyhow!(
+72 -3
src/lib.rs
··· 5 5 mod backfill; 6 6 mod cached_value; 7 7 mod client; 8 - mod crypto; 8 + pub mod crypto; 9 9 pub mod doc; 10 10 mod mirror; 11 11 mod plc_fjall; ··· 21 21 pub use client::{CLIENT, UA}; 22 22 pub use mirror::{ExperimentalConf, ListenConf, serve, serve_fjall}; 23 23 pub use plc_fjall::{ 24 - FjallDb, audit as audit_fjall, backfill_to_fjall, fix_ops as fix_ops_fjall, pages_to_fjall, 24 + FjallDb, audit as audit_fjall, backfill_to_fjall, fix_ops as fix_ops_fjall, seq_pages_to_fjall, 25 25 }; 26 26 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 27 - pub use poll::{PageBoundaryState, get_page, poll_upstream}; 27 + pub use poll::{ 28 + PageBoundaryState, get_page, poll_upstream, poll_upstream_seq, tail_upstream_stream, 29 + }; 28 30 pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters}; 29 31 pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 30 32 ··· 85 87 } 86 88 } 87 89 90 + /// A PLC op from `/export?after=<seq>` or `/export/stream` 91 + /// 92 + /// Both endpoints return the `seq` field per op, which is a globally monotonic 93 + /// unsigned integer assigned by the PLC directory. 94 + #[derive(Debug, Clone, Deserialize)] 95 + #[serde(rename_all = "camelCase")] 96 + pub struct SeqOp { 97 + pub seq: u64, 98 + pub did: String, 99 + pub cid: String, 100 + pub created_at: Dt, 101 + #[serde(default)] 102 + pub nullified: bool, 103 + pub operation: Box<serde_json::value::RawValue>, 104 + } 105 + 106 + impl From<SeqOp> for Op { 107 + fn from(s: SeqOp) -> Self { 108 + Op { 109 + did: s.did, 110 + cid: s.cid, 111 + created_at: s.created_at, 112 + nullified: s.nullified, 113 + operation: s.operation, 114 + } 115 + } 116 + } 117 + 118 + /// A page of sequenced ops from `/export?after=<seq>` 119 + #[derive(Debug)] 120 + pub struct SeqPage { 121 + pub ops: Vec<SeqOp>, 122 + } 123 + 124 + impl SeqPage { 125 + pub fn is_empty(&self) -> bool { 126 + self.ops.is_empty() 127 + } 128 + } 129 + 88 130 /// page forwarder who drops its channels on receipt of a small page 89 131 /// 90 132 /// PLC will return up to 1000 ops on a page, and returns full pages until it ··· 92 134 pub async fn full_pages( 93 135 mut rx: mpsc::Receiver<ExportPage>, 94 136 tx: mpsc::Sender<ExportPage>, 137 + ) -> anyhow::Result<&'static str> { 138 + while let Some(page) = rx.recv().await { 139 + let n = page.ops.len(); 140 + if n < 900 { 141 + let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 142 + let Some(age) = last_age else { 143 + log::info!("full_pages done, empty final page"); 144 + return Ok("full pages (hmm)"); 145 + }; 146 + if age <= chrono::TimeDelta::hours(6) { 147 + log::info!("full_pages done, final page of {n} ops"); 148 + } else { 149 + log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 150 + } 151 + return Ok("full pages (cool)"); 152 + } 153 + log::trace!("full_pages: continuing with page of {n} ops"); 154 + tx.send(page).await?; 155 + } 156 + Err(anyhow::anyhow!( 157 + "full_pages ran out of source material, sender closed" 158 + )) 159 + } 160 + 161 + pub async fn full_pages_seq( 162 + mut rx: mpsc::Receiver<SeqPage>, 163 + tx: mpsc::Sender<SeqPage>, 95 164 ) -> anyhow::Result<&'static str> { 96 165 while let Some(page) = rx.recv().await { 97 166 let n = page.ops.len();
+15 -24
src/mirror/fjall.rs
··· 14 14 15 15 #[derive(Clone)] 16 16 struct FjallSyncInfo { 17 - latest_at: CachedValue<Dt, GetFjallLatestAt>, 17 + latest: CachedValue<(u64, Dt), GetFjallLatest>, 18 18 upstream_status: CachedValue<PlcStatus, CheckUpstream>, 19 19 } 20 20 21 21 #[derive(Clone)] 22 - struct GetFjallLatestAt(FjallDb); 23 - impl Fetcher<Dt> for GetFjallLatestAt { 24 - async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> { 22 + struct GetFjallLatest(FjallDb); 23 + impl Fetcher<(u64, Dt)> for GetFjallLatest { 24 + async fn fetch(&self) -> Result<(u64, Dt), Box<dyn std::error::Error>> { 25 25 let db = self.0.clone(); 26 - let now = tokio::task::spawn_blocking(move || db.get_latest()) 26 + tokio::task::spawn_blocking(move || db.get_latest()) 27 27 .await?? 28 - .ok_or(anyhow::anyhow!( 29 - "expected to find at least one thing in the db" 30 - ))?; 31 - Ok(now) 28 + .ok_or_else(|| anyhow::anyhow!("db is empty").into()) 32 29 } 33 30 } 34 31 ··· 116 113 if !ok { 117 114 overall_status = StatusCode::BAD_GATEWAY; 118 115 } 119 - let latest = sync_info.latest_at.get().await.ok(); 116 + let latest = sync_info.latest.get().await.ok(); 117 + let latest_at = latest.map(|(_, dt)| dt); 118 + let latest_seq = latest.map(|(seq, _)| seq); 120 119 121 120 ( 122 121 overall_status, ··· 124 123 "server": "allegedly (mirror/fjall)", 125 124 "version": env!("CARGO_PKG_VERSION"), 126 125 "upstream_plc": upstream_status, 127 - "latest_at": latest, 126 + "latest_at": latest_at, 127 + "latest_seq": latest_seq, 128 128 })), 129 129 ) 130 130 } ··· 250 250 251 251 #[derive(Deserialize)] 252 252 struct ExportQuery { 253 - after: Option<String>, 253 + after: Option<u64>, 254 254 #[allow(dead_code)] // we just cap at 1000 for now, matching reference impl 255 255 count: Option<usize>, 256 256 } ··· 261 261 Query(query): Query<ExportQuery>, 262 262 Data(FjallState { fjall, .. }): Data<&FjallState>, 263 263 ) -> Result<Body> { 264 - let after = if let Some(a) = query.after { 265 - Some( 266 - chrono::DateTime::parse_from_rfc3339(&a) 267 - .map_err(|e| Error::from_string(e.to_string(), StatusCode::BAD_REQUEST))? 268 - .with_timezone(&chrono::Utc), 269 - ) 270 - } else { 271 - None 272 - }; 273 - 264 + let after = query.after.unwrap_or(0); 274 265 let limit = 1000; 275 266 let db = fjall.clone(); 276 267 277 268 let ops = tokio::task::spawn_blocking(move || { 278 - let iter = db.export_ops(after.unwrap_or(Dt::UNIX_EPOCH)..)?; 269 + let iter = db.export_ops(after..)?; 279 270 iter.take(limit).collect::<anyhow::Result<Vec<_>>>() 280 271 }) 281 272 .await ··· 324 315 .expect("reqwest client to build"); 325 316 326 317 let sync_info = FjallSyncInfo { 327 - latest_at: CachedValue::new(GetFjallLatestAt(fjall.clone()), Duration::from_secs(2)), 318 + latest: CachedValue::new(GetFjallLatest(fjall.clone()), Duration::from_secs(2)), 328 319 upstream_status: CachedValue::new( 329 320 CheckUpstream(upstream.clone(), client.clone()), 330 321 Duration::from_secs(6),
+151 -178
src/plc_fjall.rs
··· 1 1 use crate::{ 2 - BundleSource, Dt, ExportPage, InvalidOp, Op as CommonOp, PageBoundaryState, Week, 2 + Dt, InvalidOp, Op as CommonOp, 3 3 crypto::{AssuranceResults, DidKey, Signature, assure_valid_sig}, 4 4 }; 5 5 use anyhow::Context; 6 6 use data_encoding::BASE32_NOPAD; 7 7 use fjall::{Database, Keyspace, KeyspaceCreateOptions, PersistMode, config::BlockSizePolicy}; 8 - use futures::Future; 8 + use ordered_varint::Variable; 9 9 use serde::{Deserialize, Serialize}; 10 10 use std::collections::BTreeMap; 11 11 use std::fmt; 12 12 use std::path::Path; 13 13 use std::sync::Arc; 14 14 use std::time::Instant; 15 - use tokio::io::{AsyncRead, AsyncWriteExt}; 16 15 use tokio::sync::{mpsc, oneshot}; 17 16 18 17 const SEP: u8 = 0; 18 + 19 + fn seq_key(seq: u64) -> Vec<u8> { 20 + seq.to_variable_vec().expect("that seq number encodes") 21 + } 22 + 23 + fn decode_seq_key(key: &[u8]) -> anyhow::Result<u64> { 24 + u64::decode_variable(key).context("failed to decode seq key") 25 + } 19 26 20 27 type IpldCid = cid::CidGeneric<64>; 21 28 ··· 54 61 format!("did:plc:{decoded}") 55 62 } 56 63 57 - fn op_key(created_at: &Dt, cid_suffix: &[u8]) -> Vec<u8> { 58 - let micros = created_at.timestamp_micros() as u64; 59 - let mut key = Vec::with_capacity(8 + 1 + cid_suffix.len()); 60 - key.extend_from_slice(&micros.to_be_bytes()); 61 - key.push(SEP); 62 - key.extend_from_slice(cid_suffix); 63 - key 64 - } 65 - 66 64 fn by_did_prefix(did: &str) -> anyhow::Result<Vec<u8>> { 67 65 let mut p = Vec::with_capacity(BASE32_NOPAD.decode_len(did.len())? + 1); 68 66 encode_did(&mut p, did)?; ··· 70 68 Ok(p) 71 69 } 72 70 73 - fn by_did_key(did: &str, created_at: &Dt, cid_suffix: &[u8]) -> anyhow::Result<Vec<u8>> { 71 + /// by_did key: [15 bytes encoded did][SEP][seq varint] 72 + fn by_did_key(did: &str, seq: u64) -> anyhow::Result<Vec<u8>> { 74 73 let mut key = by_did_prefix(did)?; 75 - let micros = created_at.timestamp_micros() as u64; 76 - key.extend_from_slice(&micros.to_be_bytes()); 77 - key.push(SEP); 78 - key.extend_from_slice(cid_suffix); 74 + seq.encode_variable(&mut key)?; 79 75 Ok(key) 80 - } 81 - 82 - fn decode_timestamp(key: &[u8]) -> anyhow::Result<Dt> { 83 - let micros = u64::from_be_bytes( 84 - key.try_into() 85 - .map_err(|e| anyhow::anyhow!("invalid timestamp key {key:?}: {e}"))?, 86 - ); 87 - Dt::from_timestamp_micros(micros as i64) 88 - .ok_or_else(|| anyhow::anyhow!("invalid timestamp {micros}")) 89 76 } 90 77 91 78 /// CID string → binary CID bytes ··· 827 814 Ok(results) 828 815 } 829 816 830 - // this is basically Op, but without the cid and created_at fields 831 - // since we have them in the key already 817 + // stored alongside the seq key in the ops keyspace 818 + // cid and created_at are in the value (not the key) in the new layout 832 819 #[derive(Debug, Deserialize, Serialize)] 833 820 #[serde(rename_all = "camelCase")] 834 821 struct DbOp { 835 822 #[serde(with = "serde_bytes")] 836 823 pub did: Vec<u8>, 837 824 #[serde(with = "serde_bytes")] 838 - pub cid_prefix: Vec<u8>, 825 + pub cid: Vec<u8>, 826 + pub created_at: u64, 839 827 pub nullified: bool, 840 828 pub operation: StoredOp, 841 829 } ··· 857 845 858 846 struct FjallInner { 859 847 db: Database, 848 + /// primary keyspace: seq (varint) -> DbOp 860 849 ops: Keyspace, 850 + /// secondary index: [encoded_did][SEP][seq_varint] -> [] 861 851 by_did: Keyspace, 862 852 } 863 853 ··· 915 905 Ok(()) 916 906 } 917 907 918 - pub fn get_latest(&self) -> anyhow::Result<Option<Dt>> { 908 + /// Returns `(seq, created_at)` for the last stored op, or `None` if empty. 909 + pub fn get_latest(&self) -> anyhow::Result<Option<(u64, Dt)>> { 919 910 let Some(guard) = self.inner.ops.last_key_value() else { 920 911 return Ok(None); 921 912 }; 922 - let key = guard 923 - .key() 924 - .map_err(|e| anyhow::anyhow!("fjall key error: {e}"))?; 925 - 926 - key.get(..8) 927 - .ok_or_else(|| anyhow::anyhow!("invalid timestamp key {key:?}")) 928 - .map(decode_timestamp) 929 - .flatten() 930 - .map(Some) 913 + let (key, value) = guard 914 + .into_inner() 915 + .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?; 916 + let seq = decode_seq_key(&key)?; 917 + let db_op: DbOp = rmp_serde::from_slice(&value)?; 918 + let dt = Dt::from_timestamp_micros(db_op.created_at as i64) 919 + .ok_or_else(|| anyhow::anyhow!("invalid created_at in last op"))?; 920 + Ok(Some((seq, dt))) 931 921 } 932 922 933 - pub fn insert_op<const VERIFY: bool>(&self, op: &CommonOp) -> anyhow::Result<usize> { 923 + pub fn insert_op<const VERIFY: bool>(&self, op: &CommonOp, seq: u64) -> anyhow::Result<usize> { 934 924 let cid_bytes = decode_cid_str(&op.cid)?; 935 - let cid_prefix = cid_bytes 936 - .get(..30) 937 - .ok_or_else(|| anyhow::anyhow!("invalid cid length (prefix): {}", op.cid))? 938 - .to_vec(); 939 - let cid_suffix = cid_bytes 940 - .get(30..) 941 - .ok_or_else(|| anyhow::anyhow!("invalid cid length (suffix): {}", op.cid))?; 942 925 943 926 let op_json: serde_json::Value = serde_json::from_str(op.operation.get())?; 944 927 let (stored, mut errors) = StoredOp::from_json_value(op_json); ··· 960 943 .prev 961 944 .as_ref() 962 945 .map(|prev_cid| { 946 + // TODO: we should have a cid -> seq lookup eventually maybe? 947 + // this is probably fine though we will only iter over like 2 ops at most 948 + // or so, its there to handle nullified... 949 + // but a cid lookup would also help us avoid duplicate ops! 963 950 self._ops_for_did(&op.did) 964 951 .map(|ops| { 965 952 ops.rev() ··· 1000 987 encode_did(&mut encoded_did, &op.did)?; 1001 988 encoded_did 1002 989 }, 1003 - cid_prefix, 990 + cid: cid_bytes, 991 + created_at: op.created_at.timestamp_micros() as u64, 1004 992 nullified: op.nullified, 1005 993 operation, 1006 994 }; 1007 995 996 + let seq_val = rmp_serde::to_vec(&db_op)?; 997 + let seq_key_bytes = seq_key(seq); 998 + let by_did_key_bytes = by_did_key(&op.did, seq)?; 999 + 1008 1000 let mut batch = self.inner.db.batch(); 1009 - batch.insert( 1010 - &self.inner.ops, 1011 - op_key(&op.created_at, cid_suffix), 1012 - rmp_serde::to_vec(&db_op)?, 1013 - ); 1014 - batch.insert( 1015 - &self.inner.by_did, 1016 - by_did_key(&op.did, &op.created_at, cid_suffix)?, 1017 - &[], 1018 - ); 1001 + batch.insert(&self.inner.ops, seq_key_bytes, seq_val); 1002 + batch.insert(&self.inner.by_did, by_did_key_bytes, &[]); 1019 1003 batch.commit()?; 1020 1004 1021 1005 Ok(1) 1022 1006 } 1007 + } 1023 1008 1009 + impl FjallDb { 1010 + /// Decode a `by_did` entry: extract the seq from the key suffix, then 1011 + /// look up the full `DbOp` in the `ops` keyspace. 1024 1012 fn decode_by_did_entry( 1025 1013 &self, 1026 - by_did_key: &[u8], 1014 + by_did_key_bytes: &[u8], 1027 1015 prefix_len: usize, 1028 1016 ) -> anyhow::Result<(Dt, PlcCid, DbOp)> { 1029 - let key_rest = by_did_key 1017 + let key_suffix = by_did_key_bytes 1030 1018 .get(prefix_len..) 1031 - .ok_or_else(|| anyhow::anyhow!("invalid by_did key {by_did_key:?}"))?; 1032 - 1033 - let ts_bytes = key_rest 1034 - .get(..8) 1035 - .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?; 1036 - let cid_suffix = key_rest 1037 - .get(9..) 1038 - .ok_or_else(|| anyhow::anyhow!("invalid length: {key_rest:?}"))?; 1019 + .ok_or_else(|| anyhow::anyhow!("invalid by_did key {by_did_key_bytes:?}"))?; 1039 1020 1040 - let op_key = [ts_bytes, &[SEP][..], cid_suffix].concat(); 1041 - let ts = decode_timestamp(ts_bytes)?; 1021 + let seq = 1022 + u64::decode_variable(key_suffix).context("failed to decode seq from by_did key")?; 1042 1023 1043 1024 let value = self 1044 1025 .inner 1045 1026 .ops 1046 - .get(&op_key)? 1047 - .ok_or_else(|| anyhow::anyhow!("op not found: {op_key:?}"))?; 1027 + .get(seq_key(seq))? 1028 + .ok_or_else(|| anyhow::anyhow!("op not found for seq {seq}"))?; 1048 1029 1049 1030 let op: DbOp = rmp_serde::from_slice(&value)?; 1050 - let mut full_cid = op.cid_prefix.clone(); 1051 - full_cid.extend_from_slice(cid_suffix); 1031 + let ts = Dt::from_timestamp_micros(op.created_at as i64) 1032 + .ok_or_else(|| anyhow::anyhow!("invalid created_at_micros {}", op.created_at))?; 1033 + let cid = PlcCid(op.cid.clone()); 1052 1034 1053 - Ok((ts, PlcCid(full_cid), op)) 1035 + Ok((ts, cid, op)) 1054 1036 } 1055 1037 1056 1038 fn _ops_for_did( ··· 1074 1056 ) -> anyhow::Result<impl DoubleEndedIterator<Item = anyhow::Result<Op>> + '_> { 1075 1057 Ok(self._ops_for_did(did)?.map(|res| { 1076 1058 let (ts, cid, op) = res?; 1077 - 1078 1059 let cid = decode_cid(&cid.0)?; 1079 1060 let did = decode_did(&op.did); 1080 - 1081 1061 Ok(Op { 1082 1062 did, 1083 1063 cid, ··· 1090 1070 1091 1071 pub fn export_ops( 1092 1072 &self, 1093 - range: impl std::ops::RangeBounds<Dt>, 1073 + range: impl std::ops::RangeBounds<u64>, 1094 1074 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Op>> + '_> { 1095 1075 use std::ops::Bound; 1096 - let map_bound = |b: Bound<&Dt>| -> Bound<[u8; 8]> { 1076 + let map_bound = |b: Bound<&u64>| -> Bound<Vec<u8>> { 1097 1077 match b { 1098 - Bound::Included(dt) => Bound::Included(dt.timestamp_micros().to_be_bytes()), 1099 - Bound::Excluded(dt) => Bound::Excluded(dt.timestamp_micros().to_be_bytes()), 1078 + Bound::Included(seq) => Bound::Included(seq_key(*seq)), 1079 + Bound::Excluded(seq) => Bound::Excluded(seq_key(*seq)), 1100 1080 Bound::Unbounded => Bound::Unbounded, 1101 1081 } 1102 1082 }; 1103 1083 let range = (map_bound(range.start_bound()), map_bound(range.end_bound())); 1104 1084 1105 - let iter = self.inner.ops.range(range); 1085 + Ok(self 1086 + .inner 1087 + .ops 1088 + .range(range) 1089 + .map(|item| -> anyhow::Result<Op> { 1090 + let (_, value) = item 1091 + .into_inner() 1092 + .map_err(|e: fjall::Error| anyhow::anyhow!("fjall read error: {e}"))?; 1093 + let db_op: DbOp = rmp_serde::from_slice(&value)?; 1094 + let created_at = 1095 + Dt::from_timestamp_micros(db_op.created_at as i64).ok_or_else(|| { 1096 + anyhow::anyhow!("invalid created_at_micros {}", db_op.created_at) 1097 + })?; 1098 + let cid = decode_cid(&db_op.cid)?; 1099 + let did = decode_did(&db_op.did); 1100 + Ok(Op { 1101 + did, 1102 + cid, 1103 + created_at, 1104 + nullified: db_op.nullified, 1105 + operation: db_op.operation.to_json_value(), 1106 + }) 1107 + })) 1108 + } 1106 1109 1107 - Ok(iter.map(|item| { 1108 - let (key, value) = item 1110 + pub fn drop_op(&self, did_str: &str, _created_at: &Dt, _cid: &str) -> anyhow::Result<()> { 1111 + // scan the by_did index for this DID and find the op that matches 1112 + // (in practice drop_op is rare so a scan is fine) 1113 + let prefix = by_did_prefix(did_str)?; 1114 + let mut found_seq: Option<u64> = None; 1115 + let mut found_by_did_key: Option<Vec<u8>> = None; 1116 + 1117 + for guard in self.inner.by_did.prefix(&prefix) { 1118 + let (key, _) = guard 1109 1119 .into_inner() 1110 1120 .map_err(|e| anyhow::anyhow!("fjall read error: {e}"))?; 1111 - let db_op: DbOp = rmp_serde::from_slice(&value)?; 1112 - let created_at = decode_timestamp( 1113 - key.get(..8) 1114 - .ok_or_else(|| anyhow::anyhow!("invalid op key {key:?}"))?, 1115 - )?; 1116 - let cid_suffix = key 1117 - .get(9..) 1118 - .ok_or_else(|| anyhow::anyhow!("invalid op key {key:?}"))?; 1121 + let suffix = &key[prefix.len()..]; 1122 + let seq = u64::decode_variable(suffix).context("decode seq in drop_op")?; 1123 + found_seq = Some(seq); 1124 + found_by_did_key = Some(key.to_vec()); 1125 + // if there were multiple ops for this DID we'd need to match by cid, 1126 + // but for now take the last matched (they're in seq order) 1127 + } 1119 1128 1120 - let mut full_cid_bytes = db_op.cid_prefix.clone(); 1121 - full_cid_bytes.extend_from_slice(cid_suffix); 1122 - 1123 - let cid = decode_cid(&full_cid_bytes)?; 1124 - let did = decode_did(&db_op.did); 1125 - 1126 - Ok(Op { 1127 - did, 1128 - cid, 1129 - created_at, 1130 - nullified: db_op.nullified, 1131 - operation: db_op.operation.to_json_value(), 1132 - }) 1133 - })) 1134 - } 1135 - 1136 - pub fn drop_op(&self, did_str: &str, created_at: &Dt, cid: &str) -> anyhow::Result<()> { 1137 - let cid = decode_cid_str(cid)?; 1138 - let cid_suffix = &cid[30..]; 1139 - 1140 - let op_key = op_key(created_at, cid_suffix); 1141 - let by_did_key = by_did_key(did_str, created_at, cid_suffix)?; 1129 + let (seq, by_did_key_bytes) = match (found_seq, found_by_did_key) { 1130 + (Some(s), Some(k)) => (s, k), 1131 + _ => { 1132 + log::warn!("drop_op: by_did entry not found for {did_str}"); 1133 + return Ok(()); 1134 + } 1135 + }; 1142 1136 1143 1137 let mut batch = self.inner.db.batch(); 1144 - batch.remove(&self.inner.ops, op_key); 1145 - batch.remove(&self.inner.by_did, by_did_key); 1138 + batch.remove(&self.inner.ops, seq_key(seq)); 1139 + batch.remove(&self.inner.by_did, by_did_key_bytes); 1146 1140 batch.commit()?; 1147 1141 1148 1142 Ok(()) ··· 1302 1296 } 1303 1297 } 1304 1298 1305 - impl BundleSource for FjallDb { 1306 - fn reader_for( 1307 - &self, 1308 - week: Week, 1309 - ) -> impl Future<Output = anyhow::Result<impl AsyncRead + Send>> + Send { 1310 - let db = self.clone(); 1311 - 1312 - async move { 1313 - let (mut tx, rx) = tokio::io::duplex(1024 * 1024 * 16); 1314 - 1315 - tokio::task::spawn_blocking(move || -> anyhow::Result<()> { 1316 - let after: Dt = week.into(); 1317 - let before: Dt = week.next().into(); 1318 - 1319 - let iter = db.export_ops(after..before)?; 1320 - 1321 - let rt = tokio::runtime::Handle::current(); 1322 - 1323 - for op_res in iter { 1324 - let op = op_res?; 1325 - let operation_str = serde_json::to_string(&op.operation)?; 1326 - let common_op = crate::Op { 1327 - did: op.did, 1328 - cid: op.cid, 1329 - created_at: op.created_at, 1330 - nullified: op.nullified, 1331 - operation: serde_json::value::RawValue::from_string(operation_str)?, 1332 - }; 1333 - 1334 - let mut json_bytes = serde_json::to_vec(&common_op)?; 1335 - json_bytes.push(b'\n'); 1336 - 1337 - if rt.block_on(tx.write_all(&json_bytes)).is_err() { 1338 - break; 1339 - } 1340 - } 1341 - 1342 - Ok(()) 1343 - }); 1344 - 1345 - Ok(rx) 1346 - } 1347 - } 1348 - } 1349 - 1350 1299 pub async fn backfill_to_fjall( 1351 1300 db: FjallDb, 1352 1301 reset: bool, 1353 - mut pages: mpsc::Receiver<ExportPage>, 1302 + mut pages: mpsc::Receiver<crate::SeqPage>, 1354 1303 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 1355 1304 ) -> anyhow::Result<&'static str> { 1356 1305 let t0 = Instant::now(); ··· 1375 1324 page = pages.recv(), if !pages_finished => { 1376 1325 let Some(page) = page else { continue; }; 1377 1326 if notify_last_at.is_some() { 1378 - if let Some(s) = PageBoundaryState::new(&page) { 1379 - last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 1327 + // SeqPage ops are always in order, so we can just grab the last one 1328 + if let Some(last_op) = page.ops.last() { 1329 + last_at = last_at.filter(|&l| l >= last_op.created_at).or(Some(last_op.created_at)); 1380 1330 } 1381 1331 } 1332 + 1382 1333 let db = db.clone(); 1334 + 1383 1335 // we don't have to wait for inserts to finish, because insert_op 1384 1336 // without verification does not read anything from the db 1385 1337 insert_tasks.spawn_blocking(move || { 1386 1338 let mut count: usize = 0; 1387 - for op in &page.ops { 1388 - // we don't verify sigs for bulk, since pages might be out of order 1389 - count += db.insert_op::<false>(op)?; 1339 + for seq_op in &page.ops { 1340 + let op = CommonOp { 1341 + did: seq_op.did.clone(), 1342 + cid: seq_op.cid.clone(), 1343 + created_at: seq_op.created_at, 1344 + nullified: seq_op.nullified, 1345 + operation: seq_op.operation.clone(), 1346 + }; 1347 + // we don't verify sigs for bulk, since pages might be out of order (and we trust for backfills) 1348 + count += db.insert_op::<false>(&op, seq_op.seq)?; 1390 1349 } 1391 1350 db.persist(PersistMode::Buffer)?; 1392 1351 Ok(count) ··· 1421 1380 Ok("backfill_to_fjall") 1422 1381 } 1423 1382 1424 - pub async fn pages_to_fjall( 1383 + /// Write sequenced ops (with PLC seq numbers) into fjall. 1384 + pub async fn seq_pages_to_fjall( 1425 1385 db: FjallDb, 1426 - mut pages: mpsc::Receiver<ExportPage>, 1386 + mut pages: mpsc::Receiver<crate::SeqPage>, 1427 1387 ) -> anyhow::Result<&'static str> { 1428 - log::info!("starting pages_to_fjall writer..."); 1388 + log::info!("starting seq_pages_to_fjall writer..."); 1429 1389 1430 1390 let t0 = Instant::now(); 1431 1391 let mut ops_inserted: usize = 0; 1432 1392 1433 1393 while let Some(page) = pages.recv().await { 1434 - log::trace!("writing page with {} ops", page.ops.len()); 1394 + log::trace!("writing seq page with {} ops", page.ops.len()); 1435 1395 let db = db.clone(); 1436 1396 let count = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> { 1437 1397 let mut count: usize = 0; 1438 - for op in &page.ops { 1439 - count += db.insert_op::<true>(op)?; 1398 + for seq_op in &page.ops { 1399 + let common_op = CommonOp { 1400 + did: seq_op.did.clone(), 1401 + cid: seq_op.cid.clone(), 1402 + created_at: seq_op.created_at, 1403 + nullified: seq_op.nullified, 1404 + operation: seq_op.operation.clone(), 1405 + }; 1406 + count += db.insert_op::<true>(&common_op, seq_op.seq)?; 1440 1407 } 1441 1408 db.persist(PersistMode::Buffer)?; 1442 1409 Ok(count) ··· 1446 1413 } 1447 1414 1448 1415 log::info!( 1449 - "no more pages. inserted {ops_inserted} ops in {:?}", 1416 + "no more seq pages. inserted {ops_inserted} ops in {:?}", 1450 1417 t0.elapsed() 1451 1418 ); 1452 - Ok("pages_to_fjall") 1419 + Ok("seq_pages_to_fjall") 1453 1420 } 1454 1421 1455 1422 pub async fn audit( ··· 1464 1431 t0.elapsed() 1465 1432 ); 1466 1433 if failed > 0 { 1467 - anyhow::bail!("audit found {failed} invalid operations"); 1434 + log::error!("audit found {failed} invalid operations"); 1468 1435 } 1469 1436 Ok("audit_fjall") 1470 1437 } ··· 1481 1448 1482 1449 let latest_at = db 1483 1450 .get_latest()? 1484 - .ok_or_else(|| anyhow::anyhow!("db not backfilled? expected at least one op"))?; 1451 + .ok_or_else(|| anyhow::anyhow!("db not backfilled? expected at least one op")) 1452 + .map(|(_, dt)| dt)?; 1453 + 1454 + // local seq counter for newly fetched ops 1455 + let mut next_seq = db.get_latest()?.map(|(s, _)| s).unwrap_or(0) + 1; 1485 1456 1486 1457 while let Some(op) = invalid_ops_rx.recv().await { 1487 1458 let InvalidOp { did, at, cid, .. } = op; ··· 1544 1515 continue; 1545 1516 } 1546 1517 1547 - count += db.insert_op::<true>(&op)?; 1518 + let seq = next_seq; 1519 + next_seq += 1; 1520 + count += db.insert_op::<true>(&op, seq)?; 1548 1521 } 1549 1522 1550 1523 db.persist(PersistMode::Buffer)?;
+159 -1
src/poll.rs
··· 1 - use crate::{CLIENT, Dt, ExportPage, Op, OpKey}; 1 + use crate::{CLIENT, Dt, ExportPage, Op, OpKey, SeqOp, SeqPage}; 2 2 use reqwest::Url; 3 3 use std::time::Duration; 4 4 use thiserror::Error; ··· 255 255 256 256 prev_last = next_last.or(prev_last); 257 257 } 258 + } 259 + 260 + /// Fetch one page of seq-based export from `/export?after=<seq>` 261 + async fn get_seq_page(url: Url) -> Result<SeqPage, GetPageError> { 262 + use futures::TryStreamExt; 263 + use tokio::io::{AsyncBufReadExt, BufReader}; 264 + use tokio_util::compat::FuturesAsyncReadCompatExt; 265 + 266 + log::trace!("getting seq page: {url}"); 267 + 268 + let res = CLIENT.get(url).send().await?.error_for_status()?; 269 + let stream = Box::pin( 270 + res.bytes_stream() 271 + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) 272 + .into_async_read() 273 + .compat(), 274 + ); 275 + 276 + let mut lines = BufReader::new(stream).lines(); 277 + let mut ops = Vec::new(); 278 + 279 + loop { 280 + match lines.next_line().await { 281 + Ok(Some(line)) => { 282 + let line = line.trim(); 283 + if line.is_empty() { 284 + continue; 285 + } 286 + match serde_json::from_str::<SeqOp>(line) { 287 + Ok(op) => ops.push(op), 288 + Err(e) => log::warn!("failed to parse seq op: {e} ({line})"), 289 + } 290 + } 291 + Ok(None) => break, 292 + Err(e) => { 293 + log::warn!( 294 + "transport error mid-seq-page: {}; returning partial page", 295 + e 296 + ); 297 + break; 298 + } 299 + } 300 + } 301 + 302 + Ok(SeqPage { ops }) 303 + } 304 + 305 + /// Poll an upstream PLC server using seq-number-based cursoring 306 + /// 307 + /// Uses `/export?after=<seq>` — each op from the server carries a `seq` field 308 + /// which is a globally monotonic unsigned integer. Because seq is unique per op 309 + /// there is no need for page-boundary deduplication. 310 + /// 311 + /// Pages are sent to `dest`. Returns when the channel closes. 312 + pub async fn poll_upstream_seq( 313 + after: Option<u64>, 314 + base: Url, 315 + throttle: Duration, 316 + dest: mpsc::Sender<SeqPage>, 317 + ) -> anyhow::Result<&'static str> { 318 + log::info!("starting seq upstream poller at {base} after {after:?}"); 319 + let mut tick = tokio::time::interval(throttle); 320 + let mut last_seq: u64 = after.unwrap_or(0); 321 + 322 + loop { 323 + tick.tick().await; 324 + 325 + let mut url = base.clone(); 326 + url.query_pairs_mut() 327 + .append_pair("after", &last_seq.to_string()); 328 + 329 + let page = match get_seq_page(url).await { 330 + Ok(p) => p, 331 + Err(e) => { 332 + log::warn!("error polling upstream (seq): {e}"); 333 + continue; 334 + } 335 + }; 336 + 337 + if let Some(last) = page.ops.last() { 338 + last_seq = last.seq; 339 + } 340 + 341 + if !page.is_empty() { 342 + match dest.try_send(page) { 343 + Ok(()) => {} 344 + Err(mpsc::error::TrySendError::Full(page)) => { 345 + log::warn!("seq poll: destination channel full, awaiting..."); 346 + dest.send(page).await?; 347 + } 348 + e => e?, 349 + }; 350 + } 351 + } 352 + } 353 + 354 + /// Tail the upstream PLC `/export/stream` WebSocket endpoint 355 + /// 356 + /// `cursor` is a seq number to resume from. The server only supports backfill 357 + /// of up to ~1 week (server-configurable), so this cannot replay from seq 0. 358 + /// Use `poll_upstream_seq` to catch up first, then hand off to this function. 359 + /// 360 + /// Messages arrive as single-op `SeqPage`s sent to `dest`. Returns on 361 + /// disconnect so the caller can reconnect or fall back to polling. 362 + pub async fn tail_upstream_stream( 363 + cursor: Option<u64>, 364 + base: Url, 365 + dest: mpsc::Sender<SeqPage>, 366 + ) -> anyhow::Result<()> { 367 + use futures::StreamExt; 368 + use tokio_tungstenite::{connect_async, tungstenite::Message}; 369 + 370 + let mut url = base.clone(); 371 + // convert ws(s):// scheme if needed; some callers pass http(s):// 372 + let ws_scheme = match url.scheme() { 373 + "https" => "wss", 374 + "http" => "ws", 375 + _ => "ws", 376 + } 377 + .to_owned(); 378 + url.set_scheme(&ws_scheme) 379 + .map_err(|_| anyhow::anyhow!("failed to set websocket scheme"))?; 380 + if let Some(seq) = cursor { 381 + url.query_pairs_mut() 382 + .append_pair("cursor", &seq.to_string()); 383 + } 384 + 385 + log::info!("connecting to stream: {url}"); 386 + let (mut ws, _) = connect_async(url.as_str()).await?; 387 + log::info!("stream connected"); 388 + 389 + while let Some(msg) = ws.next().await { 390 + let msg = msg?; 391 + let text = match msg { 392 + Message::Text(t) => t, 393 + Message::Close(_) => { 394 + log::info!("stream closed by server"); 395 + break; 396 + } 397 + _ => continue, 398 + }; 399 + 400 + let op: SeqOp = match serde_json::from_str(&text) { 401 + Ok(op) => op, 402 + Err(e) => { 403 + log::warn!("failed to parse stream event: {e} ({text})"); 404 + continue; 405 + } 406 + }; 407 + 408 + let page = SeqPage { ops: vec![op] }; 409 + if dest.send(page).await.is_err() { 410 + log::info!("stream dest channel closed, stopping"); 411 + break; 412 + } 413 + } 414 + 415 + Ok(()) 258 416 } 259 417 260 418 #[cfg(test)]
-119
tests/fjall_mirror_test.rs
··· 1 - use allegedly::{ 2 - ExperimentalConf, FjallDb, ListenConf, backfill_to_fjall, bin::bin_init, poll_upstream, 3 - serve_fjall, 4 - }; 5 - use futures::TryFutureExt; 6 - use reqwest::{StatusCode, Url}; 7 - use std::time::Duration; 8 - use tokio::sync::mpsc; 9 - 10 - #[tokio::test] 11 - async fn test_fjall_mirror_mode() -> anyhow::Result<()> { 12 - bin_init(false); 13 - let temp_dir = tempfile::tempdir()?; 14 - let db_path = temp_dir.path().join("fjall.db"); 15 - let db = FjallDb::open(&db_path)?; 16 - 17 - // backfill (limited to 1 page) 18 - let (backfill_tx, backfill_rx) = mpsc::channel(1); 19 - let (upstream_tx, mut upstream_rx) = mpsc::channel(1); 20 - 21 - let upstream_url: Url = "https://plc.directory".parse()?; 22 - 23 - // spawn upstream poller 24 - tokio::spawn({ 25 - let mut base = upstream_url.clone(); 26 - base.set_path("/export"); 27 - async move { 28 - // poll fresh data so our data matches the upstream 29 - let start_at = chrono::Utc::now() - chrono::Duration::try_minutes(5).unwrap(); 30 - let _ = poll_upstream( 31 - Some(start_at), 32 - base, 33 - Duration::from_millis(100), 34 - upstream_tx, 35 - ) 36 - .inspect_err(|err| log::error!("failed to poll upstream: {err}")) 37 - .await; 38 - } 39 - }); 40 - 41 - log::info!("waiting for page from upstream..."); 42 - let page = upstream_rx 43 - .recv() 44 - .await 45 - .expect("to receive page from upstream"); 46 - log::info!("received page with {} ops", page.ops.len()); 47 - let sample_did = page.ops.last().unwrap().did.clone(); 48 - println!("will check did {sample_did}"); 49 - 50 - backfill_tx.send(page).await?; 51 - let backfill_handle = tokio::spawn(backfill_to_fjall(db.clone(), false, backfill_rx, None)); 52 - // since we are using a channel with 1 capacity, we can wait that the backfill task received 53 - // the page by reserving on the channel, and then drop the sender to signal the backfill task to finish 54 - let _ = backfill_tx.reserve().await; 55 - drop(backfill_tx); 56 - backfill_handle.await??; 57 - 58 - // todo: should probably use a random port here but shrug 59 - let listener = std::net::TcpListener::bind("127.0.0.1:17548")?; 60 - let port = listener.local_addr()?.port(); 61 - drop(listener); 62 - 63 - let listen_conf = ListenConf::Bind(([127, 0, 0, 1], port).into()); 64 - let exp_conf = ExperimentalConf { 65 - acme_domain: None, 66 - write_upstream: false, 67 - }; 68 - 69 - let server_handle = tokio::spawn({ 70 - let db = db.clone(); 71 - let upstream = upstream_url.clone(); 72 - serve_fjall(upstream, listen_conf, exp_conf, db) 73 - .inspect_err(|err| log::error!("failed to serve: {err}")) 74 - }); 75 - let base_url = format!("http://127.0.0.1:{}", port); 76 - 77 - // wait for server to be ready 78 - let client = reqwest::Client::new(); 79 - let health_url = format!("{base_url}/_health"); 80 - let mut ready = None; 81 - for _ in 0..50 { 82 - let resp = match client.get(&health_url).send().await { 83 - Ok(resp) => resp, 84 - Err(err) => { 85 - log::warn!("failed to get health: {err}"); 86 - continue; 87 - } 88 - }; 89 - if resp.status().is_success() { 90 - let json: serde_json::Value = resp.json().await?; 91 - ready = Some(json); 92 - break; 93 - } 94 - tokio::time::sleep(Duration::from_millis(100)).await; 95 - } 96 - assert!(ready.is_some(), "server failed to start"); 97 - assert_eq!(ready.unwrap()["server"], "allegedly (mirror/fjall)"); 98 - 99 - // verify did resolution against upstream 100 - let mut doc_url = upstream_url.clone(); 101 - doc_url.set_path(&format!("/{sample_did}")); 102 - let upstream_resp = client.get(doc_url).send().await?; 103 - assert_eq!(upstream_resp.status(), StatusCode::OK); 104 - let upstream_doc: serde_json::Value = upstream_resp.json().await?; 105 - 106 - let local_doc_url = format!("{base_url}/{sample_did}"); 107 - let resp = client.get(local_doc_url).send().await?; 108 - assert_eq!(resp.status(), StatusCode::OK); 109 - let doc: serde_json::Value = resp.json().await?; 110 - 111 - assert_eq!( 112 - doc, upstream_doc, 113 - "local doc != upstream doc.\nlocal: {:#?}\nupstream: {:#?}", 114 - doc, upstream_doc 115 - ); 116 - 117 - server_handle.abort(); 118 - Ok(()) 119 - }