Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::{
2 CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, GovernorMiddleware, IpLimiters, UA, logo,
3};
4use futures::TryStreamExt;
5use governor::Quota;
6use poem::{
7 Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server,
8 get, handler,
9 http::{StatusCode, header::USER_AGENT},
10 listener::{Listener, TcpListener, acme::AutoCert},
11 middleware::{AddData, CatchPanic, Compression, Cors, Tracing},
12 web::{Data, Json, Path},
13};
14use reqwest::{Client, Url};
15use std::{net::SocketAddr, path::PathBuf, time::Duration};
16
17#[derive(Clone)]
18struct State {
19 client: Client,
20 plc: Url,
21 upstream: Url,
22 sync_info: Option<SyncInfo>,
23 experimental: ExperimentalConf,
24}
25
26/// server info that only applies in mirror (synchronizing) mode
27#[derive(Clone)]
28struct SyncInfo {
29 latest_at: CachedValue<Dt, GetLatestAt>,
30 upstream_status: CachedValue<PlcStatus, CheckUpstream>,
31}
32
33#[handler]
34fn hello(
35 Data(State {
36 sync_info,
37 upstream,
38 experimental: exp,
39 ..
40 }): Data<&State>,
41 req: &Request,
42) -> String {
43 // let mode = if sync_info.is_some() { "mirror" } else { "wrap" };
44 let pre_info = if sync_info.is_some() {
45 format!(
46 r#"
47This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and
48synchronizes a local PLC reference server instance[2] (why?[3]).
49
50
51Configured upstream:
52
53 {upstream}
54
55"#
56 )
57 } else {
58 format!(
59 r#"
60This is a PLC[1] mirror running Allegedly in wrap mode. Wrap mode reverse-
61proxies requests to a PLC server and can terminate TLS, like NGINX or Caddy.
62
63
64Configured upstream (only used if experimental op forwarding is enabled):
65
66 {upstream}
67
68"#
69 )
70 };
71
72 let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) {
73 (false, _, _) => " - POST /* Always rejected. This is a mirror.".to_string(),
74 (_, None, _) => {
75 " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string()
76 }
77 (_, Some(d), Some(f)) if f == d => {
78 " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string()
79 }
80 (_, Some(d), _) => format!(
81 r#" - POST /* Rejected, but experimental upstream op forwarding is
82 available at `POST https://{d}/:did`!"#
83 ),
84 };
85
86 format!(
87 r#"{}
88{pre_info}
89
90Available APIs:
91
92 - GET /_health Health and version info
93
94 - GET /* Proxies to wrapped server; see PLC API docs:
95 https://web.plc.directory/api/redoc
96
97 tip: try `GET /{{did}}` to resolve an identity
98
99{post_info}
100
101
102Allegedly is a suite of open-source CLI tools from for working with PLC logs,
103from microcosm:
104
105 https://tangled.org/@microcosm.blue/Allegedly
106
107 https://microcosm.blue
108
109
110[1] https://web.plc.directory
111[2] https://github.com/did-method-plc/did-method-plc
112[3] https://updates.microcosm.blue/3lz7nwvh4zc2u
113"#,
114 logo("mirror")
115 )
116}
117
118#[handler]
119fn favicon() -> impl IntoResponse {
120 include_bytes!("../favicon.ico").with_content_type("image/x-icon")
121}
122
123fn failed_to_reach_named(name: &str) -> String {
124 format!(
125 r#"{}
126
127Failed to reach the {name} server. Sorry.
128"#,
129 logo("mirror 502 :( ")
130 )
131}
132
133fn bad_create_op(reason: &str) -> Response {
134 Response::builder()
135 .status(StatusCode::BAD_REQUEST)
136 .body(format!(
137 r#"{}
138
139NooOOOooooo: {reason}
140"#,
141 logo("mirror 400 >:( ")
142 ))
143}
144
145type PlcStatus = (bool, serde_json::Value);
146
147async fn plc_status(url: &Url, client: &Client) -> PlcStatus {
148 use serde_json::json;
149
150 let mut url = url.clone();
151 url.set_path("/_health");
152
153 let Ok(response) = client.get(url).timeout(Duration::from_secs(3)).send().await else {
154 return (false, json!({"error": "cannot reach plc server"}));
155 };
156
157 let status = response.status();
158
159 let Ok(text) = response.text().await else {
160 return (false, json!({"error": "failed to read response body"}));
161 };
162
163 let body = match serde_json::from_str(&text) {
164 Ok(json) => json,
165 Err(_) => serde_json::Value::String(text.to_string()),
166 };
167
168 if status.is_success() {
169 (true, body)
170 } else {
171 (
172 false,
173 json!({
174 "error": "non-ok status",
175 "status": status.as_str(),
176 "status_code": status.as_u16(),
177 "response": body,
178 }),
179 )
180 }
181}
182
183#[derive(Clone)]
184struct GetLatestAt(Db);
185impl Fetcher<Dt> for GetLatestAt {
186 async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> {
187 let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!(
188 "expected to find at least one thing in the db"
189 ))?;
190 Ok(now)
191 }
192}
193
194#[derive(Clone)]
195struct CheckUpstream(Url, Client);
196impl Fetcher<PlcStatus> for CheckUpstream {
197 async fn fetch(&self) -> Result<PlcStatus, Box<dyn std::error::Error>> {
198 Ok(plc_status(&self.0, &self.1).await)
199 }
200}
201
202#[handler]
203async fn health(
204 Data(State {
205 plc,
206 client,
207 sync_info,
208 ..
209 }): Data<&State>,
210) -> impl IntoResponse {
211 let mut overall_status = StatusCode::OK;
212 let (ok, wrapped_status) = plc_status(plc, client).await;
213 if !ok {
214 overall_status = StatusCode::BAD_GATEWAY;
215 }
216 if let Some(SyncInfo {
217 latest_at,
218 upstream_status,
219 }) = sync_info
220 {
221 // mirror mode
222 let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible");
223 if !ok {
224 overall_status = StatusCode::BAD_GATEWAY;
225 }
226 let latest = latest_at.get().await.ok();
227 (
228 overall_status,
229 Json(serde_json::json!({
230 "server": "allegedly (mirror)",
231 "version": env!("CARGO_PKG_VERSION"),
232 "wrapped_plc": wrapped_status,
233 "upstream_plc": upstream_status,
234 "latest_at": latest,
235 })),
236 )
237 } else {
238 // wrap mode
239 (
240 overall_status,
241 Json(serde_json::json!({
242 "server": "allegedly (mirror)",
243 "version": env!("CARGO_PKG_VERSION"),
244 "wrapped_plc": wrapped_status,
245 })),
246 )
247 }
248}
249
250fn proxy_response(res: reqwest::Response) -> Response {
251 let http_res: poem::http::Response<reqwest::Body> = res.into();
252 let (parts, reqw_body) = http_res.into_parts();
253
254 let parts = poem::ResponseParts {
255 status: parts.status,
256 version: parts.version,
257 headers: parts.headers,
258 extensions: parts.extensions,
259 };
260
261 let body = http_body_util::BodyDataStream::new(reqw_body)
262 .map_err(|e| std::io::Error::other(Box::new(e)));
263
264 Response::from_parts(parts, poem::Body::from_bytes_stream(body))
265}
266
267#[handler]
268async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> {
269 let mut target = state.plc.clone();
270 target.set_path(req.uri().path());
271 target.set_query(req.uri().query());
272 let wrapped_res = state
273 .client
274 .get(target)
275 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server
276 .headers(req.headers().clone())
277 .send()
278 .await
279 .map_err(|e| {
280 log::error!("upstream req fail: {e}");
281 Error::from_string(
282 failed_to_reach_named("wrapped reference PLC"),
283 StatusCode::BAD_GATEWAY,
284 )
285 })?;
286
287 Ok(proxy_response(wrapped_res))
288}
289
290#[handler]
291async fn forward_create_op_upstream(
292 Data(State {
293 upstream,
294 client,
295 experimental,
296 ..
297 }): Data<&State>,
298 Path(did): Path<String>,
299 req: &Request,
300 body: Body,
301) -> Result<Response> {
302 if let Some(expected_domain) = &experimental.acme_domain {
303 let Some(found_host) = req.uri().host() else {
304 return Ok(bad_create_op(&format!(
305 "missing `Host` header, expected {expected_domain:?} for experimental requests."
306 )));
307 };
308 if found_host != expected_domain {
309 return Ok(bad_create_op(&format!(
310 "experimental requests must be made to {expected_domain:?}, but this request's `Host` header was {found_host}"
311 )));
312 }
313 }
314
315 // adjust proxied headers
316 let mut headers: reqwest::header::HeaderMap = req.headers().clone();
317 log::trace!("original request headers: {headers:?}");
318 headers.insert("Host", upstream.host_str().unwrap().parse().unwrap());
319 let client_ua = headers
320 .get(USER_AGENT)
321 .map(|h| h.to_str().unwrap())
322 .unwrap_or("unknown");
323 headers.insert(
324 USER_AGENT,
325 format!("{UA} (forwarding from {client_ua:?})")
326 .parse()
327 .unwrap(),
328 );
329 log::trace!("adjusted request headers: {headers:?}");
330
331 let mut target = upstream.clone();
332 target.set_path(&did);
333 let upstream_res = client
334 .post(target)
335 .timeout(Duration::from_secs(15)) // be a little generous
336 .headers(headers)
337 .body(reqwest::Body::wrap_stream(body.into_bytes_stream()))
338 .send()
339 .await
340 .map_err(|e| {
341 log::warn!("upstream write fail: {e}");
342 Error::from_string(
343 failed_to_reach_named("upstream PLC"),
344 StatusCode::BAD_GATEWAY,
345 )
346 })?;
347
348 Ok(proxy_response(upstream_res))
349}
350
351#[handler]
352async fn nope(Data(State { upstream, .. }): Data<&State>) -> (StatusCode, String) {
353 (
354 StatusCode::BAD_REQUEST,
355 format!(
356 r#"{}
357
358Sorry, this server does not accept POST requests.
359
360You may wish to try sending that to our upstream: {upstream}.
361
362If you operate this server, try running with `--experimental-write-upstream`.
363"#,
364 logo("mirror (nope)")
365 ),
366 )
367}
368
369#[derive(Debug)]
370pub enum ListenConf {
371 Acme {
372 domains: Vec<String>,
373 cache_path: PathBuf,
374 directory_url: String,
375 ipv6: bool,
376 },
377 Bind(SocketAddr),
378}
379
380#[derive(Debug, Clone)]
381pub struct ExperimentalConf {
382 pub acme_domain: Option<String>,
383 pub write_upstream: bool,
384}
385
386pub async fn serve(
387 upstream: Url,
388 plc: Url,
389 listen: ListenConf,
390 experimental: ExperimentalConf,
391 db: Option<Db>,
392) -> anyhow::Result<&'static str> {
393 log::info!("starting server...");
394
395 // not using crate CLIENT: don't want the retries etc
396 let client = Client::builder()
397 .user_agent(UA)
398 .timeout(Duration::from_secs(10)) // fallback
399 .build()
400 .expect("reqwest client to build");
401
402 // when `db` is None, we're running in wrap mode. no db access, no upstream sync
403 let sync_info = db.map(|db| SyncInfo {
404 latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)),
405 upstream_status: CachedValue::new(
406 CheckUpstream(upstream.clone(), client.clone()),
407 Duration::from_secs(6),
408 ),
409 });
410
411 let state = State {
412 client,
413 plc,
414 upstream: upstream.clone(),
415 sync_info,
416 experimental: experimental.clone(),
417 };
418
419 let mut app = Route::new()
420 .at("/", get(hello))
421 .at("/favicon.ico", get(favicon))
422 .at("/_health", get(health))
423 .at("/export", get(proxy));
424
425 if experimental.write_upstream {
426 log::info!("enabling experimental write forwarding to upstream");
427
428 let ip_limiter = IpLimiters::new(Quota::per_hour(10.try_into().unwrap()));
429 let did_limiter = CreatePlcOpLimiter::new(Quota::per_hour(4.try_into().unwrap()));
430
431 let upstream_proxier = forward_create_op_upstream
432 .with(GovernorMiddleware::new(did_limiter))
433 .with(GovernorMiddleware::new(ip_limiter));
434
435 app = app.at("/did:plc:*", get(proxy).post(upstream_proxier));
436 } else {
437 app = app.at("/did:plc:*", get(proxy).post(nope));
438 }
439
440 let app = app
441 .with(AddData::new(state))
442 .with(Cors::new().allow_credentials(false))
443 .with(Compression::new())
444 .with(GovernorMiddleware::new(IpLimiters::new(Quota::per_minute(
445 3000.try_into().expect("ratelimit middleware to build"),
446 ))))
447 .with(CatchPanic::new())
448 .with(Tracing);
449
450 match listen {
451 ListenConf::Acme {
452 domains,
453 cache_path,
454 directory_url,
455 ipv6,
456 } => {
457 rustls::crypto::aws_lc_rs::default_provider()
458 .install_default()
459 .expect("crypto provider to be installable");
460
461 let mut auto_cert = AutoCert::builder()
462 .directory_url(directory_url)
463 .cache_path(cache_path);
464 for domain in domains {
465 auto_cert = auto_cert.domain(domain);
466 }
467 let auto_cert = auto_cert.build().expect("acme config to build");
468
469 log::trace!("auto_cert: {auto_cert:?}");
470
471 let notice_task = tokio::task::spawn(run_insecure_notice(ipv6));
472 let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" });
473 let app_res = run(app, listener.acme(auto_cert)).await;
474 log::warn!("server task ended, aborting insecure server task...");
475 notice_task.abort();
476 app_res?;
477 notice_task.await??;
478 }
479 ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?,
480 }
481
482 Ok("server (uh oh?)")
483}
484
485async fn run<A, L>(app: A, listener: L) -> std::io::Result<()>
486where
487 A: Endpoint + 'static,
488 L: Listener + 'static,
489{
490 Server::new(listener)
491 .name("allegedly (mirror)")
492 .run(app)
493 .await
494}
495
496/// kick off a tiny little server on a tokio task to tell people to use 443
497async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> {
498 #[handler]
499 fn oop_plz_be_secure() -> (StatusCode, String) {
500 (
501 StatusCode::BAD_REQUEST,
502 format!(
503 r#"{}
504
505You probably want to change your request to use HTTPS instead of HTTP.
506"#,
507 logo("mirror (tls on 443 please)")
508 ),
509 )
510 }
511
512 let app = Route::new()
513 .at("/favicon.ico", get(favicon))
514 .nest("/", get(oop_plz_be_secure))
515 .with(Tracing);
516 Server::new(TcpListener::bind(if ipv6 {
517 "[::]:80"
518 } else {
519 "0.0.0.0:80"
520 }))
521 .name("allegedly (mirror:80 helper)")
522 .run(app)
523 .await
524}