tangled
alpha
login
or
join now
microcosm.blue
/
Allegedly
51
fork
atom
Server tools to backfill, tail, mirror, and verify PLC logs
51
fork
atom
overview
issues
4
pulls
1
pipelines
task debugs
bad-example.com
5 months ago
d44a5513
78557d62
+31
-17
5 changed files
expand all
collapse all
unified
split
src
backfill.rs
bin
backfill.rs
lib.rs
plc_pg.rs
poll.rs
+7
-3
src/backfill.rs
···
13
dest: mpsc::Sender<ExportPage>,
14
source_workers: usize,
15
until: Option<Dt>,
16
-
) -> anyhow::Result<()> {
17
// queue up the week bundles that should be available
18
let weeks = Arc::new(Mutex::new(
19
until
···
55
res.inspect_err(|e| log::error!("problem joining source workers: {e}"))?
56
.inspect_err(|e| log::error!("problem *from* source worker: {e}"))?;
57
}
58
-
log::info!("finished fetching backfill in {:?}", t_step.elapsed());
59
-
Ok(())
0
0
0
0
60
}
···
13
dest: mpsc::Sender<ExportPage>,
14
source_workers: usize,
15
until: Option<Dt>,
16
+
) -> anyhow::Result<&'static str> {
17
// queue up the week bundles that should be available
18
let weeks = Arc::new(Mutex::new(
19
until
···
55
res.inspect_err(|e| log::error!("problem joining source workers: {e}"))?
56
.inspect_err(|e| log::error!("problem *from* source worker: {e}"))?;
57
}
58
+
log::info!(
59
+
"finished fetching backfill in {:?}. senders remaining: {}",
60
+
t_step.elapsed(),
61
+
dest.strong_count()
62
+
);
63
+
Ok("backfill")
64
}
+10
-4
src/bin/backfill.rs
···
66
catch_up,
67
}: Args,
68
) -> anyhow::Result<()> {
69
-
let mut tasks = JoinSet::new();
70
71
let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages
72
···
147
bulk_out,
148
found_last_tx,
149
));
150
-
tasks.spawn(pages_to_pg(db, full_out));
0
0
151
} else {
152
tasks.spawn(pages_to_stdout(bulk_out, found_last_tx));
153
-
tasks.spawn(pages_to_stdout(full_out, None));
0
0
154
}
155
}
156
···
168
log::error!("a joinset task completed with error: {e}");
169
return Err(e);
170
}
171
-
_ => {}
0
0
172
}
173
}
174
···
66
catch_up,
67
}: Args,
68
) -> anyhow::Result<()> {
69
+
let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new();
70
71
let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages
72
···
147
bulk_out,
148
found_last_tx,
149
));
150
+
if catch_up {
151
+
tasks.spawn(pages_to_pg(db, full_out));
152
+
}
153
} else {
154
tasks.spawn(pages_to_stdout(bulk_out, found_last_tx));
155
+
if catch_up {
156
+
tasks.spawn(pages_to_stdout(full_out, None));
157
+
}
158
}
159
}
160
···
172
log::error!("a joinset task completed with error: {e}");
173
return Err(e);
174
}
175
+
Ok(Ok(name)) => {
176
+
log::trace!("a task completed: {name:?}. {} left", tasks.len());
177
+
}
178
}
179
}
180
+5
-5
src/lib.rs
···
83
pub async fn full_pages(
84
mut rx: mpsc::Receiver<ExportPage>,
85
tx: mpsc::Sender<ExportPage>,
86
-
) -> anyhow::Result<()> {
87
while let Some(page) = rx.recv().await {
88
let n = page.ops.len();
89
if n < 900 {
90
let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
91
let Some(age) = last_age else {
92
log::info!("full_pages done, empty final page");
93
-
return Ok(());
94
};
95
if age <= chrono::TimeDelta::hours(6) {
96
log::info!("full_pages done, final page of {n} ops");
97
} else {
98
log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
99
}
100
-
return Ok(());
101
}
102
log::trace!("full_pages: continuing with page of {n} ops");
103
tx.send(page).await?;
···
110
pub async fn pages_to_stdout(
111
mut rx: mpsc::Receiver<ExportPage>,
112
notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
113
-
) -> anyhow::Result<()> {
114
let mut last_at = None;
115
while let Some(page) = rx.recv().await {
116
for op in &page.ops {
···
128
log::error!("receiver for last_at dropped, can't notify");
129
};
130
}
131
-
Ok(())
132
}
133
134
pub fn logo(name: &str) -> String {
···
83
pub async fn full_pages(
84
mut rx: mpsc::Receiver<ExportPage>,
85
tx: mpsc::Sender<ExportPage>,
86
+
) -> anyhow::Result<&'static str> {
87
while let Some(page) = rx.recv().await {
88
let n = page.ops.len();
89
if n < 900 {
90
let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
91
let Some(age) = last_age else {
92
log::info!("full_pages done, empty final page");
93
+
return Ok("full pages (hmm)");
94
};
95
if age <= chrono::TimeDelta::hours(6) {
96
log::info!("full_pages done, final page of {n} ops");
97
} else {
98
log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
99
}
100
+
return Ok("full pages (cool)");
101
}
102
log::trace!("full_pages: continuing with page of {n} ops");
103
tx.send(page).await?;
···
110
pub async fn pages_to_stdout(
111
mut rx: mpsc::Receiver<ExportPage>,
112
notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
113
+
) -> anyhow::Result<&'static str> {
114
let mut last_at = None;
115
while let Some(page) = rx.recv().await {
116
for op in &page.ops {
···
128
log::error!("receiver for last_at dropped, can't notify");
129
};
130
}
131
+
Ok("pages_to_stdout")
132
}
133
134
pub fn logo(name: &str) -> String {
+8
-4
src/plc_pg.rs
···
133
}
134
}
135
136
-
pub async fn pages_to_pg(db: Db, mut pages: mpsc::Receiver<ExportPage>) -> anyhow::Result<()> {
0
0
0
137
let mut client = db.connect().await?;
138
139
let ops_stmt = client
···
176
"no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}",
177
t0.elapsed()
178
);
179
-
Ok(())
180
}
181
182
/// Dump rows into an empty operations table quickly
···
197
reset: bool,
198
mut pages: mpsc::Receiver<ExportPage>,
199
notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
200
-
) -> anyhow::Result<()> {
201
let mut client = db.connect().await?;
202
203
let t0 = Instant::now();
···
272
last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
273
}
274
}
0
275
276
if let Some(notify) = notify_last_at {
277
log::trace!("notifying last_at: {last_at:?}");
···
314
tx.commit().await?;
315
log::info!("total backfill time: {:?}", t0.elapsed());
316
317
-
Ok(())
318
}
···
133
}
134
}
135
136
+
pub async fn pages_to_pg(
137
+
db: Db,
138
+
mut pages: mpsc::Receiver<ExportPage>,
139
+
) -> anyhow::Result<&'static str> {
140
let mut client = db.connect().await?;
141
142
let ops_stmt = client
···
179
"no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}",
180
t0.elapsed()
181
);
182
+
Ok("pages_to_pg")
183
}
184
185
/// Dump rows into an empty operations table quickly
···
200
reset: bool,
201
mut pages: mpsc::Receiver<ExportPage>,
202
notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
203
+
) -> anyhow::Result<&'static str> {
204
let mut client = db.connect().await?;
205
206
let t0 = Instant::now();
···
275
last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
276
}
277
}
278
+
log::debug!("finished receiving bulk pages");
279
280
if let Some(notify) = notify_last_at {
281
log::trace!("notifying last_at: {last_at:?}");
···
318
tx.commit().await?;
319
log::info!("total backfill time: {:?}", t0.elapsed());
320
321
+
Ok("backfill_to_pg")
322
}
+1
-1
src/poll.rs
···
155
after: Option<Dt>,
156
base: Url,
157
dest: mpsc::Sender<ExportPage>,
158
-
) -> anyhow::Result<()> {
159
let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL);
160
let mut prev_last: Option<LastOp> = after.map(Into::into);
161
let mut boundary_state: Option<PageBoundaryState> = None;
···
155
after: Option<Dt>,
156
base: Url,
157
dest: mpsc::Sender<ExportPage>,
158
+
) -> anyhow::Result<&'static str> {
159
let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL);
160
let mut prev_last: Option<LastOp> = after.map(Into::into);
161
let mut boundary_state: Option<PageBoundaryState> = None;