High-performance implementation of plcbundle written in Rust
at main 867 lines 28 kB view raw
1//! Persistent pre-bundle operation store with strict chronological validation, CID deduplication, incremental saving, and fast DID lookups 2// src/mempool.rs 3use crate::constants; 4use crate::format::format_std_duration_ms; 5use crate::operations::Operation; 6use anyhow::{Result, bail}; 7use chrono::{DateTime, Utc}; 8use log::{debug, info}; 9use std::collections::{HashMap, HashSet}; 10use std::fs::{self, File, OpenOptions}; 11use std::io::{BufRead, BufReader, BufWriter, Write}; 12use std::path::{Path, PathBuf}; 13 14/// Mempool stores operations waiting to be bundled 15/// Operations must be strictly chronological 16pub struct Mempool { 17 operations: Vec<Operation>, 18 // Index by DID for fast lookups (maintained in sync with operations) 19 did_index: HashMap<String, Vec<usize>>, // DID -> indices in operations vec 20 target_bundle: u32, 21 min_timestamp: DateTime<Utc>, 22 file: PathBuf, 23 validated: bool, 24 dirty: bool, 25 verbose: bool, 26 27 // Incremental save tracking 28 last_saved_len: usize, 29 last_save_time: std::time::Instant, 30 save_threshold: usize, 31 save_interval: std::time::Duration, 32} 33 34impl Mempool { 35 /// Creates a new mempool for a specific bundle number 36 pub fn new( 37 bundle_dir: &Path, 38 target_bundle: u32, 39 min_timestamp: DateTime<Utc>, 40 verbose: bool, 41 ) -> Result<Self> { 42 let filename = format!( 43 "{}{:06}.jsonl", 44 constants::MEMPOOL_FILE_PREFIX, 45 target_bundle 46 ); 47 let file = bundle_dir.join(filename); 48 49 let mut mempool = Self { 50 operations: Vec::new(), 51 did_index: HashMap::new(), 52 target_bundle, 53 min_timestamp, 54 file, 55 validated: false, 56 dirty: false, 57 verbose, 58 last_saved_len: 0, 59 last_save_time: std::time::Instant::now(), 60 save_threshold: 100, 61 save_interval: std::time::Duration::from_secs(15), 62 }; 63 64 // Try to load existing mempool 65 if mempool.file.exists() { 66 mempool.load()?; 67 } 68 69 Ok(mempool) 70 } 71 72 fn add_internal(&mut self, ops: Vec<Operation>, collect_cids: bool) -> Result<(usize, Vec<String>)> { 73 if ops.is_empty() { 74 return Ok((0, Vec::new())); 75 } 76 77 let mut existing_cids: HashSet<String> = self 78 .operations 79 .iter() 80 .filter_map(|op| op.cid.clone()) 81 .collect(); 82 83 let mut new_ops = Vec::new(); 84 let total_in = ops.len(); 85 let mut skipped_no_cid = 0usize; 86 let mut skipped_dupe = 0usize; 87 let mut added_cids = Vec::new(); 88 89 let mut last_time = if !self.operations.is_empty() { 90 self.parse_timestamp(&self.operations.last().unwrap().created_at)? 91 } else { 92 self.min_timestamp 93 }; 94 95 let start_add = std::time::Instant::now(); 96 let mut first_added_time: Option<DateTime<Utc>> = None; 97 let mut last_added_time: Option<DateTime<Utc>> = None; 98 for op in ops { 99 let cid = match &op.cid { 100 Some(c) => c, 101 None => { 102 skipped_no_cid += 1; 103 continue; 104 } 105 }; 106 107 if existing_cids.contains(cid) { 108 skipped_dupe += 1; 109 continue; 110 } 111 112 let op_time = self.parse_timestamp(&op.created_at)?; 113 if op_time < last_time { 114 bail!( 115 "chronological violation: operation {} at {} is before {}", 116 cid, 117 op.created_at, 118 last_time.to_rfc3339() 119 ); 120 } 121 if op_time < self.min_timestamp { 122 bail!( 123 "operation {} at {} is before minimum timestamp {} (belongs in earlier bundle)", 124 cid, 125 op.created_at, 126 self.min_timestamp.to_rfc3339() 127 ); 128 } 129 130 new_ops.push(op.clone()); 131 if collect_cids { 132 added_cids.push(cid.clone()); 133 } 134 existing_cids.insert(cid.clone()); 135 last_time = op_time; 136 if first_added_time.is_none() { 137 first_added_time = Some(op_time); 138 } 139 last_added_time = Some(op_time); 140 } 141 142 let added = new_ops.len(); 143 144 let start_idx = self.operations.len(); 145 self.operations.extend(new_ops); 146 147 for (offset, op) in self.operations[start_idx..].iter().enumerate() { 148 let idx = start_idx + offset; 149 self.did_index.entry(op.did.clone()).or_default().push(idx); 150 } 151 152 self.validated = true; 153 self.dirty = true; 154 155 if self.verbose { 156 let dur = start_add.elapsed(); 157 info!( 158 "mempool add: +{} unique from {} ({} no-cid, {} dupes) in {} • total {}", 159 added, 160 total_in, 161 skipped_no_cid, 162 skipped_dupe, 163 format_std_duration_ms(dur), 164 self.operations.len() 165 ); 166 if let (Some(f), Some(l)) = (first_added_time, last_added_time) { 167 debug!( 168 "mempool add range: {} → {}", 169 f.format("%Y-%m-%d %H:%M:%S"), 170 l.format("%Y-%m-%d %H:%M:%S") 171 ); 172 } 173 if added == 0 && (skipped_no_cid > 0 || skipped_dupe > 0) { 174 debug!("mempool add made no progress"); 175 } 176 } 177 178 Ok((added, added_cids)) 179 } 180 181 /// Add operations to the mempool with strict validation 182 pub fn add(&mut self, ops: Vec<Operation>) -> Result<usize> { 183 let (added, _) = self.add_internal(ops, false)?; 184 Ok(added) 185 } 186 187 pub fn add_and_collect_cids(&mut self, ops: Vec<Operation>) -> Result<(usize, Vec<String>)> { 188 self.add_internal(ops, true) 189 } 190 191 /// Validate performs a full chronological validation of all operations 192 pub fn validate(&self) -> Result<()> { 193 if self.operations.is_empty() { 194 return Ok(()); 195 } 196 197 // Check all operations are after minimum timestamp 198 for (i, op) in self.operations.iter().enumerate() { 199 let op_time = self.parse_timestamp(&op.created_at)?; 200 if op_time < self.min_timestamp { 201 bail!( 202 "operation {} (CID: {:?}) at {} is before minimum timestamp {}", 203 i, 204 op.cid, 205 op.created_at, 206 self.min_timestamp.to_rfc3339() 207 ); 208 } 209 } 210 211 // Check chronological order 212 for i in 1..self.operations.len() { 213 let prev = &self.operations[i - 1]; 214 let curr = &self.operations[i]; 215 216 let prev_time = self.parse_timestamp(&prev.created_at)?; 217 let curr_time = self.parse_timestamp(&curr.created_at)?; 218 219 if curr_time < prev_time { 220 bail!( 221 "chronological violation at index {}: {:?} ({}) is before {:?} ({})", 222 i, 223 curr.cid, 224 curr.created_at, 225 prev.cid, 226 prev.created_at 227 ); 228 } 229 } 230 231 // Check for duplicate CIDs 232 let mut cid_map: HashMap<String, usize> = HashMap::new(); 233 for (i, op) in self.operations.iter().enumerate() { 234 if let Some(cid) = &op.cid { 235 if let Some(prev_idx) = cid_map.get(cid) { 236 bail!("duplicate CID {} at indices {} and {}", cid, prev_idx, i); 237 } 238 cid_map.insert(cid.clone(), i); 239 } 240 } 241 242 Ok(()) 243 } 244 245 /// Count returns the number of operations in mempool 246 pub fn count(&self) -> usize { 247 self.operations.len() 248 } 249 250 /// Take removes and returns up to n operations from the front 251 pub fn take(&mut self, n: usize) -> Result<Vec<Operation>> { 252 // Validate before taking 253 self.validate_locked()?; 254 255 let take_count = n.min(self.operations.len()); 256 257 let result: Vec<Operation> = self.operations.drain(..take_count).collect(); 258 259 // Rebuild DID index after removing operations (indices have shifted) 260 self.rebuild_did_index(); 261 262 // ALWAYS reset tracking after Take 263 // Take() means we're consuming these ops for a bundle 264 // Any remaining ops are "new" and unsaved 265 self.last_saved_len = 0; 266 self.last_save_time = std::time::Instant::now(); 267 268 // Mark dirty only if ops remain 269 self.dirty = !self.operations.is_empty(); 270 self.validated = false; 271 272 Ok(result) 273 } 274 275 /// validateLocked performs validation (internal helper) 276 fn validate_locked(&mut self) -> Result<()> { 277 if self.validated { 278 return Ok(()); 279 } 280 281 if self.operations.is_empty() { 282 return Ok(()); 283 } 284 285 // Check chronological order 286 let mut last_time = self.min_timestamp; 287 for (i, op) in self.operations.iter().enumerate() { 288 let op_time = self.parse_timestamp(&op.created_at)?; 289 if op_time < last_time { 290 bail!( 291 "chronological violation at index {}: {} is before {}", 292 i, 293 op.created_at, 294 last_time.to_rfc3339() 295 ); 296 } 297 last_time = op_time; 298 } 299 300 self.validated = true; 301 Ok(()) 302 } 303 304 /// Peek returns up to n operations without removing them 305 pub fn peek(&self, n: usize) -> Vec<Operation> { 306 let count = n.min(self.operations.len()); 307 self.operations[..count].to_vec() 308 } 309 310 /// Clear removes all operations 311 pub fn clear(&mut self) { 312 let prev = self.operations.len(); 313 self.operations.clear(); 314 self.operations.shrink_to_fit(); 315 self.did_index.clear(); 316 self.did_index.shrink_to_fit(); 317 self.validated = false; 318 self.dirty = true; 319 if self.verbose { 320 info!("mempool clear: removed {} ops", prev); 321 } 322 } 323 324 /// ShouldSave checks if threshold/interval is met 325 pub fn should_save(&self) -> bool { 326 if !self.dirty { 327 return false; 328 } 329 330 let new_ops = self.operations.len().saturating_sub(self.last_saved_len); 331 let time_since_last_save = self.last_save_time.elapsed(); 332 333 new_ops >= self.save_threshold || time_since_last_save >= self.save_interval 334 } 335 336 /// SaveIfNeeded saves only if threshold is met 337 pub fn save_if_needed(&mut self) -> Result<()> { 338 if !self.should_save() { 339 return Ok(()); 340 } 341 self.save() 342 } 343 344 /// Save - always append-only since mempool only grows 345 pub fn save(&mut self) -> Result<()> { 346 if !self.dirty { 347 return Ok(()); 348 } 349 350 if self.operations.is_empty() { 351 // Remove file if empty 352 if self.file.exists() { 353 fs::remove_file(&self.file)?; 354 } 355 self.last_saved_len = 0; 356 self.last_save_time = std::time::Instant::now(); 357 self.dirty = false; 358 return Ok(()); 359 } 360 361 // Validate before saving 362 self.validate_locked()?; 363 364 // Bounds check to prevent panic 365 if self.last_saved_len > self.operations.len() { 366 if self.verbose { 367 eprintln!( 368 "Warning: lastSavedLen ({}) > operations ({}), resetting to 0", 369 self.last_saved_len, 370 self.operations.len() 371 ); 372 } 373 self.last_saved_len = 0; 374 } 375 376 // Get only new operations since last save 377 let new_ops = &self.operations[self.last_saved_len..]; 378 379 if new_ops.is_empty() { 380 // Nothing new to save 381 self.dirty = false; 382 return Ok(()); 383 } 384 385 // Open for append (or create if first save) 386 let file = OpenOptions::new() 387 .create(true) 388 .append(true) 389 .open(&self.file)?; 390 391 let mut writer = BufWriter::new(file); 392 393 // Write only new operations 394 // CRITICAL: Use raw_json if available to preserve exact byte content 395 // This is required for deterministic content_hash calculation. 396 // Re-serialization would change field order/whitespace and break hash verification. 397 let mut bytes_written = 0usize; 398 let mut appended = 0usize; 399 for op in new_ops { 400 let json = if let Some(ref raw) = op.raw_json { 401 raw.clone() 402 } else { 403 // Fallback: Re-serialize if raw_json is not available 404 // WARNING: This may produce different content_hash than the original! 405 sonic_rs::to_string(op)? 406 }; 407 writeln!(writer, "{}", json)?; 408 bytes_written += json.len() + 1; 409 appended += 1; 410 } 411 412 writer.flush()?; 413 414 // Get the underlying file to sync 415 let file = writer.into_inner()?; 416 file.sync_all()?; 417 418 self.last_saved_len = self.operations.len(); 419 self.last_save_time = std::time::Instant::now(); 420 self.dirty = false; 421 422 if self.verbose { 423 info!( 424 "mempool save: appended {} ops ({} bytes) to {}", 425 appended, 426 bytes_written, 427 self.get_filename() 428 ); 429 } 430 431 Ok(()) 432 } 433 434 /// Load reads mempool from disk and validates it 435 pub fn load(&mut self) -> Result<()> { 436 let start = std::time::Instant::now(); 437 let file = File::open(&self.file)?; 438 let reader = BufReader::new(file); 439 440 self.operations.clear(); 441 self.did_index.clear(); 442 443 for line in reader.lines() { 444 let line = line?; 445 if line.trim().is_empty() { 446 continue; 447 } 448 449 // CRITICAL: Preserve raw JSON for content hash calculation 450 // This is required by the V1 specification (docs/specification.md § 4.2) 451 // to ensure content_hash remains reproducible. 452 // Without this, re-serialization would change the hash. 453 // Use Operation::from_json (sonic_rs) instead of serde deserialization 454 let op = Operation::from_json(&line)?; 455 let idx = self.operations.len(); 456 self.operations.push(op); 457 458 // Update DID index 459 let did = self.operations[idx].did.clone(); 460 self.did_index.entry(did).or_default().push(idx); 461 } 462 463 // Validate loaded data 464 self.validate_locked()?; 465 466 // Mark as saved (just loaded from disk) 467 self.last_saved_len = self.operations.len(); 468 self.last_save_time = std::time::Instant::now(); 469 self.dirty = false; 470 471 if self.verbose { 472 info!( 473 "mempool load: {} ops for bundle {:06} in {}", 474 self.operations.len(), 475 self.target_bundle, 476 format_std_duration_ms(start.elapsed()) 477 ); 478 } 479 480 Ok(()) 481 } 482 483 /// GetFirstTime returns the created_at of the first operation 484 pub fn get_first_time(&self) -> Option<String> { 485 self.operations.first().map(|op| op.created_at.clone()) 486 } 487 488 /// GetLastTime returns the created_at of the last operation 489 pub fn get_last_time(&self) -> Option<String> { 490 self.operations.last().map(|op| op.created_at.clone()) 491 } 492 493 /// GetTargetBundle returns the bundle number this mempool is for 494 pub fn get_target_bundle(&self) -> u32 { 495 self.target_bundle 496 } 497 498 /// GetMinTimestamp returns the minimum timestamp for operations 499 pub fn get_min_timestamp(&self) -> DateTime<Utc> { 500 self.min_timestamp 501 } 502 503 /// Stats returns mempool statistics 504 pub fn stats(&self) -> MempoolStats { 505 let count = self.operations.len(); 506 507 let mut stats = MempoolStats { 508 count, 509 can_create_bundle: count >= constants::BUNDLE_SIZE, 510 target_bundle: self.target_bundle, 511 min_timestamp: self.min_timestamp, 512 validated: self.validated, 513 first_time: None, 514 last_time: None, 515 size_bytes: None, 516 did_count: None, 517 }; 518 519 if count > 0 { 520 stats.first_time = self 521 .operations 522 .first() 523 .and_then(|op| self.parse_timestamp(&op.created_at).ok()); 524 stats.last_time = self 525 .operations 526 .last() 527 .and_then(|op| self.parse_timestamp(&op.created_at).ok()); 528 529 let mut total_size = 0; 530 for op in &self.operations { 531 if let Ok(json) = serde_json::to_string(op) { 532 total_size += json.len(); 533 } 534 } 535 536 stats.size_bytes = Some(total_size); 537 stats.did_count = Some(self.did_index.len()); 538 } 539 540 stats 541 } 542 543 /// Delete removes the mempool file 544 pub fn delete(&self) -> Result<()> { 545 if self.file.exists() { 546 fs::remove_file(&self.file)?; 547 } 548 Ok(()) 549 } 550 551 /// GetFilename returns the mempool filename 552 pub fn get_filename(&self) -> String { 553 self.file 554 .file_name() 555 .and_then(|n| n.to_str()) 556 .unwrap_or("") 557 .to_string() 558 } 559 560 /// FindDIDOperations searches for operations matching a DID 561 /// Uses DID index for O(1) lookup instead of linear scan 562 pub fn find_did_operations(&self, did: &str) -> Vec<Operation> { 563 if let Some(indices) = self.did_index.get(did) { 564 // Fast path: use index 565 indices 566 .iter() 567 .map(|&idx| self.operations[idx].clone()) 568 .collect() 569 } else { 570 // DID not in index (shouldn't happen if index is maintained correctly) 571 Vec::new() 572 } 573 } 574 575 /// Rebuild DID index from operations (used after take/clear) 576 fn rebuild_did_index(&mut self) { 577 self.did_index.clear(); 578 for (idx, op) in self.operations.iter().enumerate() { 579 self.did_index.entry(op.did.clone()).or_default().push(idx); 580 } 581 } 582 583 /// FindLatestDIDOperation finds the most recent non-nullified operation for a DID 584 /// Returns the operation and its position/index in the mempool 585 /// Uses DID index for O(1) lookup, then finds latest by index (operations are chronologically sorted) 586 pub fn find_latest_did_operation(&self, did: &str) -> Option<(Operation, usize)> { 587 if let Some(indices) = self.did_index.get(did) { 588 // Operations are in chronological order, so highest index = latest 589 // Find the highest index that's not nullified 590 indices.iter().rev().find_map(|&idx| { 591 let op = &self.operations[idx]; 592 if !op.nullified { 593 Some((op.clone(), idx)) 594 } else { 595 None 596 } 597 }) 598 } else { 599 None 600 } 601 } 602 603 /// Get all operations (for dump command) 604 pub fn get_operations(&self) -> &[Operation] { 605 &self.operations 606 } 607 608 /// Parse timestamp string to DateTime 609 fn parse_timestamp(&self, s: &str) -> Result<DateTime<Utc>> { 610 Ok(DateTime::parse_from_rfc3339(s)?.with_timezone(&Utc)) 611 } 612} 613 614#[derive(Debug, Clone)] 615pub struct MempoolStats { 616 pub count: usize, 617 pub can_create_bundle: bool, 618 pub target_bundle: u32, 619 pub min_timestamp: DateTime<Utc>, 620 pub validated: bool, 621 pub first_time: Option<DateTime<Utc>>, 622 pub last_time: Option<DateTime<Utc>>, 623 pub size_bytes: Option<usize>, 624 pub did_count: Option<usize>, 625} 626 627#[cfg(test)] 628mod tests { 629 use super::*; 630 use crate::operations::Operation; 631 use sonic_rs::Value; 632 use tempfile::TempDir; 633 634 fn create_test_operation(did: &str, cid: &str, created_at: &str) -> Operation { 635 Operation { 636 did: did.to_string(), 637 operation: Value::new(), 638 cid: Some(cid.to_string()), 639 nullified: false, 640 created_at: created_at.to_string(), 641 extra: Value::new(), 642 raw_json: None, 643 } 644 } 645 646 #[test] 647 fn test_mempool_count() { 648 let tmp = TempDir::new().unwrap(); 649 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 650 .unwrap() 651 .with_timezone(&Utc); 652 let mut mempool = Mempool::new(tmp.path(), 1, min_time, false).unwrap(); 653 654 assert_eq!(mempool.count(), 0); 655 656 let ops = vec![ 657 create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"), 658 create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"), 659 ]; 660 mempool.add(ops).unwrap(); 661 assert_eq!(mempool.count(), 2); 662 } 663 664 #[test] 665 fn test_mempool_get_target_bundle() { 666 let tmp = TempDir::new().unwrap(); 667 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 668 .unwrap() 669 .with_timezone(&Utc); 670 let mempool = Mempool::new(tmp.path(), 42, min_time, false).unwrap(); 671 assert_eq!(mempool.get_target_bundle(), 42); 672 } 673 674 #[test] 675 fn test_mempool_get_min_timestamp() { 676 let tmp = TempDir::new().unwrap(); 677 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 678 .unwrap() 679 .with_timezone(&Utc); 680 let mempool = Mempool::new(tmp.path(), 1, min_time, false).unwrap(); 681 assert_eq!(mempool.get_min_timestamp(), min_time); 682 } 683 684 #[test] 685 fn test_mempool_get_first_time() { 686 let tmp = TempDir::new().unwrap(); 687 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 688 .unwrap() 689 .with_timezone(&Utc); 690 let mut mempool = Mempool::new(tmp.path(), 1, min_time, false).unwrap(); 691 692 assert_eq!(mempool.get_first_time(), None); 693 694 let ops = vec![create_test_operation( 695 "did:plc:test1", 696 "cid1", 697 "2024-01-01T00:00:01Z", 698 )]; 699 mempool.add(ops).unwrap(); 700 assert_eq!( 701 mempool.get_first_time(), 702 Some("2024-01-01T00:00:01Z".to_string()) 703 ); 704 } 705 706 #[test] 707 fn test_mempool_get_last_time() { 708 let tmp = TempDir::new().unwrap(); 709 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 710 .unwrap() 711 .with_timezone(&Utc); 712 let mut mempool = Mempool::new(tmp.path(), 1, min_time, false).unwrap(); 713 714 assert_eq!(mempool.get_last_time(), None); 715 716 let ops = vec![ 717 create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"), 718 create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"), 719 ]; 720 mempool.add(ops).unwrap(); 721 assert_eq!( 722 mempool.get_last_time(), 723 Some("2024-01-01T00:00:02Z".to_string()) 724 ); 725 } 726 727 #[test] 728 fn test_mempool_peek() { 729 let tmp = TempDir::new().unwrap(); 730 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 731 .unwrap() 732 .with_timezone(&Utc); 733 let mut mempool = Mempool::new(tmp.path(), 1, min_time, false).unwrap(); 734 735 let ops = vec![ 736 create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"), 737 create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"), 738 create_test_operation("did:plc:test3", "cid3", "2024-01-01T00:00:03Z"), 739 ]; 740 mempool.add(ops).unwrap(); 741 742 let peeked = mempool.peek(2); 743 assert_eq!(peeked.len(), 2); 744 assert_eq!(peeked[0].cid, Some("cid1".to_string())); 745 assert_eq!(peeked[1].cid, Some("cid2".to_string())); 746 747 // Peek should not remove operations 748 assert_eq!(mempool.count(), 3); 749 } 750 751 #[test] 752 fn test_mempool_peek_more_than_available() { 753 let tmp = TempDir::new().unwrap(); 754 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 755 .unwrap() 756 .with_timezone(&Utc); 757 let mut mempool = Mempool::new(tmp.path(), 1, min_time, false).unwrap(); 758 759 let ops = vec![create_test_operation( 760 "did:plc:test1", 761 "cid1", 762 "2024-01-01T00:00:01Z", 763 )]; 764 mempool.add(ops).unwrap(); 765 766 let peeked = mempool.peek(10); 767 assert_eq!(peeked.len(), 1); 768 } 769 770 #[test] 771 fn test_mempool_clear() { 772 let tmp = TempDir::new().unwrap(); 773 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 774 .unwrap() 775 .with_timezone(&Utc); 776 let mut mempool = Mempool::new(tmp.path(), 1, min_time, false).unwrap(); 777 778 let ops = vec![ 779 create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"), 780 create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"), 781 ]; 782 mempool.add(ops).unwrap(); 783 assert_eq!(mempool.count(), 2); 784 785 mempool.clear(); 786 assert_eq!(mempool.count(), 0); 787 } 788 789 #[test] 790 fn test_mempool_find_did_operations() { 791 let tmp = TempDir::new().unwrap(); 792 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 793 .unwrap() 794 .with_timezone(&Utc); 795 let mut mempool = Mempool::new(tmp.path(), 1, min_time, false).unwrap(); 796 797 let ops = vec![ 798 create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"), 799 create_test_operation("did:plc:test1", "cid2", "2024-01-01T00:00:02Z"), 800 create_test_operation("did:plc:test2", "cid3", "2024-01-01T00:00:03Z"), 801 ]; 802 mempool.add(ops).unwrap(); 803 804 let found = mempool.find_did_operations("did:plc:test1"); 805 assert_eq!(found.len(), 2); 806 assert_eq!(found[0].cid, Some("cid1".to_string())); 807 assert_eq!(found[1].cid, Some("cid2".to_string())); 808 809 let found = mempool.find_did_operations("did:plc:test2"); 810 assert_eq!(found.len(), 1); 811 assert_eq!(found[0].cid, Some("cid3".to_string())); 812 813 let found = mempool.find_did_operations("did:plc:nonexistent"); 814 assert_eq!(found.len(), 0); 815 } 816 817 #[test] 818 fn test_mempool_stats_empty() { 819 let tmp = TempDir::new().unwrap(); 820 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 821 .unwrap() 822 .with_timezone(&Utc); 823 let mempool = Mempool::new(tmp.path(), 1, min_time, false).unwrap(); 824 825 let stats = mempool.stats(); 826 assert_eq!(stats.count, 0); 827 assert!(!stats.can_create_bundle); 828 assert_eq!(stats.target_bundle, 1); 829 } 830 831 #[test] 832 fn test_mempool_stats_with_operations() { 833 let tmp = TempDir::new().unwrap(); 834 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 835 .unwrap() 836 .with_timezone(&Utc); 837 let mut mempool = Mempool::new(tmp.path(), 1, min_time, false).unwrap(); 838 839 let ops = vec![ 840 create_test_operation("did:plc:test1", "cid1", "2024-01-01T00:00:01Z"), 841 create_test_operation("did:plc:test2", "cid2", "2024-01-01T00:00:02Z"), 842 ]; 843 mempool.add(ops).unwrap(); 844 845 let stats = mempool.stats(); 846 assert_eq!(stats.count, 2); 847 assert!(!stats.can_create_bundle); // Need BUNDLE_SIZE (10000) ops 848 assert_eq!(stats.target_bundle, 1); 849 assert!(stats.first_time.is_some()); 850 assert!(stats.last_time.is_some()); 851 assert_eq!(stats.did_count, Some(2)); 852 } 853 854 #[test] 855 fn test_mempool_get_filename() { 856 let tmp = TempDir::new().unwrap(); 857 let min_time = DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z") 858 .unwrap() 859 .with_timezone(&Utc); 860 let mempool = Mempool::new(tmp.path(), 42, min_time, false).unwrap(); 861 862 let filename = mempool.get_filename(); 863 assert!(filename.contains("plc_mempool_")); 864 assert!(filename.contains("000042")); 865 assert!(filename.ends_with(".jsonl")); 866 } 867}