Server tools to backfill, tail, mirror, and verify PLC logs

Move backfilled data from the temporary staging table into the actual destination tables.

Note: the insert into the destination tables seems a little slow; may be worth profiling and optimizing later.

+44 -6
+6 -1
src/backfill.rs
··· 8 source: impl BundleSource + Send + 'static, 9 dest: flume::Sender<ExportPage>, 10 source_workers: usize, 11 ) -> anyhow::Result<()> { 12 // queue up the week bundles that should be available 13 - let weeks = Arc::new(Mutex::new(Week::range(FIRST_WEEK..))); 14 weeks.lock().await.reverse(); 15 16 let mut workers: JoinSet<anyhow::Result<()>> = JoinSet::new();
··· 8 source: impl BundleSource + Send + 'static, 9 dest: flume::Sender<ExportPage>, 10 source_workers: usize, 11 + until: Option<Dt>, 12 ) -> anyhow::Result<()> { 13 // queue up the week bundles that should be available 14 + let weeks = Arc::new(Mutex::new( 15 + until 16 + .map(|u| Week::range(FIRST_WEEK..u.into())) 17 + .unwrap_or(Week::range(FIRST_WEEK..)), 18 + )); 19 weeks.lock().await.reverse(); 20 21 let mut workers: JoinSet<anyhow::Result<()>> = JoinSet::new();
+6 -2
src/bin/allegedly.rs
··· 37 /// Pass a postgres connection url like "postgresql://localhost:5432" 38 #[arg(long)] 39 to_postgres: Option<Url>, 40 }, 41 /// Scrape a PLC server, collecting ops into weekly bundles 42 /// ··· 88 dir, 89 source_workers, 90 to_postgres, 91 } => { 92 let (tx, rx) = flume::bounded(32); // big pages 93 tokio::task::spawn(async move { 94 if let Some(dir) = dir { 95 log::info!("Reading weekly bundles from local folder {dir:?}"); 96 - backfill(FolderSource(dir), tx, source_workers.unwrap_or(1)) 97 .await 98 .unwrap(); 99 } else { 100 log::info!("Fetching weekly bundles from from {http}"); 101 - backfill(HttpSource(http), tx, source_workers.unwrap_or(4)) 102 .await 103 .unwrap(); 104 }
··· 37 /// Pass a postgres connection url like "postgresql://localhost:5432" 38 #[arg(long)] 39 to_postgres: Option<Url>, 40 + /// Stop at the week ending before this date 41 + #[arg(long)] 42 + until: Option<Dt>, 43 }, 44 /// Scrape a PLC server, collecting ops into weekly bundles 45 /// ··· 91 dir, 92 source_workers, 93 to_postgres, 94 + until, 95 } => { 96 let (tx, rx) = flume::bounded(32); // big pages 97 tokio::task::spawn(async move { 98 if let Some(dir) = dir { 99 log::info!("Reading weekly bundles from local folder {dir:?}"); 100 + backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until) 101 .await 102 .unwrap(); 103 } else { 104 log::info!("Fetching weekly bundles from from {http}"); 105 + backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until) 106 .await 107 .unwrap(); 108 }
+32 -3
src/plc_pg.rs
··· 92 cid text not null, 93 operation jsonb not null, 94 nullified boolean not null, 95 - createdAt timestamptz not null 96 )"#, 97 &[], 98 ) ··· 135 log::info!("copied in {n} rows"); 136 137 tx.commit().await?; 138 139 - let dt = t0.elapsed(); 140 - log::info!("backfill time: {dt:?}"); 141 142 Ok(()) 143 }
··· 92 cid text not null, 93 operation jsonb not null, 94 nullified boolean not null, 95 + "createdAt" timestamptz not null 96 )"#, 97 &[], 98 ) ··· 135 log::info!("copied in {n} rows"); 136 137 tx.commit().await?; 138 + log::info!("copy in time: {:?}", t0.elapsed()); 139 140 + log::info!("copying dids into plc table..."); 141 + let n = client 142 + .execute( 143 + r#" 144 + INSERT INTO dids 145 + SELECT distinct did FROM allegedly_backfill 146 + ON CONFLICT do nothing"#, 147 + &[], 148 + ) 149 + .await?; 150 + log::info!("{n} inserted; elapsed: {:?}", t0.elapsed()); 151 + 152 + log::info!("copying ops into plc table..."); 153 + let n = client 154 + .execute( 155 + r#" 156 + INSERT INTO operations (did, cid, operation, nullified, "createdAt") 157 + SELECT did, cid, operation, nullified, "createdAt" FROM allegedly_backfill 158 + ON CONFLICT do nothing"#, 159 + &[], 160 + ) 161 + .await?; 162 + log::info!("{n} inserted; elapsed: {:?}", t0.elapsed()); 163 + 164 + log::info!("clean up backfill table..."); 165 + client 166 + .execute(r#"DROP TABLE allegedly_backfill"#, &[]) 167 + .await?; 168 + 169 + log::info!("total backfill time: {:?}", t0.elapsed()); 170 171 Ok(()) 172 }