kind of like tap but different and in rust
at main 114 lines 3.4 kB view raw
1use std::sync::Arc; 2 3use arc_swap::ArcSwap; 4use fjall::Keyspace; 5use miette::{IntoDiagnostic, Result}; 6use serde::{Deserialize, Serialize}; 7use smol_str::SmolStr; 8 9pub const MODE_KEY: &[u8] = b"m"; 10pub const DID_PREFIX: u8 = b'd'; 11pub const SIGNAL_PREFIX: u8 = b's'; 12pub const COLLECTION_PREFIX: u8 = b'c'; 13pub const EXCLUDE_PREFIX: u8 = b'x'; 14pub const SEP: u8 = b'|'; 15 16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 17#[serde(rename_all = "snake_case")] 18pub enum FilterMode { 19 Dids = 0, 20 Signal = 1, 21 Full = 2, 22} 23 24/// hot-path in-memory config: only the small fields needed on every event. 25/// dids and excludes are large sets kept in the filter keyspace only. 26#[derive(Debug, Clone, Serialize)] 27pub struct FilterConfig { 28 pub mode: FilterMode, 29 pub signals: Vec<SmolStr>, 30 pub collections: Vec<SmolStr>, 31} 32 33impl FilterConfig { 34 pub fn new(mode: FilterMode) -> Self { 35 Self { 36 mode, 37 signals: Vec::new(), 38 collections: Vec::new(), 39 } 40 } 41 42 pub fn load(ks: &Keyspace) -> Result<Self> { 43 let mode = ks 44 .get(MODE_KEY) 45 .into_diagnostic()? 46 .map(|v| rmp_serde::from_slice(&v).into_diagnostic()) 47 .transpose()? 48 .unwrap_or(FilterMode::Dids); 49 50 let mut config = Self::new(mode); 51 52 let signal_prefix = [SIGNAL_PREFIX, SEP]; 53 for guard in ks.prefix(signal_prefix) { 54 let (k, _) = guard.into_inner().into_diagnostic()?; 55 let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?; 56 config.signals.push(SmolStr::new(val)); 57 } 58 59 let col_prefix = [COLLECTION_PREFIX, SEP]; 60 for guard in ks.prefix(col_prefix) { 61 let (k, _) = guard.into_inner().into_diagnostic()?; 62 let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?; 63 config.collections.push(SmolStr::new(val)); 64 } 65 66 Ok(config) 67 } 68 69 /// returns true if the collection matches the content filter. 70 /// if collections is empty, all collections match. 71 pub fn matches_collection(&self, collection: &str) -> bool { 72 if self.collections.is_empty() { 73 return true; 74 } 75 self.collections.iter().any(|p| nsid_matches(p, collection)) 76 } 77 78 /// returns true if the commit touches a collection covered by a signal. 79 pub fn matches_signal(&self, collection: &str) -> bool { 80 self.signals.iter().any(|p| nsid_matches(p, collection)) 81 } 82} 83 84fn nsid_matches(pattern: &str, collection: &str) -> bool { 85 if let Some(prefix) = pattern.strip_suffix(".*") { 86 collection == prefix || collection.starts_with(prefix) 87 } else { 88 collection == pattern 89 } 90} 91 92pub type FilterHandle = Arc<ArcSwap<FilterConfig>>; 93 94pub fn new_handle(config: FilterConfig) -> FilterHandle { 95 Arc::new(ArcSwap::new(Arc::new(config))) 96} 97 98/// apply a bool patch or set replacement for a single set update. 99#[derive(Debug, Deserialize)] 100#[serde(untagged)] 101pub enum SetUpdate { 102 /// replace the entire set with this list 103 Set(Vec<String>), 104 /// patch: true = add, false = remove 105 Patch(std::collections::HashMap<String, bool>), 106} 107 108pub fn filter_key(prefix: u8, val: &str) -> Vec<u8> { 109 let mut key = Vec::with_capacity(2 + val.len()); 110 key.push(prefix); 111 key.push(SEP); 112 key.extend_from_slice(val.as_bytes()); 113 key 114}