Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::logo;
2
3use governor::{
4 NotUntil, Quota, RateLimiter,
5 clock::{Clock, DefaultClock},
6 state::keyed::DefaultKeyedStateStore,
7};
8use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode};
9use std::{
10 convert::TryInto,
11 hash::Hash,
12 net::{IpAddr, Ipv6Addr},
13 sync::{Arc, LazyLock},
14 time::Duration,
15};
16use tokio::sync::oneshot;
17
18static CLOCK: LazyLock<DefaultClock> = LazyLock::new(DefaultClock::default);
19
20const IP6_64_MASK: Ipv6Addr = Ipv6Addr::from_bits(0xFFFF_FFFF_FFFF_FFFF_0000_0000_0000_0000);
21type IP6_56 = [u8; 7];
22type IP6_48 = [u8; 6];
23
24pub trait Limiter<K: Hash + std::fmt::Debug>: Send + Sync + 'static {
25 fn extract_key(&self, req: &Request) -> Result<K>;
26 fn check_key(&self, ip: &K) -> Result<(), Duration>;
27 fn housekeep(&self);
28}
29
30fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> {
31 let period = quota.replenish_interval() / factor;
32 let burst = quota
33 .burst_size()
34 .checked_mul(factor.try_into().expect("factor to be non-zero"))
35 .expect("burst to be able to multiply");
36 Quota::with_period(period).map(|q| q.allow_burst(burst))
37}
38
39#[derive(Debug)]
40pub struct CreatePlcOpLimiter {
41 limiter: RateLimiter<String, DefaultKeyedStateStore<String>, DefaultClock>,
42}
43
44impl CreatePlcOpLimiter {
45 pub fn new(quota: Quota) -> Self {
46 Self {
47 limiter: RateLimiter::keyed(quota),
48 }
49 }
50}
51
52/// this must be used with an endpoint with a single path param for the did
53impl Limiter<String> for CreatePlcOpLimiter {
54 fn extract_key(&self, req: &Request) -> Result<String> {
55 let (did,) = req.path_params::<(String,)>()?;
56 Ok(did)
57 }
58 fn check_key(&self, did: &String) -> Result<(), Duration> {
59 self.limiter
60 .check_key(did)
61 .map_err(|e| e.wait_time_from(CLOCK.now()))
62 }
63 fn housekeep(&self) {
64 log::debug!(
65 "limiter size before housekeeping: {} dids",
66 self.limiter.len()
67 );
68 self.limiter.retain_recent();
69 }
70}
71
72#[derive(Debug)]
73pub struct IpLimiters {
74 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>,
75 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>,
76 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>,
77}
78
79impl IpLimiters {
80 pub fn new(quota: Quota) -> Self {
81 Self {
82 per_ip: RateLimiter::keyed(quota),
83 ip6_56: RateLimiter::keyed(scale_quota(quota, 8).expect("to scale quota")),
84 ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")),
85 }
86 }
87}
88
89impl Limiter<IpAddr> for IpLimiters {
90 fn extract_key(&self, req: &Request) -> Result<IpAddr> {
91 Ok(req
92 .remote_addr()
93 .as_socket_addr()
94 .expect("failed to get request's remote addr") // TODO
95 .ip())
96 }
97 fn check_key(&self, ip: &IpAddr) -> Result<(), Duration> {
98 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now());
99 match ip {
100 addr @ IpAddr::V4(_) => self.per_ip.check_key(addr).map_err(asdf),
101 IpAddr::V6(a) => {
102 // always check all limiters
103 let check_ip = self
104 .per_ip
105 .check_key(&IpAddr::V6(a & IP6_64_MASK))
106 .map_err(asdf);
107 let check_56 = self
108 .ip6_56
109 .check_key(
110 a.octets()[..7]
111 .try_into()
112 .expect("to check ip6 /56 limiter"),
113 )
114 .map_err(asdf);
115 let check_48 = self
116 .ip6_48
117 .check_key(
118 a.octets()[..6]
119 .try_into()
120 .expect("to check ip6 /48 limiter"),
121 )
122 .map_err(asdf);
123 check_ip.and(check_56).and(check_48)
124 }
125 }
126 }
127 fn housekeep(&self) {
128 log::debug!(
129 "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48",
130 self.per_ip.len(),
131 self.ip6_56.len(),
132 self.ip6_48.len(),
133 );
134 self.per_ip.retain_recent();
135 self.ip6_56.retain_recent();
136 self.ip6_48.retain_recent();
137 }
138}
139
140/// Once the rate limit has been reached, the middleware will respond with
141/// status code 429 (too many requests) and a `Retry-After` header with the amount
142/// of time that needs to pass before another request will be allowed.
143// #[derive(Debug)]
144pub struct GovernorMiddleware<K> {
145 #[allow(dead_code)]
146 stop_on_drop: oneshot::Sender<()>,
147 limiters: Arc<dyn Limiter<K>>,
148}
149
150impl<K: Hash + std::fmt::Debug> GovernorMiddleware<K> {
151 /// Limit request rates
152 ///
153 /// a little gross but this spawns a tokio task for housekeeping:
154 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping
155 pub fn new(limiters: impl Limiter<K>) -> Self {
156 let limiters = Arc::new(limiters);
157 let (stop_on_drop, mut stopped) = oneshot::channel();
158 tokio::task::spawn({
159 let limiters = limiters.clone();
160 async move {
161 loop {
162 tokio::select! {
163 _ = &mut stopped => break,
164 _ = tokio::time::sleep(Duration::from_secs(60)) => {},
165 };
166 limiters.housekeep();
167 }
168 }
169 });
170 Self {
171 stop_on_drop,
172 limiters,
173 }
174 }
175}
176
177impl<E, K> Middleware<E> for GovernorMiddleware<K>
178where
179 E: Endpoint,
180 K: Hash + std::fmt::Debug + Send + Sync + 'static,
181{
182 type Output = GovernorMiddlewareImpl<E, K>;
183 fn transform(&self, ep: E) -> Self::Output {
184 GovernorMiddlewareImpl {
185 ep,
186 limiters: self.limiters.clone(),
187 }
188 }
189}
190
191pub struct GovernorMiddlewareImpl<E, K> {
192 ep: E,
193 limiters: Arc<dyn Limiter<K>>,
194}
195
196impl<E, K> Endpoint for GovernorMiddlewareImpl<E, K>
197where
198 E: Endpoint,
199 K: Hash + std::fmt::Debug + Send + Sync + 'static,
200{
201 type Output = E::Output;
202
203 async fn call(&self, req: Request) -> Result<Self::Output> {
204 let key = self.limiters.extract_key(&req)?;
205
206 match self.limiters.check_key(&key) {
207 Ok(_) => {
208 log::debug!("allowing key {key:?}");
209 self.ep.call(req).await
210 }
211 Err(d) => {
212 let wait_time = d.as_secs();
213
214 log::debug!("rate limit exceeded for {key:?}, quota reset in {wait_time}s");
215
216 let res = Response::builder()
217 .status(StatusCode::TOO_MANY_REQUESTS)
218 .header("x-ratelimit-after", wait_time)
219 .header("retry-after", wait_time)
220 .body(booo());
221 Err(poem::Error::from_response(res))
222 }
223 }
224 }
225}
226
227fn booo() -> String {
228 format!(
229 r#"{}
230
231You're going a bit too fast.
232
233Tip: check out the `x-ratelimit-after` response header.
234"#,
235 logo("mirror 429")
236 )
237}