AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 215 lines 6.3 kB view raw
1use fjall::{Keyspace, OwnedWriteBatch}; 2use jacquard_common::IntoStatic; 3use jacquard_common::types::nsid::Nsid; 4use jacquard_common::types::string::Did; 5use miette::{IntoDiagnostic, Result}; 6 7use crate::db::types::TrimmedDid; 8use crate::filter::{FilterConfig, FilterMode, SetUpdate}; 9 10pub const MODE_KEY: &[u8] = b"m"; 11pub const SIGNAL_PREFIX: u8 = b's'; 12pub const COLLECTION_PREFIX: u8 = b'c'; 13pub const EXCLUDE_PREFIX: u8 = b'x'; 14pub const SEP: u8 = b'|'; 15 16pub fn signal_key(val: &str) -> Result<Vec<u8>> { 17 let mut key = Vec::with_capacity(2 + val.len()); 18 key.push(SIGNAL_PREFIX); 19 key.push(SEP); 20 key.extend_from_slice(val.as_bytes()); 21 Ok(key) 22} 23 24pub fn collection_key(val: &str) -> Result<Vec<u8>> { 25 let mut key = Vec::with_capacity(2 + val.len()); 26 key.push(COLLECTION_PREFIX); 27 key.push(SEP); 28 key.extend_from_slice(val.as_bytes()); 29 Ok(key) 30} 31 32pub fn exclude_key(val: &str) -> Result<Vec<u8>> { 33 let did = Did::new(val).into_diagnostic()?; 34 let trimmed = TrimmedDid::from(&did); 35 let mut key = Vec::with_capacity(2 + trimmed.len()); 36 key.push(EXCLUDE_PREFIX); 37 key.push(SEP); 38 trimmed.write_to_vec(&mut key); 39 Ok(key) 40} 41 42pub fn apply_patch( 43 batch: &mut OwnedWriteBatch, 44 ks: &Keyspace, 45 mode: Option<FilterMode>, 46 signals: Option<SetUpdate>, 47 collections: Option<SetUpdate>, 48 excludes: Option<SetUpdate>, 49) -> Result<()> { 50 if let Some(mode) = mode { 51 batch.insert(ks, MODE_KEY, rmp_serde::to_vec(&mode).into_diagnostic()?); 52 } 53 54 apply_set_update(batch, ks, SIGNAL_PREFIX, signals)?; 55 apply_set_update(batch, ks, COLLECTION_PREFIX, collections)?; 56 apply_set_update(batch, ks, EXCLUDE_PREFIX, excludes)?; 57 58 Ok(()) 59} 60 61fn apply_set_update( 62 batch: &mut OwnedWriteBatch, 63 ks: &Keyspace, 64 prefix: u8, 65 update: Option<SetUpdate>, 66) -> Result<()> { 67 let Some(update) = update else { return Ok(()) }; 68 69 let key_fn = match prefix { 70 SIGNAL_PREFIX => signal_key, 71 
COLLECTION_PREFIX => collection_key, 72 EXCLUDE_PREFIX => exclude_key, 73 _ => unreachable!(), 74 }; 75 76 match update { 77 SetUpdate::Set(values) => { 78 let scan_prefix = [prefix, SEP]; 79 for guard in ks.prefix(scan_prefix) { 80 let (k, _) = guard.into_inner().into_diagnostic()?; 81 batch.remove(ks, k); 82 } 83 for val in values { 84 batch.insert(ks, key_fn(&val)?, []); 85 } 86 } 87 SetUpdate::Patch(map) => { 88 for (val, add) in map { 89 let key = key_fn(&val)?; 90 if add { 91 batch.insert(ks, key, []); 92 } else { 93 batch.remove(ks, key); 94 } 95 } 96 } 97 } 98 99 Ok(()) 100} 101 102pub fn load(ks: &Keyspace) -> Result<FilterConfig> { 103 let mode = ks 104 .get(MODE_KEY) 105 .into_diagnostic()? 106 .map(|v| rmp_serde::from_slice(&v).into_diagnostic()) 107 .transpose()? 108 .unwrap_or(FilterMode::Filter); 109 110 let mut config = FilterConfig::new(mode); 111 112 let signal_prefix = [SIGNAL_PREFIX, SEP]; 113 for guard in ks.prefix(signal_prefix) { 114 let (k, _) = guard.into_inner().into_diagnostic()?; 115 let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?; 116 config.signals.push(Nsid::new(val)?.into_static()); 117 } 118 119 let col_prefix = [COLLECTION_PREFIX, SEP]; 120 for guard in ks.prefix(col_prefix) { 121 let (k, _) = guard.into_inner().into_diagnostic()?; 122 let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?; 123 config.collections.push(Nsid::new(val)?.into_static()); 124 } 125 126 Ok(config) 127} 128 129pub fn read_set(ks: &Keyspace, prefix: u8) -> Result<Vec<String>> { 130 let scan_prefix = [prefix, SEP]; 131 let mut out = Vec::new(); 132 for guard in ks.prefix(scan_prefix) { 133 let (k, _) = guard.into_inner().into_diagnostic()?; 134 let val_bytes = &k[2..]; 135 let val = if prefix == EXCLUDE_PREFIX { 136 TrimmedDid::try_from(val_bytes)?.to_did().to_string() 137 } else { 138 std::str::from_utf8(val_bytes).into_diagnostic()?.to_owned() 139 }; 140 out.push(val); 141 } 142 Ok(out) 143} 144 145#[cfg(test)] 
mod tests {
    use super::*;

    #[test]
    fn test_filter_keys() {
        assert_eq!(
            signal_key("app.bsky.feed.like").unwrap(),
            b"s|app.bsky.feed.like"
        );
        assert_eq!(
            collection_key("app.bsky.feed.post").unwrap(),
            b"c|app.bsky.feed.post"
        );
    }

    #[test]
    fn test_exclude_key_trimmed() {
        let did = "did:plc:yk4q3id7id6p5z3bypvshc64";
        let key = exclude_key(did).unwrap();
        assert_eq!(key[0], EXCLUDE_PREFIX);
        assert_eq!(key[1], SEP);
        // TAG_PLC (1) + 15 bytes
        assert_eq!(key.len(), 2 + 1 + 15);

        let parsed = TrimmedDid::try_from(&key[2..]).unwrap();
        assert_eq!(parsed.to_did().as_str(), did);
    }

    #[test]
    fn test_apply_and_load() -> Result<()> {
        let tmp = tempfile::tempdir().into_diagnostic()?;
        let keyspace = fjall::Database::builder(tmp.path())
            .open()
            .into_diagnostic()?;
        let ks = keyspace
            .keyspace("filter", Default::default)
            .into_diagnostic()?;

        let mut batch = keyspace.batch();
        let signals = SetUpdate::Set(vec!["a.b.c".to_string()]);
        let collections = SetUpdate::Set(vec!["d.e.f".to_string()]);
        let excludes = SetUpdate::Set(vec!["did:plc:yk4q3id7id6p5z3bypvshc64".to_string()]);

        apply_patch(
            &mut batch,
            &ks,
            Some(FilterMode::Filter),
            Some(signals),
            Some(collections),
            Some(excludes),
        )?;
        batch.commit().into_diagnostic()?;

        let config = load(&ks)?;
        assert_eq!(config.mode, FilterMode::Filter);
        assert_eq!(
            config.signals,
            vec![Nsid::new("a.b.c").unwrap().into_static()]
        );
        assert_eq!(
            config.collections,
            vec![Nsid::new("d.e.f").unwrap().into_static()]
        );

        let excludes = read_set(&ks, EXCLUDE_PREFIX)?;
        assert_eq!(excludes, vec!["did:plc:yk4q3id7id6p5z3bypvshc64"]);

        Ok(())
    }

    /// An untouched keyspace loads as the default mode with empty sets.
    #[test]
    fn test_load_empty_keyspace_defaults() -> Result<()> {
        let tmp = tempfile::tempdir().into_diagnostic()?;
        let keyspace = fjall::Database::builder(tmp.path())
            .open()
            .into_diagnostic()?;
        let ks = keyspace
            .keyspace("filter", Default::default)
            .into_diagnostic()?;

        let config = load(&ks)?;
        assert_eq!(config.mode, FilterMode::Filter);
        assert!(config.signals.is_empty());
        assert!(config.collections.is_empty());
        assert!(read_set(&ks, EXCLUDE_PREFIX)?.is_empty());

        Ok(())
    }

    /// `SetUpdate::Set` drops entries from a previous `Set` that are not in the new set.
    #[test]
    fn test_set_replaces_existing_entries() -> Result<()> {
        let tmp = tempfile::tempdir().into_diagnostic()?;
        let keyspace = fjall::Database::builder(tmp.path())
            .open()
            .into_diagnostic()?;
        let ks = keyspace
            .keyspace("filter", Default::default)
            .into_diagnostic()?;

        let mut batch = keyspace.batch();
        let first = SetUpdate::Set(vec!["a.b.c".to_string()]);
        apply_patch(&mut batch, &ks, None, Some(first), None, None)?;
        batch.commit().into_diagnostic()?;
        assert_eq!(read_set(&ks, SIGNAL_PREFIX)?, vec!["a.b.c"]);

        let mut batch = keyspace.batch();
        let second = SetUpdate::Set(vec!["g.h.i".to_string()]);
        apply_patch(&mut batch, &ks, None, Some(second), None, None)?;
        batch.commit().into_diagnostic()?;
        assert_eq!(read_set(&ks, SIGNAL_PREFIX)?, vec!["g.h.i"]);

        Ok(())
    }

    /// `SetUpdate::Patch` adds entries mapped to `true` and removes entries mapped to `false`.
    #[test]
    fn test_patch_adds_and_removes() -> Result<()> {
        let tmp = tempfile::tempdir().into_diagnostic()?;
        let keyspace = fjall::Database::builder(tmp.path())
            .open()
            .into_diagnostic()?;
        let ks = keyspace
            .keyspace("filter", Default::default)
            .into_diagnostic()?;

        let mut batch = keyspace.batch();
        let initial = SetUpdate::Set(vec!["a.b.c".to_string()]);
        apply_patch(&mut batch, &ks, None, None, Some(initial), None)?;
        batch.commit().into_diagnostic()?;

        let mut batch = keyspace.batch();
        let patch = SetUpdate::Patch(
            [("d.e.f".to_string(), true), ("a.b.c".to_string(), false)]
                .into_iter()
                .collect(),
        );
        apply_patch(&mut batch, &ks, None, None, Some(patch), None)?;
        batch.commit().into_diagnostic()?;

        assert_eq!(read_set(&ks, COLLECTION_PREFIX)?, vec!["d.e.f"]);

        Ok(())
    }
}