Tracks lexicons and how many times they appeared on the jetstream.

feat(server): implement delta-of-deltas encoding, varint serialization, and chunking

ptr.pet d7265904 f73de8d3

verified
+1174 -298
+40 -21
server/Cargo.lock
··· 88 88 ] 89 89 90 90 [[package]] 91 + name = "atomic-time" 92 + version = "0.1.5" 93 + source = "registry+https://github.com/rust-lang/crates.io-index" 94 + checksum = "9622f5c6fb50377516c70f65159e70b25465409760c6bd6d4e581318bf704e83" 95 + dependencies = [ 96 + "once_cell", 97 + "portable-atomic", 98 + ] 99 + 100 + [[package]] 91 101 name = "atomic-waker" 92 102 version = "1.1.2" 93 103 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 956 966 checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" 957 967 958 968 [[package]] 959 - name = "overload" 960 - version = "0.1.1" 969 + name = "ordered-varint" 970 + version = "2.0.0" 961 971 source = "registry+https://github.com/rust-lang/crates.io-index" 962 - checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" 972 + checksum = "e9cc9f18ab4bad1e01726bda1259feb8f11e5e76308708a966b4c0136e9db34c" 963 973 964 974 [[package]] 965 - name = "papaya" 966 - version = "0.2.3" 975 + name = "overload" 976 + version = "0.1.1" 967 977 source = "registry+https://github.com/rust-lang/crates.io-index" 968 - checksum = "f92dd0b07c53a0a0c764db2ace8c541dc47320dad97c2200c2a637ab9dd2328f" 969 - dependencies = [ 970 - "equivalent", 971 - "seize", 972 - ] 978 + checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" 973 979 974 980 [[package]] 975 981 name = "parking_lot" ··· 1044 1050 version = "0.3.32" 1045 1051 source = "registry+https://github.com/rust-lang/crates.io-index" 1046 1052 checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 1053 + 1054 + [[package]] 1055 + name = "portable-atomic" 1056 + version = "1.11.1" 1057 + source = "registry+https://github.com/rust-lang/crates.io-index" 1058 + checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" 1047 1059 1048 1060 [[package]] 1049 1061 name = "proc-macro2" ··· 1335 1347 ] 1336 1348 1337 1349 [[package]] 1350 + name = "scc" 1351 + version = "2.3.4" 
1352 + source = "registry+https://github.com/rust-lang/crates.io-index" 1353 + checksum = "22b2d775fb28f245817589471dd49c5edf64237f4a19d10ce9a92ff4651a27f4" 1354 + dependencies = [ 1355 + "sdd", 1356 + ] 1357 + 1358 + [[package]] 1338 1359 name = "schannel" 1339 1360 version = "0.1.27" 1340 1361 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1350 1371 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 1351 1372 1352 1373 [[package]] 1374 + name = "sdd" 1375 + version = "3.0.10" 1376 + source = "registry+https://github.com/rust-lang/crates.io-index" 1377 + checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca" 1378 + 1379 + [[package]] 1353 1380 name = "security-framework" 1354 1381 version = "3.2.0" 1355 1382 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1370 1397 dependencies = [ 1371 1398 "core-foundation-sys", 1372 1399 "libc", 1373 - ] 1374 - 1375 - [[package]] 1376 - name = "seize" 1377 - version = "0.5.0" 1378 - source = "registry+https://github.com/rust-lang/crates.io-index" 1379 - checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7" 1380 - dependencies = [ 1381 - "libc", 1382 - "windows-sys 0.52.0", 1383 1400 ] 1384 1401 1385 1402 [[package]] ··· 1448 1465 dependencies = [ 1449 1466 "anyhow", 1450 1467 "async-trait", 1468 + "atomic-time", 1451 1469 "axum", 1452 1470 "axum-tws", 1453 1471 "fjall", 1454 1472 "futures-util", 1455 - "papaya", 1473 + "ordered-varint", 1456 1474 "pingora-limits", 1457 1475 "rkyv", 1458 1476 "rustls", 1477 + "scc", 1459 1478 "serde", 1460 1479 "serde_json", 1461 1480 "smol_str",
+3 -1
server/Cargo.toml
··· 20 20 fjall = { version = "2", default-features = false, features = ["miniz", "lz4"] } 21 21 rkyv = {version = "0.8", features = ["unaligned"]} 22 22 smol_str = { version = "0.3", features = ["serde"] } 23 - papaya = "0.2" 24 23 serde = { version = "1", features = ["derive"] } 25 24 serde_json = "1.0.141" 25 + scc = "2.3.4" 26 + atomic-time = "0.1.5" 27 + ordered-varint = "2.0.0" 26 28 27 29 [target.'cfg(not(target_env = "msvc"))'.dependencies] 28 30 tikv-jemallocator = "0.6"
+8 -11
server/src/api.rs
··· 30 30 use crate::{ 31 31 db::Db, 32 32 error::{AppError, AppResult}, 33 + utils::time_now, 33 34 }; 34 35 35 36 struct LatencyMillis(u128); ··· 161 162 let maybe_hits = db 162 163 .get_hits( 163 164 &params.nsid, 164 - params.to.unwrap_or(0) 165 - ..params.from.unwrap_or( 166 - std::time::SystemTime::now() 167 - .duration_since(UNIX_EPOCH) 168 - .expect("oops") 169 - .as_micros() as u64, 170 - ), 171 - )? 165 + params.to.unwrap_or(0)..params.from.unwrap_or(time_now()), 166 + ) 172 167 .take(MAX_HITS); 173 168 let mut hits = Vec::with_capacity(maybe_hits.size_hint().0); 174 169 175 170 for maybe_hit in maybe_hits { 176 - let (timestamp, hit) = maybe_hit?; 171 + let hit = maybe_hit?; 172 + let hit_data = hit.access(); 173 + 177 174 hits.push(Hit { 178 - timestamp, 179 - deleted: hit.deleted, 175 + timestamp: hit.timestamp, 176 + deleted: hit_data.deleted, 180 177 }); 181 178 } 182 179
-244
server/src/db.rs
··· 1 - use std::{ 2 - ops::{Bound, Deref, RangeBounds}, 3 - path::Path, 4 - time::Duration, 5 - }; 6 - 7 - use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; 8 - use pingora_limits::rate::Rate; 9 - use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 10 - use smol_str::SmolStr; 11 - use tokio::sync::broadcast; 12 - 13 - use crate::{ 14 - error::{AppError, AppResult}, 15 - jetstream::JetstreamEvent, 16 - }; 17 - 18 - #[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 19 - #[rkyv(compare(PartialEq), derive(Debug))] 20 - pub struct NsidCounts { 21 - pub count: u128, 22 - pub deleted_count: u128, 23 - pub last_seen: u64, 24 - } 25 - 26 - #[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 27 - #[rkyv(compare(PartialEq), derive(Debug))] 28 - pub struct NsidHit { 29 - pub deleted: bool, 30 - } 31 - 32 - pub struct EventRecord { 33 - pub nsid: SmolStr, 34 - pub timestamp: u64, 35 - pub deleted: bool, 36 - } 37 - 38 - impl EventRecord { 39 - pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> { 40 - match event { 41 - JetstreamEvent::Commit { 42 - time_us, commit, .. 43 - } => Some(Self { 44 - nsid: commit.collection.into(), 45 - timestamp: time_us, 46 - deleted: false, 47 - }), 48 - JetstreamEvent::Delete { 49 - time_us, commit, .. 
50 - } => Some(Self { 51 - nsid: commit.collection.into(), 52 - timestamp: time_us, 53 - deleted: true, 54 - }), 55 - _ => None, 56 - } 57 - } 58 - } 59 - 60 - // counts is nsid -> NsidCounts 61 - // hits is tree per nsid: timestamp -> NsidHit 62 - pub struct Db { 63 - inner: Keyspace, 64 - hits: papaya::HashMap<SmolStr, Partition>, 65 - counts: Partition, 66 - event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 67 - eps: Rate, 68 - } 69 - 70 - impl Db { 71 - pub fn new(path: impl AsRef<Path>) -> AppResult<Self> { 72 - tracing::info!("opening db..."); 73 - let ks = Config::new(path) 74 - .cache_size(8 * 1024 * 1024) // from talna 75 - .open()?; 76 - Ok(Self { 77 - hits: Default::default(), 78 - counts: ks.open_partition( 79 - "_counts", 80 - PartitionCreateOptions::default().compression(fjall::CompressionType::None), 81 - )?, 82 - inner: ks, 83 - event_broadcaster: broadcast::channel(1000).0, 84 - eps: Rate::new(Duration::from_secs(1)), 85 - }) 86 - } 87 - 88 - pub fn eps(&self) -> usize { 89 - self.eps.rate(&()) as usize 90 - } 91 - 92 - pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> { 93 - self.event_broadcaster.subscribe() 94 - } 95 - 96 - #[inline(always)] 97 - fn run_in_nsid_tree<T>( 98 - &self, 99 - nsid: &str, 100 - f: impl FnOnce(&Partition) -> AppResult<T>, 101 - ) -> AppResult<T> { 102 - f(self.hits.pin().get_or_insert_with(SmolStr::new(nsid), || { 103 - let opts = PartitionCreateOptions::default() 104 - .compression(fjall::CompressionType::Miniz(9)) 105 - .compaction_strategy(fjall::compaction::Strategy::Fifo(fjall::compaction::Fifo { 106 - limit: 5 * 1024 * 1024 * 1024, // 5 gb 107 - ttl_seconds: Some(60 * 60 * 24 * 30), // 30 days 108 - })); 109 - self.inner.open_partition(nsid, opts).unwrap() 110 - })) 111 - } 112 - 113 - pub fn record_event(&self, e: EventRecord) -> AppResult<()> { 114 - let EventRecord { 115 - nsid, 116 - timestamp, 117 - deleted, 118 - } = e; 119 - 120 - self.insert_event(&nsid, timestamp, 
deleted)?; 121 - // increment count 122 - let mut counts = self.get_count(&nsid)?; 123 - counts.last_seen = timestamp; 124 - if deleted { 125 - counts.deleted_count += 1; 126 - } else { 127 - counts.count += 1; 128 - } 129 - self.insert_count(&nsid, counts.clone())?; 130 - if self.event_broadcaster.receiver_count() > 0 { 131 - let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts)); 132 - } 133 - self.eps.observe(&(), 1); 134 - Ok(()) 135 - } 136 - 137 - #[inline(always)] 138 - fn insert_event(&self, nsid: &str, timestamp: u64, deleted: bool) -> AppResult<()> { 139 - self.run_in_nsid_tree(nsid, |tree| { 140 - tree.insert( 141 - timestamp.to_be_bytes(), 142 - unsafe { rkyv::to_bytes::<Error>(&NsidHit { deleted }).unwrap_unchecked() } 143 - .as_slice(), 144 - ) 145 - .map_err(AppError::from) 146 - }) 147 - } 148 - 149 - #[inline(always)] 150 - fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> { 151 - self.counts 152 - .insert( 153 - nsid, 154 - unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(), 155 - ) 156 - .map_err(AppError::from) 157 - } 158 - 159 - pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> { 160 - let Some(raw) = self.counts.get(nsid)? 
else { 161 - return Ok(NsidCounts::default()); 162 - }; 163 - Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() }) 164 - } 165 - 166 - pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> { 167 - self.counts.iter().map(|res| { 168 - res.map_err(AppError::from).map(|(key, val)| { 169 - ( 170 - SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }), 171 - unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 172 - ) 173 - }) 174 - }) 175 - } 176 - 177 - pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str>> { 178 - self.inner 179 - .list_partitions() 180 - .into_iter() 181 - .filter(|k| k.deref() != "_counts") 182 - } 183 - 184 - pub fn get_hits( 185 - &self, 186 - nsid: &str, 187 - range: impl RangeBounds<u64>, 188 - ) -> AppResult<Box<dyn Iterator<Item = AppResult<(u64, NsidHit)>>>> { 189 - let start = range.start_bound().cloned().map(u64::to_be_bytes); 190 - let end = range.end_bound().cloned().map(u64::to_be_bytes); 191 - 192 - let _guard = self.hits.guard(); 193 - let Some(tree) = self.hits.get(nsid, &_guard) else { 194 - return Ok(Box::new(std::iter::empty())); 195 - }; 196 - 197 - Ok(Box::new(tree.range(TimestampRange { start, end }).map( 198 - |res| { 199 - res.map_err(AppError::from).map(|(key, val)| { 200 - ( 201 - u64::from_be_bytes(key.as_ref().try_into().unwrap()), 202 - unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 203 - ) 204 - }) 205 - }, 206 - ))) 207 - } 208 - 209 - pub fn tracking_since(&self) -> AppResult<u64> { 210 - let _guard = self.hits.guard(); 211 - // HACK: we should actually store when we started tracking but im lazy 212 - // should be accurate enough 213 - let Some(tree) = self.hits.get("app.bsky.feed.like", &_guard) else { 214 - return Ok(0); 215 - }; 216 - 217 - let Some((timestamp_raw, _)) = tree.first_key_value()? 
else { 218 - return Ok(0); 219 - }; 220 - 221 - Ok(u64::from_be_bytes( 222 - timestamp_raw.as_ref().try_into().unwrap(), 223 - )) 224 - } 225 - } 226 - 227 - type TimestampRepr = [u8; 8]; 228 - 229 - struct TimestampRange { 230 - start: Bound<TimestampRepr>, 231 - end: Bound<TimestampRepr>, 232 - } 233 - 234 - impl RangeBounds<TimestampRepr> for TimestampRange { 235 - #[inline(always)] 236 - fn start_bound(&self) -> Bound<&TimestampRepr> { 237 - self.start.as_ref() 238 - } 239 - 240 - #[inline(always)] 241 - fn end_bound(&self) -> Bound<&TimestampRepr> { 242 - self.end.as_ref() 243 - } 244 - }
+485
server/src/db/block.rs
··· 1 + use ordered_varint::Variable; 2 + use rkyv::{ 3 + Archive, Serialize, api::high::HighSerializer, rancor, ser::allocator::ArenaHandle, 4 + util::AlignedVec, 5 + }; 6 + use std::{ 7 + io::{self, Read, Write}, 8 + marker::PhantomData, 9 + }; 10 + 11 + use crate::error::AppResult; 12 + 13 + pub struct Item<T> { 14 + pub timestamp: u64, 15 + data: AlignedVec, 16 + phantom: PhantomData<T>, 17 + } 18 + 19 + impl<T: Archive> Item<T> { 20 + pub fn access(&self) -> &T::Archived { 21 + unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) } 22 + } 23 + } 24 + 25 + impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> { 26 + pub fn new(timestamp: u64, data: &T) -> Self { 27 + Item { 28 + timestamp, 29 + data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() }, 30 + phantom: PhantomData, 31 + } 32 + } 33 + } 34 + 35 + pub struct ItemEncoder<W: Write, T> { 36 + writer: W, 37 + prev_timestamp: u64, 38 + prev_delta: i64, 39 + _item: PhantomData<T>, 40 + } 41 + 42 + impl<W: Write, T> ItemEncoder<W, T> { 43 + pub fn new(writer: W) -> Self { 44 + ItemEncoder { 45 + writer, 46 + prev_timestamp: 0, 47 + prev_delta: 0, 48 + _item: PhantomData, 49 + } 50 + } 51 + 52 + pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> { 53 + if self.prev_timestamp == 0 { 54 + // self.writer.write_varint(item.timestamp)?; 55 + self.prev_timestamp = item.timestamp; 56 + self.write_data(&item.data)?; 57 + return Ok(()); 58 + } 59 + 60 + let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64; 61 + 62 + self.writer.write_varint(delta - self.prev_delta)?; 63 + self.prev_timestamp = item.timestamp; 64 + self.prev_delta = delta; 65 + 66 + self.write_data(&item.data)?; 67 + 68 + Ok(()) 69 + } 70 + 71 + fn write_data(&mut self, data: &[u8]) -> AppResult<()> { 72 + self.writer.write_varint(data.len())?; 73 + self.writer.write_all(data)?; 74 + Ok(()) 75 + } 76 + 77 + pub fn finish(mut self) -> AppResult<W> { 78 + self.writer.flush()?; 79 
+ Ok(self.writer) 80 + } 81 + } 82 + 83 + pub struct ItemDecoder<R, T> { 84 + reader: R, 85 + current_timestamp: u64, 86 + current_delta: i64, 87 + first_item: bool, 88 + _item: PhantomData<T>, 89 + } 90 + 91 + impl<R: Read, T: Archive> ItemDecoder<R, T> { 92 + pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> { 93 + Ok(ItemDecoder { 94 + reader, 95 + current_timestamp: start_timestamp, 96 + current_delta: 0, 97 + first_item: true, 98 + _item: PhantomData, 99 + }) 100 + } 101 + 102 + pub fn decode(&mut self) -> AppResult<Option<Item<T>>> { 103 + if self.first_item { 104 + // read the first timestamp 105 + // let timestamp = match self.reader.read_varint::<u64>() { 106 + // Ok(timestamp) => timestamp, 107 + // Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 108 + // Err(e) => return Err(e.into()), 109 + // }; 110 + // self.current_timestamp = timestamp; 111 + 112 + let Some(data_raw) = self.read_item()? else { 113 + return Ok(None); 114 + }; 115 + self.first_item = false; 116 + return Ok(Some(Item { 117 + timestamp: self.current_timestamp, 118 + data: data_raw, 119 + phantom: PhantomData, 120 + })); 121 + } 122 + 123 + let Some(_delta) = self.read_timestamp()? else { 124 + return Ok(None); 125 + }; 126 + 127 + // read data 128 + let data_raw = match self.read_item()? 
{ 129 + Some(data_raw) => data_raw, 130 + None => { 131 + return Err(io::Error::new( 132 + io::ErrorKind::UnexpectedEof, 133 + "expected data after delta", 134 + ) 135 + .into()); 136 + } 137 + }; 138 + 139 + Ok(Some(Item { 140 + timestamp: self.current_timestamp, 141 + data: data_raw, 142 + phantom: PhantomData, 143 + })) 144 + } 145 + 146 + // [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1] 147 + fn read_timestamp(&mut self) -> AppResult<Option<u64>> { 148 + let delta = match self.reader.read_varint::<i64>() { 149 + Ok(delta) => delta, 150 + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 151 + Err(e) => return Err(e.into()), 152 + }; 153 + self.current_delta += delta; 154 + self.current_timestamp = 155 + (self.current_timestamp as i128 + self.current_delta as i128) as u64; 156 + Ok(Some(self.current_timestamp)) 157 + } 158 + 159 + fn read_item(&mut self) -> AppResult<Option<AlignedVec>> { 160 + let data_len = match self.reader.read_varint::<usize>() { 161 + Ok(data_len) => data_len, 162 + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 163 + Err(e) => return Err(e.into()), 164 + }; 165 + let mut data_raw = AlignedVec::with_capacity(data_len); 166 + for _ in 0..data_len { 167 + data_raw.push(0); 168 + } 169 + self.reader.read_exact(data_raw.as_mut_slice())?; 170 + Ok(Some(data_raw)) 171 + } 172 + } 173 + 174 + impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> { 175 + type Item = AppResult<Item<T>>; 176 + 177 + fn next(&mut self) -> Option<Self::Item> { 178 + self.decode().transpose() 179 + } 180 + } 181 + 182 + pub trait WriteVariableExt: Write { 183 + fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> { 184 + value.encode_variable(self) 185 + } 186 + } 187 + impl<W: Write> WriteVariableExt for W {} 188 + 189 + pub trait ReadVariableExt: Read { 190 + fn read_varint<T: Variable>(&mut self) -> io::Result<T> { 191 + T::decode_variable(self) 192 + } 193 + } 194 + impl<R: Read> ReadVariableExt for R {} 
195 + 196 + #[cfg(test)] 197 + mod test { 198 + use super::*; 199 + use rkyv::{Archive, Deserialize, Serialize}; 200 + use std::io::Cursor; 201 + 202 + #[derive(Archive, Deserialize, Serialize, Debug, PartialEq)] 203 + #[rkyv(compare(PartialEq))] 204 + struct TestData { 205 + id: u32, 206 + value: String, 207 + } 208 + 209 + #[test] 210 + fn test_encoder_decoder_single_item() { 211 + let data = TestData { 212 + id: 123, 213 + value: "test".to_string(), 214 + }; 215 + 216 + let item = Item::new(1000, &data); 217 + 218 + // encode 219 + let mut buffer = Vec::new(); 220 + let mut encoder = ItemEncoder::new(&mut buffer); 221 + encoder.encode(&item).unwrap(); 222 + encoder.finish().unwrap(); 223 + 224 + // decode 225 + let cursor = Cursor::new(buffer); 226 + let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 227 + 228 + let decoded_item = decoder.decode().unwrap().unwrap(); 229 + assert_eq!(decoded_item.timestamp, 1000); 230 + 231 + let decoded_data = decoded_item.access(); 232 + assert_eq!(decoded_data.id, 123); 233 + assert_eq!(decoded_data.value.as_str(), "test"); 234 + } 235 + 236 + #[test] 237 + fn test_encoder_decoder_multiple_items() { 238 + let items = vec![ 239 + Item::new( 240 + 1000, 241 + &TestData { 242 + id: 1, 243 + value: "first".to_string(), 244 + }, 245 + ), 246 + Item::new( 247 + 1010, 248 + &TestData { 249 + id: 2, 250 + value: "second".to_string(), 251 + }, 252 + ), 253 + Item::new( 254 + 1015, 255 + &TestData { 256 + id: 3, 257 + value: "third".to_string(), 258 + }, 259 + ), 260 + Item::new( 261 + 1025, 262 + &TestData { 263 + id: 4, 264 + value: "fourth".to_string(), 265 + }, 266 + ), 267 + ]; 268 + 269 + // encode 270 + let mut buffer = Vec::new(); 271 + let mut encoder = ItemEncoder::new(&mut buffer); 272 + 273 + for item in &items { 274 + encoder.encode(item).unwrap(); 275 + } 276 + encoder.finish().unwrap(); 277 + 278 + // decode 279 + let cursor = Cursor::new(buffer); 280 + let mut decoder = ItemDecoder::<_, 
TestData>::new(cursor, 1000).unwrap(); 281 + 282 + let mut decoded_items = Vec::new(); 283 + while let Some(item) = decoder.decode().unwrap() { 284 + decoded_items.push(item); 285 + } 286 + 287 + assert_eq!(decoded_items.len(), 4); 288 + 289 + for (original, decoded) in items.iter().zip(decoded_items.iter()) { 290 + assert_eq!(original.timestamp, decoded.timestamp); 291 + assert_eq!(original.access().id, decoded.access().id); 292 + assert_eq!( 293 + original.access().value.as_str(), 294 + decoded.access().value.as_str() 295 + ); 296 + } 297 + } 298 + 299 + #[test] 300 + fn test_encoder_decoder_with_iterator() { 301 + let items = vec![ 302 + Item::new( 303 + 2000, 304 + &TestData { 305 + id: 10, 306 + value: "a".to_string(), 307 + }, 308 + ), 309 + Item::new( 310 + 2005, 311 + &TestData { 312 + id: 20, 313 + value: "b".to_string(), 314 + }, 315 + ), 316 + Item::new( 317 + 2012, 318 + &TestData { 319 + id: 30, 320 + value: "c".to_string(), 321 + }, 322 + ), 323 + ]; 324 + 325 + // encode 326 + let mut buffer = Vec::new(); 327 + let mut encoder = ItemEncoder::new(&mut buffer); 328 + 329 + for item in &items { 330 + encoder.encode(item).unwrap(); 331 + } 332 + encoder.finish().unwrap(); 333 + 334 + // decode 335 + let cursor = Cursor::new(buffer); 336 + let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap(); 337 + 338 + let decoded_items: Result<Vec<_>, _> = decoder.collect(); 339 + let decoded_items = decoded_items.unwrap(); 340 + 341 + assert_eq!(decoded_items.len(), 3); 342 + assert_eq!(decoded_items[0].timestamp, 2000); 343 + assert_eq!(decoded_items[1].timestamp, 2005); 344 + assert_eq!(decoded_items[2].timestamp, 2012); 345 + 346 + assert_eq!(decoded_items[0].access().id, 10); 347 + assert_eq!(decoded_items[1].access().id, 20); 348 + assert_eq!(decoded_items[2].access().id, 30); 349 + } 350 + 351 + #[test] 352 + fn test_delta_compression() { 353 + let items = vec![ 354 + Item::new( 355 + 1000, 356 + &TestData { 357 + id: 1, 358 + value: 
"a".to_string(), 359 + }, 360 + ), 361 + Item::new( 362 + 1010, 363 + &TestData { 364 + id: 2, 365 + value: "b".to_string(), 366 + }, 367 + ), // delta = 10 368 + Item::new( 369 + 1020, 370 + &TestData { 371 + id: 3, 372 + value: "c".to_string(), 373 + }, 374 + ), // delta = 10, delta-of-delta = 0 375 + Item::new( 376 + 1025, 377 + &TestData { 378 + id: 4, 379 + value: "d".to_string(), 380 + }, 381 + ), // delta = 5, delta-of-delta = -5 382 + ]; 383 + 384 + let mut buffer = Vec::new(); 385 + let mut encoder = ItemEncoder::new(&mut buffer); 386 + 387 + for item in &items { 388 + encoder.encode(item).unwrap(); 389 + } 390 + encoder.finish().unwrap(); 391 + 392 + // decode and verify 393 + let cursor = Cursor::new(buffer); 394 + let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 395 + 396 + let decoded_items: Result<Vec<_>, _> = decoder.collect(); 397 + let decoded_items = decoded_items.unwrap(); 398 + 399 + for (original, decoded) in items.iter().zip(decoded_items.iter()) { 400 + assert_eq!(original.timestamp, decoded.timestamp); 401 + assert_eq!(original.access().id, decoded.access().id); 402 + } 403 + } 404 + 405 + #[test] 406 + fn test_empty_decode() { 407 + let buffer = Vec::new(); 408 + let cursor = Cursor::new(buffer); 409 + let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 410 + 411 + let result = decoder.decode().unwrap(); 412 + assert!(result.is_none()); 413 + } 414 + 415 + #[test] 416 + fn test_backwards_timestamp() { 417 + let items = vec![ 418 + Item::new( 419 + 1000, 420 + &TestData { 421 + id: 1, 422 + value: "first".to_string(), 423 + }, 424 + ), 425 + Item::new( 426 + 900, 427 + &TestData { 428 + id: 2, 429 + value: "second".to_string(), 430 + }, 431 + ), 432 + ]; 433 + 434 + let mut buffer = Vec::new(); 435 + let mut encoder = ItemEncoder::new(&mut buffer); 436 + 437 + for item in &items { 438 + encoder.encode(item).unwrap(); 439 + } 440 + encoder.finish().unwrap(); 441 + 442 + let cursor = 
Cursor::new(buffer); 443 + let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 444 + 445 + let decoded_items: Result<Vec<_>, _> = decoder.collect(); 446 + let decoded_items = decoded_items.unwrap(); 447 + 448 + assert_eq!(decoded_items.len(), 2); 449 + assert_eq!(decoded_items[0].timestamp, 1000); 450 + assert_eq!(decoded_items[1].timestamp, 900); 451 + } 452 + 453 + #[test] 454 + fn test_different_data_sizes() { 455 + let small_data = TestData { 456 + id: 1, 457 + value: "x".to_string(), 458 + }; 459 + let large_data = TestData { 460 + id: 2, 461 + value: "a".repeat(1000), 462 + }; 463 + 464 + let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)]; 465 + 466 + let mut buffer = Vec::new(); 467 + let mut encoder = ItemEncoder::new(&mut buffer); 468 + 469 + for item in &items { 470 + encoder.encode(item).unwrap(); 471 + } 472 + encoder.finish().unwrap(); 473 + 474 + let cursor = Cursor::new(buffer); 475 + let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap(); 476 + 477 + let decoded_items: Result<Vec<_>, _> = decoder.collect(); 478 + let decoded_items = decoded_items.unwrap(); 479 + 480 + assert_eq!(decoded_items.len(), 2); 481 + assert_eq!(decoded_items[0].access().value.as_str(), "x"); 482 + assert_eq!(decoded_items[1].access().value.len(), 1000); 483 + assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000)); 484 + } 485 + }
+589
server/src/db/mod.rs
··· 1 + use std::{ 2 + io::Cursor, 3 + ops::{Bound, Deref, RangeBounds}, 4 + path::Path, 5 + sync::{ 6 + Arc, 7 + atomic::{AtomicUsize, Ordering as AtomicOrdering}, 8 + }, 9 + time::{Duration, Instant}, 10 + }; 11 + 12 + use atomic_time::AtomicInstant; 13 + use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice}; 14 + use ordered_varint::Variable; 15 + use pingora_limits::rate::Rate; 16 + use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 17 + use smol_str::SmolStr; 18 + use tokio::sync::broadcast; 19 + 20 + use crate::{ 21 + db::block::{ReadVariableExt, WriteVariableExt}, 22 + error::{AppError, AppResult}, 23 + jetstream::JetstreamEvent, 24 + utils::time_now, 25 + }; 26 + 27 + mod block; 28 + 29 + #[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 30 + #[rkyv(compare(PartialEq), derive(Debug))] 31 + pub struct NsidCounts { 32 + pub count: u128, 33 + pub deleted_count: u128, 34 + pub last_seen: u64, 35 + } 36 + 37 + #[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 38 + #[rkyv(compare(PartialEq), derive(Debug))] 39 + pub struct NsidHit { 40 + pub deleted: bool, 41 + } 42 + 43 + #[derive(Clone)] 44 + pub struct EventRecord { 45 + pub nsid: SmolStr, 46 + pub timestamp: u64, // seconds 47 + pub deleted: bool, 48 + } 49 + 50 + impl EventRecord { 51 + pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> { 52 + match event { 53 + JetstreamEvent::Commit { 54 + time_us, commit, .. 55 + } => Some(Self { 56 + nsid: commit.collection.into(), 57 + timestamp: time_us / 1_000_000, 58 + deleted: false, 59 + }), 60 + JetstreamEvent::Delete { 61 + time_us, commit, .. 
62 + } => Some(Self { 63 + nsid: commit.collection.into(), 64 + timestamp: time_us / 1_000_000, 65 + deleted: true, 66 + }), 67 + _ => None, 68 + } 69 + } 70 + } 71 + 72 + // counts is nsid -> NsidCounts 73 + // hits is tree per nsid: timestamp -> NsidHit 74 + pub struct DbOld { 75 + inner: Keyspace, 76 + hits: scc::HashIndex<SmolStr, Partition>, 77 + counts: Partition, 78 + event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 79 + eps: Rate, 80 + } 81 + 82 + impl DbOld { 83 + pub fn new(path: impl AsRef<Path>) -> AppResult<Self> { 84 + tracing::info!("opening db..."); 85 + let ks = Config::new(path) 86 + .cache_size(8 * 1024 * 1024) // from talna 87 + .open()?; 88 + Ok(Self { 89 + hits: Default::default(), 90 + counts: ks.open_partition( 91 + "_counts", 92 + PartitionCreateOptions::default().compression(fjall::CompressionType::None), 93 + )?, 94 + inner: ks, 95 + event_broadcaster: broadcast::channel(1000).0, 96 + eps: Rate::new(Duration::from_secs(1)), 97 + }) 98 + } 99 + 100 + pub fn eps(&self) -> usize { 101 + self.eps.rate(&()) as usize 102 + } 103 + 104 + pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> { 105 + self.event_broadcaster.subscribe() 106 + } 107 + 108 + #[inline(always)] 109 + fn run_in_nsid_tree<T>( 110 + &self, 111 + nsid: &str, 112 + f: impl FnOnce(&Partition) -> AppResult<T>, 113 + ) -> AppResult<T> { 114 + f(self 115 + .hits 116 + .entry(SmolStr::new(nsid)) 117 + .or_insert_with(|| { 118 + let opts = PartitionCreateOptions::default() 119 + .compression(fjall::CompressionType::Miniz(9)) 120 + .compaction_strategy(fjall::compaction::Strategy::Fifo( 121 + fjall::compaction::Fifo { 122 + limit: 5 * 1024 * 1024 * 1024, // 5 gb 123 + ttl_seconds: Some(60 * 60 * 24 * 30), // 30 days 124 + }, 125 + )); 126 + self.inner.open_partition(nsid, opts).unwrap() 127 + }) 128 + .get()) 129 + } 130 + 131 + pub fn record_event(&self, e: EventRecord) -> AppResult<()> { 132 + let EventRecord { 133 + nsid, 134 + timestamp, 135 + 
deleted, 136 + } = e; 137 + 138 + self.insert_event(&nsid, timestamp, deleted)?; 139 + // increment count 140 + let mut counts = self.get_count(&nsid)?; 141 + counts.last_seen = timestamp; 142 + if deleted { 143 + counts.deleted_count += 1; 144 + } else { 145 + counts.count += 1; 146 + } 147 + self.insert_count(&nsid, counts.clone())?; 148 + if self.event_broadcaster.receiver_count() > 0 { 149 + let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts)); 150 + } 151 + self.eps.observe(&(), 1); 152 + Ok(()) 153 + } 154 + 155 + #[inline(always)] 156 + fn insert_event(&self, nsid: &str, timestamp: u64, deleted: bool) -> AppResult<()> { 157 + self.run_in_nsid_tree(nsid, |tree| { 158 + tree.insert( 159 + timestamp.to_be_bytes(), 160 + unsafe { rkyv::to_bytes::<Error>(&NsidHit { deleted }).unwrap_unchecked() } 161 + .as_slice(), 162 + ) 163 + .map_err(AppError::from) 164 + }) 165 + } 166 + 167 + #[inline(always)] 168 + fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> { 169 + self.counts 170 + .insert( 171 + nsid, 172 + unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(), 173 + ) 174 + .map_err(AppError::from) 175 + } 176 + 177 + pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> { 178 + let Some(raw) = self.counts.get(nsid)? 
else { 179 + return Ok(NsidCounts::default()); 180 + }; 181 + Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() }) 182 + } 183 + 184 + pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> { 185 + self.counts.iter().map(|res| { 186 + res.map_err(AppError::from).map(|(key, val)| { 187 + ( 188 + SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }), 189 + unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 190 + ) 191 + }) 192 + }) 193 + } 194 + 195 + pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str>> { 196 + self.inner 197 + .list_partitions() 198 + .into_iter() 199 + .filter(|k| k.deref() != "_counts") 200 + } 201 + 202 + pub fn get_hits( 203 + &self, 204 + nsid: &str, 205 + range: impl RangeBounds<u64>, 206 + ) -> AppResult<Box<dyn Iterator<Item = AppResult<(u64, NsidHit)>>>> { 207 + let start = range.start_bound().cloned().map(u64::to_be_bytes); 208 + let end = range.end_bound().cloned().map(u64::to_be_bytes); 209 + 210 + let _guard = scc::ebr::Guard::new(); 211 + let Some(tree) = self.hits.peek(nsid, &_guard) else { 212 + return Ok(Box::new(std::iter::empty())); 213 + }; 214 + 215 + Ok(Box::new(tree.range(TimestampRangeOld { start, end }).map( 216 + |res| { 217 + res.map_err(AppError::from).map(|(key, val)| { 218 + ( 219 + u64::from_be_bytes(key.as_ref().try_into().unwrap()), 220 + unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 221 + ) 222 + }) 223 + }, 224 + ))) 225 + } 226 + 227 + pub fn tracking_since(&self) -> AppResult<u64> { 228 + let _guard = scc::ebr::Guard::new(); 229 + // HACK: we should actually store when we started tracking but im lazy 230 + // should be accurate enough 231 + let Some(tree) = self.hits.peek("app.bsky.feed.like", &_guard) else { 232 + return Ok(0); 233 + }; 234 + let Some((timestamp_raw, _)) = tree.first_key_value()? 
else { 235 + return Ok(0); 236 + }; 237 + drop(_guard); 238 + 239 + Ok(u64::from_be_bytes( 240 + timestamp_raw.as_ref().try_into().unwrap(), 241 + )) 242 + } 243 + } 244 + 245 + type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>; 246 + type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>; 247 + type Item = block::Item<NsidHit>; 248 + 249 + pub struct LexiconHandle { 250 + tree: Partition, 251 + buf: Arc<scc::Queue<EventRecord>>, 252 + buf_len: AtomicUsize, 253 + last_insert: AtomicInstant, 254 + eps: Rate, 255 + block_size: AtomicUsize, 256 + } 257 + 258 + impl LexiconHandle { 259 + fn new(keyspace: &Keyspace, nsid: &str) -> Self { 260 + let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9)); 261 + Self { 262 + tree: keyspace.open_partition(nsid, opts).unwrap(), 263 + buf: Default::default(), 264 + buf_len: AtomicUsize::new(0), 265 + last_insert: AtomicInstant::now(), 266 + eps: Rate::new(Duration::from_secs(5)), 267 + block_size: AtomicUsize::new(1000), 268 + } 269 + } 270 + 271 + fn item_count(&self) -> usize { 272 + self.buf_len.load(AtomicOrdering::Acquire) 273 + } 274 + 275 + fn last_insert(&self) -> Instant { 276 + self.last_insert.load(AtomicOrdering::Acquire) 277 + } 278 + 279 + fn suggested_block_size(&self) -> usize { 280 + self.block_size.load(AtomicOrdering::Relaxed) 281 + } 282 + 283 + fn insert(&self, event: EventRecord) { 284 + self.buf.push(event); 285 + self.buf_len.fetch_add(1, AtomicOrdering::Release); 286 + self.last_insert 287 + .store(Instant::now(), AtomicOrdering::Release); 288 + self.eps.observe(&(), 1); 289 + let rate = self.eps.rate(&()) as usize; 290 + if rate != 0 { 291 + self.block_size.store(rate * 60, AtomicOrdering::Relaxed); 292 + } 293 + } 294 + 295 + fn sync(&self) -> AppResult<()> { 296 + let mut writer = ItemEncoder::new(Vec::with_capacity( 297 + size_of::<u64>() + self.item_count() * size_of::<(u64, NsidHit)>(), 298 + )); 299 + let mut start_timestamp = None; 300 + let mut 
end_timestamp = None; 301 + while let Some(event) = self.buf.pop() { 302 + let item = Item::new( 303 + event.timestamp, 304 + &NsidHit { 305 + deleted: event.deleted, 306 + }, 307 + ); 308 + writer.encode(&item)?; 309 + if start_timestamp.is_none() { 310 + start_timestamp = Some(event.timestamp); 311 + } 312 + end_timestamp = Some(event.timestamp); 313 + } 314 + if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) { 315 + self.buf_len.store(0, AtomicOrdering::Release); 316 + let value = writer.finish()?; 317 + let mut key = Vec::with_capacity(size_of::<u64>() * 2); 318 + key.write_varint(start_timestamp)?; 319 + key.write_varint(end_timestamp)?; 320 + self.tree.insert(key, value)?; 321 + } 322 + Ok(()) 323 + } 324 + } 325 + 326 + type BoxedIter<T> = Box<dyn Iterator<Item = T>>; 327 + 328 + // counts is nsid -> NsidCounts 329 + // hits is tree per nsid: varint start time + varint end time -> block of hits 330 + pub struct Db { 331 + inner: Keyspace, 332 + hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>, 333 + counts: Partition, 334 + event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 335 + eps: Rate, 336 + min_block_size: usize, 337 + max_last_activity: Duration, 338 + } 339 + 340 + impl Db { 341 + pub fn new(path: impl AsRef<Path>) -> AppResult<Self> { 342 + tracing::info!("opening db..."); 343 + let ks = Config::new(path) 344 + .cache_size(8 * 1024 * 1024) // from talna 345 + .open()?; 346 + Ok(Self { 347 + hits: Default::default(), 348 + counts: ks.open_partition( 349 + "_counts", 350 + PartitionCreateOptions::default().compression(fjall::CompressionType::None), 351 + )?, 352 + inner: ks, 353 + event_broadcaster: broadcast::channel(1000).0, 354 + eps: Rate::new(Duration::from_secs(1)), 355 + min_block_size: 512, 356 + max_last_activity: Duration::from_secs(10), 357 + }) 358 + } 359 + 360 + pub fn sync(&self, all: bool) -> AppResult<()> { 361 + let _guard = scc::ebr::Guard::new(); 362 + for (nsid, tree) in 
self.hits.iter(&_guard) { 363 + let count = tree.item_count(); 364 + let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size()); 365 + let is_too_old = tree.last_insert().elapsed() > self.max_last_activity; 366 + if count > 0 && (all || is_max_block_size || is_too_old) { 367 + tracing::info!("syncing {count} of {nsid} to db"); 368 + tree.sync()?; 369 + } 370 + } 371 + Ok(()) 372 + } 373 + 374 + #[inline(always)] 375 + pub fn eps(&self) -> usize { 376 + self.eps.rate(&()) as usize 377 + } 378 + 379 + #[inline(always)] 380 + pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> { 381 + self.event_broadcaster.subscribe() 382 + } 383 + 384 + #[inline(always)] 385 + fn maybe_run_in_nsid_tree<T>( 386 + &self, 387 + nsid: &str, 388 + f: impl FnOnce(&LexiconHandle) -> T, 389 + ) -> Option<T> { 390 + let _guard = scc::ebr::Guard::new(); 391 + let handle = match self.hits.peek(nsid, &_guard) { 392 + Some(handle) => handle.clone(), 393 + None => { 394 + if self.inner.partition_exists(nsid) { 395 + let handle = Arc::new(LexiconHandle::new(&self.inner, nsid)); 396 + let _ = self.hits.insert(SmolStr::new(nsid), handle.clone()); 397 + handle 398 + } else { 399 + return None; 400 + } 401 + } 402 + }; 403 + Some(f(&handle)) 404 + } 405 + 406 + #[inline(always)] 407 + fn run_in_nsid_tree<T>( 408 + &self, 409 + nsid: SmolStr, 410 + f: impl FnOnce(&LexiconHandle) -> AppResult<T>, 411 + ) -> AppResult<T> { 412 + f(self 413 + .hits 414 + .entry(nsid.clone()) 415 + .or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid))) 416 + .get()) 417 + } 418 + 419 + pub fn record_event(&self, e: EventRecord) -> AppResult<()> { 420 + let EventRecord { 421 + nsid, 422 + timestamp, 423 + deleted, 424 + } = e.clone(); 425 + 426 + // insert event 427 + self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?; 428 + // increment count 429 + let mut counts = self.get_count(&nsid)?; 430 + counts.last_seen = timestamp; 431 + if deleted 
{ 432 + counts.deleted_count += 1; 433 + } else { 434 + counts.count += 1; 435 + } 436 + self.insert_count(&nsid, counts.clone())?; 437 + if self.event_broadcaster.receiver_count() > 0 { 438 + let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts)); 439 + } 440 + self.eps.observe(&(), 1); 441 + Ok(()) 442 + } 443 + 444 + #[inline(always)] 445 + fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> { 446 + self.counts 447 + .insert( 448 + nsid, 449 + unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(), 450 + ) 451 + .map_err(AppError::from) 452 + } 453 + 454 + pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> { 455 + let Some(raw) = self.counts.get(nsid)? else { 456 + return Ok(NsidCounts::default()); 457 + }; 458 + Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() }) 459 + } 460 + 461 + pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> { 462 + self.counts.iter().map(|res| { 463 + res.map_err(AppError::from).map(|(key, val)| { 464 + ( 465 + SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }), 466 + unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 467 + ) 468 + }) 469 + }) 470 + } 471 + 472 + pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str>> { 473 + self.inner 474 + .list_partitions() 475 + .into_iter() 476 + .filter(|k| k.deref() != "_counts") 477 + } 478 + 479 + pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> { 480 + self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> { 481 + Box::new( 482 + handle 483 + .tree 484 + .iter() 485 + .rev() 486 + .map(|res| res.map_err(AppError::from)), 487 + ) 488 + }) 489 + .unwrap_or_else(|| Box::new(std::iter::empty())) 490 + } 491 + 492 + pub fn get_hits( 493 + &self, 494 + nsid: &str, 495 + range: impl RangeBounds<u64> + std::fmt::Debug, 496 + ) -> BoxedIter<AppResult<Item>> { 497 + let start = 
range 498 + .start_bound() 499 + .cloned() 500 + .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() }); 501 + let end = range 502 + .end_bound() 503 + .cloned() 504 + .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() }); 505 + let limit = match range.end_bound().cloned() { 506 + Bound::Included(end) => end, 507 + Bound::Excluded(end) => end.saturating_sub(1), 508 + Bound::Unbounded => u64::MAX, 509 + }; 510 + 511 + self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> { 512 + let map_block = move |(key, val)| { 513 + let mut key_reader = Cursor::new(key); 514 + let start_timestamp = key_reader.read_varint::<u64>()?; 515 + let items = 516 + ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| { 517 + item.as_ref().map_or(true, |item| item.timestamp <= limit) 518 + }); 519 + Ok(items) 520 + }; 521 + 522 + Box::new( 523 + handle 524 + .tree 525 + .range(TimestampRange { start, end }) 526 + .map(move |res| res.map_err(AppError::from).and_then(map_block)) 527 + .flatten() 528 + .flatten(), 529 + ) 530 + }) 531 + .unwrap_or_else(|| Box::new(std::iter::empty())) 532 + } 533 + 534 + pub fn tracking_since(&self) -> AppResult<u64> { 535 + let _guard = scc::ebr::Guard::new(); 536 + // HACK: we should actually store when we started tracking but im lazy 537 + // should be accurate enough 538 + let Some(handle) = self.hits.peek("app.bsky.feed.like", &_guard) else { 539 + return Ok(0); 540 + }; 541 + let Some((timestamps_raw, _)) = handle.tree.first_key_value()? 
else { 542 + return Ok(0); 543 + }; 544 + drop(_guard); 545 + 546 + let mut timestamp_reader = Cursor::new(timestamps_raw); 547 + timestamp_reader 548 + .read_varint::<u64>() 549 + .map_err(AppError::from) 550 + } 551 + } 552 + 553 + type TimestampRepr = Vec<u8>; 554 + 555 + struct TimestampRange { 556 + start: Bound<TimestampRepr>, 557 + end: Bound<TimestampRepr>, 558 + } 559 + 560 + impl RangeBounds<TimestampRepr> for TimestampRange { 561 + #[inline(always)] 562 + fn start_bound(&self) -> Bound<&TimestampRepr> { 563 + self.start.as_ref() 564 + } 565 + 566 + #[inline(always)] 567 + fn end_bound(&self) -> Bound<&TimestampRepr> { 568 + self.end.as_ref() 569 + } 570 + } 571 + 572 + type TimestampReprOld = [u8; 8]; 573 + 574 + struct TimestampRangeOld { 575 + start: Bound<TimestampReprOld>, 576 + end: Bound<TimestampReprOld>, 577 + } 578 + 579 + impl RangeBounds<TimestampReprOld> for TimestampRangeOld { 580 + #[inline(always)] 581 + fn start_bound(&self) -> Bound<&TimestampReprOld> { 582 + self.start.as_ref() 583 + } 584 + 585 + #[inline(always)] 586 + fn end_bound(&self) -> Bound<&TimestampReprOld> { 587 + self.end.as_ref() 588 + } 589 + }
+41 -21
server/src/main.rs
··· 9 9 10 10 use crate::{ 11 11 api::serve, 12 - db::{Db, EventRecord}, 12 + db::{Db, DbOld, EventRecord}, 13 13 error::AppError, 14 14 jetstream::JetstreamClient, 15 15 }; ··· 18 18 mod db; 19 19 mod error; 20 20 mod jetstream; 21 + mod utils; 21 22 22 23 #[cfg(not(target_env = "msvc"))] 23 24 #[global_allocator] ··· 34 35 .compact() 35 36 .init(); 36 37 37 - if std::env::args() 38 - .nth(1) 39 - .map_or(false, |arg| arg == "migrate") 40 - { 41 - migrate_to_miniz(); 42 - return; 38 + match std::env::args().nth(1).as_deref() { 39 + Some("migrate") => { 40 + migrate(); 41 + return; 42 + } 43 + Some("debug") => { 44 + debug(); 45 + return; 46 + } 47 + _ => {} 43 48 } 44 49 45 50 let db = Arc::new(Db::new(".fjall_data").expect("couldnt create db")); ··· 58 63 }; 59 64 60 65 let cancel_token = CancellationToken::new(); 61 - let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000); 62 66 63 67 let consume_events = tokio::spawn({ 64 68 let consume_cancel = cancel_token.child_token(); 69 + let db = db.clone(); 65 70 async move { 66 71 jetstream.connect().await?; 67 72 loop { ··· 71 76 let Some(record) = EventRecord::from_jetstream(event) else { 72 77 continue; 73 78 }; 74 - let _ = event_tx.send(record).await; 79 + let db = db.clone(); 80 + tokio::task::spawn_blocking(move || { 81 + if let Err(err) = db.record_event(record) { 82 + tracing::error!("failed to record event: {}", err); 83 + } 84 + }); 75 85 } 76 86 Err(err) => return Err(err), 77 87 }, ··· 81 91 } 82 92 }); 83 93 84 - let ingest_events = std::thread::spawn({ 94 + std::thread::spawn({ 85 95 let db = db.clone(); 86 96 move || { 87 - tracing::info!("starting ingest events thread..."); 88 - while let Some(e) = event_rx.blocking_recv() { 89 - if let Err(e) = db.record_event(e) { 90 - tracing::error!("failed to record event: {}", e); 97 + loop { 98 + match db.sync(false) { 99 + Ok(_) => (), 100 + Err(e) => tracing::error!("failed to sync db: {}", e), 91 101 } 102 + 
std::thread::sleep(std::time::Duration::from_secs(1)); 92 103 } 93 104 } 94 105 }); 95 106 96 107 tokio::select! { 97 - res = serve(db, cancel_token.child_token()) => { 108 + res = serve(db.clone(), cancel_token.child_token()) => { 98 109 if let Err(e) = res { 99 110 tracing::error!("serve failed: {}", e); 100 111 } ··· 114 125 } 115 126 116 127 tracing::info!("shutting down..."); 117 - ingest_events 118 - .join() 119 - .expect("failed to join ingest events thread"); 128 + db.sync(true).expect("couldnt sync db"); 129 + } 130 + 131 + fn debug() { 132 + let db = Db::new(".fjall_data").expect("couldnt create db"); 133 + for nsid in db.get_nsids() { 134 + let nsid = nsid.deref(); 135 + for hit in db.get_hits(nsid, ..) { 136 + let hit = hit.expect("cant read event"); 137 + println!("{nsid} {}", hit.timestamp); 138 + } 139 + } 120 140 } 121 141 122 - fn migrate_to_miniz() { 123 - let from = Db::new(".fjall_data").expect("couldnt create db"); 124 - let to = Db::new(".fjall_data_miniz").expect("couldnt create db"); 142 + fn migrate() { 143 + let from = DbOld::new(".fjall_data").expect("couldnt create db"); 144 + let to = Db::new(".fjall_data_migrated").expect("couldnt create db"); 125 145 126 146 let mut total_count = 0_u64; 127 147 for nsid in from.get_nsids() {
+8
server/src/utils.rs
··· 1 + use std::time::UNIX_EPOCH; 2 + 3 + pub fn time_now() -> u64 { 4 + std::time::SystemTime::now() 5 + .duration_since(UNIX_EPOCH) 6 + .expect("oops") 7 + .as_micros() as u64 8 + }