Server tools to backfill, tail, mirror, and verify PLC logs

quick fix: cert for postgres

laksdjl

+91 -22
+14
Cargo.lock
··· 38 "governor", 39 "http-body-util", 40 "log", 41 "poem", 42 "reqwest", 43 "reqwest-middleware", 44 "reqwest-retry", ··· 1740 version = "1.11.1" 1741 source = "registry+https://github.com/rust-lang/crates.io-index" 1742 checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" 1743 1744 [[package]] 1745 name = "postgres-protocol"
··· 38 "governor", 39 "http-body-util", 40 "log", 41 + "native-tls", 42 "poem", 43 + "postgres-native-tls", 44 "reqwest", 45 "reqwest-middleware", 46 "reqwest-retry", ··· 1742 version = "1.11.1" 1743 source = "registry+https://github.com/rust-lang/crates.io-index" 1744 checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" 1745 + 1746 + [[package]] 1747 + name = "postgres-native-tls" 1748 + version = "0.5.1" 1749 + source = "registry+https://github.com/rust-lang/crates.io-index" 1750 + checksum = "a1f39498473c92f7b6820ae970382c1d83178a3454c618161cb772e8598d9f6f" 1751 + dependencies = [ 1752 + "native-tls", 1753 + "tokio", 1754 + "tokio-native-tls", 1755 + "tokio-postgres", 1756 + ] 1757 1758 [[package]] 1759 name = "postgres-protocol"
+2
Cargo.toml
··· 15 governor = "0.10.1" 16 http-body-util = "0.1.3" 17 log = "0.4.28" 18 poem = { version = "3.1.12", features = ["acme", "compression"] } 19 reqwest = { version = "0.12.23", features = ["stream", "json"] } 20 reqwest-middleware = "0.4.2" 21 reqwest-retry = "0.7.0"
··· 15 governor = "0.10.1" 16 http-body-util = "0.1.3" 17 log = "0.4.28" 18 + native-tls = "0.2.14" 19 poem = { version = "3.1.12", features = ["acme", "compression"] } 20 + postgres-native-tls = "0.5.1" 21 reqwest = { version = "0.12.23", features = ["stream", "json"] } 22 reqwest-middleware = "0.4.2" 23 reqwest-retry = "0.7.0"
+1 -1
readme.md
··· 27 --upstream "https://plc.directory" \ 28 --wrap "http://127.0.0.1:3000" \ 29 --acme-domain "plc.wtf" \ 30 - --acme-cache-dir ./acme-cache \ 31 --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" 32 ``` 33
··· 27 --upstream "https://plc.directory" \ 28 --wrap "http://127.0.0.1:3000" \ 29 --acme-domain "plc.wtf" \ 30 + --acme-cache-path ./acme-cache \ 31 --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" 32 ``` 33
+10 -3
src/bin/allegedly.rs
··· 38 /// Pass a postgres connection url like "postgresql://localhost:5432" 39 #[arg(long)] 40 to_postgres: Option<Url>, 41 /// Delete all operations from the postgres db before starting 42 /// 43 /// only used if `--to-postgres` is present ··· 79 /// the wrapped did-method-plc server's database (write access required) 80 #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 81 wrap_pg: Url, 82 /// wrapping server listen address 83 #[arg(short, long, env = "ALLEGEDLY_BIND")] 84 #[clap(default_value = "127.0.0.1:8000")] ··· 166 dir, 167 source_workers, 168 to_postgres, 169 postgres_reset, 170 until, 171 catch_up, ··· 195 }; 196 197 let to_postgres_url_bulk = to_postgres.clone(); 198 let bulk_out_write = tokio::task::spawn(async move { 199 if let Some(ref url) = to_postgres_url_bulk { 200 - let db = Db::new(url.as_str()).await.unwrap(); 201 backfill_to_pg(db, postgres_reset, rx, notify_last_at) 202 .await 203 .unwrap(); ··· 220 log::info!("writing catch-up pages"); 221 let full_pages = full_pages(rx); 222 if let Some(url) = to_postgres { 223 - let db = Db::new(url.as_str()).await.unwrap(); 224 pages_to_pg(db, full_pages).await.unwrap(); 225 } else { 226 pages_to_stdout(full_pages, None).await.unwrap(); ··· 243 Commands::Mirror { 244 wrap, 245 wrap_pg, 246 bind, 247 acme_domain, 248 acme_cache_path, 249 acme_directory_url, 250 } => { 251 - let db = Db::new(wrap_pg.as_str()).await.unwrap(); 252 let latest = db 253 .get_latest() 254 .await
··· 38 /// Pass a postgres connection url like "postgresql://localhost:5432" 39 #[arg(long)] 40 to_postgres: Option<Url>, 41 + /// Cert for postgres (if needed) 42 + postgres_cert: Option<PathBuf>, 43 /// Delete all operations from the postgres db before starting 44 /// 45 /// only used if `--to-postgres` is present ··· 81 /// the wrapped did-method-plc server's database (write access required) 82 #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 83 wrap_pg: Url, 84 + /// path to tls cert for the wrapped postgres db, if needed 85 + wrap_pg_cert: Option<PathBuf>, 86 /// wrapping server listen address 87 #[arg(short, long, env = "ALLEGEDLY_BIND")] 88 #[clap(default_value = "127.0.0.1:8000")] ··· 170 dir, 171 source_workers, 172 to_postgres, 173 + postgres_cert, 174 postgres_reset, 175 until, 176 catch_up, ··· 200 }; 201 202 let to_postgres_url_bulk = to_postgres.clone(); 203 + let pg_cert = postgres_cert.clone(); 204 let bulk_out_write = tokio::task::spawn(async move { 205 if let Some(ref url) = to_postgres_url_bulk { 206 + let db = Db::new(url.as_str(), pg_cert).await.unwrap(); 207 backfill_to_pg(db, postgres_reset, rx, notify_last_at) 208 .await 209 .unwrap(); ··· 226 log::info!("writing catch-up pages"); 227 let full_pages = full_pages(rx); 228 if let Some(url) = to_postgres { 229 + let db = Db::new(url.as_str(), postgres_cert).await.unwrap(); 230 pages_to_pg(db, full_pages).await.unwrap(); 231 } else { 232 pages_to_stdout(full_pages, None).await.unwrap(); ··· 249 Commands::Mirror { 250 wrap, 251 wrap_pg, 252 + wrap_pg_cert, 253 bind, 254 acme_domain, 255 acme_cache_path, 256 acme_directory_url, 257 } => { 258 + let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await.unwrap(); 259 let latest = db 260 .get_latest() 261 .await
+64 -18
src/plc_pg.rs
··· 1 use crate::{Dt, ExportPage, Op, PageBoundaryState}; 2 use std::pin::pin; 3 use std::time::Instant; 4 use tokio::sync::{mpsc, oneshot}; ··· 9 types::{Json, Type}, 10 }; 11 12 /// a little tokio-postgres helper 13 /// 14 /// it's clone for easiness. it doesn't share any resources underneath after 15 /// cloning at all so it's not meant for 16 - #[derive(Debug, Clone)] 17 pub struct Db { 18 pg_uri: String, 19 } 20 21 impl Db { 22 - pub async fn new(pg_uri: &str) -> Result<Self, anyhow::Error> { 23 // we're going to interact with did-method-plc's database, so make sure 24 // it's what we expect: check for db migrations. 25 log::trace!("checking migrations..."); 26 - let (client, connection) = connect(pg_uri, NoTls).await?; 27 - let connection_task = tokio::task::spawn(async move { 28 - connection 29 - .await 30 - .inspect_err(|e| log::error!("connection ended with error: {e}")) 31 - .unwrap(); 32 - }); 33 let migrations: Vec<String> = client 34 .query("SELECT name FROM kysely_migration ORDER BY name", &[]) 35 .await? ··· 52 53 Ok(Self { 54 pg_uri: pg_uri.to_string(), 55 }) 56 } 57 58 pub async fn connect(&self) -> Result<Client, PgError> { 59 log::trace!("connecting postgres..."); 60 - let (client, connection) = connect(&self.pg_uri, NoTls).await?; 61 62 - // send the connection away to do the actual communication work 63 - // apparently the connection will complete when the client drops 64 - tokio::task::spawn(async move { 65 - connection 66 - .await 67 - .inspect_err(|e| log::error!("connection ended with error: {e}")) 68 - .unwrap(); 69 - }); 70 71 Ok(client) 72 }
··· 1 use crate::{Dt, ExportPage, Op, PageBoundaryState}; 2 + use native_tls::{Certificate, TlsConnector}; 3 + use postgres_native_tls::MakeTlsConnector; 4 + use std::path::PathBuf; 5 use std::pin::pin; 6 use std::time::Instant; 7 use tokio::sync::{mpsc, oneshot}; ··· 12 types::{Json, Type}, 13 }; 14 15 + fn get_tls(cert: PathBuf) -> MakeTlsConnector { 16 + let cert = std::fs::read(cert).unwrap(); 17 + let cert = Certificate::from_pem(&cert).unwrap(); 18 + let connector = TlsConnector::builder() 19 + .add_root_certificate(cert) 20 + .build() 21 + .unwrap(); 22 + MakeTlsConnector::new(connector) 23 + } 24 + 25 /// a little tokio-postgres helper 26 /// 27 /// it's clone for easiness. it doesn't share any resources underneath after 28 /// cloning at all so it's not meant for 29 + #[derive(Clone)] 30 pub struct Db { 31 pg_uri: String, 32 + cert: Option<MakeTlsConnector>, 33 } 34 35 impl Db { 36 + pub async fn new(pg_uri: &str, cert: Option<PathBuf>) -> Result<Self, anyhow::Error> { 37 // we're going to interact with did-method-plc's database, so make sure 38 // it's what we expect: check for db migrations. 39 log::trace!("checking migrations..."); 40 + 41 + let connector = cert.map(get_tls); 42 + 43 + let (client, connection_task) = if let Some(ref connector) = connector { 44 + let (client, connection) = connect(pg_uri, connector.clone()).await?; 45 + let task = tokio::task::spawn(async move { 46 + connection 47 + .await 48 + .inspect_err(|e| log::error!("connection ended with error: {e}")) 49 + .unwrap(); 50 + }); 51 + (client, task) 52 + } else { 53 + let (client, connection) = connect(pg_uri, NoTls).await?; 54 + let task = tokio::task::spawn(async move { 55 + connection 56 + .await 57 + .inspect_err(|e| log::error!("connection ended with error: {e}")) 58 + .unwrap(); 59 + }); 60 + (client, task) 61 + }; 62 + 63 let migrations: Vec<String> = client 64 .query("SELECT name FROM kysely_migration ORDER BY name", &[]) 65 .await? ··· 82 83 Ok(Self { 84 pg_uri: pg_uri.to_string(), 85 + cert: connector, 86 }) 87 } 88 89 pub async fn connect(&self) -> Result<Client, PgError> { 90 log::trace!("connecting postgres..."); 91 + let client = if let Some(ref connector) = self.cert { 92 + let (client, connection) = connect(&self.pg_uri, connector.clone()).await?; 93 + 94 + // send the connection away to do the actual communication work 95 + // apparently the connection will complete when the client drops 96 + tokio::task::spawn(async move { 97 + connection 98 + .await 99 + .inspect_err(|e| log::error!("connection ended with error: {e}")) 100 + .unwrap(); 101 + }); 102 + client 103 + } else { 104 + let (client, connection) = connect(&self.pg_uri, NoTls).await?; 105 106 + // send the connection away to do the actual communication work 107 + // apparently the connection will complete when the client drops 108 + tokio::task::spawn(async move { 109 + connection 110 + .await 111 + .inspect_err(|e| log::error!("connection ended with error: {e}")) 112 + .unwrap(); 113 + }); 114 + client 115 + }; 116 117 Ok(client) 118 }