tracks lexicons and how many times they appeared on the jetstream

fix(server): handle a ByteView with trailing empty bytes by storing the number of items in each block

ptr.pet 86f3fa23 52fd5331

verified
+188 -126
+47 -29
server/src/db/block.rs
··· 1 - use ordered_varint::Variable; 1 + use std::{ 2 + io::{self, Read, Write}, 3 + marker::PhantomData, 4 + usize, 5 + }; 6 + 2 7 use rkyv::{ 3 8 Archive, Serialize, api::high::HighSerializer, rancor, ser::allocator::ArenaHandle, 4 9 util::AlignedVec, 5 10 }; 6 - use std::{ 7 - io::{self, Read, Write}, 8 - marker::PhantomData, 9 - }; 11 + 12 + use crate::utils::{ReadVariableExt, WriteVariableExt}; 10 13 11 14 pub struct Item<T> { 12 15 pub timestamp: u64, ··· 34 37 writer: W, 35 38 prev_timestamp: u64, 36 39 prev_delta: i64, 40 + item_count: usize, 37 41 _item: PhantomData<T>, 38 42 } 39 43 40 44 impl<W: Write, T> ItemEncoder<W, T> { 41 - pub fn new(writer: W) -> Self { 45 + pub fn new(writer: W, item_count: usize) -> Self { 46 + assert!(item_count > 0); 42 47 ItemEncoder { 43 48 writer, 44 49 prev_timestamp: 0, 45 50 prev_delta: 0, 51 + item_count, 46 52 _item: PhantomData, 47 53 } 48 54 } 49 55 56 + /// NOTE: this is a best effort estimate of the encoded length of the block. 57 + /// if T contains variable-length data, the encoded length may be larger than this estimate. 
58 + pub fn encoded_len(item_count: usize) -> usize { 59 + // items length + item count * delta length + data length 60 + size_of::<usize>() + item_count * size_of::<(i64, T)>() 61 + } 62 + 50 63 pub fn encode(&mut self, item: &Item<T>) -> io::Result<()> { 51 64 if self.prev_timestamp == 0 { 65 + self.writer.write_varint(self.item_count)?; 52 66 // self.writer.write_varint(item.timestamp)?; 53 67 self.prev_timestamp = item.timestamp; 54 68 self.write_data(&item.data)?; ··· 82 96 reader: R, 83 97 current_timestamp: u64, 84 98 current_delta: i64, 85 - first_item: bool, 99 + items_read: usize, 100 + expected: usize, 86 101 _item: PhantomData<T>, 87 102 } 88 103 89 104 impl<R: Read, T: Archive> ItemDecoder<R, T> { 90 - pub fn new(reader: R, start_timestamp: u64) -> io::Result<Self> { 105 + pub fn new(mut reader: R, start_timestamp: u64) -> io::Result<Self> { 106 + let expected = match reader.read_varint() { 107 + Ok(expected) => expected, 108 + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => 0, 109 + Err(e) => return Err(e.into()), 110 + }; 111 + 91 112 Ok(ItemDecoder { 92 113 reader, 93 114 current_timestamp: start_timestamp, 94 115 current_delta: 0, 95 - first_item: true, 116 + items_read: 0, 117 + expected, 96 118 _item: PhantomData, 97 119 }) 98 120 } 99 121 100 122 pub fn decode(&mut self) -> io::Result<Option<Item<T>>> { 101 - if self.first_item { 123 + if self.items_read == 0 { 102 124 // read the first timestamp 103 125 // let timestamp = match self.reader.read_varint::<u64>() { 104 126 // Ok(timestamp) => timestamp, ··· 110 132 let Some(data_raw) = self.read_item()? else { 111 133 return Ok(None); 112 134 }; 113 - self.first_item = false; 135 + 136 + self.items_read += 1; 114 137 return Ok(Some(Item { 115 138 timestamp: self.current_timestamp, 116 139 data: data_raw, ··· 118 141 })); 119 142 } 120 143 144 + if self.items_read >= self.expected { 145 + return Ok(None); 146 + } 147 + 121 148 let Some(_delta) = self.read_timestamp()? 
else { 122 149 return Ok(None); 123 150 }; ··· 134 161 } 135 162 }; 136 163 164 + self.items_read += 1; 137 165 Ok(Some(Item { 138 166 timestamp: self.current_timestamp, 139 167 data: data_raw, ··· 175 203 fn next(&mut self) -> Option<Self::Item> { 176 204 self.decode().transpose() 177 205 } 178 - } 179 206 180 - pub trait WriteVariableExt: Write { 181 - fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> { 182 - value.encode_variable(self) 183 - } 184 - } 185 - impl<W: Write> WriteVariableExt for W {} 186 - 187 - pub trait ReadVariableExt: Read { 188 - fn read_varint<T: Variable>(&mut self) -> io::Result<T> { 189 - T::decode_variable(self) 207 + fn size_hint(&self) -> (usize, Option<usize>) { 208 + (self.expected, Some(self.expected)) 190 209 } 191 210 } 192 - impl<R: Read> ReadVariableExt for R {} 193 211 194 212 #[cfg(test)] 195 213 mod test { ··· 215 233 216 234 // encode 217 235 let mut buffer = Vec::new(); 218 - let mut encoder = ItemEncoder::new(&mut buffer); 236 + let mut encoder = ItemEncoder::new(&mut buffer, 1); 219 237 encoder.encode(&item).unwrap(); 220 238 encoder.finish().unwrap(); 221 239 ··· 266 284 267 285 // encode 268 286 let mut buffer = Vec::new(); 269 - let mut encoder = ItemEncoder::new(&mut buffer); 287 + let mut encoder = ItemEncoder::new(&mut buffer, items.len()); 270 288 271 289 for item in &items { 272 290 encoder.encode(item).unwrap(); ··· 322 340 323 341 // encode 324 342 let mut buffer = Vec::new(); 325 - let mut encoder = ItemEncoder::new(&mut buffer); 343 + let mut encoder = ItemEncoder::new(&mut buffer, items.len()); 326 344 327 345 for item in &items { 328 346 encoder.encode(item).unwrap(); ··· 380 398 ]; 381 399 382 400 let mut buffer = Vec::new(); 383 - let mut encoder = ItemEncoder::new(&mut buffer); 401 + let mut encoder = ItemEncoder::new(&mut buffer, items.len()); 384 402 385 403 for item in &items { 386 404 encoder.encode(item).unwrap(); ··· 430 448 ]; 431 449 432 450 let mut buffer = Vec::new(); 433 - 
let mut encoder = ItemEncoder::new(&mut buffer); 451 + let mut encoder = ItemEncoder::new(&mut buffer, items.len()); 434 452 435 453 for item in &items { 436 454 encoder.encode(item).unwrap(); ··· 462 480 let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)]; 463 481 464 482 let mut buffer = Vec::new(); 465 - let mut encoder = ItemEncoder::new(&mut buffer); 483 + let mut encoder = ItemEncoder::new(&mut buffer, items.len()); 466 484 467 485 for item in &items { 468 486 encoder.encode(item).unwrap();
+42 -97
server/src/db/mod.rs
··· 1 1 use std::{ 2 - io::{Cursor, Write}, 2 + io::Cursor, 3 3 ops::{Bound, Deref, RangeBounds}, 4 4 path::Path, 5 5 sync::{ ··· 12 12 use byteview::ByteView; 13 13 use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice}; 14 14 use itertools::{Either, Itertools}; 15 - use ordered_varint::Variable; 16 15 use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; 17 16 use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 18 17 use smol_str::SmolStr; ··· 20 19 use tokio_util::sync::CancellationToken; 21 20 22 21 use crate::{ 23 - db::block::{ReadVariableExt, WriteVariableExt}, 24 22 error::{AppError, AppResult}, 25 23 jetstream::JetstreamEvent, 26 - utils::{CLOCK, DefaultRateTracker, RateTracker}, 24 + utils::{ 25 + CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, WritableByteView, 26 + varints_unsigned_encoded, 27 + }, 27 28 }; 28 29 29 30 mod block; ··· 75 76 type ItemEncoder = block::ItemEncoder<WritableByteView, NsidHit>; 76 77 type Item = block::Item<NsidHit>; 77 78 78 - struct WritableByteView { 79 - view: ByteView, 80 - written: usize, 81 - } 82 - 83 - impl WritableByteView { 84 - // returns None if the view already has a reference to it 85 - fn with_size(capacity: usize) -> Self { 86 - Self { 87 - view: ByteView::with_size(capacity), 88 - written: 0, 89 - } 90 - } 91 - 92 - #[inline(always)] 93 - fn into_inner(self) -> ByteView { 94 - self.view 95 - } 96 - } 97 - 98 - impl Write for WritableByteView { 99 - fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 100 - let len = buf.len(); 101 - if len > self.view.len() - self.written { 102 - return Err(std::io::Error::new( 103 - std::io::ErrorKind::StorageFull, 104 - "buffer full", 105 - )); 106 - } 107 - // SAFETY: this is safe because we have checked that the buffer is not full 108 - // SAFETY: we own the mutator so no other references to the view exist 109 - unsafe { 110 - std::ptr::copy_nonoverlapping( 111 - buf.as_ptr(), 112 - self.view 113 - 
.get_mut() 114 - .unwrap_unchecked() 115 - .as_mut_ptr() 116 - .add(self.written), 117 - len, 118 - ); 119 - self.written += len; 120 - } 121 - Ok(len) 122 - } 123 - 124 - #[inline(always)] 125 - fn flush(&mut self) -> std::io::Result<()> { 126 - Ok(()) 127 - } 128 - } 129 - 130 79 struct Block { 131 80 written: usize, 132 81 key: ByteView, ··· 175 124 self.eps.observe(); 176 125 } 177 126 178 - fn encode_block(&self, max_block_size: usize) -> AppResult<Option<Block>> { 179 - let buf_size = 180 - size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(); 181 - let mut writer = ItemEncoder::new(WritableByteView::with_size(buf_size)); 127 + fn encode_block(&self, item_count: usize) -> AppResult<Option<Block>> { 128 + let mut writer = ItemEncoder::new( 129 + WritableByteView::with_size(ItemEncoder::encoded_len(item_count)), 130 + item_count, 131 + ); 182 132 let mut start_timestamp = None; 183 133 let mut end_timestamp = None; 184 134 let mut written = 0_usize; ··· 194 144 start_timestamp = Some(event.timestamp); 195 145 } 196 146 end_timestamp = Some(event.timestamp); 197 - if written >= max_block_size { 147 + if written >= item_count { 198 148 break; 199 149 } 200 150 written += 1; 201 151 } 152 + if written != item_count { 153 + return Err(std::io::Error::new( 154 + std::io::ErrorKind::InvalidData, 155 + "unexpected number of items, invalid data?", 156 + ) 157 + .into()); 158 + } 202 159 if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) { 203 160 self.buf_len.store(0, AtomicOrdering::SeqCst); 204 161 let value = writer.finish()?; 205 - let mut key = WritableByteView::with_size(size_of::<u64>() * 2); 206 - key.write_varint(start_timestamp)?; 207 - key.write_varint(end_timestamp)?; 162 + let key = varints_unsigned_encoded([start_timestamp, end_timestamp]); 208 163 return Ok(Some(Block { 209 164 written, 210 - key: key.into_inner(), 165 + key, 211 166 data: value.into_inner(), 212 167 })); 213 168 } ··· 
449 404 nsid: &str, 450 405 range: impl RangeBounds<u64> + std::fmt::Debug, 451 406 ) -> impl Iterator<Item = AppResult<Item>> { 452 - let start = range 453 - .start_bound() 454 - .cloned() 455 - .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() }); 456 - let end = range 457 - .end_bound() 458 - .cloned() 459 - .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() }); 460 - let limit = match range.end_bound().cloned() { 407 + let start_limit = match range.start_bound().cloned() { 408 + Bound::Included(start) => start, 409 + Bound::Excluded(start) => start.saturating_add(1), 410 + Bound::Unbounded => 0, 411 + }; 412 + let end_limit = match range.end_bound().cloned() { 461 413 Bound::Included(end) => end, 462 414 Bound::Excluded(end) => end.saturating_sub(1), 463 415 Bound::Unbounded => u64::MAX, 464 416 }; 417 + let end_key = varints_unsigned_encoded([end_limit]); 465 418 466 419 self.maybe_run_in_nsid_tree(nsid, move |handle| { 467 420 let map_block = move |(key, val)| { 468 421 let mut key_reader = Cursor::new(key); 469 422 let start_timestamp = key_reader.read_varint::<u64>()?; 423 + if start_timestamp < start_limit { 424 + return Ok(None); 425 + } 470 426 let items = ItemDecoder::new(Cursor::new(val), start_timestamp)? 
471 427 .take_while(move |item| { 472 - item.as_ref().map_or(true, |item| item.timestamp <= limit) 428 + item.as_ref().map_or(true, |item| { 429 + item.timestamp <= end_limit && item.timestamp >= start_limit 430 + }) 473 431 }) 474 432 .map(|res| res.map_err(AppError::from)); 475 - Ok(items) 433 + Ok(Some(items)) 476 434 }; 477 435 478 436 Either::Left( 479 437 handle 480 438 .tree 481 - .range(TimestampRange { start, end }) 482 - .map(move |res| res.map_err(AppError::from).and_then(map_block)) 439 + .range(..end_key) 440 + .rev() 441 + .map_while(move |res| { 442 + res.map_err(AppError::from).and_then(map_block).transpose() 443 + }) 444 + .collect::<Vec<_>>() 445 + .into_iter() 446 + .rev() 483 447 .flatten() 484 448 .flatten(), 485 449 ) ··· 502 466 .unwrap_or(Ok(0)) 503 467 } 504 468 } 505 - 506 - type TimestampRepr = Vec<u8>; 507 - 508 - struct TimestampRange { 509 - start: Bound<TimestampRepr>, 510 - end: Bound<TimestampRepr>, 511 - } 512 - 513 - impl RangeBounds<TimestampRepr> for TimestampRange { 514 - #[inline(always)] 515 - fn start_bound(&self) -> Bound<&TimestampRepr> { 516 - self.start.as_ref() 517 - } 518 - 519 - #[inline(always)] 520 - fn end_bound(&self) -> Bound<&TimestampRepr> { 521 - self.end.as_ref() 522 - } 523 - }
+99
server/src/utils.rs
··· 1 + use std::io::{self, Read, Write}; 1 2 use std::sync::atomic::{AtomicU64, Ordering}; 2 3 use std::time::Duration; 4 + 5 + use byteview::ByteView; 6 + use ordered_varint::Variable; 7 + 8 + pub trait WriteVariableExt: Write { 9 + fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> { 10 + value.encode_variable(self) 11 + } 12 + } 13 + impl<W: Write> WriteVariableExt for W {} 14 + 15 + pub trait ReadVariableExt: Read { 16 + fn read_varint<T: Variable>(&mut self) -> io::Result<T> { 17 + T::decode_variable(self) 18 + } 19 + } 20 + impl<R: Read> ReadVariableExt for R {} 21 + 22 + pub struct WritableByteView { 23 + view: ByteView, 24 + written: usize, 25 + } 26 + 27 + impl WritableByteView { 28 + // returns None if the view already has a reference to it 29 + pub fn with_size(capacity: usize) -> Self { 30 + Self { 31 + view: ByteView::with_size(capacity), 32 + written: 0, 33 + } 34 + } 35 + 36 + #[inline(always)] 37 + pub fn into_inner(self) -> ByteView { 38 + self.view 39 + } 40 + } 41 + 42 + impl Write for WritableByteView { 43 + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { 44 + let len = buf.len(); 45 + if len > self.view.len() - self.written { 46 + return Err(std::io::Error::new( 47 + std::io::ErrorKind::StorageFull, 48 + "buffer full", 49 + )); 50 + } 51 + // SAFETY: this is safe because we have checked that the buffer is not full 52 + // SAFETY: we own the mutator so no other references to the view exist 53 + unsafe { 54 + std::ptr::copy_nonoverlapping( 55 + buf.as_ptr(), 56 + self.view 57 + .get_mut() 58 + .unwrap_unchecked() 59 + .as_mut_ptr() 60 + .add(self.written), 61 + len, 62 + ); 63 + self.written += len; 64 + } 65 + Ok(len) 66 + } 67 + 68 + #[inline(always)] 69 + fn flush(&mut self) -> std::io::Result<()> { 70 + Ok(()) 71 + } 72 + } 73 + 74 + pub fn varints_unsigned_encoded<const N: usize>(values: [u64; N]) -> ByteView { 75 + let mut buf = 76 + 
WritableByteView::with_size(values.into_iter().map(varint_unsigned_encoded_len).sum()); 77 + for value in values { 78 + // cant fail 79 + let _ = buf.write_varint(value); 80 + } 81 + buf.into_inner() 82 + } 83 + 84 + // gets the encoded length of a varint-encoded unsigned integer 85 + // see ordered_varint 86 + pub fn varint_unsigned_encoded_len(value: u64) -> usize { 87 + let value = value.to_be_bytes(); 88 + value 89 + .iter() 90 + .enumerate() 91 + .find_map(|(index, &byte)| { 92 + (byte > 0).then(|| { 93 + let extra_bytes = 7 - index; 94 + (byte < 16) 95 + .then(|| extra_bytes + 1) 96 + .unwrap_or_else(|| extra_bytes + 2) 97 + }) 98 + }) 99 + .unwrap_or(0) 100 + .max(1) 101 + } 3 102 4 103 pub static CLOCK: std::sync::LazyLock<quanta::Clock> = 5 104 std::sync::LazyLock::new(|| quanta::Clock::new());