Server tools to backfill, tail, mirror, and verify PLC logs

racing backfills

+58 -30
+5 -3
src/bin/bundle-weekly.rs
··· 1 - use allegedly::{bin_init, pages_to_weeks, poll_upstream}; 1 + use allegedly::{Week, bin_init, pages_to_weeks, poll_upstream}; 2 2 use clap::Parser; 3 3 use std::path::PathBuf; 4 4 use url::Url; ··· 23 23 /// 24 24 /// Must be a week-truncated unix timestamp 25 25 #[arg(long, env)] 26 - start_at: Option<u64>, // TODO!! 26 + start_at: Option<i64>, 27 27 } 28 28 29 29 #[tokio::main] ··· 34 34 let mut url = args.upstream; 35 35 url.set_path("/export"); 36 36 37 + let after = args.start_at.map(|n| Week::from_n(n).into()); 38 + 37 39 log::trace!("ensure weekly output directory exists"); 38 40 std::fs::create_dir_all(&args.dir)?; 39 41 40 42 let (tx, rx) = flume::bounded(PAGE_QUEUE_SIZE); 41 43 42 44 tokio::task::spawn(async move { 43 - if let Err(e) = poll_upstream(None /*todo*/, url, tx).await { 45 + if let Err(e) = poll_upstream(after, url, tx).await { 44 46 log::error!("polling failed: {e}"); 45 47 } else { 46 48 log::warn!("poller finished ok (weird?)");
+38 -23
src/bin/main.rs
··· 77 77 rx: flume::Receiver<ExportPage>, 78 78 mut pg_client: tokio_postgres::Client, 79 79 ) -> Result<(), anyhow::Error> { 80 - let upsert_did = &pg_client 81 - .prepare( 82 - r#" 83 - INSERT INTO dids (did) VALUES ($1) 84 - ON CONFLICT DO NOTHING"#, 85 - ) 86 - .await 87 - .unwrap(); 80 + // TODO: one big upsert at the end from select distinct on the other table 81 + 82 + // let upsert_did = &pg_client 83 + // .prepare( 84 + // r#" 85 + // INSERT INTO dids (did) VALUES ($1) 86 + // ON CONFLICT DO NOTHING"#, 87 + // ) 88 + // .await 89 + // .unwrap(); 88 90 89 91 let insert_op = &pg_client 90 92 .prepare( 91 93 r#" 92 94 INSERT INTO operations (did, operation, cid, nullified, "createdAt") 93 - VALUES ($1, $2, $3, $4, $5)"#, 94 - ) // TODO: check that it hasn't changed 95 + VALUES ($1, $2, $3, $4, $5) 96 + ON CONFLICT (did, cid) DO UPDATE 97 + SET nullified = excluded.nullified, 98 + "createdAt" = excluded."createdAt" 99 + WHERE operations.nullified = excluded.nullified 100 + OR operations."createdAt" = excluded."createdAt""#, 101 + ) // idea: op is provable via cid, so leave it out. after did/cid (pk) that leaves nullified and createdAt 102 + // that we want to notice changing. 103 + // normal insert: no conflict, rows changed = 1 104 + // conflict (exact match): where clause passes, rows changed = 1 105 + // conflict (mismatch): where clause fails, rows changed = 0 (detect this and warn!) 95 106 .await 96 107 .unwrap(); 97 108 98 109 while let Ok(page) = rx.recv_async().await { 99 110 log::trace!("got a page..."); 100 111 101 - let mut tx = pg_client.transaction().await.unwrap(); 112 + let tx = pg_client.transaction().await.unwrap(); 102 113 103 114 // TODO: probably figure out postgres COPY IN 104 115 // for now just write everything into a transaction ··· 122 133 log::error!("ayeeeee just ignoring this error for now......"); 123 134 continue; 124 135 }; 125 - let client = &tx; 136 + // let client = &tx; 126 137 127 - client.execute(upsert_did, &[&op.did]).await.unwrap(); 138 + // client.execute(upsert_did, &[&op.did]).await.unwrap(); 128 139 129 - let sp = tx.savepoint("op").await.unwrap(); 130 - if let Err(e) = sp 140 + // let sp = tx.savepoint("op").await.unwrap(); 141 + let inserted = tx 131 142 .execute( 132 143 insert_op, 133 144 &[ ··· 139 150 ], 140 151 ) 141 152 .await 142 - { 143 - if e.code() != Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) { 144 - anyhow::bail!(e); 145 - } 146 - // TODO: assert that the row has not changed 147 - log::warn!("ignoring dup"); 148 - } else { 149 - sp.commit().await.unwrap(); 153 + .unwrap(); 154 + if inserted != 1 { 155 + log::warn!( 156 + "possible log modification: {inserted} rows changed after upserting {op:?}" 157 + ); 150 158 } 159 + // { 160 + // if e.code() != Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) { 161 + // anyhow::bail!(e); 162 + // } 163 + // // TODO: assert that the row has not changed 164 + // log::warn!("ignoring dup"); 165 + // } 151 166 } 152 167 153 168 tx.commit().await.unwrap();
+15 -4
src/weekly.rs
··· 8 8 #[derive(Debug, Clone, Copy, PartialEq)] 9 9 pub struct Week(i64); 10 10 11 + impl Week { 12 + pub fn from_n(n: i64) -> Self { 13 + Self(n) 14 + } 15 + pub fn n_ago(&self) -> i64 { 16 + let Self(us) = self; 17 + let Self(cur) = chrono::Utc::now().into(); 18 + (cur - us) / WEEK_IN_SECONDS 19 + } 20 + } 21 + 11 22 impl From<Dt> for Week { 12 23 fn from(dt: Dt) -> Self { 13 24 let ts = dt.timestamp(); ··· 35 46 let total_t0 = Instant::now(); 36 47 let mut week_ops = 0; 37 48 let mut week_t0 = total_t0; 38 - let mut week = 0; 39 49 40 50 while let Ok(page) = rx.recv_async().await { 41 51 for mut s in page.ops { ··· 50 60 let now = Instant::now(); 51 61 52 62 log::info!( 53 - "done week {week:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 63 + "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 64 + current_week.map(|w| -w.n_ago()).unwrap_or(0), 54 65 current_week.unwrap_or(Week(0)).0, 55 66 (week_ops as f64) / (now - week_t0).as_secs_f64(), 56 67 total_ops / 1000, ··· 62 73 current_week = Some(op_week); 63 74 week_ops = 0; 64 75 week_t0 = now; 65 - week += 1; 66 76 } 67 77 s.push('\n'); // hack 68 78 log::trace!("writing: {s}"); ··· 76 86 encoder.shutdown().await?; 77 87 let now = Instant::now(); 78 88 log::info!( 79 - "done week {week:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 89 + "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 90 + current_week.map(|w| -w.n_ago()).unwrap_or(0), 80 91 current_week.unwrap_or(Week(0)).0, 81 92 (week_ops as f64) / (now - week_t0).as_secs_f64(), 82 93 total_ops / 1000,