Server tools to backfill, tail, mirror, and verify PLC logs
at main 524 lines 15 kB view raw
1use crate::{ 2 CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, GovernorMiddleware, IpLimiters, UA, logo, 3}; 4use futures::TryStreamExt; 5use governor::Quota; 6use poem::{ 7 Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, 8 get, handler, 9 http::{StatusCode, header::USER_AGENT}, 10 listener::{Listener, TcpListener, acme::AutoCert}, 11 middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 12 web::{Data, Json, Path}, 13}; 14use reqwest::{Client, Url}; 15use std::{net::SocketAddr, path::PathBuf, time::Duration}; 16 17#[derive(Clone)] 18struct State { 19 client: Client, 20 plc: Url, 21 upstream: Url, 22 sync_info: Option<SyncInfo>, 23 experimental: ExperimentalConf, 24} 25 26/// server info that only applies in mirror (synchronizing) mode 27#[derive(Clone)] 28struct SyncInfo { 29 latest_at: CachedValue<Dt, GetLatestAt>, 30 upstream_status: CachedValue<PlcStatus, CheckUpstream>, 31} 32 33#[handler] 34fn hello( 35 Data(State { 36 sync_info, 37 upstream, 38 experimental: exp, 39 .. 40 }): Data<&State>, 41 req: &Request, 42) -> String { 43 // let mode = if sync_info.is_some() { "mirror" } else { "wrap" }; 44 let pre_info = if sync_info.is_some() { 45 format!( 46 r#" 47This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and 48synchronizes a local PLC reference server instance[2] (why?[3]). 49 50 51Configured upstream: 52 53 {upstream} 54 55"# 56 ) 57 } else { 58 format!( 59 r#" 60This is a PLC[1] mirror running Allegedly in wrap mode. Wrap mode reverse- 61proxies requests to a PLC server and can terminate TLS, like NGINX or Caddy. 62 63 64Configured upstream (only used if experimental op forwarding is enabled): 65 66 {upstream} 67 68"# 69 ) 70 }; 71 72 let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) { 73 (false, _, _) => " - POST /* Always rejected. This is a mirror.".to_string(), 74 (_, None, _) => { 75 " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string() 76 } 77 (_, Some(d), Some(f)) if f == d => { 78 " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string() 79 } 80 (_, Some(d), _) => format!( 81 r#" - POST /* Rejected, but experimental upstream op forwarding is 82 available at `POST https://{d}/:did`!"# 83 ), 84 }; 85 86 format!( 87 r#"{} 88{pre_info} 89 90Available APIs: 91 92 - GET /_health Health and version info 93 94 - GET /* Proxies to wrapped server; see PLC API docs: 95 https://web.plc.directory/api/redoc 96 97 tip: try `GET /{{did}}` to resolve an identity 98 99{post_info} 100 101 102Allegedly is a suite of open-source CLI tools from for working with PLC logs, 103from microcosm: 104 105 https://tangled.org/@microcosm.blue/Allegedly 106 107 https://microcosm.blue 108 109 110[1] https://web.plc.directory 111[2] https://github.com/did-method-plc/did-method-plc 112[3] https://updates.microcosm.blue/3lz7nwvh4zc2u 113"#, 114 logo("mirror") 115 ) 116} 117 118#[handler] 119fn favicon() -> impl IntoResponse { 120 include_bytes!("../favicon.ico").with_content_type("image/x-icon") 121} 122 123fn failed_to_reach_named(name: &str) -> String { 124 format!( 125 r#"{} 126 127Failed to reach the {name} server. Sorry. 128"#, 129 logo("mirror 502 :( ") 130 ) 131} 132 133fn bad_create_op(reason: &str) -> Response { 134 Response::builder() 135 .status(StatusCode::BAD_REQUEST) 136 .body(format!( 137 r#"{} 138 139NooOOOooooo: {reason} 140"#, 141 logo("mirror 400 >:( ") 142 )) 143} 144 145type PlcStatus = (bool, serde_json::Value); 146 147async fn plc_status(url: &Url, client: &Client) -> PlcStatus { 148 use serde_json::json; 149 150 let mut url = url.clone(); 151 url.set_path("/_health"); 152 153 let Ok(response) = client.get(url).timeout(Duration::from_secs(3)).send().await else { 154 return (false, json!({"error": "cannot reach plc server"})); 155 }; 156 157 let status = response.status(); 158 159 let Ok(text) = response.text().await else { 160 return (false, json!({"error": "failed to read response body"})); 161 }; 162 163 let body = match serde_json::from_str(&text) { 164 Ok(json) => json, 165 Err(_) => serde_json::Value::String(text.to_string()), 166 }; 167 168 if status.is_success() { 169 (true, body) 170 } else { 171 ( 172 false, 173 json!({ 174 "error": "non-ok status", 175 "status": status.as_str(), 176 "status_code": status.as_u16(), 177 "response": body, 178 }), 179 ) 180 } 181} 182 183#[derive(Clone)] 184struct GetLatestAt(Db); 185impl Fetcher<Dt> for GetLatestAt { 186 async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> { 187 let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!( 188 "expected to find at least one thing in the db" 189 ))?; 190 Ok(now) 191 } 192} 193 194#[derive(Clone)] 195struct CheckUpstream(Url, Client); 196impl Fetcher<PlcStatus> for CheckUpstream { 197 async fn fetch(&self) -> Result<PlcStatus, Box<dyn std::error::Error>> { 198 Ok(plc_status(&self.0, &self.1).await) 199 } 200} 201 202#[handler] 203async fn health( 204 Data(State { 205 plc, 206 client, 207 sync_info, 208 .. 209 }): Data<&State>, 210) -> impl IntoResponse { 211 let mut overall_status = StatusCode::OK; 212 let (ok, wrapped_status) = plc_status(plc, client).await; 213 if !ok { 214 overall_status = StatusCode::BAD_GATEWAY; 215 } 216 if let Some(SyncInfo { 217 latest_at, 218 upstream_status, 219 }) = sync_info 220 { 221 // mirror mode 222 let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible"); 223 if !ok { 224 overall_status = StatusCode::BAD_GATEWAY; 225 } 226 let latest = latest_at.get().await.ok(); 227 ( 228 overall_status, 229 Json(serde_json::json!({ 230 "server": "allegedly (mirror)", 231 "version": env!("CARGO_PKG_VERSION"), 232 "wrapped_plc": wrapped_status, 233 "upstream_plc": upstream_status, 234 "latest_at": latest, 235 })), 236 ) 237 } else { 238 // wrap mode 239 ( 240 overall_status, 241 Json(serde_json::json!({ 242 "server": "allegedly (mirror)", 243 "version": env!("CARGO_PKG_VERSION"), 244 "wrapped_plc": wrapped_status, 245 })), 246 ) 247 } 248} 249 250fn proxy_response(res: reqwest::Response) -> Response { 251 let http_res: poem::http::Response<reqwest::Body> = res.into(); 252 let (parts, reqw_body) = http_res.into_parts(); 253 254 let parts = poem::ResponseParts { 255 status: parts.status, 256 version: parts.version, 257 headers: parts.headers, 258 extensions: parts.extensions, 259 }; 260 261 let body = http_body_util::BodyDataStream::new(reqw_body) 262 .map_err(|e| std::io::Error::other(Box::new(e))); 263 264 Response::from_parts(parts, poem::Body::from_bytes_stream(body)) 265} 266 267#[handler] 268async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> { 269 let mut target = state.plc.clone(); 270 target.set_path(req.uri().path()); 271 target.set_query(req.uri().query()); 272 let wrapped_res = state 273 .client 274 .get(target) 275 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server 276 .headers(req.headers().clone()) 277 .send() 278 .await 279 .map_err(|e| { 280 log::error!("upstream req fail: {e}"); 281 Error::from_string( 282 failed_to_reach_named("wrapped reference PLC"), 283 StatusCode::BAD_GATEWAY, 284 ) 285 })?; 286 287 Ok(proxy_response(wrapped_res)) 288} 289 290#[handler] 291async fn forward_create_op_upstream( 292 Data(State { 293 upstream, 294 client, 295 experimental, 296 .. 297 }): Data<&State>, 298 Path(did): Path<String>, 299 req: &Request, 300 body: Body, 301) -> Result<Response> { 302 if let Some(expected_domain) = &experimental.acme_domain { 303 let Some(found_host) = req.uri().host() else { 304 return Ok(bad_create_op(&format!( 305 "missing `Host` header, expected {expected_domain:?} for experimental requests." 306 ))); 307 }; 308 if found_host != expected_domain { 309 return Ok(bad_create_op(&format!( 310 "experimental requests must be made to {expected_domain:?}, but this request's `Host` header was {found_host}" 311 ))); 312 } 313 } 314 315 // adjust proxied headers 316 let mut headers: reqwest::header::HeaderMap = req.headers().clone(); 317 log::trace!("original request headers: {headers:?}"); 318 headers.insert("Host", upstream.host_str().unwrap().parse().unwrap()); 319 let client_ua = headers 320 .get(USER_AGENT) 321 .map(|h| h.to_str().unwrap()) 322 .unwrap_or("unknown"); 323 headers.insert( 324 USER_AGENT, 325 format!("{UA} (forwarding from {client_ua:?})") 326 .parse() 327 .unwrap(), 328 ); 329 log::trace!("adjusted request headers: {headers:?}"); 330 331 let mut target = upstream.clone(); 332 target.set_path(&did); 333 let upstream_res = client 334 .post(target) 335 .timeout(Duration::from_secs(15)) // be a little generous 336 .headers(headers) 337 .body(reqwest::Body::wrap_stream(body.into_bytes_stream())) 338 .send() 339 .await 340 .map_err(|e| { 341 log::warn!("upstream write fail: {e}"); 342 Error::from_string( 343 failed_to_reach_named("upstream PLC"), 344 StatusCode::BAD_GATEWAY, 345 ) 346 })?; 347 348 Ok(proxy_response(upstream_res)) 349} 350 351#[handler] 352async fn nope(Data(State { upstream, .. }): Data<&State>) -> (StatusCode, String) { 353 ( 354 StatusCode::BAD_REQUEST, 355 format!( 356 r#"{} 357 358Sorry, this server does not accept POST requests. 359 360You may wish to try sending that to our upstream: {upstream}. 361 362If you operate this server, try running with `--experimental-write-upstream`. 363"#, 364 logo("mirror (nope)") 365 ), 366 ) 367} 368 369#[derive(Debug)] 370pub enum ListenConf { 371 Acme { 372 domains: Vec<String>, 373 cache_path: PathBuf, 374 directory_url: String, 375 ipv6: bool, 376 }, 377 Bind(SocketAddr), 378} 379 380#[derive(Debug, Clone)] 381pub struct ExperimentalConf { 382 pub acme_domain: Option<String>, 383 pub write_upstream: bool, 384} 385 386pub async fn serve( 387 upstream: Url, 388 plc: Url, 389 listen: ListenConf, 390 experimental: ExperimentalConf, 391 db: Option<Db>, 392) -> anyhow::Result<&'static str> { 393 log::info!("starting server..."); 394 395 // not using crate CLIENT: don't want the retries etc 396 let client = Client::builder() 397 .user_agent(UA) 398 .timeout(Duration::from_secs(10)) // fallback 399 .build() 400 .expect("reqwest client to build"); 401 402 // when `db` is None, we're running in wrap mode. no db access, no upstream sync 403 let sync_info = db.map(|db| SyncInfo { 404 latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)), 405 upstream_status: CachedValue::new( 406 CheckUpstream(upstream.clone(), client.clone()), 407 Duration::from_secs(6), 408 ), 409 }); 410 411 let state = State { 412 client, 413 plc, 414 upstream: upstream.clone(), 415 sync_info, 416 experimental: experimental.clone(), 417 }; 418 419 let mut app = Route::new() 420 .at("/", get(hello)) 421 .at("/favicon.ico", get(favicon)) 422 .at("/_health", get(health)) 423 .at("/export", get(proxy)); 424 425 if experimental.write_upstream { 426 log::info!("enabling experimental write forwarding to upstream"); 427 428 let ip_limiter = IpLimiters::new(Quota::per_hour(10.try_into().unwrap())); 429 let did_limiter = CreatePlcOpLimiter::new(Quota::per_hour(4.try_into().unwrap())); 430 431 let upstream_proxier = forward_create_op_upstream 432 .with(GovernorMiddleware::new(did_limiter)) 433 .with(GovernorMiddleware::new(ip_limiter)); 434 435 app = app.at("/did:plc:*", get(proxy).post(upstream_proxier)); 436 } else { 437 app = app.at("/did:plc:*", get(proxy).post(nope)); 438 } 439 440 let app = app 441 .with(AddData::new(state)) 442 .with(Cors::new().allow_credentials(false)) 443 .with(Compression::new()) 444 .with(GovernorMiddleware::new(IpLimiters::new(Quota::per_minute( 445 3000.try_into().expect("ratelimit middleware to build"), 446 )))) 447 .with(CatchPanic::new()) 448 .with(Tracing); 449 450 match listen { 451 ListenConf::Acme { 452 domains, 453 cache_path, 454 directory_url, 455 ipv6, 456 } => { 457 rustls::crypto::aws_lc_rs::default_provider() 458 .install_default() 459 .expect("crypto provider to be installable"); 460 461 let mut auto_cert = AutoCert::builder() 462 .directory_url(directory_url) 463 .cache_path(cache_path); 464 for domain in domains { 465 auto_cert = auto_cert.domain(domain); 466 } 467 let auto_cert = auto_cert.build().expect("acme config to build"); 468 469 log::trace!("auto_cert: {auto_cert:?}"); 470 471 let notice_task = tokio::task::spawn(run_insecure_notice(ipv6)); 472 let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" }); 473 let app_res = run(app, listener.acme(auto_cert)).await; 474 log::warn!("server task ended, aborting insecure server task..."); 475 notice_task.abort(); 476 app_res?; 477 notice_task.await??; 478 } 479 ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?, 480 } 481 482 Ok("server (uh oh?)") 483} 484 485async fn run<A, L>(app: A, listener: L) -> std::io::Result<()> 486where 487 A: Endpoint + 'static, 488 L: Listener + 'static, 489{ 490 Server::new(listener) 491 .name("allegedly (mirror)") 492 .run(app) 493 .await 494} 495 496/// kick off a tiny little server on a tokio task to tell people to use 443 497async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> { 498 #[handler] 499 fn oop_plz_be_secure() -> (StatusCode, String) { 500 ( 501 StatusCode::BAD_REQUEST, 502 format!( 503 r#"{} 504 505You probably want to change your request to use HTTPS instead of HTTP. 506"#, 507 logo("mirror (tls on 443 please)") 508 ), 509 ) 510 } 511 512 let app = Route::new() 513 .at("/favicon.ico", get(favicon)) 514 .nest("/", get(oop_plz_be_secure)) 515 .with(Tracing); 516 Server::new(TcpListener::bind(if ipv6 { 517 "[::]:80" 518 } else { 519 "0.0.0.0:80" 520 })) 521 .name("allegedly (mirror:80 helper)") 522 .run(app) 523 .await 524}