Our Personal Data Server from scratch! tranquil.farm
oauth atproto pds rust postgresql objectstorage fun

feat: initial in-house cache distribution #18

merged opened by lewis.moe targeting main from feat/initial-in-house-cache-dist
  • membership: SWIM, via foca
  • cache replication: last-write-wins map
  • rate limit replication: PN-counter
  • clock: hybrid logical clock
  • delta propagation: direct peer broadcast that bypasses the SWIM layer (foca), because foca's built-in broadcast piggybacking was not delivering deltas reliably
  • serialization: bincode
  • eviction: lru with memory budget
  • schema versioning: u8 version tag on deltas

We can change any of these decisions fearlessly after the fact, because all of this state lives purely in memory.

Labels

None yet.

Assignees

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3me7vy2x3bb22
+1000 -278
Interdiff #0 #1
+8
.config/nextest.toml
··· 29 29 filter = "binary(whole_story)" 30 30 test-group = "heavy-load-tests" 31 31 32 + [[profile.default.overrides]] 33 + filter = "test(/two_node_stress_concurrent_load/)" 34 + test-group = "heavy-load-tests" 35 + 32 36 [[profile.ci.overrides]] 33 37 filter = "test(/import_with_verification/) | test(/plc_migration/)" 34 38 test-group = "serial-env-tests" ··· 39 43 40 44 [[profile.ci.overrides]] 41 45 filter = "binary(whole_story)" 46 + test-group = "heavy-load-tests" 47 + 48 + [[profile.ci.overrides]] 49 + filter = "test(/two_node_stress_concurrent_load/)" 42 50 test-group = "heavy-load-tests"
.env.example

This file has not been changed.

+24 -22
Cargo.lock
··· 2778 2778 "libc", 2779 2779 "percent-encoding", 2780 2780 "pin-project-lite", 2781 - "socket2 0.6.1", 2781 + "socket2 0.5.10", 2782 2782 "system-configuration", 2783 2783 "tokio", 2784 2784 "tower-service", ··· 4242 4242 "quinn-udp", 4243 4243 "rustc-hash", 4244 4244 "rustls 0.23.35", 4245 - "socket2 0.6.1", 4245 + "socket2 0.5.10", 4246 4246 "thiserror 2.0.17", 4247 4247 "tokio", 4248 4248 "tracing", ··· 4279 4279 "cfg_aliases", 4280 4280 "libc", 4281 4281 "once_cell", 4282 - "socket2 0.6.1", 4282 + "socket2 0.5.10", 4283 4283 "tracing", 4284 4284 "windows-sys 0.60.2", 4285 4285 ] ··· 4402 4402 "pin-project-lite", 4403 4403 "ryu", 4404 4404 "sha1_smol", 4405 - "socket2 0.6.1", 4405 + "socket2 0.6.2", 4406 4406 "tokio", 4407 4407 "tokio-util", 4408 4408 "url", ··· 5134 5134 5135 5135 [[package]] 5136 5136 name = "socket2" 5137 - version = "0.6.1" 5137 + version = "0.6.2" 5138 5138 source = "registry+https://github.com/rust-lang/crates.io-index" 5139 - checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" 5139 + checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0" 5140 5140 dependencies = [ 5141 5141 "libc", 5142 5142 "windows-sys 0.60.2", ··· 5693 5693 "mio", 5694 5694 "pin-project-lite", 5695 5695 "signal-hook-registry", 5696 - "socket2 0.6.1", 5696 + "socket2 0.6.2", 5697 5697 "tokio-macros", 5698 5698 "windows-sys 0.61.2", 5699 5699 ] ··· 5797 5797 "hyper-util", 5798 5798 "percent-encoding", 5799 5799 "pin-project", 5800 - "socket2 0.6.1", 5800 + "socket2 0.6.2", 5801 5801 "sync_wrapper", 5802 5802 "tokio", 5803 5803 "tokio-stream", ··· 5963 5963 5964 5964 [[package]] 5965 5965 name = "tranquil-auth" 5966 - version = "0.1.0" 5966 + version = "0.2.0" 5967 5967 dependencies = [ 5968 5968 "anyhow", 5969 5969 "base32", ··· 5985 5985 5986 5986 [[package]] 5987 5987 name = "tranquil-cache" 5988 - version = "0.1.0" 5988 + version = "0.2.0" 5989 5989 dependencies = [ 5990 5990 "async-trait", 5991 5991 "base64 0.22.1", 
··· 5998 5998 5999 5999 [[package]] 6000 6000 name = "tranquil-comms" 6001 - version = "0.1.0" 6001 + version = "0.2.0" 6002 6002 dependencies = [ 6003 6003 "async-trait", 6004 6004 "base64 0.22.1", ··· 6012 6012 6013 6013 [[package]] 6014 6014 name = "tranquil-crypto" 6015 - version = "0.1.0" 6015 + version = "0.2.0" 6016 6016 dependencies = [ 6017 6017 "aes-gcm", 6018 6018 "base64 0.22.1", ··· 6028 6028 6029 6029 [[package]] 6030 6030 name = "tranquil-db" 6031 - version = "0.1.0" 6031 + version = "0.2.0" 6032 6032 dependencies = [ 6033 6033 "async-trait", 6034 6034 "chrono", ··· 6045 6045 6046 6046 [[package]] 6047 6047 name = "tranquil-db-traits" 6048 - version = "0.1.0" 6048 + version = "0.2.0" 6049 6049 dependencies = [ 6050 6050 "async-trait", 6051 6051 "base64 0.22.1", ··· 6061 6061 6062 6062 [[package]] 6063 6063 name = "tranquil-infra" 6064 - version = "0.1.0" 6064 + version = "0.2.0" 6065 6065 dependencies = [ 6066 6066 "async-trait", 6067 6067 "bytes", ··· 6071 6071 6072 6072 [[package]] 6073 6073 name = "tranquil-oauth" 6074 - version = "0.1.0" 6074 + version = "0.2.0" 6075 6075 dependencies = [ 6076 6076 "anyhow", 6077 6077 "axum", ··· 6094 6094 6095 6095 [[package]] 6096 6096 name = "tranquil-pds" 6097 - version = "0.1.0" 6097 + version = "0.2.0" 6098 6098 dependencies = [ 6099 6099 "aes-gcm", 6100 6100 "anyhow", ··· 6179 6179 6180 6180 [[package]] 6181 6181 name = "tranquil-repo" 6182 - version = "0.1.0" 6182 + version = "0.2.0" 6183 6183 dependencies = [ 6184 6184 "bytes", 6185 6185 "cid", ··· 6191 6191 6192 6192 [[package]] 6193 6193 name = "tranquil-ripple" 6194 - version = "0.1.0" 6194 + version = "0.2.0" 6195 6195 dependencies = [ 6196 6196 "async-trait", 6197 6197 "backon", ··· 6199 6199 "bytes", 6200 6200 "foca", 6201 6201 "futures", 6202 + "metrics", 6202 6203 "parking_lot", 6203 6204 "rand 0.9.2", 6204 6205 "serde", 6206 + "socket2 0.6.2", 6205 6207 "thiserror 2.0.17", 6206 6208 "tokio", 6207 6209 "tokio-util", ··· 6213 6215 6214 6216 
[[package]] 6215 6217 name = "tranquil-scopes" 6216 - version = "0.1.0" 6218 + version = "0.2.0" 6217 6219 dependencies = [ 6218 6220 "axum", 6219 6221 "futures", ··· 6228 6230 6229 6231 [[package]] 6230 6232 name = "tranquil-storage" 6231 - version = "0.1.0" 6233 + version = "0.2.0" 6232 6234 dependencies = [ 6233 6235 "async-trait", 6234 6236 "aws-config", ··· 6244 6246 6245 6247 [[package]] 6246 6248 name = "tranquil-types" 6247 - version = "0.1.0" 6249 + version = "0.2.0" 6248 6250 dependencies = [ 6249 6251 "chrono", 6250 6252 "cid",
Cargo.toml

This file has not been changed.

README.md

This file has not been changed.

crates/tranquil-cache/Cargo.toml

This file has not been changed.

crates/tranquil-cache/src/lib.rs

This file has not been changed.

crates/tranquil-infra/src/lib.rs

This file has not been changed.

crates/tranquil-pds/Cargo.toml

This file has not been changed.

crates/tranquil-pds/src/api/repo/blob.rs

This file has not been changed.

crates/tranquil-pds/src/state.rs

This file has not been changed.

crates/tranquil-pds/tests/admin_search.rs

This file has not been changed.

crates/tranquil-pds/tests/common/mod.rs

This file has not been changed.

crates/tranquil-pds/tests/firehose_validation.rs

This file has not been changed.

crates/tranquil-pds/tests/repo_blob.rs

This file has not been changed.

crates/tranquil-pds/tests/ripple_cluster.rs

This file has not been changed.

crates/tranquil-pds/tests/sync_conformance.rs

This file has not been changed.

crates/tranquil-pds/tests/whole_story.rs

This file has not been changed.

+2
crates/tranquil-ripple/Cargo.toml
··· 12 12 bincode = { workspace = true } 13 13 bytes = { workspace = true } 14 14 foca = { workspace = true } 15 + metrics = { workspace = true } 15 16 parking_lot = { workspace = true } 16 17 rand = "0.9" 17 18 serde = { workspace = true } 19 + socket2 = "0.6.2" 18 20 thiserror = { workspace = true } 19 21 tokio = { workspace = true, features = ["net", "io-util", "sync", "time"] } 20 22 tokio-util = { workspace = true }
+29 -16
crates/tranquil-ripple/src/cache.rs
··· 1 - use crate::crdt::CrdtStore; 1 + use crate::crdt::ShardedCrdtStore; 2 + use crate::metrics; 2 3 use async_trait::async_trait; 3 - use parking_lot::RwLock; 4 4 use std::sync::Arc; 5 5 use std::time::Duration; 6 6 use tranquil_infra::{Cache, CacheError}; 7 7 8 8 pub struct RippleCache { 9 - store: Arc<RwLock<CrdtStore>>, 9 + store: Arc<ShardedCrdtStore>, 10 10 } 11 11 12 12 impl RippleCache { 13 - pub fn new(store: Arc<RwLock<CrdtStore>>) -> Self { 13 + pub fn new(store: Arc<ShardedCrdtStore>) -> Self { 14 14 Self { store } 15 15 } 16 16 } ··· 18 18 #[async_trait] 19 19 impl Cache for RippleCache { 20 20 async fn get(&self, key: &str) -> Option<String> { 21 - self.store 22 - .read() 21 + let result = self 22 + .store 23 23 .cache_get(key) 24 - .and_then(|bytes| String::from_utf8(bytes).ok()) 24 + .and_then(|bytes| String::from_utf8(bytes).ok()); 25 + match result.is_some() { 26 + true => metrics::record_cache_hit(), 27 + false => metrics::record_cache_miss(), 28 + } 29 + result 25 30 } 26 31 27 32 async fn set(&self, key: &str, value: &str, ttl: Duration) -> Result<(), CacheError> { 33 + let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX); 28 34 self.store 29 - .write() 30 - .cache_set(key.to_string(), value.as_bytes().to_vec(), ttl.as_millis() as u64); 35 + .cache_set(key.to_string(), value.as_bytes().to_vec(), ttl_ms); 36 + metrics::record_cache_write(); 31 37 Ok(()) 32 38 } 33 39 34 40 async fn delete(&self, key: &str) -> Result<(), CacheError> { 35 - self.store.write().cache_delete(key); 41 + self.store.cache_delete(key); 42 + metrics::record_cache_delete(); 36 43 Ok(()) 37 44 } 38 45 39 46 async fn get_bytes(&self, key: &str) -> Option<Vec<u8>> { 40 - self.store.read().cache_get(key) 47 + let result = self.store.cache_get(key); 48 + match result.is_some() { 49 + true => metrics::record_cache_hit(), 50 + false => metrics::record_cache_miss(), 51 + } 52 + result 41 53 } 42 54 43 55 async fn set_bytes(&self, key: &str, value: &[u8], ttl: 
Duration) -> Result<(), CacheError> { 56 + let ttl_ms = u64::try_from(ttl.as_millis()).unwrap_or(u64::MAX); 44 57 self.store 45 - .write() 46 - .cache_set(key.to_string(), value.to_vec(), ttl.as_millis() as u64); 58 + .cache_set(key.to_string(), value.to_vec(), ttl_ms); 59 + metrics::record_cache_write(); 47 60 Ok(()) 48 61 } 49 62 } ··· 54 67 55 68 #[tokio::test] 56 69 async fn cache_trait_roundtrip() { 57 - let store = Arc::new(RwLock::new(CrdtStore::new(1))); 70 + let store = Arc::new(ShardedCrdtStore::new(1)); 58 71 let cache = RippleCache::new(store); 59 72 cache 60 73 .set("test", "value", Duration::from_secs(60)) ··· 65 78 66 79 #[tokio::test] 67 80 async fn cache_trait_bytes() { 68 - let store = Arc::new(RwLock::new(CrdtStore::new(1))); 81 + let store = Arc::new(ShardedCrdtStore::new(1)); 69 82 let cache = RippleCache::new(store); 70 83 let data = vec![0xDE, 0xAD, 0xBE, 0xEF]; 71 84 cache ··· 77 90 78 91 #[tokio::test] 79 92 async fn cache_trait_delete() { 80 - let store = Arc::new(RwLock::new(CrdtStore::new(1))); 93 + let store = Arc::new(ShardedCrdtStore::new(1)); 81 94 let cache = RippleCache::new(store); 82 95 cache 83 96 .set("del", "x", Duration::from_secs(60))
+1 -1
crates/tranquil-ripple/src/config.rs
··· 1 1 use std::net::SocketAddr; 2 2 3 - fn fnv1a(data: &[u8]) -> u64 { 3 + pub(crate) fn fnv1a(data: &[u8]) -> u64 { 4 4 data.iter().fold(0xcbf29ce484222325u64, |hash, &byte| { 5 5 (hash ^ byte as u64).wrapping_mul(0x100000001b3) 6 6 })
crates/tranquil-ripple/src/crdt/delta.rs

This file has not been changed.

+13
crates/tranquil-ripple/src/crdt/g_counter.rs
··· 163 163 } 164 164 165 165 pub fn peek_count(&self, key: &str, window_ms: u64, now_wall_ms: u64) -> u64 { 166 + if window_ms == 0 { 167 + return 0; 168 + } 166 169 match self.counters.get(key) { 167 170 Some(counter) if counter.window_start_ms == Self::aligned_window_start(now_wall_ms, window_ms) => { 168 171 counter.total() ··· 193 196 + PER_COUNTER_OVERHEAD 194 197 }) 195 198 .fold(0usize, usize::saturating_add) 199 + } 200 + 201 + pub fn extract_all_deltas(&self) -> Vec<GCounterDelta> { 202 + self.counters 203 + .iter() 204 + .map(|(key, counter)| GCounterDelta { 205 + key: key.clone(), 206 + counter: counter.clone(), 207 + }) 208 + .collect() 196 209 } 197 210 198 211 pub fn gc_expired(&mut self, now_wall_ms: u64) {
+7 -4
crates/tranquil-ripple/src/crdt/hlc.rs
··· 54 54 } 55 55 56 56 fn physical_now() -> u64 { 57 - SystemTime::now() 58 - .duration_since(UNIX_EPOCH) 59 - .unwrap_or_default() 60 - .as_millis() as u64 57 + u64::try_from( 58 + SystemTime::now() 59 + .duration_since(UNIX_EPOCH) 60 + .unwrap_or_default() 61 + .as_millis(), 62 + ) 63 + .unwrap_or(u64::MAX) 61 64 } 62 65 63 66 pub fn now(&mut self) -> HlcTimestamp {
+68 -35
crates/tranquil-ripple/src/crdt/lww_map.rs
··· 1 1 use super::hlc::HlcTimestamp; 2 - use parking_lot::Mutex; 3 2 use serde::{Deserialize, Serialize}; 4 3 use std::collections::{BTreeMap, HashMap}; 5 4 ··· 27 26 } 28 27 29 28 fn entry_byte_size(&self, key: &str) -> usize { 30 - const OVERHEAD: usize = 128; 31 - key.len() 32 - + self.value.as_ref().map_or(0, Vec::len) 33 - + std::mem::size_of::<Self>() 34 - + OVERHEAD 29 + const HASHMAP_ENTRY_OVERHEAD: usize = 64; 30 + const BTREE_NODE_OVERHEAD: usize = 64; 31 + const STRING_HEADER: usize = 24; 32 + const COUNTER_SIZE: usize = 8; 33 + 34 + let key_len = key.len(); 35 + let value_len = self.value.as_ref().map_or(0, Vec::len); 36 + 37 + let main_entry = key_len 38 + .saturating_add(value_len) 39 + .saturating_add(std::mem::size_of::<Self>()) 40 + .saturating_add(HASHMAP_ENTRY_OVERHEAD); 41 + 42 + match self.is_tombstone() { 43 + true => main_entry, 44 + false => { 45 + let lru_btree = COUNTER_SIZE 46 + .saturating_add(STRING_HEADER) 47 + .saturating_add(key_len) 48 + .saturating_add(BTREE_NODE_OVERHEAD); 49 + 50 + let lru_hashmap = STRING_HEADER 51 + .saturating_add(key_len) 52 + .saturating_add(COUNTER_SIZE) 53 + .saturating_add(HASHMAP_ENTRY_OVERHEAD); 54 + 55 + main_entry 56 + .saturating_add(lru_btree) 57 + .saturating_add(lru_hashmap) 58 + } 59 + } 35 60 } 36 61 } 37 62 ··· 59 84 if let Some(old_counter) = self.key_to_counter.remove(key) { 60 85 self.counter_to_key.remove(&old_counter); 61 86 } 87 + if self.counter >= u64::MAX - 1 { 88 + self.compact(); 89 + } 62 90 self.counter = self.counter.saturating_add(1); 63 91 self.counter_to_key.insert(self.counter, key.to_string()); 64 92 self.key_to_counter.insert(key.to_string(), self.counter); 65 93 } 66 94 95 + fn compact(&mut self) { 96 + let keys: Vec<String> = self.counter_to_key.values().cloned().collect(); 97 + self.counter_to_key.clear(); 98 + self.key_to_counter.clear(); 99 + keys.into_iter().enumerate().for_each(|(i, key)| { 100 + let new_counter = (i as u64).saturating_add(1); 101 + 
self.counter_to_key.insert(new_counter, key.clone()); 102 + self.key_to_counter.insert(key, new_counter); 103 + }); 104 + self.counter = self.counter_to_key.len() as u64; 105 + } 106 + 67 107 fn remove(&mut self, key: &str) { 68 108 if let Some(counter) = self.key_to_counter.remove(key) { 69 109 self.counter_to_key.remove(&counter); ··· 80 120 81 121 pub struct LwwMap { 82 122 entries: HashMap<String, LwwEntry>, 83 - lru: Mutex<LruTracker>, 123 + lru: LruTracker, 84 124 estimated_bytes: usize, 85 125 } 86 126 ··· 88 128 pub fn new() -> Self { 89 129 Self { 90 130 entries: HashMap::new(), 91 - lru: Mutex::new(LruTracker::new()), 131 + lru: LruTracker::new(), 92 132 estimated_bytes: 0, 93 133 } 94 134 } ··· 98 138 if entry.is_expired(now_wall_ms) || entry.is_tombstone() { 99 139 return None; 100 140 } 101 - let value = entry.value.clone(); 102 - self.lru.lock().promote(key); 103 - value 141 + entry.value.clone() 104 142 } 105 143 106 144 pub fn set(&mut self, key: String, value: Vec<u8>, timestamp: HlcTimestamp, ttl_ms: u64, wall_ms_now: u64) { ··· 113 151 self.remove_estimated_bytes(&key); 114 152 self.estimated_bytes += entry.entry_byte_size(&key); 115 153 self.entries.insert(key.clone(), entry); 116 - self.lru.lock().promote(&key); 154 + self.lru.promote(&key); 117 155 } 118 156 119 157 pub fn delete(&mut self, key: &str, timestamp: HlcTimestamp, wall_ms_now: u64) { 120 - match self.entries.get(key) { 158 + let ttl_ms = match self.entries.get(key) { 121 159 Some(existing) if existing.timestamp >= timestamp => return, 122 - _ => {} 123 - } 124 - let ttl_ms = self 125 - .entries 126 - .get(key) 127 - .map_or(60_000, |e| e.ttl_ms.max(60_000)); 160 + Some(existing) => existing.ttl_ms.max(60_000), 161 + None => 60_000, 162 + }; 128 163 let entry = LwwEntry { 129 164 value: None, 130 165 timestamp, ··· 134 169 self.remove_estimated_bytes(key); 135 170 self.estimated_bytes += entry.entry_byte_size(key); 136 171 self.entries.insert(key.to_string(), entry); 137 - 
self.lru.lock().remove(key); 172 + self.lru.remove(key); 138 173 } 139 174 140 175 pub fn merge_entry(&mut self, key: String, remote: LwwEntry) -> bool { ··· 145 180 self.remove_estimated_bytes(&key); 146 181 self.estimated_bytes += remote.entry_byte_size(&key); 147 182 self.entries.insert(key.clone(), remote); 148 - let mut lru = self.lru.lock(); 149 183 match is_tombstone { 150 - true => lru.remove(&key), 151 - false => lru.promote(&key), 184 + true => self.lru.remove(&key), 185 + false => self.lru.promote(&key), 152 186 } 153 187 true 154 188 } ··· 175 209 expired_keys.iter().for_each(|key| { 176 210 self.remove_estimated_bytes(key); 177 211 self.entries.remove(key); 212 + self.lru.remove(key); 178 213 }); 179 - let mut lru = self.lru.lock(); 180 - expired_keys.iter().for_each(|key| { 181 - lru.remove(key); 182 - }); 183 214 } 184 215 185 216 pub fn gc_expired(&mut self, now_wall_ms: u64) { ··· 192 223 expired_keys.iter().for_each(|key| { 193 224 self.remove_estimated_bytes(key); 194 225 self.entries.remove(key); 226 + self.lru.remove(key); 195 227 }); 196 - let mut lru = self.lru.lock(); 197 - expired_keys.iter().for_each(|key| { 198 - lru.remove(key); 199 - }); 200 228 } 201 229 202 230 pub fn evict_lru(&mut self) -> Option<String> { 203 - let key = self.lru.lock().pop_least_recent()?; 231 + let key = self.lru.pop_least_recent()?; 204 232 self.remove_estimated_bytes(&key); 205 233 self.entries.remove(&key); 206 234 Some(key) 207 235 } 208 236 237 + pub fn touch(&mut self, key: &str) { 238 + if self.entries.contains_key(key) { 239 + self.lru.promote(key); 240 + } 241 + } 242 + 209 243 pub fn estimated_bytes(&self) -> usize { 210 244 self.estimated_bytes 211 245 } ··· 359 393 } 360 394 361 395 #[test] 362 - fn lru_eviction() { 396 + fn lru_eviction_by_write_order() { 363 397 let mut map = LwwMap::new(); 364 398 map.set("k1".into(), b"a".to_vec(), ts(100, 0, 1), 60_000, 100); 365 399 map.set("k2".into(), b"b".to_vec(), ts(101, 0, 1), 60_000, 101); 366 400 
map.set("k3".into(), b"c".to_vec(), ts(102, 0, 1), 60_000, 102); 367 - let _ = map.get("k1", 102); 368 401 let evicted = map.evict_lru(); 369 - assert_eq!(evicted.as_deref(), Some("k2")); 402 + assert_eq!(evicted.as_deref(), Some("k1")); 370 403 } 371 404 372 405 #[test]
+274 -72
crates/tranquil-ripple/src/crdt/mod.rs
··· 3 3 pub mod lww_map; 4 4 pub mod g_counter; 5 5 6 + use crate::config::fnv1a; 6 7 use delta::CrdtDelta; 7 8 use hlc::{Hlc, HlcTimestamp}; 8 - use lww_map::LwwMap; 9 + use lww_map::{LwwDelta, LwwMap}; 9 10 use g_counter::RateLimitStore; 11 + use parking_lot::{Mutex, RwLock}; 10 12 use std::time::{SystemTime, UNIX_EPOCH}; 11 13 12 - pub struct CrdtStore { 13 - hlc: Hlc, 14 + const SHARD_COUNT: usize = 64; 15 + const MAX_PROMOTIONS_PER_SHARD: usize = 8192; 16 + const MAX_REPLICABLE_VALUE_SIZE: usize = 15 * 1024 * 1024; 17 + 18 + struct CrdtShard { 14 19 cache: LwwMap, 15 20 rate_limits: RateLimitStore, 16 21 last_broadcast_ts: HlcTimestamp, 17 22 } 18 23 19 - impl CrdtStore { 20 - pub fn new(node_id: u64) -> Self { 24 + impl CrdtShard { 25 + fn new(node_id: u64) -> Self { 21 26 Self { 22 - hlc: Hlc::new(node_id), 23 27 cache: LwwMap::new(), 24 28 rate_limits: RateLimitStore::new(node_id), 25 29 last_broadcast_ts: HlcTimestamp::ZERO, 26 30 } 27 31 } 32 + } 28 33 34 + pub struct ShardedCrdtStore { 35 + hlc: Mutex<Hlc>, 36 + shards: Box<[RwLock<CrdtShard>]>, 37 + promotions: Box<[Mutex<Vec<String>>]>, 38 + shard_mask: usize, 39 + node_id: u64, 40 + } 41 + 42 + impl ShardedCrdtStore { 43 + pub fn new(node_id: u64) -> Self { 44 + const { assert!(SHARD_COUNT.is_power_of_two()) }; 45 + let shards: Vec<RwLock<CrdtShard>> = (0..SHARD_COUNT) 46 + .map(|_| RwLock::new(CrdtShard::new(node_id))) 47 + .collect(); 48 + let promotions: Vec<Mutex<Vec<String>>> = (0..SHARD_COUNT) 49 + .map(|_| Mutex::new(Vec::new())) 50 + .collect(); 51 + Self { 52 + hlc: Mutex::new(Hlc::new(node_id)), 53 + shards: shards.into_boxed_slice(), 54 + promotions: promotions.into_boxed_slice(), 55 + shard_mask: SHARD_COUNT - 1, 56 + node_id, 57 + } 58 + } 59 + 60 + fn shard_for(&self, key: &str) -> usize { 61 + fnv1a(key.as_bytes()) as usize & self.shard_mask 62 + } 63 + 29 64 fn wall_ms_now() -> u64 { 30 - SystemTime::now() 31 - .duration_since(UNIX_EPOCH) 32 - .unwrap_or_default() 33 - .as_millis() as 
u64 65 + u64::try_from( 66 + SystemTime::now() 67 + .duration_since(UNIX_EPOCH) 68 + .unwrap_or_default() 69 + .as_millis(), 70 + ) 71 + .unwrap_or(u64::MAX) 34 72 } 35 73 36 74 pub fn cache_get(&self, key: &str) -> Option<Vec<u8>> { 37 - self.cache.get(key, Self::wall_ms_now()) 75 + let idx = self.shard_for(key); 76 + let result = self.shards[idx].read().cache.get(key, Self::wall_ms_now()); 77 + if result.is_some() { 78 + let mut promos = self.promotions[idx].lock(); 79 + if promos.len() < MAX_PROMOTIONS_PER_SHARD { 80 + promos.push(key.to_string()); 81 + } 82 + } 83 + result 38 84 } 39 85 40 - pub fn cache_set(&mut self, key: String, value: Vec<u8>, ttl_ms: u64) { 41 - let ts = self.hlc.now(); 42 - self.cache.set(key, value, ts, ttl_ms, Self::wall_ms_now()); 86 + pub fn cache_set(&self, key: String, value: Vec<u8>, ttl_ms: u64) { 87 + if value.len() > MAX_REPLICABLE_VALUE_SIZE { 88 + tracing::warn!( 89 + key = %key, 90 + value_size = value.len(), 91 + max = MAX_REPLICABLE_VALUE_SIZE, 92 + "value exceeds replicable size limit, may fail to replicate to peers" 93 + ); 94 + } 95 + let ts = self.hlc.lock().now(); 96 + let wall = Self::wall_ms_now(); 97 + self.shards[self.shard_for(&key)] 98 + .write() 99 + .cache 100 + .set(key, value, ts, ttl_ms, wall); 43 101 } 44 102 45 - pub fn cache_delete(&mut self, key: &str) { 46 - let ts = self.hlc.now(); 47 - self.cache.delete(key, ts, Self::wall_ms_now()); 103 + pub fn cache_delete(&self, key: &str) { 104 + let ts = self.hlc.lock().now(); 105 + let wall = Self::wall_ms_now(); 106 + self.shards[self.shard_for(key)] 107 + .write() 108 + .cache 109 + .delete(key, ts, wall); 48 110 } 49 111 50 112 pub fn rate_limit_peek(&self, key: &str, window_ms: u64) -> u64 { 51 - self.rate_limits 113 + self.shards[self.shard_for(key)] 114 + .read() 115 + .rate_limits 52 116 .peek_count(key, window_ms, Self::wall_ms_now()) 53 117 } 54 118 55 - pub fn rate_limit_check(&mut self, key: &str, limit: u32, window_ms: u64) -> bool { 56 - 
self.rate_limits 119 + pub fn rate_limit_check(&self, key: &str, limit: u32, window_ms: u64) -> bool { 120 + self.shards[self.shard_for(key)] 121 + .write() 122 + .rate_limits 57 123 .check_and_increment(key, limit, window_ms, Self::wall_ms_now()) 58 124 } 59 125 60 126 pub fn peek_broadcast_delta(&self) -> CrdtDelta { 61 - let cache_delta = { 62 - let d = self.cache.extract_delta_since(self.last_broadcast_ts); 63 - match d.entries.is_empty() { 64 - true => None, 65 - false => Some(d), 66 - } 127 + let mut cache_entries: Vec<(String, lww_map::LwwEntry)> = Vec::new(); 128 + let mut rate_limit_deltas: Vec<g_counter::GCounterDelta> = Vec::new(); 129 + 130 + self.shards.iter().for_each(|shard_lock| { 131 + let shard = shard_lock.read(); 132 + let lww_delta = shard.cache.extract_delta_since(shard.last_broadcast_ts); 133 + cache_entries.extend(lww_delta.entries); 134 + rate_limit_deltas.extend(shard.rate_limits.extract_dirty_deltas()); 135 + }); 136 + 137 + let cache_delta = match cache_entries.is_empty() { 138 + true => None, 139 + false => Some(LwwDelta { entries: cache_entries }), 67 140 }; 68 - let rate_limit_deltas = self.rate_limits.extract_dirty_deltas(); 141 + 69 142 CrdtDelta { 70 143 version: 1, 71 - source_node: self.hlc.node_id(), 144 + source_node: self.node_id, 72 145 cache_delta, 73 146 rate_limit_deltas, 74 147 } 75 148 } 76 149 77 - pub fn commit_broadcast(&mut self, delta: &CrdtDelta) { 78 - let max_ts = delta 150 + pub fn commit_broadcast(&self, delta: &CrdtDelta) { 151 + let cache_entries_by_shard: Vec<(usize, &HlcTimestamp)> = delta 79 152 .cache_delta 80 153 .as_ref() 81 - .and_then(|d| d.entries.iter().map(|(_, e)| e.timestamp).max()) 82 - .unwrap_or(self.last_broadcast_ts); 83 - self.last_broadcast_ts = max_ts; 84 - let committed_keys: std::collections::HashSet<&str> = delta 154 + .map(|d| { 155 + d.entries 156 + .iter() 157 + .map(|(key, entry)| (self.shard_for(key), &entry.timestamp)) 158 + .collect() 159 + }) 160 + .unwrap_or_default(); 161 + 
162 + let mut max_ts_per_shard: Vec<Option<HlcTimestamp>> = (0..self.shards.len()) 163 + .map(|_| None) 164 + .collect(); 165 + 166 + cache_entries_by_shard.iter().for_each(|&(shard_idx, ts)| { 167 + let slot = &mut max_ts_per_shard[shard_idx]; 168 + *slot = Some(match slot { 169 + Some(existing) if *existing >= *ts => *existing, 170 + _ => *ts, 171 + }); 172 + }); 173 + 174 + let rl_index: std::collections::HashMap<&str, &g_counter::GCounter> = delta 85 175 .rate_limit_deltas 86 176 .iter() 87 - .map(|d| d.key.as_str()) 177 + .map(|d| (d.key.as_str(), &d.counter)) 88 178 .collect(); 89 - committed_keys.iter().for_each(|&key| { 90 - let still_matches = self 91 - .rate_limits 92 - .peek_dirty_counter(key) 93 - .zip(delta.rate_limit_deltas.iter().find(|d| d.key == key)) 94 - .is_some_and(|(current, committed)| { 95 - current.window_start_ms == committed.counter.window_start_ms 96 - && current.total() == committed.counter.total() 97 - }); 98 - if still_matches { 99 - self.rate_limits.clear_single_dirty(key); 179 + 180 + let mut shard_rl_keys: Vec<Vec<&str>> = (0..self.shards.len()) 181 + .map(|_| Vec::new()) 182 + .collect(); 183 + rl_index.keys().for_each(|&key| { 184 + shard_rl_keys[self.shard_for(key)].push(key); 185 + }); 186 + 187 + self.shards.iter().enumerate().for_each(|(idx, shard_lock)| { 188 + let has_cache_update = max_ts_per_shard[idx].is_some(); 189 + let has_rl_keys = !shard_rl_keys[idx].is_empty(); 190 + if !has_cache_update && !has_rl_keys { 191 + return; 100 192 } 193 + let mut shard = shard_lock.write(); 194 + if let Some(max_ts) = max_ts_per_shard[idx] { 195 + shard.last_broadcast_ts = max_ts; 196 + } 197 + shard_rl_keys[idx].iter().for_each(|&key| { 198 + let still_matches = shard 199 + .rate_limits 200 + .peek_dirty_counter(key) 201 + .zip(rl_index.get(key)) 202 + .is_some_and(|(current, committed)| { 203 + current.window_start_ms == committed.window_start_ms 204 + && current.total() == committed.total() 205 + }); 206 + if still_matches { 207 + 
shard.rate_limits.clear_single_dirty(key); 208 + } 209 + }); 101 210 }); 102 211 } 103 212 104 - pub fn merge_delta(&mut self, delta: &CrdtDelta) -> bool { 213 + pub fn merge_delta(&self, delta: &CrdtDelta) -> bool { 105 214 if !delta.is_compatible() { 106 215 tracing::warn!( 107 216 version = delta.version, ··· 109 218 ); 110 219 return false; 111 220 } 221 + 222 + if let Some(ref cache_delta) = delta.cache_delta { 223 + if let Some(max_ts) = cache_delta.entries.iter().map(|(_, e)| e.timestamp).max() { 224 + let _ = self.hlc.lock().receive(max_ts); 225 + } 226 + } 227 + 112 228 let mut changed = false; 229 + 113 230 if let Some(ref cache_delta) = delta.cache_delta { 231 + let mut entries_by_shard: Vec<Vec<(String, lww_map::LwwEntry)>> = 232 + (0..self.shards.len()).map(|_| Vec::new()).collect(); 233 + 114 234 cache_delta.entries.iter().for_each(|(key, entry)| { 115 - let _ = self.hlc.receive(entry.timestamp); 116 - if self.cache.merge_entry(key.clone(), entry.clone()) { 117 - changed = true; 235 + entries_by_shard[self.shard_for(key)].push((key.clone(), entry.clone())); 236 + }); 237 + 238 + entries_by_shard.into_iter().enumerate().for_each(|(idx, entries)| { 239 + if entries.is_empty() { 240 + return; 118 241 } 242 + let mut shard = self.shards[idx].write(); 243 + entries.into_iter().for_each(|(key, entry)| { 244 + if shard.cache.merge_entry(key, entry) { 245 + changed = true; 246 + } 247 + }); 119 248 }); 120 249 } 121 - delta.rate_limit_deltas.iter().for_each(|rd| { 122 - if self 123 - .rate_limits 124 - .merge_counter(rd.key.clone(), &rd.counter) 125 - { 126 - changed = true; 127 - } 128 - }); 250 + 251 + if !delta.rate_limit_deltas.is_empty() { 252 + let mut rl_by_shard: Vec<Vec<(String, &g_counter::GCounter)>> = 253 + (0..self.shards.len()).map(|_| Vec::new()).collect(); 254 + 255 + delta.rate_limit_deltas.iter().for_each(|rd| { 256 + rl_by_shard[self.shard_for(&rd.key)].push((rd.key.clone(), &rd.counter)); 257 + }); 258 + 259 + 
rl_by_shard.into_iter().enumerate().for_each(|(idx, entries)| { 260 + if entries.is_empty() { 261 + return; 262 + } 263 + let mut shard = self.shards[idx].write(); 264 + entries.into_iter().for_each(|(key, counter)| { 265 + if shard.rate_limits.merge_counter(key, counter) { 266 + changed = true; 267 + } 268 + }); 269 + }); 270 + } 271 + 129 272 changed 130 273 } 131 274 132 - pub fn run_maintenance(&mut self) { 275 + pub fn run_maintenance(&self) { 133 276 let now = Self::wall_ms_now(); 134 - self.cache.gc_tombstones(now); 135 - self.cache.gc_expired(now); 136 - self.rate_limits.gc_expired(now); 277 + self.shards.iter().enumerate().for_each(|(idx, shard_lock)| { 278 + let pending: Vec<String> = self.promotions[idx].lock().drain(..).collect(); 279 + let mut shard = shard_lock.write(); 280 + pending.iter().for_each(|key| shard.cache.touch(key)); 281 + shard.cache.gc_tombstones(now); 282 + shard.cache.gc_expired(now); 283 + shard.rate_limits.gc_expired(now); 284 + }); 137 285 } 138 286 287 + pub fn peek_full_state(&self) -> CrdtDelta { 288 + let mut cache_entries: Vec<(String, lww_map::LwwEntry)> = Vec::new(); 289 + let mut rate_limit_deltas: Vec<g_counter::GCounterDelta> = Vec::new(); 290 + 291 + self.shards.iter().for_each(|shard_lock| { 292 + let shard = shard_lock.read(); 293 + let lww_delta = shard.cache.extract_delta_since(HlcTimestamp::ZERO); 294 + cache_entries.extend(lww_delta.entries); 295 + rate_limit_deltas.extend(shard.rate_limits.extract_all_deltas()); 296 + }); 297 + 298 + let cache_delta = match cache_entries.is_empty() { 299 + true => None, 300 + false => Some(LwwDelta { entries: cache_entries }), 301 + }; 302 + 303 + CrdtDelta { 304 + version: 1, 305 + source_node: self.node_id, 306 + cache_delta, 307 + rate_limit_deltas, 308 + } 309 + } 310 + 139 311 pub fn cache_estimated_bytes(&self) -> usize { 140 - self.cache.estimated_bytes() 312 + self.shards 313 + .iter() 314 + .map(|s| s.read().cache.estimated_bytes()) 315 + .fold(0usize, 
usize::saturating_add) 141 316 } 142 317 143 318 pub fn rate_limit_estimated_bytes(&self) -> usize { 144 - self.rate_limits.estimated_bytes() 319 + self.shards 320 + .iter() 321 + .map(|s| s.read().rate_limits.estimated_bytes()) 322 + .fold(0usize, usize::saturating_add) 145 323 } 146 324 147 - pub fn evict_lru(&mut self) -> Option<String> { 148 - self.cache.evict_lru() 325 + pub fn total_estimated_bytes(&self) -> usize { 326 + self.shards 327 + .iter() 328 + .map(|s| { 329 + let shard = s.read(); 330 + shard.cache.estimated_bytes().saturating_add(shard.rate_limits.estimated_bytes()) 331 + }) 332 + .fold(0usize, usize::saturating_add) 149 333 } 334 + 335 + pub fn evict_lru_round_robin(&self, start_shard: usize) -> Option<(usize, usize)> { 336 + (0..self.shards.len()).find_map(|offset| { 337 + let idx = (start_shard + offset) & self.shard_mask; 338 + let has_entries = self.shards[idx].read().cache.len() > 0; 339 + match has_entries { 340 + true => { 341 + let mut shard = self.shards[idx].write(); 342 + let before = shard.cache.estimated_bytes(); 343 + shard.cache.evict_lru().map(|_| { 344 + let freed = before.saturating_sub(shard.cache.estimated_bytes()); 345 + ((idx + 1) & self.shard_mask, freed) 346 + }) 347 + } 348 + false => None, 349 + } 350 + }) 351 + } 150 352 } 151 353 152 354 #[cfg(test)] ··· 155 357 156 358 #[test] 157 359 fn roundtrip_cache() { 158 - let mut store = CrdtStore::new(1); 360 + let store = ShardedCrdtStore::new(1); 159 361 store.cache_set("key".into(), b"value".to_vec(), 60_000); 160 362 assert_eq!(store.cache_get("key"), Some(b"value".to_vec())); 161 363 } 162 364 163 365 #[test] 164 366 fn delta_merge_convergence() { 165 - let mut store_a = CrdtStore::new(1); 166 - let mut store_b = CrdtStore::new(2); 367 + let store_a = ShardedCrdtStore::new(1); 368 + let store_b = ShardedCrdtStore::new(2); 167 369 168 370 store_a.cache_set("x".into(), b"from_a".to_vec(), 60_000); 169 371 store_b.cache_set("y".into(), b"from_b".to_vec(), 60_000); ··· 184 
386 185 387 #[test] 186 388 fn rate_limit_across_stores() { 187 - let mut store_a = CrdtStore::new(1); 188 - let mut store_b = CrdtStore::new(2); 389 + let store_a = ShardedCrdtStore::new(1); 390 + let store_b = ShardedCrdtStore::new(2); 189 391 190 392 store_a.rate_limit_check("rl:test", 5, 60_000); 191 393 store_a.rate_limit_check("rl:test", 5, 60_000); ··· 202 404 203 405 #[test] 204 406 fn incompatible_version_rejected() { 205 - let mut store = CrdtStore::new(1); 407 + let store = ShardedCrdtStore::new(1); 206 408 let delta = CrdtDelta { 207 409 version: 255, 208 410 source_node: 99,
+15 -5
crates/tranquil-ripple/src/engine.rs
··· 1 1 use crate::cache::RippleCache; 2 2 use crate::config::RippleConfig; 3 - use crate::crdt::CrdtStore; 3 + use crate::crdt::ShardedCrdtStore; 4 4 use crate::eviction::MemoryBudget; 5 5 use crate::gossip::{GossipEngine, PeerId}; 6 + use crate::metrics; 6 7 use crate::rate_limiter::RippleRateLimiter; 7 8 use crate::transport::Transport; 8 - use parking_lot::RwLock; 9 9 use std::net::SocketAddr; 10 10 use std::sync::Arc; 11 11 use tokio_util::sync::CancellationToken; ··· 18 18 config: RippleConfig, 19 19 shutdown: CancellationToken, 20 20 ) -> Result<(Arc<dyn Cache>, Arc<dyn DistributedRateLimiter>, SocketAddr), RippleStartError> { 21 - let store = Arc::new(RwLock::new(CrdtStore::new(config.machine_id))); 21 + let store = Arc::new(ShardedCrdtStore::new(config.machine_id)); 22 22 23 23 let (transport, incoming_rx) = Transport::bind(config.bind_addr, config.machine_id, shutdown.clone()) 24 24 .await ··· 27 27 let transport = Arc::new(transport); 28 28 29 29 let bound_addr = transport.local_addr(); 30 + let generation = u32::try_from( 31 + std::time::SystemTime::now() 32 + .duration_since(std::time::UNIX_EPOCH) 33 + .unwrap_or_default() 34 + .as_secs() 35 + % u64::from(u32::MAX), 36 + ) 37 + .unwrap_or(0); 30 38 let local_id = PeerId { 31 39 addr: bound_addr, 32 40 machine_id: config.machine_id, 33 - generation: 0, 41 + generation, 34 42 }; 35 43 36 44 let gossip = GossipEngine::new(transport, store.clone(), local_id); ··· 51 59 tokio::select! { 52 60 _ = eviction_shutdown.cancelled() => break, 53 61 _ = interval.tick() => { 54 - budget.enforce(&mut store_for_eviction.write()); 62 + budget.enforce(&store_for_eviction); 55 63 } 56 64 } 57 65 } ··· 73 81 let cache: Arc<dyn Cache> = Arc::new(RippleCache::new(store.clone())); 74 82 let rate_limiter: Arc<dyn DistributedRateLimiter> = 75 83 Arc::new(RippleRateLimiter::new(store)); 84 + 85 + metrics::describe_metrics(); 76 86 77 87 tracing::info!( 78 88 bind = %bound_addr,
+50 -24
crates/tranquil-ripple/src/eviction.rs
··· 1 - use crate::crdt::CrdtStore; 1 + use crate::crdt::ShardedCrdtStore; 2 + use crate::metrics; 2 3 3 4 pub struct MemoryBudget { 4 5 max_bytes: usize, 6 + next_shard: std::sync::atomic::AtomicUsize, 5 7 } 6 8 7 9 impl MemoryBudget { 8 10 pub fn new(max_bytes: usize) -> Self { 9 - Self { max_bytes } 11 + Self { 12 + max_bytes, 13 + next_shard: std::sync::atomic::AtomicUsize::new(0), 14 + } 10 15 } 11 16 12 - pub fn enforce(&self, store: &mut CrdtStore) { 17 + pub fn enforce(&self, store: &ShardedCrdtStore) { 13 18 store.run_maintenance(); 14 19 20 + let cache_bytes = store.cache_estimated_bytes(); 21 + let rl_bytes = store.rate_limit_estimated_bytes(); 22 + metrics::set_cache_bytes(cache_bytes); 23 + metrics::set_rate_limit_bytes(rl_bytes); 24 + 15 25 let max_bytes = self.max_bytes; 16 - let total_bytes = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes()); 17 - let overshoot_ratio = match total_bytes > max_bytes && max_bytes > 0 { 18 - true => total_bytes / max_bytes, 26 + let total_bytes = cache_bytes.saturating_add(rl_bytes); 27 + let overshoot_pct = match total_bytes > max_bytes && max_bytes > 0 { 28 + true => total_bytes.saturating_sub(max_bytes).saturating_mul(100) / max_bytes, 19 29 false => 0, 20 30 }; 21 31 22 32 const BASE_BATCH: usize = 256; 23 - let batch_size = match overshoot_ratio { 24 - 0..=1 => BASE_BATCH, 25 - 2..=4 => BASE_BATCH * 4, 33 + let batch_size = match overshoot_pct { 34 + 0..=25 => BASE_BATCH, 35 + 26..=100 => BASE_BATCH * 4, 26 36 _ => BASE_BATCH * 8, 27 37 }; 28 38 29 - let evicted = std::iter::from_fn(|| { 30 - let current = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes()); 31 - match current > max_bytes { 32 - true => store.evict_lru(), 33 - false => None, 39 + let mut remaining = total_bytes; 40 + let mut next_shard: usize = self.next_shard.load(std::sync::atomic::Ordering::Relaxed); 41 + let mut evicted: usize = 0; 42 + (0..batch_size).try_for_each(|_| { 43 + match 
remaining > max_bytes { 44 + true => { 45 + match store.evict_lru_round_robin(next_shard) { 46 + Some((ns, freed)) => { 47 + next_shard = ns; 48 + remaining = remaining.saturating_sub(freed); 49 + evicted += 1; 50 + Ok(()) 51 + } 52 + None => Err(()), 53 + } 54 + } 55 + false => Err(()), 34 56 } 35 57 }) 36 - .take(batch_size) 37 - .count(); 58 + .ok(); 59 + self.next_shard.store(next_shard, std::sync::atomic::Ordering::Relaxed); 38 60 if evicted > 0 { 61 + metrics::record_evictions(evicted); 62 + let cache_bytes_after = store.cache_estimated_bytes(); 63 + let rl_bytes_after = store.rate_limit_estimated_bytes(); 64 + metrics::set_cache_bytes(cache_bytes_after); 65 + metrics::set_rate_limit_bytes(rl_bytes_after); 39 66 tracing::info!( 40 67 evicted_entries = evicted, 41 - cache_bytes = store.cache_estimated_bytes(), 42 - rate_limit_bytes = store.rate_limit_estimated_bytes(), 68 + cache_bytes = cache_bytes_after, 69 + rate_limit_bytes = rl_bytes_after, 43 70 max_bytes = self.max_bytes, 44 71 "memory budget eviction" 45 72 ); ··· 53 80 54 81 #[test] 55 82 fn eviction_under_budget() { 56 - let mut store = CrdtStore::new(1); 83 + let store = ShardedCrdtStore::new(1); 57 84 let budget = MemoryBudget::new(1024 * 1024); 58 85 store.cache_set("k".into(), vec![1, 2, 3], 60_000); 59 - budget.enforce(&mut store); 86 + budget.enforce(&store); 60 87 assert!(store.cache_get("k").is_some()); 61 88 } 62 89 63 90 #[test] 64 91 fn eviction_over_budget() { 65 - let mut store = CrdtStore::new(1); 92 + let store = ShardedCrdtStore::new(1); 66 93 let budget = MemoryBudget::new(100); 67 94 (0..50).for_each(|i| { 68 95 store.cache_set( ··· 71 98 60_000, 72 99 ); 73 100 }); 74 - budget.enforce(&mut store); 75 - let total = store.cache_estimated_bytes().saturating_add(store.rate_limit_estimated_bytes()); 76 - assert!(total <= 100); 101 + budget.enforce(&store); 102 + assert!(store.total_estimated_bytes() <= 100); 77 103 } 78 104 }
+171 -72
crates/tranquil-ripple/src/gossip.rs
··· 1 1 use crate::crdt::delta::CrdtDelta; 2 - use crate::crdt::CrdtStore; 2 + use crate::crdt::lww_map::LwwDelta; 3 + use crate::crdt::ShardedCrdtStore; 4 + use crate::metrics; 3 5 use crate::transport::{ChannelTag, IncomingFrame, Transport}; 4 6 use foca::{Config, Foca, Notification, Runtime, Timer}; 5 - use parking_lot::RwLock; 6 7 use rand::rngs::StdRng; 7 8 use rand::SeedableRng; 8 9 use std::collections::HashSet; ··· 114 115 fn active_peers(&self) -> impl Iterator<Item = SocketAddr> + '_ { 115 116 self.active_addrs.iter().copied() 116 117 } 118 + 119 + fn peer_count(&self) -> usize { 120 + self.active_addrs.len() 121 + } 117 122 } 118 123 119 124 impl Runtime<PeerId> for &mut BufferedRuntime { ··· 141 146 142 147 pub struct GossipEngine { 143 148 transport: Arc<Transport>, 144 - store: Arc<RwLock<CrdtStore>>, 149 + store: Arc<ShardedCrdtStore>, 145 150 local_id: PeerId, 146 151 } 147 152 148 153 impl GossipEngine { 149 154 pub fn new( 150 155 transport: Arc<Transport>, 151 - store: Arc<RwLock<CrdtStore>>, 156 + store: Arc<ShardedCrdtStore>, 152 157 local_id: PeerId, 153 158 ) -> Self { 154 159 Self { ··· 203 208 } 204 209 }); 205 210 206 - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); 211 + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); 207 212 208 213 let mut gossip_tick = 209 214 tokio::time::interval(Duration::from_millis(gossip_interval_ms)); 210 - let mut maintenance_tick = tokio::time::interval(Duration::from_secs(10)); 215 + gossip_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 211 216 212 217 loop { 213 218 tokio::select! 
{ ··· 222 227 if let Err(e) = foca.handle_data(&frame.data, &mut runtime) { 223 228 tracing::warn!(error = %e, "foca handle_data error"); 224 229 } 225 - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); 230 + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); 226 231 } 227 232 ChannelTag::CrdtSync => { 228 233 const MAX_DELTA_ENTRIES: usize = 10_000; 229 234 const MAX_DELTA_RATE_LIMITS: usize = 10_000; 235 + metrics::record_gossip_delta_received(); 236 + metrics::record_gossip_delta_bytes(frame.data.len()); 230 237 match bincode::serde::decode_from_slice::<CrdtDelta, _>(&frame.data, bincode::config::standard()) { 231 238 Ok((delta, _)) => { 232 239 let cache_len = delta.cache_delta.as_ref().map_or(0, |d| d.entries.len()); ··· 235 242 let window_mismatch = delta.rate_limit_deltas.iter().any(|rd| rd.counter.window_duration_ms == 0); 236 243 match cache_len > MAX_DELTA_ENTRIES || rl_len > MAX_DELTA_RATE_LIMITS || gcounter_oversize || window_mismatch { 237 244 true => { 245 + metrics::record_gossip_drop(); 238 246 tracing::warn!( 239 247 cache_entries = cache_len, 240 248 rate_limit_entries = rl_len, ··· 243 251 ); 244 252 } 245 253 false => { 246 - store.write().merge_delta(&delta); 254 + if store.merge_delta(&delta) { 255 + metrics::record_gossip_merge(); 256 + } 247 257 } 248 258 } 249 259 } 250 260 Err(e) => { 261 + metrics::record_gossip_drop(); 251 262 tracing::warn!(error = %e, "failed to decode crdt sync delta"); 252 263 } 253 264 } ··· 256 267 } 257 268 } 258 269 _ = gossip_tick.tick() => { 259 - let pending = { 260 - let s = store.read(); 261 - let delta = s.peek_broadcast_delta(); 262 - match delta.is_empty() { 263 - true => None, 264 - false => { 265 - match bincode::serde::encode_to_vec(&delta, bincode::config::standard()) { 266 - Ok(bytes) => Some((bytes, delta)), 267 - Err(e) => { 268 - tracing::warn!(error = %e, "failed to serialize broadcast delta"); 269 - None 270 - } 271 - 
} 272 - }, 270 + let delta = store.peek_broadcast_delta(); 271 + match delta.is_empty() { 272 + true => { 273 + last_commit = tokio::time::Instant::now(); 273 274 } 274 - }; 275 - if let Some((ref data, ref delta)) = pending { 276 - let peers: Vec<SocketAddr> = members.active_peers().collect(); 277 - let mut all_queued = true; 278 - let cancel = shutdown.clone(); 279 - peers.iter().for_each(|&addr| { 280 - match transport.try_queue(addr, ChannelTag::CrdtSync, data) { 281 - true => {} 275 + false => { 276 + let chunks = chunk_and_serialize(&delta); 277 + match chunks.is_empty() { 278 + true => { 279 + tracing::warn!("all delta chunks failed to serialize, force-committing watermark"); 280 + store.commit_broadcast(&delta); 281 + last_commit = tokio::time::Instant::now(); 282 + } 282 283 false => { 283 - all_queued = false; 284 - let t = transport.clone(); 285 - let d = data.clone(); 286 - let c = cancel.clone(); 287 - tokio::spawn(async move { 288 - tokio::select! { 289 - _ = c.cancelled() => {} 290 - _ = t.send(addr, ChannelTag::CrdtSync, &d) => {} 291 - } 284 + let peers: Vec<SocketAddr> = members.active_peers().collect(); 285 + let mut all_queued = true; 286 + let cancel = shutdown.clone(); 287 + chunks.iter().for_each(|chunk| { 288 + metrics::record_gossip_delta_bytes(chunk.len()); 289 + peers.iter().for_each(|&addr| { 290 + metrics::record_gossip_delta_sent(); 291 + match transport.try_queue(addr, ChannelTag::CrdtSync, chunk) { 292 + true => {} 293 + false => { 294 + all_queued = false; 295 + let t = transport.clone(); 296 + let d = chunk.clone(); 297 + let c = cancel.clone(); 298 + tokio::spawn(async move { 299 + tokio::select! 
{ 300 + _ = c.cancelled() => {} 301 + _ = t.send(addr, ChannelTag::CrdtSync, &d) => {} 302 + } 303 + }); 304 + } 305 + } 306 + }); 292 307 }); 308 + let stale = last_commit.elapsed() > Duration::from_secs(WATERMARK_STALE_SECS); 309 + if all_queued || peers.is_empty() || stale { 310 + if stale && !all_queued { 311 + tracing::warn!( 312 + elapsed_secs = last_commit.elapsed().as_secs(), 313 + "force-advancing broadcast watermark (staleness cap)" 314 + ); 315 + } 316 + store.commit_broadcast(&delta); 317 + last_commit = tokio::time::Instant::now(); 318 + } 293 319 } 294 320 } 295 - }); 296 - let stale = last_commit.elapsed() > Duration::from_secs(WATERMARK_STALE_SECS); 297 - if all_queued || peers.is_empty() || stale { 298 - if stale && !all_queued { 299 - tracing::warn!( 300 - elapsed_secs = last_commit.elapsed().as_secs(), 301 - "force-advancing broadcast watermark (staleness cap)" 302 - ); 303 - } 304 - store.write().commit_broadcast(delta); 305 - last_commit = tokio::time::Instant::now(); 306 321 } 307 - } 322 + }; 308 323 if let Err(e) = foca.gossip(&mut runtime) { 309 324 tracing::warn!(error = %e, "foca gossip error"); 310 325 } 311 - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); 326 + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); 312 327 } 313 328 Some((timer, _)) = timer_rx.recv() => { 314 329 if let Err(e) = foca.handle_timer(timer, &mut runtime) { 315 330 tracing::warn!(error = %e, "foca handle_timer error"); 316 331 } 317 - drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &shutdown); 332 + drain_runtime_actions(&mut runtime, &transport, &timer_tx, &mut members, &store, &shutdown); 318 333 } 319 - _ = maintenance_tick.tick() => { 320 - store.write().run_maintenance(); 334 + _ = tokio::time::sleep(Duration::from_secs(10)) => { 321 335 tracing::trace!( 322 336 members = foca.num_members(), 323 - cache_bytes = store.read().cache_estimated_bytes(), 324 - 
"maintenance cycle" 337 + cache_bytes = store.cache_estimated_bytes(), 338 + rate_limit_bytes = store.rate_limit_estimated_bytes(), 339 + "gossip health check" 325 340 ); 326 341 } 327 342 } ··· 331 346 } 332 347 333 348 fn flush_final_delta( 334 - store: &Arc<RwLock<CrdtStore>>, 349 + store: &Arc<ShardedCrdtStore>, 335 350 transport: &Arc<Transport>, 336 351 members: &MemberTracker, 337 352 ) { 338 - let s = store.read(); 339 - let delta = s.peek_broadcast_delta(); 353 + let delta = store.peek_broadcast_delta(); 340 354 if delta.is_empty() { 341 355 return; 342 356 } 343 - match bincode::serde::encode_to_vec(&delta, bincode::config::standard()) { 344 - Ok(bytes) => { 345 - members.active_peers().for_each(|addr| { 346 - let _ = transport.try_queue(addr, ChannelTag::CrdtSync, &bytes); 347 - }); 348 - } 349 - Err(e) => { 350 - tracing::warn!(error = %e, "failed to serialize final delta on shutdown"); 351 - } 352 - } 357 + let chunks = chunk_and_serialize(&delta); 358 + chunks.iter().for_each(|chunk| { 359 + members.active_peers().for_each(|addr| { 360 + let _ = transport.try_queue(addr, ChannelTag::CrdtSync, chunk); 361 + }); 362 + }); 363 + store.commit_broadcast(&delta); 353 364 } 354 365 355 366 fn drain_runtime_actions( ··· 357 368 transport: &Arc<Transport>, 358 369 timer_tx: &mpsc::Sender<(Timer<PeerId>, Duration)>, 359 370 members: &mut MemberTracker, 371 + store: &Arc<ShardedCrdtStore>, 360 372 shutdown: &CancellationToken, 361 373 ) { 362 374 let actions: Vec<RuntimeAction> = runtime.actions.drain(..).collect(); ··· 373 385 } 374 386 RuntimeAction::ScheduleTimer(timer, duration) => { 375 387 let tx = timer_tx.clone(); 388 + let c = shutdown.clone(); 376 389 tokio::spawn(async move { 377 - tokio::time::sleep(duration).await; 378 - let _ = tx.send((timer, duration)).await; 390 + tokio::select! 
{ 391 + _ = c.cancelled() => {} 392 + _ = tokio::time::sleep(duration) => { 393 + let _ = tx.send((timer, duration)).await; 394 + } 395 + } 379 396 }); 380 397 } 381 398 RuntimeAction::MemberUp(addr) => { 382 399 tracing::info!(peer = %addr, "member up"); 383 400 members.member_up(addr); 401 + metrics::set_gossip_peers(members.peer_count()); 402 + let snapshot = store.peek_full_state(); 403 + if !snapshot.is_empty() { 404 + chunk_and_serialize(&snapshot).into_iter().for_each(|chunk| { 405 + let t = transport.clone(); 406 + let c = shutdown.clone(); 407 + tokio::spawn(async move { 408 + tokio::select! { 409 + _ = c.cancelled() => {} 410 + _ = t.send(addr, ChannelTag::CrdtSync, &chunk) => {} 411 + } 412 + }); 413 + }); 414 + } 384 415 } 385 416 RuntimeAction::MemberDown(addr) => { 386 417 tracing::info!(peer = %addr, "member down"); 387 418 members.member_down(addr); 419 + metrics::set_gossip_peers(members.peer_count()); 388 420 } 389 421 }); 422 + } 423 + 424 + fn chunk_and_serialize(delta: &CrdtDelta) -> Vec<Vec<u8>> { 425 + let config = bincode::config::standard(); 426 + match bincode::serde::encode_to_vec(delta, config) { 427 + Ok(bytes) if bytes.len() <= crate::transport::MAX_FRAME_SIZE => vec![bytes], 428 + Ok(_) => split_and_serialize(delta.clone()), 429 + Err(e) => { 430 + tracing::warn!(error = %e, "failed to serialize delta"); 431 + vec![] 432 + } 433 + } 434 + } 435 + 436 + fn split_and_serialize(delta: CrdtDelta) -> Vec<Vec<u8>> { 437 + let version = delta.version; 438 + let source_node = delta.source_node; 439 + let cache_entries = delta.cache_delta.map_or(Vec::new(), |d| d.entries); 440 + let rl_deltas = delta.rate_limit_deltas; 441 + 442 + if cache_entries.is_empty() && rl_deltas.is_empty() { 443 + return vec![]; 444 + } 445 + 446 + if cache_entries.len() <= 1 && rl_deltas.len() <= 1 { 447 + let mini = CrdtDelta { 448 + version, 449 + source_node, 450 + cache_delta: match cache_entries.is_empty() { 451 + true => None, 452 + false => Some(LwwDelta { 
entries: cache_entries }), 453 + }, 454 + rate_limit_deltas: rl_deltas, 455 + }; 456 + match bincode::serde::encode_to_vec(&mini, bincode::config::standard()) { 457 + Ok(bytes) if bytes.len() <= crate::transport::MAX_FRAME_SIZE => return vec![bytes], 458 + _ => { 459 + tracing::error!("irreducible delta entry exceeds max frame size, dropping"); 460 + return vec![]; 461 + } 462 + } 463 + } 464 + 465 + let mid_cache = cache_entries.len() / 2; 466 + let mid_rl = rl_deltas.len() / 2; 467 + 468 + let mut left_cache = cache_entries; 469 + let right_cache = left_cache.split_off(mid_cache); 470 + let mut left_rl = rl_deltas; 471 + let right_rl = left_rl.split_off(mid_rl); 472 + 473 + let make_sub = |entries: Vec<_>, rls| CrdtDelta { 474 + version, 475 + source_node, 476 + cache_delta: match entries.is_empty() { 477 + true => None, 478 + false => Some(LwwDelta { entries }), 479 + }, 480 + rate_limit_deltas: rls, 481 + }; 482 + 483 + let left = make_sub(left_cache, left_rl); 484 + let right = make_sub(right_cache, right_rl); 485 + 486 + let mut result = chunk_and_serialize(&left); 487 + result.extend(chunk_and_serialize(&right)); 488 + result 390 489 }
+1
crates/tranquil-ripple/src/lib.rs
··· 4 4 pub mod engine; 5 5 pub mod eviction; 6 6 pub mod gossip; 7 + pub mod metrics; 7 8 pub mod rate_limiter; 8 9 pub mod transport; 9 10
+7 -8
crates/tranquil-ripple/src/rate_limiter.rs
··· 1 - use crate::crdt::CrdtStore; 1 + use crate::crdt::ShardedCrdtStore; 2 2 use async_trait::async_trait; 3 - use parking_lot::RwLock; 4 3 use std::sync::Arc; 5 4 use tranquil_infra::DistributedRateLimiter; 6 5 7 6 pub struct RippleRateLimiter { 8 - store: Arc<RwLock<CrdtStore>>, 7 + store: Arc<ShardedCrdtStore>, 9 8 } 10 9 11 10 impl RippleRateLimiter { 12 - pub fn new(store: Arc<RwLock<CrdtStore>>) -> Self { 11 + pub fn new(store: Arc<ShardedCrdtStore>) -> Self { 13 12 Self { store } 14 13 } 15 14 } ··· 17 16 #[async_trait] 18 17 impl DistributedRateLimiter for RippleRateLimiter { 19 18 async fn check_rate_limit(&self, key: &str, limit: u32, window_ms: u64) -> bool { 20 - self.store.write().rate_limit_check(key, limit, window_ms) 19 + self.store.rate_limit_check(key, limit, window_ms) 21 20 } 22 21 23 22 async fn peek_rate_limit_count(&self, key: &str, window_ms: u64) -> u64 { 24 - self.store.read().rate_limit_peek(key, window_ms) 23 + self.store.rate_limit_peek(key, window_ms) 25 24 } 26 25 } 27 26 ··· 31 30 32 31 #[tokio::test] 33 32 async fn rate_limiter_trait_allows_within_limit() { 34 - let store = Arc::new(RwLock::new(CrdtStore::new(1))); 33 + let store = Arc::new(ShardedCrdtStore::new(1)); 35 34 let rl = RippleRateLimiter::new(store); 36 35 assert!(rl.check_rate_limit("test", 5, 60_000).await); 37 36 assert!(rl.check_rate_limit("test", 5, 60_000).await); ··· 39 38 40 39 #[tokio::test] 41 40 async fn rate_limiter_trait_blocks_over_limit() { 42 - let store = Arc::new(RwLock::new(CrdtStore::new(1))); 41 + let store = Arc::new(ShardedCrdtStore::new(1)); 43 42 let rl = RippleRateLimiter::new(store); 44 43 assert!(rl.check_rate_limit("k", 3, 60_000).await); 45 44 assert!(rl.check_rate_limit("k", 3, 60_000).await);
+62 -19
crates/tranquil-ripple/src/transport.rs
··· 3 3 use std::collections::HashMap; 4 4 use std::net::SocketAddr; 5 5 use std::sync::Arc; 6 - use std::sync::atomic::{AtomicUsize, Ordering}; 6 + use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; 7 7 use std::time::Duration; 8 8 use tokio::io::{AsyncReadExt, AsyncWriteExt}; 9 9 use tokio::net::{TcpListener, TcpStream}; 10 10 use tokio::sync::mpsc; 11 11 use tokio_util::sync::CancellationToken; 12 12 13 - const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024; 13 + pub(crate) const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024; 14 14 const MAX_INBOUND_CONNECTIONS: usize = 512; 15 + const MAX_OUTBOUND_CONNECTIONS: usize = 512; 15 16 const WRITE_TIMEOUT: Duration = Duration::from_secs(10); 16 17 17 18 #[derive(Debug, Clone, Copy, PartialEq, Eq)] ··· 44 45 45 46 struct ConnectionWriter { 46 47 tx: mpsc::Sender<Vec<u8>>, 48 + generation: u64, 47 49 } 48 50 49 51 pub struct Transport { ··· 51 53 _machine_id: u64, 52 54 connections: Arc<parking_lot::Mutex<HashMap<SocketAddr, ConnectionWriter>>>, 53 55 connecting: Arc<parking_lot::Mutex<std::collections::HashSet<SocketAddr>>>, 56 + conn_generation: Arc<AtomicU64>, 54 57 #[allow(dead_code)] 55 58 inbound_count: Arc<AtomicUsize>, 59 + outbound_count: Arc<AtomicUsize>, 56 60 shutdown: CancellationToken, 57 61 incoming_tx: mpsc::Sender<IncomingFrame>, 58 62 } ··· 73 77 _machine_id: machine_id, 74 78 connections: Arc::new(parking_lot::Mutex::new(HashMap::new())), 75 79 connecting: Arc::new(parking_lot::Mutex::new(std::collections::HashSet::new())), 80 + conn_generation: Arc::new(AtomicU64::new(0)), 76 81 inbound_count: inbound_count.clone(), 82 + outbound_count: Arc::new(AtomicUsize::new(0)), 77 83 shutdown: shutdown.clone(), 78 84 incoming_tx: incoming_tx.clone(), 79 85 }; ··· 99 105 continue; 100 106 } 101 107 inbound_counter.fetch_add(1, Ordering::Relaxed); 108 + configure_socket(&stream); 102 109 Self::spawn_reader( 103 110 stream, 104 111 peer_addr, ··· 144 151 }; 145 152 let writer = { 146 153 let conns = 
self.connections.lock(); 147 - conns.get(&target).map(|w| w.tx.clone()) 154 + conns.get(&target).map(|w| (w.tx.clone(), w.generation)) 148 155 }; 149 156 match writer { 150 - Some(tx) => { 157 + Some((tx, acquired_gen)) => { 151 158 if tx.send(frame).await.is_err() { 152 - self.connections.lock().remove(&target); 159 + { 160 + let mut conns = self.connections.lock(); 161 + let stale = conns 162 + .get(&target) 163 + .is_some_and(|w| w.generation == acquired_gen); 164 + if stale { 165 + conns.remove(&target); 166 + } 167 + } 153 168 self.connect_and_send(target, tag, data).await; 154 169 } 155 170 } ··· 163 178 { 164 179 let mut connecting = self.connecting.lock(); 165 180 if connecting.contains(&target) { 166 - tracing::debug!(peer = %target, "connection already in-flight, dropping frame"); 181 + tracing::warn!(peer = %target, "connection already in-flight, dropping frame"); 167 182 return; 168 183 } 169 184 connecting.insert(target); ··· 191 206 .await; 192 207 match stream { 193 208 Ok(stream) => { 209 + if self.outbound_count.load(Ordering::Relaxed) >= MAX_OUTBOUND_CONNECTIONS { 210 + tracing::warn!( 211 + peer = %target, 212 + max = MAX_OUTBOUND_CONNECTIONS, 213 + "outbound connection limit reached, dropping" 214 + ); 215 + return; 216 + } 217 + self.outbound_count.fetch_add(1, Ordering::Relaxed); 218 + configure_socket(&stream); 194 219 let (read_half, write_half) = stream.into_split(); 195 220 let (write_tx, mut write_rx) = mpsc::channel::<Vec<u8>>(1024); 196 - let cancel = self.shutdown.clone(); 221 + 222 + let conn_gen = self.conn_generation.fetch_add(1, Ordering::Relaxed); 223 + self.connections.lock().insert( 224 + target, 225 + ConnectionWriter { tx: write_tx.clone(), generation: conn_gen }, 226 + ); 227 + if let Some(frame) = encode_frame(tag, data) { 228 + let _ = write_tx.try_send(frame); 229 + } 230 + 231 + let conn_cancel = self.shutdown.child_token(); 232 + let reader_cancel = conn_cancel.clone(); 197 233 let connections = self.connections.clone(); 
234 + let outbound_counter = self.outbound_count.clone(); 198 235 let peer = target; 199 236 200 237 tokio::spawn(async move { 201 238 let mut writer = write_half; 202 239 loop { 203 240 tokio::select! { 204 - _ = cancel.cancelled() => break, 241 + _ = conn_cancel.cancelled() => break, 205 242 msg = write_rx.recv() => { 206 243 match msg { 207 244 Some(buf) => { ··· 227 264 } 228 265 } 229 266 connections.lock().remove(&peer); 267 + outbound_counter.fetch_sub(1, Ordering::Relaxed); 268 + conn_cancel.cancel(); 230 269 }); 231 270 232 - Self::spawn_reader_half(read_half, target, self.incoming_tx.clone(), self.shutdown.clone()); 233 - 234 - let frame = match encode_frame(tag, data) { 235 - Some(f) => f, 236 - None => return, 237 - }; 238 - let _ = write_tx.send(frame).await; 239 - self.connections.lock().insert( 240 - target, 241 - ConnectionWriter { tx: write_tx }, 242 - ); 271 + Self::spawn_reader_half(read_half, target, self.incoming_tx.clone(), reader_cancel); 243 272 tracing::debug!(peer = %target, "established outbound connection"); 244 273 } 245 274 Err(e) => { ··· 309 338 } 310 339 } 311 340 } 341 + cancel.cancel(); 312 342 }); 313 343 } 314 344 ··· 332 362 DecodeResult::Corrupt => return false, 333 363 } 334 364 } 365 + } 366 + } 367 + 368 + fn configure_socket(stream: &TcpStream) { 369 + let sock_ref = socket2::SockRef::from(stream); 370 + if let Err(e) = sock_ref.set_tcp_nodelay(true) { 371 + tracing::warn!(error = %e, "failed to set TCP_NODELAY"); 372 + } 373 + let params = socket2::TcpKeepalive::new() 374 + .with_time(Duration::from_secs(30)) 375 + .with_interval(Duration::from_secs(10)); 376 + if let Err(e) = sock_ref.set_tcp_keepalive(&params) { 377 + tracing::warn!(error = %e, "failed to set TCP keepalive"); 335 378 } 336 379 } 337 380
+160
crates/tranquil-ripple/tests/two_node_convergence.rs
··· 608 608 609 609 shutdown.cancel(); 610 610 } 611 + 612 + #[tokio::test] 613 + async fn two_node_partition_recovery() { 614 + tracing_subscriber::fmt() 615 + .with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG) 616 + .with_test_writer() 617 + .try_init() 618 + .ok(); 619 + 620 + let shutdown = CancellationToken::new(); 621 + 622 + let config_a = RippleConfig { 623 + bind_addr: "127.0.0.1:0".parse().unwrap(), 624 + seed_peers: vec![], 625 + machine_id: 100, 626 + gossip_interval_ms: 100, 627 + cache_max_bytes: 64 * 1024 * 1024, 628 + }; 629 + let (cache_a, _rl_a, addr_a) = RippleEngine::start(config_a, shutdown.clone()) 630 + .await 631 + .expect("node A failed to start"); 632 + 633 + futures::future::join_all((0..50).map(|i| { 634 + let cache = cache_a.clone(); 635 + async move { 636 + cache 637 + .set( 638 + &format!("pre-{i}"), 639 + &format!("val-{i}"), 640 + Duration::from_secs(300), 641 + ) 642 + .await 643 + .expect("set on A failed"); 644 + } 645 + })) 646 + .await; 647 + 648 + let config_b = RippleConfig { 649 + bind_addr: "127.0.0.1:0".parse().unwrap(), 650 + seed_peers: vec![addr_a], 651 + machine_id: 200, 652 + gossip_interval_ms: 100, 653 + cache_max_bytes: 64 * 1024 * 1024, 654 + }; 655 + let (cache_b, _rl_b, _addr_b) = RippleEngine::start(config_b, shutdown.clone()) 656 + .await 657 + .expect("node B failed to start"); 658 + 659 + let b = cache_b.clone(); 660 + poll_until(15_000, 200, move || { 661 + let b = b.clone(); 662 + async move { 663 + futures::future::join_all((0..50).map(|i| { 664 + let b = b.clone(); 665 + async move { b.get(&format!("pre-{i}")).await.is_some() } 666 + })) 667 + .await 668 + .into_iter() 669 + .all(|present| present) 670 + } 671 + }) 672 + .await; 673 + 674 + futures::future::join_all((0..50).map(|i| { 675 + let cache = cache_b.clone(); 676 + async move { 677 + cache 678 + .set( 679 + &format!("post-{i}"), 680 + &format!("bval-{i}"), 681 + Duration::from_secs(300), 682 + ) 683 + .await 684 + .expect("set on B 
failed"); 685 + } 686 + })) 687 + .await; 688 + 689 + let a = cache_a.clone(); 690 + poll_until(15_000, 200, move || { 691 + let a = a.clone(); 692 + async move { 693 + futures::future::join_all((0..50).map(|i| { 694 + let a = a.clone(); 695 + async move { a.get(&format!("post-{i}")).await.is_some() } 696 + })) 697 + .await 698 + .into_iter() 699 + .all(|present| present) 700 + } 701 + }) 702 + .await; 703 + 704 + shutdown.cancel(); 705 + } 706 + 707 + #[tokio::test] 708 + async fn two_node_stress_concurrent_load() { 709 + tracing_subscriber::fmt() 710 + .with_max_level(tracing_subscriber::filter::LevelFilter::INFO) 711 + .with_test_writer() 712 + .try_init() 713 + .ok(); 714 + 715 + let shutdown = CancellationToken::new(); 716 + let ((cache_a, rl_a), (cache_b, rl_b)) = spawn_pair(shutdown.clone()).await; 717 + 718 + let tasks: Vec<tokio::task::JoinHandle<()>> = (0u32..8).map(|task_id| { 719 + let cache = match task_id < 4 { 720 + true => cache_a.clone(), 721 + false => cache_b.clone(), 722 + }; 723 + let rl = match task_id < 4 { 724 + true => rl_a.clone(), 725 + false => rl_b.clone(), 726 + }; 727 + tokio::spawn(async move { 728 + let value = vec![0xABu8; 1024]; 729 + futures::future::join_all((0u32..500).map(|op| { 730 + let cache = cache.clone(); 731 + let rl = rl.clone(); 732 + let value = value.clone(); 733 + async move { 734 + let key_idx = op % 100; 735 + let key = format!("stress-{task_id}-{key_idx}"); 736 + match op % 4 { 737 + 0 | 1 => { 738 + cache 739 + .set_bytes(&key, &value, Duration::from_secs(120)) 740 + .await 741 + .expect("set_bytes failed"); 742 + } 743 + 2 => { 744 + let _ = cache.get(&key).await; 745 + } 746 + _ => { 747 + let _ = rl.check_rate_limit(&key, 1000, 60_000).await; 748 + } 749 + } 750 + } 751 + })) 752 + .await; 753 + }) 754 + }).collect(); 755 + 756 + let results = tokio::time::timeout( 757 + Duration::from_secs(30), 758 + futures::future::join_all(tasks), 759 + ) 760 + .await 761 + .expect("stress test timed out after 30s"); 762 
+ 763 + results.into_iter().enumerate().for_each(|(i, r)| { 764 + r.unwrap_or_else(|e| panic!("task {i} panicked: {e}")); 765 + }); 766 + 767 + tokio::time::sleep(Duration::from_secs(12)).await; 768 + 769 + shutdown.cancel(); 770 + }
deploy/quadlets/tranquil-pds-app.container

This file has not been changed.

docker-compose.prod.yaml

This file has not been changed.

docker-compose.yaml

This file has not been changed.

docs/install-containers.md

This file has not been changed.

docs/install-debian.md

This file has not been changed.

docs/install-kubernetes.md

This file has not been changed.

scripts/install-debian.sh

This file has not been changed.

scripts/run-tests.sh

This file has not been changed.

scripts/test-infra.sh

This file has not been changed.

+108
crates/tranquil-ripple/src/metrics.rs
··· 1 + use metrics::{counter, gauge, histogram}; 2 + 3 + pub fn describe_metrics() { 4 + metrics::describe_gauge!( 5 + "tranquil_ripple_cache_bytes", 6 + "Estimated memory used by cache entries" 7 + ); 8 + metrics::describe_gauge!( 9 + "tranquil_ripple_rate_limit_bytes", 10 + "Estimated memory used by rate limit counters" 11 + ); 12 + metrics::describe_gauge!( 13 + "tranquil_ripple_gossip_peers", 14 + "Number of active gossip peers" 15 + ); 16 + metrics::describe_counter!( 17 + "tranquil_ripple_cache_hits_total", 18 + "Total cache read hits" 19 + ); 20 + metrics::describe_counter!( 21 + "tranquil_ripple_cache_misses_total", 22 + "Total cache read misses" 23 + ); 24 + metrics::describe_counter!( 25 + "tranquil_ripple_cache_writes_total", 26 + "Total cache write operations" 27 + ); 28 + metrics::describe_counter!( 29 + "tranquil_ripple_cache_deletes_total", 30 + "Total cache delete operations" 31 + ); 32 + metrics::describe_counter!( 33 + "tranquil_ripple_evictions_total", 34 + "Total cache entries evicted by memory budget" 35 + ); 36 + metrics::describe_counter!( 37 + "tranquil_ripple_gossip_deltas_sent_total", 38 + "Total CRDT delta chunks sent to peers" 39 + ); 40 + metrics::describe_counter!( 41 + "tranquil_ripple_gossip_deltas_received_total", 42 + "Total CRDT delta messages received from peers" 43 + ); 44 + metrics::describe_counter!( 45 + "tranquil_ripple_gossip_merges_total", 46 + "Total CRDT deltas merged with local state change" 47 + ); 48 + metrics::describe_counter!( 49 + "tranquil_ripple_gossip_drops_total", 50 + "Total CRDT deltas dropped (validation or decode failure)" 51 + ); 52 + metrics::describe_histogram!( 53 + "tranquil_ripple_gossip_delta_bytes", 54 + "Size of CRDT delta chunks in bytes" 55 + ); 56 + } 57 + 58 + pub fn record_cache_hit() { 59 + counter!("tranquil_ripple_cache_hits_total").increment(1); 60 + } 61 + 62 + pub fn record_cache_miss() { 63 + counter!("tranquil_ripple_cache_misses_total").increment(1); 64 + } 65 + 66 + pub fn 
record_cache_write() { 67 + counter!("tranquil_ripple_cache_writes_total").increment(1); 68 + } 69 + 70 + pub fn record_cache_delete() { 71 + counter!("tranquil_ripple_cache_deletes_total").increment(1); 72 + } 73 + 74 + pub fn set_cache_bytes(bytes: usize) { 75 + gauge!("tranquil_ripple_cache_bytes").set(bytes as f64); 76 + } 77 + 78 + pub fn set_rate_limit_bytes(bytes: usize) { 79 + gauge!("tranquil_ripple_rate_limit_bytes").set(bytes as f64); 80 + } 81 + 82 + pub fn set_gossip_peers(count: usize) { 83 + gauge!("tranquil_ripple_gossip_peers").set(count as f64); 84 + } 85 + 86 + pub fn record_evictions(count: usize) { 87 + counter!("tranquil_ripple_evictions_total").increment(count as u64); 88 + } 89 + 90 + pub fn record_gossip_delta_sent() { 91 + counter!("tranquil_ripple_gossip_deltas_sent_total").increment(1); 92 + } 93 + 94 + pub fn record_gossip_delta_received() { 95 + counter!("tranquil_ripple_gossip_deltas_received_total").increment(1); 96 + } 97 + 98 + pub fn record_gossip_merge() { 99 + counter!("tranquil_ripple_gossip_merges_total").increment(1); 100 + } 101 + 102 + pub fn record_gossip_drop() { 103 + counter!("tranquil_ripple_gossip_drops_total").increment(1); 104 + } 105 + 106 + pub fn record_gossip_delta_bytes(bytes: usize) { 107 + histogram!("tranquil_ripple_gossip_delta_bytes").record(bytes as f64); 108 + }

History

2 rounds 0 comments
sign up or login to add to the discussion
lewis.moe submitted #1
2 commits
expand
feat: initial in-house cache distribution
feat: cache locks less
expand 0 comments
pull request successfully merged
lewis.moe submitted #0
1 commit
expand
feat: initial in-house cache distribution
expand 0 comments