Server tools to backfill, tail, mirror, and verify PLC logs

make tailing PLC work

+96 -96
+3 -7
src/backfill.rs
··· 1 - use crate::ExportPage; 1 + use crate::{CLIENT, ExportPage}; 2 2 use url::Url; 3 3 4 4 use async_compression::futures::bufread::GzipDecoder; 5 5 use futures::{AsyncBufReadExt, StreamExt, TryStreamExt, io}; 6 6 7 - pub async fn week_to_pages( 8 - client: &reqwest::Client, 9 - url: Url, 10 - dest: flume::Sender<ExportPage>, 11 - ) -> anyhow::Result<()> { 12 - let reader = client 7 + pub async fn week_to_pages(url: Url, dest: flume::Sender<ExportPage>) -> anyhow::Result<()> { 8 + let reader = CLIENT 13 9 .get(url) 14 10 .send() 15 11 .await?
+2 -10
src/bin/get_backfill_chunk_adsf.rs
··· 1 + use allegedly::CLIENT; 1 2 use async_compression::futures::bufread::GzipDecoder; 2 3 use futures::{AsyncBufReadExt, StreamExt, TryStreamExt, io}; 3 4 4 5 #[tokio::main] 5 6 async fn main() { 6 - let client = reqwest::Client::builder() 7 - .user_agent(concat!( 8 - "allegedly (blah) v", 9 - env!("CARGO_PKG_VERSION"), 10 - " (from @microcosm.blue; contact @bad-example.com)" 11 - )) 12 - .build() 13 - .unwrap(); 14 - 15 - let reader = client 7 + let reader = CLIENT 16 8 .get("https://plc.t3.storage.dev/plc.directory/1699488000.jsonl.gz") 17 9 // .get("https://plc.t3.storage.dev/plc.directory/1669248000.jsonl.gz") 18 10 .send()
+8 -22
src/bin/main.rs
··· 4 4 use tokio_postgres::NoTls; 5 5 use url::Url; 6 6 7 - use allegedly::{ExportPage, poll_upstream, week_to_pages}; 7 + use allegedly::{Dt, ExportPage, bin_init, poll_upstream, week_to_pages}; 8 8 9 9 const EXPORT_PAGE_QUEUE_SIZE: usize = 0; // rendezvous for now 10 10 const WEEK_IN_SECONDS: u64 = 7 * 86400; ··· 47 47 struct Op<'a> { 48 48 pub did: &'a str, 49 49 pub cid: &'a str, 50 - pub created_at: chrono::DateTime<chrono::Utc>, 50 + pub created_at: Dt, 51 51 pub nullified: bool, 52 52 #[serde(borrow)] 53 53 pub operation: &'a serde_json::value::RawValue, 54 54 } 55 55 56 - async fn bulk_backfill( 57 - client: reqwest::Client, 58 - (upstream, epoch): (Url, u64), 59 - tx: flume::Sender<ExportPage>, 60 - ) { 56 + async fn bulk_backfill((upstream, epoch): (Url, u64), tx: flume::Sender<ExportPage>) { 61 57 let immutable_cutoff = std::time::SystemTime::now() - Duration::from_secs((7 + 4) * 86400); 62 58 let immutable_ts = (immutable_cutoff.duration_since(std::time::SystemTime::UNIX_EPOCH)) 63 59 .unwrap() ··· 68 64 while week < immutable_week { 69 65 log::info!("backfilling week {week_n} ({week})"); 70 66 let url = upstream.join(&format!("{week}.jsonl.gz")).unwrap(); 71 - week_to_pages(&client, url, tx.clone()).await.unwrap(); 67 + week_to_pages(url, tx.clone()).await.unwrap(); 72 68 week_n += 1; 73 69 week += WEEK_IN_SECONDS; 74 70 } ··· 81 77 pg_client: tokio_postgres::Client, 82 78 ) { 83 79 let latest = get_latest(&pg_client).await; 84 - let client = reqwest::Client::builder() 85 - .user_agent(concat!( 86 - "allegedly v", 87 - env!("CARGO_PKG_VERSION"), 88 - " (from @microcosm.blue; contact @bad-example.com)" 89 - )) 90 - .build() 91 - .unwrap(); 92 80 93 81 if latest.is_none() { 94 - bulk_backfill(client.clone(), bulk, tx.clone()).await; 82 + bulk_backfill(bulk, tx.clone()).await; 95 83 } 96 84 let mut upstream = upstream; 97 85 upstream.set_path("/export"); 98 - poll_upstream(&client, latest, upstream, tx).await.unwrap(); 86 + poll_upstream(latest, upstream, tx).await.unwrap(); 99 87 }
+38
src/bin/tail.rs
··· 1 + use allegedly::{bin_init, poll_upstream}; 2 + use clap::Parser; 3 + use url::Url; 4 + 5 + #[derive(Parser)] 6 + struct Args { 7 + /// Upstream PLC server to poll 8 + /// 9 + /// default: https://plc.directory 10 + #[arg(long, env)] 11 + #[clap(default_value = "https://plc.directory")] 12 + upstream: Url, 13 + } 14 + 15 + #[tokio::main] 16 + async fn main() { 17 + bin_init("tail"); 18 + 19 + let mut url = Args::parse().upstream; 20 + url.set_path("/export"); 21 + let now = chrono::Utc::now(); 22 + 23 + let (tx, rx) = flume::bounded(0); // rendezvous 24 + tokio::task::spawn(async move { 25 + if let Err(e) = poll_upstream(Some(now), url, tx).await { 26 + log::error!("polling failed: {e}"); 27 + } else { 28 + log::warn!("poller finished ok (weird?)"); 29 + } 30 + }); 31 + 32 + while let Ok(page) = rx.recv_async().await { 33 + for op in page.ops { 34 + println!("{op}"); 35 + } 36 + } 37 + log::warn!("recv failed, bye"); 38 + }
-44
src/bin/tail_export.rs
··· 1 - use allegedly::OpPeek; 2 - use url::Url; 3 - 4 - async fn get_page(client: &reqwest::Client, url: Url) -> Vec<String> { 5 - client 6 - .get(url) 7 - .send() 8 - .await 9 - .unwrap() 10 - .error_for_status() 11 - .unwrap() 12 - .text() 13 - .await 14 - .unwrap() 15 - .trim() 16 - .split('\n') 17 - .map(Into::into) 18 - .collect() 19 - } 20 - 21 - #[tokio::main] 22 - async fn main() { 23 - let client = reqwest::Client::builder() 24 - .user_agent(concat!( 25 - "allegedly (export) v", 26 - env!("CARGO_PKG_VERSION"), 27 - " (from @microcosm.blue; contact @bad-example.com)" 28 - )) 29 - .build() 30 - .unwrap(); 31 - 32 - let mut url = Url::parse("https://plc.directory/export").unwrap(); 33 - let ops = get_page(&client, url.clone()).await; 34 - 35 - println!("first: {:?}", ops.first()); 36 - 37 - if let Some(last_line) = ops.last() { 38 - let x: OpPeek = serde_json::from_str(last_line).unwrap(); 39 - url.query_pairs_mut() 40 - .append_pair("after", &x.created_at.to_rfc3339()); 41 - let ops2 = get_page(&client, url).await; 42 - println!("2nd: {:?}", ops2.first()); 43 - } 44 - }
+13
src/client.rs
··· 1 + use reqwest::Client; 2 + use std::sync::LazyLock; 3 + 4 + pub static CLIENT: LazyLock<Client> = LazyLock::new(|| { 5 + Client::builder() 6 + .user_agent(concat!( 7 + "allegedly, v", 8 + env!("CARGO_PKG_VERSION"), 9 + " (from @microcosm.blue; contact @bad-example.com)" 10 + )) 11 + .build() 12 + .unwrap() 13 + });
+21 -1
src/lib.rs
··· 1 1 use serde::Deserialize; 2 2 3 3 mod backfill; 4 + mod client; 4 5 mod poll; 5 6 6 7 pub use backfill::week_to_pages; 8 + pub use client::CLIENT; 7 9 pub use poll::poll_upstream; 10 + 11 + pub type Dt = chrono::DateTime<chrono::Utc>; 8 12 9 13 /// One page of PLC export 10 14 /// ··· 16 20 #[derive(Deserialize)] 17 21 #[serde(rename_all = "camelCase")] 18 22 pub struct OpPeek { 19 - pub created_at: chrono::DateTime<chrono::Utc>, 23 + pub created_at: Dt, 24 + } 25 + 26 + pub fn bin_init(name: &str) { 27 + use env_logger::{Builder, Env}; 28 + Builder::from_env(Env::new().filter_or("RUST_LOG", "info")).init(); 29 + 30 + log::info!( 31 + r" 32 + 33 + \ | | | | 34 + _ \ | | -_) _` | -_) _` | | | | ({name}) 35 + _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{}) 36 + ____| __/ 37 + ", 38 + env!("CARGO_PKG_VERSION") 39 + ); 20 40 }
+11 -12
src/poll.rs
··· 1 - use crate::{ExportPage, OpPeek}; 2 - use chrono::{DateTime, Utc}; 1 + use crate::{CLIENT, Dt, ExportPage, OpPeek}; 3 2 use std::time::Duration; 4 3 use thiserror::Error; 5 4 use url::Url; ··· 14 13 SerdeError(#[from] serde_json::Error), 15 14 } 16 15 17 - pub async fn get_page( 18 - client: &reqwest::Client, 19 - url: Url, 20 - ) -> Result<(ExportPage, Option<DateTime<Utc>>), GetPageError> { 21 - let ops: Vec<String> = client 16 + pub async fn get_page(url: Url) -> Result<(ExportPage, Option<Dt>), GetPageError> { 17 + log::trace!("Getting page: {url}"); 18 + 19 + let ops: Vec<String> = CLIENT 22 20 .get(url) 23 21 .send() 24 22 .await? ··· 32 30 33 31 let last_at = ops 34 32 .last() 33 + .filter(|s| !s.is_empty()) 35 34 .map(|s| serde_json::from_str::<OpPeek>(s)) 36 35 .transpose()? 37 - .map(|o| o.created_at); 36 + .map(|o| o.created_at) 37 + .inspect(|at| log::trace!("new last_at: {at}")); 38 38 39 39 Ok((ExportPage { ops }, last_at)) 40 40 } 41 41 42 42 pub async fn poll_upstream( 43 - client: &reqwest::Client, 44 - after: Option<DateTime<Utc>>, 43 + after: Option<Dt>, 45 44 base: Url, 46 45 dest: flume::Sender<ExportPage>, 47 46 ) -> anyhow::Result<()> { ··· 53 52 if let Some(a) = after { 54 53 url.query_pairs_mut().append_pair("after", &a.to_rfc3339()); 55 54 }; 56 - let (page, next_after) = get_page(client, url).await?; 55 + let (page, next_after) = get_page(url).await?; 57 56 dest.send_async(page).await?; 58 - after = next_after; 57 + after = next_after.or(after); 59 58 } 60 59 }