tangled
alpha
login
or
join now
ptr.pet
/
hydrant
26
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
26
fork
atom
overview
issues
6
pulls
pipelines
[ingest] refactor worker errors to IngestError
ptr.pet
3 weeks ago
8c239314
28795a0c
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+34
-55
1 changed file
expand all
collapse all
unified
split
src
ingest
worker.rs
+34
-55
src/ingest/worker.rs
···
19
use std::collections::{HashMap, HashSet, hash_map::DefaultHasher};
20
use std::hash::{Hash, Hasher};
21
use std::sync::Arc;
0
22
use tokio::sync::mpsc;
23
use tracing::{debug, error, info, trace, warn};
24
25
-
#[derive(Debug)]
26
-
struct KeyFetchError(miette::Report);
0
0
27
28
-
impl std::fmt::Display for KeyFetchError {
29
-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30
-
write!(f, "{}", self.0)
31
-
}
32
-
}
0
33
34
-
impl std::error::Error for KeyFetchError {
35
-
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
36
-
self.0.source()
37
-
}
38
}
39
40
-
impl Diagnostic for KeyFetchError {
41
-
fn code<'a>(&'a self) -> Option<Box<dyn std::fmt::Display + 'a>> {
42
-
self.0.code()
43
-
}
44
-
45
-
fn help<'a>(&'a self) -> Option<Box<dyn std::fmt::Display + 'a>> {
46
-
self.0.help()
47
-
}
48
-
49
-
fn labels(&self) -> Option<Box<dyn Iterator<Item = miette::LabeledSpan> + '_>> {
50
-
self.0.labels()
51
-
}
52
-
53
-
fn diagnostic_source(&self) -> Option<&dyn Diagnostic> {
54
-
self.0.diagnostic_source()
55
-
}
56
-
57
-
fn related<'a>(&'a self) -> Option<Box<dyn Iterator<Item = &'a dyn Diagnostic> + 'a>> {
58
-
self.0.related()
59
-
}
60
-
61
-
fn source_code(&self) -> Option<&dyn miette::SourceCode> {
62
-
self.0.source_code()
63
-
}
64
-
65
-
fn severity(&self) -> Option<miette::Severity> {
66
-
self.0.severity()
67
-
}
68
-
69
-
fn url<'a>(&'a self) -> Option<Box<dyn std::fmt::Display + 'a>> {
70
-
self.0.url()
71
}
72
}
73
···
280
}
281
Ok(RepoProcessResult::Syncing(None)) => {}
282
Err(e) => {
0
0
0
283
error!("error processing message for {did}: {e}");
284
-
db::check_poisoned_report(&e);
285
if Self::check_if_retriable_failure(&e) {
286
if let SubscribeReposMessage::Commit(commit) = &msg {
287
if let Err(e) =
···
322
323
// dont retry commit or sync on key fetch errors
324
// since we'll just try again later if we get commit or sync again
325
-
fn check_if_retriable_failure(e: &miette::Report) -> bool {
326
-
e.downcast_ref::<KeyFetchError>().is_none()
327
-
&& e.downcast_ref::<CommitError>().is_none()
328
-
&& e.downcast_ref::<NoSigningKeyError>().is_none()
0
329
}
330
331
fn process_message<'s, 'c>(
332
ctx: &mut WorkerContext,
333
msg: &'c SubscribeReposMessage<'static>,
334
did: &Did,
335
-
) -> Result<RepoProcessResult<'s, 'c>> {
336
let check_repo_res = Self::check_repo_state(ctx, did, msg)?;
337
let mut repo_state = match check_repo_res {
338
RepoProcessResult::Syncing(_) | RepoProcessResult::Deleted => {
···
486
did: &Did,
487
repo_state: RepoState<'s>,
488
commit: &'c Commit<'c>,
489
-
) -> Result<RepoProcessResult<'ns, 'c>> {
490
// check for replayed events (already seen revision)
491
if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) {
492
debug!(
···
561
ctx: &mut WorkerContext,
562
did: &Did<'_>,
563
msg: &'c SubscribeReposMessage<'static>,
564
-
) -> Result<RepoProcessResult<'s, 'c>> {
565
// check if we have this repo
566
if let Some(state) = ctx.repo_cache.get(did) {
567
return Ok(RepoProcessResult::Ok(state.clone()));
···
640
ctx: &mut WorkerContext,
641
did: &Did,
642
mut repo_state: RepoState<'s>,
643
-
) -> Result<RepoProcessResult<'s, 'static>> {
644
let prefix = keys::resync_buffer_prefix(did);
645
646
for guard in ctx.state.db.resync_buffer.prefix(&prefix) {
···
675
Ok(RepoProcessResult::Ok(repo_state))
676
}
677
678
-
fn fetch_key(ctx: &WorkerContext, did: &Did) -> Result<Option<PublicKey<'static>>> {
0
0
0
679
if ctx.verify_signatures {
680
let key = ctx
681
.handle
682
.block_on(ctx.state.resolver.resolve_signing_key(did))
683
-
.map_err(|e| {
684
-
KeyFetchError(miette::miette!("failed to get pubkey for {did}: {e}"))
685
-
})?;
686
Ok(Some(key))
687
} else {
688
Ok(None)
···
19
use std::collections::{HashMap, HashSet, hash_map::DefaultHasher};
20
use std::hash::{Hash, Hasher};
21
use std::sync::Arc;
22
+
use thiserror::Error;
23
use tokio::sync::mpsc;
24
use tracing::{debug, error, info, trace, warn};
25
26
+
#[derive(Debug, Diagnostic, Error)]
27
+
enum IngestError {
28
+
#[error("{0}")]
29
+
Generic(miette::Report),
30
31
+
#[error("key fetch failed: {0}")]
32
+
KeyFetch(smol_str::SmolStr),
33
+
34
+
#[error(transparent)]
35
+
#[diagnostic(transparent)]
36
+
Commit(#[from] CommitError),
37
38
+
#[error(transparent)]
39
+
#[diagnostic(transparent)]
40
+
NoSigningKey(#[from] NoSigningKeyError),
0
41
}
42
43
+
impl From<miette::Report> for IngestError {
44
+
fn from(report: miette::Report) -> Self {
45
+
IngestError::Generic(report)
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
46
}
47
}
48
···
255
}
256
Ok(RepoProcessResult::Syncing(None)) => {}
257
Err(e) => {
258
+
if let IngestError::Generic(e) = &e {
259
+
db::check_poisoned_report(e);
260
+
}
261
error!("error processing message for {did}: {e}");
0
262
if Self::check_if_retriable_failure(&e) {
263
if let SubscribeReposMessage::Commit(commit) = &msg {
264
if let Err(e) =
···
299
300
// dont retry commit or sync on key fetch errors
301
// since we'll just try again later if we get commit or sync again
302
+
fn check_if_retriable_failure(e: &IngestError) -> bool {
303
+
!matches!(
304
+
e,
305
+
IngestError::KeyFetch(_) | IngestError::Commit(_) | IngestError::NoSigningKey(_)
306
+
)
307
}
308
309
fn process_message<'s, 'c>(
310
ctx: &mut WorkerContext,
311
msg: &'c SubscribeReposMessage<'static>,
312
did: &Did,
313
+
) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
314
let check_repo_res = Self::check_repo_state(ctx, did, msg)?;
315
let mut repo_state = match check_repo_res {
316
RepoProcessResult::Syncing(_) | RepoProcessResult::Deleted => {
···
464
did: &Did,
465
repo_state: RepoState<'s>,
466
commit: &'c Commit<'c>,
467
+
) -> Result<RepoProcessResult<'ns, 'c>, IngestError> {
468
// check for replayed events (already seen revision)
469
if matches!(repo_state.rev, Some(ref rev) if commit.rev.as_str() <= rev.to_tid().as_str()) {
470
debug!(
···
539
ctx: &mut WorkerContext,
540
did: &Did<'_>,
541
msg: &'c SubscribeReposMessage<'static>,
542
+
) -> Result<RepoProcessResult<'s, 'c>, IngestError> {
543
// check if we have this repo
544
if let Some(state) = ctx.repo_cache.get(did) {
545
return Ok(RepoProcessResult::Ok(state.clone()));
···
618
ctx: &mut WorkerContext,
619
did: &Did,
620
mut repo_state: RepoState<'s>,
621
+
) -> Result<RepoProcessResult<'s, 'static>, IngestError> {
622
let prefix = keys::resync_buffer_prefix(did);
623
624
for guard in ctx.state.db.resync_buffer.prefix(&prefix) {
···
653
Ok(RepoProcessResult::Ok(repo_state))
654
}
655
656
+
fn fetch_key(
657
+
ctx: &WorkerContext,
658
+
did: &Did,
659
+
) -> Result<Option<PublicKey<'static>>, IngestError> {
660
if ctx.verify_signatures {
661
let key = ctx
662
.handle
663
.block_on(ctx.state.resolver.resolve_signing_key(did))
664
+
.map_err(|e| IngestError::KeyFetch(e.to_smolstr()))?;
0
0
665
Ok(Some(key))
666
} else {
667
Ok(None)