Tracks lexicons and counts how many times each one has appeared on the Jetstream.

fix(server): chunk block encodes per NSID — each NSID's chunks are encoded sequentially, while different NSIDs are processed in parallel

ptr.pet e2dd9f2b 86f3fa23

verified
+48 -35
+48 -35
server/src/db/mod.rs
··· 124 124 self.eps.observe(); 125 125 } 126 126 127 - fn encode_block(&self, item_count: usize) -> AppResult<Option<Block>> { 127 + fn encode_block(&self, item_count: usize) -> AppResult<Block> { 128 128 let mut writer = ItemEncoder::new( 129 129 WritableByteView::with_size(ItemEncoder::encoded_len(item_count)), 130 130 item_count, ··· 160 160 self.buf_len.store(0, AtomicOrdering::SeqCst); 161 161 let value = writer.finish()?; 162 162 let key = varints_unsigned_encoded([start_timestamp, end_timestamp]); 163 - return Ok(Some(Block { 163 + return Ok(Block { 164 164 written, 165 165 key, 166 166 data: value.into_inner(), 167 - })); 167 + }); 168 168 } 169 - Ok(None) 169 + Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into()) 170 170 } 171 171 } 172 172 173 173 // counts is nsid -> NsidCounts 174 174 // hits is tree per nsid: varint start time + varint end time -> block of hits 175 175 pub struct Db { 176 - inner: Keyspace, 176 + ks: Keyspace, 177 177 counts: Partition, 178 178 hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>, 179 179 sync_pool: threadpool::ThreadPool, ··· 188 188 impl Db { 189 189 pub fn new(path: impl AsRef<Path>, cancel_token: CancellationToken) -> AppResult<Self> { 190 190 tracing::info!("opening db..."); 191 - let ks = Config::new(path) 192 - .cache_size(8 * 1024 * 1024) // from talna 193 - .open()?; 191 + let ks = Config::new(path).open()?; 194 192 Ok(Self { 195 193 hits: Default::default(), 196 194 sync_pool: threadpool::Builder::new() ··· 200 198 "_counts", 201 199 PartitionCreateOptions::default().compression(fjall::CompressionType::None), 202 200 )?, 203 - inner: ks, 201 + ks, 204 202 event_broadcaster: broadcast::channel(1000).0, 205 203 eps: RateTracker::new(Duration::from_secs(1)), 206 204 cancel_token, ··· 223 221 let mut data = Vec::with_capacity(self.hits.len()); 224 222 let _guard = scc::ebr::Guard::new(); 225 223 for (_, handle) in self.hits.iter(&_guard) { 226 - let block_size = self 227 - 
.max_block_size 228 - .min(self.min_block_size.max(handle.suggested_block_size())); 224 + let mut nsid_data = Vec::with_capacity(2); 225 + let is_too_old = handle.since_last_activity() > self.max_last_activity; 226 + // if we disconnect for a long time, we want to sync all of what we have 227 + // to avoid having many small blocks (even if we run compaction later) 228 + let block_size = is_too_old 229 + .then_some(self.max_block_size) 230 + .unwrap_or_else(|| { 231 + self.max_block_size 232 + .min(self.min_block_size.max(handle.suggested_block_size())) 233 + }); 229 234 let count = handle.item_count(); 230 235 let data_count = count / block_size; 231 - let is_too_old = handle.since_last_activity() > self.max_last_activity; 232 236 if count > 0 && (all || data_count > 0 || is_too_old) { 233 237 for i in 0..data_count { 234 - data.push((i, handle.clone(), block_size)); 238 + nsid_data.push((i, handle.clone(), block_size)); 235 239 } 236 240 // only sync remainder if we haven't met block size 237 241 let remainder = count % block_size; 238 242 if data_count == 0 && remainder > 0 { 239 - data.push((data_count, handle.clone(), remainder)); 243 + nsid_data.push((data_count, handle.clone(), remainder)); 240 244 } 241 245 } 246 + data.push(nsid_data); 242 247 } 243 248 drop(_guard); 244 249 245 250 // process the blocks 246 251 let mut blocks = Vec::with_capacity(data.len()); 247 252 data.into_par_iter() 248 - .map(|(i, handle, max_block_size)| { 249 - handle 250 - .encode_block(max_block_size) 251 - .transpose() 252 - .map(|r| r.map(|block| (i, block, handle.clone()))) 253 + .map(|chunk| { 254 + chunk 255 + .into_iter() 256 + .map(|(i, handle, max_block_size)| { 257 + handle 258 + .encode_block(max_block_size) 259 + .map(|block| (i, block, handle)) 260 + }) 261 + .collect::<Result<Vec<_>, _>>() 253 262 }) 254 263 .collect_into_vec(&mut blocks); 255 264 256 265 // execute into db 257 - for item in blocks.into_iter() { 258 - let Some((i, block, handle)) = item.transpose()? 
else { 259 - continue; 260 - }; 261 - self.sync_pool 262 - .execute(move || match handle.tree.insert(block.key, block.data) { 263 - Ok(_) => { 264 - tracing::info!("[{i}] synced {} of {} to db", block.written, handle.nsid) 265 - } 266 - Err(err) => tracing::error!("failed to sync block: {}", err), 267 - }); 266 + for chunk in blocks.into_iter() { 267 + let chunk = chunk?; 268 + for (i, block, handle) in chunk { 269 + self.sync_pool 270 + .execute(move || match handle.tree.insert(block.key, block.data) { 271 + Ok(_) => { 272 + tracing::info!( 273 + "[{i}] synced {} of {} to db", 274 + block.written, 275 + handle.nsid 276 + ) 277 + } 278 + Err(err) => tracing::error!("failed to sync block: {}", err), 279 + }); 280 + } 268 281 } 269 282 self.sync_pool.join(); 270 283 ··· 291 304 let handle = match self.hits.peek(nsid, &_guard) { 292 305 Some(handle) => handle.clone(), 293 306 None => { 294 - if self.inner.partition_exists(nsid) { 295 - let handle = Arc::new(LexiconHandle::new(&self.inner, nsid)); 307 + if self.ks.partition_exists(nsid) { 308 + let handle = Arc::new(LexiconHandle::new(&self.ks, nsid)); 296 309 let _ = self.hits.insert(SmolStr::new(nsid), handle.clone()); 297 310 handle 298 311 } else { ··· 312 325 f(self 313 326 .hits 314 327 .entry(nsid.clone()) 315 - .or_insert_with(|| Arc::new(LexiconHandle::new(&self.inner, &nsid))) 328 + .or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid))) 316 329 .get()) 317 330 } 318 331 ··· 380 393 } 381 394 382 395 pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> { 383 - self.inner 396 + self.ks 384 397 .list_partitions() 385 398 .into_iter() 386 399 .filter(|k| k.deref() != "_counts")