AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
at-protocol
atproto
indexer
rust
fjall
1use fjall::{Keyspace, OwnedWriteBatch};
2use jacquard_common::IntoStatic;
3use jacquard_common::types::nsid::Nsid;
4use jacquard_common::types::string::Did;
5use miette::{IntoDiagnostic, Result};
6
7use crate::db::types::TrimmedDid;
8use crate::filter::{FilterConfig, FilterMode, SetUpdate};
9
/// Key under which the MessagePack-encoded filter mode is stored.
pub const MODE_KEY: &[u8] = &[b'm'];
/// Prefix byte for entries of the signal set (`s|<value>`).
pub const SIGNAL_PREFIX: u8 = b's';
/// Prefix byte for entries of the collection set (`c|<value>`).
pub const COLLECTION_PREFIX: u8 = b'c';
/// Prefix byte for entries of the excluded-DID set (`x|<trimmed did>`).
pub const EXCLUDE_PREFIX: u8 = b'x';
/// Separator placed between a set's prefix byte and the entry value.
pub const SEP: u8 = b'|';
15
16pub fn signal_key(val: &str) -> Result<Vec<u8>> {
17 let mut key = Vec::with_capacity(2 + val.len());
18 key.push(SIGNAL_PREFIX);
19 key.push(SEP);
20 key.extend_from_slice(val.as_bytes());
21 Ok(key)
22}
23
24pub fn collection_key(val: &str) -> Result<Vec<u8>> {
25 let mut key = Vec::with_capacity(2 + val.len());
26 key.push(COLLECTION_PREFIX);
27 key.push(SEP);
28 key.extend_from_slice(val.as_bytes());
29 Ok(key)
30}
31
32pub fn exclude_key(val: &str) -> Result<Vec<u8>> {
33 let did = Did::new(val).into_diagnostic()?;
34 let trimmed = TrimmedDid::from(&did);
35 let mut key = Vec::with_capacity(2 + trimmed.len());
36 key.push(EXCLUDE_PREFIX);
37 key.push(SEP);
38 trimmed.write_to_vec(&mut key);
39 Ok(key)
40}
41
42pub fn apply_patch(
43 batch: &mut OwnedWriteBatch,
44 ks: &Keyspace,
45 mode: Option<FilterMode>,
46 signals: Option<SetUpdate>,
47 collections: Option<SetUpdate>,
48 excludes: Option<SetUpdate>,
49) -> Result<()> {
50 if let Some(mode) = mode {
51 batch.insert(ks, MODE_KEY, rmp_serde::to_vec(&mode).into_diagnostic()?);
52 }
53
54 apply_set_update(batch, ks, SIGNAL_PREFIX, signals)?;
55 apply_set_update(batch, ks, COLLECTION_PREFIX, collections)?;
56 apply_set_update(batch, ks, EXCLUDE_PREFIX, excludes)?;
57
58 Ok(())
59}
60
61fn apply_set_update(
62 batch: &mut OwnedWriteBatch,
63 ks: &Keyspace,
64 prefix: u8,
65 update: Option<SetUpdate>,
66) -> Result<()> {
67 let Some(update) = update else { return Ok(()) };
68
69 let key_fn = match prefix {
70 SIGNAL_PREFIX => signal_key,
71 COLLECTION_PREFIX => collection_key,
72 EXCLUDE_PREFIX => exclude_key,
73 _ => unreachable!(),
74 };
75
76 match update {
77 SetUpdate::Set(values) => {
78 let scan_prefix = [prefix, SEP];
79 for guard in ks.prefix(scan_prefix) {
80 let (k, _) = guard.into_inner().into_diagnostic()?;
81 batch.remove(ks, k);
82 }
83 for val in values {
84 batch.insert(ks, key_fn(&val)?, []);
85 }
86 }
87 SetUpdate::Patch(map) => {
88 for (val, add) in map {
89 let key = key_fn(&val)?;
90 if add {
91 batch.insert(ks, key, []);
92 } else {
93 batch.remove(ks, key);
94 }
95 }
96 }
97 }
98
99 Ok(())
100}
101
102pub fn load(ks: &Keyspace) -> Result<FilterConfig> {
103 let mode = ks
104 .get(MODE_KEY)
105 .into_diagnostic()?
106 .map(|v| rmp_serde::from_slice(&v).into_diagnostic())
107 .transpose()?
108 .unwrap_or(FilterMode::Filter);
109
110 let mut config = FilterConfig::new(mode);
111
112 let signal_prefix = [SIGNAL_PREFIX, SEP];
113 for guard in ks.prefix(signal_prefix) {
114 let (k, _) = guard.into_inner().into_diagnostic()?;
115 let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?;
116 config.signals.push(Nsid::new(val)?.into_static());
117 }
118
119 let col_prefix = [COLLECTION_PREFIX, SEP];
120 for guard in ks.prefix(col_prefix) {
121 let (k, _) = guard.into_inner().into_diagnostic()?;
122 let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?;
123 config.collections.push(Nsid::new(val)?.into_static());
124 }
125
126 Ok(config)
127}
128
129pub fn read_set(ks: &Keyspace, prefix: u8) -> Result<Vec<String>> {
130 let scan_prefix = [prefix, SEP];
131 let mut out = Vec::new();
132 for guard in ks.prefix(scan_prefix) {
133 let (k, _) = guard.into_inner().into_diagnostic()?;
134 let val_bytes = &k[2..];
135 let val = if prefix == EXCLUDE_PREFIX {
136 TrimmedDid::try_from(val_bytes)?.to_did().to_string()
137 } else {
138 std::str::from_utf8(val_bytes).into_diagnostic()?.to_owned()
139 };
140 out.push(val);
141 }
142 Ok(out)
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_filter_keys() {
        assert_eq!(
            signal_key("app.bsky.feed.like").unwrap(),
            b"s|app.bsky.feed.like"
        );
        assert_eq!(
            collection_key("app.bsky.feed.post").unwrap(),
            b"c|app.bsky.feed.post"
        );
    }

    #[test]
    fn test_exclude_key_trimmed() {
        let did = "did:plc:yk4q3id7id6p5z3bypvshc64";
        let key = exclude_key(did).unwrap();
        assert_eq!(key[0], EXCLUDE_PREFIX);
        assert_eq!(key[1], SEP);
        // TAG_PLC (1) + 15 bytes
        assert_eq!(key.len(), 2 + 1 + 15);

        let parsed = TrimmedDid::try_from(&key[2..]).unwrap();
        assert_eq!(parsed.to_did().as_str(), did);
    }

    #[test]
    fn test_apply_and_load() -> Result<()> {
        let tmp = tempfile::tempdir().into_diagnostic()?;
        let db = fjall::Database::builder(tmp.path())
            .open()
            .into_diagnostic()?;
        let ks = db.keyspace("filter", Default::default).into_diagnostic()?;

        let mut batch = db.batch();
        let signals = SetUpdate::Set(vec!["a.b.c".to_string()]);
        let collections = SetUpdate::Set(vec!["d.e.f".to_string()]);
        let excludes = SetUpdate::Set(vec!["did:plc:yk4q3id7id6p5z3bypvshc64".to_string()]);

        apply_patch(
            &mut batch,
            &ks,
            Some(FilterMode::Filter),
            Some(signals),
            Some(collections),
            Some(excludes),
        )?;
        batch.commit().into_diagnostic()?;

        let config = load(&ks)?;
        assert_eq!(config.mode, FilterMode::Filter);
        assert_eq!(
            config.signals,
            vec![Nsid::new("a.b.c").unwrap().into_static()]
        );
        assert_eq!(
            config.collections,
            vec![Nsid::new("d.e.f").unwrap().into_static()]
        );

        let excludes = read_set(&ks, EXCLUDE_PREFIX)?;
        assert_eq!(excludes, vec!["did:plc:yk4q3id7id6p5z3bypvshc64"]);

        Ok(())
    }

    // A second `Set` must drop entries that are no longer listed and keep entries that are
    // listed again. "d.e.f" is present in both sets.
    #[test]
    fn test_set_replaces_existing_entries() -> Result<()> {
        let tmp = tempfile::tempdir().into_diagnostic()?;
        let db = fjall::Database::builder(tmp.path())
            .open()
            .into_diagnostic()?;
        let ks = db.keyspace("filter", Default::default).into_diagnostic()?;

        let mut batch = db.batch();
        let first = SetUpdate::Set(vec!["a.b.c".to_string(), "d.e.f".to_string()]);
        apply_patch(&mut batch, &ks, None, Some(first), None, None)?;
        batch.commit().into_diagnostic()?;

        let mut batch = db.batch();
        let second = SetUpdate::Set(vec!["d.e.f".to_string(), "g.h.i".to_string()]);
        apply_patch(&mut batch, &ks, None, Some(second), None, None)?;
        batch.commit().into_diagnostic()?;

        let signals = read_set(&ks, SIGNAL_PREFIX)?;
        assert_eq!(signals, vec!["d.e.f", "g.h.i"]);

        Ok(())
    }
}