Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::{GovernorMiddleware, UA, logo};
2use futures::TryStreamExt;
3use governor::Quota;
4use poem::{
5 Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, get,
6 handler,
7 http::StatusCode,
8 listener::{Listener, TcpListener, acme::AutoCert},
9 middleware::{AddData, CatchPanic, Compression, Cors, Tracing},
10 web::{Data, Json},
11};
12use reqwest::{Client, Url};
13use std::{net::SocketAddr, path::PathBuf, time::Duration};
14
15#[derive(Debug, Clone)]
16struct State {
17 client: Client,
18 plc: Url,
19 upstream: Url,
20}
21
22#[handler]
23fn hello(Data(State { upstream, .. }): Data<&State>) -> String {
24 format!(
25 r#"{}
26
27This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and
28synchronizes a local PLC reference server instance[2] (why?[3]).
29
30
31Configured upstream:
32
33 {upstream}
34
35
36Available APIs:
37
38 - GET /_health Health and version info
39
40 - GET /* Proxies to wrapped server; see PLC API docs:
41 https://web.plc.directory/api/redoc
42
43 - POST /* Always rejected. This is a mirror.
44
45
46 tip: try `GET /{{did}}` to resolve an identity
47
48
49Allegedly is a suit of open-source CLI tools for working with PLC logs:
50
51 https://tangled.org/@microcosm.blue/Allegedly
52
53
54[1] https://web.plc.directory
55[2] https://github.com/did-method-plc/did-method-plc
56[3] https://updates.microcosm.blue/3lz7nwvh4zc2u
57"#,
58 logo("mirror")
59 )
60}
61
62#[handler]
63fn favicon() -> impl IntoResponse {
64 include_bytes!("../favicon.ico").with_content_type("image/x-icon")
65}
66
67fn failed_to_reach_wrapped() -> String {
68 format!(
69 r#"{}
70
71Failed to reach the wrapped reference PLC server. Sorry.
72"#,
73 logo("mirror 502 :( ")
74 )
75}
76
77async fn plc_status(url: &Url, client: &Client) -> (bool, serde_json::Value) {
78 use serde_json::json;
79
80 let mut url = url.clone();
81 url.set_path("/_health");
82
83 let Ok(response) = client.get(url).timeout(Duration::from_secs(3)).send().await else {
84 return (false, json!({"error": "cannot reach plc server"}));
85 };
86
87 let status = response.status();
88
89 let Ok(text) = response.text().await else {
90 return (false, json!({"error": "failed to read response body"}));
91 };
92
93 let body = match serde_json::from_str(&text) {
94 Ok(json) => json,
95 Err(_) => serde_json::Value::String(text.to_string()),
96 };
97
98 if status.is_success() {
99 (true, body)
100 } else {
101 (
102 false,
103 json!({
104 "error": "non-ok status",
105 "status": status.as_str(),
106 "status_code": status.as_u16(),
107 "response": body,
108 }),
109 )
110 }
111}
112
113#[handler]
114async fn health(
115 Data(State {
116 plc,
117 client,
118 upstream,
119 }): Data<&State>,
120) -> impl IntoResponse {
121 let mut overall_status = StatusCode::OK;
122 let (ok, wrapped_status) = plc_status(plc, client).await;
123 if !ok {
124 overall_status = StatusCode::BAD_GATEWAY;
125 }
126 let (ok, upstream_status) = plc_status(upstream, client).await;
127 if !ok {
128 overall_status = StatusCode::BAD_GATEWAY;
129 }
130 (
131 overall_status,
132 Json(serde_json::json!({
133 "server": "allegedly (mirror)",
134 "version": env!("CARGO_PKG_VERSION"),
135 "wrapped_plc": wrapped_status,
136 "upstream_plc": upstream_status,
137 })),
138 )
139}
140
141#[handler]
142async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<impl IntoResponse> {
143 let mut target = state.plc.clone();
144 target.set_path(req.uri().path());
145 let upstream_res = state
146 .client
147 .get(target)
148 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server
149 .headers(req.headers().clone())
150 .send()
151 .await
152 .map_err(|e| {
153 log::error!("upstream req fail: {e}");
154 Error::from_string(failed_to_reach_wrapped(), StatusCode::BAD_GATEWAY)
155 })?;
156
157 let http_res: poem::http::Response<reqwest::Body> = upstream_res.into();
158 let (parts, reqw_body) = http_res.into_parts();
159
160 let parts = poem::ResponseParts {
161 status: parts.status,
162 version: parts.version,
163 headers: parts.headers,
164 extensions: parts.extensions,
165 };
166
167 let body = http_body_util::BodyDataStream::new(reqw_body)
168 .map_err(|e| std::io::Error::other(Box::new(e)));
169
170 Ok(Response::from_parts(
171 parts,
172 poem::Body::from_bytes_stream(body),
173 ))
174}
175
176#[handler]
177async fn nope(Data(State { upstream, .. }): Data<&State>) -> (StatusCode, String) {
178 (
179 StatusCode::BAD_REQUEST,
180 format!(
181 r#"{}
182
183Sorry, this server does not accept POST requests.
184
185You may wish to try upstream: {upstream}
186"#,
187 logo("mirror (nope)")
188 ),
189 )
190}
191
/// How the mirror's HTTP server should accept connections.
#[derive(Debug)]
pub enum ListenConf {
    /// Listen on port 443 with certificates obtained automatically via ACME.
    Acme {
        /// Domain names registered with the ACME certificate builder.
        domains: Vec<String>,
        /// Path handed to the ACME certificate builder as its cache location.
        cache_path: PathBuf,
        /// URL of the ACME directory to use.
        directory_url: String,
    },
    /// Listen on exactly this socket address; no TLS is configured for it
    /// here.
    Bind(SocketAddr),
}
201
202pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> {
203 log::info!("starting server...");
204
205 // not using crate CLIENT: don't want the retries etc
206 let client = Client::builder()
207 .user_agent(UA)
208 .timeout(Duration::from_secs(10)) // fallback
209 .build()
210 .expect("reqwest client to build");
211
212 let state = State {
213 client,
214 plc,
215 upstream: upstream.clone(),
216 };
217
218 let app = Route::new()
219 .at("/", get(hello))
220 .at("/favicon.ico", get(favicon))
221 .at("/_health", get(health))
222 .at("/:any", get(proxy).post(nope))
223 .with(AddData::new(state))
224 .with(Cors::new().allow_credentials(false))
225 .with(Compression::new())
226 .with(GovernorMiddleware::new(Quota::per_minute(
227 3000.try_into().expect("ratelimit middleware to build"),
228 )))
229 .with(CatchPanic::new())
230 .with(Tracing);
231
232 match listen {
233 ListenConf::Acme {
234 domains,
235 cache_path,
236 directory_url,
237 } => {
238 rustls::crypto::aws_lc_rs::default_provider()
239 .install_default()
240 .expect("crypto provider to be installable");
241
242 let mut auto_cert = AutoCert::builder()
243 .directory_url(directory_url)
244 .cache_path(cache_path);
245 for domain in domains {
246 auto_cert = auto_cert.domain(domain);
247 }
248 let auto_cert = auto_cert.build().expect("acme config to build");
249
250 let notice_task = tokio::task::spawn(run_insecure_notice());
251 let app_res = run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await;
252 log::warn!("server task ended, aborting insecure server task...");
253 notice_task.abort();
254 app_res?;
255 notice_task.await??;
256 }
257 ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?,
258 }
259
260 Ok("server (uh oh?)")
261}
262
263async fn run<A, L>(app: A, listener: L) -> std::io::Result<()>
264where
265 A: Endpoint + 'static,
266 L: Listener + 'static,
267{
268 Server::new(listener)
269 .name("allegedly (mirror)")
270 .run(app)
271 .await
272}
273
274/// kick off a tiny little server on a tokio task to tell people to use 443
275async fn run_insecure_notice() -> Result<(), std::io::Error> {
276 #[handler]
277 fn oop_plz_be_secure() -> (StatusCode, String) {
278 (
279 StatusCode::BAD_REQUEST,
280 format!(
281 r#"{}
282
283You probably want to change your request to use HTTPS instead of HTTP.
284"#,
285 logo("mirror (tls on 443 please)")
286 ),
287 )
288 }
289
290 let app = Route::new()
291 .at("/", get(oop_plz_be_secure))
292 .at("/favicon.ico", get(favicon))
293 .with(Tracing);
294 Server::new(TcpListener::bind("0.0.0.0:80"))
295 .name("allegedly (mirror:80 helper)")
296 .run(app)
297 .await
298}