Server tools to backfill, tail, mirror, and verify PLC logs
File on branch `debug`: 158 lines, 4.6 kB (view raw).
1use serde::{Deserialize, Serialize}; 2use tokio::sync::{mpsc, oneshot}; 3 4mod backfill; 5mod client; 6mod mirror; 7mod plc_pg; 8mod poll; 9mod ratelimit; 10mod weekly; 11 12pub mod bin; 13 14pub use backfill::backfill; 15pub use client::{CLIENT, UA}; 16pub use mirror::{ListenConf, serve}; 17pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 18pub use poll::{PageBoundaryState, get_page, poll_upstream}; 19pub use ratelimit::GovernorMiddleware; 20pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 21 22pub type Dt = chrono::DateTime<chrono::Utc>; 23 24/// One page of PLC export 25/// 26/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page. 27#[derive(Debug)] 28pub struct ExportPage { 29 pub ops: Vec<Op>, 30} 31 32impl ExportPage { 33 pub fn is_empty(&self) -> bool { 34 self.ops.is_empty() 35 } 36} 37 38/// A fully-deserialized plc operation 39/// 40/// including the plc's wrapping with timestmap and nullified state 41#[derive(Debug, Clone, Deserialize, Serialize)] 42#[serde(rename_all = "camelCase")] 43pub struct Op { 44 pub did: String, 45 pub cid: String, 46 pub created_at: Dt, 47 pub nullified: bool, 48 pub operation: Box<serde_json::value::RawValue>, 49} 50 51#[cfg(test)] 52impl PartialEq for Op { 53 fn eq(&self, other: &Self) -> bool { 54 self.did == other.did 55 && self.cid == other.cid 56 && self.created_at == other.created_at 57 && self.nullified == other.nullified 58 && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap() 59 == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap() 60 } 61} 62 63/// Database primary key for an op 64#[derive(Debug, PartialEq)] 65pub struct OpKey { 66 pub did: String, 67 pub cid: String, 68} 69 70impl From<&Op> for OpKey { 71 fn from(Op { did, cid, .. 
}: &Op) -> Self { 72 Self { 73 did: did.to_string(), 74 cid: cid.to_string(), 75 } 76 } 77} 78 79/// page forwarder who drops its channels on receipt of a small page 80/// 81/// PLC will return up to 1000 ops on a page, and returns full pages until it 82/// has caught up, so this is a (hacky?) way to stop polling once we're up. 83pub async fn full_pages( 84 mut rx: mpsc::Receiver<ExportPage>, 85 tx: mpsc::Sender<ExportPage>, 86) -> anyhow::Result<&'static str> { 87 while let Some(page) = rx.recv().await { 88 let n = page.ops.len(); 89 if n < 900 { 90 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 91 let Some(age) = last_age else { 92 log::info!("full_pages done, empty final page"); 93 return Ok("full pages (hmm)"); 94 }; 95 if age <= chrono::TimeDelta::hours(6) { 96 log::info!("full_pages done, final page of {n} ops"); 97 } else { 98 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 99 } 100 return Ok("full pages (cool)"); 101 } 102 log::trace!("full_pages: continuing with page of {n} ops"); 103 tx.send(page).await?; 104 } 105 Err(anyhow::anyhow!( 106 "full_pages ran out of source material, sender closed" 107 )) 108} 109 110pub async fn pages_to_stdout( 111 mut rx: mpsc::Receiver<ExportPage>, 112 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 113) -> anyhow::Result<&'static str> { 114 let mut last_at = None; 115 while let Some(page) = rx.recv().await { 116 for op in &page.ops { 117 println!("{}", serde_json::to_string(op)?); 118 } 119 if notify_last_at.is_some() 120 && let Some(s) = PageBoundaryState::new(&page) 121 { 122 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 123 } 124 } 125 if let Some(notify) = notify_last_at { 126 log::trace!("notifying last_at: {last_at:?}"); 127 if notify.send(last_at).is_err() { 128 log::error!("receiver for last_at dropped, can't notify"); 129 }; 130 } 131 Ok("pages_to_stdout") 132} 133 134pub fn logo(name: &str) -> String { 135 format!( 136 r" 
137 138 \ | | | | 139 _ \ | | -_) _` | -_) _` | | | | ({name}) 140 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{}) 141 ____| __/ 142", 143 env!("CARGO_PKG_VERSION"), 144 ) 145} 146 147pub fn bin_init(name: &str) { 148 if std::env::var_os("RUST_LOG").is_none() { 149 unsafe { std::env::set_var("RUST_LOG", "info") }; 150 } 151 let filter = tracing_subscriber::EnvFilter::from_default_env(); 152 tracing_subscriber::fmt() 153 .with_writer(std::io::stderr) 154 .with_env_filter(filter) 155 .init(); 156 157 log::info!("{}", logo(name)); 158}