at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[db] tune block sizes per keyspace

ptr.pet a0a9359a d537b646

verified
+69 -11
+69 -11
src/db/mod.rs
··· 1 1 use crate::types::{BroadcastEvent, RepoState}; 2 + use fjall::config::BlockSizePolicy; 2 3 use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode, Slice}; 3 4 use jacquard::IntoStatic; 4 5 use jacquard_common::types::string::Did; ··· 8 9 9 10 use std::sync::Arc; 10 11 12 + pub mod filter; 11 13 pub mod keys; 12 14 pub mod types; 13 15 ··· 30 32 pub resync_buffer: Keyspace, 31 33 pub events: Keyspace, 32 34 pub counts: Keyspace, 35 + pub filter: Keyspace, 33 36 pub event_tx: broadcast::Sender<BroadcastEvent>, 34 37 pub next_event_id: Arc<AtomicU64>, 35 38 pub counts_map: HashMap<SmolStr, u64>, ··· 83 86 84 87 impl Db { 85 88 pub fn open(cfg: &crate::config::Config) -> Result<Self> { 89 + const fn kb(v: u32) -> u32 { 90 + v * 1024 91 + } 92 + 86 93 let db = Database::builder(&cfg.database_path) 87 94 .cache_size(cfg.cache_size * 2_u64.pow(20) / 2) 88 95 .manual_journal_persist(true) ··· 105 112 let repos = open_ks( 106 113 "repos", 107 114 opts() 115 + // most lookups hit since repo must exist after discovery 116 + // we don't hit here if it's not tracked anyway (that happens in filter) 108 117 .expect_point_read_hits(true) 109 - .max_memtable_size(cfg.db_repos_memtable_size_mb * 1024 * 1024), 118 + .max_memtable_size(cfg.db_repos_memtable_size_mb * 1024 * 1024) 119 + .data_block_size_policy(BlockSizePolicy::all(kb(4))), 110 120 )?; 111 121 let blocks = open_ks( 112 122 "blocks", 113 123 opts() 114 124 // point reads are used a lot by stream 115 125 .expect_point_read_hits(true) 116 - .max_memtable_size(cfg.db_blocks_memtable_size_mb * 1024 * 1024), 126 + .max_memtable_size(cfg.db_blocks_memtable_size_mb * 1024 * 1024) 127 + // 32 - 64 kb is probably fine, as the newer blocks will be in the first levels 128 + // and any consumers will probably be streaming the newer events... 129 + .data_block_size_policy(BlockSizePolicy::new([kb(4), kb(8), kb(32), kb(64)])), 117 130 )?; 118 - let cursors = open_ks("cursors", opts().expect_point_read_hits(true))?; 131 + let records = open_ks( 132 + "records", 133 + // point reads might miss when using getRecord 134 + // but we assume thats not going to be used much... (todo: should be a config option maybe?) 135 + // since this keyspace is big, turning off bloom filters will help a lot 136 + opts() 137 + .expect_point_read_hits(true) 138 + .max_memtable_size(cfg.db_records_memtable_size_mb * 1024 * 1024) 139 + .data_block_size_policy(BlockSizePolicy::all(kb(8))), 140 + )?; 141 + let cursors = open_ks( 142 + "cursors", 143 + opts() 144 + // cursor point reads hit almost 100% of the time 145 + .expect_point_read_hits(true) 146 + .data_block_size_policy(BlockSizePolicy::all(kb(1))), 147 + )?; 119 148 let pending = open_ks( 120 149 "pending", 121 - opts().max_memtable_size(cfg.db_pending_memtable_size_mb * 1024 * 1024), 150 + opts() 151 + // iterated over as a queue, no point reads are used so bloom filters are disabled 152 + .expect_point_read_hits(true) 153 + .max_memtable_size(cfg.db_pending_memtable_size_mb * 1024 * 1024) 154 + .data_block_size_policy(BlockSizePolicy::all(kb(4))), 122 155 )?; 123 - let resync = open_ks("resync", opts())?; 124 - let resync_buffer = open_ks("resync_buffer", opts())?; 156 + // resync point reads often miss (because most repos aren't resyncing), so keeping the bloom filter helps avoid disk hits 157 + let resync = open_ks( 158 + "resync", 159 + opts().data_block_size_policy(BlockSizePolicy::all(kb(8))), 160 + )?; 161 + let resync_buffer = open_ks( 162 + "resync_buffer", 163 + opts() 164 + // iterated during backfill, no point reads 165 + .expect_point_read_hits(true) 166 + .data_block_size_policy(BlockSizePolicy::all(kb(32))), 167 + )?; 125 168 let events = open_ks( 126 169 "events", 127 - opts().max_memtable_size(cfg.db_events_memtable_size_mb * 1024 * 1024), 170 + opts() 171 + // only iterators are used here, no point reads 172 + .expect_point_read_hits(true) 173 + .max_memtable_size(cfg.db_events_memtable_size_mb * 1024 * 1024) 174 + .data_block_size_policy(BlockSizePolicy::new([kb(16), kb(32)])), 128 175 )?; 129 - let counts = open_ks("counts", opts().expect_point_read_hits(true))?; 176 + let counts = open_ks( 177 + "counts", 178 + opts() 179 + // count increments hit because counters are mostly pre-initialized 180 + .expect_point_read_hits(true) 181 + // the data is very small 182 + .data_block_size_policy(BlockSizePolicy::all(kb(1))), 183 + )?; 130 184 131 - let records = open_ks( 132 - "records", 133 - opts().max_memtable_size(cfg.db_records_memtable_size_mb * 1024 * 1024), 185 + // filter handles high-volume point reads (checking explicit DID includes and excludes from firehose) 186 + // so it needs the bloom filter 187 + let filter = open_ks( 188 + "filter", 189 + // this can be pretty small since the DIDs wont be compressed that well anyhow 190 + opts().data_block_size_policy(BlockSizePolicy::all(kb(1))), 134 191 )?; 135 192 136 193 let mut last_id = 0; ··· 170 227 resync_buffer, 171 228 events, 172 229 counts, 230 + filter, 173 231 event_tx, 174 232 counts_map, 175 233 next_event_id: Arc::new(AtomicU64::new(last_id + 1)),