Server tools to backfill, tail, mirror, and verify PLC logs

Consolidate into one binary CLI with subcommands (starting with `tail`)

+242 -235
+213
src/bin/backfill.rs
··· 1 + use clap::Parser; 2 + use std::time::Duration; 3 + use url::Url; 4 + 5 + use allegedly::{Db, Dt, ExportPage, Op, bin_init, poll_upstream, week_to_pages}; 6 + 7 + const EXPORT_PAGE_QUEUE_SIZE: usize = 0; // rendezvous for now 8 + const WEEK_IN_SECONDS: u64 = 7 * 86400; 9 + 10 + #[derive(Parser)] 11 + struct Args { 12 + /// Upstream PLC server to mirror 13 + /// 14 + /// default: https://plc.directory 15 + #[arg(long, env)] 16 + #[clap(default_value = "https://plc.directory")] 17 + upstream: Url, 18 + /// Bulk export source prefix 19 + /// 20 + /// Must be a prefix for urls ending with {WEEK_TIMESTAMP}.jsonl.gz 21 + /// 22 + /// default: https://plc.t3.storage.dev/plc.directory/ 23 + /// 24 + /// pass "off" to skip fast bulk backfilling 25 + #[arg(long, env)] 26 + #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")] 27 + upstream_bulk: Url, 28 + /// The oldest available bulk upstream export timestamp 29 + /// 30 + /// Must be a week-truncated unix timestamp 31 + /// 32 + /// plc.directory's oldest week is `1668643200`; you probably don't want to change this. 
33 + #[arg(long, env)] 34 + #[clap(default_value = "1668643200")] 35 + bulk_epoch: u64, 36 + /// Mirror PLC's postgres database 37 + /// 38 + /// URI string with credentials etc 39 + #[arg(long, env)] 40 + postgres: String, 41 + } 42 + 43 + async fn bulk_backfill((upstream, epoch): (Url, u64), tx: flume::Sender<ExportPage>) { 44 + let immutable_cutoff = std::time::SystemTime::now() - Duration::from_secs((7 + 4) * 86400); 45 + let immutable_ts = (immutable_cutoff.duration_since(std::time::SystemTime::UNIX_EPOCH)) 46 + .unwrap() 47 + .as_secs(); 48 + let immutable_week = (immutable_ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS; 49 + let mut week = epoch; 50 + let mut week_n = 0; 51 + while week < immutable_week { 52 + log::info!("backfilling week {week_n} ({week})"); 53 + let url = upstream.join(&format!("{week}.jsonl.gz")).unwrap(); 54 + week_to_pages(url, tx.clone()).await.unwrap(); 55 + week_n += 1; 56 + week += WEEK_IN_SECONDS; 57 + } 58 + } 59 + 60 + async fn export_upstream( 61 + upstream: Url, 62 + bulk: (Url, u64), 63 + tx: flume::Sender<ExportPage>, 64 + pg_client: tokio_postgres::Client, 65 + ) { 66 + let latest = get_latest(&pg_client).await; 67 + 68 + if latest.is_none() { 69 + bulk_backfill(bulk, tx.clone()).await; 70 + } 71 + let mut upstream = upstream; 72 + upstream.set_path("/export"); 73 + poll_upstream(latest, upstream, tx).await.unwrap(); 74 + } 75 + 76 + async fn write_pages( 77 + rx: flume::Receiver<ExportPage>, 78 + mut pg_client: tokio_postgres::Client, 79 + ) -> Result<(), anyhow::Error> { 80 + // TODO: one big upsert at the end from select distinct on the other table 81 + 82 + // let upsert_did = &pg_client 83 + // .prepare( 84 + // r#" 85 + // INSERT INTO dids (did) VALUES ($1) 86 + // ON CONFLICT DO NOTHING"#, 87 + // ) 88 + // .await 89 + // .unwrap(); 90 + 91 + let insert_op = &pg_client 92 + .prepare( 93 + r#" 94 + INSERT INTO operations (did, operation, cid, nullified, "createdAt") 95 + VALUES ($1, $2, $3, $4, $5) 96 + ON CONFLICT (did, cid) 
DO UPDATE 97 + SET nullified = excluded.nullified, 98 + "createdAt" = excluded."createdAt" 99 + WHERE operations.nullified = excluded.nullified 100 + OR operations."createdAt" = excluded."createdAt""#, 101 + ) // idea: op is provable via cid, so leave it out. after did/cid (pk) that leaves nullified and createdAt 102 + // that we want to notice changing. 103 + // normal insert: no conflict, rows changed = 1 104 + // conflict (exact match): where clause passes, rows changed = 1 105 + // conflict (mismatch): where clause fails, rows changed = 0 (detect this and warn!) 106 + .await 107 + .unwrap(); 108 + 109 + while let Ok(page) = rx.recv_async().await { 110 + log::trace!("got a page..."); 111 + 112 + let tx = pg_client.transaction().await.unwrap(); 113 + 114 + // TODO: probably figure out postgres COPY IN 115 + // for now just write everything into a transaction 116 + 117 + log::trace!("setting up inserts..."); 118 + for op_line in page 119 + .ops 120 + .into_iter() 121 + .flat_map(|s| { 122 + s.replace("}{", "}\n{") 123 + .split('\n') 124 + .map(|s| s.trim()) 125 + .map(Into::into) 126 + .collect::<Vec<String>>() 127 + }) 128 + .filter(|s| !s.is_empty()) 129 + { 130 + let Ok(op) = serde_json::from_str::<Op>(&op_line) 131 + .inspect_err(|e| log::error!("failing! at the {op_line}! 
{e}")) 132 + else { 133 + log::error!("ayeeeee just ignoring this error for now......"); 134 + continue; 135 + }; 136 + // let client = &tx; 137 + 138 + // client.execute(upsert_did, &[&op.did]).await.unwrap(); 139 + 140 + // let sp = tx.savepoint("op").await.unwrap(); 141 + let inserted = tx 142 + .execute( 143 + insert_op, 144 + &[ 145 + &op.did, 146 + &tokio_postgres::types::Json(op.operation), 147 + &op.cid, 148 + &op.nullified, 149 + &op.created_at, 150 + ], 151 + ) 152 + .await 153 + .unwrap(); 154 + if inserted != 1 { 155 + log::warn!( 156 + "possible log modification: {inserted} rows changed after upserting {op:?}" 157 + ); 158 + } 159 + // { 160 + // if e.code() != Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) { 161 + // anyhow::bail!(e); 162 + // } 163 + // // TODO: assert that the row has not changed 164 + // log::warn!("ignoring dup"); 165 + // } 166 + } 167 + 168 + tx.commit().await.unwrap(); 169 + } 170 + Ok(()) 171 + } 172 + 173 + async fn get_latest(pg_client: &tokio_postgres::Client) -> Option<Dt> { 174 + pg_client 175 + .query_opt( 176 + r#"SELECT "createdAt" FROM operations 177 + ORDER BY "createdAt" DESC LIMIT 1"#, 178 + &[], 179 + ) 180 + .await 181 + .unwrap() 182 + .map(|r| r.get(0)) 183 + } 184 + 185 + #[tokio::main] 186 + async fn main() -> anyhow::Result<()> { 187 + bin_init("main"); 188 + let args = Args::parse(); 189 + let db = Db::new(&args.postgres); 190 + let (tx, rx) = flume::bounded(EXPORT_PAGE_QUEUE_SIZE); 191 + 192 + log::trace!("connecting postgres for export task..."); 193 + let pg_client = db.connect().await?; 194 + let export_task = tokio::task::spawn(export_upstream( 195 + args.upstream, 196 + (args.upstream_bulk, args.bulk_epoch), 197 + tx, 198 + pg_client, 199 + )); 200 + 201 + log::trace!("connecting postgres for writer task..."); 202 + let pg_client = db.connect().await?; 203 + let writer_task = tokio::task::spawn(write_pages(rx, pg_client)); 204 + 205 + tokio::select! 
{ 206 + z = export_task => log::warn!("export task ended: {z:?}"), 207 + z = writer_task => log::warn!("writer task ended: {z:?}"), 208 + }; 209 + 210 + log::error!("todo: shutdown"); 211 + 212 + Ok(()) 213 + }
+29 -197
src/bin/main.rs
··· 1 - use clap::Parser; 2 - use std::time::Duration; 1 + use clap::{Parser, Subcommand}; 3 2 use url::Url; 3 + use allegedly::{Dt, bin_init, poll_upstream}; 4 4 5 - use allegedly::{Db, Dt, ExportPage, Op, bin_init, poll_upstream, week_to_pages}; 6 - 7 - const EXPORT_PAGE_QUEUE_SIZE: usize = 0; // rendezvous for now 8 - const WEEK_IN_SECONDS: u64 = 7 * 86400; 9 - 10 - #[derive(Parser)] 11 - struct Args { 12 - /// Upstream PLC server to mirror 13 - /// 14 - /// default: https://plc.directory 5 + #[derive(Debug, Parser)] 6 + struct Cli { 7 + /// Upstream PLC server 15 8 #[arg(long, env)] 16 9 #[clap(default_value = "https://plc.directory")] 17 10 upstream: Url, 18 - /// Bulk export source prefix 19 - /// 20 - /// Must be a prefix for urls ending with {WEEK_TIMESTAMP}.jsonl.gz 21 - /// 22 - /// default: https://plc.t3.storage.dev/plc.directory/ 23 - /// 24 - /// pass "off" to skip fast bulk backfilling 25 - #[arg(long, env)] 26 - #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")] 27 - upstream_bulk: Url, 28 - /// The oldest available bulk upstream export timestamp 29 - /// 30 - /// Must be a week-truncated unix timestamp 31 - /// 32 - /// plc.directory's oldest week is `1668643200`; you probably don't want to change this. 
33 - #[arg(long, env)] 34 - #[clap(default_value = "1668643200")] 35 - bulk_epoch: u64, 36 - /// Mirror PLC's postgres database 37 - /// 38 - /// URI string with credentials etc 39 - #[arg(long, env)] 40 - postgres: String, 11 + #[command(subcommand)] 12 + command: Commands, 41 13 } 42 14 43 - async fn bulk_backfill((upstream, epoch): (Url, u64), tx: flume::Sender<ExportPage>) { 44 - let immutable_cutoff = std::time::SystemTime::now() - Duration::from_secs((7 + 4) * 86400); 45 - let immutable_ts = (immutable_cutoff.duration_since(std::time::SystemTime::UNIX_EPOCH)) 46 - .unwrap() 47 - .as_secs(); 48 - let immutable_week = (immutable_ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS; 49 - let mut week = epoch; 50 - let mut week_n = 0; 51 - while week < immutable_week { 52 - log::info!("backfilling week {week_n} ({week})"); 53 - let url = upstream.join(&format!("{week}.jsonl.gz")).unwrap(); 54 - week_to_pages(url, tx.clone()).await.unwrap(); 55 - week_n += 1; 56 - week += WEEK_IN_SECONDS; 15 + #[derive(Debug, Subcommand)] 16 + enum Commands { 17 + /// Poll an upstream PLC server and log new ops to stdout 18 + Tail { 19 + /// Begin replay from a specific timestamp 20 + #[arg(long)] 21 + after: Option<Dt>, 57 22 } 58 23 } 59 24 60 - async fn export_upstream( 61 - upstream: Url, 62 - bulk: (Url, u64), 63 - tx: flume::Sender<ExportPage>, 64 - pg_client: tokio_postgres::Client, 65 - ) { 66 - let latest = get_latest(&pg_client).await; 67 - 68 - if latest.is_none() { 69 - bulk_backfill(bulk, tx.clone()).await; 70 - } 71 - let mut upstream = upstream; 72 - upstream.set_path("/export"); 73 - poll_upstream(latest, upstream, tx).await.unwrap(); 74 - } 75 - 76 - async fn write_pages( 77 - rx: flume::Receiver<ExportPage>, 78 - mut pg_client: tokio_postgres::Client, 79 - ) -> Result<(), anyhow::Error> { 80 - // TODO: one big upsert at the end from select distinct on the other table 81 - 82 - // let upsert_did = &pg_client 83 - // .prepare( 84 - // r#" 85 - // INSERT INTO dids (did) VALUES 
($1) 86 - // ON CONFLICT DO NOTHING"#, 87 - // ) 88 - // .await 89 - // .unwrap(); 90 - 91 - let insert_op = &pg_client 92 - .prepare( 93 - r#" 94 - INSERT INTO operations (did, operation, cid, nullified, "createdAt") 95 - VALUES ($1, $2, $3, $4, $5) 96 - ON CONFLICT (did, cid) DO UPDATE 97 - SET nullified = excluded.nullified, 98 - "createdAt" = excluded."createdAt" 99 - WHERE operations.nullified = excluded.nullified 100 - OR operations."createdAt" = excluded."createdAt""#, 101 - ) // idea: op is provable via cid, so leave it out. after did/cid (pk) that leaves nullified and createdAt 102 - // that we want to notice changing. 103 - // normal insert: no conflict, rows changed = 1 104 - // conflict (exact match): where clause passes, rows changed = 1 105 - // conflict (mismatch): where clause fails, rows changed = 0 (detect this and warn!) 106 - .await 107 - .unwrap(); 108 - 109 - while let Ok(page) = rx.recv_async().await { 110 - log::trace!("got a page..."); 25 + #[tokio::main] 26 + async fn main() { 27 + bin_init("main"); 111 28 112 - let tx = pg_client.transaction().await.unwrap(); 29 + let args = Cli::parse(); 113 30 114 - // TODO: probably figure out postgres COPY IN 115 - // for now just write everything into a transaction 116 - 117 - log::trace!("setting up inserts..."); 118 - for op_line in page 119 - .ops 120 - .into_iter() 121 - .flat_map(|s| { 122 - s.replace("}{", "}\n{") 123 - .split('\n') 124 - .map(|s| s.trim()) 125 - .map(Into::into) 126 - .collect::<Vec<String>>() 127 - }) 128 - .filter(|s| !s.is_empty()) 129 - { 130 - let Ok(op) = serde_json::from_str::<Op>(&op_line) 131 - .inspect_err(|e| log::error!("failing! at the {op_line}! 
{e}")) 132 - else { 133 - log::error!("ayeeeee just ignoring this error for now......"); 134 - continue; 135 - }; 136 - // let client = &tx; 137 - 138 - // client.execute(upsert_did, &[&op.did]).await.unwrap(); 139 - 140 - // let sp = tx.savepoint("op").await.unwrap(); 141 - let inserted = tx 142 - .execute( 143 - insert_op, 144 - &[ 145 - &op.did, 146 - &tokio_postgres::types::Json(op.operation), 147 - &op.cid, 148 - &op.nullified, 149 - &op.created_at, 150 - ], 151 - ) 152 - .await 153 - .unwrap(); 154 - if inserted != 1 { 155 - log::warn!( 156 - "possible log modification: {inserted} rows changed after upserting {op:?}" 157 - ); 31 + match args.command { 32 + Commands::Tail { after } => { 33 + let mut url = args.upstream; 34 + url.set_path("/export"); 35 + let start_at = after.or_else(|| Some(chrono::Utc::now())); 36 + let (tx, rx) = flume::bounded(0); // rendezvous 37 + tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() }); 38 + loop { 39 + for op in rx.recv_async().await.unwrap().ops { 40 + println!("{op}") 41 + } 158 42 } 159 - // { 160 - // if e.code() != Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) { 161 - // anyhow::bail!(e); 162 - // } 163 - // // TODO: assert that the row has not changed 164 - // log::warn!("ignoring dup"); 165 - // } 166 43 } 167 - 168 - tx.commit().await.unwrap(); 169 44 } 170 - Ok(()) 171 - } 172 - 173 - async fn get_latest(pg_client: &tokio_postgres::Client) -> Option<Dt> { 174 - pg_client 175 - .query_opt( 176 - r#"SELECT "createdAt" FROM operations 177 - ORDER BY "createdAt" DESC LIMIT 1"#, 178 - &[], 179 - ) 180 - .await 181 - .unwrap() 182 - .map(|r| r.get(0)) 183 - } 184 - 185 - #[tokio::main] 186 - async fn main() -> anyhow::Result<()> { 187 - bin_init("main"); 188 - let args = Args::parse(); 189 - let db = Db::new(&args.postgres); 190 - let (tx, rx) = flume::bounded(EXPORT_PAGE_QUEUE_SIZE); 191 - 192 - log::trace!("connecting postgres for export task..."); 193 - let pg_client = 
db.connect().await?; 194 - let export_task = tokio::task::spawn(export_upstream( 195 - args.upstream, 196 - (args.upstream_bulk, args.bulk_epoch), 197 - tx, 198 - pg_client, 199 - )); 200 - 201 - log::trace!("connecting postgres for writer task..."); 202 - let pg_client = db.connect().await?; 203 - let writer_task = tokio::task::spawn(write_pages(rx, pg_client)); 204 - 205 - tokio::select! { 206 - z = export_task => log::warn!("export task ended: {z:?}"), 207 - z = writer_task => log::warn!("writer task ended: {z:?}"), 208 - }; 209 - 210 - log::error!("todo: shutdown"); 211 - 212 - Ok(()) 213 45 }
-38
src/bin/tail.rs
··· 1 - use allegedly::{bin_init, poll_upstream}; 2 - use clap::Parser; 3 - use url::Url; 4 - 5 - #[derive(Parser)] 6 - struct Args { 7 - /// Upstream PLC server to poll 8 - /// 9 - /// default: https://plc.directory 10 - #[arg(long, env)] 11 - #[clap(default_value = "https://plc.directory")] 12 - upstream: Url, 13 - } 14 - 15 - #[tokio::main] 16 - async fn main() { 17 - bin_init("tail"); 18 - 19 - let mut url = Args::parse().upstream; 20 - url.set_path("/export"); 21 - let now = chrono::Utc::now(); 22 - 23 - let (tx, rx) = flume::bounded(0); // rendezvous 24 - tokio::task::spawn(async move { 25 - if let Err(e) = poll_upstream(Some(now), url, tx).await { 26 - log::error!("polling failed: {e}"); 27 - } else { 28 - log::warn!("poller finished ok (weird?)"); 29 - } 30 - }); 31 - 32 - while let Ok(page) = rx.recv_async().await { 33 - for op in page.ops { 34 - println!("{op}"); 35 - } 36 - } 37 - log::warn!("recv failed, bye"); 38 - }