tangled
alpha
login
or
join now
urschrei.eurosky.social
/
cvmcount
0
fork
atom
Rust implementation of the CVM algorithm for counting distinct elements in a stream
0
fork
atom
overview
issues
pulls
pipelines
Streaming / Iterator interface
urschrei.eurosky.social
8 months ago
8ae23cbd
11bcbd94
+260
-1
2 changed files
expand all
collapse all
unified
split
README.md
src
lib.rs
+33
README.md
···
99
99
100
100
The builder validates parameters and provides clear error messages for invalid inputs.
101
101
102
102
+
## Streaming Interface
103
103
+
104
104
+
For processing iterators directly, you can use the streaming methods:
105
105
+
106
106
+
```rust
107
107
+
use cvmcount::{CVM, EstimateDistinct};
108
108
+
109
109
+
// Process an entire iterator with a CVM instance
110
110
+
let mut cvm: CVM<i32> = CVM::builder().epsilon(0.05).build().unwrap();
111
111
+
let numbers = vec![1, 2, 3, 2, 1, 4, 5];
112
112
+
let estimate = cvm.process_stream(numbers);
113
113
+
114
114
+
// Or use the iterator extension trait for one-liners
115
115
+
let estimate = (1..=1000)
116
116
+
.cycle()
117
117
+
.take(10_000)
118
118
+
.estimate_distinct_count(0.1, 0.1, 10_000);
119
119
+
120
120
+
// With builder pattern
121
121
+
let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
122
122
+
let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
123
123
+
let estimate = words.into_iter().estimate_distinct_with_builder(builder).unwrap();
124
124
+
125
125
+
// When working with borrowed data, map to owned explicitly
126
126
+
let borrowed_words = vec!["hello", "world", "hello"];
127
127
+
let estimate = borrowed_words
128
128
+
.iter()
129
129
+
.map(|s| s.to_string())
130
130
+
.estimate_distinct_count(0.1, 0.1, 1000);
131
131
+
```
132
132
+
133
133
+
The streaming interface accepts owned values to avoid cloning within the algorithm, making the ownership requirements explicit.
134
134
+
102
135
## Analysis
103
136
104
137

+227
-1
src/lib.rs
···
254
254
let rng = &mut self.rng;
255
255
self.buf.retain(|_| rng.gen_bool(0.5));
256
256
}
257
257
+
/// Process an entire iterator of owned values and return the final estimate
258
258
+
///
259
259
+
/// This is a convenience method that processes all elements from an iterator
260
260
+
/// and returns the final count estimate. The iterator must yield owned values
261
261
+
/// that the CVM can take ownership of.
262
262
+
///
263
263
+
/// # Examples
264
264
+
///
265
265
+
/// ```
266
266
+
/// use cvmcount::CVM;
267
267
+
///
268
268
+
/// // Process owned strings
269
269
+
/// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
270
270
+
/// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
271
271
+
/// let estimate = cvm.process_stream(words);
272
272
+
/// assert!(estimate > 0.0);
273
273
+
///
274
274
+
/// // Process numeric data
275
275
+
/// let numbers = vec![1, 2, 3, 2, 1, 4];
276
276
+
/// let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap();
277
277
+
/// let estimate = cvm.process_stream(numbers);
278
278
+
/// assert!(estimate > 0.0);
279
279
+
///
280
280
+
/// // When you have borrowed data, clone explicitly
281
281
+
/// let borrowed_words = vec!["hello", "world", "hello"];
282
282
+
/// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
283
283
+
/// let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string()));
284
284
+
/// assert!(estimate > 0.0);
285
285
+
/// ```
286
286
+
pub fn process_stream<I>(&mut self, iter: I) -> f64
287
287
+
where
288
288
+
I: IntoIterator<Item = T>,
289
289
+
{
290
290
+
for item in iter {
291
291
+
self.process_element(item);
292
292
+
}
293
293
+
self.calculate_final_result()
294
294
+
}
295
295
+
257
296
/// Calculate the current unique element count. You can continue to add elements after calling this method.
258
297
pub fn calculate_final_result(&self) -> f64 {
259
298
self.buf.len() as f64 / self.probability
260
299
}
261
300
}
262
301
302
302
+
/// Extension trait for iterators to estimate distinct count directly
303
303
+
///
304
304
+
/// This trait provides convenient methods to estimate distinct counts from iterators
305
305
+
/// without manually creating and managing a CVM instance.
306
306
+
///
307
307
+
/// # Examples
308
308
+
///
309
309
+
/// ```
310
310
+
/// use cvmcount::{CVM, EstimateDistinct};
311
311
+
///
312
312
+
/// // Simple usage with explicitly supplied parameters
313
313
+
/// let numbers = vec![1, 2, 3, 2, 1, 4, 5];
314
314
+
/// let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000);
315
315
+
/// assert!(estimate > 0.0);
316
316
+
///
317
317
+
/// // Using builder pattern for more control
318
318
+
/// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
319
319
+
/// let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
320
320
+
/// let estimate = words.into_iter().estimate_distinct_with_builder(builder).unwrap();
321
321
+
/// assert!(estimate > 0.0);
322
322
+
/// ```
323
323
+
pub trait EstimateDistinct<T: Ord>: Iterator<Item = T> + Sized {
324
324
+
/// Estimate distinct count using the CVM algorithm with specified parameters
325
325
+
///
326
326
+
/// # Arguments
327
327
+
///
328
328
+
/// * `epsilon` - Accuracy requirement (smaller = more accurate)
329
329
+
/// * `delta` - Failure probability (smaller = more confident)
330
330
+
/// * `estimated_size` - Rough estimate of total stream size
331
331
+
///
332
332
+
/// # Returns
333
333
+
///
334
334
+
/// The estimated number of distinct elements
335
335
+
fn estimate_distinct_count(self, epsilon: f64, delta: f64, estimated_size: usize) -> f64 {
336
336
+
let mut cvm = CVM::new(epsilon, delta, estimated_size);
337
337
+
cvm.process_stream(self)
338
338
+
}
339
339
+
340
340
+
/// Estimate distinct count using a builder for more ergonomic configuration
341
341
+
///
342
342
+
/// # Arguments
343
343
+
///
344
344
+
/// * `builder` - A configured CVMBuilder instance
345
345
+
///
346
346
+
/// # Returns
347
347
+
///
348
348
+
/// Result containing the estimated number of distinct elements or an error message
349
349
+
///
350
350
+
/// # Examples
351
351
+
///
352
352
+
/// ```
353
353
+
/// use cvmcount::{CVM, EstimateDistinct};
354
354
+
///
355
355
+
/// let data = vec![1, 2, 3, 2, 1];
356
356
+
/// let builder = CVM::<i32>::builder().epsilon(0.05).confidence(0.99);
357
357
+
/// let estimate = data.into_iter().estimate_distinct_with_builder(builder).unwrap();
358
358
+
/// assert!(estimate > 0.0);
359
359
+
/// ```
360
360
+
fn estimate_distinct_with_builder(self, builder: CVMBuilder) -> Result<f64, String> {
361
361
+
let mut cvm: CVM<T> = builder.build()?;
362
362
+
Ok(cvm.process_stream(self))
363
363
+
}
364
364
+
}
365
365
+
366
366
+
/// Implement EstimateDistinct for all iterators that yield Ord types
367
367
+
impl<T: Ord, I: Iterator<Item = T>> EstimateDistinct<T> for I {}
368
368
+
263
369
// Calculate threshold (buf_size) value for the F0-Estimator algorithm
264
370
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
265
371
((12.0 / epsilon.powf(2.0)) * ((8.0 * stream_size as f64) / delta).log2()).ceil() as usize
···
273
379
path::Path,
274
380
};
275
381
276
276
-
use super::{CVM, ConfidenceSpec};
382
382
+
use super::{CVM, ConfidenceSpec, EstimateDistinct};
277
383
use regex::Regex;
278
384
use std::collections::HashSet;
279
385
···
404
510
405
511
let delta_spec = ConfidenceSpec::Delta(0.05);
406
512
assert!((delta_spec.to_delta() - 0.05).abs() < f64::EPSILON);
513
513
+
}
514
514
+
515
515
+
#[test]
516
516
+
fn test_process_stream() {
517
517
+
let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap();
518
518
+
519
519
+
// Test with vector
520
520
+
let numbers = vec![1, 2, 3, 2, 1, 4, 5, 3];
521
521
+
let estimate = cvm.process_stream(numbers);
522
522
+
assert!(estimate > 0.0);
523
523
+
524
524
+
// Test with range
525
525
+
let mut cvm2: CVM<i32> = CVM::<i32>::builder().build().unwrap();
526
526
+
let estimate2 = cvm2.process_stream(1..=100);
527
527
+
assert!(estimate2 > 0.0);
528
528
+
}
529
529
+
530
530
+
#[test]
531
531
+
fn test_process_stream_strings() {
532
532
+
let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
533
533
+
534
534
+
// Test with owned strings
535
535
+
let words = vec![
536
536
+
"hello".to_string(),
537
537
+
"world".to_string(),
538
538
+
"hello".to_string(),
539
539
+
"rust".to_string(),
540
540
+
];
541
541
+
let estimate = cvm.process_stream(words);
542
542
+
assert!(estimate > 0.0);
543
543
+
}
544
544
+
545
545
+
#[test]
546
546
+
fn test_process_stream_with_map() {
547
547
+
let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
548
548
+
549
549
+
// Test with borrowed data mapped to owned
550
550
+
let borrowed_words = ["hello", "world", "hello", "rust"];
551
551
+
let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string()));
552
552
+
assert!(estimate > 0.0);
553
553
+
}
554
554
+
555
555
+
#[test]
556
556
+
fn test_estimate_distinct_trait() {
557
557
+
// Test simple usage
558
558
+
let numbers = vec![1, 2, 3, 2, 1, 4, 5];
559
559
+
let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000);
560
560
+
assert!(estimate > 0.0);
561
561
+
562
562
+
// Test with builder
563
563
+
let words = vec![
564
564
+
"hello".to_string(),
565
565
+
"world".to_string(),
566
566
+
"hello".to_string(),
567
567
+
];
568
568
+
let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
569
569
+
let estimate = words
570
570
+
.into_iter()
571
571
+
.estimate_distinct_with_builder(builder)
572
572
+
.unwrap();
573
573
+
assert!(estimate > 0.0);
574
574
+
}
575
575
+
576
576
+
#[test]
577
577
+
fn test_estimate_distinct_with_cloning() {
578
578
+
// Test that explicit cloning works as expected
579
579
+
let borrowed_numbers = [1, 2, 3, 2, 1, 4];
580
580
+
let estimate = borrowed_numbers
581
581
+
.iter()
582
582
+
.cloned()
583
583
+
.estimate_distinct_count(0.1, 0.1, 100);
584
584
+
assert!(estimate > 0.0);
585
585
+
}
586
586
+
587
587
+
#[test]
588
588
+
fn test_streaming_integration_with_file_processing() {
589
589
+
// Simulate file processing pattern
590
590
+
let lines = vec![
591
591
+
"hello world".to_string(),
592
592
+
"world peace".to_string(),
593
593
+
"hello rust".to_string(),
594
594
+
];
595
595
+
596
596
+
let mut cvm: CVM<String> = CVM::<String>::builder()
597
597
+
.epsilon(0.1)
598
598
+
.confidence(0.9)
599
599
+
.build()
600
600
+
.unwrap();
601
601
+
602
602
+
// Process words from all lines
603
603
+
let words: Vec<String> = lines
604
604
+
.into_iter()
605
605
+
.flat_map(|line| {
606
606
+
line.split_whitespace()
607
607
+
.map(|s| s.to_string())
608
608
+
.collect::<Vec<_>>()
609
609
+
})
610
610
+
.collect();
611
611
+
let estimate = cvm.process_stream(words);
612
612
+
613
613
+
assert!(estimate > 0.0);
614
614
+
}
615
615
+
616
616
+
#[test]
617
617
+
fn test_streaming_large_dataset() {
618
618
+
// Test with a larger dataset to verify the algorithm works
619
619
+
let mut cvm: CVM<i32> = CVM::<i32>::builder()
620
620
+
.epsilon(0.1)
621
621
+
.confidence(0.9)
622
622
+
.estimated_size(10_000)
623
623
+
.build()
624
624
+
.unwrap();
625
625
+
626
626
+
// Create data with known distinct count (1000 unique values, repeated)
627
627
+
let data: Vec<i32> = (0..1000).cycle().take(10_000).collect();
628
628
+
let estimate = cvm.process_stream(data);
629
629
+
630
630
+
// The estimate should be reasonably close to 1000
631
631
+
// With epsilon=0.1, we expect the estimate to be within 10% of the true count most of the time
632
632
+
assert!(estimate > 500.0 && estimate < 2000.0);
407
633
}
408
634
}