forked from
microcosm.blue/Allegedly
Server tools to backfill, tail, mirror, and verify PLC logs
1use serde::{Deserialize, Serialize};
2use tokio::sync::{mpsc, oneshot};
3
4mod backfill;
5mod cached_value;
6mod client;
7pub mod datastore;
8mod mirror;
9mod plc_pg;
10mod plc_rocksdb;
11mod poll;
12mod ratelimit;
13mod weekly;
14
15pub mod bin;
16
17pub use backfill::backfill;
18pub use cached_value::{CachedValue, Fetcher};
19pub use client::{CLIENT, UA};
20pub use datastore::DatastoreEnum;
21pub use mirror::{ExperimentalConf, ListenConf, serve};
22pub use plc_pg::Db;
23pub use plc_rocksdb::{RocksDatastore, backfill_to_rocksdb};
24pub use poll::{PageBoundaryState, get_page, poll_upstream};
25pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
26pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages};
27
/// Timestamp type used by this crate: a `chrono` date-time in UTC.
pub type Dt = chrono::DateTime<chrono::Utc>;
29
/// One page of PLC export
///
/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page.
#[derive(Debug)]
pub struct ExportPage {
    /// The ops carried by this page.
    pub ops: Vec<Op>,
}
37
38impl ExportPage {
39 pub fn is_empty(&self) -> bool {
40 self.ops.is_empty()
41 }
42}
43
/// A fully-deserialized plc operation
///
/// Includes the PLC's wrapping fields: the timestamp and the nullified state.
/// Fields are (de)serialized with camelCase names, so `created_at` appears as
/// `createdAt` on the wire.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Op {
    /// DID string from the export record (presumably the identity the op
    /// applies to; confirm against the PLC export format).
    pub did: String,
    /// CID string from the export record (presumably the content id of the
    /// operation; confirm against the PLC export format).
    pub cid: String,
    /// Timestamp attached by the PLC wrapper, in UTC.
    pub created_at: Dt,
    /// Nullified state attached by the PLC wrapper.
    pub nullified: bool,
    /// The inner operation, kept as raw JSON text rather than a parsed tree.
    pub operation: Box<serde_json::value::RawValue>,
}
56
/// Test-only equality for [`Op`].
///
/// The wrapper fields are compared directly. The raw `operation` JSON is
/// parsed on both sides and compared as values, so differences in raw text
/// that parse to the same JSON value do not make two ops unequal.
#[cfg(test)]
impl PartialEq for Op {
    fn eq(&self, other: &Self) -> bool {
        /// Parses raw JSON text into a value; panics on invalid JSON.
        fn as_value(raw: &str) -> serde_json::Value {
            serde_json::from_str::<serde_json::Value>(raw).unwrap()
        }

        let wrapper_matches = self.did == other.did
            && self.cid == other.cid
            && self.created_at == other.created_at
            && self.nullified == other.nullified;
        if !wrapper_matches {
            // Bail before touching the JSON, exactly like a short-circuiting `&&`.
            return false;
        }

        let mine = as_value(self.operation.get());
        let theirs = as_value(other.operation.get());
        mine == theirs
    }
}
68
/// Database primary key for an op
///
/// Two keys are equal when both `did` and `cid` match. The type is `Clone`,
/// `Eq` and `Hash` so keys can be copied around and used directly in
/// `HashSet`/`HashMap`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpKey {
    /// `did` of the op this key was taken from.
    pub did: String,
    /// `cid` of the op this key was taken from.
    pub cid: String,
}
75
76impl From<&Op> for OpKey {
77 fn from(Op { did, cid, .. }: &Op) -> Self {
78 Self {
79 did: did.to_string(),
80 cid: cid.to_string(),
81 }
82 }
83}
84
85/// page forwarder who drops its channels on receipt of a small page
86///
87/// PLC will return up to 1000 ops on a page, and returns full pages until it
88/// has caught up, so this is a (hacky?) way to stop polling once we're up.
89pub async fn full_pages(
90 mut rx: mpsc::Receiver<ExportPage>,
91 tx: mpsc::Sender<ExportPage>,
92) -> anyhow::Result<&'static str> {
93 while let Some(page) = rx.recv().await {
94 let n = page.ops.len();
95 if n < 900 {
96 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
97 let Some(age) = last_age else {
98 log::info!("full_pages done, empty final page");
99 return Ok("full pages (hmm)");
100 };
101 if age <= chrono::TimeDelta::hours(6) {
102 log::info!("full_pages done, final page of {n} ops");
103 } else {
104 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
105 }
106 return Ok("full pages (cool)");
107 }
108 log::trace!("full_pages: continuing with page of {n} ops");
109 tx.send(page).await?;
110 }
111 Err(anyhow::anyhow!(
112 "full_pages ran out of source material, sender closed"
113 ))
114}
115
116pub async fn pages_to_stdout(
117 mut rx: mpsc::Receiver<ExportPage>,
118 notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
119) -> anyhow::Result<&'static str> {
120 let mut last_at = None;
121 while let Some(page) = rx.recv().await {
122 for op in &page.ops {
123 println!("{}", serde_json::to_string(op)?);
124 }
125 if notify_last_at.is_some()
126 && let Some(s) = PageBoundaryState::new(&page)
127 {
128 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
129 }
130 }
131 if let Some(notify) = notify_last_at {
132 log::trace!("notifying last_at: {last_at:?}");
133 if notify.send(last_at).is_err() {
134 log::error!("receiver for last_at dropped, can't notify");
135 };
136 }
137 Ok("pages_to_stdout")
138}
139
/// Builds the ASCII-art banner as a `String`.
///
/// `name` is interpolated in parentheses next to the art, and the crate
/// version (`CARGO_PKG_VERSION`, resolved at compile time) is shown as
/// `(v…)` on the following line.
pub fn logo(name: &str) -> String {
    // NOTE(review): whitespace inside the raw string is part of the output.
    format!(
        r"

 \ | | | |
 _ \ | | -_) _` | -_) _` | | | | ({name})
 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{})
 ____| __/
",
        env!("CARGO_PKG_VERSION"),
    )
}
152
153pub fn bin_init(name: &str) {
154 if std::env::var_os("RUST_LOG").is_none() {
155 unsafe { std::env::set_var("RUST_LOG", "info") };
156 }
157 let filter = tracing_subscriber::EnvFilter::from_default_env();
158 tracing_subscriber::fmt()
159 .with_writer(std::io::stderr)
160 .with_env_filter(filter)
161 .init();
162
163 log::info!("{}", logo(name));
164}