Server tools to backfill, tail, mirror, and verify PLC logs

Make the self-imposed rate-limit interval for upstream requests configurable (via `--upstream-throttle-ms` / `ALLEGEDLY_UPSTREAM_THROTTLE_MS`, default 600 ms to match plc.directory's 500-requests-per-5-minutes limit), and enable gzip decompression on the shared reqwest client.

+39 -17
+1
Cargo.lock
··· 2057 2057 source = "registry+https://github.com/rust-lang/crates.io-index" 2058 2058 checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" 2059 2059 dependencies = [ 2060 + "async-compression", 2060 2061 "base64", 2061 2062 "bytes", 2062 2063 "encoding_rs",
+1 -1
Cargo.toml
··· 18 18 native-tls = "0.2.14" 19 19 poem = { version = "3.1.12", features = ["acme", "compression"] } 20 20 postgres-native-tls = "0.5.1" 21 - reqwest = { version = "0.12.23", features = ["stream", "json"] } 21 + reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] } 22 22 reqwest-middleware = "0.4.2" 23 23 reqwest-retry = "0.7.0" 24 24 rustls = "0.23.32"
+5 -3
src/bin/allegedly.rs
··· 1 1 use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream}; 2 2 use clap::{CommandFactory, Parser, Subcommand}; 3 - use std::{path::PathBuf, time::Instant}; 3 + use std::{path::PathBuf, time::Duration, time::Instant}; 4 4 use tokio::fs::create_dir_all; 5 5 use tokio::sync::mpsc; 6 6 ··· 76 76 } => { 77 77 let mut url = globals.upstream; 78 78 url.set_path("/export"); 79 + let throttle = Duration::from_millis(globals.upstream_throttle_ms); 79 80 let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason 80 81 tokio::task::spawn(async move { 81 - poll_upstream(Some(after), url, tx) 82 + poll_upstream(Some(after), url, throttle, tx) 82 83 .await 83 84 .expect("to poll upstream") 84 85 }); ··· 95 96 let mut url = globals.upstream; 96 97 url.set_path("/export"); 97 98 let start_at = after.or_else(|| Some(chrono::Utc::now())); 99 + let throttle = Duration::from_millis(globals.upstream_throttle_ms); 98 100 let (tx, rx) = mpsc::channel(1); 99 101 tokio::task::spawn(async move { 100 - poll_upstream(start_at, url, tx) 102 + poll_upstream(start_at, url, throttle, tx) 101 103 .await 102 104 .expect("to poll upstream") 103 105 });
+10 -4
src/bin/backfill.rs
··· 4 4 }; 5 5 use clap::Parser; 6 6 use reqwest::Url; 7 - use std::path::PathBuf; 7 + use std::{path::PathBuf, time::Duration}; 8 8 use tokio::{ 9 9 sync::{mpsc, oneshot}, 10 10 task::JoinSet, ··· 53 53 } 54 54 55 55 pub async fn run( 56 - GlobalArgs { upstream }: GlobalArgs, 56 + GlobalArgs { 57 + upstream, 58 + upstream_throttle_ms, 59 + }: GlobalArgs, 57 60 Args { 58 61 http, 59 62 dir, ··· 98 101 } 99 102 let mut upstream = upstream; 100 103 upstream.set_path("/export"); 101 - tasks.spawn(poll_upstream(None, upstream, poll_tx)); 104 + let throttle = Duration::from_millis(upstream_throttle_ms); 105 + tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx)); 102 106 tasks.spawn(full_pages(poll_out, full_tx)); 103 107 tasks.spawn(pages_to_stdout(full_out, None)); 104 108 } else { ··· 128 132 129 133 // and the catch-up source... 130 134 if let Some(last) = found_last_out { 135 + let throttle = Duration::from_millis(upstream_throttle_ms); 131 136 tasks.spawn(async move { 132 137 let mut upstream = upstream; 133 138 upstream.set_path("/export"); 134 - poll_upstream(last.await?, upstream, poll_tx).await 139 + 140 + poll_upstream(last.await?, upstream, throttle, poll_tx).await 135 141 }); 136 142 } 137 143
+7 -3
src/bin/mirror.rs
··· 3 3 }; 4 4 use clap::Parser; 5 5 use reqwest::Url; 6 - use std::{net::SocketAddr, path::PathBuf}; 6 + use std::{net::SocketAddr, path::PathBuf, time::Duration}; 7 7 use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet}; 8 8 9 9 #[derive(Debug, clap::Args)] ··· 60 60 } 61 61 62 62 pub async fn run( 63 - GlobalArgs { upstream }: GlobalArgs, 63 + GlobalArgs { 64 + upstream, 65 + upstream_throttle_ms, 66 + }: GlobalArgs, 64 67 Args { 65 68 wrap, 66 69 wrap_pg, ··· 113 116 114 117 let mut poll_url = upstream.clone(); 115 118 poll_url.set_path("/export"); 119 + let throttle = Duration::from_millis(upstream_throttle_ms); 116 120 117 - tasks.spawn(poll_upstream(Some(latest), poll_url, send_page)); 121 + tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 118 122 tasks.spawn(pages_to_pg(db.clone(), recv_page)); 119 123 tasks.spawn(serve( 120 124 upstream,
+6
src/bin/mod.rs
··· 6 6 #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")] 7 7 #[clap(default_value = "https://plc.directory")] 8 8 pub upstream: Url, 9 + /// Self-rate-limit upstream request interval 10 + /// 11 + /// plc.directory's rate limiting is 500 requests per 5 mins (600ms) 12 + #[arg(long, global = true, env = "ALLEGEDLY_UPSTREAM_THROTTLE_MS")] 13 + #[clap(default_value = "600")] 14 + pub upstream_throttle_ms: u64, 9 15 } 10 16 11 17 #[allow(dead_code)]
+1
src/client.rs
··· 12 12 pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| { 13 13 let inner = Client::builder() 14 14 .user_agent(UA) 15 + .gzip(true) 15 16 .build() 16 17 .expect("reqwest client to build"); 17 18
+8 -6
src/poll.rs
··· 4 4 use thiserror::Error; 5 5 use tokio::sync::mpsc; 6 6 7 - // plc.directory ratelimit on /export is 500 per 5 mins 8 - const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600); 9 - 10 7 #[derive(Debug, Error)] 11 8 pub enum GetPageError { 12 9 #[error(transparent)] ··· 141 138 .split('\n') 142 139 .filter_map(|s| { 143 140 serde_json::from_str::<Op>(s) 144 - .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 141 + .inspect_err(|e| { 142 + if !s.is_empty() { 143 + log::warn!("failed to parse op: {e} ({s})") 144 + } 145 + }) 145 146 .ok() 146 147 }) 147 148 .collect(); ··· 154 155 pub async fn poll_upstream( 155 156 after: Option<Dt>, 156 157 base: Url, 158 + throttle: Duration, 157 159 dest: mpsc::Sender<ExportPage>, 158 160 ) -> anyhow::Result<&'static str> { 159 - log::info!("starting upstream poller after {after:?}"); 160 - let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 161 + log::info!("starting upstream poller at {base} after {after:?}"); 162 + let mut tick = tokio::time::interval(throttle); 161 163 let mut prev_last: Option<LastOp> = after.map(Into::into); 162 164 let mut boundary_state: Option<PageBoundaryState> = None; 163 165 loop {