tangled
alpha
login
or
join now
ptr.pet
/
hydrant
28
fork
atom
at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
28
fork
atom
overview
issues
6
pulls
pipelines
[ingest] add configurable firehose worker count
ptr.pet
1 month ago
433d1b09
505a93c8
verified
This commit was signed with the committer's
known signature
.
ptr.pet
SSH Key Fingerprint:
SHA256:Abmvag+juovVufZTxyWY8KcVgrznxvBjQpJesv071Aw=
+7
-1
3 changed files
expand all
collapse all
unified
split
src
config.rs
ingest
worker.rs
main.rs
+4
src/config.rs
···
54
54
pub identity_cache_size: u64,
55
55
pub disable_firehose: bool,
56
56
pub disable_backfill: bool,
57
57
+
pub firehose_workers: usize,
57
58
}
58
59
59
60
impl Config {
···
101
102
let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 100_000u64);
102
103
let disable_firehose = cfg!("DISABLE_FIREHOSE", false);
103
104
let disable_backfill = cfg!("DISABLE_BACKFILL", false);
105
105
+
let firehose_workers = cfg!("FIREHOSE_WORKERS", 64usize);
104
106
105
107
Ok(Self {
106
108
database_path,
···
120
122
identity_cache_size,
121
123
disable_firehose,
122
124
disable_backfill,
125
125
+
firehose_workers,
123
126
})
124
127
}
125
128
}
···
164
167
self.disable_lz4_compression
165
168
)?;
166
169
writeln!(f, " api port: {}", self.api_port)?;
170
170
+
writeln!(f, " firehose workers: {}", self.firehose_workers)?;
167
171
writeln!(f, " enable debug: {}", self.enable_debug)?;
168
172
if self.enable_debug {
169
173
writeln!(f, " debug port: {}", self.debug_port)?;
+2
-1
src/ingest/worker.rs
···
101
101
state: Arc<AppState>,
102
102
rx: mpsc::UnboundedReceiver<BufferedMessage>,
103
103
verify_signatures: bool,
104
104
+
num_shards: usize,
104
105
) -> Self {
105
106
Self {
106
107
state,
107
108
rx,
108
109
verify_signatures,
109
109
-
num_shards: 64,
110
110
+
num_shards,
110
111
}
111
112
}
112
113
+1
src/main.rs
···
143
143
state,
144
144
buffer_rx,
145
145
matches!(cfg.verify_signatures, SignatureVerification::Full),
146
146
+
cfg.firehose_workers,
146
147
)
147
148
.run(handle)
148
149
}