forked from
microcosm.blue/Allegedly
Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::{
2 CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, FjallDb, GovernorMiddleware, IpLimiters, UA,
3 doc, logo,
4};
5use futures::TryStreamExt;
6use governor::Quota;
7use poem::{
8 Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server,
9 get, handler,
10 http::{StatusCode, header::USER_AGENT},
11 listener::{Listener, TcpListener, acme::AutoCert},
12 middleware::{AddData, CatchPanic, Compression, Cors, Tracing},
13 web::{Data, Json, Path},
14};
15use reqwest::{Client, Url};
16use std::{net::SocketAddr, path::PathBuf, time::Duration};
17
18pub mod fjall;
19pub mod pg;
20
21pub use fjall::serve_fjall;
22pub use pg::serve;
23
/// How the mirror's HTTP server should accept connections.
#[derive(Debug)]
pub enum ListenConf {
    /// Serve TLS on port 443 with certificates obtained automatically over ACME.
    Acme {
        /// Domain names registered with the ACME certificate builder.
        domains: Vec<String>,
        /// Path handed to the ACME certificate builder as its cache location.
        cache_path: PathBuf,
        /// URL of the ACME directory handed to the certificate builder.
        directory_url: String,
        /// When `true`, bind the IPv6 wildcard `[::]` instead of `0.0.0.0`.
        ipv6: bool,
    },
    /// Serve directly on the given socket address, without ACME.
    Bind(SocketAddr),
}
34
/// Opt-in switches for experimental mirror behaviour.
///
/// NOTE(review): nothing in this file reads these fields; the per-field notes are
/// inferred from the names only and should be confirmed against the serving code.
#[derive(Debug, Clone)]
pub struct ExperimentalConf {
    /// Presumably the domain used by the experimental ACME-related features — confirm.
    pub acme_domain: Option<String>,
    /// Presumably enables forwarding writes to the upstream server — confirm.
    pub write_upstream: bool,
}
40
41#[handler]
42pub fn favicon() -> impl IntoResponse {
43 include_bytes!("../../favicon.ico").with_content_type("image/x-icon")
44}
45
46pub fn failed_to_reach_named(name: &str) -> String {
47 format!(
48 r#"{}
49
50Failed to reach the {name} server. Sorry.
51"#,
52 logo("mirror 502 :( ")
53 )
54}
55
56pub fn bad_create_op(reason: &str) -> Response {
57 Response::builder()
58 .status(StatusCode::BAD_REQUEST)
59 .body(format!(
60 r#"{}
61
62NooOOOooooo: {reason}
63"#,
64 logo("mirror 400 >:( ")
65 ))
66}
67
68pub type PlcStatus = (bool, serde_json::Value);
69
70pub async fn plc_status(url: &Url, client: &Client) -> PlcStatus {
71 use serde_json::json;
72
73 let mut url = url.clone();
74 url.set_path("/_health");
75
76 let Ok(response) = client.get(url).timeout(Duration::from_secs(3)).send().await else {
77 return (false, json!({"error": "cannot reach plc server"}));
78 };
79
80 let status = response.status();
81
82 let Ok(text) = response.text().await else {
83 return (false, json!({"error": "failed to read response body"}));
84 };
85
86 let body = match serde_json::from_str(&text) {
87 Ok(json) => json,
88 Err(_) => serde_json::Value::String(text.to_string()),
89 };
90
91 if status.is_success() {
92 (true, body)
93 } else {
94 (
95 false,
96 json!({
97 "error": "non-ok status",
98 "status": status.as_str(),
99 "status_code": status.as_u16(),
100 "response": body,
101 }),
102 )
103 }
104}
105
106pub fn proxy_response(res: reqwest::Response) -> Response {
107 let http_res: poem::http::Response<reqwest::Body> = res.into();
108 let (parts, reqw_body) = http_res.into_parts();
109
110 let parts = poem::ResponseParts {
111 status: parts.status,
112 version: parts.version,
113 headers: parts.headers,
114 extensions: parts.extensions,
115 };
116
117 let body = http_body_util::BodyDataStream::new(reqw_body)
118 .map_err(|e| std::io::Error::other(Box::new(e)));
119
120 Response::from_parts(parts, poem::Body::from_bytes_stream(body))
121}
122
123async fn run<A, L>(app: A, listener: L) -> std::io::Result<()>
124where
125 A: Endpoint + 'static,
126 L: Listener + 'static,
127{
128 Server::new(listener)
129 .name("allegedly (mirror)")
130 .run(app)
131 .await
132}
133
134/// kick off a tiny little server on a tokio task to tell people to use 443
135async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> {
136 #[handler]
137 fn oop_plz_be_secure() -> (StatusCode, String) {
138 (
139 StatusCode::BAD_REQUEST,
140 format!(
141 r#"{}
142
143You probably want to change your request to use HTTPS instead of HTTP.
144"#,
145 logo("mirror (tls on 443 please)")
146 ),
147 )
148 }
149
150 let app = Route::new()
151 .at("/favicon.ico", get(favicon))
152 .nest("/", get(oop_plz_be_secure))
153 .with(Tracing);
154 Server::new(TcpListener::bind(if ipv6 {
155 "[::]:80"
156 } else {
157 "0.0.0.0:80"
158 }))
159 .name("allegedly (mirror:80 helper)")
160 .run(app)
161 .await
162}
163
164pub async fn bind_or_acme<A>(app: A, listen: ListenConf) -> anyhow::Result<&'static str>
165where
166 A: Endpoint + 'static,
167{
168 match listen {
169 ListenConf::Acme {
170 domains,
171 cache_path,
172 directory_url,
173 ipv6,
174 } => {
175 rustls::crypto::aws_lc_rs::default_provider()
176 .install_default()
177 .expect("crypto provider to be installable");
178
179 let mut auto_cert = AutoCert::builder()
180 .directory_url(directory_url)
181 .cache_path(cache_path);
182 for domain in domains {
183 auto_cert = auto_cert.domain(domain);
184 }
185 let auto_cert = auto_cert.build().expect("acme config to build");
186
187 log::trace!("auto_cert: {auto_cert:?}");
188
189 let notice_task = tokio::task::spawn(run_insecure_notice(ipv6));
190 let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" });
191 let app_res = run(app, listener.acme(auto_cert)).await;
192 log::warn!("server task ended, aborting insecure server task...");
193 notice_task.abort();
194 app_res?;
195 notice_task.await??;
196 }
197 ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?,
198 }
199
200 Ok("server (uh oh?)")
201}