tracks lexicons and how many times they appeared on the jetstream

refactor(server): use quanta for last activity on lexicon handles aswell

ptr.pet 1dd2ef8c 6b78fe2d

verified
+12 -32
-17
server/Cargo.lock
··· 75 75 ] 76 76 77 77 [[package]] 78 - name = "atomic-time" 79 - version = "0.1.5" 80 - source = "registry+https://github.com/rust-lang/crates.io-index" 81 - checksum = "9622f5c6fb50377516c70f65159e70b25465409760c6bd6d4e581318bf704e83" 82 - dependencies = [ 83 - "once_cell", 84 - "portable-atomic", 85 - ] 86 - 87 - [[package]] 88 78 name = "atomic-waker" 89 79 version = "1.1.2" 90 80 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1055 1045 checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 1056 1046 1057 1047 [[package]] 1058 - name = "portable-atomic" 1059 - version = "1.11.1" 1060 - source = "registry+https://github.com/rust-lang/crates.io-index" 1061 - checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" 1062 - 1063 - [[package]] 1064 1048 name = "proc-macro2" 1065 1049 version = "1.0.95" 1066 1050 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1492 1476 dependencies = [ 1493 1477 "anyhow", 1494 1478 "async-trait", 1495 - "atomic-time", 1496 1479 "axum", 1497 1480 "axum-tws", 1498 1481 "fjall",
-1
server/Cargo.toml
··· 22 22 serde = { version = "1", features = ["derive"] } 23 23 serde_json = "1.0.141" 24 24 scc = "2.3.4" 25 - atomic-time = "0.1.5" 26 25 ordered-varint = "2.0.0" 27 26 threadpool = "1.8.1" 28 27 snmalloc-rs = "0.3.8"
+11 -13
server/src/db/mod.rs
··· 4 4 path::Path, 5 5 sync::{ 6 6 Arc, 7 - atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering}, 7 + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering as AtomicOrdering}, 8 8 }, 9 9 time::{Duration, Instant}, 10 10 }; ··· 12 12 use atomic_time::AtomicInstant; 13 13 use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice}; 14 14 use ordered_varint::Variable; 15 - use pingora_limits::rate::Rate; 16 15 use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 17 16 use smol_str::SmolStr; 18 17 use tokio::sync::broadcast; ··· 21 20 db::block::{ReadVariableExt, WriteVariableExt}, 22 21 error::{AppError, AppResult}, 23 22 jetstream::JetstreamEvent, 24 - utils::{DefaultRateTracker, RateTracker}, 23 + utils::{CLOCK, DefaultRateTracker, RateTracker}, 25 24 }; 26 25 27 26 mod block; ··· 77 76 tree: Partition, 78 77 buf: Arc<scc::Queue<EventRecord>>, 79 78 // this is stored here since scc::Queue does not have O(1) length 80 - buf_len: AtomicUsize, // relaxed 81 - last_insert: AtomicInstant, // relaxed 79 + buf_len: AtomicUsize, // relaxed 80 + last_insert: AtomicU64, // relaxed 82 81 eps: DefaultRateTracker, 83 82 } 84 83 ··· 89 88 tree: keyspace.open_partition(nsid, opts).unwrap(), 90 89 buf: Default::default(), 91 90 buf_len: AtomicUsize::new(0), 92 - last_insert: AtomicInstant::now(), 91 + last_insert: AtomicU64::new(0), 93 92 eps: RateTracker::new(Duration::from_secs(10)), 94 93 } 95 94 } ··· 98 97 self.buf_len.load(AtomicOrdering::Relaxed) 99 98 } 100 99 101 - fn last_insert(&self) -> Instant { 102 - self.last_insert.load(AtomicOrdering::Relaxed) 100 + fn since_last_activity(&self) -> u64 { 101 + CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw()) 103 102 } 104 103 105 104 fn suggested_block_size(&self) -> usize { ··· 109 108 fn insert(&self, event: EventRecord) { 110 109 self.buf.push(event); 111 110 self.buf_len.fetch_add(1, AtomicOrdering::Relaxed); 112 - self.last_insert 113 - .store(Instant::now(), AtomicOrdering::Relaxed); 111 + self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed); 114 112 self.eps.observe(); 115 113 } 116 114 ··· 164 162 shutting_down: AtomicBool, 165 163 min_block_size: usize, 166 164 max_block_size: usize, 167 - max_last_activity: Duration, 165 + max_last_activity: u64, 168 166 } 169 167 170 168 impl Db { ··· 186 184 shutting_down: AtomicBool::new(false), 187 185 min_block_size: 512, 188 186 max_block_size: 500_000, 189 - max_last_activity: Duration::from_secs(10), 187 + max_last_activity: Duration::from_secs(10).as_nanos() as u64, 190 188 }) 191 189 } 192 190 ··· 205 203 for (nsid, tree) in self.hits.iter(&_guard) { 206 204 let count = tree.item_count(); 207 205 let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size()); 208 - let is_too_old = tree.last_insert().elapsed() > self.max_last_activity; 206 + let is_too_old = tree.since_last_activity() > self.max_last_activity; 209 207 if count > 0 && (all || is_max_block_size || is_too_old) { 210 208 let nsid = nsid.clone(); 211 209 let tree = tree.clone();
+1 -1
server/src/main.rs
··· 92 92 } 93 93 }); 94 94 95 - let sync_thread = std::thread::spawn({ 95 + std::thread::spawn({ 96 96 let db = db.clone(); 97 97 move || { 98 98 loop {