forked from
ptr.pet/hydrant
kind of like tap but different and in rust
1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use fjall::Keyspace;
5use miette::{IntoDiagnostic, Result};
6use serde::{Deserialize, Serialize};
7use smol_str::SmolStr;
8
/// key under which the serialized filter mode is stored (read by `FilterConfig::load`).
pub const MODE_KEY: &[u8] = b"m";
/// leading key byte for did entries.
/// NOTE(review): not read in this part of the file — presumably the tracked did set; confirm.
pub const DID_PREFIX: u8 = b'd';
/// leading key byte for signal pattern entries.
pub const SIGNAL_PREFIX: u8 = b's';
/// leading key byte for collection pattern entries.
pub const COLLECTION_PREFIX: u8 = b'c';
/// leading key byte for exclude entries.
/// NOTE(review): not read in this part of the file — presumably the excluded set; confirm.
pub const EXCLUDE_PREFIX: u8 = b'x';
/// separator byte written between the leading prefix byte and the value in a key.
pub const SEP: u8 = b'|';
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum FilterMode {
19 Dids = 0,
20 Signal = 1,
21 Full = 2,
22}
23
24/// hot-path in-memory config: only the small fields needed on every event.
25/// dids and excludes are large sets kept in the filter keyspace only.
26#[derive(Debug, Clone, Serialize)]
27pub struct FilterConfig {
28 pub mode: FilterMode,
29 pub signals: Vec<SmolStr>,
30 pub collections: Vec<SmolStr>,
31}
32
33impl FilterConfig {
34 pub fn new(mode: FilterMode) -> Self {
35 Self {
36 mode,
37 signals: Vec::new(),
38 collections: Vec::new(),
39 }
40 }
41
42 pub fn load(ks: &Keyspace) -> Result<Self> {
43 let mode = ks
44 .get(MODE_KEY)
45 .into_diagnostic()?
46 .map(|v| rmp_serde::from_slice(&v).into_diagnostic())
47 .transpose()?
48 .unwrap_or(FilterMode::Dids);
49
50 let mut config = Self::new(mode);
51
52 let signal_prefix = [SIGNAL_PREFIX, SEP];
53 for guard in ks.prefix(signal_prefix) {
54 let (k, _) = guard.into_inner().into_diagnostic()?;
55 let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?;
56 config.signals.push(SmolStr::new(val));
57 }
58
59 let col_prefix = [COLLECTION_PREFIX, SEP];
60 for guard in ks.prefix(col_prefix) {
61 let (k, _) = guard.into_inner().into_diagnostic()?;
62 let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?;
63 config.collections.push(SmolStr::new(val));
64 }
65
66 Ok(config)
67 }
68
69 /// returns true if the collection matches the content filter.
70 /// if collections is empty, all collections match.
71 pub fn matches_collection(&self, collection: &str) -> bool {
72 if self.collections.is_empty() {
73 return true;
74 }
75 self.collections.iter().any(|p| nsid_matches(p, collection))
76 }
77
78 /// returns true if the commit touches a collection covered by a signal.
79 pub fn matches_signal(&self, collection: &str) -> bool {
80 self.signals.iter().any(|p| nsid_matches(p, collection))
81 }
82}
83
/// tests `collection` against an nsid `pattern`.
///
/// a pattern ending in `.*` matches the namespace before the `.*` itself and
/// anything nested under it; any other pattern must equal `collection` exactly.
///
/// the wildcard match is segment-aware: `app.bsky.feed.*` matches
/// `app.bsky.feed` and `app.bsky.feed.post`, but not `app.bsky.feedback.post`,
/// which merely shares a textual prefix.
fn nsid_matches(pattern: &str, collection: &str) -> bool {
    match pattern.strip_suffix(".*") {
        Some(namespace) => collection
            .strip_prefix(namespace)
            // whatever follows the namespace must be nothing at all or begin a
            // new `.`-separated segment; a bare `starts_with` would also accept
            // sibling namespaces such as `feedback` for `feed`.
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('.')),
        None => collection == pattern,
    }
}
91
92pub type FilterHandle = Arc<ArcSwap<FilterConfig>>;
93
94pub fn new_handle(config: FilterConfig) -> FilterHandle {
95 Arc::new(ArcSwap::new(Arc::new(config)))
96}
97
98/// apply a bool patch or set replacement for a single set update.
99#[derive(Debug, Deserialize)]
100#[serde(untagged)]
101pub enum SetUpdate {
102 /// replace the entire set with this list
103 Set(Vec<String>),
104 /// patch: true = add, false = remove
105 Patch(std::collections::HashMap<String, bool>),
106}
107
108pub fn filter_key(prefix: u8, val: &str) -> Vec<u8> {
109 let mut key = Vec::with_capacity(2 + val.len());
110 key.push(prefix);
111 key.push(SEP);
112 key.extend_from_slice(val.as_bytes());
113 key
114}