Server tools to backfill, tail, mirror, and verify PLC logs

ratelimit: handle ipv6

+70 -64
+4 -1
src/mirror.rs
··· 1 1 use crate::{GovernorMiddleware, logo}; 2 2 use futures::TryStreamExt; 3 + use governor::Quota; 3 4 use poem::{ 4 5 EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, get, handler, 5 6 http::StatusCode, ··· 127 128 .with(AddData::new(state)) 128 129 .with(Cors::new().allow_credentials(false)) 129 130 .with(Compression::new()) 130 - .with(GovernorMiddleware::per_minute(3000).unwrap()) 131 + .with(GovernorMiddleware::new(Quota::per_minute( 132 + 3000.try_into().unwrap(), 133 + ))) 131 134 .with(CatchPanic::new()) 132 135 .with(Tracing); 133 136
+66 -63
src/ratelimit.rs
··· 1 1 use crate::logo; 2 2 3 3 use governor::{ 4 - Quota, RateLimiter, 4 + NotUntil, Quota, RateLimiter, 5 5 clock::{Clock, DefaultClock}, 6 6 state::keyed::DefaultKeyedStateStore, 7 7 }; 8 8 use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode}; 9 9 use std::{ 10 - convert::TryInto, error::Error, net::IpAddr, num::NonZeroU32, sync::Arc, sync::LazyLock, 10 + convert::TryInto, 11 + net::{IpAddr, Ipv6Addr}, 12 + sync::{Arc, LazyLock}, 11 13 time::Duration, 12 14 }; 13 15 14 16 static CLOCK: LazyLock<DefaultClock> = LazyLock::new(DefaultClock::default); 15 17 18 + const IP6_64_MASK: Ipv6Addr = Ipv6Addr::from_bits(0xFFFF_FFFF_FFFF_FFFF_0000_0000_0000_0000); 19 + type IP6_56 = [u8; 7]; 20 + type IP6_48 = [u8; 6]; 21 + 22 + fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> { 23 + let period = quota.replenish_interval() / factor; 24 + let burst = quota 25 + .burst_size() 26 + .checked_mul(factor.try_into().unwrap()) 27 + .unwrap(); 28 + Quota::with_period(period).map(|q| q.allow_burst(burst)) 29 + } 30 + 31 + #[derive(Debug)] 32 + struct IpLimiters { 33 + per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>, 34 + ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>, 35 + ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>, 36 + } 37 + 38 + impl IpLimiters { 39 + pub fn new(quota: Quota) -> Self { 40 + Self { 41 + per_ip: RateLimiter::keyed(quota), 42 + ip6_56: RateLimiter::keyed(scale_quota(quota, 8).unwrap()), 43 + ip6_48: RateLimiter::keyed(scale_quota(quota, 256).unwrap()), 44 + } 45 + } 46 + pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> { 47 + let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now()); 48 + match ip { 49 + addr @ IpAddr::V4(_) => self.per_ip.check_key(&addr).map_err(asdf), 50 + IpAddr::V6(a) => { 51 + // always check all limiters 52 + let check_ip = self 53 + .per_ip 54 + .check_key(&IpAddr::V6(a & IP6_64_MASK)) 55 + .map_err(asdf); 56 + let check_56 = self 57 + .ip6_56 58 + .check_key(a.octets()[..7].try_into().unwrap()) 59 + .map_err(asdf); 60 + let check_48 = self 61 + .ip6_48 62 + .check_key(a.octets()[..6].try_into().unwrap()) 63 + .map_err(asdf); 64 + check_ip.and(check_56).and(check_48) 65 + } 66 + } 67 + } 68 + } 69 + 16 70 /// Once the rate limit has been reached, the middleware will respond with 17 71 /// status code 429 (too many requests) and a `Retry-After` header with the amount 18 72 /// of time that needs to pass before another request will be allowed. 19 73 #[derive(Debug, Clone)] 20 74 pub struct GovernorMiddleware { 21 - limiter: Arc<RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>>, 75 + limiters: Arc<IpLimiters>, 22 76 } 23 77 24 78 impl GovernorMiddleware { 25 - /// Constructs a rate-limiting middleware from a [`Duration`] that allows one request in the given time interval. 26 - /// 27 - /// If the time interval is zero, returns `None`. 28 - #[must_use] 29 - pub fn with_period(duration: Duration) -> Option<Self> { 30 - Some(Self { 31 - limiter: Arc::new(RateLimiter::<IpAddr, _, _>::keyed(Quota::with_period( 32 - duration, 33 - )?)), 34 - }) 35 - } 36 - 37 - /// Constructs a rate-limiting middleware that allows a specified number of requests every second. 38 - /// 39 - /// Returns an error if `times` can't be converted into a [`NonZeroU32`]. 40 - pub fn per_second<T>(times: T) -> Result<Self> 41 - where 42 - T: TryInto<NonZeroU32>, 43 - T::Error: Error + Send + Sync + 'static, 44 - { 45 - Ok(Self { 46 - limiter: Arc::new(RateLimiter::<IpAddr, _, _>::keyed(Quota::per_second( 47 - times.try_into().unwrap(), // TODO 48 - ))), 49 - }) 50 - } 51 - 52 - /// Constructs a rate-limiting middleware that allows a specified number of requests every minute. 53 - /// 54 - /// Returns an error if `times` can't be converted into a [`NonZeroU32`]. 55 - pub fn per_minute<T>(times: T) -> Result<Self> 56 - where 57 - T: TryInto<NonZeroU32>, 58 - T::Error: Error + Send + Sync + 'static, 59 - { 60 - Ok(Self { 61 - limiter: Arc::new(RateLimiter::<IpAddr, _, _>::keyed(Quota::per_minute( 62 - times.try_into().unwrap(), // TODO 63 - ))), 64 - }) 65 - } 66 - 67 - /// Constructs a rate-limiting middleware that allows a specified number of requests every hour. 68 - /// 69 - /// Returns an error if `times` can't be converted into a [`NonZeroU32`]. 70 - pub fn per_hour<T>(times: T) -> Result<Self> 71 - where 72 - T: TryInto<NonZeroU32>, 73 - T::Error: Error + Send + Sync + 'static, 74 - { 75 - Ok(Self { 76 - limiter: Arc::new(RateLimiter::<IpAddr, _, _>::keyed(Quota::per_hour( 77 - times.try_into().unwrap(), // TODO 78 - ))), 79 - }) 79 + pub fn new(quota: Quota) -> Self { 80 + Self { 81 + limiters: Arc::new(IpLimiters::new(quota)), 82 + } 80 83 } 81 84 } 82 85 ··· 85 88 fn transform(&self, ep: E) -> Self::Output { 86 89 GovernorMiddlewareImpl { 87 90 ep, 88 - limiter: self.limiter.clone(), 91 + limiters: self.limiters.clone(), 89 92 } 90 93 } 91 94 } 92 95 93 96 pub struct GovernorMiddlewareImpl<E> { 94 97 ep: E, 95 - limiter: Arc<RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>>, 98 + limiters: Arc<IpLimiters>, 96 99 } 97 100 98 101 impl<E: Endpoint> Endpoint for GovernorMiddlewareImpl<E> { ··· 107 110 108 111 log::trace!("remote: {remote}"); 109 112 110 - match self.limiter.check_key(&remote) { 113 + match self.limiters.check_key(remote) { 111 114 Ok(_) => { 112 115 log::debug!("allowing remote {remote}"); 113 116 self.ep.call(req).await 114 117 } 115 - Err(negative) => { 116 - let wait_time = negative.wait_time_from(CLOCK.now()).as_secs(); 118 + Err(d) => { 119 + let wait_time = d.as_secs(); 117 120 118 121 log::debug!("rate limit exceeded for {remote}, quota reset in {wait_time}s"); 119 122