tangled
alpha
login
or
join now
ptr.pet
/
Allegedly
forked from
microcosm.blue/Allegedly
0
fork
atom
Server tools to backfill, tail, mirror, and verify PLC logs
0
fork
atom
overview
issues
pulls
pipelines
unwrap => expect
bad-example.com
5 months ago
fa5479bc
14110337
+90
-45
6 changed files
expand all
collapse all
unified
split
src
bin
allegedly.rs
client.rs
mirror.rs
plc_pg.rs
poll.rs
ratelimit.rs
+61
-27
src/bin/allegedly.rs
···
36
/// Bulk load into did-method-plc-compatible postgres instead of stdout
37
///
38
/// Pass a postgres connection url like "postgresql://localhost:5432"
39
-
#[arg(long)]
40
to_postgres: Option<Url>,
41
/// Cert for postgres (if needed)
0
42
postgres_cert: Option<PathBuf>,
43
/// Delete all operations from the postgres db before starting
44
///
···
82
#[arg(long, env = "ALLEGEDLY_WRAP_PG")]
83
wrap_pg: Url,
84
/// path to tls cert for the wrapped postgres db, if needed
0
85
wrap_pg_cert: Option<PathBuf>,
86
/// wrapping server listen address
87
#[arg(short, long, env = "ALLEGEDLY_BIND")]
···
150
while let Some(page) = rx.recv().await
151
&& page.ops.len() > 900
152
{
153
-
tx.send(page).await.unwrap();
154
}
155
});
156
fwd
···
181
log::info!("Reading weekly bundles from local folder {dir:?}");
182
backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until)
183
.await
184
-
.unwrap();
185
} else {
186
log::info!("Fetching weekly bundles from from {http}");
187
backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until)
188
.await
189
-
.unwrap();
190
}
191
});
192
···
203
let pg_cert = postgres_cert.clone();
204
let bulk_out_write = tokio::task::spawn(async move {
205
if let Some(ref url) = to_postgres_url_bulk {
206
-
let db = Db::new(url.as_str(), pg_cert).await.unwrap();
0
0
207
backfill_to_pg(db, postgres_reset, rx, notify_last_at)
208
.await
209
-
.unwrap();
210
} else {
211
-
pages_to_stdout(rx, notify_last_at).await.unwrap();
0
0
212
}
213
});
214
···
216
let mut upstream = args.upstream;
217
upstream.set_path("/export");
218
// wait until the time for `after` is known
219
-
let last_at = rx_last.await.unwrap();
220
log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff");
221
let (tx, rx) = mpsc::channel(256); // these are small pages
222
-
tokio::task::spawn(
223
-
async move { poll_upstream(last_at, upstream, tx).await.unwrap() },
224
-
);
225
-
bulk_out_write.await.unwrap();
0
0
226
log::info!("writing catch-up pages");
227
let full_pages = full_pages(rx);
228
if let Some(url) = to_postgres {
229
-
let db = Db::new(url.as_str(), postgres_cert).await.unwrap();
230
-
pages_to_pg(db, full_pages).await.unwrap();
0
0
0
0
231
} else {
232
-
pages_to_stdout(full_pages, None).await.unwrap();
0
0
233
}
234
}
235
}
···
241
let mut url = args.upstream;
242
url.set_path("/export");
243
let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
244
-
tokio::task::spawn(async move { poll_upstream(Some(after), url, tx).await.unwrap() });
0
0
0
0
245
log::trace!("ensuring output directory exists");
246
-
std::fs::create_dir_all(&dest).unwrap();
247
-
pages_to_weeks(rx, dest, clobber).await.unwrap();
0
0
248
}
249
Commands::Mirror {
250
wrap,
···
255
acme_cache_path,
256
acme_directory_url,
257
} => {
258
-
let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await.unwrap();
0
0
259
let latest = db
260
.get_latest()
261
.await
262
-
.unwrap()
263
.expect("there to be at least one op in the db. did you backfill?");
264
265
let (tx, rx) = mpsc::channel(2);
···
268
tokio::task::spawn(async move {
269
log::info!("starting poll reader...");
270
url.set_path("/export");
271
-
tokio::task::spawn(
272
-
async move { poll_upstream(Some(latest), url, tx).await.unwrap() },
273
-
);
0
0
274
});
275
// db writer
276
let poll_db = db.clone();
277
tokio::task::spawn(async move {
278
log::info!("starting db writer...");
279
-
pages_to_pg(poll_db, rx).await.unwrap();
0
0
280
});
281
282
let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
···
289
(_, _, _) => unreachable!(),
290
};
291
292
-
serve(&args.upstream, wrap, listen_conf).await.unwrap();
0
0
293
}
294
Commands::Tail { after } => {
295
let mut url = args.upstream;
296
url.set_path("/export");
297
let start_at = after.or_else(|| Some(chrono::Utc::now()));
298
let (tx, rx) = mpsc::channel(1);
299
-
tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() });
300
-
pages_to_stdout(rx, None).await.unwrap();
0
0
0
0
0
0
301
}
302
}
303
log::info!("whew, {:?}. goodbye!", t0.elapsed());
···
36
/// Bulk load into did-method-plc-compatible postgres instead of stdout
37
///
38
/// Pass a postgres connection url like "postgresql://localhost:5432"
39
+
#[arg(long, env = "ALLEGEDLY_TO_POSTGRES")]
40
to_postgres: Option<Url>,
41
/// Cert for postgres (if needed)
42
+
#[arg(long)]
43
postgres_cert: Option<PathBuf>,
44
/// Delete all operations from the postgres db before starting
45
///
···
83
#[arg(long, env = "ALLEGEDLY_WRAP_PG")]
84
wrap_pg: Url,
85
/// path to tls cert for the wrapped postgres db, if needed
86
+
#[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")]
87
wrap_pg_cert: Option<PathBuf>,
88
/// wrapping server listen address
89
#[arg(short, long, env = "ALLEGEDLY_BIND")]
···
152
while let Some(page) = rx.recv().await
153
&& page.ops.len() > 900
154
{
155
+
tx.send(page).await.expect("to be able to forward a page");
156
}
157
});
158
fwd
···
183
log::info!("Reading weekly bundles from local folder {dir:?}");
184
backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until)
185
.await
186
+
.expect("to source bundles from a folder");
187
} else {
188
log::info!("Fetching weekly bundles from from {http}");
189
backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until)
190
.await
191
+
.expect("to source bundles from http");
192
}
193
});
194
···
205
let pg_cert = postgres_cert.clone();
206
let bulk_out_write = tokio::task::spawn(async move {
207
if let Some(ref url) = to_postgres_url_bulk {
208
+
let db = Db::new(url.as_str(), pg_cert)
209
+
.await
210
+
.expect("to get db for bulk out write");
211
backfill_to_pg(db, postgres_reset, rx, notify_last_at)
212
.await
213
+
.expect("to backfill to pg");
214
} else {
215
+
pages_to_stdout(rx, notify_last_at)
216
+
.await
217
+
.expect("to backfill to stdout");
218
}
219
});
220
···
222
let mut upstream = args.upstream;
223
upstream.set_path("/export");
224
// wait until the time for `after` is known
225
+
let last_at = rx_last.await.expect("to get the last log's createdAt");
226
log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff");
227
let (tx, rx) = mpsc::channel(256); // these are small pages
228
+
tokio::task::spawn(async move {
229
+
poll_upstream(last_at, upstream, tx)
230
+
.await
231
+
.expect("polling upstream to work")
232
+
});
233
+
bulk_out_write.await.expect("to wait for bulk_out_write");
234
log::info!("writing catch-up pages");
235
let full_pages = full_pages(rx);
236
if let Some(url) = to_postgres {
237
+
let db = Db::new(url.as_str(), postgres_cert)
238
+
.await
239
+
.expect("to connect pg for catchup");
240
+
pages_to_pg(db, full_pages)
241
+
.await
242
+
.expect("to write catch-up pages to pg");
243
} else {
244
+
pages_to_stdout(full_pages, None)
245
+
.await
246
+
.expect("to write catch-up pages to stdout");
247
}
248
}
249
}
···
255
let mut url = args.upstream;
256
url.set_path("/export");
257
let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
258
+
tokio::task::spawn(async move {
259
+
poll_upstream(Some(after), url, tx)
260
+
.await
261
+
.expect("to poll upstream")
262
+
});
263
log::trace!("ensuring output directory exists");
264
+
std::fs::create_dir_all(&dest).expect("to ensure output dir exists");
265
+
pages_to_weeks(rx, dest, clobber)
266
+
.await
267
+
.expect("to write bundles to output files");
268
}
269
Commands::Mirror {
270
wrap,
···
275
acme_cache_path,
276
acme_directory_url,
277
} => {
278
+
let db = Db::new(wrap_pg.as_str(), wrap_pg_cert)
279
+
.await
280
+
.expect("to connect to pg for mirroring");
281
let latest = db
282
.get_latest()
283
.await
284
+
.expect("to query for last createdAt")
285
.expect("there to be at least one op in the db. did you backfill?");
286
287
let (tx, rx) = mpsc::channel(2);
···
290
tokio::task::spawn(async move {
291
log::info!("starting poll reader...");
292
url.set_path("/export");
293
+
tokio::task::spawn(async move {
294
+
poll_upstream(Some(latest), url, tx)
295
+
.await
296
+
.expect("to poll upstream for mirror sync")
297
+
});
298
});
299
// db writer
300
let poll_db = db.clone();
301
tokio::task::spawn(async move {
302
log::info!("starting db writer...");
303
+
pages_to_pg(poll_db, rx)
304
+
.await
305
+
.expect("to write to pg for mirror");
306
});
307
308
let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
···
315
(_, _, _) => unreachable!(),
316
};
317
318
+
serve(&args.upstream, wrap, listen_conf)
319
+
.await
320
+
.expect("to be able to serve the mirror proxy app");
321
}
322
Commands::Tail { after } => {
323
let mut url = args.upstream;
324
url.set_path("/export");
325
let start_at = after.or_else(|| Some(chrono::Utc::now()));
326
let (tx, rx) = mpsc::channel(1);
327
+
tokio::task::spawn(async move {
328
+
poll_upstream(start_at, url, tx)
329
+
.await
330
+
.expect("to poll upstream")
331
+
});
332
+
pages_to_stdout(rx, None)
333
+
.await
334
+
.expect("to write pages to stdout");
335
}
336
}
337
log::info!("whew, {:?}. goodbye!", t0.elapsed());
+4
-1
src/client.rs
···
10
);
11
12
pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| {
13
-
let inner = Client::builder().user_agent(UA).build().unwrap();
0
0
0
14
15
let policy = ExponentialBackoff::builder().build_with_max_retries(12);
16
···
10
);
11
12
pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| {
13
+
let inner = Client::builder()
14
+
.user_agent(UA)
15
+
.build()
16
+
.expect("reqwest client to build");
17
18
let policy = ExponentialBackoff::builder().build_with_max_retries(12);
19
+2
-2
src/mirror.rs
···
192
.user_agent(UA)
193
.timeout(Duration::from_secs(10)) // fallback
194
.build()
195
-
.unwrap();
196
197
let state = State {
198
client,
···
208
.with(Cors::new().allow_credentials(false))
209
.with(Compression::new())
210
.with(GovernorMiddleware::new(Quota::per_minute(
211
-
3000.try_into().unwrap(),
212
)))
213
.with(CatchPanic::new())
214
.with(Tracing);
···
192
.user_agent(UA)
193
.timeout(Duration::from_secs(10)) // fallback
194
.build()
195
+
.expect("reqwest client to build");
196
197
let state = State {
198
client,
···
208
.with(Cors::new().allow_credentials(false))
209
.with(Compression::new())
210
.with(GovernorMiddleware::new(Quota::per_minute(
211
+
3000.try_into().expect("ratelimit middleware to build"),
212
)))
213
.with(CatchPanic::new())
214
.with(Tracing);
+7
-7
src/plc_pg.rs
···
13
};
14
15
fn get_tls(cert: PathBuf) -> MakeTlsConnector {
16
-
let cert = std::fs::read(cert).unwrap();
17
-
let cert = Certificate::from_pem(&cert).unwrap();
18
let connector = TlsConnector::builder()
19
.add_root_certificate(cert)
20
.build()
21
-
.unwrap();
22
MakeTlsConnector::new(connector)
23
}
24
···
46
connection
47
.await
48
.inspect_err(|e| log::error!("connection ended with error: {e}"))
49
-
.unwrap();
50
});
51
(client, task)
52
} else {
···
55
connection
56
.await
57
.inspect_err(|e| log::error!("connection ended with error: {e}"))
58
-
.unwrap();
59
});
60
(client, task)
61
};
···
97
connection
98
.await
99
.inspect_err(|e| log::error!("connection ended with error: {e}"))
100
-
.unwrap();
101
});
102
client
103
} else {
···
109
connection
110
.await
111
.inspect_err(|e| log::error!("connection ended with error: {e}"))
112
-
.unwrap();
113
});
114
client
115
};
···
13
};
14
15
fn get_tls(cert: PathBuf) -> MakeTlsConnector {
16
+
let cert = std::fs::read(cert).expect("to read cert file");
17
+
let cert = Certificate::from_pem(&cert).expect("to build cert");
18
let connector = TlsConnector::builder()
19
.add_root_certificate(cert)
20
.build()
21
+
.expect("to build tls connector");
22
MakeTlsConnector::new(connector)
23
}
24
···
46
connection
47
.await
48
.inspect_err(|e| log::error!("connection ended with error: {e}"))
49
+
.expect("pg validation connection not to blow up");
50
});
51
(client, task)
52
} else {
···
55
connection
56
.await
57
.inspect_err(|e| log::error!("connection ended with error: {e}"))
58
+
.expect("pg validation connection not to blow up");
59
});
60
(client, task)
61
};
···
97
connection
98
.await
99
.inspect_err(|e| log::error!("connection ended with error: {e}"))
100
+
.expect("pg connection not to blow up");
101
});
102
client
103
} else {
···
109
connection
110
.await
111
.inspect_err(|e| log::error!("connection ended with error: {e}"))
112
+
.expect("pg connection not to blow up");
113
});
114
client
115
};
+1
-1
src/poll.rs
···
276
let page = ExportPage {
277
ops: vec![valid_op().to_string()],
278
};
279
-
PageBoundaryState::new(&page).unwrap()
280
}
281
282
#[test]
···
276
let page = ExportPage {
277
ops: vec![valid_op().to_string()],
278
};
279
+
PageBoundaryState::new(&page).expect("to have a base page boundary state")
280
}
281
282
#[test]
+15
-7
src/ratelimit.rs
···
24
let period = quota.replenish_interval() / factor;
25
let burst = quota
26
.burst_size()
27
-
.checked_mul(factor.try_into().unwrap())
28
-
.unwrap();
29
Quota::with_period(period).map(|q| q.allow_burst(burst))
30
}
31
···
40
pub fn new(quota: Quota) -> Self {
41
Self {
42
per_ip: RateLimiter::keyed(quota),
43
-
ip6_56: RateLimiter::keyed(scale_quota(quota, 8).unwrap()),
44
-
ip6_48: RateLimiter::keyed(scale_quota(quota, 256).unwrap()),
45
}
46
}
47
pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> {
···
56
.map_err(asdf);
57
let check_56 = self
58
.ip6_56
59
-
.check_key(a.octets()[..7].try_into().unwrap())
0
0
0
0
60
.map_err(asdf);
61
let check_48 = self
62
.ip6_48
63
-
.check_key(a.octets()[..6].try_into().unwrap())
0
0
0
0
64
.map_err(asdf);
65
check_ip.and(check_56).and(check_48)
66
}
···
135
let remote = req
136
.remote_addr()
137
.as_socket_addr()
138
-
.unwrap_or_else(|| panic!("failed to get request's remote addr")) // TODO
139
.ip();
140
141
log::trace!("remote: {remote}");
···
24
let period = quota.replenish_interval() / factor;
25
let burst = quota
26
.burst_size()
27
+
.checked_mul(factor.try_into().expect("factor to be non-zero"))
28
+
.expect("burst to be able to multiply");
29
Quota::with_period(period).map(|q| q.allow_burst(burst))
30
}
31
···
40
pub fn new(quota: Quota) -> Self {
41
Self {
42
per_ip: RateLimiter::keyed(quota),
43
+
ip6_56: RateLimiter::keyed(scale_quota(quota, 8).expect("to scale quota")),
44
+
ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")),
45
}
46
}
47
pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> {
···
56
.map_err(asdf);
57
let check_56 = self
58
.ip6_56
59
+
.check_key(
60
+
a.octets()[..7]
61
+
.try_into()
62
+
.expect("to check ip6 /56 limiter"),
63
+
)
64
.map_err(asdf);
65
let check_48 = self
66
.ip6_48
67
+
.check_key(
68
+
a.octets()[..6]
69
+
.try_into()
70
+
.expect("to check ip6 /48 limiter"),
71
+
)
72
.map_err(asdf);
73
check_ip.and(check_56).and(check_48)
74
}
···
143
let remote = req
144
.remote_addr()
145
.as_socket_addr()
146
+
.expect("failed to get request's remote addr") // TODO
147
.ip();
148
149
log::trace!("remote: {remote}");