···11+use crate::ExportPage;
22+use std::io::Write;
/// A [`Write`] sink that splits a newline-delimited byte stream into pages
/// of at most `N` lines and forwards each completed page over a channel.
///
/// Partial pages accumulate in `bytes` until either enough newlines arrive
/// or `flush` is called.
pub struct PageForwarder<const N: usize> {
    // number of newlines already buffered in the current partial page
    newlines: usize,
    // raw bytes of the current, not-yet-complete page
    bytes: Vec<u8>,
    // channel on which completed pages are delivered
    dest: flume::Sender<ExportPage>,
}
99+1010+impl<const N: usize> PageForwarder<N> {
1111+ pub fn new(dest: flume::Sender<ExportPage>) -> Self {
1212+ Self {
1313+ newlines: 0,
1414+ bytes: Vec::new(),
1515+ dest,
1616+ }
1717+ }
1818+ fn send_page(&mut self) {
1919+ log::info!("sending page!");
2020+ let page_bytes = std::mem::take(&mut self.bytes);
2121+ if !page_bytes.is_empty() {
2222+ let ops = String::from_utf8(page_bytes)
2323+ .unwrap()
2424+ .trim()
2525+ .replace("}{", "}\n{"); // HACK because oops the exports i made are corrupted
2626+ self.dest.send(ExportPage { ops }).unwrap();
2727+ self.newlines = 0;
2828+ }
2929+ }
3030+}
3131+3232+impl<const N: usize> Write for PageForwarder<N> {
3333+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
3434+ let mut buf = buf;
3535+ loop {
3636+ let newlines_to_next_split = N - 1 - self.newlines;
3737+ let Some((i, _)) = buf
3838+ .iter()
3939+ .enumerate()
4040+ .filter(|&(_, &b)| b == b'\n')
4141+ .nth(newlines_to_next_split)
4242+ else {
4343+ // we're left with a partial page
4444+ self.bytes.extend_from_slice(buf);
4545+ // i guess we need this second pass to update the count
4646+ self.newlines += buf.iter().filter(|&&b| b == b'\n').count();
4747+ // could probably do it all in one pass but whatever
4848+ break;
4949+ };
5050+ // we have one complete page from current bytes + buf[..i]
5151+ let (page_rest, rest) = buf.split_at(i);
5252+ self.bytes.extend_from_slice(page_rest);
5353+ self.send_page();
5454+ buf = rest;
5555+ }
5656+5757+ Ok(buf.len())
5858+ }
5959+ fn flush(&mut self) -> std::io::Result<()> {
6060+ self.send_page();
6161+ Ok(())
6262+ }
6363+}
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_page_forwarder_empty_flush() {
        // flushing with nothing buffered must not emit an (empty) page
        let (tx, rx) = flume::bounded(1);
        let mut pf = PageForwarder::<1>::new(tx);
        pf.flush().unwrap();
        assert!(rx.is_empty());
    }

    #[test]
    fn test_page_forwarder_partial_page_flush() {
        // a partial page (fewer than N lines) is held until flush, then
        // sent with surrounding whitespace trimmed
        let (tx, rx) = flume::bounded(1);
        let mut pf = PageForwarder::<1000>::new(tx);
        pf.write_all(b"{\"op\":1}\n{\"op\":2}\n").unwrap();
        assert!(rx.is_empty());
        pf.flush().unwrap();
        assert_eq!(rx.recv().unwrap().ops, "{\"op\":1}\n{\"op\":2}");
    }

    #[test]
    fn test_page_forwarder_corrupt_export_resplit() {
        // ops concatenated as "}{" are re-split onto separate lines
        let (tx, rx) = flume::bounded(1);
        let mut pf = PageForwarder::<1000>::new(tx);
        pf.write_all(b"{\"a\":1}{\"b\":2}").unwrap();
        pf.flush().unwrap();
        assert_eq!(rx.recv().unwrap().ops, "{\"a\":1}\n{\"b\":2}");
    }
}
+13
src/lib.rs
···11+mod backfill;
22+33+pub use backfill::PageForwarder;
/// One page of PLC export
///
/// should have maximum length of 1000 lines.
/// A bulk export consumer should chunk ops into pages of max 1000 ops.
///
/// leading and trailing whitespace should be trimmed.
pub struct ExportPage {
    // Newline-separated export records, trimmed of surrounding whitespace
    // by the producer (`PageForwarder::send_page`). Presumably one JSON op
    // per line — TODO confirm against the export format.
    pub ops: String,
}
+4-73
src/main.rs
···55use tokio_postgres::NoTls;
66use url::Url;
7788+use allegedly::{ExportPage, PageForwarder};
99+810const EXPORT_PAGE_QUEUE_SIZE: usize = 0; // rendezvous for now
911const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(500);
1012const WEEK_IN_SECONDS: u64 = 7 * 86400;
···4244 postgres: String,
4345}
44464545-/// One page of PLC export
4646-///
4747-/// should have maximum length of 1000 lines.
4848-/// A bulk export consumer should chunk ops into pages of max 1000 ops.
4949-///
5050-/// leading and trailing whitespace should be trimmed.
5151-struct ExportPage {
5252- pub ops: String,
5353-}
5454-5547#[derive(Deserialize)]
5648#[serde(rename_all = "camelCase")]
5749struct OpPeek {
···6961 pub operation: &'a serde_json::value::RawValue,
7062}
71637272-struct PageForwarder {
7373- newlines: usize,
7474- bytes: Vec<u8>,
7575- dest: flume::Sender<ExportPage>,
7676-}
7777-7878-impl PageForwarder {
7979- fn new(dest: flume::Sender<ExportPage>) -> Self {
8080- Self {
8181- newlines: 0,
8282- bytes: Vec::new(),
8383- dest,
8484- }
8585- }
8686- fn send_page(&mut self) {
8787- log::info!("sending page!");
8888- let page_bytes = std::mem::take(&mut self.bytes);
8989- if !page_bytes.is_empty() {
9090- let ops = String::from_utf8(page_bytes)
9191- .unwrap()
9292- .trim()
9393- .replace("}{", "}\n{"); // HACK because oops the exports i made are corrupted
9494- self.dest.send(ExportPage { ops }).unwrap();
9595- self.newlines = 0;
9696- }
9797- }
9898-}
9999-100100-impl Write for PageForwarder {
101101- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
102102- let mut buf = buf;
103103- loop {
104104- let newlines_to_next_split = 999 - self.newlines;
105105- let Some((i, _)) = buf
106106- .iter()
107107- .enumerate()
108108- .filter(|&(_, &b)| b == b'\n')
109109- .nth(newlines_to_next_split)
110110- else {
111111- // we're left with a partial page
112112- self.bytes.extend_from_slice(buf);
113113- // i guess we need this second pass to update the count
114114- self.newlines += buf.iter().filter(|&&b| b == b'\n').count();
115115- // could probably do it all in one pass but whatever
116116- break;
117117- };
118118- // we have one complete page from current bytes + buf[..i]
119119- let (page_rest, rest) = buf.split_at(i);
120120- self.bytes.extend_from_slice(page_rest);
121121- self.send_page();
122122- buf = rest;
123123- }
124124-125125- Ok(buf.len())
126126- }
127127- fn flush(&mut self) -> std::io::Result<()> {
128128- self.send_page();
129129- Ok(())
130130- }
131131-}
132132-13364async fn bulk_backfill(
13465 client: reqwest::Client,
13566 (upstream, epoch): (Url, u64),
···15384 .error_for_status()
15485 .unwrap();
15586156156- let mut sink = PageForwarder::new(tx.clone());
8787+ let mut sink = PageForwarder::<1000>::new(tx.clone());
15788 let mut decoder = flate2::write::GzDecoder::new(&mut sink);
1588915990 while let Some(chunk) = gzipped_chunks.chunk().await.unwrap() {
···343274344275 log::info!("connected! latest: {latest:?}");
345276346346- let (tx, rx) = flume::bounded::<ExportPage>(EXPORT_PAGE_QUEUE_SIZE);
277277+ let (tx, rx) = flume::bounded(EXPORT_PAGE_QUEUE_SIZE);
347278348279 let export_task = tokio::task::spawn(export_upstream(
349280 args.upstream,