Server tools to backfill, tail, mirror, and verify PLC logs

rate limit

wowwwwwwwwwwww okay

kajsdlkfjaslfkj asldkfj flaskj dfl

+299 -12
+132 -1
Cargo.lock
··· 35 35 "chrono", 36 36 "clap", 37 37 "futures", 38 + "governor", 39 + "http-body-util", 38 40 "log", 39 41 "poem", 40 42 "reqwest", ··· 64 66 dependencies = [ 65 67 "alloc-no-stdlib", 66 68 ] 69 + 70 + [[package]] 71 + name = "allocator-api2" 72 + version = "0.2.21" 73 + source = "registry+https://github.com/rust-lang/crates.io-index" 74 + checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" 67 75 68 76 [[package]] 69 77 name = "android_system_properties" ··· 387 395 ] 388 396 389 397 [[package]] 398 + name = "crossbeam-utils" 399 + version = "0.8.21" 400 + source = "registry+https://github.com/rust-lang/crates.io-index" 401 + checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" 402 + 403 + [[package]] 390 404 name = "crypto-common" 391 405 version = "0.1.6" 392 406 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 394 408 dependencies = [ 395 409 "generic-array", 396 410 "typenum", 411 + ] 412 + 413 + [[package]] 414 + name = "dashmap" 415 + version = "6.1.0" 416 + source = "registry+https://github.com/rust-lang/crates.io-index" 417 + checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" 418 + dependencies = [ 419 + "cfg-if", 420 + "crossbeam-utils", 421 + "hashbrown 0.14.5", 422 + "lock_api", 423 + "once_cell", 424 + "parking_lot_core 0.9.11", 397 425 ] 398 426 399 427 [[package]] ··· 476 504 version = "1.0.7" 477 505 source = "registry+https://github.com/rust-lang/crates.io-index" 478 506 checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" 507 + 508 + [[package]] 509 + name = "foldhash" 510 + version = "0.1.5" 511 + source = "registry+https://github.com/rust-lang/crates.io-index" 512 + checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" 479 513 480 514 [[package]] 481 515 name = "foreign-types" ··· 573 607 checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" 574 608 575 609 [[package]] 610 + name = "futures-timer" 611 + version = "3.0.3" 612 + source = "registry+https://github.com/rust-lang/crates.io-index" 613 + checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" 614 + 615 + [[package]] 576 616 name = "futures-util" 577 617 version = "0.3.31" 578 618 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 620 660 checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" 621 661 dependencies = [ 622 662 "cfg-if", 663 + "js-sys", 623 664 "libc", 624 665 "r-efi", 625 666 "wasi 0.14.5+wasi-0.2.4", 667 + "wasm-bindgen", 626 668 ] 627 669 628 670 [[package]] ··· 632 674 checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" 633 675 634 676 [[package]] 677 + name = "governor" 678 + version = "0.10.1" 679 + source = "registry+https://github.com/rust-lang/crates.io-index" 680 + checksum = "444405bbb1a762387aa22dd569429533b54a1d8759d35d3b64cb39b0293eaa19" 681 + dependencies = [ 682 + "cfg-if", 683 + "dashmap", 684 + "futures-sink", 685 + "futures-timer", 686 + "futures-util", 687 + "getrandom 0.3.3", 688 + "hashbrown 0.15.5", 689 + "nonzero_ext", 690 + "parking_lot 0.12.4", 691 + "portable-atomic", 692 + "quanta", 693 + "rand 0.9.2", 694 + "smallvec", 695 + "spinning_top", 696 + "web-time", 697 + ] 698 + 699 + [[package]] 635 700 name = "h2" 636 701 version = "0.4.12" 637 702 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 652 717 653 718 [[package]] 654 719 name = "hashbrown" 720 + version = "0.14.5" 721 + source = "registry+https://github.com/rust-lang/crates.io-index" 722 + checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" 723 + 724 + [[package]] 725 + name = "hashbrown" 655 726 version = "0.15.5" 656 727 source = "registry+https://github.com/rust-lang/crates.io-index" 657 728 checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" 729 + dependencies = [ 730 + "allocator-api2", 731 + "equivalent", 732 + "foldhash", 733 + ] 658 734 659 735 [[package]] 660 736 name = "headers" ··· 960 1036 checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" 961 1037 dependencies = [ 962 1038 "equivalent", 963 - "hashbrown", 1039 + "hashbrown 0.15.5", 964 1040 ] 965 1041 966 1042 [[package]] ··· 1164 1240 "cfg_aliases", 1165 1241 "libc", 1166 1242 ] 1243 + 1244 + [[package]] 1245 + name = "nonzero_ext" 1246 + version = "0.3.0" 1247 + source = "registry+https://github.com/rust-lang/crates.io-index" 1248 + checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" 1167 1249 1168 1250 [[package]] 1169 1251 name = "nu-ansi-term" ··· 1385 1467 ] 1386 1468 1387 1469 [[package]] 1470 + name = "portable-atomic" 1471 + version = "1.11.1" 1472 + source = "registry+https://github.com/rust-lang/crates.io-index" 1473 + checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" 1474 + 1475 + [[package]] 1388 1476 name = "postgres-protocol" 1389 1477 version = "0.6.8" 1390 1478 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1453 1541 ] 1454 1542 1455 1543 [[package]] 1544 + name = "quanta" 1545 + version = "0.12.6" 1546 + source = "registry+https://github.com/rust-lang/crates.io-index" 1547 + checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" 1548 + dependencies = [ 1549 + "crossbeam-utils", 1550 + "libc", 1551 + "once_cell", 1552 + "raw-cpuid", 1553 + "wasi 0.11.1+wasi-snapshot-preview1", 1554 + "web-sys", 1555 + "winapi", 1556 + ] 1557 + 1558 + [[package]] 1456 1559 name = "quote" 1457 1560 version = "1.0.40" 1458 1561 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1524 1627 checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" 1525 1628 dependencies = [ 1526 1629 "getrandom 0.3.3", 1630 + ] 1631 + 1632 + [[package]] 1633 + name = "raw-cpuid" 1634 + version = "11.6.0" 1635 + source = "registry+https://github.com/rust-lang/crates.io-index" 1636 + checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" 1637 + dependencies = [ 1638 + "bitflags 2.9.4", 1527 1639 ] 1528 1640 1529 1641 [[package]] ··· 1923 2035 dependencies = [ 1924 2036 "libc", 1925 2037 "windows-sys 0.59.0", 2038 + ] 2039 + 2040 + [[package]] 2041 + name = "spinning_top" 2042 + version = "0.3.0" 2043 + source = "registry+https://github.com/rust-lang/crates.io-index" 2044 + checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" 2045 + dependencies = [ 2046 + "lock_api", 1926 2047 ] 1927 2048 1928 2049 [[package]] ··· 2571 2692 version = "0.3.78" 2572 2693 source = "registry+https://github.com/rust-lang/crates.io-index" 2573 2694 checksum = "77e4b637749ff0d92b8fad63aa1f7cff3cbe125fd49c175cd6345e7272638b12" 2695 + dependencies = [ 2696 + "js-sys", 2697 + "wasm-bindgen", 2698 + ] 2699 + 2700 + [[package]] 2701 + name = "web-time" 2702 + version = "1.1.0" 2703 + source = "registry+https://github.com/rust-lang/crates.io-index" 2704 + checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" 2574 2705 dependencies = [ 2575 2706 "js-sys", 2576 2707 "wasm-bindgen",
+2
Cargo.toml
··· 12 12 chrono = { version = "0.4.42", features = ["serde"] } 13 13 clap = { version = "4.5.47", features = ["derive", "env"] } 14 14 futures = "0.3.31" 15 + governor = "0.10.1" 16 + http-body-util = "0.1.3" 15 17 log = "0.4.28" 16 18 poem = { version = "3.1.12", features = ["compression"] } 17 19 reqwest = { version = "0.12.23", features = ["stream"] }
+2
src/lib.rs
··· 5 5 mod mirror; 6 6 mod plc_pg; 7 7 mod poll; 8 + mod ratelimit; 8 9 mod weekly; 9 10 10 11 pub use backfill::backfill; ··· 12 13 pub use mirror::serve; 13 14 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 14 15 pub use poll::{PageBoundaryState, get_page, poll_upstream}; 16 + pub use ratelimit::GovernorMiddleware; 15 17 pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 16 18 17 19 pub type Dt = chrono::DateTime<chrono::Utc>;
+22 -11
src/mirror.rs
··· 1 - use crate::logo; 1 + use crate::{GovernorMiddleware, logo}; 2 + use futures::TryStreamExt; 2 3 use poem::{ 3 4 EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, get, handler, 4 5 http::{StatusCode, Uri}, ··· 7 8 web::Data, 8 9 }; 9 10 use reqwest::{Client, Url}; 10 - use std::net::SocketAddr; 11 - use std::time::Duration; 11 + use std::{net::SocketAddr, time::Duration}; 12 12 13 13 #[derive(Debug, Clone)] 14 14 struct State { ··· 73 73 log::error!("upstream req fail: {e}"); 74 74 Error::from_string(failed_to_reach_wrapped(), StatusCode::BAD_GATEWAY) 75 75 })?; 76 - let mut res = Response::default(); 77 - upstream_res.headers().iter().for_each(|(k, v)| { 78 - res.headers_mut().insert(k, v.to_owned()); 79 - }); 80 - res.set_status(upstream_res.status()); 81 - res.set_version(upstream_res.version()); 82 - res.set_body(upstream_res.bytes().await.unwrap()); 83 - Ok(res) 76 + 77 + let http_res: poem::http::Response<reqwest::Body> = upstream_res.into(); 78 + let (parts, reqw_body) = http_res.into_parts(); 79 + 80 + let parts = poem::ResponseParts { 81 + status: parts.status, 82 + version: parts.version, 83 + headers: parts.headers, 84 + extensions: parts.extensions, 85 + }; 86 + 87 + let body = http_body_util::BodyDataStream::new(reqw_body) 88 + .map_err(|e| std::io::Error::other(Box::new(e))); 89 + 90 + Ok(Response::from_parts( 91 + parts, 92 + poem::Body::from_bytes_stream(body), 93 + )) 84 94 } 85 95 86 96 #[handler] ··· 107 117 .with(AddData::new(state)) 108 118 .with(Cors::new().allow_credentials(false)) 109 119 .with(Compression::new()) 120 + .with(GovernorMiddleware::per_minute(3000).unwrap()) 110 121 .with(CatchPanic::new()) 111 122 .with(Tracing); 112 123
+141
src/ratelimit.rs
··· 1 + use crate::logo; 2 + 3 + use governor::{ 4 + Quota, RateLimiter, 5 + clock::{Clock, DefaultClock}, 6 + state::keyed::DefaultKeyedStateStore, 7 + }; 8 + use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode}; 9 + use std::{ 10 + convert::TryInto, error::Error, net::IpAddr, num::NonZeroU32, sync::Arc, sync::LazyLock, 11 + time::Duration, 12 + }; 13 + 14 + static CLOCK: LazyLock<DefaultClock> = LazyLock::new(DefaultClock::default); 15 + 16 + /// Once the rate limit has been reached, the middleware will respond with 17 + /// status code 429 (too many requests) and a `Retry-After` header with the amount 18 + /// of time that needs to pass before another request will be allowed. 19 + #[derive(Debug, Clone)] 20 + pub struct GovernorMiddleware { 21 + limiter: Arc<RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>>, 22 + } 23 + 24 + impl GovernorMiddleware { 25 + /// Constructs a rate-limiting middleware from a [`Duration`] that allows one request in the given time interval. 26 + /// 27 + /// If the time interval is zero, returns `None`. 28 + #[must_use] 29 + pub fn with_period(duration: Duration) -> Option<Self> { 30 + Some(Self { 31 + limiter: Arc::new(RateLimiter::<IpAddr, _, _>::keyed(Quota::with_period( 32 + duration, 33 + )?)), 34 + }) 35 + } 36 + 37 + /// Constructs a rate-limiting middleware that allows a specified number of requests every second. 38 + /// 39 + /// Returns an error if `times` can't be converted into a [`NonZeroU32`]. 40 + pub fn per_second<T>(times: T) -> Result<Self> 41 + where 42 + T: TryInto<NonZeroU32>, 43 + T::Error: Error + Send + Sync + 'static, 44 + { 45 + Ok(Self { 46 + limiter: Arc::new(RateLimiter::<IpAddr, _, _>::keyed(Quota::per_second( 47 + times.try_into().unwrap(), // TODO 48 + ))), 49 + }) 50 + } 51 + 52 + /// Constructs a rate-limiting middleware that allows a specified number of requests every minute. 53 + /// 54 + /// Returns an error if `times` can't be converted into a [`NonZeroU32`]. 55 + pub fn per_minute<T>(times: T) -> Result<Self> 56 + where 57 + T: TryInto<NonZeroU32>, 58 + T::Error: Error + Send + Sync + 'static, 59 + { 60 + Ok(Self { 61 + limiter: Arc::new(RateLimiter::<IpAddr, _, _>::keyed(Quota::per_minute( 62 + times.try_into().unwrap(), // TODO 63 + ))), 64 + }) 65 + } 66 + 67 + /// Constructs a rate-limiting middleware that allows a specified number of requests every hour. 68 + /// 69 + /// Returns an error if `times` can't be converted into a [`NonZeroU32`]. 70 + pub fn per_hour<T>(times: T) -> Result<Self> 71 + where 72 + T: TryInto<NonZeroU32>, 73 + T::Error: Error + Send + Sync + 'static, 74 + { 75 + Ok(Self { 76 + limiter: Arc::new(RateLimiter::<IpAddr, _, _>::keyed(Quota::per_hour( 77 + times.try_into().unwrap(), // TODO 78 + ))), 79 + }) 80 + } 81 + } 82 + 83 + impl<E: Endpoint> Middleware<E> for GovernorMiddleware { 84 + type Output = GovernorMiddlewareImpl<E>; 85 + fn transform(&self, ep: E) -> Self::Output { 86 + GovernorMiddlewareImpl { 87 + ep, 88 + limiter: self.limiter.clone(), 89 + } 90 + } 91 + } 92 + 93 + pub struct GovernorMiddlewareImpl<E> { 94 + ep: E, 95 + limiter: Arc<RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>>, 96 + } 97 + 98 + impl<E: Endpoint> Endpoint for GovernorMiddlewareImpl<E> { 99 + type Output = E::Output; 100 + 101 + async fn call(&self, req: Request) -> Result<Self::Output> { 102 + let remote = req 103 + .remote_addr() 104 + .as_socket_addr() 105 + .unwrap_or_else(|| panic!("failed to get request's remote addr")) // TODO 106 + .ip(); 107 + 108 + log::trace!("remote: {remote}"); 109 + 110 + match self.limiter.check_key(&remote) { 111 + Ok(_) => { 112 + log::debug!("allowing remote {remote}"); 113 + self.ep.call(req).await 114 + } 115 + Err(negative) => { 116 + let wait_time = negative.wait_time_from(CLOCK.now()).as_secs(); 117 + 118 + log::debug!("rate limit exceeded for {remote}, quota reset in {wait_time}s"); 119 + 120 + let res = Response::builder() 121 + .status(StatusCode::TOO_MANY_REQUESTS) 122 + .header("x-ratelimit-after", wait_time) 123 + .header("retry-after", wait_time) 124 + .body(booo()); 125 + Err(poem::Error::from_response(res)) 126 + } 127 + } 128 + } 129 + } 130 + 131 + fn booo() -> String { 132 + format!( 133 + r#"{} 134 + 135 + You're going a bit too fast. 136 + 137 + Tip: check out the `x-ratelimit-after` response header. 138 + "#, 139 + logo("mirror 429") 140 + ) 141 + }