Server tools to backfill, tail, mirror, and verify PLC logs
at main 201 lines 5.4 kB view raw
1use crate::{ 2 CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, FjallDb, GovernorMiddleware, IpLimiters, UA, 3 doc, logo, 4}; 5use futures::TryStreamExt; 6use governor::Quota; 7use poem::{ 8 Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, 9 get, handler, 10 http::{StatusCode, header::USER_AGENT}, 11 listener::{Listener, TcpListener, acme::AutoCert}, 12 middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 13 web::{Data, Json, Path}, 14}; 15use reqwest::{Client, Url}; 16use std::{net::SocketAddr, path::PathBuf, time::Duration}; 17 18pub mod fjall; 19pub mod pg; 20 21pub use fjall::serve_fjall; 22pub use pg::serve; 23 24#[derive(Debug)] 25pub enum ListenConf { 26 Acme { 27 domains: Vec<String>, 28 cache_path: PathBuf, 29 directory_url: String, 30 ipv6: bool, 31 }, 32 Bind(SocketAddr), 33} 34 35#[derive(Debug, Clone)] 36pub struct ExperimentalConf { 37 pub acme_domain: Option<String>, 38 pub write_upstream: bool, 39} 40 41#[handler] 42pub fn favicon() -> impl IntoResponse { 43 include_bytes!("../../favicon.ico").with_content_type("image/x-icon") 44} 45 46pub fn failed_to_reach_named(name: &str) -> String { 47 format!( 48 r#"{} 49 50Failed to reach the {name} server. Sorry. 51"#, 52 logo("mirror 502 :( ") 53 ) 54} 55 56pub fn bad_create_op(reason: &str) -> Response { 57 Response::builder() 58 .status(StatusCode::BAD_REQUEST) 59 .body(format!( 60 r#"{} 61 62NooOOOooooo: {reason} 63"#, 64 logo("mirror 400 >:( ") 65 )) 66} 67 68pub type PlcStatus = (bool, serde_json::Value); 69 70pub async fn plc_status(url: &Url, client: &Client) -> PlcStatus { 71 use serde_json::json; 72 73 let mut url = url.clone(); 74 url.set_path("/_health"); 75 76 let Ok(response) = client.get(url).timeout(Duration::from_secs(3)).send().await else { 77 return (false, json!({"error": "cannot reach plc server"})); 78 }; 79 80 let status = response.status(); 81 82 let Ok(text) = response.text().await else { 83 return (false, json!({"error": "failed to read response body"})); 84 }; 85 86 let body = match serde_json::from_str(&text) { 87 Ok(json) => json, 88 Err(_) => serde_json::Value::String(text.to_string()), 89 }; 90 91 if status.is_success() { 92 (true, body) 93 } else { 94 ( 95 false, 96 json!({ 97 "error": "non-ok status", 98 "status": status.as_str(), 99 "status_code": status.as_u16(), 100 "response": body, 101 }), 102 ) 103 } 104} 105 106pub fn proxy_response(res: reqwest::Response) -> Response { 107 let http_res: poem::http::Response<reqwest::Body> = res.into(); 108 let (parts, reqw_body) = http_res.into_parts(); 109 110 let parts = poem::ResponseParts { 111 status: parts.status, 112 version: parts.version, 113 headers: parts.headers, 114 extensions: parts.extensions, 115 }; 116 117 let body = http_body_util::BodyDataStream::new(reqw_body) 118 .map_err(|e| std::io::Error::other(Box::new(e))); 119 120 Response::from_parts(parts, poem::Body::from_bytes_stream(body)) 121} 122 123async fn run<A, L>(app: A, listener: L) -> std::io::Result<()> 124where 125 A: Endpoint + 'static, 126 L: Listener + 'static, 127{ 128 Server::new(listener) 129 .name("allegedly (mirror)") 130 .run(app) 131 .await 132} 133 134/// kick off a tiny little server on a tokio task to tell people to use 443 135async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> { 136 #[handler] 137 fn oop_plz_be_secure() -> (StatusCode, String) { 138 ( 139 StatusCode::BAD_REQUEST, 140 format!( 141 r#"{} 142 143You probably want to change your request to use HTTPS instead of HTTP. 144"#, 145 logo("mirror (tls on 443 please)") 146 ), 147 ) 148 } 149 150 let app = Route::new() 151 .at("/favicon.ico", get(favicon)) 152 .nest("/", get(oop_plz_be_secure)) 153 .with(Tracing); 154 Server::new(TcpListener::bind(if ipv6 { 155 "[::]:80" 156 } else { 157 "0.0.0.0:80" 158 })) 159 .name("allegedly (mirror:80 helper)") 160 .run(app) 161 .await 162} 163 164pub async fn bind_or_acme<A>(app: A, listen: ListenConf) -> anyhow::Result<&'static str> 165where 166 A: Endpoint + 'static, 167{ 168 match listen { 169 ListenConf::Acme { 170 domains, 171 cache_path, 172 directory_url, 173 ipv6, 174 } => { 175 rustls::crypto::aws_lc_rs::default_provider() 176 .install_default() 177 .expect("crypto provider to be installable"); 178 179 let mut auto_cert = AutoCert::builder() 180 .directory_url(directory_url) 181 .cache_path(cache_path); 182 for domain in domains { 183 auto_cert = auto_cert.domain(domain); 184 } 185 let auto_cert = auto_cert.build().expect("acme config to build"); 186 187 log::trace!("auto_cert: {auto_cert:?}"); 188 189 let notice_task = tokio::task::spawn(run_insecure_notice(ipv6)); 190 let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" }); 191 let app_res = run(app, listener.acme(auto_cert)).await; 192 log::warn!("server task ended, aborting insecure server task..."); 193 notice_task.abort(); 194 app_res?; 195 notice_task.await??; 196 } 197 ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?, 198 } 199 200 Ok("server (uh oh?)") 201}