A better Rust ATProto crate
at main 1379 lines 44 kB view raw
1use super::{iter::DashMapIter, Iter}; 2use crate::{ 3 common::{ 4 self, 5 concurrent::{ 6 atomic_time::AtomicInstant, 7 constants::{ 8 READ_LOG_FLUSH_POINT, READ_LOG_SIZE, WRITE_LOG_FLUSH_POINT, WRITE_LOG_SIZE, 9 }, 10 deques::Deques, 11 entry_info::EntryInfo, 12 housekeeper::{Housekeeper, InnerSync}, 13 AccessTime, KeyDate, KeyHash, KeyHashDate, KvEntry, ReadOp, ValueEntry, Weigher, 14 WriteOp, 15 }, 16 deque::{DeqNode, Deque}, 17 frequency_sketch::FrequencySketch, 18 time::{CheckedTimeOps, Clock, Duration, Instant}, 19 CacheRegion, 20 }, 21 Policy, 22}; 23 24use crossbeam_channel::{Receiver, Sender, TrySendError}; 25use crossbeam_utils::atomic::AtomicCell; 26use dashmap::mapref::one::Ref as DashMapRef; 27use smallvec::SmallVec; 28use std::{ 29 borrow::Borrow, 30 collections::hash_map::RandomState, 31 hash::{BuildHasher, Hash}, 32 ptr::NonNull, 33 sync::{ 34 atomic::{AtomicBool, Ordering}, 35 Arc, Mutex, RwLock, 36 }, 37}; 38use triomphe::Arc as TrioArc; 39 40pub(crate) struct BaseCache<K, V, S = RandomState> { 41 pub(crate) inner: Arc<Inner<K, V, S>>, 42 read_op_ch: Sender<ReadOp<K, V>>, 43 pub(crate) write_op_ch: Sender<WriteOp<K, V>>, 44 pub(crate) housekeeper: Option<Arc<Housekeeper>>, 45} 46 47impl<K, V, S> Clone for BaseCache<K, V, S> { 48 /// Makes a clone of this shared cache. 49 /// 50 /// This operation is cheap as it only creates thread-safe reference counted 51 /// pointers to the shared internal data structures. 52 fn clone(&self) -> Self { 53 Self { 54 inner: Arc::clone(&self.inner), 55 read_op_ch: self.read_op_ch.clone(), 56 write_op_ch: self.write_op_ch.clone(), 57 housekeeper: self.housekeeper.clone(), 58 } 59 } 60} 61 62impl<K, V, S> Drop for BaseCache<K, V, S> { 63 fn drop(&mut self) { 64 // The housekeeper needs to be dropped before the inner is dropped. 
65 std::mem::drop(self.housekeeper.take()); 66 } 67} 68 69impl<K, V, S> BaseCache<K, V, S> { 70 pub(crate) fn policy(&self) -> Policy { 71 self.inner.policy() 72 } 73 74 pub(crate) fn entry_count(&self) -> u64 { 75 self.inner.entry_count() 76 } 77 78 pub(crate) fn weighted_size(&self) -> u64 { 79 self.inner.weighted_size() 80 } 81} 82 83impl<K, V, S> BaseCache<K, V, S> 84where 85 K: Hash + Eq + Send + Sync + 'static, 86 V: Clone + Send + Sync + 'static, 87 S: BuildHasher + Clone + Send + Sync + 'static, 88{ 89 pub(crate) fn new( 90 max_capacity: Option<u64>, 91 initial_capacity: Option<usize>, 92 build_hasher: S, 93 weigher: Option<Weigher<K, V>>, 94 time_to_live: Option<Duration>, 95 time_to_idle: Option<Duration>, 96 ) -> Self { 97 let (r_snd, r_rcv) = crossbeam_channel::bounded(READ_LOG_SIZE); 98 let (w_snd, w_rcv) = crossbeam_channel::bounded(WRITE_LOG_SIZE); 99 100 let inner = Inner::new( 101 max_capacity, 102 initial_capacity, 103 build_hasher, 104 weigher, 105 r_rcv, 106 w_rcv, 107 time_to_live, 108 time_to_idle, 109 ); 110 Self { 111 #[cfg_attr(beta_clippy, allow(clippy::arc_with_non_send_sync))] 112 inner: Arc::new(inner), 113 read_op_ch: r_snd, 114 write_op_ch: w_snd, 115 housekeeper: Some(Arc::new(Housekeeper::default())), 116 } 117 } 118 119 #[inline] 120 pub(crate) fn hash<Q>(&self, key: &Q) -> u64 121 where 122 Arc<K>: Borrow<Q>, 123 Q: Hash + Eq + ?Sized, 124 { 125 self.inner.hash(key) 126 } 127 128 pub(crate) fn contains_key<Q>(&self, key: &Q) -> bool 129 where 130 Arc<K>: Borrow<Q>, 131 Q: Hash + Eq + ?Sized, 132 { 133 match self.inner.get(key) { 134 None => false, 135 Some(entry) => { 136 let i = &self.inner; 137 let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); 138 let now = i.current_time_from_expiration_clock(); 139 let entry = &*entry; 140 141 !is_expired_entry_wo(ttl, va, entry, now) 142 && !is_expired_entry_ao(tti, va, entry, now) 143 } 144 } 145 } 146 147 pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64) 
-> Option<V> 148 where 149 Arc<K>: Borrow<Q>, 150 Q: Hash + Eq + ?Sized, 151 { 152 let record = |op, now| { 153 self.record_read_op(op, now) 154 .expect("Failed to record a get op"); 155 }; 156 let now = self.inner.current_time_from_expiration_clock(); 157 158 match self.inner.get(key) { 159 None => { 160 record(ReadOp::Miss(hash), now); 161 None 162 } 163 Some(entry) => { 164 let i = &self.inner; 165 let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); 166 let arc_entry = &*entry; 167 168 if is_expired_entry_wo(ttl, va, arc_entry, now) 169 || is_expired_entry_ao(tti, va, arc_entry, now) 170 { 171 // Drop the entry to avoid to deadlock with record_read_op. 172 std::mem::drop(entry); 173 // Expired or invalidated entry. Record this access as a cache miss 174 // rather than a hit. 175 record(ReadOp::Miss(hash), now); 176 None 177 } else { 178 // Valid entry. 179 let v = arc_entry.value.clone(); 180 let e = TrioArc::clone(arc_entry); 181 // Drop the entry to avoid to deadlock with record_read_op. 182 std::mem::drop(entry); 183 record(ReadOp::Hit(hash, e, now), now); 184 Some(v) 185 } 186 } 187 } 188 } 189 190 #[inline] 191 pub(crate) fn remove_entry<Q>(&self, key: &Q) -> Option<KvEntry<K, V>> 192 where 193 Arc<K>: Borrow<Q>, 194 Q: Hash + Eq + ?Sized, 195 { 196 self.inner.remove_entry(key) 197 } 198 199 #[inline] 200 pub(crate) fn apply_reads_writes_if_needed( 201 inner: &impl InnerSync, 202 ch: &Sender<WriteOp<K, V>>, 203 now: Instant, 204 housekeeper: Option<&Arc<Housekeeper>>, 205 ) { 206 let w_len = ch.len(); 207 208 if let Some(hk) = housekeeper { 209 if hk.should_apply_writes(w_len, now) { 210 hk.try_sync(inner); 211 } 212 } 213 } 214 215 pub(crate) fn invalidate_all(&self) { 216 let now = self.inner.current_time_from_expiration_clock(); 217 self.inner.set_valid_after(now); 218 } 219} 220 221// Clippy beta 0.1.83 (f41c7ed9889 2024-10-31) warns about unused lifetimes on 'a. 222// This seems a false positive. 
The lifetimes are used in the trait bounds. 223// https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes 224#[allow(clippy::extra_unused_lifetimes)] 225impl<'a, K, V, S> BaseCache<K, V, S> 226where 227 K: 'a + Eq + Hash, 228 V: 'a, 229 S: BuildHasher + Clone, 230{ 231 pub(crate) fn iter(&self) -> Iter<'_, K, V, S> { 232 Iter::new(self, self.inner.iter()) 233 } 234} 235 236impl<K, V, S> BaseCache<K, V, S> { 237 pub(crate) fn is_expired_entry(&self, entry: &TrioArc<ValueEntry<K, V>>) -> bool { 238 let i = &self.inner; 239 let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); 240 let now = i.current_time_from_expiration_clock(); 241 242 is_expired_entry_wo(ttl, va, entry, now) || is_expired_entry_ao(tti, va, entry, now) 243 } 244} 245 246// 247// private methods 248// 249impl<K, V, S> BaseCache<K, V, S> 250where 251 K: Hash + Eq + Send + Sync + 'static, 252 V: Clone + Send + Sync + 'static, 253 S: BuildHasher + Clone + Send + Sync + 'static, 254{ 255 #[inline] 256 fn record_read_op( 257 &self, 258 op: ReadOp<K, V>, 259 now: Instant, 260 ) -> Result<(), TrySendError<ReadOp<K, V>>> { 261 self.apply_reads_if_needed(self.inner.as_ref(), now); 262 let ch = &self.read_op_ch; 263 match ch.try_send(op) { 264 // Discard the ReadOp when the channel is full. 265 Ok(()) | Err(TrySendError::Full(_)) => Ok(()), 266 Err(e @ TrySendError::Disconnected(_)) => Err(e), 267 } 268 } 269 270 #[inline] 271 pub(crate) fn do_insert_with_hash( 272 &self, 273 key: Arc<K>, 274 hash: u64, 275 value: V, 276 ) -> (WriteOp<K, V>, Instant) { 277 let ts = self.inner.current_time_from_expiration_clock(); 278 let weight = self.inner.weigh(&key, &value); 279 let mut insert_op = None; 280 let mut update_op = None; 281 282 self.inner 283 .cache 284 .entry(Arc::clone(&key)) 285 // Update 286 .and_modify(|entry| { 287 // NOTES on `new_value_entry_from` method: 288 // 1. The internal EntryInfo will be shared between the old and new 289 // ValueEntries. 
290 // 2. This method will set the dirty flag to prevent this new 291 // ValueEntry from being evicted by an expiration policy. 292 // 3. This method will update the policy_weight with the new weight. 293 let old_weight = entry.policy_weight(); 294 *entry = self.new_value_entry_from(value.clone(), ts, weight, entry); 295 update_op = Some(WriteOp::Upsert { 296 key_hash: KeyHash::new(Arc::clone(&key), hash), 297 value_entry: TrioArc::clone(entry), 298 old_weight, 299 new_weight: weight, 300 }); 301 }) 302 // Insert 303 .or_insert_with(|| { 304 let entry = self.new_value_entry(value.clone(), ts, weight); 305 insert_op = Some(WriteOp::Upsert { 306 key_hash: KeyHash::new(Arc::clone(&key), hash), 307 value_entry: TrioArc::clone(&entry), 308 old_weight: 0, 309 new_weight: weight, 310 }); 311 entry 312 }); 313 314 match (insert_op, update_op) { 315 (Some(ins_op), None) => (ins_op, ts), 316 (None, Some(upd_op)) => (upd_op, ts), 317 _ => unreachable!(), 318 } 319 } 320 321 #[inline] 322 fn new_value_entry( 323 &self, 324 value: V, 325 timestamp: Instant, 326 policy_weight: u32, 327 ) -> TrioArc<ValueEntry<K, V>> { 328 let info = TrioArc::new(EntryInfo::new(timestamp, policy_weight)); 329 TrioArc::new(ValueEntry::new(value, info)) 330 } 331 332 #[inline] 333 fn new_value_entry_from( 334 &self, 335 value: V, 336 timestamp: Instant, 337 policy_weight: u32, 338 other: &ValueEntry<K, V>, 339 ) -> TrioArc<ValueEntry<K, V>> { 340 let info = TrioArc::clone(other.entry_info()); 341 // To prevent this updated ValueEntry from being evicted by an expiration policy, 342 // set the dirty flag to true. It will be reset to false when the write is applied. 
343 info.set_dirty(true); 344 info.set_last_accessed(timestamp); 345 info.set_last_modified(timestamp); 346 info.set_policy_weight(policy_weight); 347 TrioArc::new(ValueEntry::new(value, info)) 348 } 349 350 #[inline] 351 fn apply_reads_if_needed(&self, inner: &impl InnerSync, now: Instant) { 352 let len = self.read_op_ch.len(); 353 354 if let Some(hk) = &self.housekeeper { 355 if hk.should_apply_reads(len, now) { 356 if let Some(h) = &self.housekeeper { 357 h.try_sync(inner); 358 } 359 } 360 } 361 } 362 363 #[inline] 364 pub(crate) fn current_time_from_expiration_clock(&self) -> Instant { 365 self.inner.current_time_from_expiration_clock() 366 } 367} 368 369// 370// for testing 371// 372#[cfg(test)] 373impl<K, V, S> BaseCache<K, V, S> 374where 375 K: Hash + Eq + Send + Sync + 'static, 376 V: Clone + Send + Sync + 'static, 377 S: BuildHasher + Clone + Send + Sync + 'static, 378{ 379 pub(crate) fn reconfigure_for_testing(&mut self) { 380 // Enable the frequency sketch. 381 self.inner.enable_frequency_sketch_for_testing(); 382 } 383 384 pub(crate) fn set_expiration_clock(&self, clock: Option<Clock>) { 385 self.inner.set_expiration_clock(clock); 386 } 387} 388 389struct EvictionCounters { 390 entry_count: u64, 391 weighted_size: u64, 392} 393 394impl EvictionCounters { 395 #[inline] 396 fn new(entry_count: u64, weighted_size: u64) -> Self { 397 Self { 398 entry_count, 399 weighted_size, 400 } 401 } 402 403 #[inline] 404 fn saturating_add(&mut self, entry_count: u64, weight: u32) { 405 self.entry_count += entry_count; 406 let total = &mut self.weighted_size; 407 *total = total.saturating_add(weight as u64); 408 } 409 410 #[inline] 411 fn saturating_sub(&mut self, entry_count: u64, weight: u32) { 412 self.entry_count -= entry_count; 413 let total = &mut self.weighted_size; 414 *total = total.saturating_sub(weight as u64); 415 } 416} 417 418#[derive(Default)] 419struct EntrySizeAndFrequency { 420 policy_weight: u64, 421 freq: u32, 422} 423 424impl EntrySizeAndFrequency { 
425 fn new(policy_weight: u32) -> Self { 426 Self { 427 policy_weight: policy_weight as u64, 428 ..Default::default() 429 } 430 } 431 432 fn add_policy_weight(&mut self, weight: u32) { 433 self.policy_weight += weight as u64; 434 } 435 436 fn add_frequency(&mut self, freq: &FrequencySketch, hash: u64) { 437 self.freq += freq.frequency(hash) as u32; 438 } 439} 440 441// Access-Order Queue Node 442type AoqNode<K> = NonNull<DeqNode<KeyHashDate<K>>>; 443 444enum AdmissionResult<K> { 445 Admitted { 446 victim_nodes: SmallVec<[AoqNode<K>; 8]>, 447 skipped_nodes: SmallVec<[AoqNode<K>; 4]>, 448 }, 449 Rejected { 450 skipped_nodes: SmallVec<[AoqNode<K>; 4]>, 451 }, 452} 453 454type CacheStore<K, V, S> = dashmap::DashMap<Arc<K>, TrioArc<ValueEntry<K, V>>, S>; 455 456type CacheEntryRef<'a, K, V> = DashMapRef<'a, Arc<K>, TrioArc<ValueEntry<K, V>>>; 457 458pub(crate) struct Inner<K, V, S> { 459 max_capacity: Option<u64>, 460 entry_count: AtomicCell<u64>, 461 weighted_size: AtomicCell<u64>, 462 cache: CacheStore<K, V, S>, 463 build_hasher: S, 464 deques: Mutex<Deques<K>>, 465 frequency_sketch: RwLock<FrequencySketch>, 466 frequency_sketch_enabled: AtomicBool, 467 read_op_ch: Receiver<ReadOp<K, V>>, 468 write_op_ch: Receiver<WriteOp<K, V>>, 469 time_to_live: Option<Duration>, 470 time_to_idle: Option<Duration>, 471 valid_after: AtomicInstant, 472 weigher: Option<Weigher<K, V>>, 473 has_expiration_clock: AtomicBool, 474 expiration_clock: RwLock<Option<Clock>>, 475} 476 477// functions/methods used by BaseCache 478impl<K, V, S> Inner<K, V, S> 479where 480 K: Hash + Eq + Send + Sync + 'static, 481 V: Send + Sync + 'static, 482 S: BuildHasher + Clone, 483{ 484 // Disable a Clippy warning for having more than seven arguments. 
485 // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments 486 #[allow(clippy::too_many_arguments)] 487 fn new( 488 max_capacity: Option<u64>, 489 initial_capacity: Option<usize>, 490 build_hasher: S, 491 weigher: Option<Weigher<K, V>>, 492 read_op_ch: Receiver<ReadOp<K, V>>, 493 write_op_ch: Receiver<WriteOp<K, V>>, 494 time_to_live: Option<Duration>, 495 time_to_idle: Option<Duration>, 496 ) -> Self { 497 let initial_capacity = initial_capacity 498 .map(|cap| cap + WRITE_LOG_SIZE) 499 .unwrap_or_default(); 500 let cache = 501 dashmap::DashMap::with_capacity_and_hasher(initial_capacity, build_hasher.clone()); 502 503 Self { 504 max_capacity, 505 entry_count: Default::default(), 506 weighted_size: Default::default(), 507 cache, 508 build_hasher, 509 deques: Mutex::new(Default::default()), 510 frequency_sketch: RwLock::new(Default::default()), 511 frequency_sketch_enabled: Default::default(), 512 read_op_ch, 513 write_op_ch, 514 time_to_live, 515 time_to_idle, 516 valid_after: Default::default(), 517 weigher, 518 has_expiration_clock: AtomicBool::new(false), 519 expiration_clock: RwLock::new(None), 520 } 521 } 522 523 #[inline] 524 fn hash<Q>(&self, key: &Q) -> u64 525 where 526 Arc<K>: Borrow<Q>, 527 Q: Hash + Eq + ?Sized, 528 { 529 self.build_hasher.hash_one(key) 530 } 531 532 #[inline] 533 fn get<Q>(&self, key: &Q) -> Option<CacheEntryRef<'_, K, V>> 534 where 535 Arc<K>: Borrow<Q>, 536 Q: Hash + Eq + ?Sized, 537 { 538 self.cache.get(key) 539 } 540 541 #[inline] 542 fn remove_entry<Q>(&self, key: &Q) -> Option<KvEntry<K, V>> 543 where 544 Arc<K>: Borrow<Q>, 545 Q: Hash + Eq + ?Sized, 546 { 547 self.cache 548 .remove(key) 549 .map(|(key, entry)| KvEntry::new(key, entry)) 550 } 551} 552 553// functions/methods used by BaseCache 554impl<K, V, S> Inner<K, V, S> { 555 fn policy(&self) -> Policy { 556 Policy::new(self.max_capacity, self.time_to_live, self.time_to_idle) 557 } 558 559 #[inline] 560 fn time_to_live(&self) -> Option<Duration> { 561 
self.time_to_live 562 } 563 564 #[inline] 565 fn time_to_idle(&self) -> Option<Duration> { 566 self.time_to_idle 567 } 568 569 #[inline] 570 fn entry_count(&self) -> u64 { 571 self.entry_count.load() 572 } 573 574 #[inline] 575 pub(crate) fn weighted_size(&self) -> u64 { 576 self.weighted_size.load() 577 } 578 579 #[inline] 580 fn has_expiry(&self) -> bool { 581 self.time_to_live.is_some() || self.time_to_idle.is_some() 582 } 583 584 #[inline] 585 fn is_write_order_queue_enabled(&self) -> bool { 586 self.time_to_live.is_some() 587 } 588 589 #[inline] 590 fn valid_after(&self) -> Option<Instant> { 591 self.valid_after.instant() 592 } 593 594 #[inline] 595 fn set_valid_after(&self, timestamp: Instant) { 596 self.valid_after.set_instant(timestamp); 597 } 598 599 #[inline] 600 fn has_valid_after(&self) -> bool { 601 self.valid_after.is_set() 602 } 603 604 #[inline] 605 fn weigh(&self, key: &K, value: &V) -> u32 { 606 self.weigher.as_ref().map(|w| w(key, value)).unwrap_or(1) 607 } 608 609 #[inline] 610 fn current_time_from_expiration_clock(&self) -> Instant { 611 if self.has_expiration_clock.load(Ordering::Relaxed) { 612 Instant::new( 613 self.expiration_clock 614 .read() 615 .expect("lock poisoned") 616 .as_ref() 617 .expect("Cannot get the expiration clock") 618 .now(), 619 ) 620 } else { 621 Instant::now() 622 } 623 } 624} 625 626// Clippy beta 0.1.83 (f41c7ed9889 2024-10-31) warns about unused lifetimes on 'a. 627// This seems a false positive. The lifetimes are used in the trait bounds. 
628// https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes 629#[allow(clippy::extra_unused_lifetimes)] 630impl<'a, K, V, S> Inner<K, V, S> 631where 632 K: 'a + Eq + Hash, 633 V: 'a, 634 S: BuildHasher + Clone, 635{ 636 fn iter(&self) -> DashMapIter<'_, K, V, S> { 637 self.cache.iter() 638 } 639} 640 641mod batch_size { 642 pub(crate) const EVICTION_BATCH_SIZE: usize = 500; 643} 644 645// TODO: Divide this method into smaller methods so that unit tests can do more 646// precise testing. 647// - sync_reads 648// - sync_writes 649// - evict 650// - invalidate_entries 651impl<K, V, S> InnerSync for Inner<K, V, S> 652where 653 K: Hash + Eq + Send + Sync + 'static, 654 V: Send + Sync + 'static, 655 S: BuildHasher + Clone + Send + Sync + 'static, 656{ 657 fn sync(&self, max_repeats: usize) { 658 let mut deqs = self.deques.lock().expect("lock poisoned"); 659 let mut calls = 0; 660 let mut should_sync = true; 661 662 let current_ec = self.entry_count.load(); 663 let current_ws = self.weighted_size.load(); 664 let mut counters = EvictionCounters::new(current_ec, current_ws); 665 666 while should_sync && calls <= max_repeats { 667 let r_len = self.read_op_ch.len(); 668 if r_len > 0 { 669 self.apply_reads(&mut deqs, r_len); 670 } 671 672 let w_len = self.write_op_ch.len(); 673 if w_len > 0 { 674 self.apply_writes(&mut deqs, w_len, &mut counters); 675 } 676 677 if self.should_enable_frequency_sketch(&counters) { 678 self.enable_frequency_sketch(&counters); 679 } 680 681 calls += 1; 682 should_sync = self.read_op_ch.len() >= READ_LOG_FLUSH_POINT 683 || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT; 684 } 685 686 if self.has_expiry() || self.has_valid_after() { 687 self.evict_expired(&mut deqs, batch_size::EVICTION_BATCH_SIZE, &mut counters); 688 } 689 690 // Evict if this cache has more entries than its capacity. 
691 let weights_to_evict = self.weights_to_evict(&counters); 692 if weights_to_evict > 0 { 693 self.evict_lru_entries( 694 &mut deqs, 695 batch_size::EVICTION_BATCH_SIZE, 696 weights_to_evict, 697 &mut counters, 698 ); 699 } 700 701 debug_assert_eq!(self.entry_count.load(), current_ec); 702 debug_assert_eq!(self.weighted_size.load(), current_ws); 703 self.entry_count.store(counters.entry_count); 704 self.weighted_size.store(counters.weighted_size); 705 } 706 707 fn now(&self) -> Instant { 708 self.current_time_from_expiration_clock() 709 } 710} 711 712// 713// private methods 714// 715impl<K, V, S> Inner<K, V, S> 716where 717 K: Hash + Eq + Send + Sync + 'static, 718 V: Send + Sync + 'static, 719 S: BuildHasher + Clone + Send + Sync + 'static, 720{ 721 fn has_enough_capacity(&self, candidate_weight: u32, counters: &EvictionCounters) -> bool { 722 self.max_capacity 723 .map(|limit| counters.weighted_size + candidate_weight as u64 <= limit) 724 .unwrap_or(true) 725 } 726 727 fn weights_to_evict(&self, counters: &EvictionCounters) -> u64 { 728 self.max_capacity 729 .map(|limit| counters.weighted_size.saturating_sub(limit)) 730 .unwrap_or_default() 731 } 732 733 #[inline] 734 fn should_enable_frequency_sketch(&self, counters: &EvictionCounters) -> bool { 735 if self.frequency_sketch_enabled.load(Ordering::Acquire) { 736 false 737 } else if let Some(max_cap) = self.max_capacity { 738 counters.weighted_size >= max_cap / 2 739 } else { 740 false 741 } 742 } 743 744 #[inline] 745 fn enable_frequency_sketch(&self, counters: &EvictionCounters) { 746 if let Some(max_cap) = self.max_capacity { 747 let c = counters; 748 let cap = if self.weigher.is_none() { 749 max_cap 750 } else { 751 (c.entry_count as f64 * (c.weighted_size as f64 / max_cap as f64)) as u64 752 }; 753 self.do_enable_frequency_sketch(cap); 754 } 755 } 756 757 #[cfg(test)] 758 fn enable_frequency_sketch_for_testing(&self) { 759 if let Some(max_cap) = self.max_capacity { 760 
self.do_enable_frequency_sketch(max_cap); 761 } 762 } 763 764 #[inline] 765 fn do_enable_frequency_sketch(&self, cache_capacity: u64) { 766 let skt_capacity = common::sketch_capacity(cache_capacity); 767 self.frequency_sketch 768 .write() 769 .expect("lock poisoned") 770 .ensure_capacity(skt_capacity); 771 self.frequency_sketch_enabled.store(true, Ordering::Release); 772 } 773 774 fn apply_reads(&self, deqs: &mut Deques<K>, count: usize) { 775 use ReadOp::*; 776 let mut freq = self.frequency_sketch.write().expect("lock poisoned"); 777 let ch = &self.read_op_ch; 778 for _ in 0..count { 779 match ch.try_recv() { 780 Ok(Hit(hash, entry, timestamp)) => { 781 freq.increment(hash); 782 entry.set_last_accessed(timestamp); 783 if entry.is_admitted() { 784 deqs.move_to_back_ao(&entry); 785 } 786 } 787 Ok(Miss(hash)) => freq.increment(hash), 788 Err(_) => break, 789 } 790 } 791 } 792 793 fn apply_writes(&self, deqs: &mut Deques<K>, count: usize, counters: &mut EvictionCounters) { 794 use WriteOp::*; 795 let freq = self.frequency_sketch.read().expect("lock poisoned"); 796 let ch = &self.write_op_ch; 797 798 for _ in 0..count { 799 match ch.try_recv() { 800 Ok(Upsert { 801 key_hash: kh, 802 value_entry: entry, 803 old_weight, 804 new_weight, 805 }) => self.handle_upsert(kh, entry, old_weight, new_weight, deqs, &freq, counters), 806 Ok(Remove(KvEntry { key: _key, entry })) => { 807 Self::handle_remove(deqs, entry, counters) 808 } 809 Err(_) => break, 810 }; 811 } 812 } 813 814 #[allow(clippy::too_many_arguments)] 815 fn handle_upsert( 816 &self, 817 kh: KeyHash<K>, 818 entry: TrioArc<ValueEntry<K, V>>, 819 old_weight: u32, 820 new_weight: u32, 821 deqs: &mut Deques<K>, 822 freq: &FrequencySketch, 823 counters: &mut EvictionCounters, 824 ) { 825 entry.set_dirty(false); 826 827 if entry.is_admitted() { 828 // The entry has been already admitted, so treat this as an update. 
829 counters.saturating_sub(0, old_weight); 830 counters.saturating_add(0, new_weight); 831 deqs.move_to_back_ao(&entry); 832 deqs.move_to_back_wo(&entry); 833 return; 834 } 835 836 if self.has_enough_capacity(new_weight, counters) { 837 // There are enough room in the cache (or the cache is unbounded). 838 // Add the candidate to the deques. 839 self.handle_admit(kh, &entry, new_weight, deqs, counters); 840 return; 841 } 842 843 if let Some(max) = self.max_capacity { 844 if new_weight as u64 > max { 845 // The candidate is too big to fit in the cache. Reject it. 846 self.cache.remove(&Arc::clone(&kh.key)); 847 return; 848 } 849 } 850 851 let skipped_nodes; 852 let mut candidate = EntrySizeAndFrequency::new(new_weight); 853 candidate.add_frequency(freq, kh.hash); 854 855 // Try to admit the candidate. 856 match Self::admit(&candidate, &self.cache, deqs, freq) { 857 AdmissionResult::Admitted { 858 victim_nodes, 859 skipped_nodes: mut skipped, 860 } => { 861 // Try to remove the victims from the cache (hash map). 862 for victim in victim_nodes { 863 if let Some((_vic_key, vic_entry)) = 864 self.cache.remove(unsafe { victim.as_ref().element.key() }) 865 { 866 // And then remove the victim from the deques. 867 Self::handle_remove(deqs, vic_entry, counters); 868 } else { 869 // Could not remove the victim from the cache. Skip this 870 // victim node as its ValueEntry might have been 871 // invalidated. Add it to the skipped nodes. 872 skipped.push(victim); 873 } 874 } 875 skipped_nodes = skipped; 876 877 // Add the candidate to the deques. 878 self.handle_admit(kh, &entry, new_weight, deqs, counters); 879 } 880 AdmissionResult::Rejected { skipped_nodes: s } => { 881 skipped_nodes = s; 882 // Remove the candidate from the cache (hash map). 883 self.cache.remove(&Arc::clone(&kh.key)); 884 } 885 }; 886 887 // Move the skipped nodes to the back of the deque. We do not unlink (drop) 888 // them because ValueEntries in the write op queue should be pointing them. 
889 for node in skipped_nodes { 890 unsafe { deqs.probation.move_to_back(node) }; 891 } 892 } 893 894 /// Performs size-aware admission explained in the paper: 895 /// [Lightweight Robust Size Aware Cache Management][size-aware-cache-paper] 896 /// by Gil Einziger, Ohad Eytan, Roy Friedman, Ben Manes. 897 /// 898 /// [size-aware-cache-paper]: https://arxiv.org/abs/2105.08770 899 /// 900 /// There are some modifications in this implementation: 901 /// - To admit to the main space, candidate's frequency must be higher than 902 /// the aggregated frequencies of the potential victims. (In the paper, 903 /// `>=` operator is used rather than `>`) The `>` operator will do a better 904 /// job to prevent the main space from polluting. 905 /// - When a candidate is rejected, the potential victims will stay at the LRU 906 /// position of the probation access-order queue. (In the paper, they will be 907 /// promoted (to the MRU position?) to force the eviction policy to select a 908 /// different set of victims for the next candidate). We may implement the 909 /// paper's behavior later? 910 /// 911 #[inline] 912 fn admit( 913 candidate: &EntrySizeAndFrequency, 914 cache: &CacheStore<K, V, S>, 915 deqs: &Deques<K>, 916 freq: &FrequencySketch, 917 ) -> AdmissionResult<K> { 918 const MAX_CONSECUTIVE_RETRIES: usize = 5; 919 let mut retries = 0; 920 921 let mut victims = EntrySizeAndFrequency::default(); 922 let mut victim_nodes = SmallVec::default(); 923 let mut skipped_nodes = SmallVec::default(); 924 925 // Get first potential victim at the LRU position. 926 let mut next_victim = deqs.probation.peek_front_ptr(); 927 928 // Aggregate potential victims. 
929 while victims.policy_weight < candidate.policy_weight { 930 if candidate.freq < victims.freq { 931 break; 932 } 933 if let Some(victim) = next_victim.take() { 934 next_victim = DeqNode::next_node_ptr(victim); 935 let vic_elem = &unsafe { victim.as_ref() }.element; 936 937 if let Some(vic_entry) = cache.get(vic_elem.key()) { 938 victims.add_policy_weight(vic_entry.policy_weight()); 939 victims.add_frequency(freq, vic_elem.hash()); 940 victim_nodes.push(victim); 941 retries = 0; 942 } else { 943 // Could not get the victim from the cache (hash map). Skip this node 944 // as its ValueEntry might have been invalidated. 945 skipped_nodes.push(victim); 946 947 retries += 1; 948 if retries > MAX_CONSECUTIVE_RETRIES { 949 break; 950 } 951 } 952 } else { 953 // No more potential victims. 954 break; 955 } 956 } 957 958 // Admit or reject the candidate. 959 960 // TODO: Implement some randomness to mitigate hash DoS attack. 961 // See Caffeine's implementation. 962 963 if victims.policy_weight >= candidate.policy_weight && candidate.freq > victims.freq { 964 AdmissionResult::Admitted { 965 victim_nodes, 966 skipped_nodes, 967 } 968 } else { 969 AdmissionResult::Rejected { skipped_nodes } 970 } 971 } 972 973 fn handle_admit( 974 &self, 975 kh: KeyHash<K>, 976 entry: &TrioArc<ValueEntry<K, V>>, 977 policy_weight: u32, 978 deqs: &mut Deques<K>, 979 counters: &mut EvictionCounters, 980 ) { 981 let key = Arc::clone(&kh.key); 982 counters.saturating_add(1, policy_weight); 983 deqs.push_back_ao( 984 CacheRegion::MainProbation, 985 KeyHashDate::new(kh, entry.entry_info()), 986 entry, 987 ); 988 if self.is_write_order_queue_enabled() { 989 deqs.push_back_wo(KeyDate::new(key, entry.entry_info()), entry); 990 } 991 entry.set_admitted(true); 992 } 993 994 fn handle_remove( 995 deqs: &mut Deques<K>, 996 entry: TrioArc<ValueEntry<K, V>>, 997 counters: &mut EvictionCounters, 998 ) { 999 if entry.is_admitted() { 1000 entry.set_admitted(false); 1001 counters.saturating_sub(1, 
entry.policy_weight()); 1002 // The following two unlink_* functions will unset the deq nodes. 1003 deqs.unlink_ao(&entry); 1004 Deques::unlink_wo(&mut deqs.write_order, &entry); 1005 } else { 1006 entry.unset_q_nodes(); 1007 } 1008 } 1009 1010 fn handle_remove_with_deques( 1011 ao_deq_name: &str, 1012 ao_deq: &mut Deque<KeyHashDate<K>>, 1013 wo_deq: &mut Deque<KeyDate<K>>, 1014 entry: TrioArc<ValueEntry<K, V>>, 1015 counters: &mut EvictionCounters, 1016 ) { 1017 if entry.is_admitted() { 1018 entry.set_admitted(false); 1019 counters.saturating_sub(1, entry.policy_weight()); 1020 // The following two unlink_* functions will unset the deq nodes. 1021 Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry); 1022 Deques::unlink_wo(wo_deq, &entry); 1023 } else { 1024 entry.unset_q_nodes(); 1025 } 1026 } 1027 1028 fn evict_expired( 1029 &self, 1030 deqs: &mut Deques<K>, 1031 batch_size: usize, 1032 counters: &mut EvictionCounters, 1033 ) { 1034 let now = self.current_time_from_expiration_clock(); 1035 1036 if self.is_write_order_queue_enabled() { 1037 self.remove_expired_wo(deqs, batch_size, now, counters); 1038 } 1039 1040 if self.time_to_idle.is_some() || self.has_valid_after() { 1041 let (window, probation, protected, wo) = ( 1042 &mut deqs.window, 1043 &mut deqs.probation, 1044 &mut deqs.protected, 1045 &mut deqs.write_order, 1046 ); 1047 1048 let mut rm_expired_ao = 1049 |name, deq| self.remove_expired_ao(name, deq, wo, batch_size, now, counters); 1050 1051 rm_expired_ao("window", window); 1052 rm_expired_ao("probation", probation); 1053 rm_expired_ao("protected", protected); 1054 } 1055 } 1056 1057 #[inline] 1058 fn remove_expired_ao( 1059 &self, 1060 deq_name: &str, 1061 deq: &mut Deque<KeyHashDate<K>>, 1062 write_order_deq: &mut Deque<KeyDate<K>>, 1063 batch_size: usize, 1064 now: Instant, 1065 counters: &mut EvictionCounters, 1066 ) { 1067 let tti = &self.time_to_idle; 1068 let va = &self.valid_after(); 1069 for _ in 0..batch_size { 1070 // Peek the front node 
of the deque and check if it is expired. 1071 let key = deq.peek_front().and_then(|node| { 1072 // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example. 1073 if is_expired_entry_ao(tti, va, node, now) { 1074 Some(Arc::clone(node.element.key())) 1075 } else { 1076 None 1077 } 1078 }); 1079 1080 if key.is_none() { 1081 break; 1082 } 1083 1084 let key = key.as_ref().unwrap(); 1085 1086 // Remove the key from the map only when the entry is really 1087 // expired. This check is needed because it is possible that the entry in 1088 // the map has been updated or deleted but its deque node we checked 1089 // above have not been updated yet. 1090 let maybe_entry = self 1091 .cache 1092 .remove_if(key, |_, v| is_expired_entry_ao(tti, va, v, now)); 1093 1094 if let Some((_k, entry)) = maybe_entry { 1095 Self::handle_remove_with_deques(deq_name, deq, write_order_deq, entry, counters); 1096 } else if !self.try_skip_updated_entry(key, deq_name, deq, write_order_deq) { 1097 break; 1098 } 1099 } 1100 } 1101 1102 #[inline] 1103 fn try_skip_updated_entry( 1104 &self, 1105 key: &K, 1106 deq_name: &str, 1107 deq: &mut Deque<KeyHashDate<K>>, 1108 write_order_deq: &mut Deque<KeyDate<K>>, 1109 ) -> bool { 1110 if let Some(entry) = self.cache.get(key) { 1111 if entry.is_dirty() { 1112 // The key exists and the entry has been updated. 1113 Deques::move_to_back_ao_in_deque(deq_name, deq, &entry); 1114 Deques::move_to_back_wo_in_deque(write_order_deq, &entry); 1115 true 1116 } else { 1117 // The key exists but something unexpected. 1118 false 1119 } 1120 } else { 1121 // Skip this entry as the key might have been invalidated. Since the 1122 // invalidated ValueEntry (which should be still in the write op 1123 // queue) has a pointer to this node, move the node to the back of 1124 // the deque instead of popping (dropping) it. 
1125 deq.move_front_to_back(); 1126 true 1127 } 1128 } 1129 1130 #[inline] 1131 fn remove_expired_wo( 1132 &self, 1133 deqs: &mut Deques<K>, 1134 batch_size: usize, 1135 now: Instant, 1136 counters: &mut EvictionCounters, 1137 ) { 1138 let ttl = &self.time_to_live; 1139 let va = &self.valid_after(); 1140 for _ in 0..batch_size { 1141 let key = deqs.write_order.peek_front().and_then(|node| { 1142 // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example. 1143 if is_expired_entry_wo(ttl, va, node, now) { 1144 Some(Arc::clone(node.element.key())) 1145 } else { 1146 None 1147 } 1148 }); 1149 1150 if key.is_none() { 1151 break; 1152 } 1153 1154 let key = key.as_ref().unwrap(); 1155 1156 let maybe_entry = self 1157 .cache 1158 .remove_if(key, |_, v| is_expired_entry_wo(ttl, va, v, now)); 1159 1160 if let Some((_k, entry)) = maybe_entry { 1161 Self::handle_remove(deqs, entry, counters); 1162 } else if let Some(entry) = self.cache.get(key) { 1163 if entry.is_dirty() { 1164 deqs.move_to_back_ao(&entry); 1165 deqs.move_to_back_wo(&entry); 1166 } else { 1167 // The key exists but something unexpected. Break. 1168 break; 1169 } 1170 } else { 1171 // Skip this entry as the key might have been invalidated. Since the 1172 // invalidated ValueEntry (which should be still in the write op 1173 // queue) has a pointer to this node, move the node to the back of 1174 // the deque instead of popping (dropping) it. 
1175 deqs.write_order.move_front_to_back(); 1176 } 1177 } 1178 } 1179 1180 fn evict_lru_entries( 1181 &self, 1182 deqs: &mut Deques<K>, 1183 batch_size: usize, 1184 weights_to_evict: u64, 1185 counters: &mut EvictionCounters, 1186 ) { 1187 const DEQ_NAME: &str = "probation"; 1188 let mut evicted = 0u64; 1189 let (deq, write_order_deq) = (&mut deqs.probation, &mut deqs.write_order); 1190 1191 for _ in 0..batch_size { 1192 if evicted >= weights_to_evict { 1193 break; 1194 } 1195 1196 let maybe_key_and_ts = deq.peek_front().map(|node| { 1197 let entry_info = node.element.entry_info(); 1198 ( 1199 Arc::clone(node.element.key()), 1200 entry_info.is_dirty(), 1201 entry_info.last_modified(), 1202 ) 1203 }); 1204 1205 let (key, ts) = match maybe_key_and_ts { 1206 Some((key, false, Some(ts))) => (key, ts), 1207 // TODO: Remove the second pattern `Some((_key, false, None))` once we change 1208 // `last_modified` and `last_accessed` in `EntryInfo` from `Option<Instant>` to 1209 // `Instant`. 1210 Some((key, true, _)) | Some((key, false, None)) => { 1211 if self.try_skip_updated_entry(&key, DEQ_NAME, deq, write_order_deq) { 1212 continue; 1213 } else { 1214 break; 1215 } 1216 } 1217 None => break, 1218 }; 1219 1220 let maybe_entry = self.cache.remove_if(&key, |_, v| { 1221 if let Some(lm) = v.last_modified() { 1222 lm == ts 1223 } else { 1224 false 1225 } 1226 }); 1227 1228 if let Some((_k, entry)) = maybe_entry { 1229 let weight = entry.policy_weight(); 1230 Self::handle_remove_with_deques(DEQ_NAME, deq, write_order_deq, entry, counters); 1231 evicted = evicted.saturating_add(weight as u64); 1232 } else if !self.try_skip_updated_entry(&key, DEQ_NAME, deq, write_order_deq) { 1233 break; 1234 } 1235 } 1236 } 1237} 1238 1239// 1240// for testing 1241// 1242#[cfg(test)] 1243impl<K, V, S> Inner<K, V, S> 1244where 1245 K: Hash + Eq, 1246 S: BuildHasher + Clone, 1247{ 1248 fn set_expiration_clock(&self, clock: Option<Clock>) { 1249 let mut exp_clock = 
self.expiration_clock.write().expect("lock poisoned");
        if let Some(clock) = clock {
            *exp_clock = Some(clock);
            self.has_expiration_clock.store(true, Ordering::SeqCst);
        } else {
            self.has_expiration_clock.store(false, Ordering::SeqCst);
            *exp_clock = None;
        }
    }
}

//
// private free-standing functions
//

/// Returns `true` when `entry` is expired under the access-order policies:
/// it was last accessed before `valid_after` (invalidated), or its
/// time-to-idle window has elapsed by `now`.
///
/// An entry with no recorded last-access time is never considered expired.
///
/// # Panics
///
/// Panics if adding `time_to_idle` to the last-access time overflows.
#[inline]
fn is_expired_entry_ao(
    time_to_idle: &Option<Duration>,
    valid_after: &Option<Instant>,
    entry: &impl AccessTime,
    now: Instant,
) -> bool {
    if let Some(ts) = entry.last_accessed() {
        if let Some(va) = valid_after {
            if ts < *va {
                return true;
            }
        }
        if let Some(tti) = time_to_idle {
            // This is the time-to-idle path, so the overflow message says
            // "tti" (the previous "ttl overflow" text was misleading).
            return ts.checked_add(*tti).expect("tti overflow") <= now;
        }
    }
    false
}

/// Returns `true` when `entry` is expired under the write-order policies:
/// it was last modified before `valid_after` (invalidated), or its
/// time-to-live window has elapsed by `now`.
///
/// An entry with no recorded last-modified time is never considered expired.
///
/// # Panics
///
/// Panics if adding `time_to_live` to the last-modified time overflows.
#[inline]
fn is_expired_entry_wo(
    time_to_live: &Option<Duration>,
    valid_after: &Option<Instant>,
    entry: &impl AccessTime,
    now: Instant,
) -> bool {
    if let Some(ts) = entry.last_modified() {
        if let Some(va) = valid_after {
            if ts < *va {
                return true;
            }
        }
        if let Some(ttl) = time_to_live {
            return ts.checked_add(*ttl).expect("ttl overflow") <= now;
        }
    }
    false
}

#[cfg(test)]
mod tests {
    use super::BaseCache;

    #[cfg_attr(target_pointer_width = "16", ignore)]
    #[test]
    fn test_skt_capacity_will_not_overflow() {
        use std::collections::hash_map::RandomState;

        // power of two
        let pot = |exp| 2u64.pow(exp);

        // Builds a cache with the given max capacity and asserts the frequency
        // sketch table was sized to `len`.
        let ensure_sketch_len = |max_capacity, len, name| {
            let cache = BaseCache::<u8, u8>::new(
                Some(max_capacity),
                None,
                RandomState::default(),
                None,
                None,
                None,
            );
cache.inner.enable_frequency_sketch_for_testing();
            assert_eq!(
                cache
                    .inner
                    .frequency_sketch
                    .read()
                    .expect("lock poisoned")
                    .table_len(),
                len as usize,
                "{}",
                name
            );
        };

        if cfg!(target_pointer_width = "32") {
            let pot24 = pot(24);
            let pot16 = pot(16);
            ensure_sketch_len(0, 128, "0");
            ensure_sketch_len(128, 128, "128");
            ensure_sketch_len(pot16, pot16, "pot16");
            // due to ceiling to next_power_of_two
            ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1");
            // due to ceiling to next_power_of_two
            ensure_sketch_len(pot24 - 1, pot24, "pot24 - 1");
            ensure_sketch_len(pot24, pot24, "pot24");
            // 2^24 is the sketch-size cap on 32-bit targets.
            ensure_sketch_len(pot(27), pot24, "pot(27)");
            ensure_sketch_len(u32::MAX as u64, pot24, "u32::MAX");
        } else {
            // target_pointer_width: 64 or larger.
            let pot30 = pot(30);
            let pot16 = pot(16);
            ensure_sketch_len(0, 128, "0");
            ensure_sketch_len(128, 128, "128");
            ensure_sketch_len(pot16, pot16, "pot16");
            // due to ceiling to next_power_of_two
            ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1");

            // The following tests will allocate large memory (~8GiB).
            // Skip when running on Circle CI.
            if !cfg!(circleci) {
                // due to ceiling to next_power_of_two
                ensure_sketch_len(pot30 - 1, pot30, "pot30- 1");
                ensure_sketch_len(pot30, pot30, "pot30");
                // 2^30 is the sketch-size cap on 64-bit targets.
                ensure_sketch_len(u64::MAX, pot30, "u64::MAX");
            }
        };
    }
}