kind of like tap but different and in rust

[db] consolidate record keyspaces

ptr.pet 25e6d3c7 07946248

verified
+95 -164
+1 -1
AGENTS.md
··· 74 74 75 75 Hydrant uses multiple `fjall` keyspaces: 76 76 - `repos`: Maps `{DID}` -> `RepoState` (MessagePack). 77 - - `records`: Partitioned by collection. Maps `{DID}|{RKey}` -> `{CID}` (Binary). 77 + - `records`: Maps `{DID}|{COL}|{RKey}` -> `{CID}` (Binary). 78 78 - `blocks`: Maps `{CID}` -> `Block Data` (Raw CBOR). 79 79 - `events`: Maps `{ID}` (u64) -> `StoredEvent` (MessagePack). This is the source for the JSON stream API. 80 80 - `cursors`: Maps `firehose_cursor` or `crawler_cursor` -> `Value` (u64/i64 BE Bytes).
+6 -15
src/api/debug.rs
··· 1 + use crate::api::AppState; 1 2 use crate::db::keys; 2 3 use crate::types::{RepoState, ResyncState, StoredEvent}; 3 - use crate::{api::AppState, db::types::TrimmedDid}; 4 4 use axum::{ 5 5 Json, 6 6 extract::{Query, State}, ··· 42 42 .map_err(|_| StatusCode::BAD_REQUEST)?; 43 43 44 44 let db = &state.db; 45 - let ks = db 46 - .record_partition(&req.collection) 47 - .map_err(|_| StatusCode::NOT_FOUND)?; 45 + let ks = db.records.clone(); 48 46 49 - // {TrimmedDid}\x00 50 - let mut prefix = Vec::new(); 51 - TrimmedDid::from(&did).write_to_vec(&mut prefix); 52 - prefix.push(keys::SEP); 47 + // {TrimmedDid}|{collection}| 48 + let prefix = keys::record_prefix_collection(&did, &req.collection); 53 49 54 50 let count = tokio::task::spawn_blocking(move || { 55 51 let start_key = prefix.clone(); ··· 255 251 "resync" => Ok(db.resync.clone()), 256 252 "events" => Ok(db.events.clone()), 257 253 "counts" => Ok(db.counts.clone()), 258 - _ => { 259 - if let Some(col) = name.strip_prefix(crate::db::RECORDS_PARTITION_PREFIX) { 260 - db.record_partition(col).map_err(|_| StatusCode::NOT_FOUND) 261 - } else { 262 - Err(StatusCode::BAD_REQUEST) 263 - } 264 - } 254 + "records" => Ok(db.records.clone()), 255 + _ => Err(StatusCode::BAD_REQUEST), 265 256 } 266 257 }
+8 -10
src/api/xrpc.rs
··· 78 78 .await 79 79 .map_err(|e| bad_request(GetRecord::NSID, e))?; 80 80 81 - let partition = db 82 - .record_partition(req.collection.as_str()) 83 - .map_err(|e| internal_error(GetRecord::NSID, e))?; 84 - 85 - let db_key = keys::record_key(&did, &DbRkey::new(req.rkey.0.as_str())); 81 + let db_key = keys::record_key( 82 + &did, 83 + req.collection.as_str(), 84 + &DbRkey::new(req.rkey.0.as_str()), 85 + ); 86 86 87 - let cid_bytes = Db::get(partition, db_key) 87 + let cid_bytes = Db::get(db.records.clone(), db_key) 88 88 .await 89 89 .map_err(|e| internal_error(GetRecord::NSID, e))?; 90 90 ··· 132 132 .await 133 133 .map_err(|e| bad_request(ListRecords::NSID, e))?; 134 134 135 - let ks = db 136 - .record_partition(req.collection.as_str()) 137 - .map_err(|e| internal_error(ListRecords::NSID, e))?; 135 + let ks = db.records.clone(); 138 136 139 - let prefix = keys::record_prefix(&did); 137 + let prefix = keys::record_prefix_collection(&did, req.collection.as_str()); 140 138 141 139 let limit = req.limit.unwrap_or(50).min(100) as usize; 142 140 let reverse = req.reverse.unwrap_or(false);
+37 -22
src/backfill/mod.rs
··· 29 29 30 30 use crate::ingest::{BufferTx, IngestMessage}; 31 31 32 + trait SliceSplitExt { 33 + fn split_once<'a>(&'a self, delimiter: impl Fn(&u8) -> bool) -> Option<(&'a [u8], &'a [u8])>; 34 + } 35 + 36 + impl SliceSplitExt for [u8] { 37 + fn split_once<'a>(&'a self, delimiter: impl Fn(&u8) -> bool) -> Option<(&'a [u8], &'a [u8])> { 38 + let idx = self.iter().position(delimiter)?; 39 + Some((&self[..idx], &self[idx + 1..])) 40 + } 41 + } 42 + 32 43 struct AdaptiveLimiter { 33 44 current_limit: usize, 34 45 max_limit: usize, ··· 608 619 let mut batch = app_state.db.inner.batch(); 609 620 let store = mst.storage(); 610 621 611 - let prefix = keys::record_prefix(&did); 622 + let prefix = keys::record_prefix_did(&did); 612 623 let mut existing_cids: HashMap<(SmolStr, DbRkey), SmolStr> = HashMap::new(); 613 624 614 - let mut partitions = Vec::new(); 615 - app_state.db.record_partitions.iter_sync(|col, ks| { 616 - partitions.push((col.clone(), ks.clone())); 617 - true 618 - }); 625 + for guard in app_state.db.records.prefix(&prefix) { 626 + let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 627 + // key is did|collection|rkey 628 + // skip did| 629 + let remaining = &key[prefix.len()..]; 630 + let (collection_bytes, rkey_bytes) = 631 + SliceSplitExt::split_once(remaining, |b| *b == keys::SEP) 632 + .ok_or_else(|| miette::miette!("invalid record key format: {key:?}"))?; 619 633 620 - for (col_name, ks) in partitions { 621 - for guard in ks.prefix(&prefix) { 622 - let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 623 - let rkey = keys::parse_rkey(&key[prefix.len()..]) 624 - .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?; 625 - let cid = cid::Cid::read_bytes(cid_bytes.as_ref()) 626 - .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))? 627 - .to_smolstr(); 634 + let collection = std::str::from_utf8(collection_bytes) 635 + .map_err(|e| miette::miette!("invalid collection utf8: {e}"))?; 628 636 629 - existing_cids.insert((col_name.as_str().into(), rkey), cid); 630 - } 637 + let rkey = keys::parse_rkey(rkey_bytes) 638 + .map_err(|e| miette::miette!("invalid rkey '{key:?}' for {did}: {e}"))?; 639 + 640 + let cid = cid::Cid::read_bytes(cid_bytes.as_ref()) 641 + .map_err(|e| miette::miette!("invalid cid '{cid_bytes:?}' for {did}: {e}"))? 642 + .to_smolstr(); 643 + 644 + existing_cids.insert((collection.into(), rkey), cid); 631 645 } 632 646 633 647 for (key, cid) in leaves { ··· 640 654 let rkey = DbRkey::new(rkey); 641 655 let path = (collection.to_smolstr(), rkey.clone()); 642 656 let cid_obj = Cid::ipld(cid); 643 - let partition = app_state.db.record_partition(collection)?; 644 657 645 658 // check if this record already exists with same CID 646 659 let (action, is_new) = if let Some(existing_cid) = existing_cids.remove(&path) { ··· 654 667 }; 655 668 trace!("{action} {did}/{collection}/{rkey} ({cid})"); 656 669 657 - // Key is just did|rkey 658 - let db_key = keys::record_key(&did, &rkey); 670 + // Key is did|collection|rkey 671 + let db_key = keys::record_key(&did, collection, &rkey); 659 672 660 673 batch.insert(&app_state.db.blocks, cid.to_bytes(), val.as_ref()); 661 - batch.insert(&partition, db_key, cid.to_bytes()); 674 + batch.insert(&app_state.db.records, db_key, cid.to_bytes()); 662 675 663 676 added_blocks += 1; 664 677 if is_new { ··· 686 699 // remove any remaining existing records (they weren't in the new MST) 687 700 for ((collection, rkey), cid) in existing_cids { 688 701 trace!("remove {did}/{collection}/{rkey} ({cid})"); 689 - let partition = app_state.db.record_partition(collection.as_str())?; 690 702 691 - batch.remove(&partition, keys::record_key(&did, &rkey)); 703 + batch.remove( 704 + &app_state.db.records, 705 + keys::record_key(&did, &collection, &rkey), 706 + ); 692 707 693 708 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 694 709 let evt = StoredEvent {
+10 -35
src/config.rs
··· 1 - use miette::{IntoDiagnostic, Result}; 1 + use miette::Result; 2 2 use smol_str::SmolStr; 3 3 use std::fmt; 4 4 use std::path::PathBuf; ··· 61 61 pub db_blocks_memtable_size_mb: u64, 62 62 pub db_repos_memtable_size_mb: u64, 63 63 pub db_events_memtable_size_mb: u64, 64 - pub db_records_default_memtable_size_mb: u64, 65 - pub db_records_partition_overrides: Vec<(glob::Pattern, u64)>, 64 + pub db_records_memtable_size_mb: u64, 66 65 pub crawler_max_pending_repos: usize, 67 66 pub crawler_resume_pending_repos: usize, 68 67 } ··· 126 125 default_db_worker_threads, 127 126 default_db_max_journaling_size_mb, 128 127 default_db_memtable_size_mb, 129 - default_records_memtable_size_mb, 130 - default_partition_overrides, 131 - ): (usize, u64, u64, u64, &str) = full_network 132 - .then_some((8usize, 1024u64, 192u64, 8u64, "app.bsky.*=64")) 133 - .unwrap_or((4usize, 512u64, 64u64, 16u64, "")); 128 + ): (usize, u64, u64) = full_network 129 + .then_some((8usize, 1024u64, 192u64)) 130 + .unwrap_or((4usize, 512u64, 64u64)); 134 131 135 132 let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads); 136 133 let db_max_journaling_size_mb = cfg!( ··· 145 142 cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 146 143 let db_events_memtable_size_mb = 147 144 cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 148 - let db_records_default_memtable_size_mb = cfg!( 149 - "DB_RECORDS_DEFAULT_MEMTABLE_SIZE_MB", 150 - default_records_memtable_size_mb 151 - ); 145 + let db_records_memtable_size_mb = 146 + cfg!("DB_RECORDS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 152 147 153 148 let crawler_max_pending_repos = cfg!("CRAWLER_MAX_PENDING_REPOS", 2000usize); 154 149 let crawler_resume_pending_repos = cfg!("CRAWLER_RESUME_PENDING_REPOS", 1000usize); 155 150 156 - let db_records_partition_overrides: Vec<(glob::Pattern, u64)> = 157 - std::env::var("HYDRANT_DB_RECORDS_PARTITION_OVERRIDES") 158 - .unwrap_or_else(|_| default_partition_overrides.to_string()) 159 - .split(',') 160 - .filter(|s| !s.is_empty()) 161 - .map(|s| { 162 - let mut parts = s.split('='); 163 - let pattern = parts 164 - .next() 165 - .ok_or_else(|| miette::miette!("invalid partition override format"))?; 166 - let size = parts 167 - .next() 168 - .ok_or_else(|| miette::miette!("invalid partition override format"))? 169 - .parse::<u64>() 170 - .into_diagnostic()?; 171 - Ok((glob::Pattern::new(pattern).into_diagnostic()?, size)) 172 - }) 173 - .collect::<Result<Vec<_>>>()?; 174 - 175 151 Ok(Self { 176 152 database_path, 177 153 relay_host, ··· 197 173 db_blocks_memtable_size_mb, 198 174 db_repos_memtable_size_mb, 199 175 db_events_memtable_size_mb, 200 - db_records_default_memtable_size_mb, 201 - db_records_partition_overrides: db_records_partition_overrides, 176 + db_records_memtable_size_mb, 202 177 crawler_max_pending_repos, 203 178 crawler_resume_pending_repos, 204 179 }) ··· 274 249 )?; 275 250 writeln!( 276 251 f, 277 - "  db records def memtable: {} mb", 278 - self.db_records_default_memtable_size_mb 252 + "  db records memtable: {} mb", 253 + self.db_records_memtable_size_mb 279 254 )?; 280 255 281 256 writeln!(
+18 -5
src/db/keys.rs
··· 15 15 vec 16 16 } 17 17 18 - // prefix format: {DID}\x00 19 - pub fn record_prefix(did: &Did) -> Vec<u8> { 18 + // prefix format: {DID}| (DID trimmed) 19 + pub fn record_prefix_did(did: &Did) -> Vec<u8> { 20 20 let repo = TrimmedDid::from(did); 21 21 let mut prefix = Vec::with_capacity(repo.len() + 1); 22 22 repo.write_to_vec(&mut prefix); ··· 24 24 prefix 25 25 } 26 26 27 - // key format: {DID}\x00{rkey} 28 - pub fn record_key(did: &Did, rkey: &DbRkey) -> Vec<u8> { 27 + // prefix format: {DID}|{collection}| 28 + pub fn record_prefix_collection(did: &Did, collection: &str) -> Vec<u8> { 29 + let repo = TrimmedDid::from(did); 30 + let mut prefix = Vec::with_capacity(repo.len() + 1 + collection.len() + 1); 31 + repo.write_to_vec(&mut prefix); 32 + prefix.push(SEP); 33 + prefix.extend_from_slice(collection.as_bytes()); 34 + prefix.push(SEP); 35 + prefix 36 + } 37 + 38 + // key format: {DID}|{collection}|{rkey} 39 + pub fn record_key(did: &Did, collection: &str, rkey: &DbRkey) -> Vec<u8> { 29 40 let repo = TrimmedDid::from(did); 30 - let mut key = Vec::with_capacity(repo.len() + rkey.len() + 1); 41 + let mut key = Vec::with_capacity(repo.len() + 1 + collection.len() + 1 + rkey.len() + 1); 31 42 repo.write_to_vec(&mut key); 43 + key.push(SEP); 44 + key.extend_from_slice(collection.as_bytes()); 32 45 key.push(SEP); 33 46 write_rkey(&mut key, rkey); 34 47 key
+7 -59
src/db/mod.rs
··· 4 4 use jacquard_common::types::string::Did; 5 5 use miette::{Context, IntoDiagnostic, Result}; 6 6 use scc::HashMap; 7 - use smol_str::{SmolStr, format_smolstr}; 7 + use smol_str::SmolStr; 8 8 9 9 use std::sync::Arc; 10 10 ··· 14 14 use std::sync::atomic::AtomicU64; 15 15 use tokio::sync::broadcast; 16 16 use tracing::error; 17 - 18 - pub const RECORDS_PARTITION_PREFIX: &str = "r:"; 19 17 20 18 fn default_opts() -> KeyspaceCreateOptions { 21 19 KeyspaceCreateOptions::default() ··· 24 22 pub struct Db { 25 23 pub inner: Arc<Database>, 26 24 pub repos: Keyspace, 27 - pub record_partitions: HashMap<String, Keyspace>, 25 + pub records: Keyspace, 28 26 pub blocks: Keyspace, 29 27 pub cursors: Keyspace, 30 28 pub pending: Keyspace, ··· 35 33 pub event_tx: broadcast::Sender<BroadcastEvent>, 36 34 pub next_event_id: Arc<AtomicU64>, 37 35 pub counts_map: HashMap<SmolStr, u64>, 38 - pub record_partition_overrides: Vec<(glob::Pattern, u64)>, 39 - pub record_partition_default_size: u64, 40 36 } 41 37 42 38 impl Db { ··· 86 82 )?; 87 83 let counts = open_ks("counts", opts().expect_point_read_hits(true))?; 88 84 89 - let record_partitions = HashMap::new(); 90 - { 91 - let names = db.list_keyspace_names(); 92 - for name in names { 93 - let name_str: &str = name.as_ref(); 94 - if let Some(collection) = name_str.strip_prefix(RECORDS_PARTITION_PREFIX) { 95 - let opts = Self::get_record_partition_opts(cfg, collection); 96 - let ks = db.keyspace(name_str, move || opts).into_diagnostic()?; 97 - let _ = record_partitions.insert_sync(collection.to_string(), ks); 98 - } 99 - } 100 - } 85 + let records = open_ks( 86 + "records", 87 + opts().max_memtable_size(cfg.db_records_memtable_size_mb * 1024 * 1024), 88 + )?; 101 89 102 90 let mut last_id = 0; 103 91 if let Some(guard) = events.iter().next_back() { ··· 128 116 Ok(Self { 129 117 inner: db, 130 118 repos, 131 - record_partitions, 119 + records, 132 120 blocks, 133 121 cursors, 134 122 pending, ··· 139 127 event_tx, 140 128 counts_map, 141 129 next_event_id: Arc::new(AtomicU64::new(last_id + 1)), 142 - record_partition_overrides: cfg.db_records_partition_overrides.clone(), 143 - record_partition_default_size: cfg.db_records_default_memtable_size_mb, 144 130 }) 145 - } 146 - 147 - fn get_record_partition_opts( 148 - cfg: &crate::config::Config, 149 - collection: &str, 150 - ) -> KeyspaceCreateOptions { 151 - let size = cfg 152 - .db_records_partition_overrides 153 - .iter() 154 - .find(|(p, _)| p.matches(collection)) 155 - .map(|(_, s)| *s) 156 - .unwrap_or(cfg.db_records_default_memtable_size_mb); 157 - 158 - default_opts().max_memtable_size(size * 1024 * 1024) 159 - } 160 - 161 - pub fn record_partition(&self, collection: &str) -> Result<Keyspace> { 162 - use scc::hash_map::Entry; 163 - match self.record_partitions.entry_sync(collection.to_string()) { 164 - Entry::Occupied(o) => Ok(o.get().clone()), 165 - Entry::Vacant(v) => { 166 - let name = format_smolstr!("{}{}", RECORDS_PARTITION_PREFIX, collection); 167 - let size = self 168 - .record_partition_overrides 169 - .iter() 170 - .find(|(p, _)| p.matches(collection)) 171 - .map(|(_, s)| *s) 172 - .unwrap_or(self.record_partition_default_size); 173 - 174 - let ks = self 175 - .inner 176 - .keyspace(&name, move || { 177 - default_opts().max_memtable_size(size * 1024 * 1024) 178 - }) 179 - .into_diagnostic()?; 180 - Ok(v.insert_entry(ks).get().clone()) 181 - } 182 - } 183 131 } 184 132 185 133 pub fn persist(&self) -> Result<()> {
+8 -17
src/ops.rs
··· 80 80 batch.remove(&db.resync_buffer, k); 81 81 } 82 82 83 - // 2. delete from records (all partitions) 84 - let mut partitions = Vec::new(); 85 - db.record_partitions.iter_sync(|_, v| { 86 - partitions.push(v.clone()); 87 - true 88 - }); 89 - 90 - let records_prefix = keys::record_prefix(did); 91 - for ks in partitions { 92 - for guard in ks.prefix(&records_prefix) { 93 - let k = guard.key().into_diagnostic()?; 94 - batch.remove(&ks, k); 95 - } 83 + // 2. delete from records 84 + let records_prefix = keys::record_prefix_did(did); 85 + for guard in db.records.prefix(&records_prefix) { 86 + let k = guard.key().into_diagnostic()?; 87 + batch.remove(&db.records, k); 96 88 } 97 89 98 90 // 3. reset collection counts ··· 249 241 for op in &commit.ops { 250 242 let (collection, rkey) = parse_path(&op.path)?; 251 243 let rkey = DbRkey::new(rkey); 252 - let partition = db.record_partition(collection)?; 253 - let db_key = keys::record_key(did, &rkey); 244 + let db_key = keys::record_key(did, collection, &rkey); 254 245 255 246 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 256 247 ··· 261 252 continue; 262 253 }; 263 254 batch.insert( 264 - &partition, 255 + &db.records, 265 256 db_key.clone(), 266 257 cid.to_ipld() 267 258 .into_diagnostic() ··· 276 267 } 277 268 } 278 269 DbAction::Delete => { 279 - batch.remove(&partition, db_key); 270 + batch.remove(&db.records, db_key); 280 271 281 272 // accumulate counts 282 273 records_delta -= 1;