Server tools to backfill, tail, mirror, and verify PLC logs
at debug 298 lines 8.0 kB view raw
1use crate::{GovernorMiddleware, UA, logo}; 2use futures::TryStreamExt; 3use governor::Quota; 4use poem::{ 5 Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, get, 6 handler, 7 http::StatusCode, 8 listener::{Listener, TcpListener, acme::AutoCert}, 9 middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 10 web::{Data, Json}, 11}; 12use reqwest::{Client, Url}; 13use std::{net::SocketAddr, path::PathBuf, time::Duration}; 14 15#[derive(Debug, Clone)] 16struct State { 17 client: Client, 18 plc: Url, 19 upstream: Url, 20} 21 22#[handler] 23fn hello(Data(State { upstream, .. }): Data<&State>) -> String { 24 format!( 25 r#"{} 26 27This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and 28synchronizes a local PLC reference server instance[2] (why?[3]). 29 30 31Configured upstream: 32 33 {upstream} 34 35 36Available APIs: 37 38 - GET /_health Health and version info 39 40 - GET /* Proxies to wrapped server; see PLC API docs: 41 https://web.plc.directory/api/redoc 42 43 - POST /* Always rejected. This is a mirror. 44 45 46 tip: try `GET /{{did}}` to resolve an identity 47 48 49Allegedly is a suit of open-source CLI tools for working with PLC logs: 50 51 https://tangled.org/@microcosm.blue/Allegedly 52 53 54[1] https://web.plc.directory 55[2] https://github.com/did-method-plc/did-method-plc 56[3] https://updates.microcosm.blue/3lz7nwvh4zc2u 57"#, 58 logo("mirror") 59 ) 60} 61 62#[handler] 63fn favicon() -> impl IntoResponse { 64 include_bytes!("../favicon.ico").with_content_type("image/x-icon") 65} 66 67fn failed_to_reach_wrapped() -> String { 68 format!( 69 r#"{} 70 71Failed to reach the wrapped reference PLC server. Sorry. 72"#, 73 logo("mirror 502 :( ") 74 ) 75} 76 77async fn plc_status(url: &Url, client: &Client) -> (bool, serde_json::Value) { 78 use serde_json::json; 79 80 let mut url = url.clone(); 81 url.set_path("/_health"); 82 83 let Ok(response) = client.get(url).timeout(Duration::from_secs(3)).send().await else { 84 return (false, json!({"error": "cannot reach plc server"})); 85 }; 86 87 let status = response.status(); 88 89 let Ok(text) = response.text().await else { 90 return (false, json!({"error": "failed to read response body"})); 91 }; 92 93 let body = match serde_json::from_str(&text) { 94 Ok(json) => json, 95 Err(_) => serde_json::Value::String(text.to_string()), 96 }; 97 98 if status.is_success() { 99 (true, body) 100 } else { 101 ( 102 false, 103 json!({ 104 "error": "non-ok status", 105 "status": status.as_str(), 106 "status_code": status.as_u16(), 107 "response": body, 108 }), 109 ) 110 } 111} 112 113#[handler] 114async fn health( 115 Data(State { 116 plc, 117 client, 118 upstream, 119 }): Data<&State>, 120) -> impl IntoResponse { 121 let mut overall_status = StatusCode::OK; 122 let (ok, wrapped_status) = plc_status(plc, client).await; 123 if !ok { 124 overall_status = StatusCode::BAD_GATEWAY; 125 } 126 let (ok, upstream_status) = plc_status(upstream, client).await; 127 if !ok { 128 overall_status = StatusCode::BAD_GATEWAY; 129 } 130 ( 131 overall_status, 132 Json(serde_json::json!({ 133 "server": "allegedly (mirror)", 134 "version": env!("CARGO_PKG_VERSION"), 135 "wrapped_plc": wrapped_status, 136 "upstream_plc": upstream_status, 137 })), 138 ) 139} 140 141#[handler] 142async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<impl IntoResponse> { 143 let mut target = state.plc.clone(); 144 target.set_path(req.uri().path()); 145 let upstream_res = state 146 .client 147 .get(target) 148 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server 149 .headers(req.headers().clone()) 150 .send() 151 .await 152 .map_err(|e| { 153 log::error!("upstream req fail: {e}"); 154 Error::from_string(failed_to_reach_wrapped(), StatusCode::BAD_GATEWAY) 155 })?; 156 157 let http_res: poem::http::Response<reqwest::Body> = upstream_res.into(); 158 let (parts, reqw_body) = http_res.into_parts(); 159 160 let parts = poem::ResponseParts { 161 status: parts.status, 162 version: parts.version, 163 headers: parts.headers, 164 extensions: parts.extensions, 165 }; 166 167 let body = http_body_util::BodyDataStream::new(reqw_body) 168 .map_err(|e| std::io::Error::other(Box::new(e))); 169 170 Ok(Response::from_parts( 171 parts, 172 poem::Body::from_bytes_stream(body), 173 )) 174} 175 176#[handler] 177async fn nope(Data(State { upstream, .. }): Data<&State>) -> (StatusCode, String) { 178 ( 179 StatusCode::BAD_REQUEST, 180 format!( 181 r#"{} 182 183Sorry, this server does not accept POST requests. 184 185You may wish to try upstream: {upstream} 186"#, 187 logo("mirror (nope)") 188 ), 189 ) 190} 191 192#[derive(Debug)] 193pub enum ListenConf { 194 Acme { 195 domains: Vec<String>, 196 cache_path: PathBuf, 197 directory_url: String, 198 }, 199 Bind(SocketAddr), 200} 201 202pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> { 203 log::info!("starting server..."); 204 205 // not using crate CLIENT: don't want the retries etc 206 let client = Client::builder() 207 .user_agent(UA) 208 .timeout(Duration::from_secs(10)) // fallback 209 .build() 210 .expect("reqwest client to build"); 211 212 let state = State { 213 client, 214 plc, 215 upstream: upstream.clone(), 216 }; 217 218 let app = Route::new() 219 .at("/", get(hello)) 220 .at("/favicon.ico", get(favicon)) 221 .at("/_health", get(health)) 222 .at("/:any", get(proxy).post(nope)) 223 .with(AddData::new(state)) 224 .with(Cors::new().allow_credentials(false)) 225 .with(Compression::new()) 226 .with(GovernorMiddleware::new(Quota::per_minute( 227 3000.try_into().expect("ratelimit middleware to build"), 228 ))) 229 .with(CatchPanic::new()) 230 .with(Tracing); 231 232 match listen { 233 ListenConf::Acme { 234 domains, 235 cache_path, 236 directory_url, 237 } => { 238 rustls::crypto::aws_lc_rs::default_provider() 239 .install_default() 240 .expect("crypto provider to be installable"); 241 242 let mut auto_cert = AutoCert::builder() 243 .directory_url(directory_url) 244 .cache_path(cache_path); 245 for domain in domains { 246 auto_cert = auto_cert.domain(domain); 247 } 248 let auto_cert = auto_cert.build().expect("acme config to build"); 249 250 let notice_task = tokio::task::spawn(run_insecure_notice()); 251 let app_res = run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await; 252 log::warn!("server task ended, aborting insecure server task..."); 253 notice_task.abort(); 254 app_res?; 255 notice_task.await??; 256 } 257 ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?, 258 } 259 260 Ok("server (uh oh?)") 261} 262 263async fn run<A, L>(app: A, listener: L) -> std::io::Result<()> 264where 265 A: Endpoint + 'static, 266 L: Listener + 'static, 267{ 268 Server::new(listener) 269 .name("allegedly (mirror)") 270 .run(app) 271 .await 272} 273 274/// kick off a tiny little server on a tokio task to tell people to use 443 275async fn run_insecure_notice() -> Result<(), std::io::Error> { 276 #[handler] 277 fn oop_plz_be_secure() -> (StatusCode, String) { 278 ( 279 StatusCode::BAD_REQUEST, 280 format!( 281 r#"{} 282 283You probably want to change your request to use HTTPS instead of HTTP. 284"#, 285 logo("mirror (tls on 443 please)") 286 ), 287 ) 288 } 289 290 let app = Route::new() 291 .at("/", get(oop_plz_be_secure)) 292 .at("/favicon.ico", get(favicon)) 293 .with(Tracing); 294 Server::new(TcpListener::bind("0.0.0.0:80")) 295 .name("allegedly (mirror:80 helper)") 296 .run(app) 297 .await 298}