tangled
alpha
login
or
join now
ptr.pet
/
hydrant
28
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
28
fork
atom
overview
issues
6
pulls
pipelines
[ingest] refactor internal message types and config cleanup
ptr.pet
3 weeks ago
2864d674
114f7891
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+12
-10
3 changed files
expand all
collapse all
unified
split
src
config.rs
ingest
mod.rs
worker.rs
+6
-2
src/config.rs
···
104
.unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?;
105
106
let full_network: bool = cfg!("FULL_NETWORK", false);
107
-
let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize);
108
let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 5, sec);
109
let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec);
110
···
119
let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64);
120
let disable_firehose = cfg!("DISABLE_FIREHOSE", false);
121
let disable_backfill = cfg!("DISABLE_BACKFILL", false);
122
-
let firehose_workers = cfg!("FIREHOSE_WORKERS", 32usize);
0
0
0
0
0
123
124
let (
125
default_db_worker_threads,
···
104
.unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?;
105
106
let full_network: bool = cfg!("FULL_NETWORK", false);
0
107
let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 5, sec);
108
let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec);
109
···
118
let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64);
119
let disable_firehose = cfg!("DISABLE_FIREHOSE", false);
120
let disable_backfill = cfg!("DISABLE_BACKFILL", false);
121
+
122
+
let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize);
123
+
let firehose_workers = cfg!(
124
+
"FIREHOSE_WORKERS",
125
+
full_network.then_some(32usize).unwrap_or(8usize)
126
+
);
127
128
let (
129
default_db_worker_threads,
+2
-4
src/ingest/mod.rs
···
12
BackfillFinished(Did<'static>),
13
}
14
15
-
pub type BufferedMessage = IngestMessage;
16
-
17
-
pub type BufferTx = mpsc::UnboundedSender<BufferedMessage>;
18
#[allow(dead_code)]
19
-
pub type BufferRx = mpsc::UnboundedReceiver<BufferedMessage>;
···
12
BackfillFinished(Did<'static>),
13
}
14
15
+
pub type BufferTx = mpsc::UnboundedSender<IngestMessage>;
0
0
16
#[allow(dead_code)]
17
+
pub type BufferRx = mpsc::UnboundedReceiver<IngestMessage>;
+4
-4
src/ingest/worker.rs
···
1
use crate::db::{self, keys};
2
-
use crate::ingest::{BufferedMessage, IngestMessage};
3
use crate::ops;
4
use crate::resolver::{NoSigningKeyError, ResolverError};
5
use crate::state::AppState;
···
56
57
pub struct FirehoseWorker {
58
state: Arc<AppState>,
59
-
rx: mpsc::UnboundedReceiver<BufferedMessage>,
60
verify_signatures: bool,
61
num_shards: usize,
62
}
···
75
impl FirehoseWorker {
76
pub fn new(
77
state: Arc<AppState>,
78
-
rx: mpsc::UnboundedReceiver<BufferedMessage>,
79
verify_signatures: bool,
80
num_shards: usize,
81
) -> Self {
···
148
// enters the tokio runtime only when necessary (key resolution)
149
fn worker_thread(
150
id: usize,
151
-
mut rx: mpsc::UnboundedReceiver<BufferedMessage>,
152
state: Arc<AppState>,
153
verify_signatures: bool,
154
handle: tokio::runtime::Handle,
···
1
use crate::db::{self, keys};
2
+
use crate::ingest::{BufferRx, IngestMessage};
3
use crate::ops;
4
use crate::resolver::{NoSigningKeyError, ResolverError};
5
use crate::state::AppState;
···
56
57
pub struct FirehoseWorker {
58
state: Arc<AppState>,
59
+
rx: BufferRx,
60
verify_signatures: bool,
61
num_shards: usize,
62
}
···
75
impl FirehoseWorker {
76
pub fn new(
77
state: Arc<AppState>,
78
+
rx: BufferRx,
79
verify_signatures: bool,
80
num_shards: usize,
81
) -> Self {
···
148
// enters the tokio runtime only when necessary (key resolution)
149
fn worker_thread(
150
id: usize,
151
+
mut rx: mpsc::UnboundedReceiver<IngestMessage>,
152
state: Arc<AppState>,
153
verify_signatures: bool,
154
handle: tokio::runtime::Handle,