AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[filter] implement filter system, replace /repo with /filter

ptr.pet d537b646 528f03b3

verified
+471 -198
+10
Cargo.lock
··· 75 75 checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" 76 76 77 77 [[package]] 78 + name = "arc-swap" 79 + version = "1.8.2" 80 + source = "registry+https://github.com/rust-lang/crates.io-index" 81 + checksum = "f9f3647c145568cec02c42054e07bdf9a5a698e15b466fb2341bfc393cd24aa5" 82 + dependencies = [ 83 + "rustversion", 84 + ] 85 + 86 + [[package]] 78 87 name = "ascii" 79 88 version = "1.1.0" 80 89 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1658 1667 name = "hydrant" 1659 1668 version = "0.1.0" 1660 1669 dependencies = [ 1670 + "arc-swap", 1661 1671 "async-stream", 1662 1672 "axum", 1663 1673 "chrono",
+2 -1
Cargo.toml
··· 47 47 thiserror = "2.0.18" 48 48 rand = "0.10.0" 49 49 glob = "0.3" 50 - ordermap = { version = "1.1.0", features = ["serde"] } 50 + ordermap = { version = "1.1.0", features = ["serde"] } 51 + arc-swap = "1.8.2"
+161
src/api/filter.rs
··· 1 + use std::sync::Arc; 2 + 3 + use axum::{ 4 + Json, Router, 5 + extract::State, 6 + http::StatusCode, 7 + routing::{get, patch}, 8 + }; 9 + use miette::IntoDiagnostic; 10 + use rand::Rng; 11 + use serde::{Deserialize, Serialize}; 12 + 13 + use crate::api::AppState; 14 + use crate::db::{self, keys, ser_repo_state}; 15 + use crate::filter::{DID_PREFIX, EXCLUDE_PREFIX, FilterConfig, FilterMode, SetUpdate}; 16 + use crate::types::{GaugeState, RepoState}; 17 + 18 + pub fn router() -> Router<Arc<AppState>> { 19 + Router::new() 20 + .route("/filter", get(handle_get_filter)) 21 + .route("/filter", patch(handle_patch_filter)) 22 + } 23 + 24 + #[derive(Serialize)] 25 + pub struct FilterResponse { 26 + pub mode: FilterMode, 27 + pub dids: Vec<String>, 28 + pub signals: Vec<String>, 29 + pub collections: Vec<String>, 30 + pub excludes: Vec<String>, 31 + } 32 + 33 + pub async fn handle_get_filter( 34 + State(state): State<Arc<AppState>>, 35 + ) -> Result<Json<FilterResponse>, (StatusCode, String)> { 36 + let filter_ks = state.db.filter.clone(); 37 + let resp = tokio::task::spawn_blocking(move || { 38 + let hot = FilterConfig::load(&filter_ks).map_err(|e| e.to_string())?; 39 + let dids = db::filter::read_set(&filter_ks, DID_PREFIX).map_err(|e| e.to_string())?; 40 + let excludes = 41 + db::filter::read_set(&filter_ks, EXCLUDE_PREFIX).map_err(|e| e.to_string())?; 42 + Ok::<_, String>(FilterResponse { 43 + mode: hot.mode, 44 + dids, 45 + signals: hot.signals.iter().map(|s| s.to_string()).collect(), 46 + collections: hot.collections.iter().map(|s| s.to_string()).collect(), 47 + excludes, 48 + }) 49 + }) 50 + .await 51 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
52 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 53 + 54 + Ok(Json(resp)) 55 + } 56 + 57 + #[derive(Deserialize)] 58 + pub struct FilterPatch { 59 + pub mode: Option<FilterMode>, 60 + pub dids: Option<SetUpdate>, 61 + pub signals: Option<SetUpdate>, 62 + pub collections: Option<SetUpdate>, 63 + pub excludes: Option<SetUpdate>, 64 + } 65 + 66 + pub async fn handle_patch_filter( 67 + State(state): State<Arc<AppState>>, 68 + Json(patch): Json<FilterPatch>, 69 + ) -> Result<StatusCode, (StatusCode, String)> { 70 + let db = &state.db; 71 + 72 + let new_dids: Option<Vec<String>> = match &patch.dids { 73 + Some(SetUpdate::Set(dids)) => Some(dids.clone()), 74 + Some(SetUpdate::Patch(map)) => { 75 + let added: Vec<String> = map 76 + .iter() 77 + .filter(|(_, add)| **add) 78 + .map(|(d, _)| d.clone()) 79 + .collect(); 80 + (!added.is_empty()).then_some(added) 81 + } 82 + None => None, 83 + }; 84 + 85 + let filter_ks = db.filter.clone(); 86 + let repos_ks = db.repos.clone(); 87 + let pending_ks = db.pending.clone(); 88 + let inner = db.inner.clone(); 89 + 90 + let patch_mode = patch.mode; 91 + let patch_dids = patch.dids; 92 + let patch_signals = patch.signals; 93 + let patch_collections = patch.collections; 94 + let patch_excludes = patch.excludes; 95 + 96 + let (new_repo_count, new_filter) = tokio::task::spawn_blocking(move || { 97 + let mut batch = inner.batch(); 98 + 99 + db::filter::apply_patch( 100 + &mut batch, 101 + &filter_ks, 102 + patch_mode, 103 + patch_dids, 104 + patch_signals, 105 + patch_collections, 106 + patch_excludes, 107 + ) 108 + .map_err(|e| e.to_string())?; 109 + 110 + let mut added = 0i64; 111 + 112 + if let Some(dids) = new_dids { 113 + for did_str in &dids { 114 + let did = 115 + jacquard::types::did::Did::new_owned(did_str).map_err(|e| e.to_string())?; 116 + let did_key = keys::repo_key(&did); 117 + let exists = repos_ks 118 + .contains_key(&did_key) 119 + .into_diagnostic() 120 + .map_err(|e| e.to_string())?; 121 + if !exists { 122 + 
let repo_state = RepoState::backfilling(rand::rng().next_u64()); 123 + let bytes = ser_repo_state(&repo_state).map_err(|e| e.to_string())?; 124 + batch.insert(&repos_ks, &did_key, bytes); 125 + batch.insert( 126 + &pending_ks, 127 + keys::pending_key(repo_state.index_id), 128 + &did_key, 129 + ); 130 + added += 1; 131 + } 132 + } 133 + } 134 + 135 + batch 136 + .commit() 137 + .into_diagnostic() 138 + .map_err(|e| e.to_string())?; 139 + 140 + let new_filter = db::filter::load(&filter_ks).map_err(|e| e.to_string())?; 141 + Ok::<_, String>((added, new_filter)) 142 + }) 143 + .await 144 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 145 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 146 + 147 + state.filter.store(Arc::new(new_filter)); 148 + 149 + if new_repo_count > 0 { 150 + state.db.update_count_async("repos", new_repo_count).await; 151 + for _ in 0..new_repo_count { 152 + state 153 + .db 154 + .update_gauge_diff_async(&GaugeState::Synced, &GaugeState::Pending) 155 + .await; 156 + } 157 + state.notify_backfill(); 158 + } 159 + 160 + Ok(StatusCode::OK) 161 + }
+2 -2
src/api/mod.rs
··· 7 7 use tower_http::trace::TraceLayer; 8 8 9 9 mod debug; 10 - pub mod repo; 10 + pub mod filter; 11 11 pub mod stats; 12 12 mod stream; 13 13 pub mod xrpc; ··· 20 20 .route("/stats", get(stats::get_stats)) 21 21 .route("/stream", get(stream::handle_stream)) 22 22 .merge(xrpc::router()) 23 - .merge(repo::router()) 23 + .merge(filter::router()) 24 24 .with_state(state) 25 25 .layer(TraceLayer::new_for_http()) 26 26 .layer(CorsLayer::permissive());
-153
src/api/repo.rs
··· 1 - use crate::api::AppState; 2 - use crate::db::{Db, keys, ser_repo_state}; 3 - use crate::ops; 4 - use crate::types::{GaugeState, RepoState}; 5 - use axum::{Json, Router, extract::State, http::StatusCode, routing::post}; 6 - use jacquard::types::did::Did; 7 - use rand::Rng; 8 - use serde::Deserialize; 9 - use std::sync::Arc; 10 - 11 - pub fn router() -> Router<Arc<AppState>> { 12 - Router::new() 13 - .route("/repo/add", post(handle_repo_add)) 14 - .route("/repo/remove", post(handle_repo_remove)) 15 - } 16 - 17 - #[derive(Deserialize)] 18 - pub struct RepoAddRequest { 19 - pub dids: Vec<String>, 20 - } 21 - 22 - pub async fn handle_repo_add( 23 - State(state): State<Arc<AppState>>, 24 - Json(req): Json<RepoAddRequest>, 25 - ) -> Result<StatusCode, (StatusCode, String)> { 26 - let db = &state.db; 27 - let mut batch = db.inner.batch(); 28 - let mut added = 0; 29 - 30 - for did_str in req.dids { 31 - let did = Did::new_owned(did_str.as_str()) 32 - .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; 33 - let did_key = keys::repo_key(&did); 34 - if !Db::contains_key(db.repos.clone(), &did_key) 35 - .await 36 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 37 - { 38 - let repo_state = RepoState::backfilling(rand::rng().next_u64()); 39 - let bytes = ser_repo_state(&repo_state) 40 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 41 - 42 - batch.insert(&db.repos, &did_key, bytes); 43 - batch.insert( 44 - &db.pending, 45 - keys::pending_key(repo_state.index_id), 46 - &did_key, 47 - ); 48 - 49 - added += 1; 50 - } 51 - } 52 - 53 - if added > 0 { 54 - tokio::task::spawn_blocking(move || batch.commit().map_err(|e| e.to_string())) 55 - .await 56 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
57 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 58 - 59 - state.db.update_count_async("repos", added).await; 60 - for _ in 0..added { 61 - state 62 - .db 63 - .update_gauge_diff_async(&GaugeState::Synced, &GaugeState::Pending) 64 - .await; 65 - } 66 - 67 - // trigger backfill worker 68 - state.notify_backfill(); 69 - } 70 - Ok(StatusCode::OK) 71 - } 72 - 73 - #[derive(Deserialize)] 74 - pub struct RepoRemoveRequest { 75 - pub dids: Vec<String>, 76 - } 77 - 78 - pub async fn handle_repo_remove( 79 - State(state): State<Arc<AppState>>, 80 - Json(req): Json<RepoRemoveRequest>, 81 - ) -> Result<StatusCode, (StatusCode, String)> { 82 - let db = &state.db; 83 - let mut batch = db.inner.batch(); 84 - let mut removed_repos = 0; 85 - let mut old_gauges = Vec::new(); 86 - 87 - for did_str in req.dids { 88 - let did = Did::new_owned(did_str.as_str()) 89 - .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; 90 - let did_key = keys::repo_key(&did); 91 - 92 - if let Some(state_bytes) = Db::get(db.repos.clone(), &did_key) 93 - .await 94 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 95 - { 96 - let repo_state = crate::db::deser_repo_state(&state_bytes) 97 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 98 - 99 - let was_pending = matches!(repo_state.status, crate::types::RepoStatus::Backfilling); 100 - // todo: idk 101 - // let was_resync = matches!( 102 - // repo_state.status, 103 - // crate::types::RepoStatus::Error(_) 104 - // | crate::types::RepoStatus::Deactivated 105 - // | crate::types::RepoStatus::Takendown 106 - // | crate::types::RepoStatus::Suspended 107 - // ); 108 - 109 - let old_gauge = if was_pending { 110 - GaugeState::Pending 111 - } else if let Some(resync_bytes) = Db::get(db.resync.clone(), &did_key) 112 - .await 113 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
114 - { 115 - let resync_state: crate::types::ResyncState = rmp_serde::from_slice(&resync_bytes) 116 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 117 - 118 - let kind = if let crate::types::ResyncState::Error { kind, .. } = resync_state { 119 - Some(kind) 120 - } else { 121 - None 122 - }; 123 - GaugeState::Resync(kind) 124 - } else { 125 - GaugeState::Synced 126 - }; 127 - 128 - ops::delete_repo(&mut batch, db, &did, repo_state) 129 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 130 - 131 - old_gauges.push(old_gauge); 132 - 133 - removed_repos -= 1; 134 - } 135 - } 136 - 137 - tokio::task::spawn_blocking(move || batch.commit().map_err(|e| e.to_string())) 138 - .await 139 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 140 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 141 - 142 - if removed_repos != 0 { 143 - state.db.update_count_async("repos", removed_repos).await; 144 - for old_gauge in old_gauges { 145 - state 146 - .db 147 - .update_gauge_diff_async(&old_gauge, &GaugeState::Synced) 148 - .await; 149 - } 150 - } 151 - 152 - Ok(StatusCode::OK) 153 - }
+77
src/db/filter.rs
··· 1 + use fjall::{Keyspace, OwnedWriteBatch}; 2 + use miette::{IntoDiagnostic, Result}; 3 + 4 + use crate::filter::{ 5 + COLLECTION_PREFIX, DID_PREFIX, EXCLUDE_PREFIX, FilterConfig, FilterMode, MODE_KEY, SEP, 6 + SIGNAL_PREFIX, SetUpdate, filter_key, 7 + }; 8 + 9 + pub fn apply_patch( 10 + batch: &mut OwnedWriteBatch, 11 + ks: &Keyspace, 12 + mode: Option<FilterMode>, 13 + dids: Option<SetUpdate>, 14 + signals: Option<SetUpdate>, 15 + collections: Option<SetUpdate>, 16 + excludes: Option<SetUpdate>, 17 + ) -> Result<()> { 18 + if let Some(mode) = mode { 19 + batch.insert(ks, MODE_KEY, rmp_serde::to_vec(&mode).into_diagnostic()?); 20 + } 21 + 22 + apply_set_update(batch, ks, DID_PREFIX, dids)?; 23 + apply_set_update(batch, ks, SIGNAL_PREFIX, signals)?; 24 + apply_set_update(batch, ks, COLLECTION_PREFIX, collections)?; 25 + apply_set_update(batch, ks, EXCLUDE_PREFIX, excludes)?; 26 + 27 + Ok(()) 28 + } 29 + 30 + fn apply_set_update( 31 + batch: &mut OwnedWriteBatch, 32 + ks: &Keyspace, 33 + prefix: u8, 34 + update: Option<SetUpdate>, 35 + ) -> Result<()> { 36 + let Some(update) = update else { return Ok(()) }; 37 + 38 + match update { 39 + SetUpdate::Set(values) => { 40 + let scan_prefix = [prefix, SEP]; 41 + for guard in ks.prefix(scan_prefix) { 42 + let (k, _) = guard.into_inner().into_diagnostic()?; 43 + batch.remove(ks, k); 44 + } 45 + for val in values { 46 + batch.insert(ks, filter_key(prefix, &val), []); 47 + } 48 + } 49 + SetUpdate::Patch(map) => { 50 + for (val, add) in map { 51 + let key = filter_key(prefix, &val); 52 + if add { 53 + batch.insert(ks, key, []); 54 + } else { 55 + batch.remove(ks, key); 56 + } 57 + } 58 + } 59 + } 60 + 61 + Ok(()) 62 + } 63 + 64 + pub fn load(ks: &Keyspace) -> Result<FilterConfig> { 65 + FilterConfig::load(ks) 66 + } 67 + 68 + pub fn read_set(ks: &Keyspace, prefix: u8) -> Result<Vec<String>> { 69 + let scan_prefix = [prefix, SEP]; 70 + let mut out = Vec::new(); 71 + for guard in ks.prefix(scan_prefix) { 72 + let (k, _) = 
guard.into_inner().into_diagnostic()?; 73 + let val = std::str::from_utf8(&k[2..]).into_diagnostic()?.to_owned(); 74 + out.push(val); 75 + } 76 + Ok(out) 77 + }
+114
src/filter.rs
··· 1 + use std::sync::Arc; 2 + 3 + use arc_swap::ArcSwap; 4 + use fjall::Keyspace; 5 + use miette::{IntoDiagnostic, Result}; 6 + use serde::{Deserialize, Serialize}; 7 + use smol_str::SmolStr; 8 + 9 + pub const MODE_KEY: &[u8] = b"m"; 10 + pub const DID_PREFIX: u8 = b'd'; 11 + pub const SIGNAL_PREFIX: u8 = b's'; 12 + pub const COLLECTION_PREFIX: u8 = b'c'; 13 + pub const EXCLUDE_PREFIX: u8 = b'x'; 14 + pub const SEP: u8 = b'|'; 15 + 16 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 17 + #[serde(rename_all = "snake_case")] 18 + pub enum FilterMode { 19 + Dids = 0, 20 + Signal = 1, 21 + Full = 2, 22 + } 23 + 24 + /// hot-path in-memory config: only the small fields needed on every event. 25 + /// dids and excludes are large sets kept in the filter keyspace only. 26 + #[derive(Debug, Clone, Serialize)] 27 + pub struct FilterConfig { 28 + pub mode: FilterMode, 29 + pub signals: Vec<SmolStr>, 30 + pub collections: Vec<SmolStr>, 31 + } 32 + 33 + impl FilterConfig { 34 + pub fn new(mode: FilterMode) -> Self { 35 + Self { 36 + mode, 37 + signals: Vec::new(), 38 + collections: Vec::new(), 39 + } 40 + } 41 + 42 + pub fn load(ks: &Keyspace) -> Result<Self> { 43 + let mode = ks 44 + .get(MODE_KEY) 45 + .into_diagnostic()? 46 + .map(|v| rmp_serde::from_slice(&v).into_diagnostic()) 47 + .transpose()? 
48 + .unwrap_or(FilterMode::Dids); 49 + 50 + let mut config = Self::new(mode); 51 + 52 + let signal_prefix = [SIGNAL_PREFIX, SEP]; 53 + for guard in ks.prefix(signal_prefix) { 54 + let (k, _) = guard.into_inner().into_diagnostic()?; 55 + let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?; 56 + config.signals.push(SmolStr::new(val)); 57 + } 58 + 59 + let col_prefix = [COLLECTION_PREFIX, SEP]; 60 + for guard in ks.prefix(col_prefix) { 61 + let (k, _) = guard.into_inner().into_diagnostic()?; 62 + let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?; 63 + config.collections.push(SmolStr::new(val)); 64 + } 65 + 66 + Ok(config) 67 + } 68 + 69 + /// returns true if the collection matches the content filter. 70 + /// if collections is empty, all collections match. 71 + pub fn matches_collection(&self, collection: &str) -> bool { 72 + if self.collections.is_empty() { 73 + return true; 74 + } 75 + self.collections.iter().any(|p| nsid_matches(p, collection)) 76 + } 77 + 78 + /// returns true if the commit touches a collection covered by a signal. 79 + pub fn matches_signal(&self, collection: &str) -> bool { 80 + self.signals.iter().any(|p| nsid_matches(p, collection)) 81 + } 82 + } 83 + 84 + fn nsid_matches(pattern: &str, collection: &str) -> bool { 85 + if let Some(prefix) = pattern.strip_suffix(".*") { 86 + collection == prefix || collection.starts_with(prefix) 87 + } else { 88 + collection == pattern 89 + } 90 + } 91 + 92 + pub type FilterHandle = Arc<ArcSwap<FilterConfig>>; 93 + 94 + pub fn new_handle(config: FilterConfig) -> FilterHandle { 95 + Arc::new(ArcSwap::new(Arc::new(config))) 96 + } 97 + 98 + /// apply a bool patch or set replacement for a single set update. 
99 + #[derive(Debug, Deserialize)] 100 + #[serde(untagged)] 101 + pub enum SetUpdate { 102 + /// replace the entire set with this list 103 + Set(Vec<String>), 104 + /// patch: true = add, false = remove 105 + Patch(std::collections::HashMap<String, bool>), 106 + } 107 + 108 + pub fn filter_key(prefix: u8, val: &str) -> Vec<u8> { 109 + let mut key = Vec::with_capacity(2 + val.len()); 110 + key.push(prefix); 111 + key.push(SEP); 112 + key.extend_from_slice(val.as_bytes()); 113 + key 114 + }
+33 -21
src/ingest/firehose.rs
··· 1 1 use crate::db::{self, Db, keys}; 2 + use crate::filter::{FilterHandle, FilterMode}; 2 3 use crate::ingest::{BufferTx, IngestMessage}; 3 4 use crate::state::AppState; 4 5 use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 5 6 use jacquard::types::did::Did; 6 7 use jacquard_common::xrpc::{SubscriptionClient, TungsteniteSubscriptionClient}; 7 - use miette::Result; 8 + use miette::{IntoDiagnostic, Result}; 8 9 use n0_future::StreamExt; 9 10 use std::sync::Arc; 10 11 use std::sync::atomic::Ordering; ··· 16 17 state: Arc<AppState>, 17 18 buffer_tx: BufferTx, 18 19 relay_host: Url, 19 - full_network: bool, 20 + filter: FilterHandle, 20 21 _verify_signatures: bool, 21 22 } 22 23 ··· 25 26 state: Arc<AppState>, 26 27 buffer_tx: BufferTx, 27 28 relay_host: Url, 28 - full_network: bool, 29 + filter: FilterHandle, 29 30 verify_signatures: bool, 30 31 ) -> Self { 31 32 Self { 32 33 state, 33 34 buffer_tx, 34 35 relay_host, 35 - full_network, 36 + filter, 36 37 _verify_signatures: verify_signatures, 37 38 } 38 39 } 39 40 40 41 pub async fn run(mut self) -> Result<()> { 41 42 loop { 42 - // 1. load cursor 43 43 let current_cursor = self.state.cur_firehose.load(Ordering::SeqCst); 44 44 let start_cursor = if current_cursor > 0 { 45 45 Some(current_cursor) ··· 55 55 self.state.cur_firehose.store(c, Ordering::SeqCst); 56 56 } 57 57 58 - // 2. connect 59 58 let client = TungsteniteSubscriptionClient::from_base_uri(self.relay_host.clone()); 60 59 let params = SubscribeRepos::new; 61 60 let params = start_cursor ··· 76 75 77 76 info!("firehose connected"); 78 77 79 - // 3. 
process loop 80 78 while let Some(msg_res) = messages.next().await { 81 79 match msg_res { 82 80 Ok(msg) => self.handle_message(msg).await, ··· 98 96 SubscribeReposMessage::Identity(identity) => &identity.did, 99 97 SubscribeReposMessage::Account(account) => &account.did, 100 98 SubscribeReposMessage::Sync(sync) => &sync.did, 101 - // todo: handle info and unknowns 102 99 _ => return, 103 100 }; 104 101 ··· 111 108 return; 112 109 } 113 110 114 - // pre-warm the key cache for commit events 115 - // if self.verify_signatures && matches!(&msg, SubscribeReposMessage::Commit(_)) { 116 - // let state = self.state.clone(); 117 - // let did = did.clone(); 118 - // tokio::spawn(async move { 119 - // let _ = state.resolver.resolve_signing_key(&did).await; 120 - // }); 121 - // } 122 - 123 111 if let Err(e) = self.buffer_tx.send(IngestMessage::Firehose(msg)) { 124 112 error!("failed to send message to buffer processor: {e}"); 125 113 } 126 114 } 127 115 128 116 async fn should_process(&self, did: &Did<'_>) -> Result<bool> { 129 - if self.full_network { 130 - return Ok(true); 117 + let filter = self.filter.load(); 118 + 119 + let excl_key = crate::filter::filter_key(crate::filter::EXCLUDE_PREFIX, did.as_str()); 120 + if self 121 + .state 122 + .db 123 + .filter 124 + .contains_key(&excl_key) 125 + .into_diagnostic()? 126 + { 127 + return Ok(false); 131 128 } 132 - let did_key = keys::repo_key(did); 133 - Db::contains_key(self.state.db.repos.clone(), did_key).await 129 + 130 + match filter.mode { 131 + FilterMode::Full => Ok(true), 132 + FilterMode::Dids | FilterMode::Signal => { 133 + let did_key = crate::filter::filter_key(crate::filter::DID_PREFIX, did.as_str()); 134 + if self 135 + .state 136 + .db 137 + .filter 138 + .contains_key(&did_key) 139 + .into_diagnostic()? 140 + { 141 + return Ok(true); 142 + } 143 + Db::contains_key(self.state.db.repos.clone(), keys::repo_key(did)).await 144 + } 145 + } 134 146 } 135 147 }
+20 -5
src/ingest/worker.rs
··· 1 1 use crate::db::{self, keys}; 2 + use crate::filter::FilterMode; 2 3 use crate::ingest::{BufferRx, IngestMessage}; 3 4 use crate::ops; 4 5 use crate::resolver::{NoSigningKeyError, ResolverError}; ··· 501 502 repo_state, 502 503 &commit, 503 504 Self::fetch_key(ctx, did)?.as_ref(), 505 + &ctx.state.filter.load(), 504 506 )?; 505 507 let repo_state = res.repo_state; 506 508 *ctx.added_blocks += res.blocks_count; ··· 526 528 ) -> Result<RepoProcessResult<'s, 'c>, IngestError> { 527 529 let repo_key = keys::repo_key(&did); 528 530 let Some(state_bytes) = ctx.state.db.repos.get(&repo_key).into_diagnostic()? else { 529 - // we don't know this repo, but we are receiving events for it 530 - // this means we should backfill it before processing its events 531 + let filter = ctx.state.filter.load(); 532 + 533 + if filter.mode == FilterMode::Signal { 534 + let commit = match msg { 535 + SubscribeReposMessage::Commit(c) => c, 536 + _ => return Ok(RepoProcessResult::Syncing(None)), 537 + }; 538 + let touches_signal = commit.ops.iter().any(|op| { 539 + op.path 540 + .split_once('/') 541 + .map(|(col, _)| filter.matches_signal(col)) 542 + .unwrap_or(false) 543 + }); 544 + if !touches_signal { 545 + return Ok(RepoProcessResult::Syncing(None)); 546 + } 547 + } 548 + 531 549 debug!("discovered new account {did} from firehose, queueing backfill"); 532 550 533 551 let repo_state = RepoState::backfilling(rand::rng().next_u64()); 534 - // using a separate batch here since we want to make it known its being backfilled 535 - // immediately. we could use the batch for the unit of work we are doing but 536 - // then we wouldn't be able to start backfilling until the unit of work is done 537 552 let mut batch = ctx.state.db.inner.batch(); 538 553 batch.insert( 539 554 &ctx.state.db.repos,
+1
src/lib.rs
··· 3 3 pub mod config; 4 4 pub mod crawler; 5 5 pub mod db; 6 + pub mod filter; 6 7 pub mod ingest; 7 8 pub mod ops; 8 9 pub mod resolver;
+23 -2
src/main.rs
··· 25 25 info!("{cfg}"); 26 26 27 27 let state = AppState::new(&cfg)?; 28 + 29 + if cfg.full_network { 30 + let filter_ks = state.db.filter.clone(); 31 + let inner = state.db.inner.clone(); 32 + tokio::task::spawn_blocking(move || { 33 + use hydrant::filter::{FilterMode, MODE_KEY}; 34 + let mut batch = inner.batch(); 35 + batch.insert( 36 + &filter_ks, 37 + MODE_KEY, 38 + rmp_serde::to_vec(&FilterMode::Full).into_diagnostic()?, 39 + ); 40 + batch.commit().into_diagnostic() 41 + }) 42 + .await 43 + .into_diagnostic()??; 44 + 45 + let new_filter = hydrant::db::filter::load(&state.db.filter)?; 46 + state.filter.store(new_filter.into()); 47 + } 48 + 28 49 let (buffer_tx, buffer_rx) = mpsc::unbounded_channel(); 29 50 let state = Arc::new(state); 30 51 ··· 123 144 } 124 145 }); 125 146 126 - if cfg.full_network { 147 + if state.filter.load().mode == hydrant::filter::FilterMode::Full { 127 148 tokio::spawn( 128 149 Crawler::new( 129 150 state.clone(), ··· 158 179 state.clone(), 159 180 buffer_tx, 160 181 cfg.relay_host, 161 - cfg.full_network, 182 + state.filter.clone(), 162 183 matches!(cfg.verify_signatures, SignatureVerification::Full), 163 184 ); 164 185
+7
src/ops.rs
··· 1 1 use crate::db::types::{DbAction, DbRkey, DbTid, TrimmedDid}; 2 2 use crate::db::{self, Db, keys, ser_repo_state}; 3 + use crate::filter::FilterConfig; 3 4 use crate::types::{ 4 5 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState, 5 6 StoredEvent, ··· 222 223 mut repo_state: RepoState<'s>, 223 224 commit: &'commit Commit<'commit>, 224 225 signing_key: Option<&PublicKey>, 226 + filter: &FilterConfig, 225 227 ) -> Result<ApplyCommitResults<'s>> { 226 228 let did = &commit.repo; 227 229 debug!("applying commit {} for {did}", &commit.commit); ··· 267 269 268 270 for op in &commit.ops { 269 271 let (collection, rkey) = parse_path(&op.path)?; 272 + 273 + if !filter.matches_collection(collection) { 274 + continue; 275 + } 276 + 270 277 let rkey = DbRkey::new(rkey); 271 278 let db_key = keys::record_key(did, collection, &rkey); 272 279
+10 -1
src/state.rs
··· 3 3 use miette::Result; 4 4 use tokio::sync::Notify; 5 5 6 - use crate::{config::Config, db::Db, resolver::Resolver}; 6 + use crate::{ 7 + config::Config, 8 + db::Db, 9 + filter::{FilterHandle, new_handle}, 10 + resolver::Resolver, 11 + }; 7 12 8 13 pub struct AppState { 9 14 pub db: Db, 10 15 pub resolver: Resolver, 16 + pub filter: FilterHandle, 11 17 pub cur_firehose: AtomicI64, 12 18 pub backfill_notify: Notify, 13 19 } ··· 16 22 pub fn new(config: &Config) -> Result<Self> { 17 23 let db = Db::open(config)?; 18 24 let resolver = Resolver::new(config.plc_urls.clone(), config.identity_cache_size); 25 + let filter_config = crate::db::filter::load(&db.filter)?; 26 + let filter = new_handle(filter_config); 19 27 20 28 Ok(Self { 21 29 db, 22 30 resolver, 31 + filter, 23 32 cur_firehose: AtomicI64::new(0), 24 33 backfill_notify: Notify::new(), 25 34 })
+1 -1
tests/authenticated_stream_test.nu
··· 101 101 # 4. add repo to hydrant (backfill trigger) 102 102 print $"adding repo ($did) to tracking..." 103 103 try { 104 - http post -t application/json $"($url)/repo/add" { dids: [($did)] } 104 + http patch -t application/json $"($url)/filter" { dids: { ($did): true } } 105 105 } catch { 106 106 print "warning: failed to add repo (might already be tracked), continuing..." 107 107 }
+1 -1
tests/debug_endpoints.nu
··· 18 18 if (wait-for-api $url) { 19 19 # Trigger backfill to populate some data 20 20 print $"adding repo ($did) to tracking..." 21 - http post -t application/json $"($url)/repo/add" { dids: [($did)] } 21 + http patch -t application/json $"($url)/filter" { dids: { ($did): true } } 22 22 23 23 if (wait-for-backfill $url) { 24 24 print "backfill complete, testing debug endpoints"
+1 -1
tests/repo_sync_integrity.nu
··· 112 112 if (wait-for-api $url) { 113 113 # track the repo via API 114 114 print $"adding repo ($did) to tracking..." 115 - http post -t application/json $"($url)/repo/add" { dids: [($did)] } 115 + http patch -t application/json $"($url)/filter" { dids: { ($did): true } } 116 116 117 117 if (wait-for-backfill $url) { 118 118 # Run both consistency checks
+1 -1
tests/stream_test.nu
··· 31 31 32 32 # trigger backfill 33 33 print $"adding repo ($did) to tracking..." 34 - http post -t application/json $"($url)/repo/add" { dids: [($did)] } 34 + http patch -t application/json $"($url)/filter" { dids: { ($did): true } } 35 35 36 36 if (wait-for-backfill $url) { 37 37 sleep 2sec
+7 -9
tests/throttling_test.nu
··· 96 96 97 97 # remove 4 repos to drop pending (5) to 1 (<= resume limit 1) 98 98 # mock repos are did:web:mock1.com ... mock5.com 99 - let to_remove = { 100 - dids: [ 101 - "did:web:mock1.com", 102 - "did:web:mock2.com", 103 - "did:web:mock3.com", 104 - "did:web:mock4.com" 105 - ] 99 + http patch --content-type application/json $"($url)/filter" { 100 + dids: { 101 + "did:web:mock1.com": false, 102 + "did:web:mock2.com": false, 103 + "did:web:mock3.com": false, 104 + "did:web:mock4.com": false 105 + } 106 106 } 107 - 108 - http post --content-type application/json $"($url)/repo/remove" $to_remove 109 107 110 108 print "waiting for crawler to wake up (max 10s)..." 111 109 sleep 15sec