Server tools to backfill, tail, mirror, and verify PLC logs

experimental: forward writes to upstream

+259 -67
+31 -3
src/bin/mirror.rs
··· 1 - use allegedly::{Db, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve}; 1 + use allegedly::{ 2 + Db, ExperimentalConf, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve, 3 + }; 2 4 use clap::Parser; 3 5 use reqwest::Url; 4 6 use std::{net::SocketAddr, path::PathBuf}; ··· 39 41 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")] 40 42 #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")] 41 43 acme_directory_url: Url, 42 - /// listen for ipv6 44 + /// try to listen for ipv6 43 45 #[arg(long, action, requires("acme_domain"), env = "ALLEGEDLY_ACME_IPV6")] 44 46 acme_ipv6: bool, 47 + /// only accept experimental requests at this hostname 48 + /// 49 + /// a cert will be provisioned for it from letsencrypt. if you're not using 50 + /// acme (eg., behind a tls-terminating reverse proxy), open a feature request. 51 + #[arg( 52 + long, 53 + requires("acme_domain"), 54 + env = "ALLEGEDLY_EXPERIMENTAL_ACME_DOMAIN" 55 + )] 56 + experimental_acme_domain: Option<String>, 57 + /// accept writes! by forwarding them upstream 58 + #[arg(long, action, env = "ALLEGEDLY_EXPERIMENTAL_WRITE_UPSTREAM")] 59 + experimental_write_upstream: bool, 45 60 } 46 61 47 62 pub async fn run( ··· 55 70 acme_cache_path, 56 71 acme_directory_url, 57 72 acme_ipv6, 73 + experimental_acme_domain, 74 + experimental_write_upstream, 58 75 }: Args, 59 76 ) -> anyhow::Result<()> { 60 77 let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; ··· 79 96 } 80 97 (bind, true, None) => ListenConf::Bind(bind), 81 98 (_, _, _) => unreachable!(), 99 + }; 100 + 101 + let experimental_conf = ExperimentalConf { 102 + acme_domain: experimental_acme_domain, 103 + write_upstream: experimental_write_upstream, 82 104 }; 83 105 84 106 let mut tasks = JoinSet::new(); ··· 90 112 91 113 tasks.spawn(poll_upstream(Some(latest), poll_url, send_page)); 92 114 tasks.spawn(pages_to_pg(db.clone(), recv_page)); 93 - tasks.spawn(serve(upstream, wrap, listen_conf, db.clone())); 115 + tasks.spawn(serve( 116 + upstream, 117 + wrap, 118 + listen_conf, 119 + experimental_conf, 120 + db.clone(), 121 + )); 94 122 95 123 while let Some(next) = tasks.join_next().await { 96 124 match next {
+2 -2
src/lib.rs
··· 15 15 pub use backfill::backfill; 16 16 pub use cached_value::{CachedValue, Fetcher}; 17 17 pub use client::{CLIENT, UA}; 18 - pub use mirror::{ListenConf, serve}; 18 + pub use mirror::{ExperimentalConf, ListenConf, serve}; 19 19 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 20 20 pub use poll::{PageBoundaryState, get_page, poll_upstream}; 21 - pub use ratelimit::GovernorMiddleware; 21 + pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters}; 22 22 pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 23 23 24 24 pub type Dt = chrono::DateTime<chrono::Utc>;
+138 -29
src/mirror.rs
··· 1 - use crate::{CachedValue, Db, Dt, Fetcher, GovernorMiddleware, UA, logo}; 1 + use crate::{ 2 + CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, GovernorMiddleware, IpLimiters, UA, logo, 3 + }; 2 4 use futures::TryStreamExt; 3 5 use governor::Quota; 4 6 use poem::{ 5 - Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, get, 6 - handler, 7 + Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, 8 + get, handler, 7 9 http::StatusCode, 8 10 listener::{Listener, TcpListener, acme::AutoCert}, 9 11 middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 10 - web::{Data, Json}, 12 + web::{Data, Json, Path}, 11 13 }; 12 14 use reqwest::{Client, Url}; 13 15 use std::{net::SocketAddr, path::PathBuf, time::Duration}; ··· 19 21 upstream: Url, 20 22 latest_at: CachedValue<Dt, GetLatestAt>, 21 23 upstream_status: CachedValue<PlcStatus, CheckUpstream>, 24 + experimental: ExperimentalConf, 22 25 } 23 26 24 27 #[handler] ··· 69 72 include_bytes!("../favicon.ico").with_content_type("image/x-icon") 70 73 } 71 74 72 - fn failed_to_reach_wrapped() -> String { 75 + fn failed_to_reach_named(name: &str) -> String { 73 76 format!( 74 77 r#"{} 75 78 76 - Failed to reach the wrapped reference PLC server. Sorry. 79 + Failed to reach the {name} server. Sorry. 77 80 "#, 78 81 logo("mirror 502 :( ") 79 82 ) 80 83 } 81 84 85 + fn bad_create_op(reason: &str) -> Response { 86 + Response::builder() 87 + .status(StatusCode::BAD_REQUEST) 88 + .body(format!( 89 + r#"{} 90 + 91 + NooOOOooooo: {reason} 92 + "#, 93 + logo("mirror 400 >:( ") 94 + )) 95 + } 96 + 82 97 type PlcStatus = (bool, serde_json::Value); 83 98 84 99 async fn plc_status(url: &Url, client: &Client) -> PlcStatus { ··· 168 183 ) 169 184 } 170 185 186 + fn proxy_response(res: reqwest::Response) -> Response { 187 + let http_res: poem::http::Response<reqwest::Body> = res.into(); 188 + let (parts, reqw_body) = http_res.into_parts(); 189 + 190 + let parts = poem::ResponseParts { 191 + status: parts.status, 192 + version: parts.version, 193 + headers: parts.headers, 194 + extensions: parts.extensions, 195 + }; 196 + 197 + let body = http_body_util::BodyDataStream::new(reqw_body) 198 + .map_err(|e| std::io::Error::other(Box::new(e))); 199 + 200 + Response::from_parts(parts, poem::Body::from_bytes_stream(body)) 201 + } 202 + 171 203 #[handler] 172 - async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<impl IntoResponse> { 204 + async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> { 173 205 let mut target = state.plc.clone(); 174 206 target.set_path(req.uri().path()); 175 - let upstream_res = state 207 + let wrapped_res = state 176 208 .client 177 209 .get(target) 178 210 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server ··· 181 213 .await 182 214 .map_err(|e| { 183 215 log::error!("upstream req fail: {e}"); 184 - Error::from_string(failed_to_reach_wrapped(), StatusCode::BAD_GATEWAY) 216 + Error::from_string( 217 + failed_to_reach_named("wrapped reference PLC"), 218 + StatusCode::BAD_GATEWAY, 219 + ) 185 220 })?; 186 221 187 - let http_res: poem::http::Response<reqwest::Body> = upstream_res.into(); 188 - let (parts, reqw_body) = http_res.into_parts(); 222 + Ok(proxy_response(wrapped_res)) 223 + } 189 224 190 - let parts = poem::ResponseParts { 191 - status: parts.status, 192 - version: parts.version, 193 - headers: parts.headers, 194 - extensions: parts.extensions, 195 - }; 225 + #[handler] 226 + async fn forward_create_op_upstream( 227 + Data(State { 228 + upstream, 229 + client, 230 + experimental, 231 + .. 232 + }): Data<&State>, 233 + Path(did): Path<String>, 234 + req: &Request, 235 + body: Body, 236 + ) -> Result<Response> { 237 + if let Some(expected_domain) = &experimental.acme_domain { 238 + let Some(found_host) = req.header("Host") else { 239 + return Ok(bad_create_op(&format!( 240 + "missing `Host` header, expected {expected_domain} for experimental requests." 241 + ))); 242 + }; 243 + if found_host != expected_domain { 244 + return Ok(bad_create_op(&format!( 245 + "experimental requests must be made to {expected_domain}, but this request's `Host` header was {found_host}" 246 + ))); 247 + } 248 + } 249 + 250 + // adjust proxied headers 251 + let mut headers: reqwest::header::HeaderMap = req.headers().clone(); 252 + log::trace!("original request headers: {headers:?}"); 253 + headers.insert("Host", upstream.host_str().unwrap().parse().unwrap()); 254 + let client_ua = headers 255 + .get("User-Agent") 256 + .map(|h| h.to_str().unwrap()) 257 + .unwrap_or("unknown"); 258 + headers.insert( 259 + "User-Agent", 260 + format!("{UA} (forwarding from {client_ua:?})") 261 + .parse() 262 + .unwrap(), 263 + ); 264 + log::trace!("adjusted request headers: {headers:?}"); 196 265 197 - let body = http_body_util::BodyDataStream::new(reqw_body) 198 - .map_err(|e| std::io::Error::other(Box::new(e))); 266 + let mut target = upstream.clone(); 267 + target.set_path(&did); 268 + let upstream_res = client 269 + .post(target) 270 + .timeout(Duration::from_secs(15)) // be a little generous 271 + .headers(headers) 272 + .body(reqwest::Body::wrap_stream(body.into_bytes_stream())) 273 + .send() 274 + .await 275 + .map_err(|e| { 276 + log::warn!("upstream write fail: {e}"); 277 + Error::from_string( 278 + failed_to_reach_named("upstream PLC"), 279 + StatusCode::BAD_GATEWAY, 280 + ) 281 + })?; 199 282 200 - Ok(Response::from_parts( 201 - parts, 202 - poem::Body::from_bytes_stream(body), 203 - )) 283 + Ok(proxy_response(upstream_res)) 204 284 } 205 285 206 286 #[handler] ··· 212 292 213 293 Sorry, this server does not accept POST requests. 214 294 215 - You may wish to try upstream: {upstream} 295 + You may wish to try sending that to our upstream: {upstream}. 296 + 297 + If you operate this server, try running with `--experimental-write-upstream`. 216 298 "#, 217 299 logo("mirror (nope)") 218 300 ), ··· 230 312 Bind(SocketAddr), 231 313 } 232 314 315 + #[derive(Debug, Clone)] 316 + pub struct ExperimentalConf { 317 + pub acme_domain: Option<String>, 318 + pub write_upstream: bool, 319 + } 320 + 233 321 pub async fn serve( 234 322 upstream: Url, 235 323 plc: Url, 236 324 listen: ListenConf, 325 + experimental: ExperimentalConf, 237 326 db: Db, 238 327 ) -> anyhow::Result<&'static str> { 239 328 log::info!("starting server..."); ··· 257 346 upstream: upstream.clone(), 258 347 latest_at, 259 348 upstream_status, 349 + experimental: experimental.clone(), 260 350 }; 261 351 262 - let app = Route::new() 352 + let mut app = Route::new() 263 353 .at("/", get(hello)) 264 354 .at("/favicon.ico", get(favicon)) 265 - .at("/_health", get(health)) 266 - .at("/:any", get(proxy).post(nope)) 355 + .at("/_health", get(health)); 356 + 357 + if experimental.write_upstream { 358 + log::info!("enabling experimental write forwarding to upstream"); 359 + 360 + let ip_limiter = IpLimiters::new(Quota::per_hour(10.try_into().unwrap())); 361 + let did_limiter = CreatePlcOpLimiter::new(Quota::per_hour(4.try_into().unwrap())); 362 + 363 + let upstream_proxier = forward_create_op_upstream 364 + .with(GovernorMiddleware::new(did_limiter)) 365 + .with(GovernorMiddleware::new(ip_limiter)); 366 + 367 + app = app.at("/:any", get(proxy).post(upstream_proxier)); 368 + } else { 369 + app = app.at("/:any", get(proxy).post(nope)); 370 + } 371 + 372 + let app = app 267 373 .with(AddData::new(state)) 268 374 .with(Cors::new().allow_credentials(false)) 269 375 .with(Compression::new()) 270 - .with(GovernorMiddleware::new(Quota::per_minute( 376 + .with(GovernorMiddleware::new(IpLimiters::new(Quota::per_minute( 271 377 3000.try_into().expect("ratelimit middleware to build"), 272 - ))) 378 + )))) 273 379 .with(CatchPanic::new()) 274 380 .with(Tracing); 275 381 ··· 288 394 .directory_url(directory_url) 289 395 .cache_path(cache_path); 290 396 for domain in domains { 397 + auto_cert = auto_cert.domain(domain); 398 + } 399 + if let Some(domain) = experimental.acme_domain { 291 400 auto_cert = auto_cert.domain(domain); 292 401 } 293 402 let auto_cert = auto_cert.build().expect("acme config to build");
+88 -33
src/ratelimit.rs
··· 8 8 use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode}; 9 9 use std::{ 10 10 convert::TryInto, 11 + hash::Hash, 11 12 net::{IpAddr, Ipv6Addr}, 12 13 sync::{Arc, LazyLock}, 13 14 time::Duration, ··· 20 21 type IP6_56 = [u8; 7]; 21 22 type IP6_48 = [u8; 6]; 22 23 24 + pub trait Limiter<K: Hash + std::fmt::Debug>: Send + Sync + 'static { 25 + fn extract_key(&self, req: &Request) -> Result<K>; 26 + fn check_key(&self, ip: &K) -> Result<(), Duration>; 27 + fn housekeep(&self); 28 + } 29 + 23 30 fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> { 24 31 let period = quota.replenish_interval() / factor; 25 32 let burst = quota ··· 30 37 } 31 38 32 39 #[derive(Debug)] 33 - struct IpLimiters { 40 + pub struct CreatePlcOpLimiter { 41 + limiter: RateLimiter<String, DefaultKeyedStateStore<String>, DefaultClock>, 42 + } 43 + 44 + impl CreatePlcOpLimiter { 45 + pub fn new(quota: Quota) -> Self { 46 + Self { 47 + limiter: RateLimiter::keyed(quota), 48 + } 49 + } 50 + } 51 + 52 + /// this must be used with an endpoint with a single path param for the did 53 + impl Limiter<String> for CreatePlcOpLimiter { 54 + fn extract_key(&self, req: &Request) -> Result<String> { 55 + let (did,) = req.path_params::<(String,)>()?; 56 + Ok(did) 57 + } 58 + fn check_key(&self, did: &String) -> Result<(), Duration> { 59 + self.limiter 60 + .check_key(did) 61 + .map_err(|e| e.wait_time_from(CLOCK.now())) 62 + } 63 + fn housekeep(&self) { 64 + log::debug!( 65 + "limiter size before housekeeping: {} dids", 66 + self.limiter.len() 67 + ); 68 + self.limiter.retain_recent(); 69 + } 70 + } 71 + 72 + #[derive(Debug)] 73 + pub struct IpLimiters { 34 74 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>, 35 75 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>, 36 76 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>, ··· 44 84 ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")), 45 85 } 46 86 } 47 - pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> { 87 + } 88 + 89 + impl Limiter<IpAddr> for IpLimiters { 90 + fn extract_key(&self, req: &Request) -> Result<IpAddr> { 91 + Ok(req 92 + .remote_addr() 93 + .as_socket_addr() 94 + .expect("failed to get request's remote addr") // TODO 95 + .ip()) 96 + } 97 + fn check_key(&self, ip: &IpAddr) -> Result<(), Duration> { 48 98 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now()); 49 99 match ip { 50 - addr @ IpAddr::V4(_) => self.per_ip.check_key(&addr).map_err(asdf), 100 + addr @ IpAddr::V4(_) => self.per_ip.check_key(addr).map_err(asdf), 51 101 IpAddr::V6(a) => { 52 102 // always check all limiters 53 103 let check_ip = self ··· 74 124 } 75 125 } 76 126 } 127 + fn housekeep(&self) { 128 + log::debug!( 129 + "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 130 + self.per_ip.len(), 131 + self.ip6_56.len(), 132 + self.ip6_48.len(), 133 + ); 134 + self.per_ip.retain_recent(); 135 + self.ip6_56.retain_recent(); 136 + self.ip6_48.retain_recent(); 137 + } 77 138 } 78 139 79 140 /// Once the rate limit has been reached, the middleware will respond with 80 141 /// status code 429 (too many requests) and a `Retry-After` header with the amount 81 142 /// of time that needs to pass before another request will be allowed. 82 - #[derive(Debug)] 83 - pub struct GovernorMiddleware { 143 + // #[derive(Debug)] 144 + pub struct GovernorMiddleware<K> { 84 145 #[allow(dead_code)] 85 146 stop_on_drop: oneshot::Sender<()>, 86 - limiters: Arc<IpLimiters>, 147 + limiters: Arc<dyn Limiter<K>>, 87 148 } 88 149 89 - impl GovernorMiddleware { 150 + impl<K: Hash + std::fmt::Debug> GovernorMiddleware<K> { 90 151 /// Limit request rates 91 152 /// 92 153 /// a little gross but this spawns a tokio task for housekeeping: 93 154 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping 94 - pub fn new(quota: Quota) -> Self { 95 - let limiters = Arc::new(IpLimiters::new(quota)); 155 + pub fn new(limiters: impl Limiter<K>) -> Self { 156 + let limiters = Arc::new(limiters); 96 157 let (stop_on_drop, mut stopped) = oneshot::channel(); 97 158 tokio::task::spawn({ 98 159 let limiters = limiters.clone(); ··· 102 163 _ = &mut stopped => break, 103 164 _ = tokio::time::sleep(Duration::from_secs(60)) => {}, 104 165 }; 105 - log::debug!( 106 - "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 107 - limiters.per_ip.len(), 108 - limiters.ip6_56.len(), 109 - limiters.ip6_48.len(), 110 - ); 111 - limiters.per_ip.retain_recent(); 112 - limiters.ip6_56.retain_recent(); 113 - limiters.ip6_48.retain_recent(); 166 + limiters.housekeep(); 114 167 } 115 168 } 116 169 }); ··· 121 174 } 122 175 } 123 176 124 - impl<E: Endpoint> Middleware<E> for GovernorMiddleware { 125 - type Output = GovernorMiddlewareImpl<E>; 177 + impl<E, K> Middleware<E> for GovernorMiddleware<K> 178 + where 179 + E: Endpoint, 180 + K: Hash + std::fmt::Debug + Send + Sync + 'static, 181 + { 182 + type Output = GovernorMiddlewareImpl<E, K>; 126 183 fn transform(&self, ep: E) -> Self::Output { 127 184 GovernorMiddlewareImpl { 128 185 ep, ··· 131 188 } 132 189 } 133 190 134 - pub struct GovernorMiddlewareImpl<E> { 191 + pub struct GovernorMiddlewareImpl<E, K> { 135 192 ep: E, 136 - limiters: Arc<IpLimiters>, 193 + limiters: Arc<dyn Limiter<K>>, 137 194 } 138 195 139 - impl<E: Endpoint> Endpoint for GovernorMiddlewareImpl<E> { 196 + impl<E, K> Endpoint for GovernorMiddlewareImpl<E, K> 197 + where 198 + E: Endpoint, 199 + K: Hash + std::fmt::Debug + Send + Sync + 'static, 200 + { 140 201 type Output = E::Output; 141 202 142 203 async fn call(&self, req: Request) -> Result<Self::Output> { 143 - let remote = req 144 - .remote_addr() 145 - .as_socket_addr() 146 - .expect("failed to get request's remote addr") // TODO 147 - .ip(); 204 + let key = self.limiters.extract_key(&req)?; 148 205 149 - log::trace!("remote: {remote}"); 150 - 151 - match self.limiters.check_key(remote) { 206 + match self.limiters.check_key(&key) { 152 207 Ok(_) => { 153 - log::debug!("allowing remote {remote}"); 208 + log::debug!("allowing key {key:?}"); 154 209 self.ep.call(req).await 155 210 } 156 211 Err(d) => { 157 212 let wait_time = d.as_secs(); 158 213 159 - log::debug!("rate limit exceeded for {remote}, quota reset in {wait_time}s"); 214 + log::debug!("rate limit exceeded for {key:?}, quota reset in {wait_time}s"); 160 215 161 216 let res = Response::builder() 162 217 .status(StatusCode::TOO_MANY_REQUESTS)