tangled
alpha
login
or
join now
microcosm.blue
/
Allegedly
52
fork
atom
Server tools to backfill, tail, mirror, and verify PLC logs
52
fork
atom
overview
issues
4
pulls
1
pipelines
cache upstream status too
and add some log traces
bad-example.com
5 months ago
94b80a2d
6b2fc8cd
+49
-17
2 changed files
expand all
collapse all
unified
split
src
cached_value.rs
mirror.rs
+16
-1
src/cached_value.rs
···
16
16
impl<T: Clone> ExpiringValue<T> {
17
17
fn get(&self, now: Instant) -> Option<T> {
18
18
if now <= self.expires {
19
19
+
log::trace!("returning val (fresh for {:?})", self.expires - now);
19
20
Some(self.value.clone())
20
21
} else {
22
22
+
log::trace!("hiding expired val");
21
23
None
22
24
}
23
25
}
···
48
50
if let Some(v) = val.as_ref().and_then(|v| v.get(now)) {
49
51
return Ok(v);
50
52
}
51
51
-
let new = self.fetcher.fetch().await?;
53
53
+
log::debug!(
54
54
+
"value {}, fetching...",
55
55
+
if val.is_some() {
56
56
+
"expired"
57
57
+
} else {
58
58
+
"not present"
59
59
+
}
60
60
+
);
61
61
+
let new = self
62
62
+
.fetcher
63
63
+
.fetch()
64
64
+
.await
65
65
+
.inspect_err(|e| log::warn!("value fetch failed, next access will retry: {e}"))?;
66
66
+
log::debug!("fetched ok, saving a copy for cache.");
52
67
*val = Some(ExpiringValue {
53
68
value: new.clone(),
54
69
expires: now + self.validitiy,
+33
-16
src/mirror.rs
···
17
17
client: Client,
18
18
plc: Url,
19
19
upstream: Url,
20
20
-
latest_at: CachedValue<Dt, LatestAt>,
21
21
-
}
22
22
-
23
23
-
#[derive(Clone)]
24
24
-
struct LatestAt(Db);
25
25
-
impl Fetcher<Dt> for LatestAt {
26
26
-
async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> {
27
27
-
let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!(
28
28
-
"expected to find at least one thing in the db"
29
29
-
))?;
30
30
-
Ok(now)
31
31
-
}
20
20
+
latest_at: CachedValue<Dt, GetLatestAt>,
21
21
+
upstream_status: CachedValue<PlcStatus, CheckUpstream>,
32
22
}
33
23
34
24
#[handler]
···
89
79
)
90
80
}
91
81
92
92
-
async fn plc_status(url: &Url, client: &Client) -> (bool, serde_json::Value) {
82
82
+
type PlcStatus = (bool, serde_json::Value);
83
83
+
84
84
+
async fn plc_status(url: &Url, client: &Client) -> PlcStatus {
93
85
use serde_json::json;
94
86
95
87
let mut url = url.clone();
···
125
117
}
126
118
}
127
119
120
120
+
#[derive(Clone)]
121
121
+
struct GetLatestAt(Db);
122
122
+
impl Fetcher<Dt> for GetLatestAt {
123
123
+
async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> {
124
124
+
let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!(
125
125
+
"expected to find at least one thing in the db"
126
126
+
))?;
127
127
+
Ok(now)
128
128
+
}
129
129
+
}
130
130
+
131
131
+
#[derive(Clone)]
132
132
+
struct CheckUpstream(Url, Client);
133
133
+
impl Fetcher<PlcStatus> for CheckUpstream {
134
134
+
async fn fetch(&self) -> Result<PlcStatus, Box<dyn std::error::Error>> {
135
135
+
Ok(plc_status(&self.0, &self.1).await)
136
136
+
}
137
137
+
}
138
138
+
128
139
#[handler]
129
140
async fn health(
130
141
Data(State {
131
142
plc,
132
143
client,
133
133
-
upstream,
134
144
latest_at,
145
145
+
upstream_status,
146
146
+
..
135
147
}): Data<&State>,
136
148
) -> impl IntoResponse {
137
149
let mut overall_status = StatusCode::OK;
···
139
151
if !ok {
140
152
overall_status = StatusCode::BAD_GATEWAY;
141
153
}
142
142
-
let (ok, upstream_status) = plc_status(upstream, client).await;
154
154
+
let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible");
143
155
if !ok {
144
156
overall_status = StatusCode::BAD_GATEWAY;
145
157
}
···
233
245
.build()
234
246
.expect("reqwest client to build");
235
247
236
236
-
let latest_at = CachedValue::new(LatestAt(db), Duration::from_secs(1));
248
248
+
let latest_at = CachedValue::new(GetLatestAt(db), Duration::from_secs(2));
249
249
+
let upstream_status = CachedValue::new(
250
250
+
CheckUpstream(upstream.clone(), client.clone()),
251
251
+
Duration::from_secs(6),
252
252
+
);
237
253
238
254
let state = State {
239
255
client,
240
256
plc,
241
257
upstream: upstream.clone(),
242
258
latest_at,
259
259
+
upstream_status,
243
260
};
244
261
245
262
let app = Route::new()