Server tools to backfill, tail, mirror, and verify PLC logs
at debug 304 lines 9.6 kB view raw
1use crate::{Dt, ExportPage, PageBoundaryState}; 2use native_tls::{Certificate, TlsConnector}; 3use postgres_native_tls::MakeTlsConnector; 4use std::path::PathBuf; 5use std::pin::pin; 6use std::time::Instant; 7use tokio::{ 8 sync::{mpsc, oneshot}, 9 task::{JoinHandle, spawn}, 10}; 11use tokio_postgres::{ 12 Client, Error as PgError, NoTls, Socket, 13 binary_copy::BinaryCopyInWriter, 14 connect, 15 tls::MakeTlsConnect, 16 types::{Json, Type}, 17}; 18 19fn get_tls(cert: PathBuf) -> anyhow::Result<MakeTlsConnector> { 20 let cert = std::fs::read(cert)?; 21 let cert = Certificate::from_pem(&cert)?; 22 let connector = TlsConnector::builder().add_root_certificate(cert).build()?; 23 Ok(MakeTlsConnector::new(connector)) 24} 25 26async fn get_client_and_task<T>( 27 uri: &str, 28 connector: T, 29) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> 30where 31 T: MakeTlsConnect<Socket>, 32 <T as MakeTlsConnect<Socket>>::Stream: Send + 'static, 33{ 34 let (client, connection) = connect(uri, connector).await?; 35 Ok((client, spawn(connection))) 36} 37 38/// a little tokio-postgres helper 39/// 40/// it's clone for easiness. it doesn't share any resources underneath after 41/// cloning *at all* so it's not meant for eg. handling public web requests 42#[derive(Clone)] 43pub struct Db { 44 pg_uri: String, 45 cert: Option<MakeTlsConnector>, 46} 47 48impl Db { 49 pub async fn new(pg_uri: &str, cert: Option<PathBuf>) -> Result<Self, anyhow::Error> { 50 // we're going to interact with did-method-plc's database, so make sure 51 // it's what we expect: check for db migrations. 52 log::trace!("checking migrations..."); 53 54 let connector = cert.map(get_tls).transpose()?; 55 56 let (client, conn_task) = if let Some(ref connector) = connector { 57 get_client_and_task(pg_uri, connector.clone()).await? 58 } else { 59 get_client_and_task(pg_uri, NoTls).await? 60 }; 61 62 let migrations: Vec<String> = client 63 .query("SELECT name FROM kysely_migration ORDER BY name", &[]) 64 .await? 65 .iter() 66 .map(|row| row.get(0)) 67 .collect(); 68 assert_eq!( 69 &migrations, 70 &[ 71 "_20221020T204908820Z", 72 "_20230223T215019669Z", 73 "_20230406T174552885Z", 74 "_20231128T203323431Z", 75 ] 76 ); 77 drop(client); 78 // make sure the connection worker thing doesn't linger 79 conn_task.await??; 80 log::info!("db connection succeeded and plc migrations appear as expected"); 81 82 Ok(Self { 83 pg_uri: pg_uri.to_string(), 84 cert: connector, 85 }) 86 } 87 88 pub async fn connect(&self) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> { 89 log::trace!("connecting postgres..."); 90 if let Some(ref connector) = self.cert { 91 get_client_and_task(&self.pg_uri, connector.clone()).await 92 } else { 93 get_client_and_task(&self.pg_uri, NoTls).await 94 } 95 } 96 97 pub async fn get_latest(&self) -> Result<Option<Dt>, PgError> { 98 let (client, task) = self.connect().await?; 99 let dt: Option<Dt> = client 100 .query_opt( 101 r#"SELECT "createdAt" 102 FROM operations 103 ORDER BY "createdAt" DESC 104 LIMIT 1"#, 105 &[], 106 ) 107 .await? 108 .map(|row| row.get(0)); 109 drop(task); 110 Ok(dt) 111 } 112} 113 114pub async fn pages_to_pg( 115 db: Db, 116 mut pages: mpsc::Receiver<ExportPage>, 117) -> anyhow::Result<&'static str> { 118 log::info!("starting pages_to_pg writer..."); 119 120 let (mut client, task) = db.connect().await?; 121 122 let ops_stmt = client 123 .prepare( 124 r#"INSERT INTO operations (did, operation, cid, nullified, "createdAt") 125 VALUES ($1, $2, $3, $4, $5) 126 ON CONFLICT do nothing"#, 127 ) 128 .await?; 129 let did_stmt = client 130 .prepare(r#"INSERT INTO dids (did) VALUES ($1) ON CONFLICT do nothing"#) 131 .await?; 132 133 let t0 = Instant::now(); 134 let mut ops_inserted = 0; 135 let mut dids_inserted = 0; 136 137 while let Some(page) = pages.recv().await { 138 log::trace!("writing page with {} ops", page.ops.len()); 139 let tx = client.transaction().await?; 140 for op in page.ops { 141 ops_inserted += tx 142 .execute( 143 &ops_stmt, 144 &[ 145 &op.did, 146 &Json(op.operation), 147 &op.cid, 148 &op.nullified, 149 &op.created_at, 150 ], 151 ) 152 .await?; 153 dids_inserted += tx.execute(&did_stmt, &[&op.did]).await?; 154 } 155 tx.commit().await?; 156 } 157 drop(task); 158 159 log::info!( 160 "no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}", 161 t0.elapsed() 162 ); 163 Ok("pages_to_pg") 164} 165 166/// Dump rows into an empty operations table quickly 167/// 168/// you must run this after initializing the db with kysely migrations from the 169/// typescript app, but before inserting any content. 170/// 171/// it's an invasive process: it will drop the indexes that kysely created (and 172/// restore them after) in order to get the backfill in as quickly as possible. 173/// 174/// fails: if the backfill data violates the primary key constraint (unique did*cid) 175/// 176/// panics: if the operations or dids tables are not empty, unless reset is true 177/// 178/// recommended postgres setting: `max_wal_size=4GB` (or more) 179pub async fn backfill_to_pg( 180 db: Db, 181 reset: bool, 182 mut pages: mpsc::Receiver<ExportPage>, 183 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 184) -> anyhow::Result<&'static str> { 185 let (mut client, task) = db.connect().await?; 186 187 let t0 = Instant::now(); 188 let tx = client.transaction().await?; 189 tx.execute("SET LOCAL synchronous_commit = off", &[]) 190 .await?; 191 192 let t_step = Instant::now(); 193 for table in ["operations", "dids"] { 194 if reset { 195 let n = tx.execute(&format!("DELETE FROM {table}"), &[]).await?; 196 if n > 0 { 197 log::warn!("postgres reset: deleted {n} from {table}"); 198 } 199 } else { 200 let n: i64 = tx 201 .query_one(&format!("SELECT count(*) FROM {table}"), &[]) 202 .await? 203 .get(0); 204 if n > 0 { 205 panic!("postgres: {table} table was not empty and `reset` not requested"); 206 } 207 } 208 } 209 log::trace!("tables clean: {:?}", t_step.elapsed()); 210 211 let t_step = Instant::now(); 212 tx.execute("ALTER TABLE operations SET UNLOGGED", &[]) 213 .await?; 214 tx.execute("ALTER TABLE dids SET UNLOGGED", &[]).await?; 215 log::trace!("set tables unlogged: {:?}", t_step.elapsed()); 216 217 let t_step = Instant::now(); 218 tx.execute(r#"DROP INDEX "operations_createdAt_index""#, &[]) 219 .await?; 220 tx.execute("DROP INDEX operations_did_createdat_idx", &[]) 221 .await?; 222 log::trace!("indexes dropped: {:?}", t_step.elapsed()); 223 224 let t_step = Instant::now(); 225 log::trace!("starting binary COPY IN..."); 226 let types = &[ 227 Type::TEXT, 228 Type::JSONB, 229 Type::TEXT, 230 Type::BOOL, 231 Type::TIMESTAMPTZ, 232 ]; 233 let sync = tx 234 .copy_in( 235 r#"COPY operations (did, operation, cid, nullified, "createdAt") FROM STDIN BINARY"#, 236 ) 237 .await?; 238 let mut writer = pin!(BinaryCopyInWriter::new(sync, types)); 239 let mut last_at = None; 240 while let Some(page) = pages.recv().await { 241 for op in &page.ops { 242 writer 243 .as_mut() 244 .write(&[ 245 &op.did, 246 &Json(op.operation.clone()), 247 &op.cid, 248 &op.nullified, 249 &op.created_at, 250 ]) 251 .await?; 252 } 253 if notify_last_at.is_some() 254 && let Some(s) = PageBoundaryState::new(&page) 255 { 256 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 257 } 258 } 259 log::debug!("finished receiving bulk pages"); 260 261 if let Some(notify) = notify_last_at { 262 log::trace!("notifying last_at: {last_at:?}"); 263 if notify.send(last_at).is_err() { 264 log::error!("receiver for last_at dropped, can't notify"); 265 }; 266 } 267 268 let n = writer.as_mut().finish().await?; 269 log::trace!("COPY IN wrote {n} ops: {:?}", t_step.elapsed()); 270 271 // CAUTION: these indexes MUST match up exactly with the kysely ones we dropped 272 let t_step = Instant::now(); 273 tx.execute( 274 r#"CREATE INDEX operations_did_createdat_idx ON operations (did, "createdAt")"#, 275 &[], 276 ) 277 .await?; 278 tx.execute( 279 r#"CREATE INDEX "operations_createdAt_index" ON operations ("createdAt")"#, 280 &[], 281 ) 282 .await?; 283 log::trace!("indexes recreated: {:?}", t_step.elapsed()); 284 285 let t_step = Instant::now(); 286 let n = tx 287 .execute( 288 r#"INSERT INTO dids SELECT distinct did FROM operations"#, 289 &[], 290 ) 291 .await?; 292 log::trace!("INSERT wrote {n} dids: {:?}", t_step.elapsed()); 293 294 let t_step = Instant::now(); 295 tx.execute("ALTER TABLE dids SET LOGGED", &[]).await?; 296 tx.execute("ALTER TABLE operations SET LOGGED", &[]).await?; 297 log::trace!("set tables LOGGED: {:?}", t_step.elapsed()); 298 299 tx.commit().await?; 300 drop(task); 301 log::info!("total backfill time: {:?}", t0.elapsed()); 302 303 Ok("backfill_to_pg") 304}