at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[all] rework the filter api, re-introduce the repos management api

ptr.pet 4fe44f8a 771fa253

verified
+577 -289
+6 -4
AGENTS.md
··· 36 36 Hydrant consists of several components: 37 37 - **[`hydrant::ingest::firehose`]**: Connects to an upstream Firehose (Relay) and filters events. It manages the transition between discovery and synchronization. 38 38 - **[`hydrant::ingest::worker`]**: Processes buffered Firehose messages concurrently using sharded workers. Verifies signatures, updates repository state (handling account status events like deactivations), detects gaps for backfill, and persists records. 39 - - **[`hydrant::crawler`]**: Periodically enumerates the network via `com.atproto.sync.listRepos` to discover new repositories when in full-network mode. 39 + - **[`hydrant::crawler`]**: Periodically enumerates the network via `com.atproto.sync.listRepos` to discover new repositories. In `Full` mode it is enabled by default; in `Filter` mode it is opt-in via `HYDRANT_ENABLE_CRAWLER`. 40 40 - **[`hydrant::resolver`]**: Manages DID resolution and key lookups. Supports multiple PLC directory sources with failover and caching. 41 41 - **[`hydrant::backfill`]**: A dedicated worker that fetches full repository CAR files. Uses LIFO prioritization and adaptive concurrency to manage backfill load efficiently. 42 - - **[`hydrant::api`]**: An Axum-based XRPC server implementing repository read methods (`getRecord`, `listRecords`) and system stats. It also provides a WebSocket event stream and a filter management API (`GET`/`PATCH /filter`) for configuring indexing mode, DID lists, signals, and collection patterns. 43 - - **Persistence worker** (in `src/main.rs`): Manages periodic background flushes of the LSM-tree and cursor state. 42 + - **[`hydrant::api`]**: An Axum-based XRPC server implementing repository read methods (`getRecord`, `listRecords`) and system stats. It also provides a WebSocket event stream and management APIs: 43 + - `/filter` (`GET`/`PATCH`): Configure indexing mode, signals, and collection patterns. 
44 + - `/repos` (`GET`/`PUT`/`DELETE`): Bulk repository management using NDJSON or JSON arrays. 45 + - **Persistence worker** (in `src/main.rs`): Manages periodic background flushes of the LSM-tree and cursor state. 44 46 45 47 ### Lazy event inflation 46 48 ··· 82 84 - `resync`: Maps `{DID}` -> `ResyncState` (MessagePack) for retry logic/tombstones. 83 85 - `resync_buffer`: Maps `{DID}|{Rev}` -> `Commit` (MessagePack). Used to buffer live events during backfill. 84 86 - `counts`: Maps `k|{NAME}` or `r|{DID}|{COL}` -> `Count` (u64 BE Bytes). 85 - - `filter`: Stores filter config: mode key `m` -> `FilterMode` (MessagePack), and set entries for DIDs (`d|{DID}`), signals (`s|{NSID}`), collections (`c|{NSID}`), and excludes (`x|{DID}`) -> empty value. 87 + - `filter`: Stores filter config. Handled by the `db::filter` module. Includes mode key `m` -> `FilterMode` (MessagePack), and set entries for signals (`s|{NSID}`), collections (`c|{NSID}`), and excludes (`x|{DID}`) -> empty value. 86 88 87 89 ## Safe commands 88 90
+15 -10
README.md
··· 40 40 | `ENABLE_DEBUG` | `false` | enable debug endpoints. | 41 41 | `DEBUG_PORT` | `3001` | port for debug endpoints (if enabled). | 42 42 | `NO_LZ4_COMPRESSION` | `false` | disable lz4 compression for storage. | 43 - | `DISABLE_FIREHOSE` | `false` | disable firehose ingestion. | 44 - | `DISABLE_BACKFILL` | `false` | disable backfill processing. | 43 + | `ENABLE_FIREHOSE` | `true` | whether to ingest relay subscriptions. | 44 + | `ENABLE_BACKFILL` | `true` | whether to backfill from PDS instances. | 45 + | `ENABLE_CRAWLER` | `false` (if Filter), `true` (if Full) | whether to actively query the network for unknown repositories. | 45 46 | `DB_WORKER_THREADS` | `4` (`8` if full network) | database worker threads. | 46 47 | `DB_MAX_JOURNALING_SIZE_MB` | `512` (`1024` if full network) | max database journaling size in MB. | 47 48 | `DB_PENDING_MEMTABLE_SIZE_MB` | `64` (`192` if full network) | pending memtable size in MB. | ··· 65 66 66 67 | mode | behaviour | 67 68 | :--- | :--- | 68 - | `dids` | only index repositories explicitly listed in `dids`. new accounts seen on the firehose are ignored unless they are in the list. | 69 - | `signal` | like `dids`, but also auto-discovers and backfills any account whose firehose commit touches a collection matching one of the `signals` patterns. | 70 - | `full` | index the entire network. `dids` and `signals` are ignored for discovery, but `excludes` and `collections` still apply. | 69 + | `filter` | auto-discovers and backfills any account whose firehose commit touches a collection matching one of the `signals` patterns. you can also explicitly track individual repositories via the `/repos` endpoint regardless of matching signals. | 70 + | `full` | index the entire network. `signals` are ignored for discovery, but `excludes` and `collections` still apply. 
| 71 71 72 72 #### fields 73 73 74 74 | field | type | description | 75 75 | :--- | :--- | :--- | 76 - | `mode` | `"dids"` \| `"signal"` \| `"full"` | indexing mode (see above). | 77 - | `dids` | set update | set of DIDs to explicitly track. in `dids` and `signal` modes, always processed regardless of signal matching. adding an untracked DID enqueues a backfill. | 78 - | `signals` | set update | NSID patterns (e.g. `app.bsky.feed.post` or `app.bsky.*`) that trigger auto-discovery in `signal` mode. | 76 + | `mode` | `"filter"` \| `"full"` | indexing mode (see above). | 77 + | `signals` | set update | NSID patterns (e.g. `app.bsky.feed.post` or `app.bsky.*`) that trigger auto-discovery in `filter` mode. | 79 78 | `collections` | set update | NSID patterns used to filter which records are stored. if empty, all collections are stored. applies in all modes. | 80 79 | `excludes` | set update | set of DIDs to always skip, regardless of mode. checked before any other filter logic. | 81 80 ··· 93 92 - `app.bsky.feed.post` — exact match only 94 93 - `app.bsky.feed.*` — matches any collection under `app.bsky.feed` 95 94 95 + ### repository management 96 + 97 + - `GET /repos`: get an NDJSON stream of all repositories and their sync status. 98 + - `PUT /repos`: explicitly track repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 99 + - `DELETE /repos`: untrack repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). optionally include `"deleteData": true` to also purge the repository from the database. 100 + 96 101 ### data access (xrpc) 97 102 98 103 `hydrant` implements the following XRPC endpoints under `/xrpc/`: 99 104 100 105 #### `com.atproto.repo.getRecord` 101 106 102 - retrieve a single record by its AT-URI components. 107 + retrieve a single record by its AT URI components. 
103 108 104 109 | param | required | description | 105 110 | :--- | :--- | :--- | ··· 107 112 | `collection` | yes | NSID of the collection. | 108 113 | `rkey` | yes | record key. | 109 114 110 - returns the record value, its CID, and its AT-URI. responds with `RecordNotFound` if not present. 115 + returns the record value, its CID, and its AT URI. responds with `RecordNotFound` if not present. 111 116 112 117 #### `com.atproto.repo.listRecords` 113 118
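The `set update` fields documented above accept either a full replacement list or a per-entry add/remove map (the `SetUpdate` enum in `src/filter.rs`). A minimal stdlib-only sketch of how the two shapes apply to an in-memory set — the real handler writes the equivalent inserts/removes into a fjall batch instead:

```rust
use std::collections::{BTreeSet, HashMap};

// Mirrors the two `SetUpdate` shapes: `["a", "b"]` replaces the whole
// set, `{"a": true, "b": false}` adds/removes individual entries.
enum SetUpdate {
    Set(Vec<String>),
    Patch(HashMap<String, bool>),
}

fn apply(set: &mut BTreeSet<String>, update: SetUpdate) {
    match update {
        SetUpdate::Set(items) => {
            set.clear();
            set.extend(items);
        }
        SetUpdate::Patch(map) => {
            for (item, add) in map {
                if add {
                    set.insert(item);
                } else {
                    set.remove(&item);
                }
            }
        }
    }
}

fn main() {
    let mut signals = BTreeSet::new();
    apply(
        &mut signals,
        SetUpdate::Set(vec!["app.bsky.feed.*".into(), "app.bsky.graph.follow".into()]),
    );
    // patch form: drop one pattern, add another
    let mut patch = HashMap::new();
    patch.insert("app.bsky.graph.follow".to_string(), false);
    patch.insert("app.bsky.actor.profile".to_string(), true);
    apply(&mut signals, SetUpdate::Patch(patch));
    assert!(signals.contains("app.bsky.actor.profile"));
    assert!(!signals.contains("app.bsky.graph.follow"));
}
```

the patch form makes `PATCH /filter` idempotent per entry, which is why the API can accept partial updates without a read-modify-write round trip on the client side.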
+7 -66
src/api/filter.rs
··· 1 1 use std::sync::Arc; 2 2 3 + use crate::api::AppState; 4 + use crate::db; 5 + use crate::db::filter::EXCLUDE_PREFIX; 6 + use crate::filter::{FilterMode, SetUpdate}; 3 7 use axum::{ 4 8 Json, Router, 5 9 extract::State, ··· 7 11 routing::{get, patch}, 8 12 }; 9 13 use miette::IntoDiagnostic; 10 - use rand::Rng; 11 14 use serde::{Deserialize, Serialize}; 12 - 13 - use crate::api::AppState; 14 - use crate::db::{self, keys, ser_repo_state}; 15 - use crate::filter::{DID_PREFIX, EXCLUDE_PREFIX, FilterConfig, FilterMode, SetUpdate}; 16 - use crate::types::{GaugeState, RepoState}; 17 15 18 16 pub fn router() -> Router<Arc<AppState>> { 19 17 Router::new() ··· 24 22 #[derive(Serialize)] 25 23 pub struct FilterResponse { 26 24 pub mode: FilterMode, 27 - pub dids: Vec<String>, 28 25 pub signals: Vec<String>, 29 26 pub collections: Vec<String>, 30 27 pub excludes: Vec<String>, ··· 35 32 ) -> Result<Json<FilterResponse>, (StatusCode, String)> { 36 33 let filter_ks = state.db.filter.clone(); 37 34 let resp = tokio::task::spawn_blocking(move || { 38 - let hot = FilterConfig::load(&filter_ks).map_err(|e| e.to_string())?; 39 - let dids = db::filter::read_set(&filter_ks, DID_PREFIX).map_err(|e| e.to_string())?; 35 + let hot = db::filter::load(&filter_ks).map_err(|e| e.to_string())?; 40 36 let excludes = 41 37 db::filter::read_set(&filter_ks, EXCLUDE_PREFIX).map_err(|e| e.to_string())?; 42 38 Ok::<_, String>(FilterResponse { 43 39 mode: hot.mode, 44 - dids, 45 40 signals: hot.signals.iter().map(|s| s.to_string()).collect(), 46 41 collections: hot.collections.iter().map(|s| s.to_string()).collect(), 47 42 excludes, ··· 57 52 #[derive(Deserialize)] 58 53 pub struct FilterPatch { 59 54 pub mode: Option<FilterMode>, 60 - pub dids: Option<SetUpdate>, 61 55 pub signals: Option<SetUpdate>, 62 56 pub collections: Option<SetUpdate>, 63 57 pub excludes: Option<SetUpdate>, ··· 69 63 ) -> Result<StatusCode, (StatusCode, String)> { 70 64 let db = &state.db; 71 65 72 - let new_dids: 
Option<Vec<String>> = match &patch.dids { 73 - Some(SetUpdate::Set(dids)) => Some(dids.clone()), 74 - Some(SetUpdate::Patch(map)) => { 75 - let added: Vec<String> = map 76 - .iter() 77 - .filter(|(_, add)| **add) 78 - .map(|(d, _)| d.clone()) 79 - .collect(); 80 - (!added.is_empty()).then_some(added) 81 - } 82 - None => None, 83 - }; 84 - 85 66 let filter_ks = db.filter.clone(); 86 - let repos_ks = db.repos.clone(); 87 - let pending_ks = db.pending.clone(); 88 67 let inner = db.inner.clone(); 89 68 90 69 let patch_mode = patch.mode; 91 - let patch_dids = patch.dids; 92 70 let patch_signals = patch.signals; 93 71 let patch_collections = patch.collections; 94 72 let patch_excludes = patch.excludes; 95 73 96 - let (new_repo_count, new_filter) = tokio::task::spawn_blocking(move || { 74 + let new_filter = tokio::task::spawn_blocking(move || { 97 75 let mut batch = inner.batch(); 98 76 99 77 db::filter::apply_patch( 100 78 &mut batch, 101 79 &filter_ks, 102 80 patch_mode, 103 - patch_dids, 104 81 patch_signals, 105 82 patch_collections, 106 83 patch_excludes, 107 84 ) 108 85 .map_err(|e| e.to_string())?; 109 86 110 - let mut added = 0i64; 111 - 112 - if let Some(dids) = new_dids { 113 - for did_str in &dids { 114 - let did = 115 - jacquard::types::did::Did::new_owned(did_str).map_err(|e| e.to_string())?; 116 - let did_key = keys::repo_key(&did); 117 - let exists = repos_ks 118 - .contains_key(&did_key) 119 - .into_diagnostic() 120 - .map_err(|e| e.to_string())?; 121 - if !exists { 122 - let repo_state = RepoState::backfilling(rand::rng().next_u64()); 123 - let bytes = ser_repo_state(&repo_state).map_err(|e| e.to_string())?; 124 - batch.insert(&repos_ks, &did_key, bytes); 125 - batch.insert( 126 - &pending_ks, 127 - keys::pending_key(repo_state.index_id), 128 - &did_key, 129 - ); 130 - added += 1; 131 - } 132 - } 133 - } 134 - 135 87 batch 136 88 .commit() 137 89 .into_diagnostic() 138 90 .map_err(|e| e.to_string())?; 139 91 140 92 let new_filter = 
db::filter::load(&filter_ks).map_err(|e| e.to_string())?; 141 - Ok::<_, String>((added, new_filter)) 93 + Ok::<_, String>(new_filter) 142 94 }) 143 95 .await 144 96 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 145 97 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 146 98 147 99 state.filter.store(Arc::new(new_filter)); 148 - 149 - if new_repo_count > 0 { 150 - state.db.update_count_async("repos", new_repo_count).await; 151 - for _ in 0..new_repo_count { 152 - state 153 - .db 154 - .update_gauge_diff_async(&GaugeState::Synced, &GaugeState::Pending) 155 - .await; 156 - } 157 - state.notify_backfill(); 158 - } 159 100 160 101 Ok(StatusCode::OK) 161 102 }
+2
src/api/mod.rs
··· 8 8 9 9 mod debug; 10 10 pub mod filter; 11 + pub mod repos; 11 12 pub mod stats; 12 13 mod stream; 13 14 pub mod xrpc; ··· 21 22 .route("/stream", get(stream::handle_stream)) 22 23 .merge(xrpc::router()) 23 24 .merge(filter::router()) 25 + .merge(repos::router()) 24 26 .with_state(state) 25 27 .layer(TraceLayer::new_for_http()) 26 28 .layer(CorsLayer::permissive());
+275
src/api/repos.rs
··· 1 + use std::sync::Arc; 2 + 3 + use axum::{ 4 + Router, 5 + body::Body, 6 + extract::{Query, State}, 7 + http::{StatusCode, header}, 8 + response::{IntoResponse, Response}, 9 + routing::{delete, get, put}, 10 + }; 11 + use jacquard::IntoStatic; 12 + use miette::IntoDiagnostic; 13 + use rand::Rng; 14 + use serde::{Deserialize, Serialize}; 15 + 16 + use crate::api::AppState; 17 + use crate::db::{keys, ser_repo_state}; 18 + use crate::types::{GaugeState, RepoState}; 19 + 20 + pub fn router() -> Router<Arc<AppState>> { 21 + Router::new() 22 + .route("/repos", get(handle_get_repos)) 23 + .route("/repos", put(handle_put_repos)) 24 + .route("/repos", delete(handle_delete_repos)) 25 + } 26 + 27 + #[derive(Deserialize, Debug)] 28 + pub struct RepoRequest { 29 + pub did: String, 30 + #[serde(skip_serializing_if = "Option::is_none", rename = "deleteData")] 31 + pub delete_data: Option<bool>, 32 + } 33 + 34 + #[derive(Serialize, Debug)] 35 + pub struct RepoResponse { 36 + pub did: String, 37 + pub status: String, 38 + pub tracked: bool, 39 + #[serde(skip_serializing_if = "Option::is_none")] 40 + pub rev: Option<String>, 41 + pub last_updated_at: i64, 42 + } 43 + 44 + #[derive(Deserialize)] 45 + pub struct DeleteParams { 46 + #[serde(default)] 47 + pub delete_data: bool, 48 + } 49 + 50 + pub async fn handle_get_repos( 51 + State(state): State<Arc<AppState>>, 52 + ) -> Result<Response, (StatusCode, String)> { 53 + let repos_ks = state.db.repos.clone(); 54 + 55 + let stream = futures::stream::iter(repos_ks.prefix(&[]).filter_map(|item| { 56 + let (k, v) = item.into_inner().ok()?; 57 + let did_str = std::str::from_utf8(&k[2..]).ok()?; 58 + let repo_state = crate::db::deser_repo_state(&v).ok()?; 59 + 60 + let response = RepoResponse { 61 + did: did_str.to_string(), 62 + status: repo_state.status.to_string(), 63 + tracked: repo_state.tracked, 64 + rev: repo_state.rev.as_ref().map(|r| r.to_string()), 65 + last_updated_at: repo_state.last_updated_at, 66 + }; 67 + 68 + let json = 
serde_json::to_string(&response).ok()?; 69 + Some(Ok::<_, std::io::Error>(format!("{json}\n"))) 70 + })); 71 + 72 + let body = Body::from_stream(stream); 73 + 74 + Ok(([(header::CONTENT_TYPE, "application/x-ndjson")], body).into_response()) 75 + } 76 + 77 + pub async fn handle_put_repos( 78 + State(state): State<Arc<AppState>>, 79 + req: axum::extract::Request, 80 + ) -> Result<StatusCode, (StatusCode, String)> { 81 + let items = parse_body(req).await?; 82 + 83 + let state_task = state.clone(); 84 + let (new_repo_count, gauge_transitions) = tokio::task::spawn_blocking(move || { 85 + let db = &state_task.db; 86 + let mut batch = db.inner.batch(); 87 + let mut added = 0i64; 88 + let mut gauge_transitions: Vec<(GaugeState, GaugeState)> = Vec::new(); 89 + 90 + for item in items { 91 + let did = match jacquard::types::did::Did::new_owned(&item.did) { 92 + Ok(d) => d, 93 + Err(_) => continue, 94 + }; 95 + let did_key = keys::repo_key(&did); 96 + 97 + let existing_state = if let Ok(Some(bytes)) = db.repos.get(&did_key) { 98 + crate::db::deser_repo_state(&bytes) 99 + .ok() 100 + .map(|s| s.into_static()) 101 + } else { 102 + None 103 + }; 104 + 105 + if let Some(mut repo_state) = existing_state { 106 + if !repo_state.tracked { 107 + let resync_bytes_opt = db.resync.get(&did_key).ok().flatten(); 108 + let old_gauge = 109 + crate::db::Db::repo_gauge_state(&repo_state, resync_bytes_opt.as_deref()); 110 + 111 + repo_state.tracked = true; 112 + // re-enqueue into pending 113 + if let Ok(bytes) = ser_repo_state(&repo_state) { 114 + batch.insert(&db.repos, &did_key, bytes); 115 + } 116 + batch.insert( 117 + &db.pending, 118 + keys::pending_key(repo_state.index_id), 119 + &did_key, 120 + ); 121 + batch.remove(&db.resync, &did_key); 122 + gauge_transitions.push((old_gauge, GaugeState::Pending)); 123 + } 124 + } else { 125 + let repo_state = RepoState::backfilling(rand::rng().next_u64()); 126 + if let Ok(bytes) = ser_repo_state(&repo_state) { 127 + batch.insert(&db.repos, &did_key, 
bytes); 128 + } 129 + batch.insert( 130 + &db.pending, 131 + keys::pending_key(repo_state.index_id), 132 + &did_key, 133 + ); 134 + added += 1; 135 + gauge_transitions.push((GaugeState::Synced, GaugeState::Pending)); // pseudo-transition to just inc pending 136 + } 137 + } 138 + 139 + batch 140 + .commit() 141 + .into_diagnostic() 142 + .map_err(|e| e.to_string())?; 143 + Ok::<_, String>((added, gauge_transitions)) 144 + }) 145 + .await 146 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 147 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 148 + 149 + if new_repo_count > 0 { 150 + state.db.update_count_async("repos", new_repo_count).await; 151 + } 152 + for (old, new) in gauge_transitions { 153 + state.db.update_gauge_diff_async(&old, &new).await; 154 + } 155 + 156 + // Always notify backfill if anything was added to pending! 157 + state.notify_backfill(); 158 + 159 + Ok(StatusCode::OK) 160 + } 161 + 162 + pub async fn handle_delete_repos( 163 + State(state): State<Arc<AppState>>, 164 + Query(params): Query<DeleteParams>, 165 + req: axum::extract::Request, 166 + ) -> Result<StatusCode, (StatusCode, String)> { 167 + let items = parse_body(req).await?; 168 + 169 + let state_task = state.clone(); 170 + let (deleted_count, gauge_decrements) = tokio::task::spawn_blocking(move || { 171 + let db = &state_task.db; 172 + let mut batch = db.inner.batch(); 173 + let mut deleted_count = 0i64; 174 + let mut gauge_decrements = Vec::new(); 175 + 176 + for item in items { 177 + let did = match jacquard::types::did::Did::new_owned(&item.did) { 178 + Ok(d) => d, 179 + Err(_) => continue, 180 + }; 181 + 182 + let delete_data = item.delete_data.unwrap_or(params.delete_data); 183 + let did_key = keys::repo_key(&did); 184 + 185 + let existing_state = if let Ok(Some(bytes)) = db.repos.get(&did_key) { 186 + crate::db::deser_repo_state(&bytes) 187 + .ok() 188 + .map(|s| s.into_static()) 189 + } else { 190 + None 191 + }; 192 + 193 + if let Some(mut repo_state) = 
existing_state { 194 + let resync_bytes_opt = db.resync.get(&did_key).ok().flatten(); 195 + let old_gauge = 196 + crate::db::Db::repo_gauge_state(&repo_state, resync_bytes_opt.as_deref()); 197 + 198 + if delete_data { 199 + if crate::ops::delete_repo(&mut batch, db, &did, &repo_state).is_ok() { 200 + deleted_count += 1; 201 + if old_gauge != GaugeState::Synced { 202 + gauge_decrements.push(old_gauge); 203 + } 204 + } else { 205 + tracing::error!("failed to apply delete_repo_batch to {}", did); 206 + } 207 + } else if repo_state.tracked { 208 + repo_state.tracked = false; 209 + if let Ok(bytes) = ser_repo_state(&repo_state) { 210 + batch.insert(&db.repos, &did_key, bytes); 211 + } 212 + batch.remove(&db.pending, keys::pending_key(repo_state.index_id)); 213 + batch.remove(&db.resync, &did_key); 214 + if old_gauge != GaugeState::Synced { 215 + gauge_decrements.push(old_gauge); 216 + } 217 + } 218 + } 219 + } 220 + 221 + batch 222 + .commit() 223 + .into_diagnostic() 224 + .map_err(|e| e.to_string())?; 225 + 226 + Ok::<_, String>((deleted_count, gauge_decrements)) 227 + }) 228 + .await 229 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? 
230 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; 231 + 232 + if deleted_count > 0 { 233 + state.db.update_count_async("repos", -deleted_count).await; 234 + } 235 + for gauge in gauge_decrements { 236 + state 237 + .db 238 + .update_gauge_diff_async(&gauge, &GaugeState::Synced) 239 + .await; 240 + } 241 + 242 + Ok(StatusCode::OK) 243 + } 244 + 245 + async fn parse_body(req: axum::extract::Request) -> Result<Vec<RepoRequest>, (StatusCode, String)> { 246 + let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 247 + .await 248 + .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; 249 + 250 + let text = 251 + std::str::from_utf8(&body_bytes).map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; 252 + 253 + let trimmed = text.trim(); 254 + if trimmed.starts_with('[') { 255 + serde_json::from_str::<Vec<RepoRequest>>(trimmed).map_err(|e| { 256 + ( 257 + StatusCode::BAD_REQUEST, 258 + format!("invalid JSON array: {}", e), 259 + ) 260 + }) 261 + } else { 262 + trimmed 263 + .lines() 264 + .filter(|l| !l.trim().is_empty()) 265 + .map(|line| { 266 + serde_json::from_str::<RepoRequest>(line).map_err(|e| { 267 + ( 268 + StatusCode::BAD_REQUEST, 269 + format!("invalid NDJSON line: {}", e), 270 + ) 271 + }) 272 + }) 273 + .collect() 274 + } 275 + }
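`parse_body` above dispatches on the first non-whitespace byte: a `[` means a single JSON array, anything else is treated as NDJSON with one object per non-empty line. A serde-free sketch of just that dispatch — the real handler feeds each chunk through `serde_json` into `RepoRequest`:

```rust
// Returns the raw JSON chunks that would each be handed to the parser:
// the whole body for the array form, or one chunk per NDJSON line.
fn body_items(body: &str) -> Vec<&str> {
    let trimmed = body.trim();
    if trimmed.starts_with('[') {
        vec![trimmed]
    } else {
        trimmed
            .lines()
            .map(str::trim)
            .filter(|l| !l.is_empty())
            .collect()
    }
}

fn main() {
    let ndjson = "{\"did\":\"did:plc:aaa\"}\n\n{\"did\":\"did:plc:bbb\"}\n";
    assert_eq!(body_items(ndjson).len(), 2);
    assert_eq!(body_items("[{\"did\":\"did:plc:aaa\"}]").len(), 1);
}
```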
+13 -30
src/backfill/mod.rs
··· 5 5 use crate::resolver::ResolverError; 6 6 use crate::state::AppState; 7 7 use crate::types::{ 8 - AccountEvt, BroadcastEvent, GaugeState, RepoState, RepoStatus, ResyncState, StoredEvent, 8 + AccountEvt, BroadcastEvent, GaugeState, RepoState, RepoStatus, ResyncErrorKind, ResyncState, 9 + StoredEvent, 9 10 }; 10 11 11 12 use fjall::Slice; ··· 164 165 let db = &state.db; 165 166 166 167 match process_did(&state, &http, &did, verify_signatures).await { 167 - Ok(Some(previous_state)) => { 168 + Ok(Some(repo_state)) => { 168 169 let did_key = keys::repo_key(&did); 169 170 170 171 // determine old gauge state 171 172 // if it was error/suspended etc, we need to know which error kind it was to decrement correctly. 172 - // we have to peek at the resync state. `previous_state` is the repo state, which tells us the Status. 173 - let old_gauge = match previous_state.status { 174 - RepoStatus::Backfilling => GaugeState::Pending, 175 - RepoStatus::Error(_) 176 - | RepoStatus::Deactivated 177 - | RepoStatus::Takendown 178 - | RepoStatus::Suspended => { 179 - // we need to fetch the resync state to know the kind 180 - // if it's missing, we assume Generic (or handle gracefully) 181 - // this is an extra read, but necessary for accurate gauges. 182 - let resync_state = Db::get(db.resync.clone(), &did_key).await.ok().flatten(); 183 - let kind = resync_state.and_then(|b| { 184 - rmp_serde::from_slice::<ResyncState>(&b) 185 - .ok() 186 - .and_then(|s| match s { 187 - ResyncState::Error { kind, .. } => Some(kind), 188 - _ => None, 189 - }) 190 - }); 191 - GaugeState::Resync(kind) 192 - } 193 - RepoStatus::Synced => GaugeState::Synced, 194 - }; 173 + // we have to peek at the resync state. 
174 + let old_gauge = state.db.repo_gauge_state_async(&repo_state, &did_key).await; 195 175 196 176 let mut batch = db.inner.batch(); 197 177 // remove from pending ··· 248 228 } 249 229 250 230 let error_kind = match &e { 251 - BackfillError::Ratelimited => crate::types::ResyncErrorKind::Ratelimited, 252 - BackfillError::Transport(_) => crate::types::ResyncErrorKind::Transport, 253 - BackfillError::Generic(_) => crate::types::ResyncErrorKind::Generic, 231 + BackfillError::Ratelimited => ResyncErrorKind::Ratelimited, 232 + BackfillError::Transport(_) => ResyncErrorKind::Transport, 233 + BackfillError::Generic(_) => ResyncErrorKind::Generic, 254 234 }; 255 235 256 236 let did_key = keys::repo_key(&did); ··· 432 412 if matches!(e, GetRepoError::RepoNotFound(_)) { 433 413 warn!("repo {did} not found, deleting"); 434 414 let mut batch = db.inner.batch(); 435 - ops::delete_repo(&mut batch, db, did, state)?; 415 + if let Err(e) = crate::ops::delete_repo(&mut batch, db, did, &state) { 416 + tracing::error!("failed to wipe repo during backfill: {e}"); 417 + } 436 418 batch.commit().into_diagnostic()?; 437 419 return Ok(Some(previous_state)); // stop backfill 438 420 } ··· 569 551 existing_cids.insert((collection.into(), rkey), cid); 570 552 } 571 553 572 - let mut signal_seen = filter.mode != FilterMode::Signal; 554 + let mut signal_seen = filter.mode == FilterMode::Full || state.tracked; 555 + 573 556 debug!( 574 557 "backfilling {did}: signal_seen initial={signal_seen}, mode={:?}, signals={:?}", 575 558 filter.mode, filter.signals
+11 -6
src/config.rs
··· 52 52 pub enable_debug: bool, 53 53 pub verify_signatures: SignatureVerification, 54 54 pub identity_cache_size: u64, 55 - pub disable_firehose: bool, 56 - pub disable_backfill: bool, 55 + pub enable_firehose: bool, 56 + pub enable_backfill: bool, 57 + pub enable_crawler: Option<bool>, 57 58 pub firehose_workers: usize, 58 59 pub db_worker_threads: usize, 59 60 pub db_max_journaling_size_mb: u64, ··· 116 117 let debug_port = cfg!("DEBUG_PORT", 3001u16); 117 118 let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full); 118 119 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64); 119 - let disable_firehose = cfg!("DISABLE_FIREHOSE", false); 120 - let disable_backfill = cfg!("DISABLE_BACKFILL", false); 120 + let enable_firehose = cfg!("ENABLE_FIREHOSE", true); 121 + let enable_backfill = cfg!("ENABLE_BACKFILL", true); 122 + let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER") 123 + .ok() 124 + .and_then(|s| s.parse().ok()); 121 125 122 126 let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize); 123 127 let firehose_workers = cfg!( ··· 168 172 enable_debug, 169 173 verify_signatures, 170 174 identity_cache_size, 171 - disable_firehose, 172 - disable_backfill, 175 + enable_firehose, 176 + enable_backfill, 177 + enable_crawler, 173 178 firehose_workers, 174 179 db_worker_threads, 175 180 db_max_journaling_size_mb,
+4 -13
src/crawler/mod.rs
··· 1 1 use crate::db::{Db, keys, ser_repo_state}; 2 - use crate::filter::FilterMode; 3 2 use crate::state::AppState; 4 3 use crate::types::RepoState; 5 4 use jacquard::api::com_atproto::sync::list_repos::{ListRepos, ListReposOutput}; ··· 137 136 let mut batch = db.inner.batch(); 138 137 let mut to_queue = Vec::new(); 139 138 140 - let filter = self.state.filter.load(); 141 - 142 139 // 3. process repos 143 140 for repo in output.repos { 144 141 let did_key = keys::repo_key(&repo.did); 145 142 146 - let excl_key = 147 - crate::filter::filter_key(crate::filter::EXCLUDE_PREFIX, repo.did.as_str()); 143 + let excl_key = crate::db::filter::filter_key( 144 + crate::db::filter::EXCLUDE_PREFIX, 145 + repo.did.as_str(), 146 + ); 148 147 if db.filter.contains_key(&excl_key).into_diagnostic()? { 149 148 continue; 150 - } 151 - 152 - if filter.mode != FilterMode::Full { 153 - let did_filter_key = 154 - crate::filter::filter_key(crate::filter::DID_PREFIX, repo.did.as_str()); 155 - if !db.filter.contains_key(&did_filter_key).into_diagnostic()? { 156 - continue; 157 - } 158 149 } 159 150 160 151 // check if known
+39 -7
src/db/filter.rs
··· 1 1 use fjall::{Keyspace, OwnedWriteBatch}; 2 2 use miette::{IntoDiagnostic, Result}; 3 3 4 - use crate::filter::{ 5 - COLLECTION_PREFIX, DID_PREFIX, EXCLUDE_PREFIX, FilterConfig, FilterMode, MODE_KEY, SEP, 6 - SIGNAL_PREFIX, SetUpdate, filter_key, 7 - }; 4 + use crate::filter::{FilterConfig, FilterMode, SetUpdate}; 5 + 6 + pub const MODE_KEY: &[u8] = b"m"; 7 + pub const SIGNAL_PREFIX: u8 = b's'; 8 + pub const COLLECTION_PREFIX: u8 = b'c'; 9 + pub const EXCLUDE_PREFIX: u8 = b'x'; 10 + pub const SEP: u8 = b'|'; 11 + 12 + pub fn filter_key(prefix: u8, val: &str) -> Vec<u8> { 13 + let mut key = Vec::with_capacity(2 + val.len()); 14 + key.push(prefix); 15 + key.push(SEP); 16 + key.extend_from_slice(val.as_bytes()); 17 + key 18 + } 8 19 9 20 pub fn apply_patch( 10 21 batch: &mut OwnedWriteBatch, 11 22 ks: &Keyspace, 12 23 mode: Option<FilterMode>, 13 - dids: Option<SetUpdate>, 14 24 signals: Option<SetUpdate>, 15 25 collections: Option<SetUpdate>, 16 26 excludes: Option<SetUpdate>, ··· 19 29 batch.insert(ks, MODE_KEY, rmp_serde::to_vec(&mode).into_diagnostic()?); 20 30 } 21 31 22 - apply_set_update(batch, ks, DID_PREFIX, dids)?; 23 32 apply_set_update(batch, ks, SIGNAL_PREFIX, signals)?; 24 33 apply_set_update(batch, ks, COLLECTION_PREFIX, collections)?; 25 34 apply_set_update(batch, ks, EXCLUDE_PREFIX, excludes)?; ··· 62 71 } 63 72 64 73 pub fn load(ks: &Keyspace) -> Result<FilterConfig> { 65 - FilterConfig::load(ks) 74 + let mode = ks 75 + .get(MODE_KEY) 76 + .into_diagnostic()? 77 + .map(|v| rmp_serde::from_slice(&v).into_diagnostic()) 78 + .transpose()? 
79 + .unwrap_or(FilterMode::Filter); 80 + 81 + let mut config = FilterConfig::new(mode); 82 + 83 + let signal_prefix = [SIGNAL_PREFIX, SEP]; 84 + for guard in ks.prefix(signal_prefix) { 85 + let (k, _) = guard.into_inner().into_diagnostic()?; 86 + let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?; 87 + config.signals.push(smol_str::SmolStr::new(val)); 88 + } 89 + 90 + let col_prefix = [COLLECTION_PREFIX, SEP]; 91 + for guard in ks.prefix(col_prefix) { 92 + let (k, _) = guard.into_inner().into_diagnostic()?; 93 + let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?; 94 + config.collections.push(smol_str::SmolStr::new(val)); 95 + } 96 + 97 + Ok(config) 66 98 } 67 99 68 100 pub fn read_set(ks: &Keyspace, prefix: u8) -> Result<Vec<String>> {
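The key layout above is small enough to check by hand: one tag byte, a `|` separator, then the value. `filter_key` from the diff, reproduced stdlib-only with a quick demonstration of the bytes it produces:

```rust
const SEP: u8 = b'|';

// Same shape as `db::filter::filter_key`: `{prefix}|{value}`.
fn filter_key(prefix: u8, val: &str) -> Vec<u8> {
    let mut key = Vec::with_capacity(2 + val.len());
    key.push(prefix);
    key.push(SEP);
    key.extend_from_slice(val.as_bytes());
    key
}

fn main() {
    // signal entry for an NSID pattern
    assert_eq!(filter_key(b's', "app.bsky.feed.*"), b"s|app.bsky.feed.*");
    // exclude entry for a DID
    assert_eq!(filter_key(b'x', "did:plc:abc"), b"x|did:plc:abc");
}
```

because every set shares one keyspace, `load` and `read_set` can scan a whole set with a two-byte prefix (`s|`, `c|`, `x|`) — which is exactly what the `ks.prefix(...)` loops in the diff do.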
+73 -3
src/db/mod.rs
··· 213 213 u64::from_be_bytes(v.as_ref().try_into().unwrap()), 214 214 ); 215 215 } 216 + // ensure critical counts are initialized 217 + for ks_name in ["repos", "pending", "resync"] { 218 + let _ = counts_map 219 + .entry_sync(SmolStr::new(ks_name)) 220 + .or_insert_with(|| { 221 + let ks = match ks_name { 222 + "repos" => &repos, 223 + "pending" => &pending, 224 + "resync" => &resync, 225 + _ => unreachable!(), 226 + }; 227 + ks.iter().count() as u64 228 + }); 229 + } 216 230 217 231 let (event_tx, _) = broadcast::channel(10000); 218 232 ··· 276 290 277 291 pub fn update_count(&self, key: &str, delta: i64) { 278 292 let mut entry = self.counts_map.entry_sync(SmolStr::new(key)).or_insert(0); 279 - *entry = (*entry as i64).saturating_add(delta) as u64; 293 + if delta >= 0 { 294 + *entry = entry.saturating_add(delta as u64); 295 + } else { 296 + *entry = entry.saturating_sub(delta.unsigned_abs()); 297 + } 280 298 } 281 299 282 300 pub async fn update_count_async(&self, key: &str, delta: i64) { ··· 285 303 .entry_async(SmolStr::new(key)) 286 304 .await 287 305 .or_insert(0); 288 - *entry = (*entry as i64).saturating_add(delta) as u64; 306 + if delta >= 0 { 307 + *entry = entry.saturating_add(delta as u64); 308 + } else { 309 + *entry = entry.saturating_sub(delta.unsigned_abs()); 310 + } 289 311 } 290 312 291 313 pub async fn get_count(&self, key: &str) -> u64 { ··· 358 380 .await 359 381 .into_diagnostic()? 
360 382 } 383 + 384 + pub fn repo_gauge_state( 385 + repo_state: &RepoState, 386 + resync_bytes: Option<&[u8]>, 387 + ) -> crate::types::GaugeState { 388 + match repo_state.status { 389 + crate::types::RepoStatus::Synced => crate::types::GaugeState::Synced, 390 + crate::types::RepoStatus::Backfilling => crate::types::GaugeState::Pending, 391 + crate::types::RepoStatus::Error(_) 392 + | crate::types::RepoStatus::Deactivated 393 + | crate::types::RepoStatus::Takendown 394 + | crate::types::RepoStatus::Suspended => { 395 + if let Some(resync_bytes) = resync_bytes { 396 + if let Ok(crate::types::ResyncState::Error { kind, .. }) = 397 + rmp_serde::from_slice::<crate::types::ResyncState>(resync_bytes) 398 + { 399 + crate::types::GaugeState::Resync(Some(kind)) 400 + } else { 401 + crate::types::GaugeState::Resync(None) 402 + } 403 + } else { 404 + crate::types::GaugeState::Resync(None) 405 + } 406 + } 407 + } 408 + } 409 + 410 + pub async fn repo_gauge_state_async( 411 + &self, 412 + repo_state: &RepoState<'_>, 413 + did_key: &[u8], 414 + ) -> crate::types::GaugeState { 415 + let repo_state = repo_state.clone().into_static(); 416 + let did_key = did_key.to_vec(); 417 + 418 + let db_resync = self.resync.clone(); 419 + 420 + tokio::task::spawn_blocking(move || { 421 + let resync_bytes_opt = db_resync.get(&did_key).ok().flatten(); 422 + Self::repo_gauge_state(&repo_state, resync_bytes_opt.as_deref()) 423 + }) 424 + .await 425 + .unwrap_or(crate::types::GaugeState::Resync(None)) 426 + } 361 427 } 362 428 363 429 pub fn set_firehose_cursor(db: &Db, cursor: i64) -> Result<()> { ··· 449 515 }) 450 516 .transpose()? 451 517 .unwrap_or(0); 452 - let new_count = (count as i64).saturating_add(delta) as u64; 518 + let new_count = if delta >= 0 { 519 + count.saturating_add(delta as u64) 520 + } else { 521 + count.saturating_sub(delta.unsigned_abs()) 522 + }; 453 523 batch.insert(&db.counts, key, new_count.to_be_bytes()); 454 524 Ok(()) 455 525 }
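The counter change above is worth spelling out: the old `(*entry as i64).saturating_add(delta) as u64` round-trips through `i64`, so a negative result wraps to a huge `u64` (and counts above `i64::MAX` would be misread). Branching on the delta's sign keeps the arithmetic in `u64` and clamps at zero:

```rust
// New behaviour: stay in u64, clamp at zero on underflow.
fn apply_delta(count: u64, delta: i64) -> u64 {
    if delta >= 0 {
        count.saturating_add(delta as u64)
    } else {
        count.saturating_sub(delta.unsigned_abs())
    }
}

// Old behaviour, kept only to show the wrap: 5 - 10 becomes 2^64 - 5.
fn apply_delta_old(count: u64, delta: i64) -> u64 {
    (count as i64).saturating_add(delta) as u64
}

fn main() {
    assert_eq!(apply_delta(5, -10), 0);
    assert_eq!(apply_delta_old(5, -10), u64::MAX - 4); // wrapped
    assert_eq!(apply_delta(u64::MAX, 1), u64::MAX);
}
```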
+17 -64
src/filter.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + use smol_str::SmolStr; 1 3 use std::sync::Arc; 2 4 3 - use arc_swap::ArcSwap; 4 - use fjall::Keyspace; 5 - use miette::{IntoDiagnostic, Result}; 6 - use serde::{Deserialize, Serialize}; 7 - use smol_str::SmolStr; 5 + pub type FilterHandle = Arc<arc_swap::ArcSwap<FilterConfig>>; 6 + 7 + pub fn new_handle(config: FilterConfig) -> FilterHandle { 8 + Arc::new(arc_swap::ArcSwap::new(Arc::new(config))) 9 + } 8 10 9 - pub const MODE_KEY: &[u8] = b"m"; 10 - pub const DID_PREFIX: u8 = b'd'; 11 - pub const SIGNAL_PREFIX: u8 = b's'; 12 - pub const COLLECTION_PREFIX: u8 = b'c'; 13 - pub const EXCLUDE_PREFIX: u8 = b'x'; 14 - pub const SEP: u8 = b'|'; 11 + /// apply a bool patch or set replacement for a single set update. 12 + #[derive(Debug, Clone, Serialize, Deserialize)] 13 + #[serde(untagged)] 14 + pub enum SetUpdate { 15 + /// replace the entire set with this list 16 + Set(Vec<String>), 17 + /// patch: true = add, false = remove 18 + Patch(std::collections::HashMap<String, bool>), 19 + } 15 20 16 21 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 17 22 #[serde(rename_all = "snake_case")] 18 23 pub enum FilterMode { 19 - Dids = 0, 20 - Signal = 1, 24 + Filter = 0, 21 25 Full = 2, 22 26 } 23 27 ··· 39 43 } 40 44 } 41 45 42 - pub fn load(ks: &Keyspace) -> Result<Self> { 43 - let mode = ks 44 - .get(MODE_KEY) 45 - .into_diagnostic()? 46 - .map(|v| rmp_serde::from_slice(&v).into_diagnostic()) 47 - .transpose()? 
48 - .unwrap_or(FilterMode::Dids); 49 - 50 - let mut config = Self::new(mode); 51 - 52 - let signal_prefix = [SIGNAL_PREFIX, SEP]; 53 - for guard in ks.prefix(signal_prefix) { 54 - let (k, _) = guard.into_inner().into_diagnostic()?; 55 - let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?; 56 - config.signals.push(SmolStr::new(val)); 57 - } 58 - 59 - let col_prefix = [COLLECTION_PREFIX, SEP]; 60 - for guard in ks.prefix(col_prefix) { 61 - let (k, _) = guard.into_inner().into_diagnostic()?; 62 - let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?; 63 - config.collections.push(SmolStr::new(val)); 64 - } 65 - 66 - Ok(config) 67 - } 68 - 69 46 /// returns true if the collection matches the content filter. 70 47 /// if collections is empty, all collections match. 71 48 pub fn matches_collection(&self, collection: &str) -> bool { ··· 88 65 collection == pattern 89 66 } 90 67 } 91 - 92 - pub type FilterHandle = Arc<ArcSwap<FilterConfig>>; 93 - 94 - pub fn new_handle(config: FilterConfig) -> FilterHandle { 95 - Arc::new(ArcSwap::new(Arc::new(config))) 96 - } 97 - 98 - /// apply a bool patch or set replacement for a single set update. 99 - #[derive(Debug, Deserialize)] 100 - #[serde(untagged)] 101 - pub enum SetUpdate { 102 - /// replace the entire set with this list 103 - Set(Vec<String>), 104 - /// patch: true = add, false = remove 105 - Patch(std::collections::HashMap<String, bool>), 106 - } 107 - 108 - pub fn filter_key(prefix: u8, val: &str) -> Vec<u8> { 109 - let mut key = Vec::with_capacity(2 + val.len()); 110 - key.push(prefix); 111 - key.push(SEP); 112 - key.extend_from_slice(val.as_bytes()); 113 - key 114 - }
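The untagged `SetUpdate` enum lets `PATCH /filter` accept either a full replacement list or an add(true)/remove(false) map for the same field. A std-only sketch of how such an update might be applied to a signal set (the `apply` helper and `String`/`BTreeSet` types are illustrative; the patch uses serde deserialization and `SmolStr`):

```rust
use std::collections::{BTreeSet, HashMap};

/// Mirror of the patch's untagged enum: a full replacement,
/// or a keyed add(true)/remove(false) patch.
enum SetUpdate {
    Set(Vec<String>),
    Patch(HashMap<String, bool>),
}

/// Apply an update to an existing set of values.
fn apply(current: &mut BTreeSet<String>, update: SetUpdate) {
    match update {
        SetUpdate::Set(values) => {
            // replace wholesale
            current.clear();
            current.extend(values);
        }
        SetUpdate::Patch(changes) => {
            // true adds, false removes; untouched entries survive
            for (value, add) in changes {
                if add {
                    current.insert(value);
                } else {
                    current.remove(&value);
                }
            }
        }
    }
}

fn main() {
    let mut signals: BTreeSet<String> = ["app.bsky.feed.post".to_string()].into();
    apply(
        &mut signals,
        SetUpdate::Patch(HashMap::from([
            ("app.bsky.feed.like".to_string(), true),
            ("app.bsky.feed.post".to_string(), false),
        ])),
    );
    assert!(signals.contains("app.bsky.feed.like"));
    assert!(!signals.contains("app.bsky.feed.post"));
    apply(&mut signals, SetUpdate::Set(vec!["sh.tangled.repo".to_string()]));
    assert_eq!(signals.len(), 1);
    println!("ok");
}
```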
+22 -22
src/ingest/firehose.rs
··· 1 - use crate::db::{self, Db, keys}; 1 + use crate::db; 2 2 use crate::filter::{FilterHandle, FilterMode}; 3 3 use crate::ingest::{BufferTx, IngestMessage}; 4 4 use crate::state::AppState; ··· 118 118 async fn should_process(&self, did: &Did<'_>) -> Result<bool> { 119 119 let filter = self.filter.load(); 120 120 121 - let excl_key = crate::filter::filter_key(crate::filter::EXCLUDE_PREFIX, did.as_str()); 121 + let excl_key = 122 + crate::db::filter::filter_key(crate::db::filter::EXCLUDE_PREFIX, did.as_str()); 122 123 if self 123 124 .state 124 125 .db ··· 131 132 132 133 match filter.mode { 133 134 FilterMode::Full => Ok(true), 134 - FilterMode::Dids | FilterMode::Signal => { 135 - let did_key = crate::filter::filter_key(crate::filter::DID_PREFIX, did.as_str()); 136 - if self 137 - .state 138 - .db 139 - .filter 140 - .contains_key(&did_key) 141 - .into_diagnostic()? 142 - { 143 - debug!("{did} is in DID allowlist, processing"); 144 - return Ok(true); 135 + FilterMode::Filter => { 136 + let repo_key = crate::db::keys::repo_key(did); 137 + if let Some(state_bytes) = self.state.db.repos.get(&repo_key).into_diagnostic()? {
138 + let repo_state: crate::types::RepoState = 139 + rmp_serde::from_slice(&state_bytes).into_diagnostic()?; 140 + 141 + if repo_state.tracked { 142 + debug!("{did} is a tracked repo, processing"); 143 + return Ok(true); 144 + } else { 145 + debug!("{did} is known but explicitly untracked, skipping"); 146 + return Ok(false); 147 + } 145 148 } 146 - let known = 147 - Db::contains_key(self.state.db.repos.clone(), keys::repo_key(did)).await?; 148 - if known { 149 - debug!("{did} is a known repo, processing"); 149 + 150 + if !filter.signals.is_empty() { 151 + debug!("{did} is unknown — passing to worker for signal check"); 152 + Ok(true) 150 153 } else { 151 - debug!( 152 - "{did} is unknown — passing to worker for signal check (mode={:?})", 153 - filter.mode 154 - ); 154 + debug!("{did} is unknown and no signals configured, skipping"); 155 + Ok(false) 155 156 } 156 - Ok(known || filter.mode == FilterMode::Signal) 157 157 } 158 158 } 159 159 }
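The reworked `should_process` path can be summarized as: excluded DIDs are always skipped; `Full` mode takes everything else; `Filter` mode follows the per-repo `tracked` flag, and for unknown repos defers to the worker's signal check only when signals are configured. A minimal decision-table sketch (names and the flattened `Option<bool>` shape are ours; the real code reads `RepoState` from fjall):

```rust
/// Indexing modes after the rework (the DID/Signal split is gone).
enum Mode {
    Full,
    Filter,
}

/// Pure-logic mirror of the firehose `should_process` decision.
/// `tracked` is `None` when the repo is unknown to the index.
fn should_process(mode: Mode, excluded: bool, tracked: Option<bool>, have_signals: bool) -> bool {
    if excluded {
        return false; // the exclude list wins over everything
    }
    match mode {
        Mode::Full => true,
        Mode::Filter => match tracked {
            Some(t) => t,          // explicit tracked/untracked state wins
            None => have_signals,  // unknown repo: worker runs the signal check
        },
    }
}

fn main() {
    assert!(should_process(Mode::Full, false, None, false));
    assert!(!should_process(Mode::Full, true, None, false));
    // explicitly untracked repos are skipped even if signals are configured
    assert!(!should_process(Mode::Filter, false, Some(false), true));
    assert!(should_process(Mode::Filter, false, Some(true), false));
    // unknown repos pass through only when a signal check is possible
    assert!(should_process(Mode::Filter, false, None, true));
    assert!(!should_process(Mode::Filter, false, None, false));
    println!("ok");
}
```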
+7 -2
src/ingest/worker.rs
··· 380 380 match &account.status { 381 381 Some(AccountStatus::Deleted) => { 382 382 debug!("account {did} deleted, wiping data"); 383 - ops::delete_repo(ctx.batch, &ctx.state.db, did, repo_state)?; 383 + crate::ops::delete_repo(ctx.batch, &ctx.state.db, did, &repo_state)?; 384 384 return Ok(RepoProcessResult::Deleted); 385 385 } 386 386 status => { ··· 530 530 let Some(state_bytes) = ctx.state.db.repos.get(&repo_key).into_diagnostic()? else { 531 531 let filter = ctx.state.filter.load(); 532 532 533 - if filter.mode == FilterMode::Signal { 533 + if filter.mode == FilterMode::Filter && !filter.signals.is_empty() { 534 534 let commit = match msg { 535 535 SubscribeReposMessage::Commit(c) => c, 536 536 _ => return Ok(RepoProcessResult::Syncing(None)), ··· 582 582 return Ok(RepoProcessResult::Syncing(None)); 583 583 }; 584 584 let mut repo_state = crate::db::deser_repo_state(&state_bytes)?.into_static(); 585 + 586 + if !repo_state.tracked { 587 + debug!("ignoring active status for {did} as it is explicitly untracked"); 588 + return Ok(RepoProcessResult::Syncing(None)); 589 + } 585 590 586 591 // if we are backfilling or it is new, DON'T mark it as synced yet 587 592 // the backfill worker will do that when it finishes
+32 -20
src/main.rs
··· 1 - use futures::{FutureExt, TryFutureExt, future::BoxFuture}; 1 + use futures::{FutureExt, future::BoxFuture}; 2 2 use hydrant::config::{Config, SignatureVerification}; 3 - use hydrant::crawler::Crawler; 4 3 use hydrant::db::{self, set_firehose_cursor}; 5 4 use hydrant::ingest::firehose::FirehoseIngestor; 6 5 use hydrant::state::AppState; ··· 30 29 let filter_ks = state.db.filter.clone(); 31 30 let inner = state.db.inner.clone(); 32 31 tokio::task::spawn_blocking(move || { 33 - use hydrant::filter::{FilterMode, MODE_KEY}; 32 + use hydrant::db::filter::MODE_KEY; 33 + use hydrant::filter::FilterMode; 34 34 let mut batch = inner.batch(); 35 35 batch.insert( 36 36 &filter_ks, ··· 49 49 let (buffer_tx, buffer_rx) = mpsc::unbounded_channel(); 50 50 let state = Arc::new(state); 51 51 52 - if !cfg.disable_backfill { 52 + if cfg.enable_backfill { 53 53 tokio::spawn({ 54 54 let state = state.clone(); 55 55 let timeout = cfg.repo_fetch_timeout; ··· 144 144 } 145 145 }); 146 146 147 - if let hydrant::filter::FilterMode::Full | hydrant::filter::FilterMode::Signal = 148 - state.filter.load().mode 149 - { 150 - tokio::spawn( 151 - Crawler::new( 152 - state.clone(), 153 - cfg.relay_host.clone(), 154 - cfg.crawler_max_pending_repos, 155 - cfg.crawler_resume_pending_repos, 156 - ) 157 - .run() 158 - .inspect_err(|e| { 159 - error!("crawler died: {e}"); 147 + info!("starting crawler ({:?})", state.filter.load().mode); 148 + let state_clone = state.clone(); 149 + let relay_host_clone = cfg.relay_host.clone(); 150 + let crawler_max_pending = cfg.crawler_max_pending_repos; 151 + let crawler_resume_pending = cfg.crawler_resume_pending_repos; 152 + 153 + let should_run_crawler = match cfg.enable_crawler { 154 + Some(true) => true, 155 + Some(false) => false, 156 + None => state.filter.load().mode == hydrant::filter::FilterMode::Full, 157 + }; 158 + 159 + if should_run_crawler { 160 + tokio::spawn(async move { 161 + // the crawler is responsible for finding new repos 162 + let crawler = hydrant::crawler::Crawler::new(
163 + state_clone, 164 + relay_host_clone, 165 + crawler_max_pending, 166 + crawler_resume_pending, 167 + ); 168 + if let Err(e) = crawler.run().await { 169 + error!("crawler error: {e}"); 160 170 db::check_poisoned_report(&e); 161 - }), 162 - ); 171 + } 172 + }); 173 + } else { 174 + info!("crawler disabled by config or filter mode"); 163 175 } 164 176 165 - let mut tasks = if !cfg.disable_firehose { 177 + let mut tasks = if cfg.enable_firehose { 166 178 let firehose_worker = std::thread::spawn({ 167 179 let state = state.clone(); 168 180 let handle = tokio::runtime::Handle::current();
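The `cfg.enable_crawler` match above is a tri-state override: an explicit `HYDRANT_ENABLE_CRAWLER=true/false` wins, and when unset the crawler runs only in `Full` mode. The same decision collapses to a one-liner (equivalent to the patch's `match`, just written with `unwrap_or`; the function name is ours):

```rust
/// Resolve the crawler tri-state: explicit config wins,
/// otherwise fall back to "crawl only in Full mode".
fn crawler_enabled(enable_crawler: Option<bool>, full_mode: bool) -> bool {
    enable_crawler.unwrap_or(full_mode)
}

fn main() {
    assert!(crawler_enabled(Some(true), false));  // opt-in in Filter mode
    assert!(!crawler_enabled(Some(false), true)); // opt-out even in Full mode
    assert!(crawler_enabled(None, true));         // default: on in Full mode
    assert!(!crawler_enabled(None, false));       // default: off in Filter mode
    println!("ok");
}
```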
+1 -1
src/ops.rs
··· 67 67 batch: &'batch mut OwnedWriteBatch, 68 68 db: &Db, 69 69 did: &jacquard::types::did::Did, 70 - repo_state: RepoState, 70 + repo_state: &RepoState, 71 71 ) -> Result<()> { 72 72 debug!("deleting repo {did}"); 73 73
+8
src/types.rs
··· 43 43 #[serde(borrow)] 44 44 pub handle: Option<Handle<'i>>, 45 45 pub index_id: u64, 46 + #[serde(default = "default_tracked")] 47 + pub tracked: bool, 48 + } 49 + 50 + fn default_tracked() -> bool { 51 + true 46 52 } 47 53 48 54 impl<'i> RepoState<'i> { ··· 55 61 last_updated_at: chrono::Utc::now().timestamp(), 56 62 handle: None, 57 63 index_id, 64 + tracked: true, 58 65 } 59 66 } 60 67 } ··· 71 78 last_updated_at: self.last_updated_at, 72 79 handle: self.handle.map(|s| s.into_static()), 73 80 index_id: self.index_id, 81 + tracked: self.tracked, 74 82 } 75 83 } 76 84 }
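The `#[serde(default = "default_tracked")]` attribute is what keeps this change backward compatible: `RepoState` values persisted before the field existed deserialize with `tracked: true`, so previously indexed repos keep being processed. The fallback behaves like this (the `Option` shape stands in for "field absent in old msgpack data"):

```rust
/// Model of the serde default: a field missing from old persisted
/// data (here, `None`) falls back to `true`.
fn tracked_from_stored(stored: Option<bool>) -> bool {
    stored.unwrap_or(true)
}

fn main() {
    assert!(tracked_from_stored(None));         // pre-upgrade records stay tracked
    assert!(!tracked_from_stored(Some(false))); // explicit untracking persists
    assert!(tracked_from_stored(Some(true)));
    println!("ok");
}
```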
+1 -1
tests/authenticated_stream_test.nu
··· 42 42 # 4. add repo to hydrant (backfill trigger) 43 43 print $"adding repo ($did) to tracking..." 44 44 try { 45 - http patch -t application/json $"($url)/filter" { dids: { ($did): true } } 45 + http put -t application/json $"($url)/repos" [ { did: ($did) } ] 46 46 } catch { 47 47 print "warning: failed to add repo (might already be tracked), continuing..." 48 48 }
+14 -13
tests/common.nu
··· 52 52 # build the hydrant binary 53 53 export def build-hydrant [] { 54 54 print "building hydrant..." 55 - cargo build --release --quiet 55 + cargo build --release 56 56 "target/release/hydrant" 57 57 } 58 58 ··· 61 61 let log_file = $"($db_path)/hydrant.log" 62 62 print $"starting hydrant - logs at ($log_file)..." 63 63 64 - let pid = ( 65 - with-env { 66 - HYDRANT_DATABASE_PATH: ($db_path), 67 - HYDRANT_FULL_NETWORK: "false", 68 - HYDRANT_API_PORT: ($port | into string), 69 - HYDRANT_ENABLE_DEBUG: "true", 70 - HYDRANT_DEBUG_PORT: ($port + 1 | into string), 71 - HYDRANT_LOG_LEVEL: "debug" 72 - } { 73 - sh -c $"($binary) >($log_file) 2>&1 & echo $!" | str trim | into int 74 - } 75 - ) 64 + let hydrant_vars = ($env | transpose k v | where k =~ "HYDRANT_" | reduce -f {} { |it, acc| $acc | upsert $it.k $it.v }) 65 + let env_vars = { 66 + HYDRANT_DATABASE_PATH: ($db_path), 67 + HYDRANT_FULL_NETWORK: "false", 68 + HYDRANT_API_PORT: ($port | into string), 69 + HYDRANT_ENABLE_DEBUG: "true", 70 + HYDRANT_DEBUG_PORT: ($port + 1 | into string), 71 + HYDRANT_LOG_LEVEL: "debug" 72 + } | merge $hydrant_vars 73 + 74 + let pid = (with-env $env_vars { 75 + sh -c $"($binary) >($log_file) 2>&1 & echo $!" | str trim | into int 76 + }) 76 77 77 78 print $"hydrant started with pid: ($pid)" 78 79 { pid: $pid, log: $log_file }
+3 -2
tests/debug_endpoints.nu
··· 18 18 if (wait-for-api $url) { 19 19 # Trigger backfill to populate some data 20 20 print $"adding repo ($did) to tracking..." 21 - http patch -t application/json $"($url)/filter" { dids: { ($did): true } } 21 + http put -t application/json $"($url)/repos" [ { did: ($did) } ] 22 22 23 23 if (wait-for-backfill $url) { 24 24 print "backfill complete, testing debug endpoints" ··· 46 46 47 47 # 2. Test /debug/get with that key (sent as string) 48 48 print "testing /debug/get" 49 - let get_res = http get $"($debug_url)/debug/get?partition=records&key=($key_str)" 49 + let encoded_key = ($key_str | url encode) 50 + let get_res = http get $"($debug_url)/debug/get?partition=records&key=($encoded_key)" 50 51 51 52 if $get_res.value != $value_cid { 52 53 print $"FAILED: /debug/get returned different value. expected: ($value_cid), got: ($get_res.value)"
+1 -1
tests/repo_sync_integrity.nu
··· 112 112 if (wait-for-api $url) { 113 113 # track the repo via API 114 114 print $"adding repo ($did) to tracking..." 115 - http patch -t application/json $"($url)/filter" { dids: { ($did): true } } 115 + http put -t application/json $"($url)/repos" [ { did: ($did) } ] 116 116 117 117 if (wait-for-backfill $url) { 118 118 # Run both consistency checks
+19 -15
tests/signal_filter_test.nu
··· 11 11 exit 1 12 12 } 13 13 14 - let port = 3007 14 + let port = 3011 15 15 let url = $"http://localhost:($port)" 16 16 let db_path = (mktemp -d -t hydrant_signal_test.XXXXXX) 17 - let collection = "app.bsky.feed.post" 17 + 18 + let random_str = (random chars -l 6) 19 + let collection = $"systems.hydrant.test.($random_str)" 18 20 19 21 print $"database path: ($db_path)" 20 22 ··· 26 28 print "authenticated" 27 29 28 30 let binary = build-hydrant 31 + $env.HYDRANT_RELAY_HOST = "wss://bsky.network/" 29 32 let instance = start-hydrant $binary $db_path $port 30 33 31 34 mut test_passed = false 32 35 33 36 if (wait-for-api $url) { 34 - # configure signal mode: index app.bsky.feed.post from anyone on the network 35 - print "configuring signal mode..." 37 + # configure filter mode: index app.bsky.feed.post from anyone on the network 38 + print "configuring filter mode..." 36 39 http patch -t application/json $"($url)/filter" { 37 - mode: "signal", 40 + mode: "filter", 38 41 signals: [$collection] 39 42 } 40 43 ··· 42 45 let filter = (http get $"($url)/filter") 43 46 print $"filter state: ($filter | to json)" 44 47 45 - if $filter.mode != "signal" { 46 - print "FAILED: mode was not set to signal" 48 + if $filter.mode != "filter" { 49 + print "FAILED: mode was not set to filter" 47 50 } else if not ($filter.signals | any { |s| $s == $collection }) { 48 51 print $"FAILED: ($collection) not in signals" 49 52 } else { ··· 52 55 # wait a moment for the firehose to connect and the filter to take effect 53 56 sleep 3sec 54 57 55 - let timestamp = (date now | format date "%Y-%m-%dT%H:%M:%SZ") 56 - let record_data = { 57 - "$type": $collection, 58 - text: $"hydrant signal filter test ($timestamp)", 59 - createdAt: $timestamp 60 - } 58 + let timestamp = (date now | format date "%Y-%m-%dT%H:%M:%SZ") 59 + let record_data = { 60 + "$type": $collection, 61 + text: $"hydrant signal filter test ($timestamp) - bsky.network relay", 62 + createdAt: $timestamp 63 + } 61 64 62 65 print 
"creating post..." 63 66 let create_res = (http post -t application/json -H ["Authorization" $"Bearer ($jwt)"] $"($pds_url)/xrpc/com.atproto.repo.createRecord" { 64 67 repo: $did, 65 68 collection: $collection, 69 + validate: false, 66 70 record: $record_data 67 71 }) 68 72 let rkey = ($create_res.uri | split row "/" | last) 69 73 print $"created: ($create_res.uri)" 70 74 71 - # give hydrant time to receive and process the firehose event 72 - sleep 5sec 75 + # give hydrant time to receive and process the firehose event and backfill 76 + sleep 10sec 73 77 74 78 # verify the record was indexed 75 79 print "checking indexed record..."
+1 -1
tests/stream_test.nu
··· 31 31 32 32 # trigger backfill 33 33 print $"adding repo ($did) to tracking..." 34 - http patch -t application/json $"($url)/filter" { dids: { ($did): true } } 34 + http put -t application/json $"($url)/repos" [ { did: ($did) } ] 35 35 36 36 if (wait-for-backfill $url) { 37 37 sleep 2sec
+6 -8
tests/throttling_test.nu
··· 96 96 97 97 # remove 4 repos to drop pending (5) to 1 (<= resume limit 1) 98 98 # mock repos are did:web:mock1.com ... mock5.com 99 - http patch --content-type application/json $"($url)/filter" { 100 - dids: { 101 - "did:web:mock1.com": false, 102 - "did:web:mock2.com": false, 103 - "did:web:mock3.com": false, 104 - "did:web:mock4.com": false 105 - } 106 - } 99 + curl -s -X DELETE -H "Content-Type: application/json" -d '[ 100 + {"did": "did:web:mock1.com"}, 101 + {"did": "did:web:mock2.com"}, 102 + {"did": "did:web:mock3.com"}, 103 + {"did": "did:web:mock4.com"} 104 + ]' $"($url)/repos" 107 105 108 106 print "waiting for crawler to wake up (max 10s)..." 109 107 sleep 15sec