AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[filter] rewrite to use separate keys, add tests

ptr.pet 53ac9ddd 7035e18d

verified
+119 -21
+10 -9
Cargo.lock
··· 1699 1699 "serde_json", 1700 1700 "smallvec 2.0.0-alpha.12", 1701 1701 "smol_str", 1702 + "tempfile", 1702 1703 "thiserror 2.0.18", 1703 1704 "tokio", 1704 1705 "tokio-stream", ··· 2363 2364 2364 2365 [[package]] 2365 2366 name = "libc" 2366 - version = "0.2.180" 2367 + version = "0.2.182" 2367 2368 source = "registry+https://github.com/rust-lang/crates.io-index" 2368 - checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" 2369 + checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" 2369 2370 2370 2371 [[package]] 2371 2372 name = "libm" ··· 2402 2403 2403 2404 [[package]] 2404 2405 name = "linux-raw-sys" 2405 - version = "0.11.0" 2406 + version = "0.12.1" 2406 2407 source = "registry+https://github.com/rust-lang/crates.io-index" 2407 - checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" 2408 + checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" 2408 2409 2409 2410 [[package]] 2410 2411 name = "litemap" ··· 3556 3557 3557 3558 [[package]] 3558 3559 name = "rustix" 3559 - version = "1.1.3" 3560 + version = "1.1.4" 3560 3561 source = "registry+https://github.com/rust-lang/crates.io-index" 3561 - checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" 3562 + checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" 3562 3563 dependencies = [ 3563 3564 "bitflags", 3564 3565 "errno", ··· 4166 4167 4167 4168 [[package]] 4168 4169 name = "tempfile" 4169 - version = "3.24.0" 4170 + version = "3.26.0" 4170 4171 source = "registry+https://github.com/rust-lang/crates.io-index" 4171 - checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" 4172 + checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0" 4172 4173 dependencies = [ 4173 4174 "fastrand", 4174 - "getrandom 0.3.4", 4175 + "getrandom 0.4.1", 4175 4176 "once_cell", 4176 4177 "rustix", 4177 4178 "windows-sys 0.61.2",
+3
Cargo.toml
··· 49 49 glob = "0.3" 50 50 ordermap = { version = "1.1.0", features = ["serde"] } 51 51 arc-swap = "1.8.2" 52 + 53 + [dev-dependencies] 54 + tempfile = "3.26.0"
+1 -4
src/crawler/mod.rs
··· 140 140 for repo in output.repos { 141 141 let did_key = keys::repo_key(&repo.did); 142 142 143 - let excl_key = crate::db::filter::filter_key( 144 - crate::db::filter::EXCLUDE_PREFIX, 145 - repo.did.as_str(), 146 - ); 143 + let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?; 147 144 if db.filter.contains_key(&excl_key).into_diagnostic()? { 148 145 continue; 149 146 }
+104 -6
src/db/filter.rs
··· 1 1 use fjall::{Keyspace, OwnedWriteBatch}; 2 2 use miette::{IntoDiagnostic, Result}; 3 3 4 + use crate::db::types::TrimmedDid; 4 5 use crate::filter::{FilterConfig, FilterMode, SetUpdate}; 6 + use jacquard_common::types::string::Did; 5 7 6 8 pub const MODE_KEY: &[u8] = b"m"; 7 9 pub const SIGNAL_PREFIX: u8 = b's'; ··· 9 11 pub const EXCLUDE_PREFIX: u8 = b'x'; 10 12 pub const SEP: u8 = b'|'; 11 13 12 - pub fn filter_key(prefix: u8, val: &str) -> Vec<u8> { 14 + pub fn signal_key(val: &str) -> Result<Vec<u8>> { 13 15 let mut key = Vec::with_capacity(2 + val.len()); 14 - key.push(prefix); 16 + key.push(SIGNAL_PREFIX); 15 17 key.push(SEP); 16 18 key.extend_from_slice(val.as_bytes()); 17 - key 19 + Ok(key) 20 + } 21 + 22 + pub fn collection_key(val: &str) -> Result<Vec<u8>> { 23 + let mut key = Vec::with_capacity(2 + val.len()); 24 + key.push(COLLECTION_PREFIX); 25 + key.push(SEP); 26 + key.extend_from_slice(val.as_bytes()); 27 + Ok(key) 28 + } 29 + 30 + pub fn exclude_key(val: &str) -> Result<Vec<u8>> { 31 + let did = Did::new(val).into_diagnostic()?; 32 + let trimmed = TrimmedDid::from(&did); 33 + let mut key = Vec::with_capacity(2 + trimmed.len()); 34 + key.push(EXCLUDE_PREFIX); 35 + key.push(SEP); 36 + trimmed.write_to_vec(&mut key); 37 + Ok(key) 18 38 } 19 39 20 40 pub fn apply_patch( ··· 44 64 ) -> Result<()> { 45 65 let Some(update) = update else { return Ok(()) }; 46 66 67 + let key_fn = match prefix { 68 + SIGNAL_PREFIX => signal_key, 69 + COLLECTION_PREFIX => collection_key, 70 + EXCLUDE_PREFIX => exclude_key, 71 + _ => unreachable!(), 72 + }; 73 + 47 74 match update { 48 75 SetUpdate::Set(values) => { 49 76 let scan_prefix = [prefix, SEP]; ··· 52 79 batch.remove(ks, k); 53 80 } 54 81 for val in values { 55 - batch.insert(ks, filter_key(prefix, &val), []); 82 + batch.insert(ks, key_fn(&val)?, []); 56 83 } 57 84 } 58 85 SetUpdate::Patch(map) => { 59 86 for (val, add) in map { 60 - let key = filter_key(prefix, &val); 87 + let key = key_fn(&val)?; 61 88 if 
add { 62 89 batch.insert(ks, key, []); 63 90 } else { ··· 102 129 let mut out = Vec::new(); 103 130 for guard in ks.prefix(scan_prefix) { 104 131 let (k, _) = guard.into_inner().into_diagnostic()?; 105 - let val = std::str::from_utf8(&k[2..]).into_diagnostic()?.to_owned(); 132 + let val_bytes = &k[2..]; 133 + let val = if prefix == EXCLUDE_PREFIX { 134 + TrimmedDid::try_from(val_bytes)?.to_did().to_string() 135 + } else { 136 + std::str::from_utf8(val_bytes).into_diagnostic()?.to_owned() 137 + }; 106 138 out.push(val); 107 139 } 108 140 Ok(out) 109 141 } 142 + 143 + #[cfg(test)] 144 + mod tests { 145 + use super::*; 146 + 147 + #[test] 148 + fn test_filter_keys() { 149 + assert_eq!( 150 + signal_key("app.bsky.feed.like").unwrap(), 151 + b"s|app.bsky.feed.like" 152 + ); 153 + assert_eq!( 154 + collection_key("app.bsky.feed.post").unwrap(), 155 + b"c|app.bsky.feed.post" 156 + ); 157 + } 158 + 159 + #[test] 160 + fn test_exclude_key_trimmed() { 161 + let did = "did:plc:yk4q3id7id6p5z3bypvshc64"; 162 + let key = exclude_key(did).unwrap(); 163 + assert_eq!(key[0], EXCLUDE_PREFIX); 164 + assert_eq!(key[1], SEP); 165 + // TAG_PLC (1) + 15 bytes 166 + assert_eq!(key.len(), 2 + 1 + 15); 167 + 168 + let parsed = TrimmedDid::try_from(&key[2..]).unwrap(); 169 + assert_eq!(parsed.to_did().as_str(), did); 170 + } 171 + 172 + #[test] 173 + fn test_apply_and_load() -> Result<()> { 174 + let tmp = tempfile::tempdir().into_diagnostic()?; 175 + let keyspace = fjall::Database::builder(tmp.path()) 176 + .open() 177 + .into_diagnostic()?; 178 + let ks = keyspace 179 + .keyspace("filter", Default::default) 180 + .into_diagnostic()?; 181 + 182 + let mut batch = keyspace.batch(); 183 + let signals = SetUpdate::Set(vec!["a.b.c".to_string()]); 184 + let collections = SetUpdate::Set(vec!["d.e.f".to_string()]); 185 + let excludes = SetUpdate::Set(vec!["did:plc:yk4q3id7id6p5z3bypvshc64".to_string()]); 186 + 187 + apply_patch( 188 + &mut batch, 189 + &ks, 190 + Some(FilterMode::Filter), 191 + 
Some(signals), 192 + Some(collections), 193 + Some(excludes), 194 + )?; 195 + batch.commit().into_diagnostic()?; 196 + 197 + let config = load(&ks)?; 198 + assert_eq!(config.mode, FilterMode::Filter); 199 + assert_eq!(config.signals, vec!["a.b.c"]); 200 + assert_eq!(config.collections, vec!["d.e.f"]); 201 + 202 + let excludes = read_set(&ks, EXCLUDE_PREFIX)?; 203 + assert_eq!(excludes, vec!["did:plc:yk4q3id7id6p5z3bypvshc64"]); 204 + 205 + Ok(()) 206 + } 207 + }
+1 -2
src/ingest/firehose.rs
··· 118 118 async fn should_process(&self, did: &Did<'_>) -> Result<bool> { 119 119 let filter = self.filter.load(); 120 120 121 - let excl_key = 122 - crate::db::filter::filter_key(crate::db::filter::EXCLUDE_PREFIX, did.as_str()); 121 + let excl_key = crate::db::filter::exclude_key(did.as_str())?; 123 122 if self 124 123 .state 125 124 .db