forked from
microcosm.blue/Allegedly
Server tools to backfill, tail, mirror, and verify PLC logs
1use serde::{Deserialize, Serialize};
2
3use tokio::sync::{mpsc, oneshot};
4
5mod backfill;
6mod cached_value;
7mod client;
8pub mod crypto;
9pub mod doc;
10mod mirror;
11mod plc_fjall;
12mod plc_pg;
13mod poll;
14mod ratelimit;
15mod weekly;
16
17pub mod bin;
18
19pub use backfill::backfill;
20pub use cached_value::{CachedValue, Fetcher};
21pub use client::{CLIENT, UA};
22pub use mirror::{ExperimentalConf, ListenConf, serve, serve_fjall};
23pub use plc_fjall::{
24 FjallDb, audit as audit_fjall, backfill_to_fjall, fix_ops as fix_ops_fjall, seq_pages_to_fjall,
25};
26pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
27pub use poll::{
28 PageBoundaryState, get_page, poll_upstream, poll_upstream_seq, tail_upstream_stream,
29};
30pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
31pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages};
32
/// Timestamp type used by the items in this file: a `chrono` date-time in UTC.
33pub type Dt = chrono::DateTime<chrono::Utc>;
34
35/// One page of PLC export
36///
37/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page.
38#[derive(Debug)]
39pub struct ExportPage {
40 pub ops: Vec<Op>,
41}
42
43impl ExportPage {
44 pub fn is_empty(&self) -> bool {
45 self.ops.is_empty()
46 }
47}
48
49/// A fully-deserialized plc operation
50///
51/// including the plc's wrapping with timestmap and nullified state
52#[derive(Debug, Clone, Deserialize, Serialize)]
53#[serde(rename_all = "camelCase")]
54pub struct Op {
55 pub did: String,
56 pub cid: String,
57 pub created_at: Dt,
58 pub nullified: bool,
59 pub operation: Box<serde_json::value::RawValue>,
60}
61
/// Test-only equality for [`Op`].
///
/// The raw `operation` payloads are parsed into [`serde_json::Value`] before
/// being compared, so the comparison is on the parsed JSON rather than on the
/// raw text (differences such as whitespace do not matter).
#[cfg(test)]
impl PartialEq for Op {
    fn eq(&self, other: &Self) -> bool {
        // Only invoked once the cheap fields match, so JSON is decoded lazily.
        let parsed = |op: &Op| -> serde_json::Value {
            serde_json::from_str(op.operation.get()).unwrap()
        };

        if self.did != other.did || self.cid != other.cid {
            return false;
        }
        if self.created_at != other.created_at || self.nullified != other.nullified {
            return false;
        }
        parsed(self) == parsed(other)
    }
}
73
/// Database primary key for an op: the `(did, cid)` pair.
#[derive(Debug, PartialEq)]
pub struct OpKey {
    /// DID component of the key.
    pub did: String,
    /// CID component of the key.
    pub cid: String,
}
80
81impl From<&Op> for OpKey {
82 fn from(Op { did, cid, .. }: &Op) -> Self {
83 Self {
84 did: did.to_string(),
85 cid: cid.to_string(),
86 }
87 }
88}
89
90/// A PLC op from `/export?after=<seq>` or `/export/stream`
91///
92/// Both endpoints return the `seq` field per op, which is a globally monotonic
93/// unsigned integer assigned by the PLC directory.
94#[derive(Debug, Clone, Deserialize)]
95#[serde(rename_all = "camelCase")]
96pub struct SeqOp {
97 pub seq: u64,
98 pub did: String,
99 pub cid: String,
100 pub created_at: Dt,
101 #[serde(default)]
102 pub nullified: bool,
103 pub operation: Box<serde_json::value::RawValue>,
104}
105
106impl From<SeqOp> for Op {
107 fn from(s: SeqOp) -> Self {
108 Op {
109 did: s.did,
110 cid: s.cid,
111 created_at: s.created_at,
112 nullified: s.nullified,
113 operation: s.operation,
114 }
115 }
116}
117
118/// A page of sequenced ops from `/export?after=<seq>`
119#[derive(Debug)]
120pub struct SeqPage {
121 pub ops: Vec<SeqOp>,
122}
123
124impl SeqPage {
125 pub fn is_empty(&self) -> bool {
126 self.ops.is_empty()
127 }
128}
129
130/// page forwarder who drops its channels on receipt of a small page
131///
132/// PLC will return up to 1000 ops on a page, and returns full pages until it
133/// has caught up, so this is a (hacky?) way to stop polling once we're up.
134pub async fn full_pages(
135 mut rx: mpsc::Receiver<ExportPage>,
136 tx: mpsc::Sender<ExportPage>,
137) -> anyhow::Result<&'static str> {
138 while let Some(page) = rx.recv().await {
139 let n = page.ops.len();
140 if n < 900 {
141 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
142 let Some(age) = last_age else {
143 log::info!("full_pages done, empty final page");
144 return Ok("full pages (hmm)");
145 };
146 if age <= chrono::TimeDelta::hours(6) {
147 log::info!("full_pages done, final page of {n} ops");
148 } else {
149 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
150 }
151 return Ok("full pages (cool)");
152 }
153 log::trace!("full_pages: continuing with page of {n} ops");
154 tx.send(page).await?;
155 }
156 Err(anyhow::anyhow!(
157 "full_pages ran out of source material, sender closed"
158 ))
159}
160
161pub async fn full_pages_seq(
162 mut rx: mpsc::Receiver<SeqPage>,
163 tx: mpsc::Sender<SeqPage>,
164) -> anyhow::Result<&'static str> {
165 while let Some(page) = rx.recv().await {
166 let n = page.ops.len();
167 if n < 900 {
168 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
169 let Some(age) = last_age else {
170 log::info!("full_pages done, empty final page");
171 return Ok("full pages (hmm)");
172 };
173 if age <= chrono::TimeDelta::hours(6) {
174 log::info!("full_pages done, final page of {n} ops");
175 } else {
176 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
177 }
178 return Ok("full pages (cool)");
179 }
180 log::trace!("full_pages: continuing with page of {n} ops");
181 tx.send(page).await?;
182 }
183 Err(anyhow::anyhow!(
184 "full_pages ran out of source material, sender closed"
185 ))
186}
187
188pub async fn pages_to_stdout(
189 mut rx: mpsc::Receiver<ExportPage>,
190 notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
191) -> anyhow::Result<&'static str> {
192 let mut last_at = None;
193 while let Some(page) = rx.recv().await {
194 for op in &page.ops {
195 println!("{}", serde_json::to_string(op)?);
196 }
197 if notify_last_at.is_some()
198 && let Some(s) = PageBoundaryState::new(&page)
199 {
200 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
201 }
202 }
203 if let Some(notify) = notify_last_at {
204 log::trace!("notifying last_at: {last_at:?}");
205 if notify.send(last_at).is_err() {
206 log::error!("receiver for last_at dropped, can't notify");
207 };
208 }
209 Ok("pages_to_stdout")
210}
211
212#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct InvalidOp {
214 pub did: String,
215 pub at: Dt,
216 pub cid: String,
217}
218
219pub async fn invalid_ops_to_stdout(
220 mut rx: mpsc::Receiver<InvalidOp>,
221) -> anyhow::Result<&'static str> {
222 while let Some(op) = rx.recv().await {
223 use std::io::{Write, stdout};
224 let mut stdout = stdout().lock();
225 serde_json::to_writer(&mut stdout, &op)?;
226 stdout.write_all(b"\n")?;
227 }
228 Ok("invalid_ops_to_stdout")
229}
230
231pub async fn file_to_invalid_ops(
232 path: impl AsRef<std::path::Path>,
233 tx: mpsc::Sender<InvalidOp>,
234) -> anyhow::Result<&'static str> {
235 let file = tokio::fs::File::open(path).await?;
236
237 use tokio::io::AsyncBufReadExt;
238 let mut lines = tokio::io::BufReader::new(file).lines();
239 while let Some(line) = lines.next_line().await? {
240 let op: InvalidOp = serde_json::from_str(&line)?;
241 tx.send(op).await?;
242 }
243
244 Ok("invalid_ops_to_stdout")
245}
246
/// Builds the multi-line ASCII-art banner as a `String`.
///
/// `name` is interpolated into the art, along with the crate version taken
/// from `CARGO_PKG_VERSION` at compile time.
247pub fn logo(name: &str) -> String {
248 format!(
249 r"
250
251 \ | | | |
252 _ \ | | -_) _` | -_) _` | | | | ({name})
253 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{})
254 ____| __/
255",
256 env!("CARGO_PKG_VERSION"),
257 )
258}