tangled
alpha
login
or
join now
microcosm.blue
/
Allegedly
52
fork
atom
Server tools to backfill, tail, mirror, and verify PLC logs
52
fork
atom
overview
issues
4
pulls
1
pipelines
keep track of pg connection tasks
bad-example.com
5 months ago
fbf7f9b4
d44a5513
+41
-59
1 changed file
expand all
collapse all
unified
split
src
plc_pg.rs
+41
-59
src/plc_pg.rs
···
4
4
use std::path::PathBuf;
5
5
use std::pin::pin;
6
6
use std::time::Instant;
7
7
-
use tokio::sync::{mpsc, oneshot};
7
7
+
use tokio::{
8
8
+
task::{spawn, JoinHandle},
9
9
+
sync::{mpsc, oneshot},
10
10
+
};
8
11
use tokio_postgres::{
9
12
Client, Error as PgError, NoTls,
10
13
binary_copy::BinaryCopyInWriter,
11
14
connect,
15
15
+
Socket,
12
16
types::{Json, Type},
17
17
+
tls::MakeTlsConnect
13
18
};
14
19
15
15
-
fn get_tls(cert: PathBuf) -> MakeTlsConnector {
16
16
-
let cert = std::fs::read(cert).expect("to read cert file");
17
17
-
let cert = Certificate::from_pem(&cert).expect("to build cert");
18
18
-
let connector = TlsConnector::builder()
19
19
-
.add_root_certificate(cert)
20
20
-
.build()
21
21
-
.expect("to build tls connector");
22
22
-
MakeTlsConnector::new(connector)
20
20
+
fn get_tls(cert: PathBuf) -> anyhow::Result<MakeTlsConnector> {
21
21
+
let cert = std::fs::read(cert)?;
22
22
+
let cert = Certificate::from_pem(&cert)?;
23
23
+
let connector = TlsConnector::builder().add_root_certificate(cert).build()?;
24
24
+
Ok(MakeTlsConnector::new(connector))
25
25
+
}
26
26
+
27
27
+
async fn get_client_and_task<T>(
28
28
+
uri: &str,
29
29
+
connector: T,
30
30
+
) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError>
31
31
+
where
32
32
+
T: MakeTlsConnect<Socket>,
33
33
+
<T as MakeTlsConnect<Socket>>::Stream: Send + 'static,
34
34
+
{
35
35
+
let (client, connection) = connect(uri, connector).await?;
36
36
+
Ok((client, spawn(connection)))
23
37
}
24
38
25
39
/// a little tokio-postgres helper
26
40
///
27
41
/// it's clone for easiness. it doesn't share any resources underneath after
28
28
-
/// cloning at all so it's not meant for
42
42
+
/// cloning *at all* so it's not meant for eg. handling public web requests
29
43
#[derive(Clone)]
30
44
pub struct Db {
31
45
pg_uri: String,
···
38
52
// it's what we expect: check for db migrations.
39
53
log::trace!("checking migrations...");
40
54
41
41
-
let connector = cert.map(get_tls);
55
55
+
let connector = cert.map(get_tls).transpose()?;
42
56
43
43
-
let (client, connection_task) = if let Some(ref connector) = connector {
44
44
-
let (client, connection) = connect(pg_uri, connector.clone()).await?;
45
45
-
let task = tokio::task::spawn(async move {
46
46
-
connection
47
47
-
.await
48
48
-
.inspect_err(|e| log::error!("connection ended with error: {e}"))
49
49
-
.expect("pg validation connection not to blow up");
50
50
-
});
51
51
-
(client, task)
57
57
+
let (client, conn_task) = if let Some(ref connector) = connector {
58
58
+
get_client_and_task(pg_uri, connector.clone()).await?
52
59
} else {
53
53
-
let (client, connection) = connect(pg_uri, NoTls).await?;
54
54
-
let task = tokio::task::spawn(async move {
55
55
-
connection
56
56
-
.await
57
57
-
.inspect_err(|e| log::error!("connection ended with error: {e}"))
58
58
-
.expect("pg validation connection not to blow up");
59
59
-
});
60
60
-
(client, task)
60
60
+
get_client_and_task(pg_uri, NoTls).await?
61
61
};
62
62
63
63
let migrations: Vec<String> = client
···
77
77
);
78
78
drop(client);
79
79
// make sure the connection worker thing doesn't linger
80
80
-
connection_task.await?;
80
80
+
conn_task.await??;
81
81
log::info!("db connection succeeded and plc migrations appear as expected");
82
82
83
83
Ok(Self {
···
86
86
})
87
87
}
88
88
89
89
-
pub async fn connect(&self) -> Result<Client, PgError> {
89
89
+
#[must_use]
90
90
+
pub async fn connect(&self) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> {
90
91
log::trace!("connecting postgres...");
91
91
-
let client = if let Some(ref connector) = self.cert {
92
92
-
let (client, connection) = connect(&self.pg_uri, connector.clone()).await?;
93
93
-
94
94
-
// send the connection away to do the actual communication work
95
95
-
// apparently the connection will complete when the client drops
96
96
-
tokio::task::spawn(async move {
97
97
-
connection
98
98
-
.await
99
99
-
.inspect_err(|e| log::error!("connection ended with error: {e}"))
100
100
-
.expect("pg connection not to blow up");
101
101
-
});
102
102
-
client
92
92
+
if let Some(ref connector) = self.cert {
93
93
+
get_client_and_task(&self.pg_uri, connector.clone()).await
103
94
} else {
104
104
-
let (client, connection) = connect(&self.pg_uri, NoTls).await?;
105
105
-
106
106
-
// send the connection away to do the actual communication work
107
107
-
// apparently the connection will complete when the client drops
108
108
-
tokio::task::spawn(async move {
109
109
-
connection
110
110
-
.await
111
111
-
.inspect_err(|e| log::error!("connection ended with error: {e}"))
112
112
-
.expect("pg connection not to blow up");
113
113
-
});
114
114
-
client
115
115
-
};
116
116
-
117
117
-
Ok(client)
95
95
+
get_client_and_task(&self.pg_uri, NoTls).await
96
96
+
}
118
97
}
119
98
120
99
pub async fn get_latest(&self) -> Result<Option<Dt>, PgError> {
121
121
-
let client = self.connect().await?;
100
100
+
let (client, task) = self.connect().await?;
122
101
let dt: Option<Dt> = client
123
102
.query_opt(
124
103
r#"SELECT "createdAt"
···
129
108
)
130
109
.await?
131
110
.map(|row| row.get(0));
111
111
+
drop(task);
132
112
Ok(dt)
133
113
}
134
114
}
···
137
117
db: Db,
138
118
mut pages: mpsc::Receiver<ExportPage>,
139
119
) -> anyhow::Result<&'static str> {
140
140
-
let mut client = db.connect().await?;
120
120
+
let (mut client, task) = db.connect().await?;
141
121
142
122
let ops_stmt = client
143
123
.prepare(
···
174
154
}
175
155
tx.commit().await?;
176
156
}
157
157
+
drop(task);
177
158
178
159
log::info!(
179
160
"no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}",
···
201
182
mut pages: mpsc::Receiver<ExportPage>,
202
183
notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
203
184
) -> anyhow::Result<&'static str> {
204
204
-
let mut client = db.connect().await?;
185
185
+
let (mut client, task) = db.connect().await?;
205
186
206
187
let t0 = Instant::now();
207
188
let tx = client.transaction().await?;
···
316
297
log::trace!("set tables LOGGED: {:?}", t_step.elapsed());
317
298
318
299
tx.commit().await?;
300
300
+
drop(task);
319
301
log::info!("total backfill time: {:?}", t0.elapsed());
320
302
321
303
Ok("backfill_to_pg")