High-performance implementation of plcbundle written in Rust
at main 4121 lines 151 kB view raw
//! High-level manager orchestrating loading, random access, DID resolution, querying/export, sync, verification, rollback/migration, and mempool management
// src/manager.rs
use crate::constants::{self, bundle_position_to_global, total_operations_from_bundles};
use crate::index::{BundleMetadata, Index};
use crate::iterators::{ExportIterator, QueryIterator, RangeIterator};
use crate::operations::{Operation, OperationFilter, OperationRequest, OperationWithLocation};
use crate::options::QueryMode;
use crate::{did_index, handle_resolver, mempool, verification};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, RwLock};

/// Result of a sync_next_bundle operation
///
/// NOTE(review): the code that builds these values is not visible here; the field
/// notes below are read off the field names (`*_ms` presumably milliseconds,
/// `size_bytes` presumably bytes) — confirm against the sync implementation.
#[derive(Debug, Clone)]
pub enum SyncResult {
    /// Successfully created a bundle
    BundleCreated {
        // Number of the bundle that was produced (presumably).
        bundle_num: u32,
        mempool_count: usize,
        // Timing breakdown; names suggest milliseconds — TODO confirm units.
        duration_ms: u64,
        fetch_duration_ms: u64,
        bundle_save_ms: u64,
        index_ms: u64,
        fetch_requests: usize,
        hash: String,
        // Human-readable age string (format not visible here).
        age: String,
        did_index_compacted: bool,
        unique_dids: u32,
        size_bytes: u64,
        fetch_wait_ms: u64,
        fetch_http_ms: u64,
    },
    /// Caught up to latest PLC data, mempool has partial operations
    CaughtUp {
        next_bundle: u32,
        mempool_count: usize,
        new_ops: usize,
        fetch_duration_ms: u64,
    },
}

/// High-level manager for PLC bundle repositories
///
/// Provides fast, memory-efficient access to operations stored in
/// compressed JSONL bundle files, along with a DID index for quick lookups.
49/// 50/// Key capabilities: 51/// - Smart loading with caching and frame-based random access 52/// - DID resolution and per-DID operation queries (bundles + mempool) 53/// - Batch operations, range iterators, query and export pipelines 54/// - Sync to fetch, deduplicate, and create new bundles with verified hashes 55/// - Maintenance utilities: warm-up, prefetch, rollback, migrate, clean 56/// 57/// # Quickstart 58/// 59/// ```no_run 60/// use plcbundle::{BundleManager, ManagerOptions, QuerySpec, BundleRange, QueryMode}; 61/// use std::path::PathBuf; 62/// 63/// // Create a manager for an existing repository 64/// let mgr = BundleManager::new(PathBuf::from("/data/plc"), ManagerOptions::default())?; 65/// 66/// // Load a bundle 67/// let load = mgr.load_bundle(42, Default::default())?; 68/// let _count = load.operations.len(); 69/// 70/// // Get an operation (bundle number, position within bundle) 71/// let op = mgr.get_operation(42, 10)?; 72/// assert!(!op.did.is_empty()); 73/// 74/// // Resolve a DID to its latest document 75/// let resolved = mgr.resolve_did("did:plc:abcdef...")?; 76/// println!("Resolved to shard {}", resolved.shard_num); 77/// 78/// // Query a range and export 79/// let spec = QuerySpec { bundles: BundleRange::Range(40, 45), filter: None, query: String::new(), mode: QueryMode::Simple }; 80/// let mut count = 0u64; 81/// for item in mgr.query(spec) { count += 1; } 82/// assert!(count > 0); 83/// # Ok::<(), anyhow::Error>(()) 84/// ``` 85pub struct BundleManager { 86 directory: PathBuf, 87 index: Arc<RwLock<Index>>, 88 did_index: Arc<RwLock<Option<did_index::Manager>>>, 89 stats: Arc<RwLock<ManagerStats>>, 90 mempool: Arc<RwLock<Option<mempool::Mempool>>>, 91 mempool_checked: Arc<RwLock<bool>>, // Cache whether we've checked for mempool (to avoid repeated file checks) 92 handle_resolver: Option<Arc<handle_resolver::HandleResolver>>, 93 verbose: Arc<Mutex<bool>>, 94} 95 96#[derive(Debug, Clone, Default)] 97pub struct ManagerStats { 98 pub 
bundles_loaded: u64, 99 pub cache_hits: u64, 100 pub cache_misses: u64, 101 pub operations_read: u64, 102 pub queries_executed: u64, 103} 104 105#[derive(Debug, Clone)] 106pub struct ResolveResult { 107 pub document: crate::resolver::DIDDocument, 108 pub operation: Operation, // The operation used for resolution 109 pub bundle_number: u32, 110 pub position: usize, 111 pub mempool_time: std::time::Duration, 112 pub mempool_load_time: std::time::Duration, 113 pub index_time: std::time::Duration, 114 pub load_time: std::time::Duration, 115 pub total_time: std::time::Duration, 116 pub locations_found: usize, 117 pub shard_num: u8, 118 pub shard_stats: Option<did_index::DIDLookupStats>, 119 pub lookup_timings: Option<did_index::DIDLookupTimings>, 120} 121 122#[derive(Debug, Clone)] 123pub struct DIDOperationsResult { 124 pub operations: Vec<Operation>, 125 pub operations_with_locations: Option<Vec<OperationWithLocation>>, 126 pub stats: Option<did_index::DIDLookupStats>, 127 pub shard_num: Option<u8>, 128 pub lookup_timings: Option<did_index::DIDLookupTimings>, 129 pub load_time: Option<std::time::Duration>, 130} 131 132#[derive(Debug, Clone, Default)] 133pub struct DIDIndexStats { 134 pub total_dids: usize, 135 pub total_entries: usize, 136 pub avg_operations_per_did: f64, 137} 138 139#[derive(Debug, Clone)] 140pub struct RollbackFileStats { 141 pub deleted: usize, 142 pub failed: usize, 143 pub deleted_size: u64, 144} 145 146#[derive(Debug, Clone)] 147pub struct CleanResult { 148 pub files_removed: usize, 149 pub bytes_freed: u64, 150 pub errors: Option<Vec<String>>, 151} 152 153#[derive(Debug, Clone)] 154pub struct CleanPreview { 155 pub files: Vec<CleanPreviewFile>, 156 pub total_size: u64, 157} 158 159#[derive(Debug, Clone)] 160pub struct CleanPreviewFile { 161 pub path: PathBuf, 162 pub size: u64, 163} 164 165/// Options for configuring BundleManager initialization 166#[derive(Debug, Clone, Default)] 167pub struct ManagerOptions { 168 /// Optional handle resolver 
URL for resolving @handle.did identifiers 169 pub handle_resolver_url: Option<String>, 170 /// Whether to preload the mempool at initialization (for server use) 171 pub preload_mempool: bool, 172 /// Whether to enable verbose logging 173 pub verbose: bool, 174} 175 176/// Trait to allow passing ManagerOptions or using defaults 177pub trait IntoManagerOptions { 178 fn into_options(self) -> ManagerOptions; 179} 180 181impl IntoManagerOptions for ManagerOptions { 182 fn into_options(self) -> ManagerOptions { 183 self 184 } 185} 186 187impl IntoManagerOptions for () { 188 fn into_options(self) -> ManagerOptions { 189 ManagerOptions::default() 190 } 191} 192 193// Convenience: allow creating from individual components 194impl IntoManagerOptions for (Option<String>, bool, bool) { 195 fn into_options(self) -> ManagerOptions { 196 ManagerOptions { 197 handle_resolver_url: self.0, 198 preload_mempool: self.1, 199 verbose: self.2, 200 } 201 } 202} 203 204impl BundleManager { 205 /// Create a new BundleManager 206 /// 207 /// # Examples 208 /// 209 /// ```no_run 210 /// use plcbundle::{BundleManager, ManagerOptions}; 211 /// use std::path::PathBuf; 212 /// 213 /// // With default options 214 /// let manager = BundleManager::new(PathBuf::from("."), ())?; 215 /// 216 /// // With custom options 217 /// let options = ManagerOptions { 218 /// handle_resolver_url: Some("https://example.com".to_string()), 219 /// preload_mempool: true, 220 /// verbose: true, 221 /// }; 222 /// let manager = BundleManager::new(PathBuf::from("."), options)?; 223 /// # Ok::<(), anyhow::Error>(()) 224 /// ``` 225 pub fn new<O: IntoManagerOptions>(directory: PathBuf, options: O) -> Result<Self> { 226 let options = options.into_options(); 227 let init_start = std::time::Instant::now(); 228 let display_dir = directory 229 .canonicalize() 230 .unwrap_or_else(|_| directory.clone()); 231 log::debug!( 232 "[BundleManager] Initializing BundleManager from: {}", 233 display_dir.display() 234 ); 235 let index = 
Index::load(&directory)?; 236 237 let handle_resolver = options 238 .handle_resolver_url 239 .map(|url| Arc::new(handle_resolver::HandleResolver::new(url))); 240 241 if handle_resolver.is_some() { 242 log::debug!("[BundleManager] Handle resolver configured"); 243 } 244 245 let manager = Self { 246 directory: directory.clone(), 247 index: Arc::new(RwLock::new(index)), 248 did_index: Arc::new(RwLock::new(None)), 249 stats: Arc::new(RwLock::new(ManagerStats::default())), 250 mempool: Arc::new(RwLock::new(None)), 251 mempool_checked: Arc::new(RwLock::new(false)), 252 handle_resolver, 253 verbose: Arc::new(Mutex::new(options.verbose)), 254 }; 255 256 // Pre-initialize mempool if requested (for server use) 257 if options.preload_mempool { 258 let mempool_preload_start = std::time::Instant::now(); 259 if let Err(e) = manager.load_mempool() { 260 log::debug!("[BundleManager] Mempool preload failed: {}", e); 261 } else { 262 let mempool_preload_time = mempool_preload_start.elapsed(); 263 let mempool_preload_ms = mempool_preload_time.as_secs_f64() * 1000.0; 264 if let Ok(stats) = manager.get_mempool_stats() 265 && stats.count > 0 266 { 267 log::debug!( 268 "[BundleManager] Pre-loaded mempool: {} operations for bundle {} ({:.3}ms)", 269 stats.count, 270 stats.target_bundle, 271 mempool_preload_ms 272 ); 273 } 274 } 275 } 276 277 let total_elapsed = init_start.elapsed(); 278 let total_elapsed_ms = total_elapsed.as_secs_f64() * 1000.0; 279 log::debug!( 280 "[BundleManager] BundleManager initialized successfully ({:.3}ms total)", 281 total_elapsed_ms 282 ); 283 Ok(manager) 284 } 285 286 /// Ensure DID index is loaded (lazy initialization) 287 pub fn ensure_did_index(&self) -> Result<()> { 288 let mut did_index_guard = self.did_index.write().unwrap(); 289 if did_index_guard.is_none() { 290 let did_index_start = std::time::Instant::now(); 291 log::debug!("[BundleManager] Loading DID index..."); 292 let did_index = did_index::Manager::new(self.directory.clone())?; 293 let 
did_index_elapsed = did_index_start.elapsed(); 294 let did_index_elapsed_ms = did_index_elapsed.as_secs_f64() * 1000.0; 295 log::debug!( 296 "[BundleManager] DID index loaded ({:.3}ms)", 297 did_index_elapsed_ms 298 ); 299 *did_index_guard = Some(did_index); 300 } 301 Ok(()) 302 } 303 304 /// Get a clone of the verbose state Arc for external access 305 pub fn verbose_handle(&self) -> Arc<Mutex<bool>> { 306 self.verbose.clone() 307 } 308 309 // === Smart Loading === 310 /// Load a bundle's operations with optional caching and filtering 311 /// 312 /// Uses on-disk compressed JSONL and supports frame-based random access 313 /// when available, falling back to sequential scan for legacy bundles. 314 /// 315 /// # Arguments 316 /// - `num` Bundle number to load 317 /// - `options` Loading options (cache, decompress, filter, limit) 318 /// 319 /// # Returns 320 /// A `LoadResult` containing operations and optional metadata 321 pub fn load_bundle(&self, num: u32, options: LoadOptions) -> Result<LoadResult> { 322 self.stats.write().unwrap().bundles_loaded += 1; 323 324 let bundle_path = constants::bundle_path(&self.directory, num); 325 let operations = self.load_bundle_from_disk(&bundle_path)?; 326 327 Ok(self.filter_load_result(operations, &options)) 328 } 329 330 // === Single Operation Access === 331 332 /// Get a single operation as raw JSON (fastest, preserves field order) 333 /// 334 /// This method uses frame-based access for efficient random reads. 335 /// Falls back to legacy sequential scan if no frame index is available. 
336 pub fn get_operation_raw(&self, bundle_num: u32, position: usize) -> Result<String> { 337 let bundle_path = constants::bundle_path(&self.directory, bundle_num); 338 339 if !bundle_path.exists() { 340 anyhow::bail!("Bundle {} not found", bundle_num); 341 } 342 343 // Try frame-based access first (new format) 344 match self.get_operation_raw_with_frames(&bundle_path, position) { 345 Ok(json) => Ok(json), 346 Err(e) => { 347 // Fall back to legacy sequential scan 348 // This happens for old bundles without frame index 349 if let Ok(json) = self.get_operation_raw_legacy(&bundle_path, position) { 350 Ok(json) 351 } else { 352 Err(e) 353 } 354 } 355 } 356 } 357 358 /// Frame-based operation access (new format with metadata) 359 fn get_operation_raw_with_frames( 360 &self, 361 bundle_path: &std::path::Path, 362 position: usize, 363 ) -> Result<String> { 364 use crate::bundle_format; 365 use std::io::{Read, Seek, SeekFrom}; 366 367 // Open file and read actual metadata frame size 368 let mut file = std::fs::File::open(bundle_path)?; 369 370 // Read magic (4 bytes) 371 let mut magic_buf = [0u8; 4]; 372 file.read_exact(&mut magic_buf)?; 373 let magic = u32::from_le_bytes(magic_buf); 374 375 if magic != bundle_format::SKIPPABLE_MAGIC_METADATA { 376 anyhow::bail!("No metadata frame at start of bundle"); 377 } 378 379 // Read frame size (4 bytes) 380 let mut size_buf = [0u8; 4]; 381 file.read_exact(&mut size_buf)?; 382 let frame_data_size = u32::from_le_bytes(size_buf) as i64; 383 384 // Metadata frame total size = magic(4) + size(4) + data 385 let metadata_frame_size = 8 + frame_data_size; 386 387 // Read the actual metadata 388 let mut metadata_data = vec![0u8; frame_data_size as usize]; 389 file.read_exact(&mut metadata_data)?; 390 let metadata: bundle_format::BundleMetadata = sonic_rs::from_slice(&metadata_data)?; 391 392 if metadata.frame_offsets.is_empty() { 393 anyhow::bail!("No frame offsets in metadata"); 394 } 395 396 // Now seek back to start and use the 
frame-based loader 397 file.seek(SeekFrom::Start(0))?; 398 bundle_format::load_operation_at_position( 399 &mut file, 400 position, 401 &metadata.frame_offsets, 402 metadata_frame_size, 403 ) 404 } 405 406 /// Legacy sequential scan (for old bundles without frame index) 407 fn get_operation_raw_legacy( 408 &self, 409 bundle_path: &std::path::Path, 410 position: usize, 411 ) -> Result<String> { 412 let file = std::fs::File::open(bundle_path)?; 413 let decoder = zstd::Decoder::new(file)?; 414 let reader = std::io::BufReader::new(decoder); 415 416 use std::io::BufRead; 417 418 for (idx, line_result) in reader.lines().enumerate() { 419 if idx == position { 420 return Ok(line_result?); 421 } 422 } 423 424 anyhow::bail!("Operation position {} out of bounds", position) 425 } 426 427 /// Get a single operation as parsed struct 428 /// 429 /// This method retrieves the raw JSON and parses it into an Operation struct. 430 /// Uses sonic_rs for parsing (not serde) to avoid compatibility issues. 431 /// Use `get_operation_raw()` if you only need the JSON. 
432 pub fn get_operation(&self, bundle_num: u32, position: usize) -> Result<Operation> { 433 let json = self.get_operation_raw(bundle_num, position)?; 434 let op = Operation::from_json(&json).with_context(|| { 435 format!( 436 "Failed to parse operation JSON (bundle {}, position {})", 437 bundle_num, position 438 ) 439 })?; 440 Ok(op) 441 } 442 443 /// Get operation with timing statistics (for CLI verbose mode) 444 pub fn get_operation_with_stats( 445 &self, 446 bundle_num: u32, 447 position: usize, 448 ) -> Result<OperationResult> { 449 let start = std::time::Instant::now(); 450 let json = self.get_operation_raw(bundle_num, position)?; 451 let duration = start.elapsed(); 452 453 // Update stats 454 { 455 let mut stats = self.stats.write().unwrap(); 456 stats.operations_read += 1; 457 } 458 459 Ok(OperationResult { 460 raw_json: json.clone(), 461 size_bytes: json.len(), 462 load_duration: duration, 463 }) 464 } 465 466 // === Batch Operations === 467 /// Batch fetch operations across multiple bundles using match requests 468 /// 469 /// Groups requests by bundle for efficient loading and returns 470 /// operations that match each request's optional filter. 
471 pub fn get_operations_batch(&self, requests: Vec<OperationRequest>) -> Result<Vec<Operation>> { 472 let mut results = Vec::new(); 473 474 let mut by_bundle: HashMap<u32, Vec<&OperationRequest>> = HashMap::new(); 475 for req in &requests { 476 by_bundle.entry(req.bundle).or_default().push(req); 477 } 478 479 for (bundle_num, reqs) in by_bundle { 480 let load_result = self.load_bundle(bundle_num, LoadOptions::default())?; 481 482 for req in reqs { 483 for op in &load_result.operations { 484 if self.matches_request(op, req) { 485 results.push(op.clone()); 486 } 487 } 488 } 489 } 490 491 Ok(results) 492 } 493 494 /// Create an iterator over operations across a bundle range 495 /// 496 /// Returns a `RangeIterator` that lazily loads bundles between 497 /// `start` and `end` and yields operations optionally filtered. 498 pub fn get_operations_range( 499 &self, 500 start: u32, 501 end: u32, 502 filter: Option<OperationFilter>, 503 ) -> RangeIterator { 504 RangeIterator::new(Arc::new(self.clone_for_arc()), start, end, filter) 505 } 506 507 // === DID Operations === 508 /// Get all operations for a DID from both bundles and mempool 509 /// 510 /// # Arguments 511 /// * `did` - The DID to look up 512 /// * `include_locations` - If true, also return operations with location information 513 /// * `include_stats` - If true, include detailed lookup statistics 514 pub fn get_did_operations( 515 &self, 516 did: &str, 517 include_locations: bool, 518 include_stats: bool, 519 ) -> Result<DIDOperationsResult> { 520 use chrono::DateTime; 521 use std::time::Instant; 522 523 self.ensure_did_index()?; 524 525 let index_start = Instant::now(); 526 let (locations, shard_stats, shard_num, lookup_timings) = if include_stats { 527 let did_index = self.did_index.read().unwrap(); 528 did_index 529 .as_ref() 530 .unwrap() 531 .get_did_locations_with_stats(did)? 
532 } else { 533 let did_index = self.did_index.read().unwrap(); 534 let locs = did_index.as_ref().unwrap().get_did_locations(did)?; 535 ( 536 locs, 537 did_index::DIDLookupStats::default(), 538 0, 539 did_index::DIDLookupTimings::default(), 540 ) 541 }; 542 let _index_time = index_start.elapsed(); 543 544 // Get operations from bundles 545 let (bundle_ops_with_loc, load_time) = self.collect_operations_for_locations(&locations)?; 546 let mut bundle_operations: Vec<Operation> = bundle_ops_with_loc 547 .iter() 548 .map(|owl| owl.operation.clone()) 549 .collect(); 550 551 // Get operations from mempool (only once) 552 let (mempool_ops, _mempool_load_time) = self.get_did_operations_from_mempool(did)?; 553 554 // Merge bundle and mempool operations 555 bundle_operations.extend(mempool_ops.clone()); 556 557 // Sort by created_at timestamp 558 bundle_operations.sort_by(|a, b| { 559 let time_a = DateTime::parse_from_rfc3339(&a.created_at) 560 .unwrap_or_else(|_| DateTime::parse_from_rfc3339("1970-01-01T00:00:00Z").unwrap()); 561 let time_b = DateTime::parse_from_rfc3339(&b.created_at) 562 .unwrap_or_else(|_| DateTime::parse_from_rfc3339("1970-01-01T00:00:00Z").unwrap()); 563 time_a.cmp(&time_b) 564 }); 565 566 // Build operations_with_locations if requested 567 let operations_with_locations = if include_locations { 568 let mut ops_with_loc = bundle_ops_with_loc; 569 570 // Add mempool operations with bundle=0 571 for (idx, op) in mempool_ops.iter().enumerate() { 572 ops_with_loc.push(OperationWithLocation { 573 operation: op.clone(), 574 bundle: 0, // Use 0 to indicate mempool 575 position: idx, 576 nullified: op.nullified, 577 }); 578 } 579 580 // Sort all operations by timestamp 581 ops_with_loc.sort_by(|a, b| { 582 let time_a = 583 DateTime::parse_from_rfc3339(&a.operation.created_at).unwrap_or_else(|_| { 584 DateTime::parse_from_rfc3339("1970-01-01T00:00:00Z").unwrap() 585 }); 586 let time_b = 587 DateTime::parse_from_rfc3339(&b.operation.created_at).unwrap_or_else(|_| 
{ 588 DateTime::parse_from_rfc3339("1970-01-01T00:00:00Z").unwrap() 589 }); 590 time_a.cmp(&time_b) 591 }); 592 593 Some(ops_with_loc) 594 } else { 595 None 596 }; 597 598 Ok(DIDOperationsResult { 599 operations: bundle_operations, 600 operations_with_locations, 601 stats: if include_stats { 602 Some(shard_stats) 603 } else { 604 None 605 }, 606 shard_num: if include_stats { Some(shard_num) } else { None }, 607 lookup_timings: if include_stats { 608 Some(lookup_timings) 609 } else { 610 None 611 }, 612 load_time: if include_stats { Some(load_time) } else { None }, 613 }) 614 } 615 616 /// Sample random DIDs directly from the DID index without reading bundles. 617 pub fn sample_random_dids(&self, count: usize, seed: Option<u64>) -> Result<Vec<String>> { 618 self.ensure_did_index()?; 619 let did_index = self.did_index.read().unwrap(); 620 did_index.as_ref().unwrap().sample_random_dids(count, seed) 621 } 622 623 /// Get DID operations from mempool (internal helper) 624 /// Mempool should be preloaded at initialization, so this is just a fast in-memory lookup 625 /// Returns (operations, load_time) where load_time is always ZERO (no lazy loading) 626 fn get_did_operations_from_mempool( 627 &self, 628 did: &str, 629 ) -> Result<(Vec<Operation>, std::time::Duration)> { 630 use std::time::Instant; 631 632 let mempool_start = Instant::now(); 633 634 // Mempool should be preloaded at initialization (no lazy loading) 635 let mempool_guard = self.mempool.read().unwrap(); 636 match mempool_guard.as_ref() { 637 Some(mp) => { 638 // Mempool is initialized, use it directly (fast HashMap lookup) 639 let ops = mp.find_did_operations(did); 640 let mempool_elapsed = mempool_start.elapsed(); 641 log::debug!( 642 "[Mempool] Found {} operations for DID {} in {:?}", 643 ops.len(), 644 did, 645 mempool_elapsed 646 ); 647 Ok((ops, std::time::Duration::ZERO)) 648 } 649 None => { 650 // Mempool not initialized (wasn't preloaded and doesn't exist) 651 let mempool_elapsed = 
mempool_start.elapsed(); 652 log::debug!( 653 "[Mempool] No mempool initialized (checked in {:?})", 654 mempool_elapsed 655 ); 656 Ok((Vec::new(), std::time::Duration::ZERO)) 657 } 658 } 659 } 660 661 fn get_latest_did_operation_from_mempool( 662 &self, 663 did: &str, 664 ) -> Result<(Option<(Operation, usize)>, std::time::Duration)> { 665 use std::time::Instant; 666 667 let mempool_start = Instant::now(); 668 669 // Mempool should be preloaded at initialization (no lazy loading) 670 let mempool_guard = self.mempool.read().unwrap(); 671 let result = match mempool_guard.as_ref() { 672 Some(mp) => { 673 // Use mempool's method to find latest non-nullified operation (by index, operations are sorted) 674 mp.find_latest_did_operation(did) 675 } 676 None => { 677 // Mempool not initialized 678 None 679 } 680 }; 681 682 let mempool_elapsed = mempool_start.elapsed(); 683 log::debug!( 684 "[Mempool] Latest operation lookup for DID {} in {:?}", 685 did, 686 mempool_elapsed 687 ); 688 689 Ok((result, std::time::Duration::ZERO)) 690 } 691 692 /// Resolve DID to current W3C DID Document with detailed timing statistics 693 /// Returns the latest non-nullified DID document. 694 /// If mempool has operations, uses the latest from mempool and skips bundle/index lookup. 
695 pub fn resolve_did(&self, did: &str) -> Result<ResolveResult> { 696 use chrono::DateTime; 697 use std::time::Instant; 698 699 let total_start = Instant::now(); 700 701 // Validate DID format 702 crate::resolver::validate_did_format(did)?; 703 704 // Check mempool first (most recent operations) 705 log::debug!("[Resolve] Checking mempool first for DID: {}", did); 706 let mempool_start = Instant::now(); 707 let (latest_mempool_op, mempool_load_time) = 708 self.get_latest_did_operation_from_mempool(did)?; 709 let mempool_time = mempool_start.elapsed(); 710 log::debug!( 711 "[Resolve] Mempool check: found latest operation in {:?} (load: {:?})", 712 mempool_time, 713 mempool_load_time 714 ); 715 716 // If mempool has a non-nullified operation, use it and skip bundle lookup 717 if let Some((operation, position)) = latest_mempool_op { 718 let load_start = Instant::now(); 719 log::debug!( 720 "[Resolve] Found latest non-nullified operation in mempool, skipping bundle lookup" 721 ); 722 723 // Build document from latest mempool operation 724 let document = 725 crate::resolver::resolve_did_document(did, std::slice::from_ref(&operation))?; 726 let load_time = load_start.elapsed(); 727 728 return Ok(ResolveResult { 729 document, 730 operation, 731 bundle_number: 0, // bundle=0 for mempool 732 position, 733 mempool_time, 734 mempool_load_time, 735 index_time: std::time::Duration::ZERO, 736 load_time, 737 total_time: total_start.elapsed(), 738 locations_found: 1, // Found one operation in mempool 739 shard_num: 0, // No shard for mempool 740 shard_stats: None, 741 lookup_timings: None, 742 }); 743 } 744 745 // Mempool is empty or all nullified - check bundles 746 log::debug!( 747 "[Resolve] No non-nullified operations in mempool, checking bundles for DID: {}", 748 did 749 ); 750 self.ensure_did_index()?; 751 let index_start = Instant::now(); 752 let did_index = self.did_index.read().unwrap(); 753 let (locations, shard_stats, shard_num, lookup_timings) = did_index 754 
.as_ref() 755 .unwrap() 756 .get_did_locations_with_stats(did)?; 757 let index_time = index_start.elapsed(); 758 log::debug!( 759 "[Resolve] Bundle index lookup: {} locations found in {:?}", 760 locations.len(), 761 index_time 762 ); 763 764 // Find latest non-nullified operation from bundles 765 let load_start = Instant::now(); 766 let mut latest_operation: Option<(Operation, u32, usize)> = None; 767 let mut latest_time = DateTime::parse_from_rfc3339("1970-01-01T00:00:00Z").unwrap(); 768 769 for loc in &locations { 770 if !loc.nullified() 771 && let Ok(op) = self.get_operation(loc.bundle() as u32, loc.position() as usize) 772 && let Ok(op_time) = DateTime::parse_from_rfc3339(&op.created_at) 773 && op_time > latest_time 774 { 775 latest_time = op_time; 776 latest_operation = Some((op, loc.bundle() as u32, loc.position() as usize)); 777 } 778 } 779 let load_time = load_start.elapsed(); 780 781 let (operation, bundle_number, position) = latest_operation.ok_or_else(|| { 782 anyhow::anyhow!("DID not found: {} (checked bundles and mempool)", did) 783 })?; 784 785 // Build document from latest bundle operation 786 let document = 787 crate::resolver::resolve_did_document(did, std::slice::from_ref(&operation))?; 788 789 Ok(ResolveResult { 790 document, 791 operation: operation.clone(), 792 bundle_number, 793 position, 794 mempool_time, 795 mempool_load_time, 796 index_time, 797 load_time, 798 total_time: total_start.elapsed(), 799 locations_found: locations.len(), 800 shard_num, 801 shard_stats: Some(shard_stats), 802 lookup_timings: Some(lookup_timings), 803 }) 804 } 805 806 fn collect_operations_for_locations( 807 &self, 808 locations: &[did_index::OpLocation], 809 ) -> Result<(Vec<OperationWithLocation>, std::time::Duration)> { 810 use std::time::Instant; 811 812 let load_start = Instant::now(); 813 let mut ops_with_loc = Vec::with_capacity(locations.len()); 814 for loc in locations { 815 let bundle_num = loc.bundle() as u32; 816 let position = loc.position() as usize; 
817 818 match self.get_operation(bundle_num, position) { 819 Ok(op) => { 820 ops_with_loc.push(OperationWithLocation { 821 operation: op, 822 bundle: bundle_num, 823 position, 824 nullified: loc.nullified(), 825 }); 826 } 827 Err(e) => { 828 log::warn!( 829 "Failed to load operation at bundle {} position {}: {}", 830 bundle_num, 831 position, 832 e 833 ); 834 } 835 } 836 } 837 838 ops_with_loc.sort_by_key(|owl| bundle_position_to_global(owl.bundle, owl.position)); 839 840 Ok((ops_with_loc, load_start.elapsed())) 841 } 842 843 /// Resolve multiple DIDs to their operations (bundles + mempool) 844 /// 845 /// Returns a map of DID → operations, without location metadata or stats. 846 pub fn batch_resolve_dids(&self, dids: Vec<String>) -> Result<HashMap<String, Vec<Operation>>> { 847 let mut results = HashMap::new(); 848 849 for did in dids { 850 let result = self.get_did_operations(&did, false, false)?; 851 results.insert(did, result.operations); 852 } 853 854 Ok(results) 855 } 856 857 // === Query/Export === 858 /// Execute a query over bundles with optional filters and modes 859 /// 860 /// Increments internal stats and returns a `QueryIterator` that yields 861 /// serialized records matching the query specification. 862 pub fn query(&self, spec: QuerySpec) -> QueryIterator { 863 self.stats.write().unwrap().queries_executed += 1; 864 QueryIterator::new(Arc::new(self.clone_for_arc()), spec) 865 } 866 867 /// Create an export iterator for streaming results to a sink 868 /// 869 /// Supports JSONL format. 870 pub fn export(&self, spec: ExportSpec) -> ExportIterator { 871 ExportIterator::new(Arc::new(self.clone_for_arc()), spec) 872 } 873 874 /// Export results to a provided writer factory and return statistics 875 /// 876 /// The `writer_fn` is invoked to obtain a fresh `Write` for streaming. 
877 pub fn export_to_writer<F>(&self, spec: ExportSpec, mut writer_fn: F) -> Result<ExportStats> 878 where 879 F: FnMut() -> Box<dyn Write>, 880 { 881 let mut writer = writer_fn(); 882 let mut stats = ExportStats::default(); 883 884 for item in self.export(spec) { 885 let data = item?; 886 writer.write_all(data.as_bytes())?; 887 writer.write_all(b"\n")?; 888 stats.records_written += 1; 889 stats.bytes_written += data.len() as u64 + 1; 890 } 891 892 Ok(stats) 893 } 894 895 // === Verification === 896 /// Verify a single bundle's integrity and metadata 897 pub fn verify_bundle(&self, num: u32, spec: VerifySpec) -> Result<VerifyResult> { 898 let index = self.index.read().unwrap(); 899 let metadata = index 900 .get_bundle(num) 901 .ok_or_else(|| anyhow::anyhow!("Bundle {} not in index", num))?; 902 903 verification::verify_bundle(&self.directory, metadata, spec) 904 } 905 906 /// Verify chain linkage and optional parent relationships across bundles 907 pub fn verify_chain(&self, spec: ChainVerifySpec) -> Result<ChainVerifyResult> { 908 verification::verify_chain(&self.directory, &self.index.read().unwrap(), spec) 909 } 910 911 // === Multi-info === 912 /// Get consolidated bundle information with optional operation and size details 913 pub fn get_bundle_info(&self, num: u32, flags: InfoFlags) -> Result<BundleInfo> { 914 let index = self.index.read().unwrap(); 915 let metadata = index 916 .get_bundle(num) 917 .ok_or_else(|| anyhow::anyhow!("Bundle {} not found", num))?; 918 919 let mut info = BundleInfo { 920 metadata: metadata.clone(), 921 exists: constants::bundle_path(&self.directory, num).exists(), 922 operations: None, 923 size_info: None, 924 }; 925 926 if flags.include_operations { 927 let result = self.load_bundle(num, LoadOptions::default())?; 928 info.operations = Some(result.operations); 929 } 930 931 if flags.include_size_info { 932 info.size_info = Some(SizeInfo { 933 compressed: metadata.compressed_size, 934 uncompressed: metadata.uncompressed_size, 935 
}); 936 } 937 938 Ok(info) 939 } 940 941 // === Rollback === 942 /// Plan a rollback to a target bundle and report estimated impact 943 pub fn rollback_plan(&self, spec: RollbackSpec) -> Result<RollbackPlan> { 944 let affected_bundles: Vec<u32> = (spec.target_bundle..=self.get_last_bundle()).collect(); 945 946 let mut affected_operations = 0; 947 let mut affected_dids = std::collections::HashSet::new(); 948 949 for bundle_num in &affected_bundles { 950 if let Ok(result) = self.load_bundle(*bundle_num, LoadOptions::default()) { 951 affected_operations += result.operations.len(); 952 for op in result.operations { 953 affected_dids.insert(op.did); 954 } 955 } 956 } 957 958 Ok(RollbackPlan { 959 target_bundle: spec.target_bundle, 960 affected_bundles: affected_bundles.clone(), // Clone here 961 affected_operations, 962 affected_dids: affected_dids.len(), 963 estimated_time_ms: affected_bundles.len() as u64 * 10, 964 }) 965 } 966 967 /// Execute a rollback to the target bundle, optionally as a dry run 968 pub fn rollback(&self, spec: RollbackSpec) -> Result<RollbackResult> { 969 let plan = self.rollback_plan(spec.clone())?; 970 971 if spec.dry_run { 972 return Ok(RollbackResult { 973 success: true, 974 bundles_removed: 0, 975 plan: Some(plan), 976 }); 977 } 978 979 for bundle_num in &plan.affected_bundles { 980 let path = constants::bundle_path(&self.directory, *bundle_num); 981 if path.exists() { 982 std::fs::remove_file(path)?; 983 } 984 } 985 986 let mut index = self.index.write().unwrap(); 987 index.last_bundle = spec.target_bundle; 988 index 989 .bundles 990 .retain(|b| b.bundle_number <= spec.target_bundle); 991 992 // Use default flush interval for rollback 993 self.build_did_index( 994 crate::constants::DID_INDEX_FLUSH_INTERVAL, 995 None::<fn(u32, u32, u64, u64)>, 996 None, 997 None, 998 )?; 999 1000 Ok(RollbackResult { 1001 success: true, 1002 bundles_removed: plan.affected_bundles.len(), 1003 plan: Some(plan), 1004 }) 1005 } 1006 1007 // === Cache Hints === 
1008 pub fn prefetch_bundles(&self, _nums: Vec<u32>) -> Result<()> { Ok(()) } 1009 1010 /// Preload specified bundles into the cache for faster subsequent access 1011 pub fn warm_up(&self, _spec: WarmUpSpec) -> Result<()> { Ok(()) } 1012 1013 // === DID Index === 1014 pub fn build_did_index<F>( 1015 &self, 1016 flush_interval: u32, 1017 progress_cb: Option<F>, 1018 num_threads: Option<usize>, 1019 interrupted: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>, 1020 ) -> Result<RebuildStats> 1021 where 1022 F: Fn(u32, u32, u64, u64) + Send + Sync, // (current, total, bytes_processed, total_bytes) 1023 { 1024 use std::time::Instant; 1025 1026 let actual_threads = num_threads.unwrap_or(0); // 0 = auto-detect 1027 1028 let last_bundle = self.get_last_bundle(); 1029 let mut stats = RebuildStats::default(); 1030 1031 // Create new index (this clears any existing index) 1032 let new_index = did_index::Manager::new(self.directory.clone())?; 1033 *self.did_index.write().unwrap() = Some(new_index); 1034 1035 self.ensure_did_index()?; 1036 1037 // Get total uncompressed size for progress tracking 1038 let index = self.index.read().unwrap(); 1039 let bundle_numbers: Vec<u32> = (1..=last_bundle).collect(); 1040 let total_uncompressed_bytes = index.total_uncompressed_size_for_bundles(&bundle_numbers); 1041 drop(index); 1042 1043 eprintln!("\n📦 Building DID Index"); 1044 eprintln!(" Strategy: Streaming (memory-efficient)"); 1045 eprintln!(" Bundles: {}", last_bundle); 1046 if flush_interval > 0 { 1047 if flush_interval == crate::constants::DID_INDEX_FLUSH_INTERVAL { 1048 // Default value - show with tuning hint 1049 eprintln!( 1050 " Flush: Every {} bundles (tune with --flush-interval)", 1051 flush_interval 1052 ); 1053 } else { 1054 // Non-default value - show with tuning hint 1055 eprintln!( 1056 " Flush: {} bundles (you can tune with --flush-interval)", 1057 flush_interval 1058 ); 1059 } 1060 } else { 1061 eprintln!(" Flush: Only at end (maximum memory usage)"); 1062 } 
1063 eprintln!(); 1064 eprintln!("📊 Stage 1: Processing bundles..."); 1065 1066 let build_start = Instant::now(); 1067 1068 // Call the streaming build method in did_index 1069 let (total_operations, _bundles_processed, stage1_duration, stage2_duration) = { 1070 let did_index_guard = self.did_index.read().unwrap(); 1071 if let Some(ref idx) = *did_index_guard { 1072 idx.build_from_scratch( 1073 &self.directory, 1074 last_bundle, 1075 flush_interval, 1076 progress_cb.map(|cb| { 1077 move |current: u32, total: u32, bytes: u64, stage: Option<String>| { 1078 // Always call the callback - let it handle stage detection 1079 // For stage 1, use bytes tracking; for stage 2, use shard count 1080 if let Some(ref stage_name) = stage { 1081 if stage_name.contains("Stage 2") { 1082 // For consolidation, we don't have byte tracking, so just pass 0 1083 // The progress bar will show shard progress 1084 cb(current, total, 0, total_uncompressed_bytes); 1085 } else { 1086 // Stage 1 or unknown - use bytes 1087 cb(current, total, bytes, total_uncompressed_bytes); 1088 } 1089 } else { 1090 // Fallback for backward compatibility 1091 cb(current, total, bytes, total_uncompressed_bytes); 1092 } 1093 } 1094 }), 1095 actual_threads, 1096 interrupted, 1097 )? 
1098 } else { 1099 return Err(anyhow::anyhow!("DID index not initialized")); 1100 } 1101 }; 1102 1103 stats.bundles_processed = last_bundle; 1104 stats.operations_indexed = total_operations; 1105 1106 let total_duration = build_start.elapsed(); 1107 1108 eprintln!("\n"); 1109 eprintln!("✅ Index Build Complete"); 1110 eprintln!( 1111 " Time: {:.1}s (Stage 1: {:.1}s, Stage 2: {:.1}s)", 1112 total_duration.as_secs_f64(), 1113 stage1_duration.as_secs_f64(), 1114 stage2_duration.as_secs_f64() 1115 ); 1116 eprintln!( 1117 " Operations: {}", 1118 crate::format::format_number(total_operations) 1119 ); 1120 1121 // Get final stats 1122 let final_stats = self.get_did_index_stats(); 1123 let total_dids = final_stats 1124 .get("total_dids") 1125 .and_then(|v| v.as_i64()) 1126 .unwrap_or(0); 1127 1128 eprintln!( 1129 " Total DIDs: {}", 1130 crate::format::format_number(total_dids as u64) 1131 ); 1132 1133 Ok(stats) 1134 } 1135 1136 /// Get DID index statistics as a JSON-compatible map 1137 /// 1138 /// Returns keys like `exists`, `total_dids`, `last_bundle`, `delta_segments`, `shard_count` when available. 
1139 pub fn get_did_index_stats(&self) -> HashMap<String, serde_json::Value> { 1140 self.ensure_did_index().ok(); // Stats might be called even if index doesn't exist 1141 self.did_index 1142 .read() 1143 .unwrap() 1144 .as_ref() 1145 .map(|idx| idx.get_stats()) 1146 .unwrap_or_default() 1147 } 1148 1149 /// Get DID index stats as struct (legacy format) 1150 pub fn get_did_index_stats_struct(&self) -> DIDIndexStats { 1151 let stats_map = self.get_did_index_stats(); 1152 1153 // Convert to old format 1154 DIDIndexStats { 1155 total_dids: stats_map 1156 .get("total_dids") 1157 .and_then(|v| v.as_i64()) 1158 .unwrap_or(0) as usize, 1159 total_entries: 0, // Not tracked in new version 1160 avg_operations_per_did: 0.0, // Not tracked in new version 1161 } 1162 } 1163 1164 pub fn get_did_index(&self) -> Arc<RwLock<Option<did_index::Manager>>> { 1165 Arc::clone(&self.did_index) 1166 } 1167 1168 /// Verify DID index and return detailed result 1169 /// 1170 /// Performs standard integrity check by default. If `full` is true, also rebuilds 1171 /// the index in a temporary directory and compares with the existing index. 1172 /// 1173 /// For server startup checks, call with `full=false` and check `verify_result.missing_base_shards` 1174 /// and `verify_result.missing_delta_segments` to determine if the index is corrupted. 
1175 pub fn verify_did_index<F>( 1176 &self, 1177 verbose: bool, 1178 flush_interval: u32, 1179 full: bool, 1180 progress_callback: Option<F>, 1181 ) -> Result<did_index::VerifyResult> 1182 where 1183 F: Fn(u32, u32, u64, u64) + Send + Sync, // (current, total, bytes_processed, total_bytes) 1184 { 1185 self.ensure_did_index()?; 1186 1187 let did_index = self.did_index.read().unwrap(); 1188 let idx = did_index 1189 .as_ref() 1190 .ok_or_else(|| anyhow::anyhow!("DID index not initialized"))?; 1191 1192 let last_bundle = self.get_last_bundle(); 1193 let mut verify_result = idx.verify_integrity(last_bundle)?; 1194 1195 // If full verification requested, rebuild and compare 1196 if full { 1197 // Adapt callback for build_from_scratch which expects Option<String> as 4th param 1198 let build_callback = progress_callback.map(|cb| { 1199 move |current: u32, total: u32, bytes: u64, _stage: Option<String>| { 1200 cb(current, total, bytes, bytes); // Use bytes as total_bytes for now 1201 } 1202 }); 1203 let rebuild_result = idx.verify_full( 1204 self.directory(), 1205 last_bundle, 1206 verbose, 1207 flush_interval, 1208 build_callback, 1209 )?; 1210 verify_result.errors += rebuild_result.errors; 1211 verify_result 1212 .error_categories 1213 .extend(rebuild_result.error_categories); 1214 } 1215 1216 Ok(verify_result) 1217 } 1218 1219 /// Repair DID index - intelligently rebuilds or updates as needed 1220 pub fn repair_did_index<F>( 1221 &self, 1222 num_threads: usize, 1223 flush_interval: u32, 1224 progress_callback: Option<F>, 1225 ) -> Result<did_index::RepairResult> 1226 where 1227 F: Fn(u32, u32, u64, u64) + Send + Sync, // (current, total, bytes_processed, total_bytes) 1228 { 1229 self.ensure_did_index()?; 1230 1231 let last_bundle = self.get_last_bundle(); 1232 1233 // Create bundle loader closure 1234 let bundle_loader = |bundle_num: u32| -> Result<Vec<(String, bool)>> { 1235 let result = self.load_bundle(bundle_num, LoadOptions::default())?; 1236 Ok(result 1237 
.operations 1238 .iter() 1239 .map(|op| (op.did.clone(), op.nullified)) 1240 .collect()) 1241 }; 1242 1243 let mut did_index = self.did_index.write().unwrap(); 1244 let idx = did_index 1245 .as_mut() 1246 .ok_or_else(|| anyhow::anyhow!("DID index not initialized"))?; 1247 1248 let mut repair_result = idx.repair(last_bundle, bundle_loader)?; 1249 1250 // If repair indicates full rebuild is needed, do it 1251 if repair_result.repaired && repair_result.bundles_processed == 0 { 1252 drop(did_index); 1253 1254 // Adapt callback signature for build_did_index 1255 let build_callback = progress_callback.map(|cb| { 1256 move |current: u32, total: u32, bytes: u64, total_bytes: u64| { 1257 cb(current, total, bytes, total_bytes); 1258 } 1259 }); 1260 self.build_did_index(flush_interval, build_callback, Some(num_threads), None)?; 1261 1262 repair_result.bundles_processed = last_bundle; 1263 } 1264 1265 Ok(repair_result) 1266 } 1267 1268 // === Observability === 1269 pub fn get_stats(&self) -> ManagerStats { 1270 self.stats.read().unwrap().clone() 1271 } 1272 1273 pub fn clear_caches(&self) { 1274 self.stats.write().unwrap().cache_hits = 0; 1275 self.stats.write().unwrap().cache_misses = 0; 1276 } 1277 1278 // === Mempool Management === 1279 1280 /// Check if the mempool is loaded (does not load it) 1281 /// 1282 /// Returns `Ok(())` if mempool is loaded, error otherwise. 1283 pub fn get_mempool(&self) -> Result<()> { 1284 let mempool_guard = self.mempool.read().unwrap(); 1285 if mempool_guard.is_some() { 1286 Ok(()) 1287 } else { 1288 anyhow::bail!("Mempool not loaded. Call load_mempool() first.") 1289 } 1290 } 1291 1292 /// Explicitly load mempool from disk (or create empty if file doesn't exist) 1293 /// 1294 /// Intended for initialization/preload, not lazy loading. 
1295 pub fn load_mempool(&self) -> Result<()> { 1296 // Check if already loaded 1297 { 1298 let mempool_guard = self.mempool.read().unwrap(); 1299 if mempool_guard.is_some() { 1300 return Ok(()); // Already loaded 1301 } 1302 } 1303 1304 // Acquire write lock to load 1305 let mut mempool_guard = self.mempool.write().unwrap(); 1306 1307 // Double-check after acquiring write lock 1308 if mempool_guard.is_some() { 1309 return Ok(()); 1310 } 1311 1312 // Load mempool 1313 let last_bundle = self.get_last_bundle(); 1314 let target_bundle = last_bundle + 1; 1315 1316 // Get min timestamp from last bundle's last operation 1317 let min_timestamp = self.get_last_bundle_timestamp()?; 1318 1319 // Mempool::new will check if file exists and load it if it does 1320 // If file doesn't exist, it creates an empty mempool 1321 match mempool::Mempool::new( 1322 &self.directory, 1323 target_bundle, 1324 min_timestamp, 1325 *self.verbose.lock().unwrap(), 1326 ) { 1327 Ok(mp) => { 1328 // Mempool loaded (either from file or empty) 1329 *mempool_guard = Some(mp); 1330 *self.mempool_checked.write().unwrap() = true; 1331 } 1332 Err(e) => { 1333 // Mempool file doesn't exist or error loading 1334 // Mark as checked so we don't try again 1335 *self.mempool_checked.write().unwrap() = true; 1336 // Return error only if it's not a "file not found" type error 1337 if e.to_string().contains("No such file") || e.to_string().contains("not found") { 1338 // File doesn't exist, that's fine - just return Ok with None mempool 1339 return Ok(()); 1340 } 1341 return Err(e); 1342 } 1343 } 1344 1345 Ok(()) 1346 } 1347 1348 /// Get mempool statistics including counts and time bounds 1349 pub fn get_mempool_stats(&self) -> Result<mempool::MempoolStats> { 1350 let mempool_guard = self.mempool.read().unwrap(); 1351 1352 match mempool_guard.as_ref() { 1353 Some(mp) => Ok(mp.stats()), 1354 None => { 1355 // Return empty stats if no mempool 1356 let last_bundle = self.get_last_bundle(); 1357 let min_timestamp = 
self.get_last_bundle_timestamp()?; 1358 Ok(mempool::MempoolStats { 1359 count: 0, 1360 can_create_bundle: false, 1361 target_bundle: last_bundle + 1, 1362 min_timestamp, 1363 validated: false, 1364 first_time: None, 1365 last_time: None, 1366 size_bytes: None, 1367 did_count: None, 1368 }) 1369 } 1370 } 1371 } 1372 1373 /// Get all operations currently in the mempool 1374 pub fn get_mempool_operations(&self) -> Result<Vec<Operation>> { 1375 let mempool_guard = self.mempool.read().unwrap(); 1376 1377 match mempool_guard.as_ref() { 1378 Some(mp) => Ok(mp.get_operations().to_vec()), 1379 None => Ok(Vec::new()), 1380 } 1381 } 1382 1383 /// Clear all mempool data and remove on-disk mempool files 1384 pub fn clear_mempool(&self) -> Result<()> { 1385 let mut mempool_guard = self.mempool.write().unwrap(); 1386 1387 if let Some(mp) = mempool_guard.as_mut() { 1388 mp.clear(); 1389 mp.save()?; 1390 } 1391 1392 // Also delete all mempool files to prevent stale data from previous bundles 1393 if let Ok(entries) = std::fs::read_dir(&self.directory) { 1394 for entry in entries.flatten() { 1395 if let Some(name) = entry.file_name().to_str() 1396 && name.starts_with(constants::MEMPOOL_FILE_PREFIX) 1397 && name.ends_with(".jsonl") 1398 { 1399 let _ = std::fs::remove_file(entry.path()); 1400 } 1401 } 1402 } 1403 1404 Ok(()) 1405 } 1406 1407 /// Add operations to mempool, returning number added 1408 /// 1409 /// Mempool must be loaded first (call `load_mempool()`). 1410 pub fn add_to_mempool(&self, ops: Vec<Operation>, collect_cids: bool) -> Result<(usize, Vec<String>)> { 1411 self.get_mempool()?; 1412 let mut mempool_guard = self.mempool.write().unwrap(); 1413 if let Some(mp) = mempool_guard.as_mut() { 1414 let result = if collect_cids { mp.add_and_collect_cids(ops)? 
} else { (mp.add(ops)?, Vec::new()) }; 1415 mp.save_if_needed()?; 1416 Ok(result) 1417 } else { 1418 anyhow::bail!("Mempool not initialized") 1419 } 1420 } 1421 1422 /// Get the last bundle's last operation timestamp 1423 fn get_last_bundle_timestamp(&self) -> Result<DateTime<Utc>> { 1424 let last_bundle = self.get_last_bundle(); 1425 1426 if last_bundle == 0 { 1427 // No bundles yet, use epoch 1428 return Ok(DateTime::from_timestamp(0, 0).unwrap()); 1429 } 1430 1431 // Load last bundle and get last operation's timestamp 1432 let result = self.load_bundle(last_bundle, LoadOptions::default())?; 1433 1434 if let Some(last_op) = result.operations.last() { 1435 let timestamp = DateTime::parse_from_rfc3339(&last_op.created_at)?.with_timezone(&Utc); 1436 Ok(timestamp) 1437 } else { 1438 // Bundle is empty (shouldn't happen), use epoch 1439 Ok(DateTime::from_timestamp(0, 0).unwrap()) 1440 } 1441 } 1442 1443 // === Sync Operations === 1444 1445 /// Validate and clean repository state before sync 1446 fn validate_sync_state(&self) -> Result<()> { 1447 let last_bundle = self.get_last_bundle(); 1448 let next_bundle_num = last_bundle + 1; 1449 1450 // Check for and delete mempool files for already-completed bundles 1451 let mut found_stale_files = false; 1452 if let Ok(entries) = std::fs::read_dir(&self.directory) { 1453 for entry in entries.flatten() { 1454 if let Some(name) = entry.file_name().to_str() 1455 && name.starts_with(constants::MEMPOOL_FILE_PREFIX) 1456 && name.ends_with(".jsonl") 1457 { 1458 // Extract bundle number from filename: plc_mempool_NNNNNN.jsonl 1459 if let Some(num_str) = name 1460 .strip_prefix(constants::MEMPOOL_FILE_PREFIX) 1461 .and_then(|s| s.strip_suffix(".jsonl")) 1462 && let Ok(bundle_num) = num_str.parse::<u32>() 1463 { 1464 // Delete mempool files for completed bundles or way future bundles 1465 if bundle_num <= last_bundle || bundle_num > next_bundle_num { 1466 log::warn!("Removing stale mempool file for bundle {:06}", bundle_num); 1467 let _ 
= std::fs::remove_file(entry.path()); 1468 found_stale_files = true; 1469 } 1470 } 1471 } 1472 } 1473 } 1474 1475 if found_stale_files { 1476 log::info!("Cleaned up stale mempool files"); 1477 } 1478 1479 let mempool_stats = self.get_mempool_stats()?; 1480 1481 if mempool_stats.count == 0 { 1482 return Ok(()); // Empty mempool is always valid 1483 } 1484 1485 // Check if mempool operations are for the correct bundle 1486 let mempool_ops = self.get_mempool_operations()?; 1487 if mempool_ops.is_empty() { 1488 return Ok(()); 1489 } 1490 1491 // Get the last operation from the previous bundle 1492 let last_bundle_time = if next_bundle_num > 1 1493 && let Ok(last_bundle_result) = 1494 self.load_bundle(next_bundle_num - 1, LoadOptions::default()) 1495 { 1496 last_bundle_result.operations.last().and_then(|last_op| { 1497 chrono::DateTime::parse_from_rfc3339(&last_op.created_at) 1498 .ok() 1499 .map(|dt| dt.with_timezone(&chrono::Utc)) 1500 }) 1501 } else { 1502 None 1503 }; 1504 1505 // Special case: When creating the first bundle (next_bundle_num == 1, meaning 1506 // last_bundle == 0, i.e., empty repository), any existing mempool is likely stale 1507 // from a previous sync attempt. Clear it to start fresh from the beginning. 
1508 if next_bundle_num == 1 && mempool_stats.count > 0 { 1509 log::warn!( 1510 "Starting first bundle (empty repository), but mempool has {} operations", 1511 mempool_stats.count 1512 ); 1513 if let Some(first_time) = mempool_stats.first_time { 1514 log::warn!( 1515 "Mempool operations start at: {}", 1516 first_time.format("%Y-%m-%d %H:%M:%S") 1517 ); 1518 } 1519 log::warn!("Clearing mempool to start fresh from the beginning..."); 1520 self.clear_mempool()?; 1521 return Ok(()); 1522 } 1523 1524 // Check if mempool operations are chronologically valid relative to last bundle 1525 if let Some(last_time) = last_bundle_time 1526 && let Some(first_mempool_time) = mempool_stats.first_time 1527 { 1528 // Case 1: Mempool operations are BEFORE the last bundle (definitely stale) 1529 if first_mempool_time < last_time { 1530 log::warn!("Detected stale mempool data (operations before last bundle)"); 1531 log::warn!( 1532 "First mempool op: {}, Last bundle op: {}", 1533 first_mempool_time.format("%Y-%m-%d %H:%M:%S"), 1534 last_time.format("%Y-%m-%d %H:%M:%S") 1535 ); 1536 log::warn!("Clearing mempool to start fresh..."); 1537 self.clear_mempool()?; 1538 return Ok(()); 1539 } 1540 1541 // Case 2: Mempool operations are slightly after last bundle, but way too close 1542 // This indicates they're from a previous failed attempt at this bundle 1543 // BUT: Only clear if the mempool file is old (modified > 1 hour ago) 1544 // If it's recent, it might be a legitimate resume of a slow sync 1545 let time_diff = first_mempool_time.signed_duration_since(last_time); 1546 if time_diff < chrono::Duration::seconds(constants::MIN_BUNDLE_CREATION_INTERVAL_SECS) 1547 && mempool_stats.count < constants::BUNDLE_SIZE 1548 { 1549 // Check mempool file modification time 1550 let mempool_filename = format!( 1551 "{}{:06}.jsonl", 1552 constants::MEMPOOL_FILE_PREFIX, 1553 next_bundle_num 1554 ); 1555 let mempool_path = self.directory.join(mempool_filename); 1556 1557 let is_stale = if let Ok(metadata) 
= std::fs::metadata(&mempool_path) { 1558 if let Ok(modified) = metadata.modified() { 1559 let modified_time = std::time::SystemTime::now() 1560 .duration_since(modified) 1561 .unwrap_or(std::time::Duration::from_secs(0)); 1562 modified_time > std::time::Duration::from_secs(3600) // 1 hour 1563 } else { 1564 false // Can't get modified time, assume not stale 1565 } 1566 } else { 1567 false // File doesn't exist, assume not stale 1568 }; 1569 1570 if is_stale { 1571 log::warn!( 1572 "Detected potentially stale mempool data (too close to last bundle timestamp)" 1573 ); 1574 log::warn!( 1575 "Time difference: {}s, Operations: {}/{}", 1576 time_diff.num_seconds(), 1577 mempool_stats.count, 1578 constants::BUNDLE_SIZE 1579 ); 1580 log::warn!( 1581 "This likely indicates a previous failed sync attempt. Clearing mempool..." 1582 ); 1583 self.clear_mempool()?; 1584 } else if *self.verbose.lock().unwrap() { 1585 log::debug!("Mempool appears recent, allowing resume despite close timestamp"); 1586 } 1587 return Ok(()); 1588 } 1589 } 1590 1591 // Check if mempool has way too many operations (likely from failed previous attempt) 1592 if mempool_stats.count > constants::BUNDLE_SIZE { 1593 log::warn!( 1594 "Mempool has {} operations (expected max {})", 1595 mempool_stats.count, 1596 constants::BUNDLE_SIZE 1597 ); 1598 log::warn!("This indicates a previous sync attempt failed. Clearing mempool..."); 1599 self.clear_mempool()?; 1600 return Ok(()); 1601 } 1602 1603 Ok(()) 1604 } 1605 1606 /// Batch update DID index for a range of bundles (for initial sync optimization) 1607 /// 1608 /// IMPORTANT: This method performs heavy blocking I/O and should be called from async 1609 /// contexts using spawn_blocking to avoid freezing the async runtime (and HTTP server). 
1610 pub fn batch_update_did_index( 1611 &self, 1612 start_bundle: u32, 1613 end_bundle: u32, 1614 compact: bool, 1615 ) -> Result<()> { 1616 use std::time::Instant; 1617 1618 if start_bundle > end_bundle { 1619 return Ok(()); 1620 } 1621 1622 let total_start = Instant::now(); 1623 let bundle_count = end_bundle - start_bundle + 1; 1624 if bundle_count > 10 { 1625 use std::time::Instant; 1626 eprintln!( 1627 "[DID Index] Rebuild triggered for {} bundles ({}{})", 1628 bundle_count, start_bundle, end_bundle 1629 ); 1630 let rebuild_start = Instant::now(); 1631 let _ = self.build_did_index( 1632 crate::constants::DID_INDEX_FLUSH_INTERVAL, 1633 Some( 1634 |current: u32, total: u32, bytes_processed: u64, total_bytes: u64| { 1635 let percent = if total_bytes > 0 { 1636 (bytes_processed as f64 / total_bytes as f64) * 100.0 1637 } else { 1638 0.0 1639 }; 1640 eprintln!( 1641 "[DID Index] Rebuild progress: {}/{} ({:.1}%)", 1642 current, total, percent 1643 ); 1644 }, 1645 ), 1646 None, 1647 None, 1648 )?; 1649 let dur = rebuild_start.elapsed(); 1650 eprintln!("[DID Index] Rebuild complete in {:.1}s", dur.as_secs_f64()); 1651 return Ok(()); 1652 } 1653 1654 if *self.verbose.lock().unwrap() { 1655 log::info!( 1656 "Batch updating DID index for bundles {:06} to {:06}... 
({} bundles)", 1657 start_bundle, 1658 end_bundle, 1659 bundle_count 1660 ); 1661 } 1662 1663 // Process bundles incrementally (avoid loading all into memory) 1664 let load_start = Instant::now(); 1665 let mut total_operations = 0usize; 1666 let mut bundles_processed = 0usize; 1667 1668 // Update DID index for each bundle as we load it (memory efficient) 1669 self.ensure_did_index()?; 1670 let update_start = Instant::now(); 1671 for bundle_num in start_bundle..=end_bundle { 1672 if let Ok(result) = self.load_bundle(bundle_num, LoadOptions::default()) { 1673 total_operations += result.operations.len(); 1674 let operations: Vec<(String, bool)> = result 1675 .operations 1676 .iter() 1677 .map(|op| (op.did.clone(), op.nullified)) 1678 .collect(); 1679 1680 // Process immediately instead of accumulating 1681 let _ = self 1682 .did_index 1683 .write() 1684 .unwrap() 1685 .as_mut() 1686 .unwrap() 1687 .update_for_bundle(bundle_num, operations)?; 1688 bundles_processed += 1; 1689 } 1690 } 1691 let load_duration = load_start.elapsed(); 1692 let update_duration = update_start.elapsed(); 1693 1694 if bundles_processed == 0 { 1695 return Ok(()); 1696 } 1697 1698 log::debug!( 1699 "[Batch DID Index] Processed {} bundles ({} operations) in {:.3}s ({:.0} ops/sec)", 1700 bundles_processed, 1701 total_operations, 1702 update_duration.as_secs_f64(), 1703 total_operations as f64 / update_duration.as_secs_f64() 1704 ); 1705 1706 // Optionally compact all shards immediately to avoid leaving delta segments 1707 if compact { 1708 let idx_guard = self.did_index.read().unwrap(); 1709 if let Some(idx) = idx_guard.as_ref() { 1710 idx.compact_pending_segments(None)?; 1711 } 1712 } 1713 1714 let total_duration = total_start.elapsed(); 1715 1716 if *self.verbose.lock().unwrap() { 1717 log::info!( 1718 "✓ DID index updated for bundles {:06} to {:06} in {:.3}s (load={:.1}s, update={:.1}s, {:.0} ops/sec overall)", 1719 start_bundle, 1720 end_bundle, 1721 total_duration.as_secs_f64(), 1722 
load_duration.as_secs_f64(),
                update_duration.as_secs_f64(),
                total_operations as f64 / total_duration.as_secs_f64()
            );
        }

        Ok(())
    }

    /// Async wrapper for `batch_update_did_index` that runs in a blocking task.
    ///
    /// This prevents blocking the async runtime (and HTTP server) during heavy I/O operations.
    ///
    /// # Errors
    ///
    /// Returns an error if the blocking task fails to complete (panic/cancellation)
    /// or if the batch update itself returns an error.
    pub async fn batch_update_did_index_async(
        &self,
        start_bundle: u32,
        end_bundle: u32,
        compact: bool,
    ) -> Result<()> {
        let manager = self.clone_for_arc();

        // Outer `?`: join error from the blocking task.
        // Inner `?`: the batch update's own `Result`, which must not be discarded.
        tokio::task::spawn_blocking(move || {
            manager.batch_update_did_index(start_bundle, end_bundle, compact)
        })
        .await
        .map_err(|e| anyhow::anyhow!("Batch DID index update task failed: {}", e))??;

        Ok(())
    }

    /// Fetch operations from the PLC directory into the mempool and, once the
    /// mempool holds a full bundle, save it as the next bundle.
    ///
    /// The DID index is updated for the new bundle when `update_did_index` is set
    /// (fast with delta segments).
    ///
    /// * `client` - PLC directory client used for export requests.
    /// * `shutdown_rx` - optional shutdown signal, checked before every fetch and
    ///   forwarded to the client.
    /// * `update_did_index` - whether to update the DID index for the new bundle.
    /// * `fetch_log` - when true, append one JSON line per fetch (plus the raw HTTP
    ///   capture, if the client returned one) under `<DID_INDEX_DIR>/logs`.
    /// * `safety_lag` - when set, operations newer than `base time - lag` are
    ///   dropped and fetching stops; the base time is the server time reported by
    ///   the client, or local time if none was reported.
    ///
    /// Returns [`SyncResult::BundleCreated`] when a bundle of exactly
    /// `BUNDLE_SIZE` operations was written, or [`SyncResult::CaughtUp`] when
    /// fetching stopped before the mempool was full.
    ///
    /// # Errors
    ///
    /// Fails on shutdown, on fetch/mempool/save errors, and when `MAX_ATTEMPTS`
    /// fetches neither filled the mempool nor caught up.
    pub async fn sync_next_bundle(
        &self,
        client: &crate::plc_client::PLCClient,
        shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>,
        update_did_index: bool,
        fetch_log: bool,
        safety_lag: Option<std::time::Duration>,
    ) -> Result<SyncResult> {
        use crate::sync::{get_boundary_cids, strip_boundary_duplicates};
        use std::time::Instant;

        // Validate repository state before starting
        self.validate_sync_state()?;

        // Read the verbosity flag once instead of re-locking it for every log line.
        let verbose = *self.verbose.lock().unwrap();

        let next_bundle_num = self.get_last_bundle() + 1;

        // ALWAYS get boundaries from last bundle initially
        let (mut after_time, mut prev_boundary) = if next_bundle_num > 1 {
            let last = self.load_bundle(
                next_bundle_num - 1,
                LoadOptions {
                    cache: false,
                    decompress: true,
                    filter: None,
                    limit: None,
                },
            )?;
            let boundary = get_boundary_cids(&last.operations);
            let cursor = last
                .operations
                .last()
                .map(|op| op.created_at.clone())
                .unwrap_or_default();

            if verbose {
                log::info!(
                    "Loaded {} boundary CIDs from bundle {:06} (at {})",
                    boundary.len(),
                    next_bundle_num - 1,
                    cursor
                );
            }

            (cursor, boundary)
        } else {
            ("1970-01-01T00:00:00Z".to_string(), HashSet::new())
        };

        // If mempool has operations, update cursor AND boundaries from mempool
        // (mempool operations already had boundary dedup applied when they were added)
        let mempool_stats = self.get_mempool_stats()?;
        if mempool_stats.count > 0
            && let Some(last_time) = mempool_stats.last_time
        {
            if verbose {
                log::debug!(
                    "Mempool has {} ops, resuming from {}",
                    mempool_stats.count,
                    last_time.format("%Y-%m-%dT%H:%M:%S")
                );
            }
            after_time = last_time.to_rfc3339();

            // Calculate boundaries from MEMPOOL for next fetch
            let mempool_ops = self.get_mempool_operations()?;
            if !mempool_ops.is_empty() {
                prev_boundary = get_boundary_cids(&mempool_ops);
                if verbose {
                    log::info!("Using {} boundary CIDs from mempool", prev_boundary.len());
                }
            }
        }

        log::debug!(
            "Preparing bundle {:06} (mempool: {} ops)...",
            next_bundle_num,
            mempool_stats.count
        );
        log::debug!(
            "Starting cursor: {}",
            if after_time.is_empty() || after_time == "1970-01-01T00:00:00Z" {
                ""
            } else {
                &after_time
            }
        );

        if verbose && !prev_boundary.is_empty() && mempool_stats.count == 0 {
            log::info!(
                " Starting with {} boundary CIDs from previous bundle",
                prev_boundary.len()
            );
        }

        // Ensure mempool is loaded (load if needed)
        self.load_mempool()?;

        // Fetch until the mempool holds a full bundle (BUNDLE_SIZE operations)
        const MAX_ATTEMPTS: usize = 50;
        let mut fetch_num = 0;
        let mut total_fetched = 0;
        // Operations actually added to the mempool by this call
        let mut total_added = 0;
        let mut total_dupes = 0;
        let mut total_boundary_dupes = 0;
        let fetch_start = Instant::now();
        let mut caught_up = false;
        let mut total_wait = std::time::Duration::ZERO;
        let mut total_http = std::time::Duration::ZERO;

        // The safety-lag cutoff is calculated per request from the server time.
        while fetch_num < MAX_ATTEMPTS {
            let stats = self.get_mempool_stats()?;
            if stats.count >= constants::BUNDLE_SIZE {
                break;
            }

            fetch_num += 1;
            let needed = constants::BUNDLE_SIZE - stats.count;

            // Smart batch sizing - request more than exact amount to account for duplicates
            let request_count = match needed {
                n if n <= 50 => 50,
                n if n <= 100 => 100,
                n if n <= 500 => 200,
                _ => 1000,
            };

            if verbose {
                log::info!(
                    " Fetch #{}: requesting {} (need {} more, have {}/{})",
                    fetch_num,
                    request_count,
                    needed,
                    stats.count,
                    constants::BUNDLE_SIZE
                );
            }

            let fetch_op_start = Instant::now();
            if let Some(ref rx) = shutdown_rx
                && *rx.borrow()
            {
                anyhow::bail!("Shutdown requested");
            }

            // The shutdown receiver (if any) and the `fetch_log` flag are forwarded
            // unchanged. The raw capture is only kept when fetch logging is enabled.
            let (plc_ops, wait_dur, http_dur, capture_opt, server_time) = client
                .fetch_operations(&after_time, request_count, shutdown_rx.clone(), fetch_log)
                .await?;
            let raw_capture_opt = if fetch_log { capture_opt } else { None };
            total_wait += wait_dur;
            total_http += http_dur;

            let fetched_count = plc_ops.len();

            // An empty response means there is nothing newer. A short but non-empty
            // batch also means we reached the head; its operations are still
            // processed below before the loop stops.
            let got_incomplete_batch = fetched_count > 0 && fetched_count < request_count;
            if plc_ops.is_empty() {
                caught_up = true;
                if verbose {
                    log::debug!("Caught up to latest PLC data");
                }
                break;
            }

            total_fetched += fetched_count;

            // Calculate cutoff time based on server time if available, otherwise local time
            let cutoff_time = safety_lag.map(|lag| {
                let base_time = server_time.unwrap_or_else(chrono::Utc::now);
                let cutoff = base_time
                    - chrono::Duration::from_std(lag).unwrap_or(chrono::Duration::seconds(0));

                if verbose {
                    let source = if server_time.is_some() { "server" } else { "local" };
                    log::debug!(
                        "Safety lag cutoff: {} (source: {}, lag: {:?})",
                        cutoff.to_rfc3339(),
                        source,
                        lag
                    );
                }
                cutoff
            });

            // Convert to operations
            let ops_pre_raw: Vec<Operation> = plc_ops.into_iter().map(Into::into).collect();

            // Apply safety lag filtering
            let (ops_pre, filtered_count) = match cutoff_time {
                Some(cutoff) => {
                    let mut kept = Vec::with_capacity(ops_pre_raw.len());
                    let mut filtered = 0usize;
                    for op in ops_pre_raw {
                        match chrono::DateTime::parse_from_rfc3339(&op.created_at) {
                            Ok(op_time) if op_time > cutoff => filtered += 1,
                            Ok(_) => kept.push(op),
                            Err(_) => {
                                // Unparseable timestamps are kept rather than dropped.
                                // Dropping would silently lose data; keeping risks
                                // admitting an op newer than the cutoff.
                                log::warn!(
                                    "Failed to parse timestamp for op {}, keeping it",
                                    op.did
                                );
                                kept.push(op);
                            }
                        }
                    }
                    (kept, filtered)
                }
                None => (ops_pre_raw, 0),
            };

            if filtered_count > 0 {
                if verbose {
                    log::info!(
                        " Safety lag: filtered {} operations newer than cutoff",
                        filtered_count
                    );
                }
                // If we filtered any operations, we must consider ourselves "caught up"
                // because we can't proceed past the cutoff time safely.
                // We also stop fetching in this cycle.
                caught_up = true;
            }

            // CIDs of every operation that survived the safety-lag filter (fetch log only)
            let all_cids: Vec<String> = if fetch_log {
                ops_pre.iter().filter_map(|op| op.cid.clone()).collect()
            } else {
                Vec::new()
            };

            // Deduplicate against boundary (the batch is moved, not copied)
            let before_dedup = ops_pre.len();
            let ops: Vec<Operation> = strip_boundary_duplicates(ops_pre, &prev_boundary);
            let after_dedup = ops.len();

            let boundary_removed = before_dedup - after_dedup;
            if boundary_removed > 0 {
                total_boundary_dupes += boundary_removed;
                if verbose {
                    log::info!(
                        " Stripped {} boundary duplicates from fetch",
                        boundary_removed
                    );
                }
            }

            let export_url = if fetch_log {
                client.build_export_url(&after_time, request_count)
            } else {
                String::new()
            };

            let (added, added_cids) = if ops.is_empty() {
                (0, Vec::new())
            } else {
                self.add_to_mempool(ops, fetch_log)?
            };
            total_added += added;

            if fetch_log {
                use serde_json::json;
                use std::io::Write;

                let log_dir = self.directory.join(constants::DID_INDEX_DIR).join("logs");
                let _ = std::fs::create_dir_all(&log_dir);
                let log_path = log_dir.join(format!("{:06}.json", next_bundle_num));
                let added_set: std::collections::HashSet<&String> = added_cids.iter().collect();
                let skipped: Vec<String> = all_cids
                    .iter()
                    .filter(|c| !added_set.contains(*c))
                    .cloned()
                    .collect();
                let entry = json!({
                    "time": chrono::Utc::now().to_rfc3339(),
                    "url": export_url,
                    "count": fetched_count,
                    "cids": all_cids,
                    "skipped": skipped,
                    "http_start": raw_capture_opt.as_ref().map(|c| c.http_start.clone()).unwrap_or_default(),
                });
                let mut file = std::fs::OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(log_path)?;
                writeln!(file, "{}", entry)?;

                if let Some(capture) = raw_capture_opt.as_ref() {
                    let raw_path = log_dir.join(format!("{:06}-{}", next_bundle_num, after_time));
                    let mut raw_file = std::fs::OpenOptions::new()
                        .create(true)
                        .write(true)
                        .truncate(true)
                        .open(raw_path)?;
                    writeln!(raw_file, "Status: {}", capture.status)?;
                    for (name, value) in &capture.headers {
                        writeln!(raw_file, "{}: {}", name, value)?;
                    }
                    writeln!(raw_file)?;
                    write!(raw_file, "{}", capture.body)?;
                }
            }

            let dupes_in_fetch = after_dedup.saturating_sub(added);
            total_dupes += dupes_in_fetch;

            let fetch_secs = fetch_op_start.elapsed().as_secs_f64();
            let new_stats = self.get_mempool_stats()?;
            let ops_per_sec = if fetch_secs > 0.0 {
                added as f64 / fetch_secs
            } else {
                0.0
            };

            if verbose {
                if boundary_removed > 0 || dupes_in_fetch > 0 {
                    log::info!(
                        " → +{} unique ({} dupes, {} boundary) in {:.9}s • Running: {}/{} ({:.0} ops/sec)",
                        added,
                        dupes_in_fetch,
                        boundary_removed,
                        fetch_secs,
                        new_stats.count,
                        constants::BUNDLE_SIZE,
                        ops_per_sec
                    );
                } else {
                    log::info!(
                        " → +{} unique in {:.9}s • Running: {}/{} ({:.0} ops/sec)",
                        added,
                        fetch_secs,
                        new_stats.count,
                        constants::BUNDLE_SIZE,
                        ops_per_sec
                    );
                }
            }

            // Update cursor
            if let Some(last_time) = new_stats.last_time {
                after_time = last_time.to_rfc3339();
            }

            // Stop on an incomplete batch, when no progress was made, or when the
            // safety lag filtered operations (we cannot safely read past the cutoff).
            if got_incomplete_batch || added == 0 || filtered_count > 0 {
                caught_up = true;
                if verbose {
                    if filtered_count > 0 {
                        log::debug!("Caught up to safety lag cutoff");
                    } else {
                        log::debug!("Caught up to latest PLC data");
                    }
                }
                break;
            }
        }

        let fetch_total_duration = fetch_start.elapsed();
        let dedup_pct = if total_fetched > 0 {
            (total_dupes + total_boundary_dupes) as f64 / total_fetched as f64 * 100.0
        } else {
            0.0
        };

        let final_stats = self.get_mempool_stats()?;

        // Bundles must contain exactly BUNDLE_SIZE operations (no partial bundles allowed)
        if final_stats.count < constants::BUNDLE_SIZE {
            if !caught_up {
                anyhow::bail!(
                    "Insufficient operations: have {}, need exactly {} (max attempts reached)",
                    final_stats.count,
                    constants::BUNDLE_SIZE
                );
            }
            // Caught up without enough ops for a full bundle. `new_ops` is what this
            // call actually added to the mempool: fetched minus safety-lag-filtered,
            // boundary duplicates and mempool duplicates.
            return Ok(SyncResult::CaughtUp {
                next_bundle: next_bundle_num,
                mempool_count: final_stats.count,
                new_ops: total_added,
                fetch_duration_ms: fetch_total_duration.as_millis() as u64,
            });
        }

        if verbose {
            log::info!(
                " ✓ Collected {} unique ops from {} fetches ({:.1}% dedup)",
                final_stats.count,
                fetch_num,
                dedup_pct
            );
        }

        // Take operations and create bundle
        log::debug!(
            "Calling operations.SaveBundle with bundle={}",
            next_bundle_num
        );

        let operations = {
            let mut mempool = self.mempool.write().unwrap();
            let mem = mempool
                .as_mut()
                .ok_or_else(|| anyhow::anyhow!("Mempool not initialized"))?;
            // Take up to BUNDLE_SIZE operations (or all if less)
            let count = mem.count().min(constants::BUNDLE_SIZE);
            mem.take(count)?
        };

        if operations.is_empty() {
            anyhow::bail!("No operations to create bundle");
        }

        // Bundles must contain exactly BUNDLE_SIZE operations
        if operations.len() != constants::BUNDLE_SIZE {
            anyhow::bail!(
                "Invalid operation count: expected exactly {}, got {}",
                constants::BUNDLE_SIZE,
                operations.len()
            );
        }

        // CRITICAL: Clear mempool BEFORE saving.
        // If interrupted after this point, the taken operations are no longer in the
        // mempool. The next sync derives its cursor from the last saved bundle (or
        // from the mempool's newest operation, if any), so an unsaved bundle's
        // operations are expected to be fetched again rather than saved twice.
        // NOTE(review): this relies on clear_mempool() leaving nothing newer than the
        // operations taken above; confirm against its implementation.
        self.clear_mempool()?;

        // Save the bundle and update the DID index (fast with delta segments),
        // collecting a timing breakdown.
        let save_start = Instant::now();
        let (
            serialize_time,
            compress_time,
            hash_time,
            did_index_time,
            index_write_time,
            did_index_compacted,
        ) = self
            .save_bundle_with_timing(next_bundle_num, operations, update_did_index)
            .await?;
        let save_duration = save_start.elapsed();

        log::debug!("SaveBundle SUCCESS, setting bundle fields");

        // Show timing breakdown in verbose mode only
        if verbose {
            log::debug!(
                " Save timing: serialize={:.3}ms, compress={:.3}ms, hash={:.3}ms, did_index={:.3}ms, index_write={:.3}ms, total={:.1}ms",
                serialize_time.as_secs_f64() * 1000.0,
                compress_time.as_secs_f64() * 1000.0,
                hash_time.as_secs_f64() * 1000.0,
                did_index_time.as_secs_f64() * 1000.0,
                index_write_time.as_secs_f64() * 1000.0,
                save_duration.as_secs_f64() * 1000.0
            );
        }

        log::debug!("Adding bundle {} to index", next_bundle_num);
        log::debug!("Index now has {} bundles", next_bundle_num);
        log::debug!("Index saved, last bundle = {}", next_bundle_num);

        // Get bundle info for display. The bundle is already on disk at this point,
        // so nothing here may panic.
        let (short_hash, age_str, unique_dids, size_bytes) = {
            let index = self.index.read().unwrap();
            let bundle_meta = index.get_bundle(next_bundle_num).ok_or_else(|| {
                anyhow::anyhow!("Bundle {} missing from index after save", next_bundle_num)
            })?;
            // Use chain hash (first 7 chars) for display
            let hash = bundle_meta
                .hash
                .get(..7)
                .unwrap_or(bundle_meta.hash.as_str())
                .to_string();

            // Age of the bundle's first operation. start_time is an operation's
            // created_at, which the safety-lag filter above already tolerates being
            // unparseable, so fall back instead of panicking.
            let age_str = chrono::DateTime::parse_from_rfc3339(&bundle_meta.start_time)
                .map(|created| {
                    let created = created.with_timezone(&chrono::Utc);
                    format_age(chrono::Utc::now().signed_duration_since(created))
                })
                .unwrap_or_else(|_| "unknown".to_string());

            (
                hash,
                age_str,
                bundle_meta.did_count,
                bundle_meta.compressed_size,
            )
        };

        // Get mempool count after clearing (should be 0, but check anyway)
        let mempool_count = self.get_mempool_stats().map(|s| s.count).unwrap_or(0);
        let total_duration_ms = (fetch_total_duration + save_duration).as_millis() as u64;
        let fetch_duration_ms = fetch_total_duration.as_millis() as u64;

        // Calculate separate timings: bundle save vs index write/DID index
        let (bundle_save_ms, index_ms) = if update_did_index {
            (
                (serialize_time + compress_time + hash_time).as_millis() as u64,
                (did_index_time + index_write_time).as_millis() as u64,
            )
        } else {
            (
                (serialize_time + compress_time + hash_time + index_write_time).as_millis() as u64,
                0,
            )
        };

        // Only log detailed info in verbose mode
        if verbose {
            log::info!(
                "→ Bundle {:06} | {} | fetch: {:.3}s ({} reqs) | {}",
                next_bundle_num,
                short_hash,
                fetch_total_duration.as_secs_f64(),
                fetch_num,
                age_str
            );
            log::debug!(
                "Bundle done = {}, finish duration = {:.3}ms",
                next_bundle_num,
                save_duration.as_secs_f64() * 1000.0
            );
        }

        Ok(SyncResult::BundleCreated {
            bundle_num: next_bundle_num,
            mempool_count,
            duration_ms: total_duration_ms,
            fetch_duration_ms,
            bundle_save_ms,
            index_ms,
            fetch_requests: fetch_num,
            hash: short_hash,
            age: age_str,
            did_index_compacted,
            unique_dids,
            size_bytes,
            fetch_wait_ms: total_wait.as_millis() as u64,
            fetch_http_ms: total_http.as_millis() as u64,
        })
    }

    /// Run a single sync cycle.
    ///
    /// If `max_bundles` is `Some(n)`, stop after syncing `n` bundles; `Some(0)`
    /// syncs nothing. If `max_bundles` is `None`, sync until caught up.
    ///
    /// Returns the number of bundles created. Waits 500ms between consecutive
    /// bundles.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `sync_next_bundle`.
    pub async fn sync_once(
        &self,
        client: &crate::plc_client::PLCClient,
        max_bundles: Option<usize>,
    ) -> Result<usize> {
        // A zero limit means "sync nothing"; without this check one bundle would
        // be created before the limit is consulted.
        if max_bundles == Some(0) {
            return Ok(0);
        }

        let mut synced = 0;

        loop {
            match self.sync_next_bundle(client, None, true, false, None).await? {
                SyncResult::BundleCreated { .. } => {
                    synced += 1;

                    // Check if we've reached the limit
                    if max_bundles.is_some_and(|max| synced >= max) {
                        break;
                    }
                }
                // Caught up to latest PLC data
                SyncResult::CaughtUp { .. } => break,
            }

            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
        }

        Ok(synced)
    }

    /// Save a bundle to disk (metadata skippable frame followed by the compressed
    /// frames) and append it to the main index, optionally updating the DID index.
    ///
    /// Returns the timing breakdown `(serialize, compress, hash, did_index,
    /// index_write)` plus the compaction flag returned by the DID index update
    /// (`false` when the DID index is not updated).
    ///
    /// # Errors
    ///
    /// Fails if `operations` is empty, if compression or JSON re-serialization
    /// fails, if the bundle file or index cannot be written, or if the DID index
    /// update fails.
    async fn save_bundle_with_timing(
        &self,
        bundle_num: u32,
        operations: Vec<Operation>,
        update_did_index: bool,
    ) -> Result<(
        std::time::Duration,
        std::time::Duration,
        std::time::Duration,
        std::time::Duration,
        std::time::Duration,
        bool,
    )> {
        use anyhow::Context;
        use sha2::{Digest, Sha256};
        use std::collections::HashSet;
        use std::fs::File;
        use std::io::Write;
        use std::time::Instant;

        let (Some(first), Some(last)) = (operations.first(), operations.last()) else {
            anyhow::bail!("Cannot save empty bundle");
        };

        let verbose = *self.verbose.lock().unwrap();

        // Extract metadata
        let start_time = first.created_at.clone();
        let end_time = last.created_at.clone();
        let operation_count = operations.len() as u32;

        // Count unique DIDs (borrowed, no per-DID allocation)
        let did_count = operations
            .iter()
            .map(|op| op.did.as_str())
            .collect::<HashSet<&str>>()
            .len() as u32;

        // Compress operations into multiple frames using parallel compression
        let compress_result =
            crate::bundle_format::compress_operations_to_frames_parallel(&operations)?;

        let serialize_time =
            std::time::Duration::from_secs_f64(compress_result.serialize_time_ms / 1000.0);
        let compress_time =
            std::time::Duration::from_secs_f64(compress_result.compress_time_ms / 1000.0);

        let uncompressed_size = compress_result.uncompressed_size;
        let compressed_size = compress_result.compressed_size;
        let frame_count = compress_result.compressed_frames.len();
        let frame_offsets = compress_result.frame_offsets;
        let compressed_frames = compress_result.compressed_frames;

        // Content hash: SHA-256 over the uncompressed JSONL (each op followed by '\n')
        let hash_start = Instant::now();
        let content_hash = {
            let mut hasher = Sha256::new();
            let mut missing_raw_json = 0usize;

            for op in &operations {
                match &op.raw_json {
                    Some(raw) => hasher.update(raw.as_bytes()),
                    None => {
                        missing_raw_json += 1;
                        if missing_raw_json == 1 && verbose {
                            log::warn!(
                                "⚠️ Bundle {}: Operation missing raw_json, using re-serialized JSON (may cause hash mismatch!)",
                                bundle_num
                            );
                        }
                        hasher.update(sonic_rs::to_string(op)?.as_bytes());
                    }
                }
                hasher.update(b"\n");
            }

            if missing_raw_json > 0 && verbose {
                log::warn!(
                    "⚠️ Bundle {}: {} operations missing raw_json (content hash may be incorrect!)",
                    bundle_num,
                    missing_raw_json
                );
            }

            format!("{:x}", hasher.finalize())
        };
        // The compressed hash is calculated after the file is written, because it
        // must cover the metadata frame too (verification hashes the entire file).
        let hash_time = hash_start.elapsed();

        // Read the previous bundle once, under a single lock. Its chain hash is our
        // parent hash and its end_time is our cursor (per spec). The genesis bundle
        // has neither, so both are empty strings.
        let (parent, cursor, origin) = {
            let index = self.index.read().unwrap();
            let (parent, cursor) = if bundle_num > 1 {
                index
                    .get_bundle(bundle_num - 1)
                    .map(|b| (b.hash.clone(), b.end_time.clone()))
                    .unwrap_or_default()
            } else {
                (String::new(), String::new())
            };
            (parent, cursor, index.origin.clone())
        };

        // Chain hash per spec (Section 6.3):
        //   Genesis bundle: SHA256("plcbundle:genesis:" + content_hash)
        //   Subsequent:     SHA256(parent_chain_hash + ":" + current_content_hash)
        let chain_input = if bundle_num > 1 {
            // Debug logging for hash calculation issues
            if parent.is_empty() {
                log::warn!(
                    "⚠️ Bundle {}: Parent bundle {} not found in index! Using empty parent hash.",
                    bundle_num,
                    bundle_num - 1
                );
            } else if verbose {
                log::debug!(
                    "Bundle {}: Parent hash from bundle {}: {}",
                    bundle_num,
                    bundle_num - 1,
                    parent.get(..16).unwrap_or(parent.as_str())
                );
                log::debug!(
                    "Bundle {}: Content hash: {}",
                    bundle_num,
                    &content_hash[..16]
                );
            }
            format!("{}:{}", parent, content_hash)
        } else {
            format!("plcbundle:genesis:{}", content_hash)
        };
        let chain_hash = {
            let mut hasher = Sha256::new();
            hasher.update(chain_input.as_bytes());
            format!("{:x}", hasher.finalize())
        };
        if bundle_num > 1 && verbose {
            log::debug!("Bundle {}: Chain hash: {}", bundle_num, &chain_hash[..16]);
        }

        if bundle_num > 1 && cursor.is_empty() {
            log::warn!(
                "⚠️ Bundle {}: Previous bundle {} has empty end_time, cursor will be empty",
                bundle_num,
                bundle_num - 1
            );
        }

        // Single creation timestamp, shared by the embedded metadata and the index entry
        let created_at = chrono::Utc::now().to_rfc3339();

        // Prepare bundle metadata for skippable frame
        let bundle_metadata_frame = crate::bundle_format::BundleMetadata {
            format: "plcbundle/1.0".to_string(),
            bundle_number: bundle_num,
            origin,
            content_hash: content_hash.clone(),
            parent_hash: (!parent.is_empty()).then(|| parent.clone()),
            uncompressed_size: Some(uncompressed_size),
            compressed_size: Some(compressed_size),
            operation_count: operation_count as usize,
            did_count: did_count as usize,
            start_time: start_time.clone(),
            end_time: end_time.clone(),
            created_at: created_at.clone(),
            created_by: constants::created_by(),
            frame_count,
            frame_size: constants::FRAME_SIZE,
            frame_offsets,
        };

        // Write to disk in a blocking task to avoid blocking the async runtime.
        // The metadata frame doesn't contain compressed_hash, so the file is written
        // first and hashed afterwards. The metadata frame and the compressed frames
        // are moved into the task rather than copied.
        let bundle_path = constants::bundle_path(&self.directory, bundle_num);
        tokio::task::spawn_blocking({
            let path = bundle_path.clone();
            move || {
                let mut file = File::create(&path).with_context(|| {
                    format!("Failed to create bundle file: {}", path.display())
                })?;

                // Write metadata as skippable frame first
                crate::bundle_format::write_metadata_frame(&mut file, &bundle_metadata_frame)
                    .with_context(|| {
                        format!("Failed to write metadata frame to: {}", path.display())
                    })?;

                // Write all compressed frames
                for frame in &compressed_frames {
                    file.write_all(frame).with_context(|| {
                        format!("Failed to write compressed frame to: {}", path.display())
                    })?;
                }
                file.flush().with_context(|| {
                    format!("Failed to flush bundle file: {}", path.display())
                })?;
                // `flush` on a bare File does not force data to disk. Sync before
                // the index (written below) starts referencing this bundle.
                file.sync_all().with_context(|| {
                    format!("Failed to sync bundle file: {}", path.display())
                })?;

                Ok::<(), anyhow::Error>(())
            }
        })
        .await
        .context("Bundle file write task failed")??;

        // Now calculate compressed_hash from the entire file (as verification does)
        let compressed_hash = tokio::task::spawn_blocking(move || {
            let file_data = std::fs::read(&bundle_path).with_context(|| {
                format!(
                    "Failed to read bundle file for hash: {}",
                    bundle_path.display()
                )
            })?;

            let mut hasher = Sha256::new();
            hasher.update(&file_data);
            Ok::<String, anyhow::Error>(format!("{:x}", hasher.finalize()))
        })
        .await
        .context("Compressed hash calculation task failed")??;

        if verbose {
            log::debug!(
                "Saved bundle {} ({} ops, {} DIDs, {} → {} bytes, {:.1}% compression)",
                bundle_num,
                operation_count,
                did_count,
                uncompressed_size,
                compressed_size,
                100.0 * (1.0 - compressed_size as f64 / uncompressed_size as f64)
            );
        }

        let (did_index_time, did_index_compacted) = if update_did_index {
            let did_index_start = Instant::now();
            let did_ops: Vec<(String, bool)> = operations
                .iter()
                .map(|op| (op.did.clone(), op.nullified))
                .collect();

            self.ensure_did_index()?;
            let compacted = {
                let mut guard = self.did_index.write().unwrap();
                let did_index = guard
                    .as_mut()
                    .ok_or_else(|| anyhow::anyhow!("DID index not initialized"))?;
                did_index.update_for_bundle(bundle_num, did_ops)?
            };
            (did_index_start.elapsed(), compacted)
        } else {
            (std::time::Duration::ZERO, false)
        };

        // Update main index
        let index_write_start = Instant::now();
        let bundle_metadata = crate::index::BundleMetadata {
            bundle_number: bundle_num,
            start_time,
            end_time,
            operation_count,
            did_count,
            hash: chain_hash, // Chain hash per spec
            content_hash,
            parent,
            compressed_hash,
            compressed_size,
            uncompressed_size,
            cursor,
            created_at,
        };

        // Add to index.
        // CRITICAL: Clone index data while holding lock briefly, then release lock
        // before doing expensive serialization and file I/O in spawn_blocking
        let index_clone = {
            let mut index = self.index.write().unwrap();
            index.bundles.push(bundle_metadata);
            index.last_bundle = bundle_num;
            index.updated_at = chrono::Utc::now().to_rfc3339();
            index.total_size_bytes += compressed_size;
            index.total_uncompressed_size_bytes += uncompressed_size;

            // Clone the index for serialization outside the lock
            index.clone()
        };

        // Serialize and write index in blocking task to avoid blocking async runtime
        // Use Index::save() which does atomic write (temp file + rename)
        let directory = self.directory.clone();
        tokio::task::spawn_blocking(move || index_clone.save(directory))
            .await
            .context("Index write task failed")??;
        let index_write_time = index_write_start.elapsed();

        Ok((
            serialize_time,
            compress_time,
            hash_time,
            did_index_time,
            index_write_time,
            did_index_compacted,
        ))
    }

    /// Migrate a bundle to multi-frame format
    ///
    /// This method loads a bundle and re-saves it with multi-frame compression
    /// (100 operations per frame) with frame offsets for efficient random access.
    ///
    /// Returns: (size_diff, new_uncompressed_size, new_compressed_size)
    pub fn migrate_bundle(&self, bundle_num: u32) -> Result<(i64, u64, u64)> {
        use anyhow::Context;
        use std::collections::HashSet;
        use std::fs::File;

        // Get existing bundle metadata
        let meta = self
            .get_bundle_metadata(bundle_num)?
2753 .ok_or_else(|| anyhow::anyhow!("Bundle {} not in index", bundle_num))?; 2754 2755 let old_size = meta.compressed_size; 2756 2757 // Load bundle operations 2758 let load_result = self.load_bundle( 2759 bundle_num, 2760 LoadOptions { 2761 decompress: true, 2762 cache: false, 2763 filter: None, 2764 limit: None, 2765 }, 2766 )?; 2767 2768 let operations = load_result.operations; 2769 if operations.is_empty() { 2770 anyhow::bail!("Bundle {} has no operations", bundle_num); 2771 } 2772 2773 // Extract metadata 2774 let start_time = operations.first().unwrap().created_at.clone(); 2775 let end_time = operations.last().unwrap().created_at.clone(); 2776 let operation_count = operations.len() as u32; 2777 2778 // Count unique DIDs 2779 let unique_dids: HashSet<String> = operations.iter().map(|op| op.did.clone()).collect(); 2780 let did_count = unique_dids.len() as u32; 2781 2782 // Compress operations into frames using parallel compression 2783 let frame_result = 2784 crate::bundle_format::compress_operations_to_frames_parallel(&operations)?; 2785 let compressed_size = frame_result.compressed_size; 2786 let uncompressed_size = frame_result.uncompressed_size; 2787 2788 // Calculate hashes using library functions 2789 let content_hash = crate::bundle_format::calculate_content_hash(&operations)?; 2790 2791 // Compressed hash will be calculated after writing the file 2792 // because it needs to include the metadata frame (verification hashes entire file) 2793 2794 // Recalculate chain hash to verify correctness 2795 let (expected_parent, recalculated_chain_hash) = if bundle_num > 1 { 2796 use sha2::{Digest, Sha256}; 2797 let parent_chain_hash = self 2798 .index 2799 .read() 2800 .unwrap() 2801 .get_bundle(bundle_num - 1) 2802 .map(|b| b.hash.clone()) 2803 .unwrap_or_default(); 2804 2805 let chain_input = format!("{}:{}", parent_chain_hash, content_hash); 2806 let mut hasher = Sha256::new(); 2807 hasher.update(chain_input.as_bytes()); 2808 let hash = format!("{:x}", 
hasher.finalize()); 2809 2810 (parent_chain_hash, hash) 2811 } else { 2812 use sha2::{Digest, Sha256}; 2813 let chain_input = format!("plcbundle:genesis:{}", content_hash); 2814 let mut hasher = Sha256::new(); 2815 hasher.update(chain_input.as_bytes()); 2816 let hash = format!("{:x}", hasher.finalize()); 2817 2818 (String::new(), hash) 2819 }; 2820 2821 // Verify chain hash matches original 2822 if recalculated_chain_hash != meta.hash { 2823 anyhow::bail!( 2824 "Chain hash mismatch in bundle {}: original={}, recalculated={}\n\ 2825 This indicates the original bundle content may be corrupted or the chain was broken.", 2826 bundle_num, 2827 meta.hash, 2828 recalculated_chain_hash 2829 ); 2830 } 2831 2832 // Verify parent hash matches 2833 if expected_parent != meta.parent { 2834 anyhow::bail!( 2835 "Parent hash mismatch in bundle {}: original={}, expected={}\n\ 2836 This indicates the chain linkage is broken.", 2837 bundle_num, 2838 meta.parent, 2839 expected_parent 2840 ); 2841 } 2842 2843 // Use verified hashes from original bundle 2844 let chain_hash = meta.hash.clone(); 2845 let parent = meta.parent.clone(); 2846 2847 // Get cursor (end_time of previous bundle per spec) 2848 // For the first bundle, cursor is empty string 2849 let cursor = if bundle_num > 1 { 2850 let prev_end_time = self 2851 .index 2852 .read() 2853 .unwrap() 2854 .get_bundle(bundle_num - 1) 2855 .map(|b| b.end_time.clone()) 2856 .unwrap_or_default(); 2857 2858 // Validate cursor matches previous bundle's end_time 2859 if prev_end_time.is_empty() { 2860 log::warn!( 2861 "⚠️ Bundle {}: Previous bundle {} has empty end_time, cursor will be empty", 2862 bundle_num, 2863 bundle_num - 1 2864 ); 2865 } 2866 2867 prev_end_time 2868 } else { 2869 String::new() 2870 }; 2871 2872 // Validate cursor correctness (for non-genesis bundles) 2873 if bundle_num > 1 { 2874 let expected_cursor = { 2875 let index = self.index.read().unwrap(); 2876 index 2877 .get_bundle(bundle_num - 1) 2878 .map(|b| 
b.end_time.clone()) 2879 .unwrap_or_default() 2880 }; 2881 if cursor != expected_cursor { 2882 anyhow::bail!( 2883 "Cursor validation failed for bundle {}: expected {} (previous bundle end_time), got {}", 2884 bundle_num, 2885 expected_cursor, 2886 cursor 2887 ); 2888 } 2889 } else if !cursor.is_empty() { 2890 anyhow::bail!( 2891 "Cursor validation failed for bundle {} (genesis): cursor should be empty, got {}", 2892 bundle_num, 2893 cursor 2894 ); 2895 } 2896 2897 let origin = self.index.read().unwrap().origin.clone(); 2898 2899 // Create bundle metadata using library function 2900 let bundle_metadata_frame = crate::bundle_format::create_bundle_metadata( 2901 bundle_num, 2902 &origin, 2903 &content_hash, 2904 if !parent.is_empty() { 2905 Some(&parent) 2906 } else { 2907 None 2908 }, 2909 Some(uncompressed_size), 2910 Some(compressed_size), 2911 operation_count as usize, 2912 did_count as usize, 2913 &start_time, 2914 &end_time, 2915 frame_result.frame_offsets.len() - 1, 2916 constants::FRAME_SIZE, 2917 &frame_result.frame_offsets, 2918 ); 2919 2920 // Create backup path 2921 let bundle_path = constants::bundle_path(&self.directory, bundle_num); 2922 let backup_path = bundle_path.with_extension("jsonl.zst.bak"); 2923 2924 // Backup existing file 2925 if bundle_path.exists() { 2926 std::fs::copy(&bundle_path, &backup_path) 2927 .with_context(|| format!("Failed to backup bundle: {}", bundle_path.display()))?; 2928 } 2929 2930 // Write new bundle with multi-frame format using library function 2931 let mut file = File::create(&bundle_path) 2932 .with_context(|| format!("Failed to create bundle file: {}", bundle_path.display()))?; 2933 2934 crate::bundle_format::write_bundle_with_frames( 2935 &mut file, 2936 &bundle_metadata_frame, 2937 &frame_result.compressed_frames, 2938 ) 2939 .with_context(|| format!("Failed to write bundle: {}", bundle_path.display()))?; 2940 2941 // Verify metadata was written correctly 2942 let embedded_meta = 
crate::bundle_format::extract_metadata_from_file(&bundle_path) 2943 .with_context(|| "Failed to extract embedded metadata after migration")?; 2944 2945 if embedded_meta.frame_offsets.is_empty() { 2946 // Restore backup on failure 2947 if backup_path.exists() { 2948 std::fs::rename(&backup_path, &bundle_path)?; 2949 } 2950 anyhow::bail!("Frame offsets missing in metadata after migration"); 2951 } 2952 2953 // Verify content hash matches 2954 if embedded_meta.content_hash != content_hash { 2955 // Restore backup on failure 2956 if backup_path.exists() { 2957 std::fs::rename(&backup_path, &bundle_path)?; 2958 } 2959 anyhow::bail!("Content hash mismatch after migration"); 2960 } 2961 2962 // Calculate compressed_hash from the entire file (as verification does) 2963 // This must be done AFTER writing the file because it includes the metadata frame 2964 use sha2::{Digest, Sha256}; 2965 let file_data = std::fs::read(&bundle_path).with_context(|| { 2966 format!( 2967 "Failed to read bundle file for hash: {}", 2968 bundle_path.display() 2969 ) 2970 })?; 2971 2972 let mut hasher = Sha256::new(); 2973 hasher.update(&file_data); 2974 let compressed_hash = format!("{:x}", hasher.finalize()); 2975 2976 // Update index BEFORE removing backup (so if interrupted, index is consistent with file) 2977 let bundle_metadata = crate::index::BundleMetadata { 2978 bundle_number: bundle_num, 2979 start_time, 2980 end_time, 2981 operation_count, 2982 did_count, 2983 hash: chain_hash, 2984 content_hash, 2985 parent, 2986 compressed_hash, 2987 compressed_size, 2988 uncompressed_size, 2989 cursor, 2990 created_at: chrono::Utc::now().to_rfc3339(), 2991 }; 2992 2993 { 2994 let mut index = self.index.write().unwrap(); 2995 // Update existing bundle metadata 2996 if let Some(existing) = index 2997 .bundles 2998 .iter_mut() 2999 .find(|b| b.bundle_number == bundle_num) 3000 { 3001 *existing = bundle_metadata.clone(); 3002 } else { 3003 index.bundles.push(bundle_metadata.clone()); 3004 } 3005 3006 // 
Recalculate totals 3007 index.total_size_bytes = index.bundles.iter().map(|b| b.compressed_size).sum(); 3008 index.total_uncompressed_size_bytes = 3009 index.bundles.iter().map(|b| b.uncompressed_size).sum(); 3010 index.updated_at = chrono::Utc::now().to_rfc3339(); 3011 3012 // Save index to disk using Index::save() (atomic write) 3013 index.save(&self.directory)?; 3014 } 3015 3016 // Remove backup only after index is successfully updated 3017 if backup_path.exists() { 3018 std::fs::remove_file(&backup_path) 3019 .with_context(|| format!("Failed to remove backup: {}", backup_path.display()))?; 3020 } 3021 3022 let size_diff = compressed_size as i64 - old_size as i64; 3023 Ok((size_diff, uncompressed_size, compressed_size)) 3024 } 3025 3026 // === Helpers === 3027 pub fn get_last_bundle(&self) -> u32 { 3028 self.index.read().unwrap().last_bundle 3029 } 3030 3031 pub fn directory(&self) -> &PathBuf { 3032 &self.directory 3033 } 3034 3035 /// Get a copy of the current index 3036 pub fn get_index(&self) -> Index { 3037 self.index.read().unwrap().clone() 3038 } 3039 3040 pub fn bundle_count(&self) -> usize { 3041 self.index.read().unwrap().bundles.len() 3042 } 3043 3044 pub fn get_mempool_operations_from(&self, start: usize) -> Result<Vec<Operation>> { 3045 let mempool_guard = self.mempool.read().unwrap(); 3046 match mempool_guard.as_ref() { 3047 Some(mp) => { 3048 let ops = mp.get_operations(); 3049 if start >= ops.len() { 3050 Ok(Vec::new()) 3051 } else { 3052 Ok(ops[start..].to_vec()) 3053 } 3054 } 3055 None => Ok(Vec::new()), 3056 } 3057 } 3058 3059 // === Remote Access === 3060 3061 /// Fetch index from remote URL or local file path 3062 /// 3063 /// This is an async method that requires a tokio runtime. 3064 /// For synchronous usage, use the remote module functions directly. 
3065 pub async fn fetch_remote_index(&self, target: &str) -> Result<Index> { 3066 if target.starts_with("http://") || target.starts_with("https://") { 3067 let client = crate::remote::RemoteClient::new(target)?; 3068 client.fetch_index().await 3069 } else { 3070 crate::remote::load_local_index(target) 3071 } 3072 } 3073 3074 /// Fetch bundle operations from remote URL 3075 /// 3076 /// This is an async method that requires a tokio runtime. 3077 pub async fn fetch_remote_bundle( 3078 &self, 3079 base_url: &str, 3080 bundle_num: u32, 3081 ) -> Result<Vec<Operation>> { 3082 let client = crate::remote::RemoteClient::new(base_url)?; 3083 client.fetch_bundle_operations(bundle_num).await 3084 } 3085 3086 /// Fetch a single operation from remote URL 3087 /// 3088 /// This is an async method that requires a tokio runtime. 3089 pub async fn fetch_remote_operation( 3090 &self, 3091 base_url: &str, 3092 bundle_num: u32, 3093 position: usize, 3094 ) -> Result<String> { 3095 let client = crate::remote::RemoteClient::new(base_url)?; 3096 client.fetch_operation(bundle_num, position).await 3097 } 3098 3099 /// Rollback repository to a specific bundle 3100 pub fn rollback_to_bundle(&mut self, target_bundle: u32) -> Result<()> { 3101 let mut index = self.index.write().unwrap(); 3102 3103 // Keep only bundles up to target 3104 index.bundles.retain(|b| b.bundle_number <= target_bundle); 3105 index.last_bundle = target_bundle; 3106 index.updated_at = chrono::Utc::now().to_rfc3339(); 3107 3108 // Recalculate total sizes 3109 index.total_size_bytes = index.bundles.iter().map(|b| b.compressed_size).sum(); 3110 index.total_uncompressed_size_bytes = 3111 index.bundles.iter().map(|b| b.uncompressed_size).sum(); 3112 3113 // Save updated index using Index::save() (atomic write) 3114 index.save(&self.directory)?; 3115 3116 Ok(()) 3117 } 3118 3119 /// Get bundle metadata from index 3120 pub fn get_bundle_metadata( 3121 &self, 3122 bundle_num: u32, 3123 ) -> 
Result<Option<crate::index::BundleMetadata>> { 3124 let index = self.index.read().unwrap(); 3125 Ok(index.get_bundle(bundle_num).cloned()) 3126 } 3127 3128 /// Get embedded metadata from bundle's skippable frame 3129 pub fn get_embedded_metadata( 3130 &self, 3131 bundle_num: u32, 3132 ) -> Result<Option<crate::bundle_format::BundleMetadata>> { 3133 let bundle_path = constants::bundle_path(&self.directory, bundle_num); 3134 3135 if !bundle_path.exists() { 3136 return Ok(None); 3137 } 3138 3139 match crate::bundle_format::extract_metadata_from_file(&bundle_path) { 3140 Ok(meta) => Ok(Some(meta)), 3141 Err(_) => Ok(None), // Bundle may not have embedded metadata 3142 } 3143 } 3144 3145 /// Delete bundle files from disk 3146 pub fn delete_bundle_files(&self, bundle_numbers: &[u32]) -> Result<RollbackFileStats> { 3147 let mut deleted = 0; 3148 let mut failed = 0; 3149 let mut deleted_size = 0u64; 3150 3151 for &bundle_num in bundle_numbers { 3152 let bundle_path = constants::bundle_path(&self.directory, bundle_num); 3153 3154 // Get file size before deletion 3155 if let Ok(metadata) = std::fs::metadata(&bundle_path) { 3156 deleted_size += metadata.len(); 3157 } 3158 3159 match std::fs::remove_file(&bundle_path) { 3160 Ok(_) => deleted += 1, 3161 Err(e) if e.kind() == std::io::ErrorKind::NotFound => deleted += 1, 3162 Err(_) => failed += 1, 3163 } 3164 } 3165 3166 Ok(RollbackFileStats { 3167 deleted, 3168 failed, 3169 deleted_size, 3170 }) 3171 } 3172 3173 /// Preview what files would be cleaned without actually deleting them 3174 /// 3175 /// Scans for all `.tmp` files in: 3176 /// - Repository root directory (e.g., `plc_bundles.json.tmp`) 3177 /// - DID index directory `.plcbundle/` (e.g., `config.json.tmp`) 3178 /// - DID index shards directory `.plcbundle/shards/` (e.g., `00.tmp`, `01.tmp`, etc.) 
3179 /// 3180 /// # Returns 3181 /// A preview of files that would be removed 3182 pub fn clean_preview(&self) -> Result<CleanPreview> { 3183 use std::fs; 3184 3185 let mut files = Vec::new(); 3186 let mut total_size = 0u64; 3187 3188 // Scan repository root directory 3189 let root_dir = &self.directory; 3190 if let Ok(entries) = fs::read_dir(root_dir) { 3191 for entry in entries { 3192 let entry = match entry { 3193 Ok(e) => e, 3194 Err(_) => continue, 3195 }; 3196 3197 let path = entry.path(); 3198 if !path.is_file() { 3199 continue; 3200 } 3201 3202 if path.extension().is_some_and(|ext| ext == "tmp") { 3203 let file_size = match fs::metadata(&path) { 3204 Ok(meta) => meta.len(), 3205 Err(_) => 0, 3206 }; 3207 total_size += file_size; 3208 files.push(CleanPreviewFile { 3209 path, 3210 size: file_size, 3211 }); 3212 } 3213 } 3214 } 3215 3216 // Scan DID index directory (.plcbundle/) 3217 let did_index_dir = root_dir.join(constants::DID_INDEX_DIR); 3218 if did_index_dir.exists() { 3219 // Check config.json.tmp 3220 let config_tmp = did_index_dir.join(format!("{}.tmp", constants::DID_INDEX_CONFIG)); 3221 if config_tmp.exists() { 3222 let file_size = match fs::metadata(&config_tmp) { 3223 Ok(meta) => meta.len(), 3224 Err(_) => 0, 3225 }; 3226 total_size += file_size; 3227 files.push(CleanPreviewFile { 3228 path: config_tmp, 3229 size: file_size, 3230 }); 3231 } 3232 3233 // Scan shards directory (.plcbundle/shards/) 3234 let shards_dir = did_index_dir.join(constants::DID_INDEX_SHARDS); 3235 if shards_dir.exists() 3236 && let Ok(entries) = fs::read_dir(&shards_dir) 3237 { 3238 for entry in entries { 3239 let entry = match entry { 3240 Ok(e) => e, 3241 Err(_) => continue, 3242 }; 3243 3244 let path = entry.path(); 3245 if !path.is_file() { 3246 continue; 3247 } 3248 3249 if path.extension().is_some_and(|ext| ext == "tmp") { 3250 let file_size = match fs::metadata(&path) { 3251 Ok(meta) => meta.len(), 3252 Err(_) => 0, 3253 }; 3254 total_size += file_size; 3255 
files.push(CleanPreviewFile { 3256 path, 3257 size: file_size, 3258 }); 3259 } 3260 } 3261 } 3262 } 3263 3264 Ok(CleanPreview { files, total_size }) 3265 } 3266 3267 /// Clean up all temporary files from the repository 3268 /// 3269 /// Removes all `.tmp` files from: 3270 /// - Repository root directory (e.g., `plc_bundles.json.tmp`) 3271 /// - DID index directory `.plcbundle/` (e.g., `config.json.tmp`) 3272 /// - DID index shards directory `.plcbundle/shards/` (e.g., `00.tmp`, `01.tmp`, etc.) 3273 /// 3274 /// # Returns 3275 /// Statistics about the cleanup operation 3276 pub fn clean(&self) -> Result<CleanResult> { 3277 use std::fs; 3278 3279 let verbose = *self.verbose.lock().unwrap(); 3280 3281 if verbose { 3282 log::info!("Starting repository cleanup..."); 3283 } 3284 3285 let mut files_removed = 0; 3286 let mut bytes_freed = 0u64; 3287 let mut errors = Vec::new(); 3288 3289 // Clean repository root directory 3290 let root_dir = &self.directory; 3291 if verbose { 3292 log::info!("Scanning repository root directory: {}", root_dir.display()); 3293 } 3294 3295 if let Ok(entries) = fs::read_dir(root_dir) { 3296 for entry in entries { 3297 let entry = match entry { 3298 Ok(e) => e, 3299 Err(e) => { 3300 errors.push(format!("Failed to read directory entry: {}", e)); 3301 continue; 3302 } 3303 }; 3304 3305 let path = entry.path(); 3306 if !path.is_file() { 3307 continue; 3308 } 3309 3310 if path.extension().is_some_and(|ext| ext == "tmp") { 3311 let file_size = match fs::metadata(&path) { 3312 Ok(meta) => { 3313 let size = meta.len(); 3314 bytes_freed += size; 3315 size 3316 } 3317 Err(_) => 0, 3318 }; 3319 3320 match fs::remove_file(&path) { 3321 Ok(_) => { 3322 files_removed += 1; 3323 if verbose { 3324 log::info!( 3325 " ✓ Removed: {} ({})", 3326 path.file_name().and_then(|n| n.to_str()).unwrap_or("?"), 3327 crate::format::format_bytes(file_size) 3328 ); 3329 } 3330 } 3331 Err(e) => { 3332 let error_msg = format!("Failed to remove {}: {}", path.display(), e); 3333 
errors.push(error_msg.clone()); 3334 if verbose { 3335 log::warn!(" ✗ {}", error_msg); 3336 } 3337 } 3338 } 3339 } 3340 } 3341 } 3342 3343 // Clean DID index directory (.plcbundle/) 3344 let did_index_dir = root_dir.join(constants::DID_INDEX_DIR); 3345 if did_index_dir.exists() { 3346 if verbose { 3347 log::info!("Scanning DID index directory: {}", did_index_dir.display()); 3348 } 3349 3350 // Clean config.json.tmp 3351 let config_tmp = did_index_dir.join(format!("{}.tmp", constants::DID_INDEX_CONFIG)); 3352 if config_tmp.exists() { 3353 let file_size = match fs::metadata(&config_tmp) { 3354 Ok(meta) => { 3355 let size = meta.len(); 3356 bytes_freed += size; 3357 size 3358 } 3359 Err(_) => 0, 3360 }; 3361 3362 match fs::remove_file(&config_tmp) { 3363 Ok(_) => { 3364 files_removed += 1; 3365 if verbose { 3366 log::info!( 3367 " ✓ Removed: {} ({})", 3368 config_tmp 3369 .file_name() 3370 .and_then(|n| n.to_str()) 3371 .unwrap_or("?"), 3372 crate::format::format_bytes(file_size) 3373 ); 3374 } 3375 } 3376 Err(e) => { 3377 let error_msg = format!("Failed to remove {}: {}", config_tmp.display(), e); 3378 errors.push(error_msg.clone()); 3379 if verbose { 3380 log::warn!(" ✗ {}", error_msg); 3381 } 3382 } 3383 } 3384 } 3385 3386 // Clean shards directory (.plcbundle/shards/) 3387 let shards_dir = did_index_dir.join(constants::DID_INDEX_SHARDS); 3388 if shards_dir.exists() { 3389 if verbose { 3390 log::info!("Scanning shards directory: {}", shards_dir.display()); 3391 } 3392 if let Ok(entries) = fs::read_dir(&shards_dir) { 3393 for entry in entries { 3394 let entry = match entry { 3395 Ok(e) => e, 3396 Err(e) => { 3397 errors 3398 .push(format!("Failed to read shards directory entry: {}", e)); 3399 continue; 3400 } 3401 }; 3402 3403 let path = entry.path(); 3404 if !path.is_file() { 3405 continue; 3406 } 3407 3408 if path.extension().is_some_and(|ext| ext == "tmp") { 3409 let file_size = match fs::metadata(&path) { 3410 Ok(meta) => { 3411 let size = meta.len(); 3412 
bytes_freed += size; 3413 size 3414 } 3415 Err(_) => 0, 3416 }; 3417 3418 match fs::remove_file(&path) { 3419 Ok(_) => { 3420 files_removed += 1; 3421 if verbose { 3422 log::info!( 3423 " ✓ Removed: {} ({})", 3424 path.file_name() 3425 .and_then(|n| n.to_str()) 3426 .unwrap_or("?"), 3427 crate::format::format_bytes(file_size) 3428 ); 3429 } 3430 } 3431 Err(e) => { 3432 let error_msg = 3433 format!("Failed to remove {}: {}", path.display(), e); 3434 errors.push(error_msg.clone()); 3435 if verbose { 3436 log::warn!(" ✗ {}", error_msg); 3437 } 3438 } 3439 } 3440 } 3441 } 3442 } 3443 } else if verbose { 3444 log::debug!("Shards directory does not exist: {}", shards_dir.display()); 3445 } 3446 } else if verbose { 3447 log::debug!( 3448 "DID index directory does not exist: {}", 3449 did_index_dir.display() 3450 ); 3451 } 3452 3453 // Summary logging 3454 if verbose { 3455 if files_removed > 0 { 3456 log::info!( 3457 "Cleanup complete: removed {} file(s), freed {}", 3458 files_removed, 3459 crate::format::format_bytes(bytes_freed) 3460 ); 3461 } else { 3462 log::info!("Cleanup complete: no temporary files found"); 3463 } 3464 3465 if !errors.is_empty() { 3466 log::warn!("Encountered {} error(s) during cleanup", errors.len()); 3467 } 3468 } 3469 3470 Ok(CleanResult { 3471 files_removed, 3472 bytes_freed, 3473 errors: if errors.is_empty() { 3474 None 3475 } else { 3476 Some(errors) 3477 }, 3478 }) 3479 } 3480 3481 // === Server API Methods === 3482 3483 /// Get PLC origin from index 3484 pub fn get_plc_origin(&self) -> String { 3485 self.index.read().unwrap().origin.clone() 3486 } 3487 3488 /// Stream bundle raw (compressed) data 3489 /// Returns a reader that can be used to stream the compressed bundle file 3490 pub fn stream_bundle_raw(&self, bundle_num: u32) -> Result<std::fs::File> { 3491 // Validate bundle exists in index first 3492 if self.get_bundle_metadata(bundle_num)?.is_none() { 3493 anyhow::bail!("Bundle {} not found in index", bundle_num); 3494 } 3495 3496 let 
bundle_path = constants::bundle_path(&self.directory, bundle_num); 3497 if !bundle_path.exists() { 3498 anyhow::bail!( 3499 "Bundle {} file not found (exists in index but missing on disk)", 3500 bundle_num 3501 ); 3502 } 3503 Ok(std::fs::File::open(bundle_path)?) 3504 } 3505 3506 /// Stream bundle decompressed (JSONL) data 3507 /// Returns a reader that decompresses the bundle on-the-fly 3508 pub fn stream_bundle_decompressed( 3509 &self, 3510 bundle_num: u32, 3511 ) -> Result<Box<dyn std::io::Read + Send>> { 3512 let file = self.stream_bundle_raw(bundle_num)?; 3513 Ok(Box::new(zstd::Decoder::new(file)?)) 3514 } 3515 3516 /// Get current cursor (global position of last operation) 3517 /// Cursor = (last_bundle * BUNDLE_SIZE) + mempool_ops_count 3518 pub fn get_current_cursor(&self) -> u64 { 3519 let index = self.index.read().unwrap(); 3520 let bundled_ops = total_operations_from_bundles(index.last_bundle); 3521 3522 // Add mempool operations if available 3523 let mempool_guard = self.mempool.read().unwrap(); 3524 let mempool_ops = if let Some(mp) = mempool_guard.as_ref() { 3525 mp.get_operations().len() as u64 3526 } else { 3527 0 3528 }; 3529 3530 bundled_ops + mempool_ops 3531 } 3532 3533 /// Resolve handle to DID or validate DID format (async version) 3534 /// Returns (did, handle_resolve_time_ms) 3535 /// Use this version when calling from async code (e.g., server handlers) 3536 pub async fn resolve_handle_or_did_async(&self, input: &str) -> Result<(String, u64)> { 3537 use std::time::Instant; 3538 3539 let input = input.trim(); 3540 3541 // Normalize handle format (remove at://, @ prefixes) 3542 let normalized = if !input.starts_with("did:") { 3543 handle_resolver::normalize_handle(input) 3544 } else { 3545 input.to_string() 3546 }; 3547 3548 // If already a DID, validate and return 3549 if normalized.starts_with("did:plc:") { 3550 crate::resolver::validate_did_format(&normalized)?; 3551 return Ok((normalized, 0)); 3552 } 3553 3554 // Support did:web too 3555 
if normalized.starts_with("did:web:") { 3556 return Ok((normalized, 0)); 3557 } 3558 3559 // It's a handle - need resolver 3560 let resolver = match &self.handle_resolver { 3561 Some(r) => r, 3562 None => { 3563 anyhow::bail!( 3564 "Input '{}' appears to be a handle, but handle resolver is not configured\n\n\ 3565 Configure resolver with:\n\ 3566 plcbundle --handle-resolver {} did resolve {}\n\n\ 3567 Or set default in config", 3568 normalized, 3569 constants::DEFAULT_HANDLE_RESOLVER_URL, 3570 normalized 3571 ); 3572 } 3573 }; 3574 3575 // Resolve handle (async operation) 3576 let resolve_start = Instant::now(); 3577 let did = resolver.resolve_handle(&normalized).await?; 3578 let resolve_time = resolve_start.elapsed(); 3579 3580 Ok((did, resolve_time.as_millis() as u64)) 3581 } 3582 3583 /// Resolve handle to DID or validate DID format 3584 /// Returns (did, handle_resolve_time_ms) 3585 /// This is a synchronous wrapper that uses tokio runtime for async resolution 3586 /// For async code, use resolve_handle_or_did_async instead 3587 pub fn resolve_handle_or_did(&self, input: &str) -> Result<(String, u64)> { 3588 use std::time::Instant; 3589 3590 let input = input.trim(); 3591 3592 // Normalize handle format (remove at://, @ prefixes) 3593 let normalized = if !input.starts_with("did:") { 3594 handle_resolver::normalize_handle(input) 3595 } else { 3596 input.to_string() 3597 }; 3598 3599 // If already a DID, validate and return 3600 if normalized.starts_with("did:plc:") { 3601 crate::resolver::validate_did_format(&normalized)?; 3602 return Ok((normalized, 0)); 3603 } 3604 3605 // Support did:web too 3606 if normalized.starts_with("did:web:") { 3607 return Ok((normalized, 0)); 3608 } 3609 3610 // It's a handle - need resolver 3611 let resolver = match &self.handle_resolver { 3612 Some(r) => r, 3613 None => { 3614 anyhow::bail!( 3615 "Input '{}' appears to be a handle, but handle resolver is not configured\n\n\ 3616 Configure resolver with:\n\ 3617 plcbundle 
--handle-resolver {} did resolve {}\n\n\ 3618 Or set default in config", 3619 normalized, 3620 constants::DEFAULT_HANDLE_RESOLVER_URL, 3621 normalized 3622 ); 3623 } 3624 }; 3625 3626 // Use tokio runtime to resolve handle (async operation) 3627 // Not in a runtime - safe to create one and use block_on 3628 let resolve_start = Instant::now(); 3629 let runtime = tokio::runtime::Runtime::new() 3630 .map_err(|e| anyhow::anyhow!("Failed to create tokio runtime: {}", e))?; 3631 let did = runtime.block_on(resolver.resolve_handle(&normalized))?; 3632 let resolve_time = resolve_start.elapsed(); 3633 3634 Ok((did, resolve_time.as_millis() as u64)) 3635 } 3636 3637 /// Get resolver statistics 3638 /// Returns a HashMap with resolver performance metrics 3639 pub fn get_resolver_stats(&self) -> HashMap<String, serde_json::Value> { 3640 // For now, return empty stats 3641 // TODO: Track resolver statistics 3642 HashMap::new() 3643 } 3644 3645 /// Get handle resolver base URL 3646 /// Returns None if handle resolver is not configured 3647 pub fn get_handle_resolver_base_url(&self) -> Option<String> { 3648 self.handle_resolver 3649 .as_ref() 3650 .map(|r| r.get_base_url().to_string()) 3651 } 3652 3653 /// Get a reference to the handle resolver 3654 /// Returns None if handle resolver is not configured 3655 pub fn get_handle_resolver(&self) -> Option<Arc<handle_resolver::HandleResolver>> { 3656 self.handle_resolver.clone() 3657 } 3658 3659 /// Create a shallow clone suitable for `Arc` sharing 3660 pub fn clone_for_arc(&self) -> Self { 3661 Self { 3662 directory: self.directory.clone(), 3663 index: Arc::clone(&self.index), 3664 did_index: Arc::clone(&self.did_index), 3665 stats: Arc::clone(&self.stats), 3666 mempool: Arc::clone(&self.mempool), 3667 mempool_checked: Arc::clone(&self.mempool_checked), 3668 handle_resolver: self.handle_resolver.clone(), 3669 verbose: Arc::clone(&self.verbose), 3670 } 3671 } 3672 fn load_bundle_from_disk(&self, path: &PathBuf) -> Result<Vec<Operation>> 
{ 3673 use std::io::BufRead; 3674 3675 let file = std::fs::File::open(path)?; 3676 let decoder = zstd::Decoder::new(file)?; 3677 let reader = std::io::BufReader::new(decoder); 3678 3679 let mut operations = Vec::new(); 3680 for line in reader.lines() { 3681 let line = line?; 3682 if line.is_empty() { 3683 continue; 3684 } 3685 // CRITICAL: Preserve raw JSON for content hash calculation 3686 // This is required by the V1 specification (docs/specification.md § 4.2) 3687 // to ensure content_hash remains reproducible during migration. 3688 // Without this, re-serialization would change the hash. 3689 // Use Operation::from_json (sonic_rs) instead of serde deserialization 3690 let op = Operation::from_json(&line)?; 3691 operations.push(op); 3692 } 3693 3694 Ok(operations) 3695 } 3696 3697 fn filter_load_result(&self, operations: Vec<Operation>, options: &LoadOptions) -> LoadResult { 3698 let mut filtered = operations; 3699 3700 if let Some(ref filter) = options.filter { 3701 filtered.retain(|op| self.matches_filter(op, filter)); 3702 } 3703 3704 if let Some(limit) = options.limit { 3705 filtered.truncate(limit); 3706 } 3707 3708 LoadResult { 3709 bundle_number: 0, 3710 operations: filtered, 3711 metadata: None, 3712 } 3713 } 3714 3715 fn matches_filter(&self, op: &Operation, filter: &OperationFilter) -> bool { 3716 if let Some(ref did) = filter.did 3717 && &op.did != did 3718 { 3719 return false; 3720 } 3721 3722 if let Some(ref op_type) = filter.operation_type 3723 && &op.operation != op_type 3724 { 3725 return false; 3726 } 3727 3728 if !filter.include_nullified && op.nullified { 3729 return false; 3730 } 3731 3732 true 3733 } 3734 3735 fn matches_request(&self, op: &Operation, req: &OperationRequest) -> bool { 3736 if let Some(ref filter) = req.filter { 3737 return self.matches_filter(op, filter); 3738 } 3739 true 3740 } 3741 3742 // === Repository Management === 3743 3744 /// Initialize a new repository with an empty index 3745 /// 3746 /// This is a static method 
that doesn't require an existing BundleManager. 3747 /// Creates all necessary directories and an empty index file. 3748 /// 3749 /// # Arguments 3750 /// * `directory` - Directory to initialize 3751 /// * `origin` - PLC directory URL or origin identifier 3752 /// * `force` - Whether to reinitialize if already exists 3753 /// 3754 /// # Returns 3755 /// True if initialized (created new), False if already existed and force=false 3756 pub fn init_repository<P: AsRef<std::path::Path>>( 3757 directory: P, 3758 origin: String, 3759 force: bool, 3760 ) -> Result<bool> { 3761 Index::init(directory, origin, force) 3762 } 3763 3764 /// Rebuild index from existing bundle files 3765 /// 3766 /// This is a static method that doesn't require an existing BundleManager. 3767 /// It scans all .jsonl.zst files in the directory and reconstructs the index 3768 /// by extracting embedded metadata from each bundle's skippable frame. 3769 /// 3770 /// # Arguments 3771 /// * `directory` - Directory containing bundle files 3772 /// * `origin` - Optional origin URL (auto-detected from first bundle if None) 3773 /// * `progress_cb` - Optional progress callback (current, total) 3774 /// 3775 /// # Returns 3776 /// The reconstructed index (already saved to disk) 3777 pub fn rebuild_index<P: AsRef<std::path::Path>, F>( 3778 directory: P, 3779 origin: Option<String>, 3780 progress_cb: Option<F>, 3781 ) -> Result<Index> 3782 where 3783 F: Fn(usize, usize, u64, u64) + Send + Sync, // (current, total, bytes_processed, total_bytes) 3784 { 3785 let index = Index::rebuild_from_bundles(&directory, origin, progress_cb)?; 3786 index.save(&directory)?; 3787 Ok(index) 3788 } 3789 3790 /// Clone repository from a remote plcbundle instance 3791 /// 3792 /// Downloads bundles from a remote instance and reconstructs the repository. 3793 /// This is a static async method that doesn't require an existing BundleManager. 
3794 /// 3795 /// # Arguments 3796 /// * `remote_url` - URL of the remote plcbundle instance 3797 /// * `target_dir` - Directory to clone into 3798 /// * `remote_index` - Already fetched remote index 3799 /// * `bundles_to_download` - List of bundle numbers to download 3800 /// * `progress_callback` - Optional callback for progress updates (bundle_num, count, total_count, bytes) 3801 /// 3802 /// # Returns 3803 /// Tuple of (successful_downloads, failed_downloads) 3804 pub async fn clone_from_remote<P, F>( 3805 remote_url: String, 3806 target_dir: P, 3807 remote_index: &Index, 3808 bundles_to_download: Vec<u32>, 3809 progress_callback: Option<F>, 3810 ) -> Result<(usize, usize)> 3811 where 3812 P: AsRef<std::path::Path> + Send + Sync, 3813 F: Fn(u32, usize, usize, u64) + Send + Sync + 'static, 3814 { 3815 use crate::remote::RemoteClient; 3816 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; 3817 3818 let target_dir = target_dir.as_ref(); 3819 3820 // Save index first 3821 remote_index.save(target_dir)?; 3822 3823 // Progress tracking 3824 let downloaded = Arc::new(AtomicUsize::new(0)); 3825 let failed = Arc::new(AtomicUsize::new(0)); 3826 let bytes_downloaded = Arc::new(AtomicU64::new(0)); 3827 let total_count = bundles_to_download.len(); 3828 3829 // Parallel download with semaphore (4 concurrent downloads) 3830 let semaphore = Arc::new(tokio::sync::Semaphore::new(4)); 3831 let progress_cb = progress_callback.map(Arc::new); 3832 3833 let mut tasks = Vec::new(); 3834 3835 for bundle_num in bundles_to_download { 3836 let client = RemoteClient::new(&remote_url)?; 3837 let target_dir = target_dir.to_path_buf(); 3838 let downloaded = Arc::clone(&downloaded); 3839 let failed = Arc::clone(&failed); 3840 let bytes_downloaded = Arc::clone(&bytes_downloaded); 3841 let semaphore = Arc::clone(&semaphore); 3842 let progress_cb = progress_cb.clone(); 3843 3844 let task = tokio::spawn(async move { 3845 let _permit = semaphore.acquire().await.unwrap(); 3846 3847 // 
Retry logic with exponential backoff 3848 let max_retries = 3; 3849 for attempt in 0..max_retries { 3850 if attempt > 0 { 3851 let delay = std::time::Duration::from_secs(1 << attempt); 3852 tokio::time::sleep(delay).await; 3853 } 3854 3855 match client.download_bundle_file(bundle_num).await { 3856 Ok(data) => { 3857 let data_len = data.len() as u64; 3858 3859 // Write bundle file 3860 let bundle_path = constants::bundle_path(&target_dir, bundle_num); 3861 if let Err(_e) = std::fs::write(&bundle_path, data) { 3862 failed.fetch_add(1, Ordering::SeqCst); 3863 return; 3864 } 3865 3866 let count = downloaded.fetch_add(1, Ordering::SeqCst) + 1; 3867 let bytes = 3868 bytes_downloaded.fetch_add(data_len, Ordering::SeqCst) + data_len; 3869 3870 // Call progress callback 3871 if let Some(ref cb) = progress_cb { 3872 cb(bundle_num, count, total_count, bytes); 3873 } 3874 return; 3875 } 3876 Err(_) => { 3877 continue; // Retry 3878 } 3879 } 3880 } 3881 3882 // All retries failed 3883 failed.fetch_add(1, Ordering::SeqCst); 3884 }); 3885 3886 tasks.push(task); 3887 } 3888 3889 // Wait for all downloads 3890 for task in tasks { 3891 let _ = task.await; 3892 } 3893 3894 let downloaded_count = downloaded.load(Ordering::SeqCst); 3895 let failed_count = failed.load(Ordering::SeqCst); 3896 3897 Ok((downloaded_count, failed_count)) 3898 } 3899 3900 /// Deletes a bundle file from the repository. 3901 /// 3902 /// This method removes a bundle file from the repository directory. 3903 /// 3904 /// # Arguments 3905 /// * `bundle_num` - The number of the bundle to delete. 3906 /// 3907 /// # Returns 3908 /// A `Result` indicating whether the operation was successful. 
pub fn delete_bundle_file(&self, bundle_num: u32) -> Result<()> {
    let bundle_path = constants::bundle_path(&self.directory, bundle_num);
    // Remove directly instead of probing with `exists()` first: a file that
    // disappears between the check and the delete is treated as already gone.
    if let Err(e) = std::fs::remove_file(&bundle_path)
        && e.kind() != std::io::ErrorKind::NotFound
    {
        return Err(e).with_context(|| {
            format!("Failed to delete bundle file: {}", bundle_path.display())
        });
    }
    Ok(())
}
}

// Supporting types moved here
/// Options controlling bundle loading behavior
#[derive(Debug, Clone)]
pub struct LoadOptions {
    pub cache: bool,
    pub decompress: bool,
    /// Keep only operations matching this filter.
    pub filter: Option<OperationFilter>,
    /// Maximum number of operations to return, applied after `filter`.
    pub limit: Option<usize>,
}

/// Defaults: `cache` and `decompress` enabled, no filter, no limit.
impl Default for LoadOptions {
    fn default() -> Self {
        Self {
            cache: true,
            decompress: true,
            filter: None,
            limit: None,
        }
    }
}

/// Result from a bundle load operation
#[derive(Debug)]
pub struct LoadResult {
    pub bundle_number: u32,
    pub operations: Vec<Operation>,
    pub metadata: Option<BundleMetadata>,
}

/// Result for single-operation fetch with timing
#[derive(Debug)]
pub struct OperationResult {
    pub raw_json: String,
    pub size_bytes: usize,
    pub load_duration: std::time::Duration,
}

/// Specification for querying bundles
#[derive(Debug, Clone)]
pub struct QuerySpec {
    pub bundles: BundleRange,
    pub filter: Option<OperationFilter>,
    pub query: String,
    pub mode: QueryMode,
}

/// Formats how long ago something happened, e.g. `"3 days ago"` or `"1.5 years ago"`.
///
/// Years (365.25 days) and months (30 days) are approximations printed with one
/// decimal. Days, hours and minutes are whole numbers with a singular unit for 1
/// (`"1 day ago"`). Durations under one minute, including negative ones, give
/// `"just now"`.
fn format_age(duration: chrono::Duration) -> String {
    let ago = |count: i64, unit: &str| {
        if count == 1 {
            format!("1 {} ago", unit)
        } else {
            format!("{} {}s ago", count, unit)
        }
    };

    let days = duration.num_days();
    if days >= 365 {
        let years = days as f64 / 365.25;
        format!("{:.1} years ago", years)
    } else if days >= 30 {
        let months = days as f64 / 30.0;
        format!("{:.1} months ago", months)
    } else if days > 0 {
        ago(days, "day")
    } else if duration.num_hours() > 0 {
        ago(duration.num_hours(), "hour")
    } else if duration.num_minutes() > 0 {
        ago(duration.num_minutes(), "minute")
    } else {
        "just now".to_string()
    }
}

/// Bundle selection for queries, exports, and verification
#[derive(Debug, Clone)]
pub enum BundleRange {
    All,
    Single(u32),
    Range(u32, u32),
    List(Vec<u32>),
}

/// Specification for export operations
#[derive(Debug, Clone)]
pub struct ExportSpec {
    pub bundles: BundleRange,
    pub format: ExportFormat,
    pub filter: Option<OperationFilter>,
    pub count: Option<usize>,
    pub after_timestamp: Option<String>,
}

/// Output format for export
#[derive(Debug, Clone)]
pub enum ExportFormat {
    JsonLines,
}

/// Statistics collected during export
#[derive(Debug, Default)]
pub struct ExportStats {
    pub records_written: u64,
    pub bytes_written: u64,
}

/// Specification for bundle verification
#[derive(Debug, Clone)]
pub struct VerifySpec {
    pub check_hash: bool,
    pub check_content_hash: bool,
    pub check_operations: bool,
    /// Fast mode: only check metadata frame, skip hash calculations
    pub fast: bool,
}

/// Result of verifying a single bundle
#[derive(Debug)]
pub struct VerifyResult {
    pub valid: bool,
    pub errors: Vec<String>,
}

/// Specification for chain verification
#[derive(Debug, Clone)]
pub struct ChainVerifySpec {
    pub start_bundle: u32,
    pub end_bundle: Option<u32>,
    pub check_parent_links: bool,
}

/// Result of chain verification across multiple bundles
#[derive(Debug)]
pub struct ChainVerifyResult {
    pub valid: bool,
    pub bundles_checked: u32,
    /// `(bundle_number, message)` pairs for every problem found.
    pub errors: Vec<(u32, String)>,
}

/// Aggregated bundle information with optional details
#[derive(Debug)]
pub struct BundleInfo {
    pub metadata: BundleMetadata,
    pub exists: bool,
    pub operations: Option<Vec<Operation>>,
    pub size_info: Option<SizeInfo>,
}
/// Compressed and uncompressed sizes reported for a single bundle.
#[derive(Debug)]
pub struct SizeInfo {
    /// Compressed size.
    pub compressed: u64,
    /// Uncompressed size.
    pub uncompressed: u64,
}

/// Selects which optional details `get_bundle_info` fills in.
#[derive(Clone, Debug)]
pub struct InfoFlags {
    /// Populate the decoded operations.
    pub include_operations: bool,
    /// Populate the compressed/uncompressed size information.
    pub include_size_info: bool,
}

/// Parameters for running a rollback.
#[derive(Clone, Debug)]
pub struct RollbackSpec {
    /// Bundle number the rollback targets.
    pub target_bundle: u32,
    /// When set, the rollback is presumably planned but not applied (confirm in `rollback`).
    pub dry_run: bool,
}

/// The plan computed by `rollback_plan`.
#[derive(Debug)]
pub struct RollbackPlan {
    /// Bundle number the plan targets.
    pub target_bundle: u32,
    /// Bundle numbers touched by the rollback.
    pub affected_bundles: Vec<u32>,
    /// Number of operations touched by the rollback.
    pub affected_operations: usize,
    /// Number of DIDs touched by the rollback.
    pub affected_dids: usize,
    /// Estimated time, in milliseconds.
    pub estimated_time_ms: u64,
}

/// Outcome reported by `rollback`.
#[derive(Debug)]
pub struct RollbackResult {
    /// Whether the rollback succeeded.
    pub success: bool,
    /// How many bundles were removed.
    pub bundles_removed: usize,
    /// The plan that was used, when one is attached.
    pub plan: Option<RollbackPlan>,
}

/// Parameters for warming up the cache.
#[derive(Clone, Debug)]
pub struct WarmUpSpec {
    /// Which bundles to warm.
    pub strategy: WarmUpStrategy,
}

/// How the set of bundles to warm up is chosen.
#[derive(Clone, Debug)]
pub enum WarmUpStrategy {
    /// A count of bundles (presumably the most recent ones; confirm in the warm-up code).
    Recent(u32),
    /// A start/end pair of bundle numbers.
    Range(u32, u32),
    /// Every bundle.
    All,
}

/// Counters gathered while rebuilding the DID index.
#[derive(Clone, Debug, Default)]
pub struct RebuildStats {
    /// Number of bundles processed.
    pub bundles_processed: u32,
    /// Number of operations indexed.
    pub operations_indexed: u64,
}