Server tools to backfill, tail, mirror, and verify PLC logs

fixup export polling

+164 -55
+21
Cargo.lock
··· 41 41 "reqwest", 42 42 "serde", 43 43 "serde_json", 44 + "thiserror", 44 45 "tokio", 45 46 "tokio-postgres", 46 47 "url", ··· 1758 1759 "once_cell", 1759 1760 "rustix", 1760 1761 "windows-sys 0.61.0", 1762 + ] 1763 + 1764 + [[package]] 1765 + name = "thiserror" 1766 + version = "2.0.16" 1767 + source = "registry+https://github.com/rust-lang/crates.io-index" 1768 + checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0" 1769 + dependencies = [ 1770 + "thiserror-impl", 1771 + ] 1772 + 1773 + [[package]] 1774 + name = "thiserror-impl" 1775 + version = "2.0.16" 1776 + source = "registry+https://github.com/rust-lang/crates.io-index" 1777 + checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960" 1778 + dependencies = [ 1779 + "proc-macro2", 1780 + "quote", 1781 + "syn", 1761 1782 ] 1762 1783 1763 1784 [[package]]
+1
Cargo.toml
··· 16 16 reqwest = { version = "0.12.23", features = ["stream"] } 17 17 serde = "1.0.219" 18 18 serde_json = { version = "1.0.143", features = ["raw_value"] } 19 + thiserror = "2.0.16" 19 20 tokio = { version = "1.47.1", features = ["full"] } 20 21 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] } 21 22 url = "2.5.7"
src/bin/blah.rs src/bin/get_backfill_chunk_adsf.rs
+28 -55
src/bin/main.rs
··· 4 4 use tokio_postgres::NoTls; 5 5 use url::Url; 6 6 7 - use allegedly::{ExportPage, week_to_pages}; 7 + use allegedly::{ExportPage, poll_upstream, week_to_pages}; 8 8 9 9 const EXPORT_PAGE_QUEUE_SIZE: usize = 0; // rendezvous for now 10 - const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(500); 11 10 const WEEK_IN_SECONDS: u64 = 7 * 86400; 12 11 13 12 #[derive(Parser)] ··· 45 44 46 45 #[derive(Deserialize)] 47 46 #[serde(rename_all = "camelCase")] 48 - struct OpPeek { 49 - pub created_at: chrono::DateTime<chrono::Utc>, 50 - } 51 - 52 - #[derive(Deserialize)] 53 - #[serde(rename_all = "camelCase")] 54 47 struct Op<'a> { 55 48 pub did: &'a str, 56 49 pub cid: &'a str, ··· 85 78 upstream: Url, 86 79 bulk: (Url, u64), 87 80 tx: flume::Sender<ExportPage>, 88 - latest: Option<chrono::DateTime<chrono::Utc>>, 81 + pg_client: tokio_postgres::Client, 89 82 ) { 83 + let latest = get_latest(&pg_client).await; 90 84 let client = reqwest::Client::builder() 91 85 .user_agent(concat!( 92 86 "allegedly v", ··· 99 93 if latest.is_none() { 100 94 bulk_backfill(client.clone(), bulk, tx.clone()).await; 101 95 } 102 - 103 96 let mut upstream = upstream; 104 97 upstream.set_path("/export"); 105 - let mut after = latest; 106 - let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 107 - 108 - loop { 109 - tick.tick().await; 110 - let mut url = upstream.clone(); 111 - if let Some(ref after) = after { 112 - url.query_pairs_mut() 113 - .append_pair("after", &after.to_rfc3339()); 114 - } 115 - let ops = client 116 - .get(url) 117 - .send() 118 - .await 119 - .unwrap() 120 - .error_for_status() 121 - .unwrap() 122 - .text() 123 - .await 124 - .unwrap() 125 - .trim() 126 - .to_string(); 127 - 128 - let Some((_, last_line)) = ops.rsplit_once('\n') else { 129 - log::trace!("no ops in response page, nothing to do"); 130 - continue; 131 - }; 132 - 133 - let op: OpPeek = serde_json::from_str(last_line).unwrap(); 134 - after = Some(op.created_at); 135 - 136 - log::trace!("got some ops until {after:?}, sending them..."); 137 - let ops = ops.split('\n').map(Into::into).collect(); 138 - tx.send_async(ExportPage { ops }).await.unwrap(); 139 - } 98 + poll_upstream(&client, latest, upstream, tx).await.unwrap(); 140 99 } 141 100 142 101 async fn write_pages( ··· 221 180 Ok(()) 222 181 } 223 182 183 + async fn get_latest(pg_client: &tokio_postgres::Client) -> Option<chrono::DateTime<chrono::Utc>> { 184 + pg_client 185 + .query_opt( 186 + r#"SELECT "createdAt" FROM operations 187 + ORDER BY "createdAt" DESC LIMIT 1"#, 188 + &[], 189 + ) 190 + .await 191 + .unwrap() 192 + .map(|r| r.get(0)) 193 + } 194 + 224 195 #[tokio::main] 225 196 async fn main() { 226 197 env_logger::init(); ··· 241 212 } 242 213 }); 243 214 244 - let latest = pg_client 245 - .query_opt( 246 - r#"SELECT "createdAt" FROM operations 247 - ORDER BY "createdAt" DESC LIMIT 1"#, 248 - &[], 249 - ) 215 + log::trace!("connecting postgres 2..."); 216 + let (pg_client2, connection2) = tokio_postgres::connect(&args.postgres, NoTls) 250 217 .await 251 - .unwrap() 252 - .map(|r| r.get(0)); 218 + .unwrap(); 253 219 254 - log::info!("connected! latest: {latest:?}"); 220 + // send the connection away to do the actual communication work 221 + // TODO: error and shutdown handling 222 + let conn_task2 = tokio::task::spawn(async move { 223 + if let Err(e) = connection2.await { 224 + eprintln!("connection error: {e}"); 225 + } 226 + }); 255 227 256 228 let (tx, rx) = flume::bounded(EXPORT_PAGE_QUEUE_SIZE); 257 229 ··· 259 231 args.upstream, 260 232 (args.upstream_bulk, args.bulk_epoch), 261 233 tx, 262 - latest, 234 + pg_client2, 263 235 )); 264 236 let writer_task = tokio::task::spawn(write_pages(rx, pg_client)); 265 237 266 238 tokio::select! { 267 239 z = conn_task => log::warn!("connection task ended: {z:?}"), 240 + z = conn_task2 => log::warn!("connection task ended: {z:?}"), 268 241 z = export_task => log::warn!("export task ended: {z:?}"), 269 242 z = writer_task => log::warn!("writer task ended: {z:?}"), 270 243 };
+44
src/bin/tail_export.rs
··· 1 + use allegedly::OpPeek; 2 + use url::Url; 3 + 4 + async fn get_page(client: &reqwest::Client, url: Url) -> Vec<String> { 5 + client 6 + .get(url) 7 + .send() 8 + .await 9 + .unwrap() 10 + .error_for_status() 11 + .unwrap() 12 + .text() 13 + .await 14 + .unwrap() 15 + .trim() 16 + .split('\n') 17 + .map(Into::into) 18 + .collect() 19 + } 20 + 21 + #[tokio::main] 22 + async fn main() { 23 + let client = reqwest::Client::builder() 24 + .user_agent(concat!( 25 + "allegedly (export) v", 26 + env!("CARGO_PKG_VERSION"), 27 + " (from @microcosm.blue; contact @bad-example.com)" 28 + )) 29 + .build() 30 + .unwrap(); 31 + 32 + let mut url = Url::parse("https://plc.directory/export").unwrap(); 33 + let ops = get_page(&client, url.clone()).await; 34 + 35 + println!("first: {:?}", ops.first()); 36 + 37 + if let Some(last_line) = ops.last() { 38 + let x: OpPeek = serde_json::from_str(last_line).unwrap(); 39 + url.query_pairs_mut() 40 + .append_pair("after", &x.created_at.to_rfc3339()); 41 + let ops2 = get_page(&client, url).await; 42 + println!("2nd: {:?}", ops2.first()); 43 + } 44 + }
+10
src/lib.rs
··· 1 + use serde::Deserialize; 2 + 1 3 mod backfill; 4 + mod poll; 2 5 3 6 pub use backfill::week_to_pages; 7 + pub use poll::poll_upstream; 4 8 5 9 /// One page of PLC export 6 10 /// ··· 8 12 pub struct ExportPage { 9 13 pub ops: Vec<String>, 10 14 } 15 + 16 + #[derive(Deserialize)] 17 + #[serde(rename_all = "camelCase")] 18 + pub struct OpPeek { 19 + pub created_at: chrono::DateTime<chrono::Utc>, 20 + }
+60
src/poll.rs
··· 1 + use crate::{ExportPage, OpPeek}; 2 + use chrono::{DateTime, Utc}; 3 + use std::time::Duration; 4 + use thiserror::Error; 5 + use url::Url; 6 + 7 + const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(500); 8 + 9 + #[derive(Debug, Error)] 10 + pub enum GetPageError { 11 + #[error(transparent)] 12 + ReqwestError(#[from] reqwest::Error), 13 + #[error(transparent)] 14 + SerdeError(#[from] serde_json::Error), 15 + } 16 + 17 + pub async fn get_page( 18 + client: &reqwest::Client, 19 + url: Url, 20 + ) -> Result<(ExportPage, Option<DateTime<Utc>>), GetPageError> { 21 + let ops: Vec<String> = client 22 + .get(url) 23 + .send() 24 + .await? 25 + .error_for_status()? 26 + .text() 27 + .await? 28 + .trim() 29 + .split('\n') 30 + .map(Into::into) 31 + .collect(); 32 + 33 + let last_at = ops 34 + .last() 35 + .map(|s| serde_json::from_str::<OpPeek>(s)) 36 + .transpose()? 37 + .map(|o| o.created_at); 38 + 39 + Ok((ExportPage { ops }, last_at)) 40 + } 41 + 42 + pub async fn poll_upstream( 43 + client: &reqwest::Client, 44 + after: Option<DateTime<Utc>>, 45 + base: Url, 46 + dest: flume::Sender<ExportPage>, 47 + ) -> anyhow::Result<()> { 48 + let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 49 + let mut after = after; 50 + loop { 51 + tick.tick().await; 52 + let mut url = base.clone(); 53 + if let Some(a) = after { 54 + url.query_pairs_mut().append_pair("after", &a.to_rfc3339()); 55 + }; 56 + let (page, next_after) = get_page(client, url).await?; 57 + dest.send_async(page).await?; 58 + after = next_after; 59 + } 60 + }