Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::logo;
2
3use governor::{
4 NotUntil, Quota, RateLimiter,
5 clock::{Clock, DefaultClock},
6 state::keyed::DefaultKeyedStateStore,
7};
8use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode};
9use std::{
10 convert::TryInto,
11 net::{IpAddr, Ipv6Addr},
12 sync::{Arc, LazyLock},
13 time::Duration,
14};
15use tokio::sync::oneshot;
16
17static CLOCK: LazyLock<DefaultClock> = LazyLock::new(DefaultClock::default);
18
19const IP6_64_MASK: Ipv6Addr = Ipv6Addr::from_bits(0xFFFF_FFFF_FFFF_FFFF_0000_0000_0000_0000);
20type IP6_56 = [u8; 7];
21type IP6_48 = [u8; 6];
22
23fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> {
24 let period = quota.replenish_interval() / factor;
25 let burst = quota
26 .burst_size()
27 .checked_mul(factor.try_into().expect("factor to be non-zero"))
28 .expect("burst to be able to multiply");
29 Quota::with_period(period).map(|q| q.allow_burst(burst))
30}
31
32#[derive(Debug)]
33struct IpLimiters {
34 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>,
35 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>,
36 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>,
37}
38
39impl IpLimiters {
40 pub fn new(quota: Quota) -> Self {
41 Self {
42 per_ip: RateLimiter::keyed(quota),
43 ip6_56: RateLimiter::keyed(scale_quota(quota, 8).expect("to scale quota")),
44 ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")),
45 }
46 }
47 pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> {
48 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now());
49 match ip {
50 addr @ IpAddr::V4(_) => self.per_ip.check_key(&addr).map_err(asdf),
51 IpAddr::V6(a) => {
52 // always check all limiters
53 let check_ip = self
54 .per_ip
55 .check_key(&IpAddr::V6(a & IP6_64_MASK))
56 .map_err(asdf);
57 let check_56 = self
58 .ip6_56
59 .check_key(
60 a.octets()[..7]
61 .try_into()
62 .expect("to check ip6 /56 limiter"),
63 )
64 .map_err(asdf);
65 let check_48 = self
66 .ip6_48
67 .check_key(
68 a.octets()[..6]
69 .try_into()
70 .expect("to check ip6 /48 limiter"),
71 )
72 .map_err(asdf);
73 check_ip.and(check_56).and(check_48)
74 }
75 }
76 }
77}
78
79/// Once the rate limit has been reached, the middleware will respond with
80/// status code 429 (too many requests) and a `Retry-After` header with the amount
81/// of time that needs to pass before another request will be allowed.
82#[derive(Debug)]
83pub struct GovernorMiddleware {
84 #[allow(dead_code)]
85 stop_on_drop: oneshot::Sender<()>,
86 limiters: Arc<IpLimiters>,
87}
88
89impl GovernorMiddleware {
90 /// Limit request rates
91 ///
92 /// a little gross but this spawns a tokio task for housekeeping:
93 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping
94 pub fn new(quota: Quota) -> Self {
95 let limiters = Arc::new(IpLimiters::new(quota));
96 let (stop_on_drop, mut stopped) = oneshot::channel();
97 tokio::task::spawn({
98 let limiters = limiters.clone();
99 async move {
100 loop {
101 tokio::select! {
102 _ = &mut stopped => break,
103 _ = tokio::time::sleep(Duration::from_secs(60)) => {},
104 };
105 log::debug!(
106 "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48",
107 limiters.per_ip.len(),
108 limiters.ip6_56.len(),
109 limiters.ip6_48.len(),
110 );
111 limiters.per_ip.retain_recent();
112 limiters.ip6_56.retain_recent();
113 limiters.ip6_48.retain_recent();
114 }
115 }
116 });
117 Self {
118 stop_on_drop,
119 limiters,
120 }
121 }
122}
123
124impl<E: Endpoint> Middleware<E> for GovernorMiddleware {
125 type Output = GovernorMiddlewareImpl<E>;
126 fn transform(&self, ep: E) -> Self::Output {
127 GovernorMiddlewareImpl {
128 ep,
129 limiters: self.limiters.clone(),
130 }
131 }
132}
133
134pub struct GovernorMiddlewareImpl<E> {
135 ep: E,
136 limiters: Arc<IpLimiters>,
137}
138
139impl<E: Endpoint> Endpoint for GovernorMiddlewareImpl<E> {
140 type Output = E::Output;
141
142 async fn call(&self, req: Request) -> Result<Self::Output> {
143 let remote = req
144 .remote_addr()
145 .as_socket_addr()
146 .expect("failed to get request's remote addr") // TODO
147 .ip();
148
149 log::trace!("remote: {remote}");
150
151 match self.limiters.check_key(remote) {
152 Ok(_) => {
153 log::debug!("allowing remote {remote}");
154 self.ep.call(req).await
155 }
156 Err(d) => {
157 let wait_time = d.as_secs();
158
159 log::debug!("rate limit exceeded for {remote}, quota reset in {wait_time}s");
160
161 let res = Response::builder()
162 .status(StatusCode::TOO_MANY_REQUESTS)
163 .header("x-ratelimit-after", wait_time)
164 .header("retry-after", wait_time)
165 .body(booo());
166 Err(poem::Error::from_response(res))
167 }
168 }
169 }
170}
171
172fn booo() -> String {
173 format!(
174 r#"{}
175
176You're going a bit too fast.
177
178Tip: check out the `x-ratelimit-after` response header.
179"#,
180 logo("mirror 429")
181 )
182}