AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[db] implement tuning configuration and conditional defaults

ptr.pet 50d9793b 4ed265ff

verified
+158 -27
+1
.gitignore
··· 4 4 result 5 5 .env 6 6 mock_debug.log 7 + hydrant.log
+7
Cargo.lock
··· 1382 1382 checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" 1383 1383 1384 1384 [[package]] 1385 + name = "glob" 1386 + version = "0.3.3" 1387 + source = "registry+https://github.com/rust-lang/crates.io-index" 1388 + checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" 1389 + 1390 + [[package]] 1385 1391 name = "gloo-storage" 1386 1392 version = "0.3.0" 1387 1393 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1659 1665 "data-encoding", 1660 1666 "fjall", 1661 1667 "futures", 1668 + "glob", 1662 1669 "hex", 1663 1670 "humantime", 1664 1671 "jacquard",
+1
Cargo.toml
··· 46 46 smallvec = "2.0.0-alpha.12" 47 47 thiserror = "2.0.18" 48 48 rand = "0.10.0" 49 + glob = "0.3" 49 50 ordermap = { version = "1.1.0", features = ["serde"] }
+96 -2
src/config.rs
··· 1 - use miette::Result; 1 + use miette::{IntoDiagnostic, Result}; 2 2 use smol_str::SmolStr; 3 3 use std::fmt; 4 4 use std::path::PathBuf; ··· 55 55 pub disable_firehose: bool, 56 56 pub disable_backfill: bool, 57 57 pub firehose_workers: usize, 58 + pub db_worker_threads: usize, 59 + pub db_max_journaling_size_mb: u64, 60 + pub db_pending_memtable_size_mb: u64, 61 + pub db_blocks_memtable_size_mb: u64, 62 + pub db_repos_memtable_size_mb: u64, 63 + pub db_events_memtable_size_mb: u64, 64 + pub db_records_default_memtable_size_mb: u64, 65 + pub db_records_partition_overrides: Vec<(glob::Pattern, u64)>, 58 66 } 59 67 60 68 impl Config { ··· 94 102 }) 95 103 .unwrap_or_else(|| Ok(vec![Url::parse("https://plc.wtf").unwrap()]))?; 96 104 97 - let full_network = cfg!("FULL_NETWORK", false); 105 + let full_network: bool = cfg!("FULL_NETWORK", false); 98 106 let backfill_concurrency_limit = cfg!("BACKFILL_CONCURRENCY_LIMIT", 128usize); 99 107 let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 10, sec); 100 108 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec); ··· 112 120 let disable_backfill = cfg!("DISABLE_BACKFILL", false); 113 121 let firehose_workers = cfg!("FIREHOSE_WORKERS", 64usize); 114 122 123 + let ( 124 + default_db_worker_threads, 125 + default_db_max_journaling_size_mb, 126 + default_db_memtable_size_mb, 127 + default_records_memtable_size_mb, 128 + default_partition_overrides, 129 + ): (usize, u64, u64, u64, &str) = full_network 130 + .then_some((8usize, 2048u64, 192u64, 8u64, "app.bsky.*=64")) 131 + .unwrap_or((4usize, 512u64, 64u64, 8u64, "")); 132 + 133 + let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads); 134 + let db_max_journaling_size_mb = cfg!( 135 + "DB_MAX_JOURNALING_SIZE_MB", 136 + default_db_max_journaling_size_mb 137 + ); 138 + let db_pending_memtable_size_mb = 139 + cfg!("DB_PENDING_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 140 + let db_blocks_memtable_size_mb = 141 + 
cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 142 + let db_repos_memtable_size_mb = 143 + cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 144 + let db_events_memtable_size_mb = 145 + cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 146 + let db_records_default_memtable_size_mb = cfg!( 147 + "DB_RECORDS_DEFAULT_MEMTABLE_SIZE_MB", 148 + default_records_memtable_size_mb 149 + ); 150 + 151 + let db_records_partition_overrides: Vec<(glob::Pattern, u64)> = 152 + std::env::var("HYDRANT_DB_RECORDS_PARTITION_OVERRIDES") 153 + .unwrap_or_else(|_| default_partition_overrides.to_string()) 154 + .split(',') 155 + .filter(|s| !s.is_empty()) 156 + .map(|s| { 157 + let mut parts = s.split('='); 158 + let pattern = parts 159 + .next() 160 + .ok_or_else(|| miette::miette!("invalid partition override format"))?; 161 + let size = parts 162 + .next() 163 + .ok_or_else(|| miette::miette!("invalid partition override format"))? 164 + .parse::<u64>() 165 + .into_diagnostic()?; 166 + Ok((glob::Pattern::new(pattern).into_diagnostic()?, size)) 167 + }) 168 + .collect::<Result<Vec<_>>>()?; 169 + 115 170 Ok(Self { 116 171 database_path, 117 172 relay_host, ··· 131 186 disable_firehose, 132 187 disable_backfill, 133 188 firehose_workers, 189 + db_worker_threads, 190 + db_max_journaling_size_mb, 191 + db_pending_memtable_size_mb, 192 + db_blocks_memtable_size_mb, 193 + db_repos_memtable_size_mb, 194 + db_events_memtable_size_mb, 195 + db_records_default_memtable_size_mb, 196 + db_records_partition_overrides: db_records_partition_overrides, 134 197 }) 135 198 } 136 199 } ··· 176 239 )?; 177 240 writeln!(f, " api port: {}", self.api_port)?; 178 241 writeln!(f, " firehose workers: {}", self.firehose_workers)?; 242 + writeln!(f, " db worker threads: {}", self.db_worker_threads)?; 243 + writeln!( 244 + f, 245 + " db journal size: {} mb", 246 + self.db_max_journaling_size_mb 247 + )?; 248 + writeln!( 249 + f, 250 + " db pending memtable: {} mb", 251 + 
self.db_pending_memtable_size_mb 252 + )?; 253 + writeln!( 254 + f, 255 + " db blocks memtable: {} mb", 256 + self.db_blocks_memtable_size_mb 257 + )?; 258 + writeln!( 259 + f, 260 + " db repos memtable: {} mb", 261 + self.db_repos_memtable_size_mb 262 + )?; 263 + writeln!( 264 + f, 265 + " db events memtable: {} mb", 266 + self.db_events_memtable_size_mb 267 + )?; 268 + writeln!( 269 + f, 270 + " db records def memtable: {} mb", 271 + self.db_records_default_memtable_size_mb 272 + )?; 179 273 writeln!(f, " enable debug: {}", self.enable_debug)?; 180 274 if self.enable_debug { 181 275 writeln!(f, " debug port: {}", self.debug_port)?;
+52 -20
src/db/mod.rs
··· 5 5 use miette::{Context, IntoDiagnostic, Result}; 6 6 use scc::HashMap; 7 7 use smol_str::{SmolStr, format_smolstr}; 8 - use std::path::Path; 8 + 9 9 use std::sync::Arc; 10 10 11 11 pub mod keys; ··· 21 21 KeyspaceCreateOptions::default() 22 22 } 23 23 24 - fn record_partition_opts() -> KeyspaceCreateOptions { 25 - default_opts().max_memtable_size(32 * 2_u64.pow(20)) 26 - } 27 - 28 24 pub struct Db { 29 25 pub inner: Arc<Database>, 30 26 pub repos: Keyspace, ··· 39 35 pub event_tx: broadcast::Sender<BroadcastEvent>, 40 36 pub next_event_id: Arc<AtomicU64>, 41 37 pub counts_map: HashMap<SmolStr, u64>, 38 + pub record_partition_overrides: Vec<(glob::Pattern, u64)>, 39 + pub record_partition_default_size: u64, 42 40 } 43 41 44 42 impl Db { 45 - pub fn open( 46 - path: impl AsRef<Path>, 47 - cache_size: u64, 48 - disable_lz4_compression: bool, 49 - ) -> Result<Self> { 50 - let db = Database::builder(path.as_ref()) 51 - .cache_size(cache_size * 2_u64.pow(20) / 2) 43 + pub fn open(cfg: &crate::config::Config) -> Result<Self> { 44 + let db = Database::builder(&cfg.database_path) 45 + .cache_size(cfg.cache_size * 2_u64.pow(20) / 2) 52 46 .manual_journal_persist(true) 53 47 .journal_compression( 54 - disable_lz4_compression 48 + cfg.disable_lz4_compression 55 49 .then_some(fjall::CompressionType::None) 56 50 .unwrap_or(fjall::CompressionType::Lz4), 57 51 ) 52 + .worker_threads(cfg.db_worker_threads) 53 + .max_journaling_size(cfg.db_max_journaling_size_mb * 1024 * 1024) 58 54 .open() 59 55 .into_diagnostic()?; 60 56 let db = Arc::new(db); ··· 64 60 db.keyspace(name, move || opts).into_diagnostic() 65 61 }; 66 62 67 - let repos = open_ks("repos", opts().expect_point_read_hits(true))?; 63 + let repos = open_ks( 64 + "repos", 65 + opts() 66 + .expect_point_read_hits(true) 67 + .max_memtable_size(cfg.db_repos_memtable_size_mb * 1024 * 1024), 68 + )?; 68 69 let blocks = open_ks( 69 70 "blocks", 70 71 opts() 71 72 // point reads are used a lot by stream 72 73 
.expect_point_read_hits(true) 73 - .max_memtable_size(32 * 2_u64.pow(20)), 74 + .max_memtable_size(cfg.db_blocks_memtable_size_mb * 1024 * 1024), 74 75 )?; 75 76 let cursors = open_ks("cursors", opts().expect_point_read_hits(true))?; 76 - let pending = open_ks("pending", opts())?; 77 + let pending = open_ks( 78 + "pending", 79 + opts().max_memtable_size(cfg.db_pending_memtable_size_mb * 1024 * 1024), 80 + )?; 77 81 let resync = open_ks("resync", opts())?; 78 82 let resync_buffer = open_ks("resync_buffer", opts())?; 79 - let events = open_ks("events", opts())?; 83 + let events = open_ks( 84 + "events", 85 + opts().max_memtable_size(cfg.db_events_memtable_size_mb * 1024 * 1024), 86 + )?; 80 87 let counts = open_ks("counts", opts().expect_point_read_hits(true))?; 81 88 82 89 let record_partitions = HashMap::new(); ··· 85 92 for name in names { 86 93 let name_str: &str = name.as_ref(); 87 94 if let Some(collection) = name_str.strip_prefix(RECORDS_PARTITION_PREFIX) { 88 - let popts = record_partition_opts(); 89 - let ks = db.keyspace(name_str, move || popts).into_diagnostic()?; 95 + let opts = Self::get_record_partition_opts(cfg, collection); 96 + let ks = db.keyspace(name_str, move || opts).into_diagnostic()?; 90 97 let _ = record_partitions.insert_sync(collection.to_string(), ks); 91 98 } 92 99 } ··· 132 139 event_tx, 133 140 counts_map, 134 141 next_event_id: Arc::new(AtomicU64::new(last_id + 1)), 142 + record_partition_overrides: cfg.db_records_partition_overrides.clone(), 143 + record_partition_default_size: cfg.db_records_default_memtable_size_mb, 135 144 }) 136 145 } 137 146 147 + fn get_record_partition_opts( 148 + cfg: &crate::config::Config, 149 + collection: &str, 150 + ) -> KeyspaceCreateOptions { 151 + let size = cfg 152 + .db_records_partition_overrides 153 + .iter() 154 + .find(|(p, _)| p.matches(collection)) 155 + .map(|(_, s)| *s) 156 + .unwrap_or(cfg.db_records_default_memtable_size_mb); 157 + 158 + default_opts().max_memtable_size(size * 1024 * 1024) 
159 + } 160 + 138 161 pub fn record_partition(&self, collection: &str) -> Result<Keyspace> { 139 162 use scc::hash_map::Entry; 140 163 match self.record_partitions.entry_sync(collection.to_string()) { 141 164 Entry::Occupied(o) => Ok(o.get().clone()), 142 165 Entry::Vacant(v) => { 143 166 let name = format_smolstr!("{}{}", RECORDS_PARTITION_PREFIX, collection); 167 + let size = self 168 + .record_partition_overrides 169 + .iter() 170 + .find(|(p, _)| p.matches(collection)) 171 + .map(|(_, s)| *s) 172 + .unwrap_or(self.record_partition_default_size); 173 + 144 174 let ks = self 145 175 .inner 146 - .keyspace(&name, record_partition_opts) 176 + .keyspace(&name, move || { 177 + default_opts().max_memtable_size(size * 1024 * 1024) 178 + }) 147 179 .into_diagnostic()?; 148 180 Ok(v.insert_entry(ks).get().clone()) 149 181 }
+1 -5
src/state.rs
··· 14 14 15 15 impl AppState { 16 16 pub fn new(config: &Config) -> Result<Self> { 17 - let db = Db::open( 18 - &config.database_path, 19 - config.cache_size, 20 - config.disable_lz4_compression, 21 - )?; 17 + let db = Db::open(config)?; 22 18 let resolver = Resolver::new(config.plc_urls.clone(), config.identity_cache_size); 23 19 24 20 Ok(Self {