// Forked from atscan.net/plcbundle-rs:
// a high-performance implementation of plcbundle written in Rust.
//! Persistent pre-bundle operation store with strict chronological validation, CID deduplication, incremental saving, and fast DID lookups.
// src/mempool.rs
3use crate::constants;
4use crate::format::format_std_duration_ms;
5use crate::operations::Operation;
6use anyhow::{Result, bail};
7use chrono::{DateTime, Utc};
8use log::{debug, info};
9use std::collections::{HashMap, HashSet};
10use std::fs::{self, File, OpenOptions};
11use std::io::{BufRead, BufReader, BufWriter, Write};
12use std::path::{Path, PathBuf};
13
14/// Mempool stores operations waiting to be bundled
15/// Operations must be strictly chronological
16pub struct Mempool {
17 operations: Vec<Operation>,
18 // Index by DID for fast lookups (maintained in sync with operations)
19 did_index: HashMap<String, Vec<usize>>, // DID -> indices in operations vec
20 target_bundle: u32,
21 min_timestamp: DateTime<Utc>,
22 file: PathBuf,
23 validated: bool,
24 dirty: bool,
25 verbose: bool,
26
27 // Incremental save tracking
28 last_saved_len: usize,
29 last_save_time: std::time::Instant,
30 save_threshold: usize,
31 save_interval: std::time::Duration,
32}
33
34impl Mempool {
35 /// Creates a new mempool for a specific bundle number
36 pub fn new(
37 bundle_dir: &Path,
38 target_bundle: u32,
39 min_timestamp: DateTime<Utc>,
40 verbose: bool,
41 ) -> Result<Self> {
42 let filename = format!(
43 "{}{:06}.jsonl",
44 constants::MEMPOOL_FILE_PREFIX,
45 target_bundle
46 );
47 let file = bundle_dir.join(filename);
48
49 let mut mempool = Self {
50 operations: Vec::new(),
51 did_index: HashMap::new(),
52 target_bundle,
53 min_timestamp,
54 file,
55 validated: false,
56 dirty: false,
57 verbose,
58 last_saved_len: 0,
59 last_save_time: std::time::Instant::now(),
60 save_threshold: 100,
61 save_interval: std::time::Duration::from_secs(15),
62 };
63
64 // Try to load existing mempool
65 if mempool.file.exists() {
66 mempool.load()?;
67 }
68
69 Ok(mempool)
70 }
71
72 fn add_internal(&mut self, ops: Vec<Operation>, collect_cids: bool) -> Result<(usize, Vec<String>)> {
73 if ops.is_empty() {
74 return Ok((0, Vec::new()));
75 }
76
77 let mut existing_cids: HashSet<String> = self
78 .operations
79 .iter()
80 .filter_map(|op| op.cid.clone())
81 .collect();
82
83 let mut new_ops = Vec::new();
84 let total_in = ops.len();
85 let mut skipped_no_cid = 0usize;
86 let mut skipped_dupe = 0usize;
87 let mut added_cids = Vec::new();
88
89 let mut last_time = if !self.operations.is_empty() {
90 self.parse_timestamp(&self.operations.last().unwrap().created_at)?
91 } else {
92 self.min_timestamp
93 };
94
95 let start_add = std::time::Instant::now();
96 let mut first_added_time: Option<DateTime<Utc>> = None;
97 let mut last_added_time: Option<DateTime<Utc>> = None;
98 for op in ops {
99 let cid = match &op.cid {
100 Some(c) => c,
101 None => {
102 skipped_no_cid += 1;
103 continue;
104 }
105 };
106
107 if existing_cids.contains(cid) {
108 skipped_dupe += 1;
109 continue;
110 }
111
112 let op_time = self.parse_timestamp(&op.created_at)?;
113 if op_time < last_time {
114 bail!(
115 "chronological violation: operation {} at {} is before {}",
116 cid,
117 op.created_at,
118 last_time.to_rfc3339()
119 );
120 }
121 if op_time < self.min_timestamp {
122 bail!(
123 "operation {} at {} is before minimum timestamp {} (belongs in earlier bundle)",
124 cid,
125 op.created_at,
126 self.min_timestamp.to_rfc3339()
127 );
128 }
129
130 new_ops.push(op.clone());
131 if collect_cids {
132 added_cids.push(cid.clone());
133 }
134 existing_cids.insert(cid.clone());
135 last_time = op_time;
136 if first_added_time.is_none() {
137 first_added_time = Some(op_time);
138 }
139 last_added_time = Some(op_time);
140 }
141
142 let added = new_ops.len();
143
144 let start_idx = self.operations.len();
145 self.operations.extend(new_ops);
146
147 for (offset, op) in self.operations[start_idx..].iter().enumerate() {
148 let idx = start_idx + offset;
149 self.did_index.entry(op.did.clone()).or_default().push(idx);
150 }
151
152 self.validated = true;
153 self.dirty = true;
154
155 if self.verbose {
156 let dur = start_add.elapsed();
157 info!(
158 "mempool add: +{} unique from {} ({} no-cid, {} dupes) in {} • total {}",
159 added,
160 total_in,
161 skipped_no_cid,
162 skipped_dupe,
163 format_std_duration_ms(dur),
164 self.operations.len()
165 );
166 if let (Some(f), Some(l)) = (first_added_time, last_added_time) {
167 debug!(
168 "mempool add range: {} → {}",
169 f.format("%Y-%m-%d %H:%M:%S"),
170 l.format("%Y-%m-%d %H:%M:%S")
171 );
172 }
173 if added == 0 && (skipped_no_cid > 0 || skipped_dupe > 0) {
174 debug!("mempool add made no progress");
175 }
176 }
177
178 Ok((added, added_cids))
179 }
180
181 /// Add operations to the mempool with strict validation
182 pub fn add(&mut self, ops: Vec<Operation>) -> Result<usize> {
183 let (added, _) = self.add_internal(ops, false)?;
184 Ok(added)
185 }
186
187 pub fn add_and_collect_cids(&mut self, ops: Vec<Operation>) -> Result<(usize, Vec<String>)> {
188 self.add_internal(ops, true)
189 }
190
191 /// Validate performs a full chronological validation of all operations
192 pub fn validate(&self) -> Result<()> {
193 if self.operations.is_empty() {
194 return Ok(());
195 }
196
197 // Check all operations are after minimum timestamp
198 for (i, op) in self.operations.iter().enumerate() {
199 let op_time = self.parse_timestamp(&op.created_at)?;
200 if op_time < self.min_timestamp {
201 bail!(
202 "operation {} (CID: {:?}) at {} is before minimum timestamp {}",
203 i,
204 op.cid,
205 op.created_at,
206 self.min_timestamp.to_rfc3339()
207 );
208 }
209 }
210
211 // Check chronological order
212 for i in 1..self.operations.len() {
213 let prev = &self.operations[i - 1];
214 let curr = &self.operations[i];
215
216 let prev_time = self.parse_timestamp(&prev.created_at)?;
217 let curr_time = self.parse_timestamp(&curr.created_at)?;
218
219 if curr_time < prev_time {
220 bail!(
221 "chronological violation at index {}: {:?} ({}) is before {:?} ({})",
222 i,
223 curr.cid,
224 curr.created_at,
225 prev.cid,
226 prev.created_at
227 );
228 }
229 }
230
231 // Check for duplicate CIDs
232 let mut cid_map: HashMap<String, usize> = HashMap::new();
233 for (i, op) in self.operations.iter().enumerate() {
234 if let Some(cid) = &op.cid {
235 if let Some(prev_idx) = cid_map.get(cid) {
236 bail!("duplicate CID {} at indices {} and {}", cid, prev_idx, i);
237 }
238 cid_map.insert(cid.clone(), i);
239 }
240 }
241
242 Ok(())
243 }
244
245 /// Count returns the number of operations in mempool
246 pub fn count(&self) -> usize {
247 self.operations.len()
248 }
249
250 /// Take removes and returns up to n operations from the front
251 pub fn take(&mut self, n: usize) -> Result<Vec<Operation>> {
252 // Validate before taking
253 self.validate_locked()?;
254
255 let take_count = n.min(self.operations.len());
256
257 let result: Vec<Operation> = self.operations.drain(..take_count).collect();
258
259 // Rebuild DID index after removing operations (indices have shifted)
260 self.rebuild_did_index();
261
262 // ALWAYS reset tracking after Take
263 // Take() means we're consuming these ops for a bundle
264 // Any remaining ops are "new" and unsaved
265 self.last_saved_len = 0;
266 self.last_save_time = std::time::Instant::now();
267
268 // Mark dirty only if ops remain
269 self.dirty = !self.operations.is_empty();
270 self.validated = false;
271
272 Ok(result)
273 }
274
275 /// validateLocked performs validation (internal helper)
276 fn validate_locked(&mut self) -> Result<()> {
277 if self.validated {
278 return Ok(());
279 }
280
281 if self.operations.is_empty() {
282 return Ok(());
283 }
284
285 // Check chronological order
286 let mut last_time = self.min_timestamp;
287 for (i, op) in self.operations.iter().enumerate() {
288 let op_time = self.parse_timestamp(&op.created_at)?;
289 if op_time < last_time {
290 bail!(
291 "chronological violation at index {}: {} is before {}",
292 i,
293 op.created_at,
294 last_time.to_rfc3339()
295 );
296 }
297 last_time = op_time;
298 }
299
300 self.validated = true;
301 Ok(())
302 }
303
304 /// Peek returns up to n operations without removing them
305 pub fn peek(&self, n: usize) -> Vec<Operation> {
306 let count = n.min(self.operations.len());
307 self.operations[..count].to_vec()
308 }
309
310 /// Clear removes all operations
311 pub fn clear(&mut self) {
312 let prev = self.operations.len();
313 self.operations.clear();
314 self.operations.shrink_to_fit();
315 self.did_index.clear();
316 self.did_index.shrink_to_fit();
317 self.validated = false;
318 self.dirty = true;
319 if self.verbose {
320 info!("mempool clear: removed {} ops", prev);
321 }
322 }
323
324 /// ShouldSave checks if threshold/interval is met
325 pub fn should_save(&self) -> bool {
326 if !self.dirty {
327 return false;
328 }
329
330 let new_ops = self.operations.len().saturating_sub(self.last_saved_len);
331 let time_since_last_save = self.last_save_time.elapsed();
332
333 new_ops >= self.save_threshold || time_since_last_save >= self.save_interval
334 }
335
336 /// SaveIfNeeded saves only if threshold is met
337 pub fn save_if_needed(&mut self) -> Result<()> {
338 if !self.should_save() {
339 return Ok(());
340 }
341 self.save()
342 }
343
344 /// Save - always append-only since mempool only grows
345 pub fn save(&mut self) -> Result<()> {
346 if !self.dirty {
347 return Ok(());
348 }
349
350 if self.operations.is_empty() {
351 // Remove file if empty
352 if self.file.exists() {
353 fs::remove_file(&self.file)?;
354 }
355 self.last_saved_len = 0;
356 self.last_save_time = std::time::Instant::now();
357 self.dirty = false;
358 return Ok(());
359 }
360
361 // Validate before saving
362 self.validate_locked()?;
363
364 // Bounds check to prevent panic
365 if self.last_saved_len > self.operations.len() {
366 if self.verbose {
367 eprintln!(
368 "Warning: lastSavedLen ({}) > operations ({}), resetting to 0",
369 self.last_saved_len,
370 self.operations.len()
371 );
372 }
373 self.last_saved_len = 0;
374 }
375
376 // Get only new operations since last save
377 let new_ops = &self.operations[self.last_saved_len..];
378
379 if new_ops.is_empty() {
380 // Nothing new to save
381 self.dirty = false;
382 return Ok(());
383 }
384
385 // Open for append (or create if first save)
386 let file = OpenOptions::new()
387 .create(true)
388 .append(true)
389 .open(&self.file)?;
390
391 let mut writer = BufWriter::new(file);
392
393 // Write only new operations
394 // CRITICAL: Use raw_json if available to preserve exact byte content
395 // This is required for deterministic content_hash calculation.
396 // Re-serialization would change field order/whitespace and break hash verification.
397 let mut bytes_written = 0usize;
398 let mut appended = 0usize;
399 for op in new_ops {
400 let json = if let Some(ref raw) = op.raw_json {
401 raw.clone()
402 } else {
403 // Fallback: Re-serialize if raw_json is not available
404 // WARNING: This may produce different content_hash than the original!
405 sonic_rs::to_string(op)?
406 };
407 writeln!(writer, "{}", json)?;
408 bytes_written += json.len() + 1;
409 appended += 1;
410 }
411
412 writer.flush()?;
413
414 // Get the underlying file to sync
415 let file = writer.into_inner()?;
416 file.sync_all()?;
417
418 self.last_saved_len = self.operations.len();
419 self.last_save_time = std::time::Instant::now();
420 self.dirty = false;
421
422 if self.verbose {
423 info!(
424 "mempool save: appended {} ops ({} bytes) to {}",
425 appended,
426 bytes_written,
427 self.get_filename()
428 );
429 }
430
431 Ok(())
432 }
433
434 /// Load reads mempool from disk and validates it
435 pub fn load(&mut self) -> Result<()> {
436 let start = std::time::Instant::now();
437 let file = File::open(&self.file)?;
438 let reader = BufReader::new(file);
439
440 self.operations.clear();
441 self.did_index.clear();
442
443 for line in reader.lines() {
444 let line = line?;
445 if line.trim().is_empty() {
446 continue;
447 }
448
449 // CRITICAL: Preserve raw JSON for content hash calculation
450 // This is required by the V1 specification (docs/specification.md § 4.2)
451 // to ensure content_hash remains reproducible.
452 // Without this, re-serialization would change the hash.
453 // Use Operation::from_json (sonic_rs) instead of serde deserialization
454 let op = Operation::from_json(&line)?;
455 let idx = self.operations.len();
456 self.operations.push(op);
457
458 // Update DID index
459 let did = self.operations[idx].did.clone();
460 self.did_index.entry(did).or_default().push(idx);
461 }
462
463 // Validate loaded data
464 self.validate_locked()?;
465
466 // Mark as saved (just loaded from disk)
467 self.last_saved_len = self.operations.len();
468 self.last_save_time = std::time::Instant::now();
469 self.dirty = false;
470
471 if self.verbose {
472 info!(
473 "mempool load: {} ops for bundle {:06} in {}",
474 self.operations.len(),
475 self.target_bundle,
476 format_std_duration_ms(start.elapsed())
477 );
478 }
479
480 Ok(())
481 }
482
483 /// GetFirstTime returns the created_at of the first operation
484 pub fn get_first_time(&self) -> Option<String> {
485 self.operations.first().map(|op| op.created_at.clone())
486 }
487
488 /// GetLastTime returns the created_at of the last operation
489 pub fn get_last_time(&self) -> Option<String> {
490 self.operations.last().map(|op| op.created_at.clone())
491 }
492
493 /// GetTargetBundle returns the bundle number this mempool is for
494 pub fn get_target_bundle(&self) -> u32 {
495 self.target_bundle
496 }
497
498 /// GetMinTimestamp returns the minimum timestamp for operations
499 pub fn get_min_timestamp(&self) -> DateTime<Utc> {
500 self.min_timestamp
501 }
502
503 /// Stats returns mempool statistics
504 pub fn stats(&self) -> MempoolStats {
505 let count = self.operations.len();
506
507 let mut stats = MempoolStats {
508 count,
509 can_create_bundle: count >= constants::BUNDLE_SIZE,
510 target_bundle: self.target_bundle,
511 min_timestamp: self.min_timestamp,
512 validated: self.validated,
513 first_time: None,
514 last_time: None,
515 size_bytes: None,
516 did_count: None,
517 };
518
519 if count > 0 {
520 stats.first_time = self
521 .operations
522 .first()
523 .and_then(|op| self.parse_timestamp(&op.created_at).ok());
524 stats.last_time = self
525 .operations
526 .last()
527 .and_then(|op| self.parse_timestamp(&op.created_at).ok());
528
529 let mut total_size = 0;
530 for op in &self.operations {
531 if let Ok(json) = serde_json::to_string(op) {
532 total_size += json.len();
533 }
534 }
535
536 stats.size_bytes = Some(total_size);
537 stats.did_count = Some(self.did_index.len());
538 }
539
540 stats
541 }
542
543 /// Delete removes the mempool file
544 pub fn delete(&self) -> Result<()> {
545 if self.file.exists() {
546 fs::remove_file(&self.file)?;
547 }
548 Ok(())
549 }
550
551 /// GetFilename returns the mempool filename
552 pub fn get_filename(&self) -> String {
553 self.file
554 .file_name()
555 .and_then(|n| n.to_str())
556 .unwrap_or("")
557 .to_string()
558 }
559
560 /// FindDIDOperations searches for operations matching a DID
561 /// Uses DID index for O(1) lookup instead of linear scan
562 pub fn find_did_operations(&self, did: &str) -> Vec<Operation> {
563 if let Some(indices) = self.did_index.get(did) {
564 // Fast path: use index
565 indices
566 .iter()
567 .map(|&idx| self.operations[idx].clone())
568 .collect()
569 } else {
570 // DID not in index (shouldn't happen if index is maintained correctly)
571 Vec::new()
572 }
573 }
574
575 /// Rebuild DID index from operations (used after take/clear)
576 fn rebuild_did_index(&mut self) {
577 self.did_index.clear();
578 for (idx, op) in self.operations.iter().enumerate() {
579 self.did_index.entry(op.did.clone()).or_default().push(idx);
580 }
581 }
582
583 /// FindLatestDIDOperation finds the most recent non-nullified operation for a DID
584 /// Returns the operation and its position/index in the mempool
585 /// Uses DID index for O(1) lookup, then finds latest by index (operations are chronologically sorted)
586 pub fn find_latest_did_operation(&self, did: &str) -> Option<(Operation, usize)> {
587 if let Some(indices) = self.did_index.get(did) {
588 // Operations are in chronological order, so highest index = latest
589 // Find the highest index that's not nullified
590 indices.iter().rev().find_map(|&idx| {
591 let op = &self.operations[idx];
592 if !op.nullified {
593 Some((op.clone(), idx))
594 } else {
595 None
596 }
597 })
598 } else {
599 None
600 }
601 }
602
603 /// Get all operations (for dump command)
604 pub fn get_operations(&self) -> &[Operation] {
605 &self.operations
606 }
607
608 /// Parse timestamp string to DateTime
609 fn parse_timestamp(&self, s: &str) -> Result<DateTime<Utc>> {
610 Ok(DateTime::parse_from_rfc3339(s)?.with_timezone(&Utc))
611 }
612}
613
614#[derive(Debug, Clone)]
615pub struct MempoolStats {
616 pub count: usize,
617 pub can_create_bundle: bool,
618 pub target_bundle: u32,
619 pub min_timestamp: DateTime<Utc>,
620 pub validated: bool,
621 pub first_time: Option<DateTime<Utc>>,
622 pub last_time: Option<DateTime<Utc>>,
623 pub size_bytes: Option<usize>,
624 pub did_count: Option<usize>,
625}
626
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operations::Operation;
    use sonic_rs::Value;
    use tempfile::TempDir;

    /// Minimum timestamp shared by all tests.
    fn min_time() -> DateTime<Utc> {
        DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
            .unwrap()
            .with_timezone(&Utc)
    }

    /// Creates an empty mempool for `bundle` in a fresh temporary directory.
    /// The `TempDir` is returned so the caller keeps it alive.
    fn new_mempool(bundle: u32) -> (TempDir, Mempool) {
        let tmp = TempDir::new().unwrap();
        let mempool = Mempool::new(tmp.path(), bundle, min_time(), false).unwrap();
        (tmp, mempool)
    }

    fn create_test_operation(did: &str, cid: &str, created_at: &str) -> Operation {
        Operation {
            did: did.to_string(),
            operation: Value::new(),
            cid: Some(cid.to_string()),
            nullified: false,
            created_at: created_at.to_string(),
            extra: Value::new(),
            raw_json: None,
        }
    }

    #[test]
    fn test_mempool_count() {
        let (_tmp, mut mempool) = new_mempool(1);
        assert_eq!(mempool.count(), 0);

        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ];
        mempool.add(ops).unwrap();
        assert_eq!(mempool.count(), 2);
    }

    #[test]
    fn test_mempool_get_target_bundle() {
        let (_tmp, mempool) = new_mempool(42);
        assert_eq!(mempool.get_target_bundle(), 42);
    }

    #[test]
    fn test_mempool_get_min_timestamp() {
        let (_tmp, mempool) = new_mempool(1);
        assert_eq!(mempool.get_min_timestamp(), min_time());
    }

    #[test]
    fn test_mempool_get_first_time() {
        let (_tmp, mut mempool) = new_mempool(1);
        assert_eq!(mempool.get_first_time(), None);

        let ops = vec![create_test_operation(
            "did:plc:test1",
            "cid1",
            "2024-01-01T00:00:01Z",
        )];
        mempool.add(ops).unwrap();
        assert_eq!(
            mempool.get_first_time(),
            Some("2024-01-01T00:00:01Z".to_string())
        );
    }

    #[test]
    fn test_mempool_get_last_time() {
        let (_tmp, mut mempool) = new_mempool(1);
        assert_eq!(mempool.get_last_time(), None);

        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ];
        mempool.add(ops).unwrap();
        assert_eq!(
            mempool.get_last_time(),
            Some("2024-01-01T00:00:02Z".to_string())
        );
    }

    #[test]
    fn test_mempool_peek() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
            create_test_operation("did:plc:test3", "cid3", "2024-01-01T00:00:03Z"),
        ];
        mempool.add(ops).unwrap();

        let peeked = mempool.peek(2);
        assert_eq!(peeked.len(), 2);
        assert_eq!(peeked[0].cid, Some("cid1".to_string()));
        assert_eq!(peeked[1].cid, Some("cid2".to_string()));

        // Peek should not remove operations
        assert_eq!(mempool.count(), 3);
    }

    #[test]
    fn test_mempool_peek_more_than_available() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![create_test_operation(
            "did:plc:test1",
            "cid1",
            "2024-01-01T00:00:01Z",
        )];
        mempool.add(ops).unwrap();

        let peeked = mempool.peek(10);
        assert_eq!(peeked.len(), 1);
    }

    #[test]
    fn test_mempool_clear() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ];
        mempool.add(ops).unwrap();
        assert_eq!(mempool.count(), 2);

        mempool.clear();
        assert_eq!(mempool.count(), 0);
    }

    #[test]
    fn test_mempool_find_did_operations() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test1", "cid2", "2024-01-01T00:00:02Z"),
            create_test_operation("did:plc:test2", "cid3", "2024-01-01T00:00:03Z"),
        ];
        mempool.add(ops).unwrap();

        let found = mempool.find_did_operations("did:plc:test1");
        assert_eq!(found.len(), 2);
        assert_eq!(found[0].cid, Some("cid1".to_string()));
        assert_eq!(found[1].cid, Some("cid2".to_string()));

        let found = mempool.find_did_operations("did:plc:test2");
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].cid, Some("cid3".to_string()));

        let found = mempool.find_did_operations("did:plc:nonexistent");
        assert_eq!(found.len(), 0);
    }

    #[test]
    fn test_mempool_stats_empty() {
        let (_tmp, mempool) = new_mempool(1);

        let stats = mempool.stats();
        assert_eq!(stats.count, 0);
        assert!(!stats.can_create_bundle);
        assert_eq!(stats.target_bundle, 1);
    }

    #[test]
    fn test_mempool_stats_with_operations() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ];
        mempool.add(ops).unwrap();

        let stats = mempool.stats();
        assert_eq!(stats.count, 2);
        assert!(!stats.can_create_bundle); // Need BUNDLE_SIZE (10000) ops
        assert_eq!(stats.target_bundle, 1);
        assert!(stats.first_time.is_some());
        assert!(stats.last_time.is_some());
        assert_eq!(stats.did_count, Some(2));
    }

    #[test]
    fn test_mempool_get_filename() {
        let (_tmp, mempool) = new_mempool(42);

        let filename = mempool.get_filename();
        assert!(filename.contains("plc_mempool_"));
        assert!(filename.contains("000042"));
        assert!(filename.ends_with(".jsonl"));
    }

    #[test]
    fn test_mempool_add_skips_duplicate_cids() {
        let (_tmp, mut mempool) = new_mempool(1);
        let first = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
        ];
        assert_eq!(mempool.add(first).unwrap(), 2);

        // One CID already stored, one duplicated within the batch.
        let second = vec![
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
            create_test_operation("did:plc:test3", "cid3", "2024-01-01T00:00:03Z"),
            create_test_operation("did:plc:test3", "cid3", "2024-01-01T00:00:03Z"),
        ];
        assert_eq!(mempool.add(second).unwrap(), 1);
        assert_eq!(mempool.count(), 3);
    }

    #[test]
    fn test_mempool_add_skips_ops_without_cid() {
        let (_tmp, mut mempool) = new_mempool(1);
        let mut op = create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z");
        op.cid = None;

        assert_eq!(mempool.add(vec![op]).unwrap(), 0);
        assert_eq!(mempool.count(), 0);
    }

    #[test]
    fn test_mempool_add_rejects_out_of_order_batch_atomically() {
        let (_tmp, mut mempool) = new_mempool(1);
        mempool
            .add(vec![create_test_operation(
                "did:plc:test1",
                "cid1",
                "2024-01-01T00:00:05Z",
            )])
            .unwrap();

        // Earlier than the last stored operation.
        let older = vec![create_test_operation(
            "did:plc:test2",
            "cid2",
            "2024-01-01T00:00:03Z",
        )];
        assert!(mempool.add(older).is_err());

        // Out of order within the batch: nothing from the batch is kept.
        let batch = vec![
            create_test_operation("did:plc:test3", "cid3", "2024-01-01T00:00:06Z"),
            create_test_operation("did:plc:test4", "cid4", "2024-01-01T00:00:04Z"),
        ];
        assert!(mempool.add(batch).is_err());
        assert_eq!(mempool.count(), 1);
        assert!(mempool.find_did_operations("did:plc:test3").is_empty());
    }

    #[test]
    fn test_mempool_add_rejects_before_min_timestamp() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![create_test_operation(
            "did:plc:test1",
            "cid1",
            "2023-12-31T23:59:59Z",
        )];
        assert!(mempool.add(ops).is_err());
        assert_eq!(mempool.count(), 0);
    }

    #[test]
    fn test_mempool_add_and_collect_cids() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
            create_test_operation("did:plc:test2", "cid1", "2024-01-01T00:00:03Z"),
        ];
        let (added, cids) = mempool.add_and_collect_cids(ops).unwrap();
        assert_eq!(added, 2);
        assert_eq!(cids, vec!["cid1".to_string(), "cid2".to_string()]);
    }

    #[test]
    fn test_mempool_take_rebuilds_did_index() {
        let (_tmp, mut mempool) = new_mempool(1);
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
            create_test_operation("did:plc:test1", "cid3", "2024-01-01T00:00:03Z"),
        ];
        mempool.add(ops).unwrap();

        let taken = mempool.take(2).unwrap();
        assert_eq!(taken.len(), 2);
        assert_eq!(taken[0].cid.as_deref(), Some("cid1"));
        assert_eq!(taken[1].cid.as_deref(), Some("cid2"));
        assert_eq!(mempool.count(), 1);

        let found = mempool.find_did_operations("did:plc:test1");
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].cid.as_deref(), Some("cid3"));
        assert!(mempool.find_did_operations("did:plc:test2").is_empty());
    }

    #[test]
    fn test_mempool_find_latest_did_operation() {
        let (_tmp, mut mempool) = new_mempool(1);
        let mut nullified = create_test_operation("did:plc:test1", "cid3", "2024-01-01T00:00:03Z");
        nullified.nullified = true;
        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"),
            nullified,
        ];
        mempool.add(ops).unwrap();

        // The nullified operation is skipped in favour of the previous one.
        let (op, idx) = mempool.find_latest_did_operation("did:plc:test1").unwrap();
        assert_eq!(op.cid.as_deref(), Some("cid1"));
        assert_eq!(idx, 0);

        let (op, idx) = mempool.find_latest_did_operation("did:plc:test2").unwrap();
        assert_eq!(op.cid.as_deref(), Some("cid2"));
        assert_eq!(idx, 1);

        assert!(mempool
            .find_latest_did_operation("did:plc:nonexistent")
            .is_none());
    }

    #[test]
    fn test_mempool_validate_accepts_added_operations() {
        let (_tmp, mut mempool) = new_mempool(1);
        assert!(mempool.validate().is_ok());

        let ops = vec![
            create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:01Z"),
            create_test_operation("did:plc:test3", "cid3", "2024-01-01T00:00:02Z"),
        ];
        mempool.add(ops).unwrap();
        assert!(mempool.validate().is_ok());
    }
}