Server tools to backfill, tail, mirror, and verify PLC logs
at debug 182 lines 5.9 kB view raw
1use crate::logo; 2 3use governor::{ 4 NotUntil, Quota, RateLimiter, 5 clock::{Clock, DefaultClock}, 6 state::keyed::DefaultKeyedStateStore, 7}; 8use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode}; 9use std::{ 10 convert::TryInto, 11 net::{IpAddr, Ipv6Addr}, 12 sync::{Arc, LazyLock}, 13 time::Duration, 14}; 15use tokio::sync::oneshot; 16 17static CLOCK: LazyLock<DefaultClock> = LazyLock::new(DefaultClock::default); 18 19const IP6_64_MASK: Ipv6Addr = Ipv6Addr::from_bits(0xFFFF_FFFF_FFFF_FFFF_0000_0000_0000_0000); 20type IP6_56 = [u8; 7]; 21type IP6_48 = [u8; 6]; 22 23fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> { 24 let period = quota.replenish_interval() / factor; 25 let burst = quota 26 .burst_size() 27 .checked_mul(factor.try_into().expect("factor to be non-zero")) 28 .expect("burst to be able to multiply"); 29 Quota::with_period(period).map(|q| q.allow_burst(burst)) 30} 31 32#[derive(Debug)] 33struct IpLimiters { 34 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>, 35 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>, 36 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>, 37} 38 39impl IpLimiters { 40 pub fn new(quota: Quota) -> Self { 41 Self { 42 per_ip: RateLimiter::keyed(quota), 43 ip6_56: RateLimiter::keyed(scale_quota(quota, 8).expect("to scale quota")), 44 ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")), 45 } 46 } 47 pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> { 48 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now()); 49 match ip { 50 addr @ IpAddr::V4(_) => self.per_ip.check_key(&addr).map_err(asdf), 51 IpAddr::V6(a) => { 52 // always check all limiters 53 let check_ip = self 54 .per_ip 55 .check_key(&IpAddr::V6(a & IP6_64_MASK)) 56 .map_err(asdf); 57 let check_56 = self 58 .ip6_56 59 .check_key( 60 a.octets()[..7] 61 .try_into() 62 .expect("to check ip6 /56 limiter"), 63 ) 64 .map_err(asdf); 65 let check_48 = self 66 .ip6_48 67 .check_key( 68 a.octets()[..6] 69 .try_into() 70 .expect("to check ip6 /48 limiter"), 71 ) 72 .map_err(asdf); 73 check_ip.and(check_56).and(check_48) 74 } 75 } 76 } 77} 78 79/// Once the rate limit has been reached, the middleware will respond with 80/// status code 429 (too many requests) and a `Retry-After` header with the amount 81/// of time that needs to pass before another request will be allowed. 82#[derive(Debug)] 83pub struct GovernorMiddleware { 84 #[allow(dead_code)] 85 stop_on_drop: oneshot::Sender<()>, 86 limiters: Arc<IpLimiters>, 87} 88 89impl GovernorMiddleware { 90 /// Limit request rates 91 /// 92 /// a little gross but this spawns a tokio task for housekeeping: 93 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping 94 pub fn new(quota: Quota) -> Self { 95 let limiters = Arc::new(IpLimiters::new(quota)); 96 let (stop_on_drop, mut stopped) = oneshot::channel(); 97 tokio::task::spawn({ 98 let limiters = limiters.clone(); 99 async move { 100 loop { 101 tokio::select! { 102 _ = &mut stopped => break, 103 _ = tokio::time::sleep(Duration::from_secs(60)) => {}, 104 }; 105 log::debug!( 106 "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 107 limiters.per_ip.len(), 108 limiters.ip6_56.len(), 109 limiters.ip6_48.len(), 110 ); 111 limiters.per_ip.retain_recent(); 112 limiters.ip6_56.retain_recent(); 113 limiters.ip6_48.retain_recent(); 114 } 115 } 116 }); 117 Self { 118 stop_on_drop, 119 limiters, 120 } 121 } 122} 123 124impl<E: Endpoint> Middleware<E> for GovernorMiddleware { 125 type Output = GovernorMiddlewareImpl<E>; 126 fn transform(&self, ep: E) -> Self::Output { 127 GovernorMiddlewareImpl { 128 ep, 129 limiters: self.limiters.clone(), 130 } 131 } 132} 133 134pub struct GovernorMiddlewareImpl<E> { 135 ep: E, 136 limiters: Arc<IpLimiters>, 137} 138 139impl<E: Endpoint> Endpoint for GovernorMiddlewareImpl<E> { 140 type Output = E::Output; 141 142 async fn call(&self, req: Request) -> Result<Self::Output> { 143 let remote = req 144 .remote_addr() 145 .as_socket_addr() 146 .expect("failed to get request's remote addr") // TODO 147 .ip(); 148 149 log::trace!("remote: {remote}"); 150 151 match self.limiters.check_key(remote) { 152 Ok(_) => { 153 log::debug!("allowing remote {remote}"); 154 self.ep.call(req).await 155 } 156 Err(d) => { 157 let wait_time = d.as_secs(); 158 159 log::debug!("rate limit exceeded for {remote}, quota reset in {wait_time}s"); 160 161 let res = Response::builder() 162 .status(StatusCode::TOO_MANY_REQUESTS) 163 .header("x-ratelimit-after", wait_time) 164 .header("retry-after", wait_time) 165 .body(booo()); 166 Err(poem::Error::from_response(res)) 167 } 168 } 169 } 170} 171 172fn booo() -> String { 173 format!( 174 r#"{} 175 176You're going a bit too fast. 177 178Tip: check out the `x-ratelimit-after` response header. 179"#, 180 logo("mirror 429") 181 ) 182}