// Forked from nonbinary.computer/jacquard — "A better Rust ATProto crate".
1use super::{iter::DashMapIter, Iter};
2use crate::{
3 common::{
4 self,
5 concurrent::{
6 atomic_time::AtomicInstant,
7 constants::{
8 READ_LOG_FLUSH_POINT, READ_LOG_SIZE, WRITE_LOG_FLUSH_POINT, WRITE_LOG_SIZE,
9 },
10 deques::Deques,
11 entry_info::EntryInfo,
12 housekeeper::{Housekeeper, InnerSync},
13 AccessTime, KeyDate, KeyHash, KeyHashDate, KvEntry, ReadOp, ValueEntry, Weigher,
14 WriteOp,
15 },
16 deque::{DeqNode, Deque},
17 frequency_sketch::FrequencySketch,
18 time::{CheckedTimeOps, Clock, Duration, Instant},
19 CacheRegion,
20 },
21 Policy,
22};
23
24use crossbeam_channel::{Receiver, Sender, TrySendError};
25use crossbeam_utils::atomic::AtomicCell;
26use dashmap::mapref::one::Ref as DashMapRef;
27use smallvec::SmallVec;
28use std::{
29 borrow::Borrow,
30 collections::hash_map::RandomState,
31 hash::{BuildHasher, Hash},
32 ptr::NonNull,
33 sync::{
34 atomic::{AtomicBool, Ordering},
35 Arc, Mutex, RwLock,
36 },
37};
38use triomphe::Arc as TrioArc;
39
/// Shared core of the concurrent cache.
///
/// `BaseCache` is a cheap-to-clone handle: all clones share the same `Inner`
/// storage and the same read/write operation channels.
pub(crate) struct BaseCache<K, V, S = RandomState> {
    /// Shared internal state (hash map, deques, policy data).
    pub(crate) inner: Arc<Inner<K, V, S>>,
    /// Sender side of the bounded read-op log drained by `Inner::sync`.
    read_op_ch: Sender<ReadOp<K, V>>,
    /// Sender side of the bounded write-op log drained by `Inner::sync`.
    pub(crate) write_op_ch: Sender<WriteOp<K, V>>,
    /// Housekeeper that applies the read/write logs; `None` only after the
    /// `Drop` impl has taken it out.
    pub(crate) housekeeper: Option<Arc<Housekeeper>>,
}
46
47impl<K, V, S> Clone for BaseCache<K, V, S> {
48 /// Makes a clone of this shared cache.
49 ///
50 /// This operation is cheap as it only creates thread-safe reference counted
51 /// pointers to the shared internal data structures.
52 fn clone(&self) -> Self {
53 Self {
54 inner: Arc::clone(&self.inner),
55 read_op_ch: self.read_op_ch.clone(),
56 write_op_ch: self.write_op_ch.clone(),
57 housekeeper: self.housekeeper.clone(),
58 }
59 }
60}
61
62impl<K, V, S> Drop for BaseCache<K, V, S> {
63 fn drop(&mut self) {
64 // The housekeeper needs to be dropped before the inner is dropped.
65 std::mem::drop(self.housekeeper.take());
66 }
67}
68
69impl<K, V, S> BaseCache<K, V, S> {
70 pub(crate) fn policy(&self) -> Policy {
71 self.inner.policy()
72 }
73
74 pub(crate) fn entry_count(&self) -> u64 {
75 self.inner.entry_count()
76 }
77
78 pub(crate) fn weighted_size(&self) -> u64 {
79 self.inner.weighted_size()
80 }
81}
82
impl<K, V, S> BaseCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Creates a new cache handle together with its shared `Inner` state,
    /// the bounded read/write op channels, and a default housekeeper.
    pub(crate) fn new(
        max_capacity: Option<u64>,
        initial_capacity: Option<usize>,
        build_hasher: S,
        weigher: Option<Weigher<K, V>>,
        time_to_live: Option<Duration>,
        time_to_idle: Option<Duration>,
    ) -> Self {
        // Bounded op logs; receivers go to `Inner`, senders stay on the handle.
        let (r_snd, r_rcv) = crossbeam_channel::bounded(READ_LOG_SIZE);
        let (w_snd, w_rcv) = crossbeam_channel::bounded(WRITE_LOG_SIZE);

        let inner = Inner::new(
            max_capacity,
            initial_capacity,
            build_hasher,
            weigher,
            r_rcv,
            w_rcv,
            time_to_live,
            time_to_idle,
        );
        Self {
            #[cfg_attr(beta_clippy, allow(clippy::arc_with_non_send_sync))]
            inner: Arc::new(inner),
            read_op_ch: r_snd,
            write_op_ch: w_snd,
            housekeeper: Some(Arc::new(Housekeeper::default())),
        }
    }

    /// Hashes `key` with this cache's `BuildHasher`.
    #[inline]
    pub(crate) fn hash<Q>(&self, key: &Q) -> u64
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.inner.hash(key)
    }

    /// Returns `true` when `key` is present and its entry is neither expired
    /// (TTL/TTI) nor invalidated (`valid_after`). Does not record a read op.
    pub(crate) fn contains_key<Q>(&self, key: &Q) -> bool
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        match self.inner.get(key) {
            None => false,
            Some(entry) => {
                let i = &self.inner;
                let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
                let now = i.current_time_from_expiration_clock();
                let entry = &*entry;

                !is_expired_entry_wo(ttl, va, entry, now)
                    && !is_expired_entry_ao(tti, va, entry, now)
            }
        }
    }

    /// Looks up `key` (pre-hashed as `hash`), records a `Hit` or `Miss` read
    /// op, and returns a clone of the value when present and live.
    ///
    /// An expired-but-still-present entry is reported as a miss; the actual
    /// removal happens later during maintenance.
    pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64) -> Option<V>
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        let record = |op, now| {
            self.record_read_op(op, now)
                .expect("Failed to record a get op");
        };
        let now = self.inner.current_time_from_expiration_clock();

        match self.inner.get(key) {
            None => {
                record(ReadOp::Miss(hash), now);
                None
            }
            Some(entry) => {
                let i = &self.inner;
                let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
                let arc_entry = &*entry;

                if is_expired_entry_wo(ttl, va, arc_entry, now)
                    || is_expired_entry_ao(tti, va, arc_entry, now)
                {
                    // Drop the entry to avoid to deadlock with record_read_op.
                    // (`entry` holds a map guard; `record_read_op` may trigger
                    // a sync that needs the same shard.)
                    std::mem::drop(entry);
                    // Expired or invalidated entry. Record this access as a cache miss
                    // rather than a hit.
                    record(ReadOp::Miss(hash), now);
                    None
                } else {
                    // Valid entry.
                    let v = arc_entry.value.clone();
                    let e = TrioArc::clone(arc_entry);
                    // Drop the entry to avoid to deadlock with record_read_op.
                    std::mem::drop(entry);
                    record(ReadOp::Hit(hash, e, now), now);
                    Some(v)
                }
            }
        }
    }

    /// Removes `key` from the map, returning the removed key/entry pair so
    /// the caller can emit the corresponding `Remove` write op.
    #[inline]
    pub(crate) fn remove_entry<Q>(&self, key: &Q) -> Option<KvEntry<K, V>>
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.inner.remove_entry(key)
    }

    /// Asks the housekeeper to drain the op logs when the write log has grown
    /// large enough (or enough time has passed, per `should_apply_writes`).
    #[inline]
    pub(crate) fn apply_reads_writes_if_needed(
        inner: &impl InnerSync,
        ch: &Sender<WriteOp<K, V>>,
        now: Instant,
        housekeeper: Option<&Arc<Housekeeper>>,
    ) {
        let w_len = ch.len();

        if let Some(hk) = housekeeper {
            if hk.should_apply_writes(w_len, now) {
                hk.try_sync(inner);
            }
        }
    }

    /// Invalidates every entry written before "now" by advancing
    /// `valid_after`; entries are removed lazily during maintenance.
    pub(crate) fn invalidate_all(&self) {
        let now = self.inner.current_time_from_expiration_clock();
        self.inner.set_valid_after(now);
    }
}
220
221// Clippy beta 0.1.83 (f41c7ed9889 2024-10-31) warns about unused lifetimes on 'a.
222// This seems a false positive. The lifetimes are used in the trait bounds.
223// https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes
224#[allow(clippy::extra_unused_lifetimes)]
225impl<'a, K, V, S> BaseCache<K, V, S>
226where
227 K: 'a + Eq + Hash,
228 V: 'a,
229 S: BuildHasher + Clone,
230{
231 pub(crate) fn iter(&self) -> Iter<'_, K, V, S> {
232 Iter::new(self, self.inner.iter())
233 }
234}
235
236impl<K, V, S> BaseCache<K, V, S> {
237 pub(crate) fn is_expired_entry(&self, entry: &TrioArc<ValueEntry<K, V>>) -> bool {
238 let i = &self.inner;
239 let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
240 let now = i.current_time_from_expiration_clock();
241
242 is_expired_entry_wo(ttl, va, entry, now) || is_expired_entry_ao(tti, va, entry, now)
243 }
244}
245
246//
247// private methods
248//
249impl<K, V, S> BaseCache<K, V, S>
250where
251 K: Hash + Eq + Send + Sync + 'static,
252 V: Clone + Send + Sync + 'static,
253 S: BuildHasher + Clone + Send + Sync + 'static,
254{
255 #[inline]
256 fn record_read_op(
257 &self,
258 op: ReadOp<K, V>,
259 now: Instant,
260 ) -> Result<(), TrySendError<ReadOp<K, V>>> {
261 self.apply_reads_if_needed(self.inner.as_ref(), now);
262 let ch = &self.read_op_ch;
263 match ch.try_send(op) {
264 // Discard the ReadOp when the channel is full.
265 Ok(()) | Err(TrySendError::Full(_)) => Ok(()),
266 Err(e @ TrySendError::Disconnected(_)) => Err(e),
267 }
268 }
269
270 #[inline]
271 pub(crate) fn do_insert_with_hash(
272 &self,
273 key: Arc<K>,
274 hash: u64,
275 value: V,
276 ) -> (WriteOp<K, V>, Instant) {
277 let ts = self.inner.current_time_from_expiration_clock();
278 let weight = self.inner.weigh(&key, &value);
279 let mut insert_op = None;
280 let mut update_op = None;
281
282 self.inner
283 .cache
284 .entry(Arc::clone(&key))
285 // Update
286 .and_modify(|entry| {
287 // NOTES on `new_value_entry_from` method:
288 // 1. The internal EntryInfo will be shared between the old and new
289 // ValueEntries.
290 // 2. This method will set the dirty flag to prevent this new
291 // ValueEntry from being evicted by an expiration policy.
292 // 3. This method will update the policy_weight with the new weight.
293 let old_weight = entry.policy_weight();
294 *entry = self.new_value_entry_from(value.clone(), ts, weight, entry);
295 update_op = Some(WriteOp::Upsert {
296 key_hash: KeyHash::new(Arc::clone(&key), hash),
297 value_entry: TrioArc::clone(entry),
298 old_weight,
299 new_weight: weight,
300 });
301 })
302 // Insert
303 .or_insert_with(|| {
304 let entry = self.new_value_entry(value.clone(), ts, weight);
305 insert_op = Some(WriteOp::Upsert {
306 key_hash: KeyHash::new(Arc::clone(&key), hash),
307 value_entry: TrioArc::clone(&entry),
308 old_weight: 0,
309 new_weight: weight,
310 });
311 entry
312 });
313
314 match (insert_op, update_op) {
315 (Some(ins_op), None) => (ins_op, ts),
316 (None, Some(upd_op)) => (upd_op, ts),
317 _ => unreachable!(),
318 }
319 }
320
321 #[inline]
322 fn new_value_entry(
323 &self,
324 value: V,
325 timestamp: Instant,
326 policy_weight: u32,
327 ) -> TrioArc<ValueEntry<K, V>> {
328 let info = TrioArc::new(EntryInfo::new(timestamp, policy_weight));
329 TrioArc::new(ValueEntry::new(value, info))
330 }
331
332 #[inline]
333 fn new_value_entry_from(
334 &self,
335 value: V,
336 timestamp: Instant,
337 policy_weight: u32,
338 other: &ValueEntry<K, V>,
339 ) -> TrioArc<ValueEntry<K, V>> {
340 let info = TrioArc::clone(other.entry_info());
341 // To prevent this updated ValueEntry from being evicted by an expiration policy,
342 // set the dirty flag to true. It will be reset to false when the write is applied.
343 info.set_dirty(true);
344 info.set_last_accessed(timestamp);
345 info.set_last_modified(timestamp);
346 info.set_policy_weight(policy_weight);
347 TrioArc::new(ValueEntry::new(value, info))
348 }
349
350 #[inline]
351 fn apply_reads_if_needed(&self, inner: &impl InnerSync, now: Instant) {
352 let len = self.read_op_ch.len();
353
354 if let Some(hk) = &self.housekeeper {
355 if hk.should_apply_reads(len, now) {
356 if let Some(h) = &self.housekeeper {
357 h.try_sync(inner);
358 }
359 }
360 }
361 }
362
363 #[inline]
364 pub(crate) fn current_time_from_expiration_clock(&self) -> Instant {
365 self.inner.current_time_from_expiration_clock()
366 }
367}
368
//
// for testing
//
#[cfg(test)]
impl<K, V, S> BaseCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Makes the cache behave deterministically in tests by enabling the
    /// frequency sketch immediately instead of lazily.
    pub(crate) fn reconfigure_for_testing(&mut self) {
        // Enable the frequency sketch.
        self.inner.enable_frequency_sketch_for_testing();
    }

    /// Installs (or removes, with `None`) a mock clock used for expiration
    /// timing in tests.
    pub(crate) fn set_expiration_clock(&self, clock: Option<Clock>) {
        self.inner.set_expiration_clock(clock);
    }
}
388
/// Mutable counters carried through a maintenance (`sync`) run and published
/// back to `Inner` at the end.
struct EvictionCounters {
    /// Number of entries currently in the cache.
    entry_count: u64,
    /// Sum of the policy weights of all entries.
    weighted_size: u64,
}

impl EvictionCounters {
    #[inline]
    fn new(entry_count: u64, weighted_size: u64) -> Self {
        Self {
            entry_count,
            weighted_size,
        }
    }

    /// Adds to both counters, saturating at `u64::MAX` instead of overflowing.
    #[inline]
    fn saturating_add(&mut self, entry_count: u64, weight: u32) {
        // Use a saturating add for the entry count too, matching the method
        // name; the previous plain `+=` could overflow (panics in debug).
        self.entry_count = self.entry_count.saturating_add(entry_count);
        self.weighted_size = self.weighted_size.saturating_add(weight as u64);
    }

    /// Subtracts from both counters, saturating at zero instead of
    /// underflowing.
    #[inline]
    fn saturating_sub(&mut self, entry_count: u64, weight: u32) {
        // Saturating here too: the previous `-=` would panic in debug builds
        // if bookkeeping ever went negative.
        self.entry_count = self.entry_count.saturating_sub(entry_count);
        self.weighted_size = self.weighted_size.saturating_sub(weight as u64);
    }
}
417
418#[derive(Default)]
419struct EntrySizeAndFrequency {
420 policy_weight: u64,
421 freq: u32,
422}
423
424impl EntrySizeAndFrequency {
425 fn new(policy_weight: u32) -> Self {
426 Self {
427 policy_weight: policy_weight as u64,
428 ..Default::default()
429 }
430 }
431
432 fn add_policy_weight(&mut self, weight: u32) {
433 self.policy_weight += weight as u64;
434 }
435
436 fn add_frequency(&mut self, freq: &FrequencySketch, hash: u64) {
437 self.freq += freq.frequency(hash) as u32;
438 }
439}
440
// Access-Order Queue Node
type AoqNode<K> = NonNull<DeqNode<KeyHashDate<K>>>;

/// Outcome of the size-aware admission check (`Inner::admit`).
///
/// "Skipped" nodes are deque nodes whose map entry could not be found during
/// the victim scan; the caller moves them to the back of the deque.
enum AdmissionResult<K> {
    /// The candidate may enter; `victim_nodes` are to be evicted.
    Admitted {
        victim_nodes: SmallVec<[AoqNode<K>; 8]>,
        skipped_nodes: SmallVec<[AoqNode<K>; 4]>,
    },
    /// The candidate was rejected.
    Rejected {
        skipped_nodes: SmallVec<[AoqNode<K>; 4]>,
    },
}

/// The concurrent hash map holding all cached entries.
type CacheStore<K, V, S> = dashmap::DashMap<Arc<K>, TrioArc<ValueEntry<K, V>>, S>;

/// A read guard over a single entry in the map.
type CacheEntryRef<'a, K, V> = DashMapRef<'a, Arc<K>, TrioArc<ValueEntry<K, V>>>;
457
/// The shared internal state of the cache: the concurrent hash map, the
/// policy deques, the frequency sketch, and the read/write op logs.
pub(crate) struct Inner<K, V, S> {
    /// Maximum total weighted size; `None` means unbounded.
    max_capacity: Option<u64>,
    /// Published entry count (updated at the end of `sync`).
    entry_count: AtomicCell<u64>,
    /// Published total weighted size (updated at the end of `sync`).
    weighted_size: AtomicCell<u64>,
    /// The concurrent hash map holding all entries.
    cache: CacheStore<K, V, S>,
    build_hasher: S,
    /// Access-order/write-order deques; the lock also serializes `sync`.
    deques: Mutex<Deques<K>>,
    /// Frequency sketch consulted by the admission policy.
    frequency_sketch: RwLock<FrequencySketch>,
    /// Whether the sketch has been enabled (it is enabled lazily).
    frequency_sketch_enabled: AtomicBool,
    /// Receiver side of the read-op log.
    read_op_ch: Receiver<ReadOp<K, V>>,
    /// Receiver side of the write-op log.
    write_op_ch: Receiver<WriteOp<K, V>>,
    time_to_live: Option<Duration>,
    time_to_idle: Option<Duration>,
    /// Entries written before this instant are considered invalidated.
    valid_after: AtomicInstant,
    weigher: Option<Weigher<K, V>>,
    /// Test support: when set, `current_time_from_expiration_clock` reads
    /// `expiration_clock` instead of the real clock.
    has_expiration_clock: AtomicBool,
    expiration_clock: RwLock<Option<Clock>>,
}
476
// functions/methods used by BaseCache
impl<K, V, S> Inner<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Send + Sync + 'static,
    S: BuildHasher + Clone,
{
    /// Creates the shared internal state.
    // Disable a Clippy warning for having more than seven arguments.
    // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
    #[allow(clippy::too_many_arguments)]
    fn new(
        max_capacity: Option<u64>,
        initial_capacity: Option<usize>,
        build_hasher: S,
        weigher: Option<Weigher<K, V>>,
        read_op_ch: Receiver<ReadOp<K, V>>,
        write_op_ch: Receiver<WriteOp<K, V>>,
        time_to_live: Option<Duration>,
        time_to_idle: Option<Duration>,
    ) -> Self {
        // Pre-size the map with headroom for entries whose write ops are
        // still sitting in the (bounded) write log.
        let initial_capacity = initial_capacity
            .map(|cap| cap + WRITE_LOG_SIZE)
            .unwrap_or_default();
        let cache =
            dashmap::DashMap::with_capacity_and_hasher(initial_capacity, build_hasher.clone());

        Self {
            max_capacity,
            entry_count: Default::default(),
            weighted_size: Default::default(),
            cache,
            build_hasher,
            deques: Mutex::new(Default::default()),
            frequency_sketch: RwLock::new(Default::default()),
            frequency_sketch_enabled: Default::default(),
            read_op_ch,
            write_op_ch,
            time_to_live,
            time_to_idle,
            valid_after: Default::default(),
            weigher,
            has_expiration_clock: AtomicBool::new(false),
            expiration_clock: RwLock::new(None),
        }
    }

    /// Hashes a key with this cache's hasher.
    #[inline]
    fn hash<Q>(&self, key: &Q) -> u64
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.build_hasher.hash_one(key)
    }

    /// Returns a read guard over the entry for `key`, if present.
    ///
    /// NOTE(review): the returned guard holds a DashMap shard lock — callers
    /// drop it before triggering a sync (see `get_with_hash`).
    #[inline]
    fn get<Q>(&self, key: &Q) -> Option<CacheEntryRef<'_, K, V>>
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.cache.get(key)
    }

    /// Removes `key` from the map, returning the removed key/entry pair.
    #[inline]
    fn remove_entry<Q>(&self, key: &Q) -> Option<KvEntry<K, V>>
    where
        Arc<K>: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.cache
            .remove(key)
            .map(|(key, entry)| KvEntry::new(key, entry))
    }
}
552
553// functions/methods used by BaseCache
554impl<K, V, S> Inner<K, V, S> {
555 fn policy(&self) -> Policy {
556 Policy::new(self.max_capacity, self.time_to_live, self.time_to_idle)
557 }
558
559 #[inline]
560 fn time_to_live(&self) -> Option<Duration> {
561 self.time_to_live
562 }
563
564 #[inline]
565 fn time_to_idle(&self) -> Option<Duration> {
566 self.time_to_idle
567 }
568
569 #[inline]
570 fn entry_count(&self) -> u64 {
571 self.entry_count.load()
572 }
573
574 #[inline]
575 pub(crate) fn weighted_size(&self) -> u64 {
576 self.weighted_size.load()
577 }
578
579 #[inline]
580 fn has_expiry(&self) -> bool {
581 self.time_to_live.is_some() || self.time_to_idle.is_some()
582 }
583
584 #[inline]
585 fn is_write_order_queue_enabled(&self) -> bool {
586 self.time_to_live.is_some()
587 }
588
589 #[inline]
590 fn valid_after(&self) -> Option<Instant> {
591 self.valid_after.instant()
592 }
593
594 #[inline]
595 fn set_valid_after(&self, timestamp: Instant) {
596 self.valid_after.set_instant(timestamp);
597 }
598
599 #[inline]
600 fn has_valid_after(&self) -> bool {
601 self.valid_after.is_set()
602 }
603
604 #[inline]
605 fn weigh(&self, key: &K, value: &V) -> u32 {
606 self.weigher.as_ref().map(|w| w(key, value)).unwrap_or(1)
607 }
608
609 #[inline]
610 fn current_time_from_expiration_clock(&self) -> Instant {
611 if self.has_expiration_clock.load(Ordering::Relaxed) {
612 Instant::new(
613 self.expiration_clock
614 .read()
615 .expect("lock poisoned")
616 .as_ref()
617 .expect("Cannot get the expiration clock")
618 .now(),
619 )
620 } else {
621 Instant::now()
622 }
623 }
624}
625
626// Clippy beta 0.1.83 (f41c7ed9889 2024-10-31) warns about unused lifetimes on 'a.
627// This seems a false positive. The lifetimes are used in the trait bounds.
628// https://rust-lang.github.io/rust-clippy/master/index.html#extra_unused_lifetimes
629#[allow(clippy::extra_unused_lifetimes)]
630impl<'a, K, V, S> Inner<K, V, S>
631where
632 K: 'a + Eq + Hash,
633 V: 'a,
634 S: BuildHasher + Clone,
635{
636 fn iter(&self) -> DashMapIter<'_, K, V, S> {
637 self.cache.iter()
638 }
639}
640
/// Batch-size tuning constants for maintenance work.
mod batch_size {
    /// Maximum number of entries examined per eviction pass.
    pub(crate) const EVICTION_BATCH_SIZE: usize = 500;
}
644
// TODO: Divide this method into smaller methods so that unit tests can do more
// precise testing.
// - sync_reads
// - sync_writes
// - evict
// - invalidate_entries
impl<K, V, S> InnerSync for Inner<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Drains the read/write op logs (up to `max_repeats` passes), then runs
    /// expiration and, when over capacity, LRU eviction; finally publishes
    /// the updated entry-count/weighted-size counters.
    ///
    /// Holds the deques lock for the whole run, serializing maintenance.
    fn sync(&self, max_repeats: usize) {
        let mut deqs = self.deques.lock().expect("lock poisoned");
        let mut calls = 0;
        let mut should_sync = true;

        let current_ec = self.entry_count.load();
        let current_ws = self.weighted_size.load();
        let mut counters = EvictionCounters::new(current_ec, current_ws);

        while should_sync && calls <= max_repeats {
            let r_len = self.read_op_ch.len();
            if r_len > 0 {
                self.apply_reads(&mut deqs, r_len);
            }

            let w_len = self.write_op_ch.len();
            if w_len > 0 {
                self.apply_writes(&mut deqs, w_len, &mut counters);
            }

            // The frequency sketch is enabled lazily, once the cache has
            // grown big enough (see `should_enable_frequency_sketch`).
            if self.should_enable_frequency_sketch(&counters) {
                self.enable_frequency_sketch(&counters);
            }

            calls += 1;
            // Keep draining while either log is still above its flush point.
            should_sync = self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
                || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT;
        }

        if self.has_expiry() || self.has_valid_after() {
            self.evict_expired(&mut deqs, batch_size::EVICTION_BATCH_SIZE, &mut counters);
        }

        // Evict if this cache has more entries than its capacity.
        let weights_to_evict = self.weights_to_evict(&counters);
        if weights_to_evict > 0 {
            self.evict_lru_entries(
                &mut deqs,
                batch_size::EVICTION_BATCH_SIZE,
                weights_to_evict,
                &mut counters,
            );
        }

        // NOTE(review): assumes only `sync` (serialized by the deques lock)
        // updates these published counters — the debug asserts check that.
        debug_assert_eq!(self.entry_count.load(), current_ec);
        debug_assert_eq!(self.weighted_size.load(), current_ws);
        self.entry_count.store(counters.entry_count);
        self.weighted_size.store(counters.weighted_size);
    }

    /// Current time, honoring the test-only expiration clock.
    fn now(&self) -> Instant {
        self.current_time_from_expiration_clock()
    }
}
711
712//
713// private methods
714//
715impl<K, V, S> Inner<K, V, S>
716where
717 K: Hash + Eq + Send + Sync + 'static,
718 V: Send + Sync + 'static,
719 S: BuildHasher + Clone + Send + Sync + 'static,
720{
721 fn has_enough_capacity(&self, candidate_weight: u32, counters: &EvictionCounters) -> bool {
722 self.max_capacity
723 .map(|limit| counters.weighted_size + candidate_weight as u64 <= limit)
724 .unwrap_or(true)
725 }
726
727 fn weights_to_evict(&self, counters: &EvictionCounters) -> u64 {
728 self.max_capacity
729 .map(|limit| counters.weighted_size.saturating_sub(limit))
730 .unwrap_or_default()
731 }
732
733 #[inline]
734 fn should_enable_frequency_sketch(&self, counters: &EvictionCounters) -> bool {
735 if self.frequency_sketch_enabled.load(Ordering::Acquire) {
736 false
737 } else if let Some(max_cap) = self.max_capacity {
738 counters.weighted_size >= max_cap / 2
739 } else {
740 false
741 }
742 }
743
744 #[inline]
745 fn enable_frequency_sketch(&self, counters: &EvictionCounters) {
746 if let Some(max_cap) = self.max_capacity {
747 let c = counters;
748 let cap = if self.weigher.is_none() {
749 max_cap
750 } else {
751 (c.entry_count as f64 * (c.weighted_size as f64 / max_cap as f64)) as u64
752 };
753 self.do_enable_frequency_sketch(cap);
754 }
755 }
756
    /// Test-only helper: enables the sketch sized to the full `max_capacity`.
    #[cfg(test)]
    fn enable_frequency_sketch_for_testing(&self) {
        if let Some(max_cap) = self.max_capacity {
            self.do_enable_frequency_sketch(max_cap);
        }
    }

    /// Grows the sketch to fit `cache_capacity`, then publishes the enabled
    /// flag (`Release`, pairing with the `Acquire` load in
    /// `should_enable_frequency_sketch`).
    #[inline]
    fn do_enable_frequency_sketch(&self, cache_capacity: u64) {
        let skt_capacity = common::sketch_capacity(cache_capacity);
        self.frequency_sketch
            .write()
            .expect("lock poisoned")
            .ensure_capacity(skt_capacity);
        self.frequency_sketch_enabled.store(true, Ordering::Release);
    }
773
    /// Drains up to `count` read ops, bumping sketch frequencies and moving
    /// hit entries to the back (MRU end) of their access-order queue.
    fn apply_reads(&self, deqs: &mut Deques<K>, count: usize) {
        use ReadOp::*;
        let mut freq = self.frequency_sketch.write().expect("lock poisoned");
        let ch = &self.read_op_ch;
        for _ in 0..count {
            match ch.try_recv() {
                Ok(Hit(hash, entry, timestamp)) => {
                    freq.increment(hash);
                    entry.set_last_accessed(timestamp);
                    // Only admitted entries are linked into the deques.
                    if entry.is_admitted() {
                        deqs.move_to_back_ao(&entry);
                    }
                }
                Ok(Miss(hash)) => freq.increment(hash),
                // Channel is empty (or disconnected); stop draining.
                Err(_) => break,
            }
        }
    }

    /// Drains up to `count` write ops, dispatching upserts to the admission
    /// logic and removals to deque/counter bookkeeping.
    fn apply_writes(&self, deqs: &mut Deques<K>, count: usize, counters: &mut EvictionCounters) {
        use WriteOp::*;
        // Read lock only: frequencies are read here; `apply_reads` takes the
        // write lock to increment them.
        let freq = self.frequency_sketch.read().expect("lock poisoned");
        let ch = &self.write_op_ch;

        for _ in 0..count {
            match ch.try_recv() {
                Ok(Upsert {
                    key_hash: kh,
                    value_entry: entry,
                    old_weight,
                    new_weight,
                }) => self.handle_upsert(kh, entry, old_weight, new_weight, deqs, &freq, counters),
                Ok(Remove(KvEntry { key: _key, entry })) => {
                    Self::handle_remove(deqs, entry, counters)
                }
                // Channel is empty (or disconnected); stop draining.
                Err(_) => break,
            };
        }
    }
813
814 #[allow(clippy::too_many_arguments)]
815 fn handle_upsert(
816 &self,
817 kh: KeyHash<K>,
818 entry: TrioArc<ValueEntry<K, V>>,
819 old_weight: u32,
820 new_weight: u32,
821 deqs: &mut Deques<K>,
822 freq: &FrequencySketch,
823 counters: &mut EvictionCounters,
824 ) {
825 entry.set_dirty(false);
826
827 if entry.is_admitted() {
828 // The entry has been already admitted, so treat this as an update.
829 counters.saturating_sub(0, old_weight);
830 counters.saturating_add(0, new_weight);
831 deqs.move_to_back_ao(&entry);
832 deqs.move_to_back_wo(&entry);
833 return;
834 }
835
836 if self.has_enough_capacity(new_weight, counters) {
837 // There are enough room in the cache (or the cache is unbounded).
838 // Add the candidate to the deques.
839 self.handle_admit(kh, &entry, new_weight, deqs, counters);
840 return;
841 }
842
843 if let Some(max) = self.max_capacity {
844 if new_weight as u64 > max {
845 // The candidate is too big to fit in the cache. Reject it.
846 self.cache.remove(&Arc::clone(&kh.key));
847 return;
848 }
849 }
850
851 let skipped_nodes;
852 let mut candidate = EntrySizeAndFrequency::new(new_weight);
853 candidate.add_frequency(freq, kh.hash);
854
855 // Try to admit the candidate.
856 match Self::admit(&candidate, &self.cache, deqs, freq) {
857 AdmissionResult::Admitted {
858 victim_nodes,
859 skipped_nodes: mut skipped,
860 } => {
861 // Try to remove the victims from the cache (hash map).
862 for victim in victim_nodes {
863 if let Some((_vic_key, vic_entry)) =
864 self.cache.remove(unsafe { victim.as_ref().element.key() })
865 {
866 // And then remove the victim from the deques.
867 Self::handle_remove(deqs, vic_entry, counters);
868 } else {
869 // Could not remove the victim from the cache. Skip this
870 // victim node as its ValueEntry might have been
871 // invalidated. Add it to the skipped nodes.
872 skipped.push(victim);
873 }
874 }
875 skipped_nodes = skipped;
876
877 // Add the candidate to the deques.
878 self.handle_admit(kh, &entry, new_weight, deqs, counters);
879 }
880 AdmissionResult::Rejected { skipped_nodes: s } => {
881 skipped_nodes = s;
882 // Remove the candidate from the cache (hash map).
883 self.cache.remove(&Arc::clone(&kh.key));
884 }
885 };
886
887 // Move the skipped nodes to the back of the deque. We do not unlink (drop)
888 // them because ValueEntries in the write op queue should be pointing them.
889 for node in skipped_nodes {
890 unsafe { deqs.probation.move_to_back(node) };
891 }
892 }
893
    /// Performs size-aware admission explained in the paper:
    /// [Lightweight Robust Size Aware Cache Management][size-aware-cache-paper]
    /// by Gil Einziger, Ohad Eytan, Roy Friedman, Ben Manes.
    ///
    /// [size-aware-cache-paper]: https://arxiv.org/abs/2105.08770
    ///
    /// There are some modifications in this implementation:
    /// - To admit to the main space, candidate's frequency must be higher than
    ///   the aggregated frequencies of the potential victims. (In the paper,
    ///   `>=` operator is used rather than `>`) The `>` operator will do a better
    ///   job to prevent the main space from polluting.
    /// - When a candidate is rejected, the potential victims will stay at the LRU
    ///   position of the probation access-order queue. (In the paper, they will be
    ///   promoted (to the MRU position?) to force the eviction policy to select a
    ///   different set of victims for the next candidate). We may implement the
    ///   paper's behavior later?
    ///
    #[inline]
    fn admit(
        candidate: &EntrySizeAndFrequency,
        cache: &CacheStore<K, V, S>,
        deqs: &Deques<K>,
        freq: &FrequencySketch,
    ) -> AdmissionResult<K> {
        // Give up after this many consecutive deque nodes whose map entry is
        // already gone, to bound the scan.
        const MAX_CONSECUTIVE_RETRIES: usize = 5;
        let mut retries = 0;

        let mut victims = EntrySizeAndFrequency::default();
        let mut victim_nodes = SmallVec::default();
        let mut skipped_nodes = SmallVec::default();

        // Get first potential victim at the LRU position.
        let mut next_victim = deqs.probation.peek_front_ptr();

        // Aggregate potential victims until they weigh at least as much as
        // the candidate, or their combined frequency exceeds the candidate's
        // (at which point admission is already impossible).
        while victims.policy_weight < candidate.policy_weight {
            if candidate.freq < victims.freq {
                break;
            }
            if let Some(victim) = next_victim.take() {
                next_victim = DeqNode::next_node_ptr(victim);
                // SAFETY(review): assumes nodes reachable from the probation
                // deque remain valid while the deques lock is held — confirm.
                let vic_elem = &unsafe { victim.as_ref() }.element;

                if let Some(vic_entry) = cache.get(vic_elem.key()) {
                    victims.add_policy_weight(vic_entry.policy_weight());
                    victims.add_frequency(freq, vic_elem.hash());
                    victim_nodes.push(victim);
                    retries = 0;
                } else {
                    // Could not get the victim from the cache (hash map). Skip this node
                    // as its ValueEntry might have been invalidated.
                    skipped_nodes.push(victim);

                    retries += 1;
                    if retries > MAX_CONSECUTIVE_RETRIES {
                        break;
                    }
                }
            } else {
                // No more potential victims.
                break;
            }
        }

        // Admit or reject the candidate.

        // TODO: Implement some randomness to mitigate hash DoS attack.
        // See Caffeine's implementation.

        if victims.policy_weight >= candidate.policy_weight && candidate.freq > victims.freq {
            AdmissionResult::Admitted {
                victim_nodes,
                skipped_nodes,
            }
        } else {
            AdmissionResult::Rejected { skipped_nodes }
        }
    }
972
    /// Links a newly admitted entry into the probation access-order queue
    /// (and the write-order queue when TTL is enabled), bumps the counters,
    /// and marks the entry admitted.
    fn handle_admit(
        &self,
        kh: KeyHash<K>,
        entry: &TrioArc<ValueEntry<K, V>>,
        policy_weight: u32,
        deqs: &mut Deques<K>,
        counters: &mut EvictionCounters,
    ) {
        let key = Arc::clone(&kh.key);
        counters.saturating_add(1, policy_weight);
        // New entries always start in the main probation region.
        deqs.push_back_ao(
            CacheRegion::MainProbation,
            KeyHashDate::new(kh, entry.entry_info()),
            entry,
        );
        if self.is_write_order_queue_enabled() {
            deqs.push_back_wo(KeyDate::new(key, entry.entry_info()), entry);
        }
        entry.set_admitted(true);
    }
993
994 fn handle_remove(
995 deqs: &mut Deques<K>,
996 entry: TrioArc<ValueEntry<K, V>>,
997 counters: &mut EvictionCounters,
998 ) {
999 if entry.is_admitted() {
1000 entry.set_admitted(false);
1001 counters.saturating_sub(1, entry.policy_weight());
1002 // The following two unlink_* functions will unset the deq nodes.
1003 deqs.unlink_ao(&entry);
1004 Deques::unlink_wo(&mut deqs.write_order, &entry);
1005 } else {
1006 entry.unset_q_nodes();
1007 }
1008 }
1009
1010 fn handle_remove_with_deques(
1011 ao_deq_name: &str,
1012 ao_deq: &mut Deque<KeyHashDate<K>>,
1013 wo_deq: &mut Deque<KeyDate<K>>,
1014 entry: TrioArc<ValueEntry<K, V>>,
1015 counters: &mut EvictionCounters,
1016 ) {
1017 if entry.is_admitted() {
1018 entry.set_admitted(false);
1019 counters.saturating_sub(1, entry.policy_weight());
1020 // The following two unlink_* functions will unset the deq nodes.
1021 Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry);
1022 Deques::unlink_wo(wo_deq, &entry);
1023 } else {
1024 entry.unset_q_nodes();
1025 }
1026 }
1027
    /// Removes expired entries: first by write order (TTL), then by access
    /// order (TTI / `valid_after`), scanning at most `batch_size` entries per
    /// queue.
    fn evict_expired(
        &self,
        deqs: &mut Deques<K>,
        batch_size: usize,
        counters: &mut EvictionCounters,
    ) {
        let now = self.current_time_from_expiration_clock();

        if self.is_write_order_queue_enabled() {
            self.remove_expired_wo(deqs, batch_size, now, counters);
        }

        if self.time_to_idle.is_some() || self.has_valid_after() {
            // Split-borrow the individual deques so the closure below can
            // take the write-order deque alongside each access-order deque.
            let (window, probation, protected, wo) = (
                &mut deqs.window,
                &mut deqs.probation,
                &mut deqs.protected,
                &mut deqs.write_order,
            );

            let mut rm_expired_ao =
                |name, deq| self.remove_expired_ao(name, deq, wo, batch_size, now, counters);

            rm_expired_ao("window", window);
            rm_expired_ao("probation", probation);
            rm_expired_ao("protected", protected);
        }
    }
1056
    /// Scans the front (LRU end) of one access-order deque and removes up to
    /// `batch_size` entries that are expired by TTI or `valid_after`.
    #[inline]
    fn remove_expired_ao(
        &self,
        deq_name: &str,
        deq: &mut Deque<KeyHashDate<K>>,
        write_order_deq: &mut Deque<KeyDate<K>>,
        batch_size: usize,
        now: Instant,
        counters: &mut EvictionCounters,
    ) {
        let tti = &self.time_to_idle;
        let va = &self.valid_after();
        for _ in 0..batch_size {
            // Peek the front node of the deque and check if it is expired.
            let key = deq.peek_front().and_then(|node| {
                // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example.
                if is_expired_entry_ao(tti, va, node, now) {
                    Some(Arc::clone(node.element.key()))
                } else {
                    None
                }
            });

            // Front entry is not expired; stop scanning this deque.
            if key.is_none() {
                break;
            }

            let key = key.as_ref().unwrap();

            // Remove the key from the map only when the entry is really
            // expired. This check is needed because it is possible that the entry in
            // the map has been updated or deleted but its deque node we checked
            // above have not been updated yet.
            let maybe_entry = self
                .cache
                .remove_if(key, |_, v| is_expired_entry_ao(tti, va, v, now));

            if let Some((_k, entry)) = maybe_entry {
                Self::handle_remove_with_deques(deq_name, deq, write_order_deq, entry, counters);
            } else if !self.try_skip_updated_entry(key, deq_name, deq, write_order_deq) {
                break;
            }
        }
    }

    /// Called when a deque-front node could not be removed from the map as
    /// expired. Returns `true` when the node has been moved to the back and
    /// scanning may continue, `false` when scanning should stop.
    #[inline]
    fn try_skip_updated_entry(
        &self,
        key: &K,
        deq_name: &str,
        deq: &mut Deque<KeyHashDate<K>>,
        write_order_deq: &mut Deque<KeyDate<K>>,
    ) -> bool {
        if let Some(entry) = self.cache.get(key) {
            if entry.is_dirty() {
                // The key exists and the entry has been updated.
                Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
                Deques::move_to_back_wo_in_deque(write_order_deq, &entry);
                true
            } else {
                // The key exists but something unexpected.
                false
            }
        } else {
            // Skip this entry as the key might have been invalidated. Since the
            // invalidated ValueEntry (which should be still in the write op
            // queue) has a pointer to this node, move the node to the back of
            // the deque instead of popping (dropping) it.
            deq.move_front_to_back();
            true
        }
    }

    /// Scans the front of the write-order deque and removes up to
    /// `batch_size` entries that are expired by TTL or `valid_after`.
    #[inline]
    fn remove_expired_wo(
        &self,
        deqs: &mut Deques<K>,
        batch_size: usize,
        now: Instant,
        counters: &mut EvictionCounters,
    ) {
        let ttl = &self.time_to_live;
        let va = &self.valid_after();
        for _ in 0..batch_size {
            let key = deqs.write_order.peek_front().and_then(|node| {
                // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example.
                if is_expired_entry_wo(ttl, va, node, now) {
                    Some(Arc::clone(node.element.key()))
                } else {
                    None
                }
            });

            // Front entry is not expired; stop scanning.
            if key.is_none() {
                break;
            }

            let key = key.as_ref().unwrap();

            // Re-check expiry against the live map entry before removing
            // (the deque node may be stale).
            let maybe_entry = self
                .cache
                .remove_if(key, |_, v| is_expired_entry_wo(ttl, va, v, now));

            if let Some((_k, entry)) = maybe_entry {
                Self::handle_remove(deqs, entry, counters);
            } else if let Some(entry) = self.cache.get(key) {
                if entry.is_dirty() {
                    // The entry has been updated since the node was queued;
                    // refresh its position instead of evicting it.
                    deqs.move_to_back_ao(&entry);
                    deqs.move_to_back_wo(&entry);
                } else {
                    // The key exists but something unexpected. Break.
                    break;
                }
            } else {
                // Skip this entry as the key might have been invalidated. Since the
                // invalidated ValueEntry (which should be still in the write op
                // queue) has a pointer to this node, move the node to the back of
                // the deque instead of popping (dropping) it.
                deqs.write_order.move_front_to_back();
            }
        }
    }
1179
1180 fn evict_lru_entries(
1181 &self,
1182 deqs: &mut Deques<K>,
1183 batch_size: usize,
1184 weights_to_evict: u64,
1185 counters: &mut EvictionCounters,
1186 ) {
1187 const DEQ_NAME: &str = "probation";
1188 let mut evicted = 0u64;
1189 let (deq, write_order_deq) = (&mut deqs.probation, &mut deqs.write_order);
1190
1191 for _ in 0..batch_size {
1192 if evicted >= weights_to_evict {
1193 break;
1194 }
1195
1196 let maybe_key_and_ts = deq.peek_front().map(|node| {
1197 let entry_info = node.element.entry_info();
1198 (
1199 Arc::clone(node.element.key()),
1200 entry_info.is_dirty(),
1201 entry_info.last_modified(),
1202 )
1203 });
1204
1205 let (key, ts) = match maybe_key_and_ts {
1206 Some((key, false, Some(ts))) => (key, ts),
1207 // TODO: Remove the second pattern `Some((_key, false, None))` once we change
1208 // `last_modified` and `last_accessed` in `EntryInfo` from `Option<Instant>` to
1209 // `Instant`.
1210 Some((key, true, _)) | Some((key, false, None)) => {
1211 if self.try_skip_updated_entry(&key, DEQ_NAME, deq, write_order_deq) {
1212 continue;
1213 } else {
1214 break;
1215 }
1216 }
1217 None => break,
1218 };
1219
1220 let maybe_entry = self.cache.remove_if(&key, |_, v| {
1221 if let Some(lm) = v.last_modified() {
1222 lm == ts
1223 } else {
1224 false
1225 }
1226 });
1227
1228 if let Some((_k, entry)) = maybe_entry {
1229 let weight = entry.policy_weight();
1230 Self::handle_remove_with_deques(DEQ_NAME, deq, write_order_deq, entry, counters);
1231 evicted = evicted.saturating_add(weight as u64);
1232 } else if !self.try_skip_updated_entry(&key, DEQ_NAME, deq, write_order_deq) {
1233 break;
1234 }
1235 }
1236 }
1237}
1238
1239//
1240// for testing
1241//
#[cfg(test)]
impl<K, V, S> Inner<K, V, S>
where
    K: Hash + Eq,
    S: BuildHasher + Clone,
{
    /// Installs or clears the mock clock used by expiration tests.
    fn set_expiration_clock(&self, clock: Option<Clock>) {
        let mut exp_clock = self.expiration_clock.write().expect("lock poisoned");
        match clock {
            Some(clock) => {
                // NOTE(review): the clock is stored before the flag is raised
                // (and below, the flag is cleared before the clock is removed)
                // — presumably so a reader that observes the flag always finds
                // a clock installed. Preserve this ordering.
                *exp_clock = Some(clock);
                self.has_expiration_clock.store(true, Ordering::SeqCst);
            }
            None => {
                self.has_expiration_clock.store(false, Ordering::SeqCst);
                *exp_clock = None;
            }
        }
    }
}
1259
1260//
1261// private free-standing functions
1262//
1263#[inline]
1264fn is_expired_entry_ao(
1265 time_to_idle: &Option<Duration>,
1266 valid_after: &Option<Instant>,
1267 entry: &impl AccessTime,
1268 now: Instant,
1269) -> bool {
1270 if let Some(ts) = entry.last_accessed() {
1271 if let Some(va) = valid_after {
1272 if ts < *va {
1273 return true;
1274 }
1275 }
1276 if let Some(tti) = time_to_idle {
1277 let checked_add = ts.checked_add(*tti);
1278 if checked_add.is_none() {
1279 panic!("ttl overflow")
1280 }
1281 return checked_add.unwrap() <= now;
1282 }
1283 }
1284 false
1285}
1286
1287#[inline]
1288fn is_expired_entry_wo(
1289 time_to_live: &Option<Duration>,
1290 valid_after: &Option<Instant>,
1291 entry: &impl AccessTime,
1292 now: Instant,
1293) -> bool {
1294 if let Some(ts) = entry.last_modified() {
1295 if let Some(va) = valid_after {
1296 if ts < *va {
1297 return true;
1298 }
1299 }
1300 if let Some(ttl) = time_to_live {
1301 let checked_add = ts.checked_add(*ttl);
1302 if checked_add.is_none() {
1303 panic!("ttl overflow");
1304 }
1305 return checked_add.unwrap() <= now;
1306 }
1307 }
1308 false
1309}
1310
#[cfg(test)]
mod tests {
    use super::BaseCache;

    // The frequency sketch's table length is derived from the cache's max
    // capacity. This test checks that the derivation saturates at the
    // per-pointer-width cap instead of overflowing, across representative
    // capacities on 32-bit and 64-bit targets.
    #[cfg_attr(target_pointer_width = "16", ignore)]
    #[test]
    fn test_skt_capacity_will_not_overflow() {
        use std::collections::hash_map::RandomState;

        // power of two
        let pot = |exp| 2u64.pow(exp);

        // Builds a cache with `max_capacity`, enables the frequency sketch,
        // and asserts its table length equals `len`. `name` labels the
        // assertion in failure output.
        let ensure_sketch_len = |max_capacity, len, name| {
            let cache = BaseCache::<u8, u8>::new(
                Some(max_capacity),
                None,
                RandomState::default(),
                None,
                None,
                None,
            );
            cache.inner.enable_frequency_sketch_for_testing();
            assert_eq!(
                cache
                    .inner
                    .frequency_sketch
                    .read()
                    .expect("lock poisoned")
                    .table_len(),
                len as usize,
                "{}",
                name
            );
        };

        if cfg!(target_pointer_width = "32") {
            let pot24 = pot(24);
            let pot16 = pot(16);
            ensure_sketch_len(0, 128, "0");
            ensure_sketch_len(128, 128, "128");
            ensure_sketch_len(pot16, pot16, "pot16");
            // due to ceiling to next_power_of_two
            ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1");
            // due to ceiling to next_power_of_two
            ensure_sketch_len(pot24 - 1, pot24, "pot24 - 1");
            ensure_sketch_len(pot24, pot24, "pot24");
            ensure_sketch_len(pot(27), pot24, "pot(27)");
            ensure_sketch_len(u32::MAX as u64, pot24, "u32::MAX");
        } else {
            // target_pointer_width: 64 or larger.
            let pot30 = pot(30);
            let pot16 = pot(16);
            ensure_sketch_len(0, 128, "0");
            ensure_sketch_len(128, 128, "128");
            ensure_sketch_len(pot16, pot16, "pot16");
            // due to ceiling to next_power_of_two
            ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1");

            // The following tests will allocate large memory (~8GiB).
            // Skip when running on Circle CI.
            if !cfg!(circleci) {
                // due to ceiling to next_power_of_two
                // (label typo fixed: was "pot30- 1")
                ensure_sketch_len(pot30 - 1, pot30, "pot30 - 1");
                ensure_sketch_len(pot30, pot30, "pot30");
                ensure_sketch_len(u64::MAX, pot30, "u64::MAX");
            }
        };
    }
}