Server tools to backfill, tail, mirror, and verify PLC logs
at main 164 lines 4.8 kB view raw
1use serde::{Deserialize, Serialize}; 2use tokio::sync::{mpsc, oneshot}; 3 4mod backfill; 5mod cached_value; 6mod client; 7pub mod datastore; 8mod mirror; 9mod plc_pg; 10mod plc_rocksdb; 11mod poll; 12mod ratelimit; 13mod weekly; 14 15pub mod bin; 16 17pub use backfill::backfill; 18pub use cached_value::{CachedValue, Fetcher}; 19pub use client::{CLIENT, UA}; 20pub use datastore::DatastoreEnum; 21pub use mirror::{ExperimentalConf, ListenConf, serve}; 22pub use plc_pg::Db; 23pub use plc_rocksdb::{RocksDatastore, backfill_to_rocksdb}; 24pub use poll::{PageBoundaryState, get_page, poll_upstream}; 25pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters}; 26pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 27 28pub type Dt = chrono::DateTime<chrono::Utc>; 29 30/// One page of PLC export 31/// 32/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page. 33#[derive(Debug)] 34pub struct ExportPage { 35 pub ops: Vec<Op>, 36} 37 38impl ExportPage { 39 pub fn is_empty(&self) -> bool { 40 self.ops.is_empty() 41 } 42} 43 44/// A fully-deserialized plc operation 45/// 46/// including the plc's wrapping with timestmap and nullified state 47#[derive(Debug, Clone, Deserialize, Serialize)] 48#[serde(rename_all = "camelCase")] 49pub struct Op { 50 pub did: String, 51 pub cid: String, 52 pub created_at: Dt, 53 pub nullified: bool, 54 pub operation: Box<serde_json::value::RawValue>, 55} 56 57#[cfg(test)] 58impl PartialEq for Op { 59 fn eq(&self, other: &Self) -> bool { 60 self.did == other.did 61 && self.cid == other.cid 62 && self.created_at == other.created_at 63 && self.nullified == other.nullified 64 && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap() 65 == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap() 66 } 67} 68 69/// Database primary key for an op 70#[derive(Debug, PartialEq)] 71pub struct OpKey { 72 pub did: String, 73 pub cid: String, 74} 75 76impl From<&Op> for OpKey { 77 fn from(Op { did, cid, .. }: &Op) -> Self { 78 Self { 79 did: did.to_string(), 80 cid: cid.to_string(), 81 } 82 } 83} 84 85/// page forwarder who drops its channels on receipt of a small page 86/// 87/// PLC will return up to 1000 ops on a page, and returns full pages until it 88/// has caught up, so this is a (hacky?) way to stop polling once we're up. 89pub async fn full_pages( 90 mut rx: mpsc::Receiver<ExportPage>, 91 tx: mpsc::Sender<ExportPage>, 92) -> anyhow::Result<&'static str> { 93 while let Some(page) = rx.recv().await { 94 let n = page.ops.len(); 95 if n < 900 { 96 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 97 let Some(age) = last_age else { 98 log::info!("full_pages done, empty final page"); 99 return Ok("full pages (hmm)"); 100 }; 101 if age <= chrono::TimeDelta::hours(6) { 102 log::info!("full_pages done, final page of {n} ops"); 103 } else { 104 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 105 } 106 return Ok("full pages (cool)"); 107 } 108 log::trace!("full_pages: continuing with page of {n} ops"); 109 tx.send(page).await?; 110 } 111 Err(anyhow::anyhow!( 112 "full_pages ran out of source material, sender closed" 113 )) 114} 115 116pub async fn pages_to_stdout( 117 mut rx: mpsc::Receiver<ExportPage>, 118 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 119) -> anyhow::Result<&'static str> { 120 let mut last_at = None; 121 while let Some(page) = rx.recv().await { 122 for op in &page.ops { 123 println!("{}", serde_json::to_string(op)?); 124 } 125 if notify_last_at.is_some() 126 && let Some(s) = PageBoundaryState::new(&page) 127 { 128 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 129 } 130 } 131 if let Some(notify) = notify_last_at { 132 log::trace!("notifying last_at: {last_at:?}"); 133 if notify.send(last_at).is_err() { 134 log::error!("receiver for last_at dropped, can't notify"); 135 }; 136 } 137 Ok("pages_to_stdout") 138} 139 140pub fn logo(name: &str) -> String { 141 format!( 142 r" 143 144 \ | | | | 145 _ \ | | -_) _` | -_) _` | | | | ({name}) 146 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{}) 147 ____| __/ 148", 149 env!("CARGO_PKG_VERSION"), 150 ) 151} 152 153pub fn bin_init(name: &str) { 154 if std::env::var_os("RUST_LOG").is_none() { 155 unsafe { std::env::set_var("RUST_LOG", "info") }; 156 } 157 let filter = tracing_subscriber::EnvFilter::from_default_env(); 158 tracing_subscriber::fmt() 159 .with_writer(std::io::stderr) 160 .with_env_filter(filter) 161 .init(); 162 163 log::info!("{}", logo(name)); 164}