Kind of like tap, but different — and in Rust.
at main 473 lines 16 kB view raw
1use crate::types::{BroadcastEvent, RepoState}; 2use fjall::config::BlockSizePolicy; 3use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode, Slice}; 4use jacquard::IntoStatic; 5use jacquard_common::types::string::Did; 6use miette::{Context, IntoDiagnostic, Result}; 7use scc::HashMap; 8use smol_str::SmolStr; 9 10use std::sync::Arc; 11 12pub mod filter; 13pub mod keys; 14pub mod types; 15 16use std::sync::atomic::AtomicU64; 17use tokio::sync::broadcast; 18use tracing::error; 19 20fn default_opts() -> KeyspaceCreateOptions { 21 KeyspaceCreateOptions::default() 22} 23 24pub struct Db { 25 pub inner: Arc<Database>, 26 pub repos: Keyspace, 27 pub records: Keyspace, 28 pub blocks: Keyspace, 29 pub cursors: Keyspace, 30 pub pending: Keyspace, 31 pub resync: Keyspace, 32 pub resync_buffer: Keyspace, 33 pub events: Keyspace, 34 pub counts: Keyspace, 35 pub filter: Keyspace, 36 pub event_tx: broadcast::Sender<BroadcastEvent>, 37 pub next_event_id: Arc<AtomicU64>, 38 pub counts_map: HashMap<SmolStr, u64>, 39} 40 41macro_rules! update_gauge_diff_impl { 42 ($self:ident, $old:ident, $new:ident, $update_method:ident $(, $await:tt)?) 
=> {{ 43 use crate::types::GaugeState; 44 45 if $old == $new { 46 return; 47 } 48 49 // pending 50 match ($old, $new) { 51 (GaugeState::Pending, GaugeState::Pending) => {} 52 (GaugeState::Pending, _) => $self.$update_method("pending", -1) $(.$await)?, 53 (_, GaugeState::Pending) => $self.$update_method("pending", 1) $(.$await)?, 54 _ => {} 55 } 56 57 // resync 58 let old_resync = $old.is_resync(); 59 let new_resync = $new.is_resync(); 60 match (old_resync, new_resync) { 61 (true, false) => $self.$update_method("resync", -1) $(.$await)?, 62 (false, true) => $self.$update_method("resync", 1) $(.$await)?, 63 _ => {} 64 } 65 66 // error kinds 67 if let GaugeState::Resync(Some(kind)) = $old { 68 let key = match kind { 69 crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited", 70 crate::types::ResyncErrorKind::Transport => "error_transport", 71 crate::types::ResyncErrorKind::Generic => "error_generic", 72 }; 73 $self.$update_method(key, -1) $(.$await)?; 74 } 75 76 if let GaugeState::Resync(Some(kind)) = $new { 77 let key = match kind { 78 crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited", 79 crate::types::ResyncErrorKind::Transport => "error_transport", 80 crate::types::ResyncErrorKind::Generic => "error_generic", 81 }; 82 $self.$update_method(key, 1) $(.$await)?; 83 } 84 }}; 85} 86 87impl Db { 88 pub fn open(cfg: &crate::config::Config) -> Result<Self> { 89 const fn kb(v: u32) -> u32 { 90 v * 1024 91 } 92 93 let db = Database::builder(&cfg.database_path) 94 .cache_size(cfg.cache_size * 2_u64.pow(20) / 2) 95 .manual_journal_persist(true) 96 .journal_compression( 97 cfg.disable_lz4_compression 98 .then_some(fjall::CompressionType::None) 99 .unwrap_or(fjall::CompressionType::Lz4), 100 ) 101 .worker_threads(cfg.db_worker_threads) 102 .max_journaling_size(cfg.db_max_journaling_size_mb * 1024 * 1024) 103 .open() 104 .into_diagnostic()?; 105 let db = Arc::new(db); 106 107 let opts = default_opts; 108 let open_ks = |name: &str, opts: 
KeyspaceCreateOptions| { 109 db.keyspace(name, move || opts).into_diagnostic() 110 }; 111 112 let repos = open_ks( 113 "repos", 114 opts() 115 // most lookups hit since repo must exist after discovery 116 // we don't hit here if it's not tracked anyway (that happens in filter) 117 .expect_point_read_hits(true) 118 .max_memtable_size(cfg.db_repos_memtable_size_mb * 1024 * 1024) 119 .data_block_size_policy(BlockSizePolicy::all(kb(4))), 120 )?; 121 let blocks = open_ks( 122 "blocks", 123 opts() 124 // point reads are used a lot by stream 125 .expect_point_read_hits(true) 126 .max_memtable_size(cfg.db_blocks_memtable_size_mb * 1024 * 1024) 127 // 32 - 64 kb is probably fine, as the newer blocks will be in the first levels 128 // and any consumers will probably be streaming the newer events... 129 .data_block_size_policy(BlockSizePolicy::new([kb(4), kb(8), kb(32), kb(64)])), 130 )?; 131 let records = open_ks( 132 "records", 133 // point reads might miss when using getRecord 134 // but we assume thats not going to be used much... (todo: should be a config option maybe?) 
135 // since this keyspace is big, turning off bloom filters will help a lot 136 opts() 137 .expect_point_read_hits(true) 138 .max_memtable_size(cfg.db_records_memtable_size_mb * 1024 * 1024) 139 .data_block_size_policy(BlockSizePolicy::all(kb(8))), 140 )?; 141 let cursors = open_ks( 142 "cursors", 143 opts() 144 // cursor point reads hit almost 100% of the time 145 .expect_point_read_hits(true) 146 .data_block_size_policy(BlockSizePolicy::all(kb(1))), 147 )?; 148 let pending = open_ks( 149 "pending", 150 opts() 151 // iterated over as a queue, no point reads are used so bloom filters are disabled 152 .expect_point_read_hits(true) 153 .max_memtable_size(cfg.db_pending_memtable_size_mb * 1024 * 1024) 154 .data_block_size_policy(BlockSizePolicy::all(kb(4))), 155 )?; 156 // resync point reads often miss (because most repos aren't resyncing), so keeping the bloom filter helps avoid disk hits 157 let resync = open_ks( 158 "resync", 159 opts().data_block_size_policy(BlockSizePolicy::all(kb(8))), 160 )?; 161 let resync_buffer = open_ks( 162 "resync_buffer", 163 opts() 164 // iterated during backfill, no point reads 165 .expect_point_read_hits(true) 166 .data_block_size_policy(BlockSizePolicy::all(kb(32))), 167 )?; 168 let events = open_ks( 169 "events", 170 opts() 171 // only iterators are used here, no point reads 172 .expect_point_read_hits(true) 173 .max_memtable_size(cfg.db_events_memtable_size_mb * 1024 * 1024) 174 .data_block_size_policy(BlockSizePolicy::new([kb(16), kb(32)])), 175 )?; 176 let counts = open_ks( 177 "counts", 178 opts() 179 // count increments hit because counters are mostly pre-initialized 180 .expect_point_read_hits(true) 181 // the data is very small 182 .data_block_size_policy(BlockSizePolicy::all(kb(1))), 183 )?; 184 185 // filter handles high-volume point reads (checking explicit DID includes and excludes from firehose) 186 // so it needs the bloom filter 187 let filter = open_ks( 188 "filter", 189 // this can be pretty small since the DIDs 
wont be compressed that well anyhow 190 opts().data_block_size_policy(BlockSizePolicy::all(kb(1))), 191 )?; 192 193 let mut last_id = 0; 194 if let Some(guard) = events.iter().next_back() { 195 let k = guard.key().into_diagnostic()?; 196 last_id = u64::from_be_bytes( 197 k.as_ref() 198 .try_into() 199 .into_diagnostic() 200 .wrap_err("expected to be id (8 bytes)")?, 201 ); 202 } 203 204 // load counts into memory 205 let counts_map = HashMap::new(); 206 for guard in counts.prefix(keys::COUNT_KS_PREFIX) { 207 let (k, v) = guard.into_inner().into_diagnostic()?; 208 let name = std::str::from_utf8(&k[keys::COUNT_KS_PREFIX.len()..]) 209 .into_diagnostic() 210 .wrap_err("expected valid utf8 for ks count key")?; 211 let _ = counts_map.insert_sync( 212 SmolStr::new(name), 213 u64::from_be_bytes(v.as_ref().try_into().unwrap()), 214 ); 215 } 216 217 let (event_tx, _) = broadcast::channel(10000); 218 219 Ok(Self { 220 inner: db, 221 repos, 222 records, 223 blocks, 224 cursors, 225 pending, 226 resync, 227 resync_buffer, 228 events, 229 counts, 230 filter, 231 event_tx, 232 counts_map, 233 next_event_id: Arc::new(AtomicU64::new(last_id + 1)), 234 }) 235 } 236 237 pub fn persist(&self) -> Result<()> { 238 self.inner.persist(PersistMode::SyncAll).into_diagnostic()?; 239 Ok(()) 240 } 241 242 pub async fn get(ks: Keyspace, key: impl AsRef<[u8]>) -> Result<Option<Slice>> { 243 let key = key.as_ref().to_vec(); 244 tokio::task::spawn_blocking(move || ks.get(key).into_diagnostic()) 245 .await 246 .into_diagnostic()? 247 } 248 249 #[allow(dead_code)] 250 pub async fn insert( 251 ks: Keyspace, 252 key: impl AsRef<[u8]>, 253 value: impl AsRef<[u8]>, 254 ) -> Result<()> { 255 let key = key.as_ref().to_vec(); 256 let value = value.as_ref().to_vec(); 257 tokio::task::spawn_blocking(move || ks.insert(key, value).into_diagnostic()) 258 .await 259 .into_diagnostic()? 
260 } 261 262 #[allow(dead_code)] 263 pub async fn remove(ks: Keyspace, key: impl AsRef<[u8]>) -> Result<()> { 264 let key = key.as_ref().to_vec(); 265 tokio::task::spawn_blocking(move || ks.remove(key).into_diagnostic()) 266 .await 267 .into_diagnostic()? 268 } 269 270 pub async fn contains_key(ks: Keyspace, key: impl AsRef<[u8]>) -> Result<bool> { 271 let key = key.as_ref().to_vec(); 272 tokio::task::spawn_blocking(move || ks.contains_key(key).into_diagnostic()) 273 .await 274 .into_diagnostic()? 275 } 276 277 pub fn update_count(&self, key: &str, delta: i64) { 278 let mut entry = self.counts_map.entry_sync(SmolStr::new(key)).or_insert(0); 279 *entry = (*entry as i64).saturating_add(delta) as u64; 280 } 281 282 pub async fn update_count_async(&self, key: &str, delta: i64) { 283 let mut entry = self 284 .counts_map 285 .entry_async(SmolStr::new(key)) 286 .await 287 .or_insert(0); 288 *entry = (*entry as i64).saturating_add(delta) as u64; 289 } 290 291 pub async fn get_count(&self, key: &str) -> u64 { 292 self.counts_map 293 .read_async(key, |_, v| *v) 294 .await 295 .unwrap_or(0) 296 } 297 298 pub fn update_gauge_diff( 299 &self, 300 old: &crate::types::GaugeState, 301 new: &crate::types::GaugeState, 302 ) { 303 update_gauge_diff_impl!(self, old, new, update_count); 304 } 305 306 pub async fn update_gauge_diff_async( 307 &self, 308 old: &crate::types::GaugeState, 309 new: &crate::types::GaugeState, 310 ) { 311 update_gauge_diff_impl!(self, old, new, update_count_async, await); 312 } 313 314 pub fn update_repo_state<F, T>( 315 batch: &mut OwnedWriteBatch, 316 repos: &Keyspace, 317 did: &Did<'_>, 318 f: F, 319 ) -> Result<Option<(RepoState<'static>, T)>> 320 where 321 F: FnOnce(&mut RepoState, (&[u8], &mut fjall::OwnedWriteBatch)) -> Result<(bool, T)>, 322 { 323 let key = keys::repo_key(did); 324 if let Some(bytes) = repos.get(&key).into_diagnostic()? 
{ 325 let mut state: RepoState = deser_repo_state(bytes.as_ref())?.into_static(); 326 let (changed, result) = f(&mut state, (key.as_slice(), batch))?; 327 if changed { 328 batch.insert(repos, key, ser_repo_state(&state)?); 329 } 330 Ok(Some((state, result))) 331 } else { 332 Ok(None) 333 } 334 } 335 336 pub async fn update_repo_state_async<F, T>( 337 &self, 338 did: &Did<'_>, 339 f: F, 340 ) -> Result<Option<(RepoState<'static>, T)>> 341 where 342 F: FnOnce(&mut RepoState, (&[u8], &mut fjall::OwnedWriteBatch)) -> Result<(bool, T)> 343 + Send 344 + 'static, 345 T: Send + 'static, 346 { 347 let mut batch = self.inner.batch(); 348 let repos = self.repos.clone(); 349 let did = did.clone().into_static(); 350 351 tokio::task::spawn_blocking(move || { 352 let Some((state, t)) = Self::update_repo_state(&mut batch, &repos, &did, f)? else { 353 return Ok(None); 354 }; 355 batch.commit().into_diagnostic()?; 356 Ok(Some((state, t))) 357 }) 358 .await 359 .into_diagnostic()? 360 } 361} 362 363pub fn set_firehose_cursor(db: &Db, cursor: i64) -> Result<()> { 364 db.cursors 365 .insert(keys::CURSOR_KEY, cursor.to_be_bytes()) 366 .into_diagnostic() 367} 368 369pub async fn get_firehose_cursor(db: &Db) -> Result<Option<i64>> { 370 Db::get(db.cursors.clone(), keys::CURSOR_KEY) 371 .await? 372 .map(|v| { 373 Ok(i64::from_be_bytes( 374 v.as_ref() 375 .try_into() 376 .into_diagnostic() 377 .wrap_err("cursor is not 8 bytes")?, 378 )) 379 }) 380 .transpose() 381} 382 383pub fn ser_repo_state(state: &RepoState) -> Result<Vec<u8>> { 384 rmp_serde::to_vec(&state).into_diagnostic() 385} 386 387pub fn deser_repo_state<'b>(bytes: &'b [u8]) -> Result<RepoState<'b>> { 388 rmp_serde::from_slice(bytes).into_diagnostic() 389} 390 391pub fn check_poisoned(e: &fjall::Error) { 392 if matches!(e, fjall::Error::Poisoned) { 393 error!("!!! DATABASE POISONED !!! 
exiting"); 394 std::process::exit(10); 395 } 396} 397 398pub fn check_poisoned_report(e: &miette::Report) { 399 let Some(err) = e.downcast_ref::<fjall::Error>() else { 400 return; 401 }; 402 self::check_poisoned(err); 403} 404 405pub fn set_ks_count(batch: &mut OwnedWriteBatch, db: &Db, name: &str, count: u64) { 406 let key = keys::count_keyspace_key(name); 407 batch.insert(&db.counts, key, count.to_be_bytes()); 408} 409 410pub fn persist_counts(db: &Db) -> Result<()> { 411 let mut batch = db.inner.batch(); 412 db.counts_map.iter_sync(|k, v| { 413 set_ks_count(&mut batch, db, k, *v); 414 true 415 }); 416 batch.commit().into_diagnostic() 417} 418 419pub fn set_record_count( 420 batch: &mut OwnedWriteBatch, 421 db: &Db, 422 did: &Did<'_>, 423 collection: &str, 424 count: u64, 425) { 426 let key = keys::count_collection_key(did, collection); 427 batch.insert(&db.counts, key, count.to_be_bytes()); 428} 429 430pub fn update_record_count( 431 batch: &mut OwnedWriteBatch, 432 db: &Db, 433 did: &Did<'_>, 434 collection: &str, 435 delta: i64, 436) -> Result<()> { 437 let key = keys::count_collection_key(did, collection); 438 let count = db 439 .counts 440 .get(&key) 441 .into_diagnostic()? 442 .map(|v| -> Result<_> { 443 Ok(u64::from_be_bytes( 444 v.as_ref() 445 .try_into() 446 .into_diagnostic() 447 .wrap_err("expected to be count (8 bytes)")?, 448 )) 449 }) 450 .transpose()? 451 .unwrap_or(0); 452 let new_count = (count as i64).saturating_add(delta) as u64; 453 batch.insert(&db.counts, key, new_count.to_be_bytes()); 454 Ok(()) 455} 456 457pub fn get_record_count(db: &Db, did: &Did<'_>, collection: &str) -> Result<u64> { 458 let key = keys::count_collection_key(did, collection); 459 let count = db 460 .counts 461 .get(&key) 462 .into_diagnostic()? 463 .map(|v| -> Result<_> { 464 Ok(u64::from_be_bytes( 465 v.as_ref() 466 .try_into() 467 .into_diagnostic() 468 .wrap_err("expected to be count (8 bytes)")?, 469 )) 470 }) 471 .transpose()?; 472 Ok(count.unwrap_or(0)) 473}