//! Server tools to backfill, tail, mirror, and verify PLC logs.
// (Recovered from a source-view page: branch `main`, 258 lines, 7.5 kB.)
1use serde::{Deserialize, Serialize}; 2 3use tokio::sync::{mpsc, oneshot}; 4 5mod backfill; 6mod cached_value; 7mod client; 8pub mod crypto; 9pub mod doc; 10mod mirror; 11mod plc_fjall; 12mod plc_pg; 13mod poll; 14mod ratelimit; 15mod weekly; 16 17pub mod bin; 18 19pub use backfill::backfill; 20pub use cached_value::{CachedValue, Fetcher}; 21pub use client::{CLIENT, UA}; 22pub use mirror::{ExperimentalConf, ListenConf, serve, serve_fjall}; 23pub use plc_fjall::{ 24 FjallDb, audit as audit_fjall, backfill_to_fjall, fix_ops as fix_ops_fjall, seq_pages_to_fjall, 25}; 26pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 27pub use poll::{ 28 PageBoundaryState, get_page, poll_upstream, poll_upstream_seq, tail_upstream_stream, 29}; 30pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters}; 31pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 32 33pub type Dt = chrono::DateTime<chrono::Utc>; 34 35/// One page of PLC export 36/// 37/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page. 
38#[derive(Debug)] 39pub struct ExportPage { 40 pub ops: Vec<Op>, 41} 42 43impl ExportPage { 44 pub fn is_empty(&self) -> bool { 45 self.ops.is_empty() 46 } 47} 48 49/// A fully-deserialized plc operation 50/// 51/// including the plc's wrapping with timestmap and nullified state 52#[derive(Debug, Clone, Deserialize, Serialize)] 53#[serde(rename_all = "camelCase")] 54pub struct Op { 55 pub did: String, 56 pub cid: String, 57 pub created_at: Dt, 58 pub nullified: bool, 59 pub operation: Box<serde_json::value::RawValue>, 60} 61 62#[cfg(test)] 63impl PartialEq for Op { 64 fn eq(&self, other: &Self) -> bool { 65 self.did == other.did 66 && self.cid == other.cid 67 && self.created_at == other.created_at 68 && self.nullified == other.nullified 69 && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap() 70 == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap() 71 } 72} 73 74/// Database primary key for an op 75#[derive(Debug, PartialEq)] 76pub struct OpKey { 77 pub did: String, 78 pub cid: String, 79} 80 81impl From<&Op> for OpKey { 82 fn from(Op { did, cid, .. }: &Op) -> Self { 83 Self { 84 did: did.to_string(), 85 cid: cid.to_string(), 86 } 87 } 88} 89 90/// A PLC op from `/export?after=<seq>` or `/export/stream` 91/// 92/// Both endpoints return the `seq` field per op, which is a globally monotonic 93/// unsigned integer assigned by the PLC directory. 
94#[derive(Debug, Clone, Deserialize)] 95#[serde(rename_all = "camelCase")] 96pub struct SeqOp { 97 pub seq: u64, 98 pub did: String, 99 pub cid: String, 100 pub created_at: Dt, 101 #[serde(default)] 102 pub nullified: bool, 103 pub operation: Box<serde_json::value::RawValue>, 104} 105 106impl From<SeqOp> for Op { 107 fn from(s: SeqOp) -> Self { 108 Op { 109 did: s.did, 110 cid: s.cid, 111 created_at: s.created_at, 112 nullified: s.nullified, 113 operation: s.operation, 114 } 115 } 116} 117 118/// A page of sequenced ops from `/export?after=<seq>` 119#[derive(Debug)] 120pub struct SeqPage { 121 pub ops: Vec<SeqOp>, 122} 123 124impl SeqPage { 125 pub fn is_empty(&self) -> bool { 126 self.ops.is_empty() 127 } 128} 129 130/// page forwarder who drops its channels on receipt of a small page 131/// 132/// PLC will return up to 1000 ops on a page, and returns full pages until it 133/// has caught up, so this is a (hacky?) way to stop polling once we're up. 134pub async fn full_pages( 135 mut rx: mpsc::Receiver<ExportPage>, 136 tx: mpsc::Sender<ExportPage>, 137) -> anyhow::Result<&'static str> { 138 while let Some(page) = rx.recv().await { 139 let n = page.ops.len(); 140 if n < 900 { 141 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 142 let Some(age) = last_age else { 143 log::info!("full_pages done, empty final page"); 144 return Ok("full pages (hmm)"); 145 }; 146 if age <= chrono::TimeDelta::hours(6) { 147 log::info!("full_pages done, final page of {n} ops"); 148 } else { 149 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 150 } 151 return Ok("full pages (cool)"); 152 } 153 log::trace!("full_pages: continuing with page of {n} ops"); 154 tx.send(page).await?; 155 } 156 Err(anyhow::anyhow!( 157 "full_pages ran out of source material, sender closed" 158 )) 159} 160 161pub async fn full_pages_seq( 162 mut rx: mpsc::Receiver<SeqPage>, 163 tx: mpsc::Sender<SeqPage>, 164) -> anyhow::Result<&'static str> { 165 
while let Some(page) = rx.recv().await { 166 let n = page.ops.len(); 167 if n < 900 { 168 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 169 let Some(age) = last_age else { 170 log::info!("full_pages done, empty final page"); 171 return Ok("full pages (hmm)"); 172 }; 173 if age <= chrono::TimeDelta::hours(6) { 174 log::info!("full_pages done, final page of {n} ops"); 175 } else { 176 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 177 } 178 return Ok("full pages (cool)"); 179 } 180 log::trace!("full_pages: continuing with page of {n} ops"); 181 tx.send(page).await?; 182 } 183 Err(anyhow::anyhow!( 184 "full_pages ran out of source material, sender closed" 185 )) 186} 187 188pub async fn pages_to_stdout( 189 mut rx: mpsc::Receiver<ExportPage>, 190 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 191) -> anyhow::Result<&'static str> { 192 let mut last_at = None; 193 while let Some(page) = rx.recv().await { 194 for op in &page.ops { 195 println!("{}", serde_json::to_string(op)?); 196 } 197 if notify_last_at.is_some() 198 && let Some(s) = PageBoundaryState::new(&page) 199 { 200 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 201 } 202 } 203 if let Some(notify) = notify_last_at { 204 log::trace!("notifying last_at: {last_at:?}"); 205 if notify.send(last_at).is_err() { 206 log::error!("receiver for last_at dropped, can't notify"); 207 }; 208 } 209 Ok("pages_to_stdout") 210} 211 212#[derive(Debug, Clone, Serialize, Deserialize)] 213pub struct InvalidOp { 214 pub did: String, 215 pub at: Dt, 216 pub cid: String, 217} 218 219pub async fn invalid_ops_to_stdout( 220 mut rx: mpsc::Receiver<InvalidOp>, 221) -> anyhow::Result<&'static str> { 222 while let Some(op) = rx.recv().await { 223 use std::io::{Write, stdout}; 224 let mut stdout = stdout().lock(); 225 serde_json::to_writer(&mut stdout, &op)?; 226 stdout.write_all(b"\n")?; 227 } 228 Ok("invalid_ops_to_stdout") 229} 230 231pub async fn 
file_to_invalid_ops( 232 path: impl AsRef<std::path::Path>, 233 tx: mpsc::Sender<InvalidOp>, 234) -> anyhow::Result<&'static str> { 235 let file = tokio::fs::File::open(path).await?; 236 237 use tokio::io::AsyncBufReadExt; 238 let mut lines = tokio::io::BufReader::new(file).lines(); 239 while let Some(line) = lines.next_line().await? { 240 let op: InvalidOp = serde_json::from_str(&line)?; 241 tx.send(op).await?; 242 } 243 244 Ok("invalid_ops_to_stdout") 245} 246 247pub fn logo(name: &str) -> String { 248 format!( 249 r" 250 251 \ | | | | 252 _ \ | | -_) _` | -_) _` | | | | ({name}) 253 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{}) 254 ____| __/ 255", 256 env!("CARGO_PKG_VERSION"), 257 ) 258}