//! Server tools to backfill, tail, mirror, and verify PLC logs
use crate::{Dt, ExportPage, PageBoundaryState};
use native_tls::{Certificate, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::time::Instant;
use tokio::{
    sync::{mpsc, oneshot},
    task::{JoinHandle, spawn},
};
use tokio_postgres::{
    Client, Error as PgError, NoTls, Socket,
    binary_copy::BinaryCopyInWriter,
    connect,
    tls::MakeTlsConnect,
    types::{Json, Type},
};
18
19fn get_tls(cert: PathBuf) -> anyhow::Result<MakeTlsConnector> {
20 let cert = std::fs::read(cert)?;
21 let cert = Certificate::from_pem(&cert)?;
22 let connector = TlsConnector::builder().add_root_certificate(cert).build()?;
23 Ok(MakeTlsConnector::new(connector))
24}
25
26async fn get_client_and_task<T>(
27 uri: &str,
28 connector: T,
29) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError>
30where
31 T: MakeTlsConnect<Socket>,
32 <T as MakeTlsConnect<Socket>>::Stream: Send + 'static,
33{
34 let (client, connection) = connect(uri, connector).await?;
35 Ok((client, spawn(connection)))
36}
37
/// a little tokio-postgres helper
///
/// it's clone for easiness. it doesn't share any resources underneath after
/// cloning *at all* so it's not meant for eg. handling public web requests
#[derive(Clone)]
pub struct Db {
    // postgres connection string, reused verbatim for every `connect()`
    pg_uri: String,
    // TLS connector built from a user-supplied root cert; `None` means NoTls
    cert: Option<MakeTlsConnector>,
}
47
48impl Db {
49 pub async fn new(pg_uri: &str, cert: Option<PathBuf>) -> Result<Self, anyhow::Error> {
50 // we're going to interact with did-method-plc's database, so make sure
51 // it's what we expect: check for db migrations.
52 log::trace!("checking migrations...");
53
54 let connector = cert.map(get_tls).transpose()?;
55
56 let (client, conn_task) = if let Some(ref connector) = connector {
57 get_client_and_task(pg_uri, connector.clone()).await?
58 } else {
59 get_client_and_task(pg_uri, NoTls).await?
60 };
61
62 let migrations: Vec<String> = client
63 .query("SELECT name FROM kysely_migration ORDER BY name", &[])
64 .await?
65 .iter()
66 .map(|row| row.get(0))
67 .collect();
68 assert_eq!(
69 &migrations,
70 &[
71 "_20221020T204908820Z",
72 "_20230223T215019669Z",
73 "_20230406T174552885Z",
74 "_20231128T203323431Z",
75 ]
76 );
77 drop(client);
78 // make sure the connection worker thing doesn't linger
79 conn_task.await??;
80 log::info!("db connection succeeded and plc migrations appear as expected");
81
82 Ok(Self {
83 pg_uri: pg_uri.to_string(),
84 cert: connector,
85 })
86 }
87
88 pub async fn connect(&self) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> {
89 log::trace!("connecting postgres...");
90 if let Some(ref connector) = self.cert {
91 get_client_and_task(&self.pg_uri, connector.clone()).await
92 } else {
93 get_client_and_task(&self.pg_uri, NoTls).await
94 }
95 }
96
97 pub async fn get_latest(&self) -> Result<Option<Dt>, PgError> {
98 let (client, task) = self.connect().await?;
99 let dt: Option<Dt> = client
100 .query_opt(
101 r#"SELECT "createdAt"
102 FROM operations
103 ORDER BY "createdAt" DESC
104 LIMIT 1"#,
105 &[],
106 )
107 .await?
108 .map(|row| row.get(0));
109 drop(task);
110 Ok(dt)
111 }
112}
113
114pub async fn pages_to_pg(
115 db: Db,
116 mut pages: mpsc::Receiver<ExportPage>,
117) -> anyhow::Result<&'static str> {
118 log::info!("starting pages_to_pg writer...");
119
120 let (mut client, task) = db.connect().await?;
121
122 let ops_stmt = client
123 .prepare(
124 r#"INSERT INTO operations (did, operation, cid, nullified, "createdAt")
125 VALUES ($1, $2, $3, $4, $5)
126 ON CONFLICT do nothing"#,
127 )
128 .await?;
129 let did_stmt = client
130 .prepare(r#"INSERT INTO dids (did) VALUES ($1) ON CONFLICT do nothing"#)
131 .await?;
132
133 let t0 = Instant::now();
134 let mut ops_inserted = 0;
135 let mut dids_inserted = 0;
136
137 while let Some(page) = pages.recv().await {
138 log::trace!("writing page with {} ops", page.ops.len());
139 let tx = client.transaction().await?;
140 for op in page.ops {
141 ops_inserted += tx
142 .execute(
143 &ops_stmt,
144 &[
145 &op.did,
146 &Json(op.operation),
147 &op.cid,
148 &op.nullified,
149 &op.created_at,
150 ],
151 )
152 .await?;
153 dids_inserted += tx.execute(&did_stmt, &[&op.did]).await?;
154 }
155 tx.commit().await?;
156 }
157 drop(task);
158
159 log::info!(
160 "no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}",
161 t0.elapsed()
162 );
163 Ok("pages_to_pg")
164}
165
/// Dump rows into an empty operations table quickly
///
/// you must run this after initializing the db with kysely migrations from the
/// typescript app, but before inserting any content.
///
/// it's an invasive process: it will drop the indexes that kysely created (and
/// restore them after) in order to get the backfill in as quickly as possible.
///
/// fails: if the backfill data violates the primary key constraint (unique did*cid)
///
/// panics: if the operations or dids tables are not empty, unless reset is true
///
/// recommended postgres setting: `max_wal_size=4GB` (or more)
pub async fn backfill_to_pg(
    db: Db,
    reset: bool,
    mut pages: mpsc::Receiver<ExportPage>,
    notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
) -> anyhow::Result<&'static str> {
    let (mut client, task) = db.connect().await?;

    let t0 = Instant::now();
    // everything below (cleanup, COPY, index rebuild) happens in one
    // transaction, so a failure anywhere leaves the db unchanged
    let tx = client.transaction().await?;
    // SET LOCAL scopes the setting to this transaction only
    tx.execute("SET LOCAL synchronous_commit = off", &[])
        .await?;

    // step 1: make sure both target tables are empty (or empty them if
    // `reset` was requested)
    let t_step = Instant::now();
    for table in ["operations", "dids"] {
        if reset {
            let n = tx.execute(&format!("DELETE FROM {table}"), &[]).await?;
            if n > 0 {
                log::warn!("postgres reset: deleted {n} from {table}");
            }
        } else {
            let n: i64 = tx
                .query_one(&format!("SELECT count(*) FROM {table}"), &[])
                .await?
                .get(0);
            if n > 0 {
                panic!("postgres: {table} table was not empty and `reset` not requested");
            }
        }
    }
    log::trace!("tables clean: {:?}", t_step.elapsed());

    // step 2: skip WAL logging while bulk-loading; restored to LOGGED below
    let t_step = Instant::now();
    tx.execute("ALTER TABLE operations SET UNLOGGED", &[])
        .await?;
    tx.execute("ALTER TABLE dids SET UNLOGGED", &[]).await?;
    log::trace!("set tables unlogged: {:?}", t_step.elapsed());

    // step 3: drop the kysely-created indexes so COPY doesn't pay for index
    // maintenance per row; they're recreated after the load
    let t_step = Instant::now();
    tx.execute(r#"DROP INDEX "operations_createdAt_index""#, &[])
        .await?;
    tx.execute("DROP INDEX operations_did_createdat_idx", &[])
        .await?;
    log::trace!("indexes dropped: {:?}", t_step.elapsed());

    // step 4: binary COPY of every op received on the channel
    let t_step = Instant::now();
    log::trace!("starting binary COPY IN...");
    // column types must line up 1:1 with the COPY column list below
    let types = &[
        Type::TEXT,
        Type::JSONB,
        Type::TEXT,
        Type::BOOL,
        Type::TIMESTAMPTZ,
    ];
    let sync = tx
        .copy_in(
            r#"COPY operations (did, operation, cid, nullified, "createdAt") FROM STDIN BINARY"#,
        )
        .await?;
    let mut writer = pin!(BinaryCopyInWriter::new(sync, types));
    let mut last_at = None;
    while let Some(page) = pages.recv().await {
        for op in &page.ops {
            writer
                .as_mut()
                .write(&[
                    &op.did,
                    &Json(op.operation.clone()),
                    &op.cid,
                    &op.nullified,
                    &op.created_at,
                ])
                .await?;
        }
        // only track page boundaries if someone is listening for them
        if notify_last_at.is_some()
            && let Some(s) = PageBoundaryState::new(&page)
        {
            // keep the maximum last_at seen across all pages
            last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
        }
    }
    log::debug!("finished receiving bulk pages");

    // NOTE(review): last_at is sent before the COPY is finalized and the
    // transaction commits below — confirm receivers tolerate a later failure
    if let Some(notify) = notify_last_at {
        log::trace!("notifying last_at: {last_at:?}");
        if notify.send(last_at).is_err() {
            log::error!("receiver for last_at dropped, can't notify");
        };
    }

    let n = writer.as_mut().finish().await?;
    log::trace!("COPY IN wrote {n} ops: {:?}", t_step.elapsed());

    // CAUTION: these indexes MUST match up exactly with the kysely ones we dropped
    let t_step = Instant::now();
    tx.execute(
        r#"CREATE INDEX operations_did_createdat_idx ON operations (did, "createdAt")"#,
        &[],
    )
    .await?;
    tx.execute(
        r#"CREATE INDEX "operations_createdAt_index" ON operations ("createdAt")"#,
        &[],
    )
    .await?;
    log::trace!("indexes recreated: {:?}", t_step.elapsed());

    // step 5: derive the dids table from the freshly loaded operations
    let t_step = Instant::now();
    let n = tx
        .execute(
            r#"INSERT INTO dids SELECT distinct did FROM operations"#,
            &[],
        )
        .await?;
    log::trace!("INSERT wrote {n} dids: {:?}", t_step.elapsed());

    // step 6: restore normal WAL logging before committing
    let t_step = Instant::now();
    tx.execute("ALTER TABLE dids SET LOGGED", &[]).await?;
    tx.execute("ALTER TABLE operations SET LOGGED", &[]).await?;
    log::trace!("set tables LOGGED: {:?}", t_step.elapsed());

    tx.commit().await?;
    // detach the connection driver; it finishes when `client` is dropped
    drop(task);
    log::info!("total backfill time: {:?}", t0.elapsed());

    Ok("backfill_to_pg")
}