···11-use allegedly::{Dt, FolderSource, HttpSource, backfill, bin_init, pages_to_weeks, poll_upstream};
11+use allegedly::{Db, Dt, ExportPage, FolderSource, HttpSource, backfill, bin_init, pages_to_weeks, poll_upstream, write_bulk as pages_to_pg};
22use clap::{Parser, Subcommand};
33use std::path::PathBuf;
44use url::Url;
···2828 #[arg(long)]
2929 #[clap(default_value = "4")]
3030 source_workers: usize,
3131+ /// Bulk load into did-method-plc-compatible postgres instead of stdout
3232+ ///
3333+ /// Pass a postgres connection url like "postgresql://localhost:5432"
3434+ #[arg(long)]
3535+ to_postgres: Option<Url>,
3136 },
3237 /// Scrape a PLC server, collecting ops into weekly bundles
3338 ///
···5863 },
5964}
60656666+async fn pages_to_stdout(rx: flume::Receiver<ExportPage>) -> Result<(), flume::RecvError> {
6767+ loop {
6868+ for op in rx.recv_async().await?.ops {
6969+ println!("{op}")
7070+ }
7171+ }
7272+}
7373+6174#[tokio::main]
6275async fn main() {
6376 bin_init("main");
···6982 http,
7083 dir,
7184 source_workers,
8585+ to_postgres,
7286 } => {
7387 let (tx, rx) = flume::bounded(1024); // big pages
7488 tokio::task::spawn(async move {
···8498 .unwrap();
8599 }
86100 });
8787- loop {
8888- for op in rx.recv_async().await.unwrap().ops {
8989- println!("{op}")
9090- }
101101+ if let Some(url) = to_postgres {
102102+ let db = Db::new(url.as_str());
103103+ pages_to_pg(db, rx).await.unwrap();
104104+ } else {
105105+ pages_to_stdout(rx).await.unwrap();
91106 }
92107 }
93108 Commands::Bundle {
···109124 let start_at = after.or_else(|| Some(chrono::Utc::now()));
110125 let (tx, rx) = flume::bounded(0); // rendezvous, don't read ahead
111126 tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() });
112112- loop {
113113- for op in rx.recv_async().await.unwrap().ops {
114114- println!("{op}")
115115- }
116116- }
127127+ pages_to_stdout(rx).await.unwrap();
117128 }
118129 }
119130}
-214
src/bin/backfill.rs
···11-use clap::Parser;
22-use std::time::Duration;
33-use url::Url;
44-55-use allegedly::{Db, Dt, ExportPage, Op, bin_init, poll_upstream};
66-77-const EXPORT_PAGE_QUEUE_SIZE: usize = 0; // rendezvous for now
88-const WEEK_IN_SECONDS: u64 = 7 * 86400;
99-1010-#[derive(Parser)]
1111-struct Args {
1212- /// Upstream PLC server to mirror
1313- ///
1414- /// default: https://plc.directory
1515- #[arg(long, env)]
1616- #[clap(default_value = "https://plc.directory")]
1717- upstream: Url,
1818- /// Bulk export source prefix
1919- ///
2020- /// Must be a prefix for urls ending with {WEEK_TIMESTAMP}.jsonl.gz
2121- ///
2222- /// default: https://plc.t3.storage.dev/plc.directory/
2323- ///
2424- /// pass "off" to skip fast bulk backfilling
2525- #[arg(long, env)]
2626- #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")]
2727- upstream_bulk: Url,
2828- /// The oldest available bulk upstream export timestamp
2929- ///
3030- /// Must be a week-truncated unix timestamp
3131- ///
3232- /// plc.directory's oldest week is `1668643200`; you probably don't want to change this.
3333- #[arg(long, env)]
3434- #[clap(default_value = "1668643200")]
3535- bulk_epoch: u64,
3636- /// Mirror PLC's postgres database
3737- ///
3838- /// URI string with credentials etc
3939- #[arg(long, env)]
4040- postgres: String,
4141-}
4242-4343-async fn bulk_backfill((_upstream, epoch): (Url, u64), _tx: flume::Sender<ExportPage>) {
4444- let immutable_cutoff = std::time::SystemTime::now() - Duration::from_secs((7 + 4) * 86400);
4545- let immutable_ts = (immutable_cutoff.duration_since(std::time::SystemTime::UNIX_EPOCH))
4646- .unwrap()
4747- .as_secs();
4848- let _immutable_week = (immutable_ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS;
4949- let _week = epoch;
5050- let _week_n = 0;
5151- todo!();
5252- // while week < immutable_week {
5353- // log::info!("backfilling week {week_n} ({week})");
5454- // let url = upstream.join(&format!("{week}.jsonl.gz")).unwrap();
5555- // week_to_pages(url, tx.clone()).await.unwrap();
5656- // week_n += 1;
5757- // week += WEEK_IN_SECONDS;
5858- // }
5959-}
6060-6161-async fn export_upstream(
6262- upstream: Url,
6363- bulk: (Url, u64),
6464- tx: flume::Sender<ExportPage>,
6565- pg_client: tokio_postgres::Client,
6666-) {
6767- let latest = get_latest(&pg_client).await;
6868-6969- if latest.is_none() {
7070- bulk_backfill(bulk, tx.clone()).await;
7171- }
7272- let mut upstream = upstream;
7373- upstream.set_path("/export");
7474- poll_upstream(latest, upstream, tx).await.unwrap();
7575-}
7676-7777-async fn write_pages(
7878- rx: flume::Receiver<ExportPage>,
7979- mut pg_client: tokio_postgres::Client,
8080-) -> Result<(), anyhow::Error> {
8181- // TODO: one big upsert at the end from select distinct on the other table
8282-8383- // let upsert_did = &pg_client
8484- // .prepare(
8585- // r#"
8686- // INSERT INTO dids (did) VALUES ($1)
8787- // ON CONFLICT DO NOTHING"#,
8888- // )
8989- // .await
9090- // .unwrap();
9191-9292- let insert_op = &pg_client
9393- .prepare(
9494- r#"
9595- INSERT INTO operations (did, operation, cid, nullified, "createdAt")
9696- VALUES ($1, $2, $3, $4, $5)
9797- ON CONFLICT (did, cid) DO UPDATE
9898- SET nullified = excluded.nullified,
9999- "createdAt" = excluded."createdAt"
100100- WHERE operations.nullified = excluded.nullified
101101- OR operations."createdAt" = excluded."createdAt""#,
102102- ) // idea: op is provable via cid, so leave it out. after did/cid (pk) that leaves nullified and createdAt
103103- // that we want to notice changing.
104104- // normal insert: no conflict, rows changed = 1
105105- // conflict (exact match): where clause passes, rows changed = 1
106106- // conflict (mismatch): where clause fails, rows changed = 0 (detect this and warn!)
107107- .await
108108- .unwrap();
109109-110110- while let Ok(page) = rx.recv_async().await {
111111- log::trace!("got a page...");
112112-113113- let tx = pg_client.transaction().await.unwrap();
114114-115115- // TODO: probably figure out postgres COPY IN
116116- // for now just write everything into a transaction
117117-118118- log::trace!("setting up inserts...");
119119- for op_line in page
120120- .ops
121121- .into_iter()
122122- .flat_map(|s| {
123123- s.replace("}{", "}\n{")
124124- .split('\n')
125125- .map(|s| s.trim())
126126- .map(Into::into)
127127- .collect::<Vec<String>>()
128128- })
129129- .filter(|s| !s.is_empty())
130130- {
131131- let Ok(op) = serde_json::from_str::<Op>(&op_line)
132132- .inspect_err(|e| log::error!("failing! at the {op_line}! {e}"))
133133- else {
134134- log::error!("ayeeeee just ignoring this error for now......");
135135- continue;
136136- };
137137- // let client = &tx;
138138-139139- // client.execute(upsert_did, &[&op.did]).await.unwrap();
140140-141141- // let sp = tx.savepoint("op").await.unwrap();
142142- let inserted = tx
143143- .execute(
144144- insert_op,
145145- &[
146146- &op.did,
147147- &tokio_postgres::types::Json(op.operation),
148148- &op.cid,
149149- &op.nullified,
150150- &op.created_at,
151151- ],
152152- )
153153- .await
154154- .unwrap();
155155- if inserted != 1 {
156156- log::warn!(
157157- "possible log modification: {inserted} rows changed after upserting {op:?}"
158158- );
159159- }
160160- // {
161161- // if e.code() != Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) {
162162- // anyhow::bail!(e);
163163- // }
164164- // // TODO: assert that the row has not changed
165165- // log::warn!("ignoring dup");
166166- // }
167167- }
168168-169169- tx.commit().await.unwrap();
170170- }
171171- Ok(())
172172-}
173173-174174-async fn get_latest(pg_client: &tokio_postgres::Client) -> Option<Dt> {
175175- pg_client
176176- .query_opt(
177177- r#"SELECT "createdAt" FROM operations
178178- ORDER BY "createdAt" DESC LIMIT 1"#,
179179- &[],
180180- )
181181- .await
182182- .unwrap()
183183- .map(|r| r.get(0))
184184-}
185185-186186-#[tokio::main]
187187-async fn main() -> anyhow::Result<()> {
188188- bin_init("main");
189189- let args = Args::parse();
190190- let db = Db::new(&args.postgres);
191191- let (tx, rx) = flume::bounded(EXPORT_PAGE_QUEUE_SIZE);
192192-193193- log::trace!("connecting postgres for export task...");
194194- let pg_client = db.connect().await?;
195195- let export_task = tokio::task::spawn(export_upstream(
196196- args.upstream,
197197- (args.upstream_bulk, args.bulk_epoch),
198198- tx,
199199- pg_client,
200200- ));
201201-202202- log::trace!("connecting postgres for writer task...");
203203- let pg_client = db.connect().await?;
204204- let writer_task = tokio::task::spawn(write_pages(rx, pg_client));
205205-206206- tokio::select! {
207207- z = export_task => log::warn!("export task ended: {z:?}"),
208208- z = writer_task => log::warn!("writer task ended: {z:?}"),
209209- };
210210-211211- log::error!("todo: shutdown");
212212-213213- Ok(())
214214-}
-47
src/bin/get_backfill_chunk_adsf.rs
···11-use allegedly::{HttpSource, Week, week_to_pages};
22-use std::io::Write;
33-44-#[tokio::main]
55-async fn main() {
66- let url: url::Url = "https://plc.t3.storage.dev/plc.directory/".parse().unwrap();
77- let source = HttpSource(url);
88- // let source = FolderSource("./weekly/".into());
99- let week = Week::from_n(1699488000);
1010-1111- let (tx, rx) = flume::bounded(32);
1212-1313- tokio::task::spawn(async move {
1414- week_to_pages(source, week, tx).await.unwrap();
1515- });
1616-1717- let mut n = 0;
1818-1919- print!("receiving");
2020- while let Ok(page) = rx.recv_async().await {
2121- print!(".");
2222- std::io::stdout().flush().unwrap();
2323- n += page.ops.len();
2424- }
2525- println!();
2626-2727- println!("bye ({n})");
2828-2929- // let reader = CLIENT
3030- // .get("https://plc.t3.storage.dev/plc.directory/1699488000.jsonl.gz")
3131- // // .get("https://plc.t3.storage.dev/plc.directory/1669248000.jsonl.gz")
3232- // .send()
3333- // .await
3434- // .unwrap()
3535- // .error_for_status()
3636- // .unwrap()
3737- // .bytes_stream()
3838- // .map_err(io::Error::other)
3939- // .into_async_read();
4040-4141- // let decoder = GzipDecoder::new(io::BufReader::new(reader));
4242- // let mut chunks = io::BufReader::new(decoder).lines().chunks(1000);
4343- // while let Some(ref _chunk) = chunks.next().await {
4444- // print!(".");
4545- // }
4646- // println!();
4747-}
+1-1
src/lib.rs
···8899pub use backfill::backfill;
1010pub use client::CLIENT;
1111-pub use plc_pg::Db;
1111+pub use plc_pg::{Db, write_bulk};
1212pub use poll::{get_page, poll_upstream};
1313pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages};
1414
+61-1
src/plc_pg.rs
···11-use tokio_postgres::{Client, Error as PgError, NoTls, connect};
11+use crate::{ExportPage, Op};
22+use tokio_postgres::{Client, types::{Type, Json}, Error as PgError, NoTls, connect, binary_copy::BinaryCopyInWriter};
33+use std::pin::pin;
44+2536/// a little tokio-postgres helper
77+///
88+/// it's `Clone` for convenience. clones don't share any resources underneath
99+/// at all, so cloning is not meant as a way to share a connection.
410#[derive(Debug, Clone)]
511pub struct Db {
612 pg_uri: String,
···2935 Ok(client)
3036 }
3137}
3838+3939+pub async fn write_bulk(
4040+ db: Db,
4141+ pages: flume::Receiver<ExportPage>,
4242+) -> Result<(), PgError> {
4343+ let mut client = db.connect().await?;
4444+ let tx = client.transaction().await?;
4545+4646+ tx
4747+ .execute(r#"
4848+ CREATE TABLE backfill (
4949+ did text not null,
5050+ cid text not null,
5151+ operation jsonb not null,
5252+ nullified boolean not null,
5353+ createdAt timestamptz not null
5454+ )"#, &[])
5555+ .await?;
5656+5757+5858+ let types = &[
5959+ Type::TEXT,
6060+ Type::TEXT,
6161+ Type::JSONB,
6262+ Type::BOOL,
6363+ Type::TIMESTAMPTZ,
6464+ ];
6565+6666+ let sync = tx.copy_in("COPY backfill FROM STDIN BINARY").await?;
6767+ let mut writer = pin!(BinaryCopyInWriter::new(sync, types));
6868+6969+ while let Ok(page) = pages.recv_async().await {
7070+ for s in page.ops {
7171+ let Ok(op) = serde_json::from_str::<Op>(&s) else {
7272+ log::warn!("ignoring unparseable op: {s:?}");
7373+ continue;
7474+ };
7575+ writer.as_mut().write(&[
7676+ &op.did,
7777+ &op.cid,
7878+ &Json(op.operation),
7979+ &op.nullified,
8080+ &op.created_at,
8181+ ]).await?;
8282+ }
8383+ }
8484+8585+ let n = writer.as_mut().finish().await?;
8686+ log::info!("copied in {n} rows");
8787+8888+ tx.commit().await?;
8989+9090+ Ok(())
9191+}