Our Personal Data Server from scratch! tranquil.farm
oauth atproto pds rust postgresql objectstorage fun

feat: cache locks less

+1000 -278
+8
.config/nextest.toml
··· 29 filter = "binary(whole_story)" 30 test-group = "heavy-load-tests" 31 32 [[profile.ci.overrides]] 33 filter = "test(/import_with_verification/) | test(/plc_migration/)" 34 test-group = "serial-env-tests" ··· 40 [[profile.ci.overrides]] 41 filter = "binary(whole_story)" 42 test-group = "heavy-load-tests"
··· 29 filter = "binary(whole_story)" 30 test-group = "heavy-load-tests" 31 32 + [[profile.default.overrides]] 33 + filter = "test(/two_node_stress_concurrent_load/)" 34 + test-group = "heavy-load-tests" 35 + 36 [[profile.ci.overrides]] 37 filter = "test(/import_with_verification/) | test(/plc_migration/)" 38 test-group = "serial-env-tests" ··· 44 [[profile.ci.overrides]] 45 filter = "binary(whole_story)" 46 test-group = "heavy-load-tests" 47 + 48 + [[profile.ci.overrides]] 49 + filter = "test(/two_node_stress_concurrent_load/)" 50 + test-group = "heavy-load-tests"
+24 -22
Cargo.lock
··· 2778 "libc", 2779 "percent-encoding", 2780 "pin-project-lite", 2781 - "socket2 0.6.1", 2782 "system-configuration", 2783 "tokio", 2784 "tower-service", ··· 4242 "quinn-udp", 4243 "rustc-hash", 4244 "rustls 0.23.35", 4245 - "socket2 0.6.1", 4246 "thiserror 2.0.17", 4247 "tokio", 4248 "tracing", ··· 4279 "cfg_aliases", 4280 "libc", 4281 "once_cell", 4282 - "socket2 0.6.1", 4283 "tracing", 4284 "windows-sys 0.60.2", 4285 ] ··· 4402 "pin-project-lite", 4403 "ryu", 4404 "sha1_smol", 4405 - "socket2 0.6.1", 4406 "tokio", 4407 "tokio-util", 4408 "url", ··· 5134 5135 [[package]] 5136 name = "socket2" 5137 - version = "0.6.1" 5138 source = "registry+https://github.com/rust-lang/crates.io-index" 5139 - checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" 5140 dependencies = [ 5141 "libc", 5142 "windows-sys 0.60.2", ··· 5693 "mio", 5694 "pin-project-lite", 5695 "signal-hook-registry", 5696 - "socket2 0.6.1", 5697 "tokio-macros", 5698 "windows-sys 0.61.2", 5699 ] ··· 5797 "hyper-util", 5798 "percent-encoding", 5799 "pin-project", 5800 - "socket2 0.6.1", 5801 "sync_wrapper", 5802 "tokio", 5803 "tokio-stream", ··· 5963 5964 [[package]] 5965 name = "tranquil-auth" 5966 - version = "0.1.0" 5967 dependencies = [ 5968 "anyhow", 5969 "base32", ··· 5985 5986 [[package]] 5987 name = "tranquil-cache" 5988 - version = "0.1.0" 5989 dependencies = [ 5990 "async-trait", 5991 "base64 0.22.1", ··· 5998 5999 [[package]] 6000 name = "tranquil-comms" 6001 - version = "0.1.0" 6002 dependencies = [ 6003 "async-trait", 6004 "base64 0.22.1", ··· 6012 6013 [[package]] 6014 name = "tranquil-crypto" 6015 - version = "0.1.0" 6016 dependencies = [ 6017 "aes-gcm", 6018 "base64 0.22.1", ··· 6028 6029 [[package]] 6030 name = "tranquil-db" 6031 - version = "0.1.0" 6032 dependencies = [ 6033 "async-trait", 6034 "chrono", ··· 6045 6046 [[package]] 6047 name = "tranquil-db-traits" 6048 - version = "0.1.0" 6049 dependencies = [ 6050 "async-trait", 6051 "base64 0.22.1", ··· 6061 6062 [[package]] 6063 name = "tranquil-infra" 6064 - version = "0.1.0" 6065 dependencies = [ 6066 "async-trait", 6067 "bytes", ··· 6071 6072 [[package]] 6073 name = "tranquil-oauth" 6074 - version = "0.1.0" 6075 dependencies = [ 6076 "anyhow", 6077 "axum", ··· 6094 6095 [[package]] 6096 name = "tranquil-pds" 6097 - version = "0.1.0" 6098 dependencies = [ 6099 "aes-gcm", 6100 "anyhow", ··· 6179 6180 [[package]] 6181 name = "tranquil-repo" 6182 - version = "0.1.0" 6183 dependencies = [ 6184 "bytes", 6185 "cid", ··· 6191 6192 [[package]] 6193 name = "tranquil-ripple" 6194 - version = "0.1.0" 6195 dependencies = [ 6196 "async-trait", 6197 "backon", ··· 6199 "bytes", 6200 "foca", 6201 "futures", 6202 "parking_lot", 6203 "rand 0.9.2", 6204 "serde", 6205 "thiserror 2.0.17", 6206 "tokio", 6207 "tokio-util", ··· 6213 6214 [[package]] 6215 name = "tranquil-scopes" 6216 - version = "0.1.0" 6217 dependencies = [ 6218 "axum", 6219 "futures", ··· 6228 6229 [[package]] 6230 name = "tranquil-storage" 6231 - version = "0.1.0" 6232 dependencies = [ 6233 "async-trait", 6234 "aws-config", ··· 6244 6245 [[package]] 6246 name = "tranquil-types" 6247 - version = "0.1.0" 6248 dependencies = [ 6249 "chrono", 6250 "cid",
··· 2778 "libc", 2779 "percent-encoding", 2780 "pin-project-lite", 2781 + "socket2 0.5.10", 2782 "system-configuration", 2783 "tokio", 2784 "tower-service", ··· 4242 "quinn-udp", 4243 "rustc-hash", 4244 "rustls 0.23.35", 4245 + "socket2 0.5.10", 4246 "thiserror 2.0.17", 4247 "tokio", 4248 "tracing", ··· 4279 "cfg_aliases", 4280 "libc", 4281 "once_cell", 4282 + "socket2 0.5.10", 4283 "tracing", 4284 "windows-sys 0.60.2", 4285 ] ··· 4402 "pin-project-lite", 4403 "ryu", 4404 "sha1_smol", 4405 + "socket2 0.6.2", 4406 "tokio", 4407 "tokio-util", 4408 "url", ··· 5134 5135 [[package]] 5136 name = "socket2" 5137 + version = "0.6.2" 5138 source = "registry+https://github.com/rust-lang/crates.io-index" 5139 + checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0" 5140 dependencies = [ 5141 "libc", 5142 "windows-sys 0.60.2", ··· 5693 "mio", 5694 "pin-project-lite", 5695 "signal-hook-registry", 5696 + "socket2 0.6.2", 5697 "tokio-macros", 5698 "windows-sys 0.61.2", 5699 ] ··· 5797 "hyper-util", 5798 "percent-encoding", 5799 "pin-project", 5800 + "socket2 0.6.2", 5801 "sync_wrapper", 5802 "tokio", 5803 "tokio-stream", ··· 5963 5964 [[package]] 5965 name = "tranquil-auth" 5966 + version = "0.2.0" 5967 dependencies = [ 5968 "anyhow", 5969 "base32", ··· 5985 5986 [[package]] 5987 name = "tranquil-cache" 5988 + version = "0.2.0" 5989 dependencies = [ 5990 "async-trait", 5991 "base64 0.22.1", ··· 5998 5999 [[package]] 6000 name = "tranquil-comms" 6001 + version = "0.2.0" 6002 dependencies = [ 6003 "async-trait", 6004 "base64 0.22.1", ··· 6012 6013 [[package]] 6014 name = "tranquil-crypto" 6015 + version = "0.2.0" 6016 dependencies = [ 6017 "aes-gcm", 6018 "base64 0.22.1", ··· 6028 6029 [[package]] 6030 name = "tranquil-db" 6031 + version = "0.2.0" 6032 dependencies = [ 6033 "async-trait", 6034 "chrono", ··· 6045 6046 [[package]] 6047 name = "tranquil-db-traits" 6048 + version = "0.2.0" 6049 dependencies = [ 6050 "async-trait", 6051 "base64 0.22.1", ··· 6061 6062 [[package]] 6063 name = "tranquil-infra" 6064 + version = "0.2.0" 6065 dependencies = [ 6066 "async-trait", 6067 "bytes", ··· 6071 6072 [[package]] 6073 name = "tranquil-oauth" 6074 + version = "0.2.0" 6075 dependencies = [ 6076 "anyhow", 6077 "axum", ··· 6094 6095 [[package]] 6096 name = "tranquil-pds" 6097 + version = "0.2.0" 6098 dependencies = [ 6099 "aes-gcm", 6100 "anyhow", ··· 6179 6180 [[package]] 6181 name = "tranquil-repo" 6182 + version = "0.2.0" 6183 dependencies = [ 6184 "bytes", 6185 "cid", ··· 6191 6192 [[package]] 6193 name = "tranquil-ripple" 6194 + version = "0.2.0" 6195 dependencies = [ 6196 "async-trait", 6197 "backon", ··· 6199 "bytes", 6200 "foca", 6201 "futures", 6202 + "metrics", 6203 "parking_lot", 6204 "rand 0.9.2", 6205 "serde", 6206 + "socket2 0.6.2", 6207 "thiserror 2.0.17", 6208 "tokio", 6209 "tokio-util", ··· 6215 6216 [[package]] 6217 name = "tranquil-scopes" 6218 + version = "0.2.0" 6219 dependencies = [ 6220 "axum", 6221 "futures", ··· 6230 6231 [[package]] 6232 name = "tranquil-storage" 6233 + version = "0.2.0" 6234 dependencies = [ 6235 "async-trait", 6236 "aws-config", ··· 6246 6247 [[package]] 6248 name = "tranquil-types" 6249 + version = "0.2.0" 6250 dependencies = [ 6251 "chrono", 6252 "cid",
+2
crates/tranquil-ripple/Cargo.toml
··· 12 bincode = { workspace = true } 13 bytes = { workspace = true } 14 foca = { workspace = true } 15 parking_lot = { workspace = true } 16 rand = "0.9" 17 serde = { workspace = true } 18 thiserror = { workspace = true } 19 tokio = { workspace = true, features = ["net", "io-util", "sync", "time"] } 20 tokio-util = { workspace = true }
··· 12 bincode = { workspace = true } 13 bytes = { workspace = true } 14 foca = { workspace = true } 15 + metrics = { workspace = true } 16 parking_lot = { workspace = true } 17 rand = "0.9" 18 serde = { workspace = true } 19 + socket2 = "0.6.2" 20 thiserror = { workspace = true } 21 tokio = { workspace = true, features = ["net", "io-util", "sync", "time"] } 22 tokio-util = { workspace = true }
+29 -16
crates/tranquil-ripple/src/cache.rs
··· 1 - use crate::crdt::CrdtStore; 2 use async_trait::async_trait; 3 - use parking_lot::RwLock; 4 use std::sync::Arc; 5 use std::time::Duration; 6 use tranquil_infra::{Cache, CacheError}; 7 8 pub struct RippleCache { 9 - store: Arc<RwLock<CrdtStore>>, 10 } 11 12 impl RippleCache { 13 - pub fn new(store: Arc<RwLock<CrdtStore>>) -> Self { 14 Self { store } 15 } 16 } ··· 18 #[async_trait] 19 impl Cache for RippleCache { 20 async fn get(&self, key: &str) -> Option<String> { 21 - self.store 22 - .read() 23 .cache_get(key) 24 - .and_then(|bytes| String::from_utf8(bytes).ok()) 25 } 26 27 async fn set(&self, key: &str, value: &str, ttl: Duration) -> Result<(), CacheError> { 28 self.store 29 - .write() 30 - .cache_set(key.to_string(), value.as_bytes().to_vec(), ttl.as_millis() as u64); 31 Ok(()) 32 } 33 34 async fn delete(&self, key: &str) -> Result<(), CacheError> { 35 - self.store.write().cache_delete(key); 36 Ok(()) 37 } 38 39 async fn get_bytes(&self, key: &str) -> Option<Vec<u8>> { 40 - self.store.read().cache_get(key) 41 } 42 43 async fn set_bytes(&self, key: &str, value: &[u8], ttl: Duration) -> Result<(), CacheError> { 44 self.store 45 - .write() 46 - .cache_set(key.to_string(), value.to_vec(), ttl.as_millis() as u64); 47 Ok(()) 48 } 49 } ··· 54 55 #[tokio::test] 56 async fn cache_trait_roundtrip() { 57 - let store = Arc::new(RwLock::new(CrdtStore::new(1))); 58 let cache = RippleCache::new(store); 59 cache 60 .set("test", "value", Duration::from_secs(60)) ··· 65 66 #[tokio::test] 67 async fn cache_trait_bytes() { 68 - let store = Arc::new(RwLock::new(CrdtStore::new(1))); 69 let cache = RippleCache::new(store); 70 let data = vec![0xDE, 0xAD, 0xBE, 0xEF]; 71 cache ··· 77 78 #[tokio::test] 79 async fn cache_trait_delete() { 80 - let store = Arc::new(RwLock::new(CrdtStore::new(1))); 81 let cache = RippleCache::new(store); 82 cache 83 .set("del", "x", Duration::from_secs(60))
··· 1 + use crate::crdt::ShardedCrdtStore; 2 + use crate::metrics; 3 use async_trait::async_trait; 4 use std::sync::Arc; 5 use std::time::Duration; 6 use tranquil_infra::{Cache, CacheError}; 7 8 pub struct RippleCache { 9 + store: Arc<ShardedCrdtStore>, 10 } 11 12 impl RippleCache { 13 + pub fn new(store: Arc<ShardedCrdtStore>) -> Self { 14 Self { store } 15 } 16 } ··· 18 #[async_trait] 19 impl Cache for RippleCache { 20 async fn get(&self, key: &str) -> Option<String> { 21 + let result = self 22 + .store 23 .cache_get(key) 24 + .and_then(|bytes| String::from_utf8(bytes).ok()); 25 + match result.is_some() { 26 + true => metrics::record_cache_hit(), 27 + false => metrics::record_cache_miss(), 28 + } 29 + result 30 } 31 32 async fn set(&self, key: &str, value: &str, ttl: Duration) -> Result<(), CacheError> { 33 + let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX); 34 self.store 35 + .cache_set(key.to_string(), value.as_bytes().to_vec(), ttl_ms); 36 + metrics::record_cache_write(); 37 Ok(()) 38 } 39 40 async fn delete(&self, key: &str) -> Result<(), CacheError> { 41 + self.store.cache_delete(key); 42 + metrics::record_cache_delete(); 43 Ok(()) 44 } 45 46 async fn get_bytes(&self, key: &str) -> Option<Vec<u8>> { 47 + let result = self.store.cache_get(key); 48 + match result.is_some() { 49 + true => metrics::record_cache_hit(), 50 + false => metrics::record_cache_miss(), 51 + } 52 + result 53 } 54 55 async fn set_bytes(&self, key: &str, value: &[u8], ttl: Duration) -> Result<(), CacheError> { 56 + let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX); 57 self.store 58 + .cache_set(key.to_string(), value.to_vec(), ttl_ms); 59 + metrics::record_cache_write(); 60 Ok(()) 61 } 62 } ··· 67 68 #[tokio::test] 69 async fn cache_trait_roundtrip() { 70 + let store = Arc::new(ShardedCrdtStore::new(1)); 71 let cache = RippleCache::new(store); 72 cache 73 .set("test", "value", Duration::from_secs(60)) ··· 78 79 #[tokio::test] 80 async fn cache_trait_bytes() { 81 + let store = Arc::new(ShardedCrdtStore::new(1)); 82 let cache = RippleCache::new(store); 83 let data = vec![0xDE, 0xAD, 0xBE, 0xEF]; 84 cache ··· 90 91 #[tokio::test] 92 async fn cache_trait_delete() { 93 + let store = Arc::new(ShardedCrdtStore::new(1)); 94 let cache = RippleCache::new(store); 95 cache 96 .set("del", "x", Duration::from_secs(60))
+1 -1
crates/tranquil-ripple/src/config.rs
··· 1 use std::net::SocketAddr; 2 3 - fn fnv1a(data: &[u8]) -> u64 { 4 data.iter().fold(0xcbf29ce484222325u64, |hash, &byte| { 5 (hash ^ byte as u64).wrapping_mul(0x100000001b3) 6 })
··· 1 use std::net::SocketAddr; 2 3 + pub(crate) fn fnv1a(data: &[u8]) -> u64 { 4 data.iter().fold(0xcbf29ce484222325u64, |hash, &byte| { 5 (hash ^ byte as u64).wrapping_mul(0x100000001b3) 6 })
+13
crates/tranquil-ripple/src/crdt/g_counter.rs
··· 163 } 164 165 pub fn peek_count(&self, key: &str, window_ms: u64, now_wall_ms: u64) -> u64 { 166 match self.counters.get(key) { 167 Some(counter) if counter.window_start_ms == Self::aligned_window_start(now_wall_ms, window_ms) => { 168 counter.total() ··· 193 + PER_COUNTER_OVERHEAD 194 }) 195 .fold(0usize, usize::saturating_add) 196 } 197 198 pub fn gc_expired(&mut self, now_wall_ms: u64) {
··· 163 } 164 165 pub fn peek_count(&self, key: &str, window_ms: u64, now_wall_ms: u64) -> u64 { 166 + if window_ms == 0 { 167 + return 0; 168 + } 169 match self.counters.get(key) { 170 Some(counter) if counter.window_start_ms == Self::aligned_window_start(now_wall_ms, window_ms) => { 171 counter.total() ··· 196 + PER_COUNTER_OVERHEAD 197 }) 198 .fold(0usize, usize::saturating_add) 199 + } 200 + 201 + pub fn extract_all_deltas(&self) -> Vec<GCounterDelta> { 202 + self.counters 203 + .iter() 204 + .map(|(key, counter)| GCounterDelta { 205 + key: key.clone(), 206 + counter: counter.clone(), 207 + }) 208 + .collect() 209 } 210 211 pub fn gc_expired(&mut self, now_wall_ms: u64) {
+7 -4
crates/tranquil-ripple/src/crdt/hlc.rs
··· 54 } 55 56 fn physical_now() -> u64 { 57 - SystemTime::now() 58 - .duration_since(UNIX_EPOCH) 59 - .unwrap_or_default() 60 - .as_millis() as u64 61 } 62 63 pub fn now(&mut self) -> HlcTimestamp {
··· 54 } 55 56 fn physical_now() -> u64 { 57 + u64::try_from( 58 + SystemTime::now() 59 + .duration_since(UNIX_EPOCH) 60 + .unwrap_or_default() 61 + .as_millis(), 62 + ) 63 + .unwrap_or(u64::MAX) 64 } 65 66 pub fn now(&mut self) -> HlcTimestamp {
+68 -35
crates/tranquil-ripple/src/crdt/lww_map.rs
··· 1 use super::hlc::HlcTimestamp; 2 - use parking_lot::Mutex; 3 use serde::{Deserialize, Serialize}; 4 use std::collections::{BTreeMap, HashMap}; 5 ··· 27 } 28 29 fn entry_byte_size(&self, key: &str) -> usize { 30 - const OVERHEAD: usize = 128; 31 - key.len() 32 - + self.value.as_ref().map_or(0, Vec::len) 33 - + std::mem::size_of::<Self>() 34 - + OVERHEAD 35 } 36 } 37 ··· 59 if let Some(old_counter) = self.key_to_counter.remove(key) { 60 self.counter_to_key.remove(&old_counter); 61 } 62 self.counter = self.counter.saturating_add(1); 63 self.counter_to_key.insert(self.counter, key.to_string()); 64 self.key_to_counter.insert(key.to_string(), self.counter); 65 } 66 67 fn remove(&mut self, key: &str) { ··· 80 81 pub struct LwwMap { 82 entries: HashMap<String, LwwEntry>, 83 - lru: Mutex<LruTracker>, 84 estimated_bytes: usize, 85 } 86 ··· 88 pub fn new() -> Self { 89 Self { 90 entries: HashMap::new(), 91 - lru: Mutex::new(LruTracker::new()), 92 estimated_bytes: 0, 93 } 94 } ··· 98 if entry.is_expired(now_wall_ms) || entry.is_tombstone() { 99 return None; 100 } 101 - let value = entry.value.clone(); 102 - self.lru.lock().promote(key); 103 - value 104 } 105 106 pub fn set(&mut self, key: String, value: Vec<u8>, timestamp: HlcTimestamp, ttl_ms: u64, wall_ms_now: u64) { ··· 113 self.remove_estimated_bytes(&key); 114 self.estimated_bytes += entry.entry_byte_size(&key); 115 self.entries.insert(key.clone(), entry); 116 - self.lru.lock().promote(&key); 117 } 118 119 pub fn delete(&mut self, key: &str, timestamp: HlcTimestamp, wall_ms_now: u64) { 120 - match self.entries.get(key) { 121 Some(existing) if existing.timestamp >= timestamp => return, 122 - _ => {} 123 - } 124 - let ttl_ms = self 125 - .entries 126 - .get(key) 127 - .map_or(60_000, |e| e.ttl_ms.max(60_000)); 128 let entry = LwwEntry { 129 value: None, 130 timestamp, ··· 134 self.remove_estimated_bytes(key); 135 self.estimated_bytes += entry.entry_byte_size(key); 136 self.entries.insert(key.to_string(), entry); 137 - self.lru.lock().remove(key); 138 } 139 140 pub fn merge_entry(&mut self, key: String, remote: LwwEntry) -> bool { ··· 145 self.remove_estimated_bytes(&key); 146 self.estimated_bytes += remote.entry_byte_size(&key); 147 self.entries.insert(key.clone(), remote); 148 - let mut lru = self.lru.lock(); 149 match is_tombstone { 150 - true => lru.remove(&key), 151 - false => lru.promote(&key), 152 } 153 true 154 } ··· 175 expired_keys.iter().for_each(|key| { 176 self.remove_estimated_bytes(key); 177 self.entries.remove(key); 178 - }); 179 - let mut lru = self.lru.lock(); 180 - expired_keys.iter().for_each(|key| { 181 - lru.remove(key); 182 }); 183 } 184 ··· 192 expired_keys.iter().for_each(|key| { 193 self.remove_estimated_bytes(key); 194 self.entries.remove(key); 195 - }); 196 - let mut lru = self.lru.lock(); 197 - expired_keys.iter().for_each(|key| { 198 - lru.remove(key); 199 }); 200 } 201 202 pub fn evict_lru(&mut self) -> Option<String> { 203 - let key = self.lru.lock().pop_least_recent()?; 204 self.remove_estimated_bytes(&key); 205 self.entries.remove(&key); 206 Some(key) 207 } 208 209 pub fn estimated_bytes(&self) -> usize { ··· 359 } 360 361 #[test] 362 - fn lru_eviction() { 363 let mut map = LwwMap::new(); 364 map.set("k1".into(), b"a".to_vec(), ts(100, 0, 1), 60_000, 100); 365 map.set("k2".into(), b"b".to_vec(), ts(101, 0, 1), 60_000, 101); 366 map.set("k3".into(), b"c".to_vec(), ts(102, 0, 1), 60_000, 102); 367 - let _ = map.get("k1", 102); 368 let evicted = map.evict_lru(); 369 - assert_eq!(evicted.as_deref(), Some("k2")); 370 } 371 372 #[test]
··· 1 use super::hlc::HlcTimestamp; 2 use serde::{Deserialize, Serialize}; 3 use std::collections::{BTreeMap, HashMap}; 4 ··· 26 } 27 28 fn entry_byte_size(&self, key: &str) -> usize { 29 + const HASHMAP_ENTRY_OVERHEAD: usize = 64; 30 + const BTREE_NODE_OVERHEAD: usize = 64; 31 + const STRING_HEADER: usize = 24; 32 + const COUNTER_SIZE: usize = 8; 33 + 34 + let key_len = key.len(); 35 + let value_len = self.value.as_ref().map_or(0, Vec::len); 36 + 37 + let main_entry = key_len 38 + .saturating_add(value_len) 39 + .saturating_add(std::mem::size_of::<Self>()) 40 + .saturating_add(HASHMAP_ENTRY_OVERHEAD); 41 + 42 + match self.is_tombstone() { 43 + true => main_entry, 44 + false => { 45 + let lru_btree = COUNTER_SIZE 46 + .saturating_add(STRING_HEADER) 47 + .saturating_add(key_len) 48 + .saturating_add(BTREE_NODE_OVERHEAD); 49 + 50 + let lru_hashmap = STRING_HEADER 51 + .saturating_add(key_len) 52 + .saturating_add(COUNTER_SIZE) 53 + .saturating_add(HASHMAP_ENTRY_OVERHEAD); 54 + 55 + main_entry 56 + .saturating_add(lru_btree) 57 + .saturating_add(lru_hashmap) 58 + } 59 + } 60 } 61 } 62 ··· 84 if let Some(old_counter) = self.key_to_counter.remove(key) { 85 self.counter_to_key.remove(&old_counter); 86 } 87 + if self.counter >= u64::MAX - 1 { 88 + self.compact(); 89 + } 90 self.counter = self.counter.saturating_add(1); 91 self.counter_to_key.insert(self.counter, key.to_string()); 92 self.key_to_counter.insert(key.to_string(), self.counter); 93 + } 94 + 95 + fn compact(&mut self) { 96 + let keys: Vec<String> = self.counter_to_key.values().cloned().collect(); 97 + self.counter_to_key.clear(); 98 + self.key_to_counter.clear(); 99 + keys.into_iter().enumerate().for_each(|(i, key)| { 100 + let new_counter = (i as u64).saturating_add(1); 101 + self.counter_to_key.insert(new_counter, key.clone()); 102 + self.key_to_counter.insert(key, new_counter); 103 + }); 104 + self.counter = self.counter_to_key.len() as u64; 105 } 106 107 fn remove(&mut self, key: &str) { ··· 120 121 pub struct LwwMap { 122 entries: HashMap<String, LwwEntry>, 123 + lru: LruTracker, 124 estimated_bytes: usize, 125 } 126 ··· 128 pub fn new() -> Self { 129 Self { 130 entries: HashMap::new(), 131 + lru: LruTracker::new(), 132 estimated_bytes: 0, 133 } 134 } ··· 138 if entry.is_expired(now_wall_ms) || entry.is_tombstone() { 139 return None; 140 } 141 + entry.value.clone() 142 } 143 144 pub fn set(&mut self, key: String, value: Vec<u8>, timestamp: HlcTimestamp, ttl_ms: u64, wall_ms_now: u64) { ··· 151 self.remove_estimated_bytes(&key); 152 self.estimated_bytes += entry.entry_byte_size(&key); 153 self.entries.insert(key.clone(), entry); 154 + self.lru.promote(&key); 155 } 156 157 pub fn delete(&mut self, key: &str, timestamp: HlcTimestamp, wall_ms_now: u64) { 158 + let ttl_ms = match self.entries.get(key) { 159 Some(existing) if existing.timestamp >= timestamp => return, 160 + Some(existing) => existing.ttl_ms.max(60_000), 161 + None => 60_000, 162 + }; 163 let entry = LwwEntry { 164 value: None, 165 timestamp, ··· 169 self.remove_estimated_bytes(key); 170 self.estimated_bytes += entry.entry_byte_size(key); 171 self.entries.insert(key.to_string(), entry); 172 + self.lru.remove(key); 173 } 174 175 pub fn merge_entry(&mut self, key: String, remote: LwwEntry) -> bool { ··· 180 self.remove_estimated_bytes(&key); 181 self.estimated_bytes += remote.entry_byte_size(&key); 182 self.entries.insert(key.clone(), remote); 183 match is_tombstone { 184 + true => self.lru.remove(&key), 185 + false => self.lru.promote(&key), 186 } 187 true 188 } ··· 209 expired_keys.iter().for_each(|key| { 210 self.remove_estimated_bytes(key); 211 self.entries.remove(key); 212 + self.lru.remove(key); 213 }); 214 } 215 ··· 223 expired_keys.iter().for_each(|key| { 224 self.remove_estimated_bytes(key); 225 self.entries.remove(key); 226 + self.lru.remove(key); 227 }); 228 } 229 230 pub fn evict_lru(&mut self) -> Option<String> { 231 + let key = self.lru.pop_least_recent()?; 232 self.remove_estimated_bytes(&key); 233 self.entries.remove(&key); 234 Some(key) 235 + } 236 + 237 + pub fn touch(&mut self, key: &str) { 238 + if self.entries.contains_key(key) { 239 + self.lru.promote(key); 240 + } 241 } 242 243 pub fn estimated_bytes(&self) -> usize { ··· 393 } 394 395 #[test] 396 + fn lru_eviction_by_write_order() { 397 let mut map = LwwMap::new(); 398 map.set("k1".into(), b"a".to_vec(), ts(100, 0, 1), 60_000, 100); 399 map.set("k2".into(), b"b".to_vec(), ts(101, 0, 1), 60_000, 101); 400 map.set("k3".into(), b"c".to_vec(), ts(102, 0, 1), 60_000, 102); 401 let evicted = map.evict_lru(); 402 + assert_eq!(evicted.as_deref(), Some("k1")); 403 } 404 405 #[test]
+274 -72
crates/tranquil-ripple/src/crdt/mod.rs
··· 3 pub mod lww_map; 4 pub mod g_counter; 5 6 use delta::CrdtDelta; 7 use hlc::{Hlc, HlcTimestamp}; 8 - use lww_map::LwwMap; 9 use g_counter::RateLimitStore; 10 use std::time::{SystemTime, UNIX_EPOCH}; 11 12 - pub struct CrdtStore { 13 - hlc: Hlc, 14 cache: LwwMap, 15 rate_limits: RateLimitStore, 16 last_broadcast_ts: HlcTimestamp, 17 } 18 19 - impl CrdtStore { 20 - pub fn new(node_id: u64) -> Self { 21 Self { 22 - hlc: Hlc::new(node_id), 23 cache: LwwMap::new(), 24 rate_limits: RateLimitStore::new(node_id), 25 last_broadcast_ts: HlcTimestamp::ZERO, 26 } 27 } 28 29 fn wall_ms_now() -> u64 { 30 - SystemTime::now() 31 - .duration_since(UNIX_EPOCH) 32 - .unwrap_or_default() 33 - .as_millis() as u64 34 } 35 36 pub fn cache_get(&self, key: &str) -> Option<Vec<u8>> { 37 - self.cache.get(key, Self::wall_ms_now()) 38 } 39 40 - pub fn cache_set(&mut self, key: String, value: Vec<u8>, ttl_ms: u64) { 41 - let ts = self.hlc.now(); 42 - self.cache.set(key, value, ts, ttl_ms, Self::wall_ms_now()); 43 } 44 45 - pub fn cache_delete(&mut self, key: &str) { 46 - let ts = self.hlc.now(); 47 - self.cache.delete(key, ts, Self::wall_ms_now()); 48 } 49 50 pub fn rate_limit_peek(&self, key: &str, window_ms: u64) -> u64 { 51 - self.rate_limits 52 .peek_count(key, window_ms, Self::wall_ms_now()) 53 } 54 55 - pub fn rate_limit_check(&mut self, key: &str, limit: u32, window_ms: u64) -> bool { 56 - self.rate_limits 57 .check_and_increment(key, limit, window_ms, Self::wall_ms_now()) 58 } 59 60 pub fn peek_broadcast_delta(&self) -> CrdtDelta { 61 - let cache_delta = { 62 - let d = self.cache.extract_delta_since(self.last_broadcast_ts); 63 - match d.entries.is_empty() { 64 - true => None, 65 - false => Some(d), 66 - } 67 }; 68 - let rate_limit_deltas = self.rate_limits.extract_dirty_deltas(); 69 CrdtDelta { 70 version: 1, 71 - source_node: self.hlc.node_id(), 72 cache_delta, 73 rate_limit_deltas, 74 } 75 } 76 77 - pub fn commit_broadcast(&mut self, delta: &CrdtDelta) { 78 - let max_ts = delta 79 .cache_delta 80 .as_ref() 81 - .and_then(|d| d.entries.iter().map(|(_, e)| e.timestamp).max()) 82 - .unwrap_or(self.last_broadcast_ts); 83 - self.last_broadcast_ts = max_ts; 84 - let committed_keys: std::collections::HashSet<&str> = delta 85 .rate_limit_deltas 86 .iter() 87 - .map(|d| d.key.as_str()) 88 .collect(); 89 - committed_keys.iter().for_each(|&key| { 90 - let still_matches = self 91 - .rate_limits 92 - .peek_dirty_counter(key) 93 - .zip(delta.rate_limit_deltas.iter().find(|d| d.key == key)) 94 - .is_some_and(|(current, committed)| { 95 - current.window_start_ms == committed.counter.window_start_ms 96 - && current.total() == committed.counter.total() 97 - }); 98 - if still_matches { 99 - self.rate_limits.clear_single_dirty(key); 100 } 101 }); 102 } 103 104 - pub fn merge_delta(&mut self, delta: &CrdtDelta) -> bool { 105 if !delta.is_compatible() { 106 tracing::warn!( 107 version = delta.version, ··· 109 ); 110 return false; 111 } 112 let mut changed = false; 113 if let Some(ref cache_delta) = delta.cache_delta { 114 cache_delta.entries.iter().for_each(|(key, entry)| { 115 - let _ = self.hlc.receive(entry.timestamp); 116 - if self.cache.merge_entry(key.clone(), entry.clone()) { 117 - changed = true; 118 } 119 }); 120 } 121 - delta.rate_limit_deltas.iter().for_each(|rd| { 122 - if self 123 - .rate_limits 124 - .merge_counter(rd.key.clone(), &rd.counter) 125 - { 126 - changed = true; 127 - } 128 - }); 129 changed 130 } 131 132 - pub fn run_maintenance(&mut self) { 133 let now = Self::wall_ms_now(); 134 - self.cache.gc_tombstones(now); 135 - self.cache.gc_expired(now); 136 - self.rate_limits.gc_expired(now); 137 } 138 139 pub fn cache_estimated_bytes(&self) -> usize { 140 - self.cache.estimated_bytes() 141 } 142 143 pub fn rate_limit_estimated_bytes(&self) -> usize { 144 - self.rate_limits.estimated_bytes() 145 } 146 147 - pub fn evict_lru(&mut self) -> Option<String> { 148 - self.cache.evict_lru() 149 } 150 } 151 ··· 155 156 #[test] 157 fn roundtrip_cache() { 158 - let mut store = CrdtStore::new(1); 159 store.cache_set("key".into(), b"value".to_vec(), 60_000); 160 assert_eq!(store.cache_get("key"), Some(b"value".to_vec())); 161 } 162 163 #[test] 164 fn delta_merge_convergence() { 165 - let mut store_a = CrdtStore::new(1); 166 - let mut store_b = CrdtStore::new(2); 167 168 store_a.cache_set("x".into(), b"from_a".to_vec(), 60_000); 169 store_b.cache_set("y".into(), b"from_b".to_vec(), 60_000); ··· 184 185 #[test] 186 fn rate_limit_across_stores() { 187 - let mut store_a = CrdtStore::new(1); 188 - let mut store_b = CrdtStore::new(2); 189 190 store_a.rate_limit_check("rl:test", 5, 60_000); 191 store_a.rate_limit_check("rl:test", 5, 60_000); ··· 202 203 #[test] 204 fn incompatible_version_rejected() { 205 - let mut store = CrdtStore::new(1); 206 let delta = CrdtDelta { 207 version: 255, 208 source_node: 99,
··· 3 pub mod lww_map; 4 pub mod g_counter; 5 6 + use crate::config::fnv1a; 7 use delta::CrdtDelta; 8 use hlc::{Hlc, HlcTimestamp}; 9 + use lww_map::{LwwDelta, LwwMap}; 10 use g_counter::RateLimitStore; 11 + use parking_lot::{Mutex, RwLock}; 12 use std::time::{SystemTime, UNIX_EPOCH}; 13 14 + const SHARD_COUNT: usize = 64; 15 + const MAX_PROMOTIONS_PER_SHARD: usize = 8192; 16 + const MAX_REPLICABLE_VALUE_SIZE: usize = 15 * 1024 * 1024; 17 + 18 + struct CrdtShard { 19 cache: LwwMap, 20 rate_limits: RateLimitStore, 21 last_broadcast_ts: HlcTimestamp, 22 } 23 24 + impl CrdtShard { 25 + fn new(node_id: u64) -> Self { 26 Self { 27 cache: LwwMap::new(), 28 rate_limits: RateLimitStore::new(node_id), 29 last_broadcast_ts: HlcTimestamp::ZERO, 30 } 31 } 32 + } 33 + 34 + pub struct ShardedCrdtStore { 35 + hlc: Mutex<Hlc>, 36 + shards: Box<[RwLock<CrdtShard>]>, 37 + promotions: Box<[Mutex<Vec<String>>]>, 38 + shard_mask: usize, 39 + node_id: u64, 40 + } 41 + 42 + impl ShardedCrdtStore { 43 + pub fn new(node_id: u64) -> Self { 44 + const { assert!(SHARD_COUNT.is_power_of_two()) }; 45 + let shards: Vec<RwLock<CrdtShard>> = (0..SHARD_COUNT) 46 + .map(|_| RwLock::new(CrdtShard::new(node_id))) 47 + .collect(); 48 + let promotions: Vec<Mutex<Vec<String>>> = (0..SHARD_COUNT) 49 + .map(|_| Mutex::new(Vec::new())) 50 + .collect(); 51 + Self { 52 + hlc: Mutex::new(Hlc::new(node_id)), 53 + shards: shards.into_boxed_slice(), 54 + promotions: promotions.into_boxed_slice(), 55 + shard_mask: SHARD_COUNT - 1, 56 + node_id, 57 + } 58 + } 59 + 60 + fn shard_for(&self, key: &str) -> usize { 61 + fnv1a(key.as_bytes()) as usize & self.shard_mask 62 + } 63 64 fn wall_ms_now() -> u64 { 65 + u64::try_from( 66 + SystemTime::now() 67 + .duration_since(UNIX_EPOCH) 68 + .unwrap_or_default() 69 + .as_millis(), 70 + ) 71 + .unwrap_or(u64::MAX) 72 } 73 74 pub fn cache_get(&self, key: &str) -> Option<Vec<u8>> { 75 + let idx = self.shard_for(key); 76 + let result = self.shards[idx].read().cache.get(key, Self::wall_ms_now()); 77 + if result.is_some() { 78 + let mut promos = self.promotions[idx].lock(); 79 + if promos.len() < MAX_PROMOTIONS_PER_SHARD { 80 + promos.push(key.to_string()); 81 + } 82 + } 83 + result 84 } 85 86 + pub fn cache_set(&self, key: String, value: Vec<u8>, ttl_ms: u64) { 87 + if value.len() > MAX_REPLICABLE_VALUE_SIZE { 88 + tracing::warn!( 89 + key = %key, 90 + value_size = value.len(), 91 + max = MAX_REPLICABLE_VALUE_SIZE, 92 + "value exceeds replicable size limit, may fail to replicate to peers" 93 + ); 94 + } 95 + let ts = self.hlc.lock().now(); 96 + let wall = Self::wall_ms_now(); 97 + self.shards[self.shard_for(&key)] 98 + .write() 99 + .cache 100 + .set(key, value, ts, ttl_ms, wall); 101 } 102 103 + pub fn cache_delete(&self, key: &str) { 104 + let ts = self.hlc.lock().now(); 105 + let wall = Self::wall_ms_now(); 106 + self.shards[self.shard_for(key)] 107 + .write() 108 + .cache 109 + .delete(key, ts, wall); 110 } 111 112 pub fn rate_limit_peek(&self, key: &str, window_ms: u64) -> u64 { 113 + self.shards[self.shard_for(key)] 114 + .read() 115 + .rate_limits 116 .peek_count(key, window_ms, Self::wall_ms_now()) 117 } 118 119 + pub fn rate_limit_check(&self, key: &str, limit: u32, window_ms: u64) -> bool { 120 + self.shards[self.shard_for(key)] 121 + .write() 122 + .rate_limits 123 .check_and_increment(key, limit, window_ms, Self::wall_ms_now()) 124 } 125 126 pub fn peek_broadcast_delta(&self) -> CrdtDelta { 127 + let mut cache_entries: Vec<(String, lww_map::LwwEntry)> = Vec::new(); 128 + let mut rate_limit_deltas: Vec<g_counter::GCounterDelta> = Vec::new(); 129 + 130 + self.shards.iter().for_each(|shard_lock| { 131 + let shard = shard_lock.read(); 132 + let lww_delta = shard.cache.extract_delta_since(shard.last_broadcast_ts); 133 + cache_entries.extend(lww_delta.entries); 134 + rate_limit_deltas.extend(shard.rate_limits.extract_dirty_deltas()); 135 + }); 136 + 137 + let cache_delta = match cache_entries.is_empty() { 138 + true => None, 139 + false => Some(LwwDelta { entries: cache_entries }), 140 }; 141 + 142 CrdtDelta { 143 version: 1, 144 + source_node: self.node_id, 145 cache_delta, 146 rate_limit_deltas, 147 } 148 } 149 150 + pub fn commit_broadcast(&self, delta: &CrdtDelta) { 151 + let cache_entries_by_shard: Vec<(usize, &HlcTimestamp)> = delta 152 .cache_delta 153 .as_ref() 154 + .map(|d| { 155 + d.entries 156 + .iter() 157 + .map(|(key, entry)| (self.shard_for(key), &entry.timestamp)) 158 + .collect() 159 + }) 160 + .unwrap_or_default(); 161 + 162 + let mut max_ts_per_shard: Vec<Option<HlcTimestamp>> = (0..self.shards.len()) 163 + .map(|_| None) 164 + .collect(); 165 + 166 + cache_entries_by_shard.iter().for_each(|&(shard_idx, ts)| { 167 + let slot = &mut max_ts_per_shard[shard_idx]; 168 + *slot = Some(match slot { 169 + Some(existing) if *existing >= *ts => *existing, 170 + _ => *ts, 171 + }); 172 + }); 173 + 174 + let rl_index: std::collections::HashMap<&str, &g_counter::GCounter> = delta 175 .rate_limit_deltas 176 .iter() 177 + .map(|d| (d.key.as_str(), &d.counter)) 178 .collect(); 179 + 180 + let mut shard_rl_keys: Vec<Vec<&str>> = (0..self.shards.len()) 181 + .map(|_| Vec::new()) 182 + .collect(); 183 + rl_index.keys().for_each(|&key| { 184 + shard_rl_keys[self.shard_for(key)].push(key); 185 + }); 186 + 187 + self.shards.iter().enumerate().for_each(|(idx, shard_lock)| { 188 + let has_cache_update = max_ts_per_shard[idx].is_some(); 189 + let has_rl_keys = !shard_rl_keys[idx].is_empty(); 190 + if !has_cache_update && !has_rl_keys { 191 + return; 192 } 193 + let mut shard = shard_lock.write(); 194 + if let Some(max_ts) = max_ts_per_shard[idx] { 195 + shard.last_broadcast_ts = max_ts; 196 + } 197 + shard_rl_keys[idx].iter().for_each(|&key| { 198 + let still_matches = shard 199 + .rate_limits 200 + .peek_dirty_counter(key) 201 + .zip(rl_index.get(key)) 202 + .is_some_and(|(current, committed)| { 203 + current.window_start_ms == committed.window_start_ms 204 + && current.total() == committed.total() 205 + }); 206 + if still_matches { 207 + shard.rate_limits.clear_single_dirty(key); 208 + } 209 + }); 210 }); 211 } 212 213 + pub fn merge_delta(&self, delta: &CrdtDelta) -> bool { 214 if !delta.is_compatible() { 215 tracing::warn!( 216 version = delta.version, ··· 218 ); 219 return false; 220 } 221 + 222 + if let Some(ref cache_delta) = delta.cache_delta { 223 + if let Some(max_ts) = cache_delta.entries.iter().map(|(_, e)| e.timestamp).max() { 224 + let _ = self.hlc.lock().receive(max_ts); 225 + } 226 + } 227 + 228 let mut changed = false; 229 + 230 if let Some(ref cache_delta) = delta.cache_delta { 231 + let mut entries_by_shard: Vec<Vec<(String, lww_map::LwwEntry)>> = 232 + (0..self.shards.len()).map(|_| Vec::new()).collect(); 233 + 234 cache_delta.entries.iter().for_each(|(key, entry)| { 235 + entries_by_shard[self.shard_for(key)].push((key.clone(), entry.clone())); 236 + }); 237 + 238 + entries_by_shard.into_iter().enumerate().for_each(|(idx, entries)| { 239 + if entries.is_empty() { 240 + return; 241 + } 242 + let mut shard = self.shards[idx].write(); 243 + entries.into_iter().for_each(|(key, entry)| { 244 + if shard.cache.merge_entry(key, entry) { 245 + changed = true; 246 + } 247 + }); 248 + }); 249 + } 250 + 251 + if !delta.rate_limit_deltas.is_empty() { 252 + let mut rl_by_shard: Vec<Vec<(String, &g_counter::GCounter)>> = 253 + (0..self.shards.len()).map(|_| Vec::new()).collect(); 254 + 255 + delta.rate_limit_deltas.iter().for_each(|rd| { 256 + rl_by_shard[self.shard_for(&rd.key)].push((rd.key.clone(), &rd.counter)); 257 + }); 258 + 259 + rl_by_shard.into_iter().enumerate().for_each(|(idx, entries)| { 260 + if entries.is_empty() { 261 + return; 262 } 263 + let mut shard = self.shards[idx].write(); 264 + entries.into_iter().for_each(|(key, counter)| { 265 + if shard.rate_limits.merge_counter(key, counter) { 266 + changed = true; 267 + } 268 + }); 269 }); 270 } 271 + 272 changed 273 } 274 275 + pub fn run_maintenance(&self) { 276 let now = Self::wall_ms_now(); 277 + self.shards.iter().enumerate().for_each(|(idx, shard_lock)| { 278 + let pending: Vec<String> = self.promotions[idx].lock().drain(..).collect(); 279 + let mut shard = shard_lock.write(); 280 + pending.iter().for_each(|key| shard.cache.touch(key)); 281 + shard.cache.gc_tombstones(now); 282 + shard.cache.gc_expired(now); 283 + shard.rate_limits.gc_expired(now); 284 + }); 285 + } 286 + 287 + pub fn peek_full_state(&self) -> CrdtDelta { 288 + let mut cache_entries: Vec<(String, lww_map::LwwEntry)> = Vec::new(); 289 + let mut rate_limit_deltas: Vec<g_counter::GCounterDelta> = Vec::new(); 290 + 291 + self.shards.iter().for_each(|shard_lock| { 292 + let shard = shard_lock.read(); 293 + let lww_delta = shard.cache.extract_delta_since(HlcTimestamp::ZERO); 294 + cache_entries.extend(lww_delta.entries); 295 + rate_limit_deltas.extend(shard.rate_limits.extract_all_deltas()); 296 + }); 297 + 298 + let cache_delta = match cache_entries.is_empty() { 299 + true => None, 300 + false => Some(LwwDelta { entries: cache_entries }), 301 + }; 302 + 303 + CrdtDelta { 304 + version: 1, 305 + source_node: self.node_id, 306 + cache_delta, 307 + rate_limit_deltas, 308 + } 309 } 310 311 pub fn cache_estimated_bytes(&self) -> usize { 312 + self.shards 313 + .iter() 314 + .map(|s| s.read().cache.estimated_bytes()) 315 + .fold(0usize, usize::saturating_add) 316 } 317 318 pub fn rate_limit_estimated_bytes(&self) -> usize { 319 + self.shards 320 + .iter() 321 + .map(|s| s.read().rate_limits.estimated_bytes()) 322 + .fold(0usize, usize::saturating_add) 323 } 324 325 + pub fn total_estimated_bytes(&self) -> usize { 326 + self.shards 327 + .iter() 328 + .map(|s| { 329 + let shard = s.read(); 330 + shard.cache.estimated_bytes().saturating_add(shard.rate_limits.estimated_bytes()) 331 + }) 332 + .fold(0usize, usize::saturating_add) 333 + } 334 + 335 + pub fn evict_lru_round_robin(&self, start_shard: usize) -> Option<(usize, usize)> { 336 + (0..self.shards.len()).find_map(|offset| { 337 + let idx = (start_shard + offset) & self.shard_mask; 338 + let has_entries = self.shards[idx].read().cache.len() > 0; 339 + match has_entries { 340 + true => { 341 + let mut shard = self.shards[idx].write(); 342 + let before = shard.cache.estimated_bytes(); 343 + shard.cache.evict_lru().map(|_| { 344 + let freed = before.saturating_sub(shard.cache.estimated_bytes()); 345 + ((idx + 1) & self.shard_mask, freed) 346 + }) 347 + } 348 + false => None, 349 + } 350 + }) 351 } 352 } 353 ··· 357 358 #[test] 359 fn roundtrip_cache() { 360 + let store = ShardedCrdtStore::new(1); 361 store.cache_set("key".into(), b"value".to_vec(), 60_000); 362 assert_eq!(store.cache_get("key"), Some(b"value".to_vec())); 363 } 364 365 #[test] 366 fn delta_merge_convergence() { 367 + let store_a = ShardedCrdtStore::new(1); 368 + let store_b = ShardedCrdtStore::new(2); 369 370 store_a.cache_set("x".into(), b"from_a".to_vec(), 60_000); 371 store_b.cache_set("y".into(), b"from_b".to_vec(), 60_000); ··· 386 387 #[test] 388 fn rate_limit_across_stores() { 389 + let store_a = ShardedCrdtStore::new(1); 390 + let store_b = ShardedCrdtStore::new(2); 391 392 store_a.rate_limit_check("rl:test", 5, 60_000); 393 store_a.rate_limit_check("rl:test", 5, 60_000); ··· 404 405 #[test] 406 fn incompatible_version_rejected() { 407 + let store = ShardedCrdtStore::new(1); 408 let delta = CrdtDelta { 409 version: 255, 410 source_node: 99,
+15 -5
crates/tranquil-ripple/src/engine.rs
··· 1 use crate::cache::RippleCache; 2 use crate::config::RippleConfig; 3 - use crate::crdt::CrdtStore; 4 use crate::eviction::MemoryBudget; 5 use crate::gossip::{GossipEngine, PeerId}; 6 use crate::rate_limiter::RippleRateLimiter; 7 use crate::transport::Transport; 8 - use parking_lot::RwLock; 9 use std::net::SocketAddr; 10 use std::sync::Arc; 11 use tokio_util::sync::CancellationToken; ··· 18 config: RippleConfig, 19 shutdown: CancellationToken, 20 ) -> Result<(Arc<dyn Cache>, Arc<dyn DistributedRateLimiter>, SocketAddr), RippleStartError> { 21 - let store = Arc::new(RwLock::new(CrdtStore::new(config.machine_id))); 22 23 let (transport, incoming_rx) = Transport::bind(config.bind_addr, config.machine_id, shutdown.clone()) 24 .await ··· 27 let transport = Arc::new(transport); 28 29 let bound_addr = transport.local_addr(); 30 let local_id = PeerId { 31 addr: bound_addr, 32 machine_id: config.machine_id, 33 - generation: 0, 34 }; 35 36 let gossip = GossipEngine::new(transport, store.clone(), local_id); ··· 51 tokio::select! { 52 _ = eviction_shutdown.cancelled() => break, 53 _ = interval.tick() => { 54 - budget.enforce(&mut store_for_eviction.write()); 55 } 56 } 57 } ··· 73 let cache: Arc<dyn Cache> = Arc::new(RippleCache::new(store.clone())); 74 let rate_limiter: Arc<dyn DistributedRateLimiter> = 75 Arc::new(RippleRateLimiter::new(store)); 76 77 tracing::info!( 78 bind = %bound_addr,
··· 1 use crate::cache::RippleCache; 2 use crate::config::RippleConfig; 3 + use crate::crdt::ShardedCrdtStore; 4 use crate::eviction::MemoryBudget; 5 use crate::gossip::{GossipEngine, PeerId}; 6 + use crate::metrics; 7 use crate::rate_limiter::RippleRateLimiter; 8 use crate::transport::Transport; 9 use std::net::SocketAddr; 10 use std::sync::Arc; 11 use tokio_util::sync::CancellationToken; ··· 18 config: RippleConfig, 19 shutdown: CancellationToken, 20 ) -> Result<(Arc<dyn Cache>, Arc<dyn DistributedRateLimiter>, SocketAddr), RippleStartError> { 21 + let store = Arc::new(ShardedCrdtStore::new(config.machine_id)); 22 23 let (transport, incoming_rx) = Transport::bind(config.bind_addr, config.machine_id, shutdown.clone()) 24 .await ··· 27 let transport = Arc::new(transport); 28 29 let bound_addr = transport.local_addr(); 30 + let generation = u32::try_from( 31 + std::time::SystemTime::now() 32 + .duration_since(std::time::UNIX_EPOCH) 33 + .unwrap_or_default() 34 + .as_secs() 35 + % u64::from(u32::MAX), 36 + ) 37 + .unwrap_or(0); 38 let local_id = PeerId { 39 addr: bound_addr, 40 machine_id: config.machine_id, 41 + generation, 42 }; 43 44 let gossip = GossipEngine::new(transport, store.clone(), local_id); ··· 59 tokio::select! { 60 _ = eviction_shutdown.cancelled() => break, 61 _ = interval.tick() => { 62 + budget.enforce(&store_for_eviction); 63 } 64 } 65 } ··· 81 let cache: Arc<dyn Cache> = Arc::new(RippleCache::new(store.clone())); 82 let rate_limiter: Arc<dyn DistributedRateLimiter> = 83 Arc::new(RippleRateLimiter::new(store)); 84 + 85 + metrics::describe_metrics(); 86 87 tracing::info!( 88 bind = %bound_addr,
+50 -24
crates/tranquil-ripple/src/eviction.rs
··· 1 - use crate::crdt::CrdtStore; 2 3 pub struct MemoryBudget { 4 max_bytes: usize, 5 } 6 7 impl MemoryBudget { 8 pub fn new(max_bytes: usize) -> Self { 9 - Self { max_bytes } 10 } 11 12 - pub fn enforce(&self, store: &mut CrdtStore) { 13 store.run_maintenance(); 14 15 let max_bytes = self.max_bytes; 16 - let total_bytes = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes()); 17 - let overshoot_ratio = match total_bytes > max_bytes && max_bytes > 0 { 18 - true => total_bytes / max_bytes, 19 false => 0, 20 }; 21 22 const BASE_BATCH: usize = 256; 23 - let batch_size = match overshoot_ratio { 24 - 0..=1 => BASE_BATCH, 25 - 2..=4 => BASE_BATCH * 4, 26 _ => BASE_BATCH * 8, 27 }; 28 29 - let evicted = std::iter::from_fn(|| { 30 - let current = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes()); 31 - match current > max_bytes { 32 - true => store.evict_lru(), 33 - false => None, 34 } 35 }) 36 - .take(batch_size) 37 - .count(); 38 if evicted > 0 { 39 tracing::info!( 40 evicted_entries = evicted, 41 - cache_bytes = store.cache_estimated_bytes(), 42 - rate_limit_bytes = store.rate_limit_estimated_bytes(), 43 max_bytes = self.max_bytes, 44 "memory budget eviction" 45 ); ··· 53 54 #[test] 55 fn eviction_under_budget() { 56 - let mut store = CrdtStore::new(1); 57 let budget = MemoryBudget::new(1024 * 1024); 58 store.cache_set("k".into(), vec![1, 2, 3], 60_000); 59 - budget.enforce(&mut store); 60 assert!(store.cache_get("k").is_some()); 61 } 62 63 #[test] 64 fn eviction_over_budget() { 65 - let mut store = CrdtStore::new(1); 66 let budget = MemoryBudget::new(100); 67 (0..50).for_each(|i| { 68 store.cache_set( ··· 71 60_000, 72 ); 73 }); 74 - budget.enforce(&mut store); 75 - let total = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes()); 76 - assert!(total <= 100); 77 } 78 }
··· 1 + use crate::crdt::ShardedCrdtStore; 2 + use crate::metrics; 3 4 pub struct MemoryBudget { 5 max_bytes: usize, 6 + next_shard: std::sync::atomic::AtomicUsize, 7 } 8 9 impl MemoryBudget { 10 pub fn new(max_bytes: usize) -> Self { 11 + Self { 12 + max_bytes, 13 + next_shard: std::sync::atomic::AtomicUsize::new(0), 14 + } 15 } 16 17 + pub fn enforce(&self, store: &ShardedCrdtStore) { 18 store.run_maintenance(); 19 20 + let cache_bytes = store.cache_estimated_bytes(); 21 + let rl_bytes = store.rate_limit_estimated_bytes(); 22 + metrics::set_cache_bytes(cache_bytes); 23 + metrics::set_rate_limit_bytes(rl_bytes); 24 + 25 let max_bytes = self.max_bytes; 26 + let total_bytes = cache_bytes.saturating_add(rl_bytes); 27 + let overshoot_pct = match total_bytes > max_bytes && max_bytes > 0 { 28 + true => total_bytes.saturating_sub(max_bytes).saturating_mul(100) / max_bytes, 29 false => 0, 30 }; 31 32 const BASE_BATCH: usize = 256; 33 + let batch_size = match overshoot_pct { 34 + 0..=25 => BASE_BATCH, 35 + 26..=100 => BASE_BATCH * 4, 36 _ => BASE_BATCH * 8, 37 }; 38 39 + let mut remaining = total_bytes; 40 + let mut next_shard: usize = self.next_shard.load(std::sync::atomic::Ordering::Relaxed); 41 + let mut evicted: usize = 0; 42 + (0..batch_size).try_for_each(|_| { 43 + match remaining > max_bytes { 44 + true => { 45 + match store.evict_lru_round_robin(next_shard) { 46 + Some((ns, freed)) => { 47 + next_shard = ns; 48 + remaining = remaining.saturating_sub(freed); 49 + evicted += 1; 50 + Ok(()) 51 + } 52 + None => Err(()), 53 + } 54 + } 55 + false => Err(()), 56 } 57 }) 58 + .ok(); 59 + self.next_shard.store(next_shard, std::sync::atomic::Ordering::Relaxed); 60 if evicted > 0 { 61 + metrics::record_evictions(evicted); 62 + let cache_bytes_after = store.cache_estimated_bytes(); 63 + let rl_bytes_after = store.rate_limit_estimated_bytes(); 64 + metrics::set_cache_bytes(cache_bytes_after); 65 + metrics::set_rate_limit_bytes(rl_bytes_after); 66 tracing::info!( 67 evicted_entries = evicted, 68 + cache_bytes = cache_bytes_after, 69 + rate_limit_bytes = rl_bytes_after, 70 max_bytes = self.max_bytes, 71 "memory budget eviction" 72 ); ··· 80 81 #[test] 82 fn eviction_under_budget() { 83 + let store = ShardedCrdtStore::new(1); 84 let budget = MemoryBudget::new(1024 * 1024); 85 store.cache_set("k".into(), vec![1, 2, 3], 60_000); 86 + budget.enforce(&store); 87 assert!(store.cache_get("k").is_some()); 88 } 89 90 #[test] 91 fn eviction_over_budget() { 92 + let store = ShardedCrdtStore::new(1); 93 let budget = MemoryBudget::new(100); 94 (0..50).for_each(|i| { 95 store.cache_set( ··· 98 60_000, 99 ); 100 }); 101 + budget.enforce(&store); 102 + assert!(store.total_estimated_bytes() <= 100); 103 } 104 }
+171 -72
crates/tranquil-ripple/src/gossip.rs
··· 1 use crate::crdt::delta::CrdtDelta; 2 - use crate::crdt::CrdtStore; 3 use crate::transport::{ChannelTag, IncomingFrame, Transport}; 4 use foca::{Config, Foca, Notification, Runtime, Timer}; 5 - use parking_lot::RwLock; 6 use rand::rngs::StdRng; 7 use rand::SeedableRng; 8 use std::collections::HashSet; ··· 114 fn active_peers(&self) -> impl Iterator<Item = SocketAddr> + '_ { 115 self.active_addrs.iter().copied() 116 } 117 } 118 119 impl Runtime<PeerId> for &mut BufferedRuntime { ··· 141 142 pub struct GossipEngine { 143 transport: Arc<Transport>, 144 - store: Arc<RwLock<CrdtStore>>, 145 local_id: PeerId, 146 } 147 148 impl GossipEngine { 149 pub fn new( 150 transport: Arc<Transport>, 151 - store: Arc<RwLock<CrdtStore>>, 152 local_id: PeerId, 153 ) -> Self { 154 Self { ··· 203 } 204 }); 205 206 - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); 207 208 let mut gossip_tick = 209 tokio::time::interval(Duration::from_millis(gossip_interval_ms)); 210 - let mut maintenance_tick = tokio::time::interval(Duration::from_secs(10)); 211 212 loop { 213 tokio::select! { ··· 222 if let Err(e) = foca.handle_data(&frame.data, &mut runtime) { 223 tracing::warn!(error = %e, "foca handle_data error"); 224 } 225 - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); 226 } 227 ChannelTag::CrdtSync => { 228 const MAX_DELTA_ENTRIES: usize = 10_000; 229 const MAX_DELTA_RATE_LIMITS: usize = 10_000; 230 match bincode::serde::decode_from_slice::<CrdtDelta, _>(&frame.data, bincode::config::standard()) { 231 Ok((delta, _)) => { 232 let cache_len = delta.cache_delta.as_ref().map_or(0, |d| d.entries.len()); ··· 235 let window_mismatch = delta.rate_limit_deltas.iter().any(|rd| rd.counter.window_duration_ms == 0); 236 match cache_len > MAX_DELTA_ENTRIES || rl_len > MAX_DELTA_RATE_LIMITS || gcounter_oversize || window_mismatch { 237 true => { 238 tracing::warn!( 239 cache_entries = cache_len, 240 rate_limit_entries = rl_len, ··· 243 ); 244 } 245 false => { 246 - store.write().merge_delta(&delta); 247 } 248 } 249 } 250 Err(e) => { 251 tracing::warn!(error = %e, "failed to decode crdt sync delta"); 252 } 253 } ··· 256 } 257 } 258 _ = gossip_tick.tick() => { 259 - let pending = { 260 - let s = store.read(); 261 - let delta = s.peek_broadcast_delta(); 262 - match delta.is_empty() { 263 - true => None, 264 - false => { 265 - match bincode::serde::encode_to_vec(&delta, bincode::config::standard()) { 266 - Ok(bytes) => Some((bytes, delta)), 267 - Err(e) => { 268 - tracing::warn!(error = %e, "failed to serialize broadcast delta"); 269 - None 270 - } 271 - } 272 - }, 273 } 274 - }; 275 - if let Some((ref data, ref delta)) = pending { 276 - let peers: Vec<SocketAddr> = members.active_peers().collect(); 277 - let mut all_queued = true; 278 - let cancel = shutdown.clone(); 279 - peers.iter().for_each(|&addr| { 280 - match transport.try_queue(addr, ChannelTag::CrdtSync, data) { 281 - true => {} 282 false => { 283 - all_queued = false; 284 - let t = transport.clone(); 285 - let d = data.clone(); 286 - let c = cancel.clone(); 287 - tokio::spawn(async move { 288 - tokio::select! { 289 - _ = c.cancelled() => {} 290 - _ = t.send(addr, ChannelTag::CrdtSync, &d) => {} 291 } 292 - }); 293 } 294 } 295 - }); 296 - let stale = last_commit.elapsed() > Duration::from_secs(WATERMARK_STALE_SECS); 297 - if all_queued || peers.is_empty() || stale { 298 - if stale && !all_queued { 299 - tracing::warn!( 300 - elapsed_secs = last_commit.elapsed().as_secs(), 301 - "force-advancing broadcast watermark (staleness cap)" 302 - ); 303 - } 304 - store.write().commit_broadcast(delta); 305 - last_commit = tokio::time::Instant::now(); 306 } 307 - } 308 if let Err(e) = foca.gossip(&mut runtime) { 309 tracing::warn!(error = %e, "foca gossip error"); 310 } 311 - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); 312 } 313 Some((timer, _)) = timer_rx.recv() => { 314 if let Err(e) = foca.handle_timer(timer, &mut runtime) { 315 tracing::warn!(error = %e, "foca handle_timer error"); 316 } 317 - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); 318 } 319 - _ = maintenance_tick.tick() => { 320 - store.write().run_maintenance(); 321 tracing::trace!( 322 members = foca.num_members(), 323 - cache_bytes = store.read().cache_estimated_bytes(), 324 - "maintenance cycle" 325 ); 326 } 327 } ··· 331 } 332 333 fn flush_final_delta( 334 - store: &Arc<RwLock<CrdtStore>>, 335 transport: &Arc<Transport>, 336 members: &MemberTracker, 337 ) { 338 - let s = store.read(); 339 - let delta = s.peek_broadcast_delta(); 340 if delta.is_empty() { 341 return; 342 } 343 - match bincode::serde::encode_to_vec(&delta, bincode::config::standard()) { 344 - Ok(bytes) => { 345 - members.active_peers().for_each(|addr| { 346 - let _ = transport.try_queue(addr, ChannelTag::CrdtSync, &bytes); 347 - }); 348 - } 349 - Err(e) => { 350 - tracing::warn!(error = %e, "failed to serialize final delta on shutdown"); 351 - } 352 - } 353 } 354 355 fn drain_runtime_actions( ··· 357 transport: &Arc<Transport>, 358 timer_tx: &mpsc::Sender<(Timer<PeerId>, Duration)>, 359 members: &mut MemberTracker, 360 shutdown: &CancellationToken, 361 ) { 362 let actions: Vec<RuntimeAction> = runtime.actions.drain(..).collect(); ··· 373 } 374 RuntimeAction::ScheduleTimer(timer, duration) => { 375 let tx = timer_tx.clone(); 376 tokio::spawn(async move { 377 - tokio::time::sleep(duration).await; 378 - let _ = tx.send((timer, duration)).await; 379 }); 380 } 381 RuntimeAction::MemberUp(addr) => { 382 tracing::info!(peer = %addr, "member up"); 383 members.member_up(addr); 384 } 385 RuntimeAction::MemberDown(addr) => { 386 tracing::info!(peer = %addr, "member down"); 387 members.member_down(addr); 388 } 389 }); 390 }
··· 1 use crate::crdt::delta::CrdtDelta; 2 + use crate::crdt::lww_map::LwwDelta; 3 + use crate::crdt::ShardedCrdtStore; 4 + use crate::metrics; 5 use crate::transport::{ChannelTag, IncomingFrame, Transport}; 6 use foca::{Config, Foca, Notification, Runtime, Timer}; 7 use rand::rngs::StdRng; 8 use rand::SeedableRng; 9 use std::collections::HashSet; ··· 115 fn active_peers(&self) -> impl Iterator<Item = SocketAddr> + '_ { 116 self.active_addrs.iter().copied() 117 } 118 + 119 + fn peer_count(&self) -> usize { 120 + self.active_addrs.len() 121 + } 122 } 123 124 impl Runtime<PeerId> for &mut BufferedRuntime { ··· 146 147 pub struct GossipEngine { 148 transport: Arc<Transport>, 149 + store: Arc<ShardedCrdtStore>, 150 local_id: PeerId, 151 } 152 153 impl GossipEngine { 154 pub fn new( 155 transport: Arc<Transport>, 156 + store: Arc<ShardedCrdtStore>, 157 local_id: PeerId, 158 ) -> Self { 159 Self { ··· 208 } 209 }); 210 211 + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); 212 213 let mut gossip_tick = 214 tokio::time::interval(Duration::from_millis(gossip_interval_ms)); 215 + gossip_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 216 217 loop { 218 tokio::select! { ··· 227 if let Err(e) = foca.handle_data(&frame.data, &mut runtime) { 228 tracing::warn!(error = %e, "foca handle_data error"); 229 } 230 + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); 231 } 232 ChannelTag::CrdtSync => { 233 const MAX_DELTA_ENTRIES: usize = 10_000; 234 const MAX_DELTA_RATE_LIMITS: usize = 10_000; 235 + metrics::record_gossip_delta_received(); 236 + metrics::record_gossip_delta_bytes(frame.data.len()); 237 match bincode::serde::decode_from_slice::<CrdtDelta, _>(&frame.data, bincode::config::standard()) { 238 Ok((delta, _)) => { 239 let cache_len = delta.cache_delta.as_ref().map_or(0, |d| d.entries.len()); ··· 242 let window_mismatch = delta.rate_limit_deltas.iter().any(|rd| rd.counter.window_duration_ms == 0); 243 match cache_len > MAX_DELTA_ENTRIES || rl_len > MAX_DELTA_RATE_LIMITS || gcounter_oversize || window_mismatch { 244 true => { 245 + metrics::record_gossip_drop(); 246 tracing::warn!( 247 cache_entries = cache_len, 248 rate_limit_entries = rl_len, ··· 251 ); 252 } 253 false => { 254 + if store.merge_delta(&delta) { 255 + metrics::record_gossip_merge(); 256 + } 257 } 258 } 259 } 260 Err(e) => { 261 + metrics::record_gossip_drop(); 262 tracing::warn!(error = %e, "failed to decode crdt sync delta"); 263 } 264 } ··· 267 } 268 } 269 _ = gossip_tick.tick() => { 270 + let delta = store.peek_broadcast_delta(); 271 + match delta.is_empty() { 272 + true => { 273 + last_commit = tokio::time::Instant::now(); 274 } 275 + false => { 276 + let chunks = chunk_and_serialize(&delta); 277 + match chunks.is_empty() { 278 + true => { 279 + tracing::warn!("all delta chunks failed to serialize, force-committing watermark"); 280 + store.commit_broadcast(&delta); 281 + last_commit = tokio::time::Instant::now(); 282 + } 283 false => { 284 + let peers: Vec<SocketAddr> = members.active_peers().collect(); 285 + let mut all_queued = true; 286 + let cancel = shutdown.clone(); 287 + chunks.iter().for_each(|chunk| { 288 + metrics::record_gossip_delta_bytes(chunk.len()); 289 + peers.iter().for_each(|&addr| { 290 + metrics::record_gossip_delta_sent(); 291 + match transport.try_queue(addr, ChannelTag::CrdtSync, chunk) { 292 + true => {} 293 + false => { 294 + all_queued = false; 295 + let t = transport.clone(); 296 + let d = chunk.clone(); 297 + let c = cancel.clone(); 298 + tokio::spawn(async move { 299 + tokio::select! { 300 + _ = c.cancelled() => {} 301 + _ = t.send(addr, ChannelTag::CrdtSync, &d) => {} 302 + } 303 + }); 304 + } 305 + } 306 + }); 307 + }); 308 + let stale = last_commit.elapsed() > Duration::from_secs(WATERMARK_STALE_SECS); 309 + if all_queued || peers.is_empty() || stale { 310 + if stale && !all_queued { 311 + tracing::warn!( 312 + elapsed_secs = last_commit.elapsed().as_secs(), 313 + "force-advancing broadcast watermark (staleness cap)" 314 + ); 315 } 316 + store.commit_broadcast(&delta); 317 + last_commit = tokio::time::Instant::now(); 318 + } 319 } 320 } 321 } 322 + }; 323 if let Err(e) = foca.gossip(&mut runtime) { 324 tracing::warn!(error = %e, "foca gossip error"); 325 } 326 + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); 327 } 328 Some((timer, _)) = timer_rx.recv() => { 329 if let Err(e) = foca.handle_timer(timer, &mut runtime) { 330 tracing::warn!(error = %e, "foca handle_timer error"); 331 } 332 + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); 333 } 334 + _ = tokio::time::sleep(Duration::from_secs(10)) => { 335 tracing::trace!( 336 members = foca.num_members(), 337 + cache_bytes = store.cache_estimated_bytes(), 338 + rate_limit_bytes = store.rate_limit_estimated_bytes(), 339 + "gossip health check" 340 ); 341 } 342 } ··· 346 } 347 348 fn flush_final_delta( 349 + store: &Arc<ShardedCrdtStore>, 350 transport: &Arc<Transport>, 351 members: &MemberTracker, 352 ) { 353 + let delta = store.peek_broadcast_delta(); 354 if delta.is_empty() { 355 return; 356 } 357 + let chunks = chunk_and_serialize(&delta); 358 + chunks.iter().for_each(|chunk| { 359 + members.active_peers().for_each(|addr| { 360 + let _ = transport.try_queue(addr, ChannelTag::CrdtSync, chunk); 361 + }); 362 + }); 363 + store.commit_broadcast(&delta); 364 } 365 366 fn drain_runtime_actions( ··· 368 transport: &Arc<Transport>, 369 timer_tx: &mpsc::Sender<(Timer<PeerId>, Duration)>, 370 members: &mut MemberTracker, 371 + store: &Arc<ShardedCrdtStore>, 372 shutdown: &CancellationToken, 373 ) { 374 let actions: Vec<RuntimeAction> = runtime.actions.drain(..).collect(); ··· 385 } 386 RuntimeAction::ScheduleTimer(timer, duration) => { 387 let tx = timer_tx.clone(); 388 + let c = shutdown.clone(); 389 tokio::spawn(async move { 390 + tokio::select! { 391 + _ = c.cancelled() => {} 392 + _ = tokio::time::sleep(duration) => { 393 + let _ = tx.send((timer, duration)).await; 394 + } 395 + } 396 }); 397 } 398 RuntimeAction::MemberUp(addr) => { 399 tracing::info!(peer = %addr, "member up"); 400 members.member_up(addr); 401 + metrics::set_gossip_peers(members.peer_count()); 402 + let snapshot = store.peek_full_state(); 403 + if !snapshot.is_empty() { 404 + chunk_and_serialize(&snapshot).into_iter().for_each(|chunk| { 405 + let t = transport.clone(); 406 + let c = shutdown.clone(); 407 + tokio::spawn(async move { 408 + tokio::select! { 409 + _ = c.cancelled() => {} 410 + _ = t.send(addr, ChannelTag::CrdtSync, &chunk) => {} 411 + } 412 + }); 413 + }); 414 + } 415 } 416 RuntimeAction::MemberDown(addr) => { 417 tracing::info!(peer = %addr, "member down"); 418 members.member_down(addr); 419 + metrics::set_gossip_peers(members.peer_count()); 420 } 421 }); 422 } 423 + 424 + fn chunk_and_serialize(delta: &CrdtDelta) -> Vec<Vec<u8>> { 425 + let config = bincode::config::standard(); 426 + match bincode::serde::encode_to_vec(delta, config) { 427 + Ok(bytes) if bytes.len() <= crate::transport::MAX_FRAME_SIZE => vec![bytes], 428 + Ok(_) => split_and_serialize(delta.clone()), 429 + Err(e) => { 430 + tracing::warn!(error = %e, "failed to serialize delta"); 431 + vec![] 432 + } 433 + } 434 + } 435 + 436 + fn split_and_serialize(delta: CrdtDelta) -> Vec<Vec<u8>> { 437 + let version = delta.version; 438 + let source_node = delta.source_node; 439 + let cache_entries = delta.cache_delta.map_or(Vec::new(), |d| d.entries); 440 + let rl_deltas = delta.rate_limit_deltas; 441 + 442 + if cache_entries.is_empty() && rl_deltas.is_empty() { 443 + return vec![]; 444 + } 445 + 446 + if cache_entries.len() <= 1 && rl_deltas.len() <= 1 { 447 + let mini = CrdtDelta { 448 + version, 449 + source_node, 450 + cache_delta: match cache_entries.is_empty() { 451 + true => None, 452 + false => Some(LwwDelta { entries: cache_entries }), 453 + }, 454 + rate_limit_deltas: rl_deltas, 455 + }; 456 + match bincode::serde::encode_to_vec(&mini, bincode::config::standard()) { 457 + Ok(bytes) if bytes.len() <= crate::transport::MAX_FRAME_SIZE => return vec![bytes], 458 + _ => { 459 + tracing::error!("irreducible delta entry exceeds max frame size, dropping"); 460 + return vec![]; 461 + } 462 + } 463 + } 464 + 465 + let mid_cache = cache_entries.len() / 2; 466 + let mid_rl = rl_deltas.len() / 2; 467 + 468 + let mut left_cache = cache_entries; 469 + let right_cache = left_cache.split_off(mid_cache); 470 + let mut left_rl = rl_deltas; 471 + let right_rl = left_rl.split_off(mid_rl); 472 + 473 + let make_sub = |entries: Vec<_>, rls| CrdtDelta { 474 + version, 475 + source_node, 476 + cache_delta: match entries.is_empty() { 477 + true => None, 478 + false => Some(LwwDelta { entries }), 479 + }, 480 + rate_limit_deltas: rls, 481 + }; 482 + 483 + let left = make_sub(left_cache, left_rl); 484 + let right = make_sub(right_cache, right_rl); 485 + 486 + let mut result = chunk_and_serialize(&left); 487 + result.extend(chunk_and_serialize(&right)); 488 + result 489 + }
+1
crates/tranquil-ripple/src/lib.rs
··· 4 pub mod engine; 5 pub mod eviction; 6 pub mod gossip; 7 pub mod rate_limiter; 8 pub mod transport; 9
··· 4 pub mod engine; 5 pub mod eviction; 6 pub mod gossip; 7 + pub mod metrics; 8 pub mod rate_limiter; 9 pub mod transport; 10
+108
crates/tranquil-ripple/src/metrics.rs
···
··· 1 + use metrics::{counter, gauge, histogram}; 2 + 3 + pub fn describe_metrics() { 4 + metrics::describe_gauge!( 5 + "tranquil_ripple_cache_bytes", 6 + "Estimated memory used by cache entries" 7 + ); 8 + metrics::describe_gauge!( 9 + "tranquil_ripple_rate_limit_bytes", 10 + "Estimated memory used by rate limit counters" 11 + ); 12 + metrics::describe_gauge!( 13 + "tranquil_ripple_gossip_peers", 14 + "Number of active gossip peers" 15 + ); 16 + metrics::describe_counter!( 17 + "tranquil_ripple_cache_hits_total", 18 + "Total cache read hits" 19 + ); 20 + metrics::describe_counter!( 21 + "tranquil_ripple_cache_misses_total", 22 + "Total cache read misses" 23 + ); 24 + metrics::describe_counter!( 25 + "tranquil_ripple_cache_writes_total", 26 + "Total cache write operations" 27 + ); 28 + metrics::describe_counter!( 29 + "tranquil_ripple_cache_deletes_total", 30 + "Total cache delete operations" 31 + ); 32 + metrics::describe_counter!( 33 + "tranquil_ripple_evictions_total", 34 + "Total cache entries evicted by memory budget" 35 + ); 36 + metrics::describe_counter!( 37 + "tranquil_ripple_gossip_deltas_sent_total", 38 + "Total CRDT delta chunks sent to peers" 39 + ); 40 + metrics::describe_counter!( 41 + "tranquil_ripple_gossip_deltas_received_total", 42 + "Total CRDT delta messages received from peers" 43 + ); 44 + metrics::describe_counter!( 45 + "tranquil_ripple_gossip_merges_total", 46 + "Total CRDT deltas merged with local state change" 47 + ); 48 + metrics::describe_counter!( 49 + "tranquil_ripple_gossip_drops_total", 50 + "Total CRDT deltas dropped (validation or decode failure)" 51 + ); 52 + metrics::describe_histogram!( 53 + "tranquil_ripple_gossip_delta_bytes", 54 + "Size of CRDT delta chunks in bytes" 55 + ); 56 + } 57 + 58 + pub fn record_cache_hit() { 59 + counter!("tranquil_ripple_cache_hits_total").increment(1); 60 + } 61 + 62 + pub fn record_cache_miss() { 63 + counter!("tranquil_ripple_cache_misses_total").increment(1); 64 + } 65 + 66 + pub fn record_cache_write() { 67 + counter!("tranquil_ripple_cache_writes_total").increment(1); 68 + } 69 + 70 + pub fn record_cache_delete() { 71 + counter!("tranquil_ripple_cache_deletes_total").increment(1); 72 + } 73 + 74 + pub fn set_cache_bytes(bytes: usize) { 75 + gauge!("tranquil_ripple_cache_bytes").set(bytes as f64); 76 + } 77 + 78 + pub fn set_rate_limit_bytes(bytes: usize) { 79 + gauge!("tranquil_ripple_rate_limit_bytes").set(bytes as f64); 80 + } 81 + 82 + pub fn set_gossip_peers(count: usize) { 83 + gauge!("tranquil_ripple_gossip_peers").set(count as f64); 84 + } 85 + 86 + pub fn record_evictions(count: usize) { 87 + counter!("tranquil_ripple_evictions_total").increment(count as u64); 88 + } 89 + 90 + pub fn record_gossip_delta_sent() { 91 + counter!("tranquil_ripple_gossip_deltas_sent_total").increment(1); 92 + } 93 + 94 + pub fn record_gossip_delta_received() { 95 + counter!("tranquil_ripple_gossip_deltas_received_total").increment(1); 96 + } 97 + 98 + pub fn record_gossip_merge() { 99 + counter!("tranquil_ripple_gossip_merges_total").increment(1); 100 + } 101 + 102 + pub fn record_gossip_drop() { 103 + counter!("tranquil_ripple_gossip_drops_total").increment(1); 104 + } 105 + 106 + pub fn record_gossip_delta_bytes(bytes: usize) { 107 + histogram!("tranquil_ripple_gossip_delta_bytes").record(bytes as f64); 108 + }
+7 -8
crates/tranquil-ripple/src/rate_limiter.rs
··· 1 - use crate::crdt::CrdtStore; 2 use async_trait::async_trait; 3 - use parking_lot::RwLock; 4 use std::sync::Arc; 5 use tranquil_infra::DistributedRateLimiter; 6 7 pub struct RippleRateLimiter { 8 - store: Arc<RwLock<CrdtStore>>, 9 } 10 11 impl RippleRateLimiter { 12 - pub fn new(store: Arc<RwLock<CrdtStore>>) -> Self { 13 Self { store } 14 } 15 } ··· 17 #[async_trait] 18 impl DistributedRateLimiter for RippleRateLimiter { 19 async fn check_rate_limit(&self, key: &str, limit: u32, window_ms: u64) -> bool { 20 - self.store.write().rate_limit_check(key, limit, window_ms) 21 } 22 23 async fn peek_rate_limit_count(&self, key: &str, window_ms: u64) -> u64 { 24 - self.store.read().rate_limit_peek(key, window_ms) 25 } 26 } 27 ··· 31 32 #[tokio::test] 33 async fn rate_limiter_trait_allows_within_limit() { 34 - let store = Arc::new(RwLock::new(CrdtStore::new(1))); 35 let rl = RippleRateLimiter::new(store); 36 assert!(rl.check_rate_limit("test", 5, 60_000).await); 37 assert!(rl.check_rate_limit("test", 5, 60_000).await); ··· 39 40 #[tokio::test] 41 async fn rate_limiter_trait_blocks_over_limit() { 42 - let store = Arc::new(RwLock::new(CrdtStore::new(1))); 43 let rl = RippleRateLimiter::new(store); 44 assert!(rl.check_rate_limit("k", 3, 60_000).await); 45 assert!(rl.check_rate_limit("k", 3, 60_000).await);
··· 1 + use crate::crdt::ShardedCrdtStore; 2 use async_trait::async_trait; 3 use std::sync::Arc; 4 use tranquil_infra::DistributedRateLimiter; 5 6 pub struct RippleRateLimiter { 7 + store: Arc<ShardedCrdtStore>, 8 } 9 10 impl RippleRateLimiter { 11 + pub fn new(store: Arc<ShardedCrdtStore>) -> Self { 12 Self { store } 13 } 14 } ··· 16 #[async_trait] 17 impl DistributedRateLimiter for RippleRateLimiter { 18 async fn check_rate_limit(&self, key: &str, limit: u32, window_ms: u64) -> bool { 19 + self.store.rate_limit_check(key, limit, window_ms) 20 } 21 22 async fn peek_rate_limit_count(&self, key: &str, window_ms: u64) -> u64 { 23 + self.store.rate_limit_peek(key, window_ms) 24 } 25 } 26 ··· 30 31 #[tokio::test] 32 async fn rate_limiter_trait_allows_within_limit() { 33 + let store = Arc::new(ShardedCrdtStore::new(1)); 34 let rl = RippleRateLimiter::new(store); 35 assert!(rl.check_rate_limit("test", 5, 60_000).await); 36 assert!(rl.check_rate_limit("test", 5, 60_000).await); ··· 38 39 #[tokio::test] 40 async fn rate_limiter_trait_blocks_over_limit() { 41 + let store = Arc::new(ShardedCrdtStore::new(1)); 42 let rl = RippleRateLimiter::new(store); 43 assert!(rl.check_rate_limit("k", 3, 60_000).await); 44 assert!(rl.check_rate_limit("k", 3, 60_000).await);
+62 -19
crates/tranquil-ripple/src/transport.rs
··· 3 use std::collections::HashMap; 4 use std::net::SocketAddr; 5 use std::sync::Arc; 6 - use std::sync::atomic::{AtomicUsize, Ordering}; 7 use std::time::Duration; 8 use tokio::io::{AsyncReadExt, AsyncWriteExt}; 9 use tokio::net::{TcpListener, TcpStream}; 10 use tokio::sync::mpsc; 11 use tokio_util::sync::CancellationToken; 12 13 - const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024; 14 const MAX_INBOUND_CONNECTIONS: usize = 512; 15 const WRITE_TIMEOUT: Duration = Duration::from_secs(10); 16 17 #[derive(Debug, Clone, Copy, PartialEq, Eq)] ··· 44 45 struct ConnectionWriter { 46 tx: mpsc::Sender<Vec<u8>>, 47 } 48 49 pub struct Transport { ··· 51 _machine_id: u64, 52 connections: Arc<parking_lot::Mutex<HashMap<SocketAddr, ConnectionWriter>>>, 53 connecting: Arc<parking_lot::Mutex<std::collections::HashSet<SocketAddr>>>, 54 #[allow(dead_code)] 55 inbound_count: Arc<AtomicUsize>, 56 shutdown: CancellationToken, 57 incoming_tx: mpsc::Sender<IncomingFrame>, 58 } ··· 73 _machine_id: machine_id, 74 connections: Arc::new(parking_lot::Mutex::new(HashMap::new())), 75 connecting: Arc::new(parking_lot::Mutex::new(std::collections::HashSet::new())), 76 inbound_count: inbound_count.clone(), 77 shutdown: shutdown.clone(), 78 incoming_tx: incoming_tx.clone(), 79 }; ··· 99 continue; 100 } 101 inbound_counter.fetch_add(1, Ordering::Relaxed); 102 Self::spawn_reader( 103 stream, 104 peer_addr, ··· 144 }; 145 let writer = { 146 let conns = self.connections.lock(); 147 - conns.get(&target).map(|w| w.tx.clone()) 148 }; 149 match writer { 150 - Some(tx) => { 151 if tx.send(frame).await.is_err() { 152 - self.connections.lock().remove(&target); 153 self.connect_and_send(target, tag, data).await; 154 } 155 } ··· 163 { 164 let mut connecting = self.connecting.lock(); 165 if connecting.contains(&target) { 166 - tracing::debug!(peer = %target, "connection already in-flight, dropping frame"); 167 return; 168 } 169 connecting.insert(target); ··· 191 .await; 192 match stream { 193 Ok(stream) => { 194 let (read_half, write_half) = stream.into_split(); 195 let (write_tx, mut write_rx) = mpsc::channel::<Vec<u8>>(1024); 196 - let cancel = self.shutdown.clone(); 197 let connections = self.connections.clone(); 198 let peer = target; 199 200 tokio::spawn(async move { 201 let mut writer = write_half; 202 loop { 203 tokio::select! { 204 - _ = cancel.cancelled() => break, 205 msg = write_rx.recv() => { 206 match msg { 207 Some(buf) => { ··· 227 } 228 } 229 connections.lock().remove(&peer); 230 }); 231 232 - Self::spawn_reader_half(read_half, target, self.incoming_tx.clone(), self.shutdown.clone()); 233 - 234 - let frame = match encode_frame(tag, data) { 235 - Some(f) => f, 236 - None => return, 237 - }; 238 - let _ = write_tx.send(frame).await; 239 - self.connections.lock().insert( 240 - target, 241 - ConnectionWriter { tx: write_tx }, 242 - ); 243 tracing::debug!(peer = %target, "established outbound connection"); 244 } 245 Err(e) => { ··· 309 } 310 } 311 } 312 }); 313 } 314 ··· 332 DecodeResult::Corrupt => return false, 333 } 334 } 335 } 336 } 337
··· 3 use std::collections::HashMap; 4 use std::net::SocketAddr; 5 use std::sync::Arc; 6 + use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; 7 use std::time::Duration; 8 use tokio::io::{AsyncReadExt, AsyncWriteExt}; 9 use tokio::net::{TcpListener, TcpStream}; 10 use tokio::sync::mpsc; 11 use tokio_util::sync::CancellationToken; 12 13 + pub(crate) const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024; 14 const MAX_INBOUND_CONNECTIONS: usize = 512; 15 + const MAX_OUTBOUND_CONNECTIONS: usize = 512; 16 const WRITE_TIMEOUT: Duration = Duration::from_secs(10); 17 18 #[derive(Debug, Clone, Copy, PartialEq, Eq)] ··· 45 46 struct ConnectionWriter { 47 tx: mpsc::Sender<Vec<u8>>, 48 + generation: u64, 49 } 50 51 pub struct Transport { ··· 53 _machine_id: u64, 54 connections: Arc<parking_lot::Mutex<HashMap<SocketAddr, ConnectionWriter>>>, 55 connecting: Arc<parking_lot::Mutex<std::collections::HashSet<SocketAddr>>>, 56 + conn_generation: Arc<AtomicU64>, 57 #[allow(dead_code)] 58 inbound_count: Arc<AtomicUsize>, 59 + outbound_count: Arc<AtomicUsize>, 60 shutdown: CancellationToken, 61 incoming_tx: mpsc::Sender<IncomingFrame>, 62 } ··· 77 _machine_id: machine_id, 78 connections: Arc::new(parking_lot::Mutex::new(HashMap::new())), 79 connecting: Arc::new(parking_lot::Mutex::new(std::collections::HashSet::new())), 80 + conn_generation: Arc::new(AtomicU64::new(0)), 81 inbound_count: inbound_count.clone(), 82 + outbound_count: Arc::new(AtomicUsize::new(0)), 83 shutdown: shutdown.clone(), 84 incoming_tx: incoming_tx.clone(), 85 }; ··· 105 continue; 106 } 107 inbound_counter.fetch_add(1, Ordering::Relaxed); 108 + configure_socket(&stream); 109 Self::spawn_reader( 110 stream, 111 peer_addr, ··· 151 }; 152 let writer = { 153 let conns = self.connections.lock(); 154 + conns.get(&target).map(|w| (w.tx.clone(), w.generation)) 155 }; 156 match writer { 157 + Some((tx, acquired_gen)) => { 158 if tx.send(frame).await.is_err() { 159 + { 160 + let mut conns = self.connections.lock(); 161 + let stale = conns 162 + .get(&target) 163 + .is_some_and(|w| w.generation == acquired_gen); 164 + if stale { 165 + conns.remove(&target); 166 + } 167 + } 168 self.connect_and_send(target, tag, data).await; 169 } 170 } ··· 178 { 179 let mut connecting = self.connecting.lock(); 180 if connecting.contains(&target) { 181 + tracing::warn!(peer = %target, "connection already in-flight, dropping frame"); 182 return; 183 } 184 connecting.insert(target); ··· 206 .await; 207 match stream { 208 Ok(stream) => { 209 + if self.outbound_count.load(Ordering::Relaxed) >= MAX_OUTBOUND_CONNECTIONS { 210 + tracing::warn!( 211 + peer = %target, 212 + max = MAX_OUTBOUND_CONNECTIONS, 213 + "outbound connection limit reached, dropping" 214 + ); 215 + return; 216 + } 217 + self.outbound_count.fetch_add(1, Ordering::Relaxed); 218 + configure_socket(&stream); 219 let (read_half, write_half) = stream.into_split(); 220 let (write_tx, mut write_rx) = mpsc::channel::<Vec<u8>>(1024); 221 + 222 + let conn_gen = self.conn_generation.fetch_add(1, Ordering::Relaxed); 223 + self.connections.lock().insert( 224 + target, 225 + ConnectionWriter { tx: write_tx.clone(), generation: conn_gen }, 226 + ); 227 + if let Some(frame) = encode_frame(tag, data) { 228 + let _ = write_tx.try_send(frame); 229 + } 230 + 231 + let conn_cancel = self.shutdown.child_token(); 232 + let reader_cancel = conn_cancel.clone(); 233 let connections = self.connections.clone(); 234 + let outbound_counter = self.outbound_count.clone(); 235 let peer = target; 236 237 tokio::spawn(async move { 238 let mut writer = write_half; 239 loop { 240 tokio::select! { 241 + _ = conn_cancel.cancelled() => break, 242 msg = write_rx.recv() => { 243 match msg { 244 Some(buf) => { ··· 264 } 265 } 266 connections.lock().remove(&peer); 267 + outbound_counter.fetch_sub(1, Ordering::Relaxed); 268 + conn_cancel.cancel(); 269 }); 270 271 + Self::spawn_reader_half(read_half, target, self.incoming_tx.clone(), reader_cancel); 272 tracing::debug!(peer = %target, "established outbound connection"); 273 } 274 Err(e) => { ··· 338 } 339 } 340 } 341 + cancel.cancel(); 342 }); 343 } 344 ··· 362 DecodeResult::Corrupt => return false, 363 } 364 } 365 + } 366 + } 367 + 368 + fn configure_socket(stream: &TcpStream) { 369 + let sock_ref = socket2::SockRef::from(stream); 370 + if let Err(e) = sock_ref.set_tcp_nodelay(true) { 371 + tracing::warn!(error = %e, "failed to set TCP_NODELAY"); 372 + } 373 + let params = socket2::TcpKeepalive::new() 374 + .with_time(Duration::from_secs(30)) 375 + .with_interval(Duration::from_secs(10)); 376 + if let Err(e) = sock_ref.set_tcp_keepalive(&params) { 377 + tracing::warn!(error = %e, "failed to set TCP keepalive"); 378 } 379 } 380
+160
crates/tranquil-ripple/tests/two_node_convergence.rs
··· 608 609 shutdown.cancel(); 610 }
··· 608 609 shutdown.cancel(); 610 } 611 + 612 + #[tokio::test] 613 + async fn two_node_partition_recovery() { 614 + tracing_subscriber::fmt() 615 + .with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG) 616 + .with_test_writer() 617 + .try_init() 618 + .ok(); 619 + 620 + let shutdown = CancellationToken::new(); 621 + 622 + let config_a = RippleConfig { 623 + bind_addr: "127.0.0.1:0".parse().unwrap(), 624 + seed_peers: vec![], 625 + machine_id: 100, 626 + gossip_interval_ms: 100, 627 + cache_max_bytes: 64 * 1024 * 1024, 628 + }; 629 + let (cache_a, _rl_a, addr_a) = RippleEngine::start(config_a, shutdown.clone()) 630 + .await 631 + .expect("node A failed to start"); 632 + 633 + futures::future::join_all((0..50).map(|i| { 634 + let cache = cache_a.clone(); 635 + async move { 636 + cache 637 + .set( 638 + &format!("pre-{i}"), 639 + &format!("val-{i}"), 640 + Duration::from_secs(300), 641 + ) 642 + .await 643 + .expect("set on A failed"); 644 + } 645 + })) 646 + .await; 647 + 648 + let config_b = RippleConfig { 649 + bind_addr: "127.0.0.1:0".parse().unwrap(), 650 + seed_peers: vec![addr_a], 651 + machine_id: 200, 652 + gossip_interval_ms: 100, 653 + cache_max_bytes: 64 * 1024 * 1024, 654 + }; 655 + let (cache_b, _rl_b, _addr_b) = RippleEngine::start(config_b, shutdown.clone()) 656 + .await 657 + .expect("node B failed to start"); 658 + 659 + let b = cache_b.clone(); 660 + poll_until(15_000, 200, move || { 661 + let b = b.clone(); 662 + async move { 663 + futures::future::join_all((0..50).map(|i| { 664 + let b = b.clone(); 665 + async move { b.get(&format!("pre-{i}")).await.is_some() } 666 + })) 667 + .await 668 + .into_iter() 669 + .all(|present| present) 670 + } 671 + }) 672 + .await; 673 + 674 + futures::future::join_all((0..50).map(|i| { 675 + let cache = cache_b.clone(); 676 + async move { 677 + cache 678 + .set( 679 + &format!("post-{i}"), 680 + &format!("bval-{i}"), 681 + Duration::from_secs(300), 682 + ) 683 + .await 684 + .expect("set on B failed"); 685 + } 686 + })) 687 + .await; 688 + 689 + let a = cache_a.clone(); 690 + poll_until(15_000, 200, move || { 691 + let a = a.clone(); 692 + async move { 693 + futures::future::join_all((0..50).map(|i| { 694 + let a = a.clone(); 695 + async move { a.get(&format!("post-{i}")).await.is_some() } 696 + })) 697 + .await 698 + .into_iter() 699 + .all(|present| present) 700 + } 701 + }) 702 + .await; 703 + 704 + shutdown.cancel(); 705 + } 706 + 707 + #[tokio::test] 708 + async fn two_node_stress_concurrent_load() { 709 + tracing_subscriber::fmt() 710 + .with_max_level(tracing_subscriber::filter::LevelFilter::INFO) 711 + .with_test_writer() 712 + .try_init() 713 + .ok(); 714 + 715 + let shutdown = CancellationToken::new(); 716 + let ((cache_a, rl_a), (cache_b, rl_b)) = spawn_pair(shutdown.clone()).await; 717 + 718 + let tasks: Vec<tokio::task::JoinHandle<()>> = (0u32..8).map(|task_id| { 719 + let cache = match task_id < 4 { 720 + true => cache_a.clone(), 721 + false => cache_b.clone(), 722 + }; 723 + let rl = match task_id < 4 { 724 + true => rl_a.clone(), 725 + false => rl_b.clone(), 726 + }; 727 + tokio::spawn(async move { 728 + let value = vec![0xABu8; 1024]; 729 + futures::future::join_all((0u32..500).map(|op| { 730 + let cache = cache.clone(); 731 + let rl = rl.clone(); 732 + let value = value.clone(); 733 + async move { 734 + let key_idx = op % 100; 735 + let key = format!("stress-{task_id}-{key_idx}"); 736 + match op % 4 { 737 + 0 | 1 => { 738 + cache 739 + .set_bytes(&key, &value, Duration::from_secs(120)) 740 + .await 741 + .expect("set_bytes failed"); 742 + } 743 + 2 => { 744 + let _ = cache.get(&key).await; 745 + } 746 + _ => { 747 + let _ = rl.check_rate_limit(&key, 1000, 60_000).await; 748 + } 749 + } 750 + } 751 + })) 752 + .await; 753 + }) 754 + }).collect(); 755 + 756 + let results = tokio::time::timeout( 757 + Duration::from_secs(30), 758 + futures::future::join_all(tasks), 759 + ) 760 + .await 761 + .expect("stress test timed out after 30s"); 762 + 763 + results.into_iter().enumerate().for_each(|(i, r)| { 764 + r.unwrap_or_else(|e| panic!("task {i} panicked: {e}")); 765 + }); 766 + 767 + tokio::time::sleep(Duration::from_secs(12)).await; 768 + 769 + shutdown.cancel(); 770 + }