Server tools to backfill, tail, mirror, and verify PLC logs
1use serde::{Deserialize, Serialize};
2use tokio::sync::{mpsc, oneshot};
3
4mod backfill;
5mod client;
6mod mirror;
7mod plc_pg;
8mod poll;
9mod ratelimit;
10mod weekly;
11
12pub mod bin;
13
14pub use backfill::backfill;
15pub use client::{CLIENT, UA};
16pub use mirror::{ListenConf, serve};
17pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
18pub use poll::{PageBoundaryState, get_page, poll_upstream};
19pub use ratelimit::GovernorMiddleware;
20pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages};
21
22pub type Dt = chrono::DateTime<chrono::Utc>;
23
24/// One page of PLC export
25///
26/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page.
27#[derive(Debug)]
28pub struct ExportPage {
29 pub ops: Vec<Op>,
30}
31
32impl ExportPage {
33 pub fn is_empty(&self) -> bool {
34 self.ops.is_empty()
35 }
36}
37
38/// A fully-deserialized plc operation
39///
40/// including the plc's wrapping with timestmap and nullified state
41#[derive(Debug, Clone, Deserialize, Serialize)]
42#[serde(rename_all = "camelCase")]
43pub struct Op {
44 pub did: String,
45 pub cid: String,
46 pub created_at: Dt,
47 pub nullified: bool,
48 pub operation: Box<serde_json::value::RawValue>,
49}
50
#[cfg(test)]
impl PartialEq for Op {
    /// Test-only equality.
    ///
    /// The plain fields compare directly. `operation` is compared as parsed
    /// JSON values rather than as raw text, and is only parsed once every
    /// other field already matches.
    ///
    /// Panics if either `operation` fails to parse as JSON.
    fn eq(&self, other: &Self) -> bool {
        let op_json =
            |op: &Op| serde_json::from_str::<serde_json::Value>(op.operation.get()).unwrap();
        let plain_fields_match = (&self.did, &self.cid, &self.created_at, &self.nullified)
            == (&other.did, &other.cid, &other.created_at, &other.nullified);
        plain_fields_match && op_json(self) == op_json(other)
    }
}
62
63/// Database primary key for an op
64#[derive(Debug, PartialEq)]
65pub struct OpKey {
66 pub did: String,
67 pub cid: String,
68}
69
70impl From<&Op> for OpKey {
71 fn from(Op { did, cid, .. }: &Op) -> Self {
72 Self {
73 did: did.to_string(),
74 cid: cid.to_string(),
75 }
76 }
77}
78
79/// page forwarder who drops its channels on receipt of a small page
80///
81/// PLC will return up to 1000 ops on a page, and returns full pages until it
82/// has caught up, so this is a (hacky?) way to stop polling once we're up.
83pub async fn full_pages(
84 mut rx: mpsc::Receiver<ExportPage>,
85 tx: mpsc::Sender<ExportPage>,
86) -> anyhow::Result<&'static str> {
87 while let Some(page) = rx.recv().await {
88 let n = page.ops.len();
89 if n < 900 {
90 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
91 let Some(age) = last_age else {
92 log::info!("full_pages done, empty final page");
93 return Ok("full pages (hmm)");
94 };
95 if age <= chrono::TimeDelta::hours(6) {
96 log::info!("full_pages done, final page of {n} ops");
97 } else {
98 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
99 }
100 return Ok("full pages (cool)");
101 }
102 log::trace!("full_pages: continuing with page of {n} ops");
103 tx.send(page).await?;
104 }
105 Err(anyhow::anyhow!(
106 "full_pages ran out of source material, sender closed"
107 ))
108}
109
110pub async fn pages_to_stdout(
111 mut rx: mpsc::Receiver<ExportPage>,
112 notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
113) -> anyhow::Result<&'static str> {
114 let mut last_at = None;
115 while let Some(page) = rx.recv().await {
116 for op in &page.ops {
117 println!("{}", serde_json::to_string(op)?);
118 }
119 if notify_last_at.is_some()
120 && let Some(s) = PageBoundaryState::new(&page)
121 {
122 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
123 }
124 }
125 if let Some(notify) = notify_last_at {
126 log::trace!("notifying last_at: {last_at:?}");
127 if notify.send(last_at).is_err() {
128 log::error!("receiver for last_at dropped, can't notify");
129 };
130 }
131 Ok("pages_to_stdout")
132}
133
/// Build the startup banner for the binary called `name`.
///
/// Returns a multi-line ASCII-art banner with `name` and the crate version
/// interpolated. The version comes from `CARGO_PKG_VERSION`, which `env!`
/// reads at compile time.
134pub fn logo(name: &str) -> String {
135 format!(
136 r"
137
138 \ | | | |
139 _ \ | | -_) _` | -_) _` | | | | ({name})
140 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{})
141 ____| __/
142",
143 env!("CARGO_PKG_VERSION"),
144 )
145}
146
147pub fn bin_init(name: &str) {
148 if std::env::var_os("RUST_LOG").is_none() {
149 unsafe { std::env::set_var("RUST_LOG", "info") };
150 }
151 let filter = tracing_subscriber::EnvFilter::from_default_env();
152 tracing_subscriber::fmt()
153 .with_writer(std::io::stderr)
154 .with_env_filter(filter)
155 .init();
156
157 log::info!("{}", logo(name));
158}