tangled
alpha
login
or
join now
parakeet.at
/
parakeet
63
fork
atom
Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview
atproto
bluesky
rust
appserver
63
fork
atom
overview
issues
12
pulls
pipelines
feat(consumer): simple allowlist for indexer
mia.omg.lol
1 week ago
13893e3f
8f8bf0f0
verified
This commit was signed with the committer's
known signature
.
mia.omg.lol
SSH Key Fingerprint:
SHA256:eb+NhC0QEl+XKRuFP/97oH6LEz0TXTKPXGDIAI5y7CQ=
+36
-10
4 changed files
expand all
collapse all
unified
split
crates
consumer
src
config.rs
firehose
types.rs
indexer
mod.rs
main.rs
+1
crates/consumer/src/config.rs
···
53
53
/// Whether to submit backfill requests for new repos. (Only when history_mode == BackfillHistory).
54
54
#[serde(default)]
55
55
pub request_backfill: bool,
56
56
+
pub allowlist: Option<Vec<String>>,
56
57
}
57
58
58
59
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Deserialize)]
+13
crates/consumer/src/firehose/types.rs
···
34
34
Sync(AtpSyncEvent),
35
35
}
36
36
37
37
+
impl FirehoseEvent {
38
38
+
/// When consuming labels, this returns the labeler's DID.
39
39
+
pub fn did(&self) -> Option<&str> {
40
40
+
match self {
41
41
+
FirehoseEvent::Identity(identity) => Some(&identity.did),
42
42
+
FirehoseEvent::Account(account) => Some(&account.did),
43
43
+
FirehoseEvent::Commit(commit) => Some(&commit.repo),
44
44
+
FirehoseEvent::Label(label) => None,
45
45
+
FirehoseEvent::Sync(sync) => Some(&sync.did),
46
46
+
}
47
47
+
}
48
48
+
}
49
49
+
37
50
#[derive(Debug, Deserialize)]
38
51
pub struct AtpIdentityEvent {
39
52
pub seq: u64,
+15
-10
crates/consumer/src/indexer/mod.rs
···
19
19
use parakeet_index::AggregateType;
20
20
use redis::aio::MultiplexedConnection;
21
21
use redis::AsyncCommands;
22
22
-
use std::collections::HashMap;
22
22
+
use std::collections::{HashMap, HashSet};
23
23
use std::hash::BuildHasher;
24
24
use tokio::sync::mpsc::{channel, Sender};
25
25
use tokio::sync::watch::Receiver as WatchReceiver;
···
32
32
pub history_mode: HistoryMode,
33
33
pub skip_handle_validation: bool,
34
34
pub request_backfill: bool,
35
35
+
pub allowlist: Option<HashSet<String>>,
35
36
}
36
37
37
38
#[derive(Clone)]
···
50
51
firehose: FirehoseConsumer,
51
52
hasher: RandomState,
52
53
resume: sled::Db,
54
54
+
allowlist: Option<HashSet<String>>,
53
55
}
54
56
55
57
impl RelayIndexer {
···
75
77
},
76
78
hasher: RandomState::default(),
77
79
resume,
80
80
+
allowlist: opts.allowlist,
78
81
};
79
82
80
83
if opts.skip_handle_validation {
···
188
191
threads: u64,
189
192
submit: &[Sender<FirehoseEvent>],
190
193
) {
191
191
-
let tgt_idx = match &event {
192
192
-
FirehoseEvent::Identity(identity) => self.hasher.hash_one(&identity.did) % threads,
193
193
-
FirehoseEvent::Account(account) => self.hasher.hash_one(&account.did) % threads,
194
194
-
FirehoseEvent::Commit(commit) => self.hasher.hash_one(&commit.repo) % threads,
195
195
-
FirehoseEvent::Sync(sync) => self.hasher.hash_one(&sync.did) % threads,
196
196
-
FirehoseEvent::Label(_) => {
197
197
-
// We handle all labels through direct connections to labelers
198
198
-
tracing::warn!("got #labels from the relay");
194
194
+
let Some(did) = event.did() else {
195
195
+
tracing::warn!("got #labels from the relay");
196
196
+
return;
197
197
+
};
198
198
+
199
199
+
// enforce the allowlist
200
200
+
if let Some(allowlist) = &self.allowlist {
201
201
+
if !allowlist.contains(did) {
199
202
return;
200
203
}
201
201
-
};
204
204
+
}
205
205
+
206
206
+
let tgt_idx = self.hasher.hash_one(&did) % threads;
202
207
203
208
if let Err(e) = submit[tgt_idx as usize].send(event).await {
204
209
tracing::error!("Error sending event: {e}");
+7
crates/consumer/src/main.rs
···
2
2
use eyre::OptionExt;
3
3
use jacquard_identity::{resolver::ResolverOptions, JacquardResolver};
4
4
use metrics_exporter_prometheus::PrometheusBuilder;
5
5
+
use std::collections::HashSet;
5
6
use tokio::signal::ctrl_c;
6
7
use tokio_postgres::NoTls;
7
8
···
114
115
)
115
116
.await?;
116
117
118
118
+
let allowlist = indexer_cfg.allowlist.map(HashSet::from_iter);
119
119
+
if let Some(l) = &allowlist {
120
120
+
tracing::info!("running with allowlist enabled ({} DIDs)", l.len());
121
121
+
}
122
122
+
117
123
let indexer_opts = indexer::RelayIndexerOpts {
118
124
history_mode: indexer_cfg.history_mode,
119
125
skip_handle_validation: indexer_cfg.skip_handle_validation,
120
126
request_backfill: indexer_cfg.request_backfill,
127
127
+
allowlist,
121
128
};
122
129
123
130
let relay_indexer = indexer::RelayIndexer::new(