tangled
alpha
login
or
join now
ptr.pet
/
hydrant
29
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
29
fork
atom
overview
issues
6
pulls
pipelines
[db,appwide] add poisoning checks on errors and crash early
ptr.pet
1 month ago
f3035d76
9eb1bd6b
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+40
-7
5 changed files
expand all
collapse all
unified
split
src
backfill
manager.rs
mod.rs
db
mod.rs
ingest
mod.rs
main.rs
+3
src/backfill/manager.rs
···
66
.into_diagnostic()
67
.unwrap_or_else(|e| {
68
warn!("failed to scan errors: {e}");
0
69
Ok(Vec::new())
70
})
71
.unwrap_or_else(|e| {
72
warn!("failed to scan errors: {e}");
0
73
Vec::new()
74
});
75
···
86
// move back to pending
87
if let Err(e) = Db::insert(db.pending.clone(), key, Vec::new()).await {
88
warn!("failed to move {did} to pending: {e}");
0
89
continue;
90
}
91
···
66
.into_diagnostic()
67
.unwrap_or_else(|e| {
68
warn!("failed to scan errors: {e}");
69
+
Db::check_poisoned_report(&e);
70
Ok(Vec::new())
71
})
72
.unwrap_or_else(|e| {
73
warn!("failed to scan errors: {e}");
74
+
Db::check_poisoned_report(&e);
75
Vec::new()
76
});
77
···
88
// move back to pending
89
if let Err(e) = Db::insert(db.pending.clone(), key, Vec::new()).await {
90
warn!("failed to move {did} to pending: {e}");
91
+
Db::check_poisoned_report(&e);
92
continue;
93
}
94
+5
-1
src/backfill/mod.rs
···
66
did.clone(),
67
permit,
68
)
69
-
.inspect_err(move |e| error!("backfill process failed for {did}: {e}")),
0
0
0
70
);
71
}
72
}
···
405
406
if let Err(e) = ops::apply_commit(&state.db, &commit, true) {
407
error!("failed to apply buffered commit for {did}: {e}");
0
408
}
409
410
// delete from buffer
···
66
did.clone(),
67
permit,
68
)
69
+
.inspect_err(move |e| {
70
+
error!("backfill process failed for {did}: {e}");
71
+
Db::check_poisoned_report(e);
72
+
}),
73
);
74
}
75
}
···
408
409
if let Err(e) = ops::apply_commit(&state.db, &commit, true) {
410
error!("failed to apply buffered commit for {did}: {e}");
411
+
Db::check_poisoned_report(&e);
412
}
413
414
// delete from buffer
+16
-3
src/db/mod.rs
···
9
10
use std::sync::atomic::AtomicU64;
11
use tokio::sync::broadcast;
0
12
13
#[derive(Clone)]
14
pub struct Db {
···
98
}
99
100
pub fn persist(&self) -> Result<()> {
101
-
self.inner
102
-
.persist(PersistMode::SyncData)
103
-
.into_diagnostic()?;
104
Ok(())
105
}
106
···
189
.await
190
.into_diagnostic()
191
.flatten()
0
0
0
0
0
0
0
0
0
0
0
0
0
0
192
}
193
}
···
9
10
use std::sync::atomic::AtomicU64;
11
use tokio::sync::broadcast;
12
+
use tracing::error;
13
14
#[derive(Clone)]
15
pub struct Db {
···
99
}
100
101
pub fn persist(&self) -> Result<()> {
102
+
self.inner.persist(PersistMode::SyncAll).into_diagnostic()?;
0
0
103
Ok(())
104
}
105
···
188
.await
189
.into_diagnostic()
190
.flatten()
191
+
}
192
+
193
+
pub fn check_poisoned(e: &fjall::Error) {
194
+
if matches!(e, fjall::Error::Poisoned) {
195
+
error!("!!! DATABASE POISONED !!! exiting");
196
+
std::process::exit(10);
197
+
}
198
+
}
199
+
200
+
pub fn check_poisoned_report(e: &miette::Report) {
201
+
let Some(err) = e.downcast_ref::<fjall::Error>() else {
202
+
return;
203
+
};
204
+
Self::check_poisoned(err);
205
}
206
}
+7
-2
src/ingest/mod.rs
···
57
let res = tokio::task::spawn_blocking(move || batch.commit()).await;
58
match res {
59
Ok(Ok(_)) => {}
60
-
Ok(Err(e)) => error!("failed to persist buffer batch: {}", e),
0
0
0
61
Err(e) => error!("buffer worker join error: {}", e),
62
}
63
}
···
146
147
if let Err(e) = self.process_commit(&commit).await {
148
error!("failed to process commit {}: {e}", commit.seq);
0
149
// buffer for later inspection/retry
150
let _ = self.buffer_event(&commit).await;
151
}
···
241
.into_diagnostic()?;
242
243
if let Err(e) = res {
244
-
error!("failed to apply live commit for {}: {}", did_static, e);
0
245
self.buffer_event(commit).await?;
246
} else {
247
debug!("synced event for {}, {} ops", did_static, commit.ops.len());
···
57
let res = tokio::task::spawn_blocking(move || batch.commit()).await;
58
match res {
59
Ok(Ok(_)) => {}
60
+
Ok(Err(e)) => {
61
+
Db::check_poisoned(&e);
62
+
error!("failed to persist buffer batch: {}", e)
63
+
}
64
Err(e) => error!("buffer worker join error: {}", e),
65
}
66
}
···
149
150
if let Err(e) = self.process_commit(&commit).await {
151
error!("failed to process commit {}: {e}", commit.seq);
152
+
Db::check_poisoned_report(&e);
153
// buffer for later inspection/retry
154
let _ = self.buffer_event(&commit).await;
155
}
···
245
.into_diagnostic()?;
246
247
if let Err(e) = res {
248
+
error!("failed to apply live commit for {did_static}: {e}");
249
+
Db::check_poisoned_report(&e);
250
self.buffer_event(commit).await?;
251
} else {
252
debug!("synced event for {}, {} ops", did_static, commit.ops.len());
+9
-1
src/main.rs
···
56
57
if let Err(e) = crate::backfill::manager::queue_pending_backfills(&state).await {
58
error!("failed to queue pending backfills: {e}");
0
59
}
60
61
tokio::spawn({
···
113
.await
114
{
115
error!("failed to save cursor: {e}");
0
116
}
117
118
let state = state.clone();
···
121
match res {
122
Ok(Err(e)) => {
123
error!("db persist failed: {e}");
0
124
}
125
Err(e) => {
126
error!("persistence task join failed: {e}");
···
139
let crawler = Crawler::new(state, crawler_host);
140
if let Err(e) = crawler.run().await {
141
error!("crawler died: {e}");
0
142
}
143
}
144
});
···
148
149
if let Err(e) = ingestor.run().await {
150
error!("ingestor died: {e}");
0
151
}
152
153
-
state.db.persist()?;
0
0
0
154
155
Ok(())
156
}
···
56
57
if let Err(e) = crate::backfill::manager::queue_pending_backfills(&state).await {
58
error!("failed to queue pending backfills: {e}");
59
+
Db::check_poisoned_report(&e);
60
}
61
62
tokio::spawn({
···
114
.await
115
{
116
error!("failed to save cursor: {e}");
117
+
Db::check_poisoned_report(&e);
118
}
119
120
let state = state.clone();
···
123
match res {
124
Ok(Err(e)) => {
125
error!("db persist failed: {e}");
126
+
Db::check_poisoned_report(&e);
127
}
128
Err(e) => {
129
error!("persistence task join failed: {e}");
···
142
let crawler = Crawler::new(state, crawler_host);
143
if let Err(e) = crawler.run().await {
144
error!("crawler died: {e}");
145
+
Db::check_poisoned_report(&e);
146
}
147
}
148
});
···
152
153
if let Err(e) = ingestor.run().await {
154
error!("ingestor died: {e}");
155
+
Db::check_poisoned_report(&e);
156
}
157
158
+
if let Err(e) = state.db.persist() {
159
+
Db::check_poisoned_report(&e);
160
+
return Err(e);
161
+
}
162
163
Ok(())
164
}