Server tools to backfill, tail, mirror, and verify PLC logs
at main 237 lines 7.3 kB view raw
1use crate::logo; 2 3use governor::{ 4 NotUntil, Quota, RateLimiter, 5 clock::{Clock, DefaultClock}, 6 state::keyed::DefaultKeyedStateStore, 7}; 8use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode}; 9use std::{ 10 convert::TryInto, 11 hash::Hash, 12 net::{IpAddr, Ipv6Addr}, 13 sync::{Arc, LazyLock}, 14 time::Duration, 15}; 16use tokio::sync::oneshot; 17 18static CLOCK: LazyLock<DefaultClock> = LazyLock::new(DefaultClock::default); 19 20const IP6_64_MASK: Ipv6Addr = Ipv6Addr::from_bits(0xFFFF_FFFF_FFFF_FFFF_0000_0000_0000_0000); 21type IP6_56 = [u8; 7]; 22type IP6_48 = [u8; 6]; 23 24pub trait Limiter<K: Hash + std::fmt::Debug>: Send + Sync + 'static { 25 fn extract_key(&self, req: &Request) -> Result<K>; 26 fn check_key(&self, ip: &K) -> Result<(), Duration>; 27 fn housekeep(&self); 28} 29 30fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> { 31 let period = quota.replenish_interval() / factor; 32 let burst = quota 33 .burst_size() 34 .checked_mul(factor.try_into().expect("factor to be non-zero")) 35 .expect("burst to be able to multiply"); 36 Quota::with_period(period).map(|q| q.allow_burst(burst)) 37} 38 39#[derive(Debug)] 40pub struct CreatePlcOpLimiter { 41 limiter: RateLimiter<String, DefaultKeyedStateStore<String>, DefaultClock>, 42} 43 44impl CreatePlcOpLimiter { 45 pub fn new(quota: Quota) -> Self { 46 Self { 47 limiter: RateLimiter::keyed(quota), 48 } 49 } 50} 51 52/// this must be used with an endpoint with a single path param for the did 53impl Limiter<String> for CreatePlcOpLimiter { 54 fn extract_key(&self, req: &Request) -> Result<String> { 55 let (did,) = req.path_params::<(String,)>()?; 56 Ok(did) 57 } 58 fn check_key(&self, did: &String) -> Result<(), Duration> { 59 self.limiter 60 .check_key(did) 61 .map_err(|e| e.wait_time_from(CLOCK.now())) 62 } 63 fn housekeep(&self) { 64 log::debug!( 65 "limiter size before housekeeping: {} dids", 66 self.limiter.len() 67 ); 68 self.limiter.retain_recent(); 69 } 70} 71 72#[derive(Debug)] 73pub struct IpLimiters { 74 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>, 75 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>, 76 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>, 77} 78 79impl IpLimiters { 80 pub fn new(quota: Quota) -> Self { 81 Self { 82 per_ip: RateLimiter::keyed(quota), 83 ip6_56: RateLimiter::keyed(scale_quota(quota, 8).expect("to scale quota")), 84 ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")), 85 } 86 } 87} 88 89impl Limiter<IpAddr> for IpLimiters { 90 fn extract_key(&self, req: &Request) -> Result<IpAddr> { 91 Ok(req 92 .remote_addr() 93 .as_socket_addr() 94 .expect("failed to get request's remote addr") // TODO 95 .ip()) 96 } 97 fn check_key(&self, ip: &IpAddr) -> Result<(), Duration> { 98 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now()); 99 match ip { 100 addr @ IpAddr::V4(_) => self.per_ip.check_key(addr).map_err(asdf), 101 IpAddr::V6(a) => { 102 // always check all limiters 103 let check_ip = self 104 .per_ip 105 .check_key(&IpAddr::V6(a & IP6_64_MASK)) 106 .map_err(asdf); 107 let check_56 = self 108 .ip6_56 109 .check_key( 110 a.octets()[..7] 111 .try_into() 112 .expect("to check ip6 /56 limiter"), 113 ) 114 .map_err(asdf); 115 let check_48 = self 116 .ip6_48 117 .check_key( 118 a.octets()[..6] 119 .try_into() 120 .expect("to check ip6 /48 limiter"), 121 ) 122 .map_err(asdf); 123 check_ip.and(check_56).and(check_48) 124 } 125 } 126 } 127 fn housekeep(&self) { 128 log::debug!( 129 "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 130 self.per_ip.len(), 131 self.ip6_56.len(), 132 self.ip6_48.len(), 133 ); 134 self.per_ip.retain_recent(); 135 self.ip6_56.retain_recent(); 136 self.ip6_48.retain_recent(); 137 } 138} 139 140/// Once the rate limit has been reached, the middleware will respond with 141/// status code 429 (too many requests) and a `Retry-After` header with the amount 142/// of time that needs to pass before another request will be allowed. 143// #[derive(Debug)] 144pub struct GovernorMiddleware<K> { 145 #[allow(dead_code)] 146 stop_on_drop: oneshot::Sender<()>, 147 limiters: Arc<dyn Limiter<K>>, 148} 149 150impl<K: Hash + std::fmt::Debug> GovernorMiddleware<K> { 151 /// Limit request rates 152 /// 153 /// a little gross but this spawns a tokio task for housekeeping: 154 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping 155 pub fn new(limiters: impl Limiter<K>) -> Self { 156 let limiters = Arc::new(limiters); 157 let (stop_on_drop, mut stopped) = oneshot::channel(); 158 tokio::task::spawn({ 159 let limiters = limiters.clone(); 160 async move { 161 loop { 162 tokio::select! { 163 _ = &mut stopped => break, 164 _ = tokio::time::sleep(Duration::from_secs(60)) => {}, 165 }; 166 limiters.housekeep(); 167 } 168 } 169 }); 170 Self { 171 stop_on_drop, 172 limiters, 173 } 174 } 175} 176 177impl<E, K> Middleware<E> for GovernorMiddleware<K> 178where 179 E: Endpoint, 180 K: Hash + std::fmt::Debug + Send + Sync + 'static, 181{ 182 type Output = GovernorMiddlewareImpl<E, K>; 183 fn transform(&self, ep: E) -> Self::Output { 184 GovernorMiddlewareImpl { 185 ep, 186 limiters: self.limiters.clone(), 187 } 188 } 189} 190 191pub struct GovernorMiddlewareImpl<E, K> { 192 ep: E, 193 limiters: Arc<dyn Limiter<K>>, 194} 195 196impl<E, K> Endpoint for GovernorMiddlewareImpl<E, K> 197where 198 E: Endpoint, 199 K: Hash + std::fmt::Debug + Send + Sync + 'static, 200{ 201 type Output = E::Output; 202 203 async fn call(&self, req: Request) -> Result<Self::Output> { 204 let key = self.limiters.extract_key(&req)?; 205 206 match self.limiters.check_key(&key) { 207 Ok(_) => { 208 log::debug!("allowing key {key:?}"); 209 self.ep.call(req).await 210 } 211 Err(d) => { 212 let wait_time = d.as_secs(); 213 214 log::debug!("rate limit exceeded for {key:?}, quota reset in {wait_time}s"); 215 216 let res = Response::builder() 217 .status(StatusCode::TOO_MANY_REQUESTS) 218 .header("x-ratelimit-after", wait_time) 219 .header("retry-after", wait_time) 220 .body(booo()); 221 Err(poem::Error::from_response(res)) 222 } 223 } 224 } 225} 226 227fn booo() -> String { 228 format!( 229 r#"{} 230 231You're going a bit too fast. 232 233Tip: check out the `x-ratelimit-after` response header. 234"#, 235 logo("mirror 429") 236 ) 237}