Forked from atscan.net/plcbundle-rs — a high-performance implementation of plcbundle written in Rust.
1//! High-level manager orchestrating loading, random access, DID resolution, querying/export, sync, verification, rollback/migration, and mempool management
2// src/manager.rs
3use crate::constants::{self, bundle_position_to_global, total_operations_from_bundles};
4use crate::index::{BundleMetadata, Index};
5use crate::iterators::{ExportIterator, QueryIterator, RangeIterator};
6use crate::operations::{Operation, OperationFilter, OperationRequest, OperationWithLocation};
7use crate::options::QueryMode;
8use crate::{did_index, handle_resolver, mempool, verification};
9use anyhow::{Context, Result};
10use chrono::{DateTime, Utc};
11use std::collections::{HashMap, HashSet};
12use std::io::Write;
13use std::path::PathBuf;
14use std::sync::{Arc, Mutex, RwLock};
15
/// Result of a sync_next_bundle operation
#[derive(Debug, Clone)]
pub enum SyncResult {
    /// Successfully created a bundle
    BundleCreated {
        /// Number of the bundle that was created
        bundle_num: u32,
        /// Operations left in the mempool after the bundle was cut
        mempool_count: usize,
        /// Total duration of the sync step, in milliseconds
        duration_ms: u64,
        /// Time spent fetching operations, in milliseconds
        fetch_duration_ms: u64,
        /// Time spent saving the bundle, in milliseconds
        bundle_save_ms: u64,
        /// Time spent updating indexes, in milliseconds
        index_ms: u64,
        /// Number of fetch requests issued
        fetch_requests: usize,
        /// Hash of the created bundle
        hash: String,
        /// Age string reported for the bundle's data
        age: String,
        /// Whether the DID index was compacted during this sync
        did_index_compacted: bool,
        /// Number of unique DIDs in the created bundle
        unique_dids: u32,
        /// Size of the created bundle in bytes
        size_bytes: u64,
        /// Portion of fetch time spent waiting, in milliseconds
        fetch_wait_ms: u64,
        /// Portion of fetch time spent on HTTP I/O, in milliseconds
        fetch_http_ms: u64,
    },
    /// Caught up to latest PLC data, mempool has partial operations
    CaughtUp {
        /// The bundle number that will be created next
        next_bundle: u32,
        /// Operations currently buffered in the mempool
        mempool_count: usize,
        /// New operations received in this sync round
        new_ops: usize,
        /// Time spent fetching, in milliseconds
        fetch_duration_ms: u64,
    },
}
44
/// High-level manager for PLC bundle repositories
///
/// Provides fast, memory-efficient access to operations stored in
/// compressed JSONL bundle files, along with a DID index for quick lookups.
///
/// Key capabilities:
/// - Smart loading with caching and frame-based random access
/// - DID resolution and per-DID operation queries (bundles + mempool)
/// - Batch operations, range iterators, query and export pipelines
/// - Sync to fetch, deduplicate, and create new bundles with verified hashes
/// - Maintenance utilities: warm-up, prefetch, rollback, migrate, clean
///
/// # Quickstart
///
/// ```no_run
/// use plcbundle::{BundleManager, ManagerOptions, QuerySpec, BundleRange, QueryMode};
/// use std::path::PathBuf;
///
/// // Create a manager for an existing repository
/// let mgr = BundleManager::new(PathBuf::from("/data/plc"), ManagerOptions::default())?;
///
/// // Load a bundle
/// let load = mgr.load_bundle(42, Default::default())?;
/// let _count = load.operations.len();
///
/// // Get an operation (bundle number, position within bundle)
/// let op = mgr.get_operation(42, 10)?;
/// assert!(!op.did.is_empty());
///
/// // Resolve a DID to its latest document
/// let resolved = mgr.resolve_did("did:plc:abcdef...")?;
/// println!("Resolved to shard {}", resolved.shard_num);
///
/// // Query a range and export
/// let spec = QuerySpec { bundles: BundleRange::Range(40, 45), filter: None, query: String::new(), mode: QueryMode::Simple };
/// let mut count = 0u64;
/// for _item in mgr.query(spec) { count += 1; }
/// assert!(count > 0);
/// # Ok::<(), anyhow::Error>(())
/// ```
pub struct BundleManager {
    // Repository root containing bundle files and index data
    directory: PathBuf,
    // Bundle metadata index (last bundle number, per-bundle metadata)
    index: Arc<RwLock<Index>>,
    // DID → operation-location index; loaded lazily via ensure_did_index()
    did_index: Arc<RwLock<Option<did_index::Manager>>>,
    // Internal usage counters (loads, reads, queries)
    stats: Arc<RwLock<ManagerStats>>,
    // In-memory mempool of operations not yet cut into a bundle; None until loaded
    mempool: Arc<RwLock<Option<mempool::Mempool>>>,
    mempool_checked: Arc<RwLock<bool>>, // Cache whether we've checked for mempool (to avoid repeated file checks)
    // Optional resolver for @handle.did identifiers; set from ManagerOptions
    handle_resolver: Option<Arc<handle_resolver::HandleResolver>>,
    // Shared verbose flag; external code can toggle it via verbose_handle()
    verbose: Arc<Mutex<bool>>,
}
95
/// Internal usage counters maintained by the manager.
#[derive(Debug, Clone, Default)]
pub struct ManagerStats {
    /// Number of bundle loads performed
    pub bundles_loaded: u64,
    /// Bundle cache hits
    pub cache_hits: u64,
    /// Bundle cache misses
    pub cache_misses: u64,
    /// Number of single-operation reads
    pub operations_read: u64,
    /// Number of queries started
    pub queries_executed: u64,
}
104
/// Outcome of a DID resolution: the resolved document plus the operation it
/// was built from and a detailed timing breakdown.
#[derive(Debug, Clone)]
pub struct ResolveResult {
    /// The resolved W3C DID document
    pub document: crate::resolver::DIDDocument,
    pub operation: Operation, // The operation used for resolution
    /// Bundle the operation came from (0 = mempool)
    pub bundle_number: u32,
    /// Position of the operation within the bundle (or mempool)
    pub position: usize,
    /// Time spent checking the mempool
    pub mempool_time: std::time::Duration,
    /// Time spent loading the mempool (always zero — mempool is preloaded)
    pub mempool_load_time: std::time::Duration,
    /// Time spent in the DID index lookup (zero when resolved from mempool)
    pub index_time: std::time::Duration,
    /// Time spent loading operations / building the document
    pub load_time: std::time::Duration,
    /// Total wall-clock time for the resolution
    pub total_time: std::time::Duration,
    /// Number of candidate locations found (1 when resolved from mempool)
    pub locations_found: usize,
    /// DID index shard consulted (0 when resolved from mempool)
    pub shard_num: u8,
    /// Shard-level lookup statistics (None when resolved from mempool)
    pub shard_stats: Option<did_index::DIDLookupStats>,
    /// Fine-grained lookup timings (None when resolved from mempool)
    pub lookup_timings: Option<did_index::DIDLookupTimings>,
}
121
/// Result of a per-DID operation query.
///
/// Optional fields are populated only when the corresponding
/// `include_locations` / `include_stats` flags were passed.
#[derive(Debug, Clone)]
pub struct DIDOperationsResult {
    /// All operations for the DID (bundles + mempool), sorted by created_at
    pub operations: Vec<Operation>,
    /// Same operations with bundle/position info (bundle 0 = mempool)
    pub operations_with_locations: Option<Vec<OperationWithLocation>>,
    /// Lookup statistics (requires include_stats)
    pub stats: Option<did_index::DIDLookupStats>,
    /// DID index shard consulted (requires include_stats)
    pub shard_num: Option<u8>,
    /// Fine-grained lookup timings (requires include_stats)
    pub lookup_timings: Option<did_index::DIDLookupTimings>,
    /// Time spent loading operations from bundles (requires include_stats)
    pub load_time: Option<std::time::Duration>,
}
131
/// Aggregate statistics about the DID index.
#[derive(Debug, Clone, Default)]
pub struct DIDIndexStats {
    /// Number of distinct DIDs in the index
    pub total_dids: usize,
    /// Total number of index entries
    pub total_entries: usize,
    /// Average number of operations per DID
    pub avg_operations_per_did: f64,
}
138
/// File-deletion tally produced during a rollback.
#[derive(Debug, Clone)]
pub struct RollbackFileStats {
    /// Files successfully deleted
    pub deleted: usize,
    /// Files that could not be deleted
    pub failed: usize,
    /// Total bytes of the deleted files
    pub deleted_size: u64,
}
145
/// Outcome of a clean operation.
#[derive(Debug, Clone)]
pub struct CleanResult {
    /// Number of files removed
    pub files_removed: usize,
    /// Total bytes freed
    pub bytes_freed: u64,
    /// Per-file error messages, if any occurred
    pub errors: Option<Vec<String>>,
}
152
/// Dry-run preview of a clean operation: what would be removed.
#[derive(Debug, Clone)]
pub struct CleanPreview {
    /// Files that would be removed
    pub files: Vec<CleanPreviewFile>,
    /// Combined size of those files in bytes
    pub total_size: u64,
}
158
/// A single file slated for removal in a clean preview.
#[derive(Debug, Clone)]
pub struct CleanPreviewFile {
    /// Path to the file
    pub path: PathBuf,
    /// File size in bytes
    pub size: u64,
}
164
/// Options for configuring BundleManager initialization
///
/// `Default` yields no handle resolver, no mempool preload, and quiet logging.
#[derive(Debug, Clone, Default)]
pub struct ManagerOptions {
    /// Optional handle resolver URL for resolving @handle.did identifiers
    pub handle_resolver_url: Option<String>,
    /// Whether to preload the mempool at initialization (for server use)
    pub preload_mempool: bool,
    /// Whether to enable verbose logging
    pub verbose: bool,
}
175
/// Trait to allow passing ManagerOptions or using defaults
///
/// Implemented for [`ManagerOptions`] itself, `()` (defaults), and a
/// `(handle_resolver_url, preload_mempool, verbose)` tuple.
pub trait IntoManagerOptions {
    /// Convert the receiver into a concrete [`ManagerOptions`].
    fn into_options(self) -> ManagerOptions;
}
180
// Identity conversion: an explicit ManagerOptions is used as-is.
impl IntoManagerOptions for ManagerOptions {
    fn into_options(self) -> ManagerOptions {
        self
    }
}
186
// Unit shorthand: `()` selects the default options.
impl IntoManagerOptions for () {
    fn into_options(self) -> ManagerOptions {
        ManagerOptions::default()
    }
}
192
193// Convenience: allow creating from individual components
194impl IntoManagerOptions for (Option<String>, bool, bool) {
195 fn into_options(self) -> ManagerOptions {
196 ManagerOptions {
197 handle_resolver_url: self.0,
198 preload_mempool: self.1,
199 verbose: self.2,
200 }
201 }
202}
203
204impl BundleManager {
    /// Create a new BundleManager
    ///
    /// Loads the bundle index from `directory`, optionally configures a
    /// handle resolver, and optionally preloads the mempool
    /// (see [`ManagerOptions`]).
    ///
    /// # Errors
    /// Fails if the bundle index cannot be loaded from `directory`.
    /// A mempool preload failure is non-fatal and only logged.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use plcbundle::{BundleManager, ManagerOptions};
    /// use std::path::PathBuf;
    ///
    /// // With default options
    /// let manager = BundleManager::new(PathBuf::from("."), ())?;
    ///
    /// // With custom options
    /// let options = ManagerOptions {
    ///     handle_resolver_url: Some("https://example.com".to_string()),
    ///     preload_mempool: true,
    ///     verbose: true,
    /// };
    /// let manager = BundleManager::new(PathBuf::from("."), options)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn new<O: IntoManagerOptions>(directory: PathBuf, options: O) -> Result<Self> {
        let options = options.into_options();
        let init_start = std::time::Instant::now();
        // Canonicalize only for log output; fall back to the raw path on failure.
        let display_dir = directory
            .canonicalize()
            .unwrap_or_else(|_| directory.clone());
        log::debug!(
            "[BundleManager] Initializing BundleManager from: {}",
            display_dir.display()
        );
        let index = Index::load(&directory)?;

        let handle_resolver = options
            .handle_resolver_url
            .map(|url| Arc::new(handle_resolver::HandleResolver::new(url)));

        if handle_resolver.is_some() {
            log::debug!("[BundleManager] Handle resolver configured");
        }

        let manager = Self {
            directory: directory.clone(),
            index: Arc::new(RwLock::new(index)),
            // DID index is loaded lazily via ensure_did_index()
            did_index: Arc::new(RwLock::new(None)),
            stats: Arc::new(RwLock::new(ManagerStats::default())),
            mempool: Arc::new(RwLock::new(None)),
            mempool_checked: Arc::new(RwLock::new(false)),
            handle_resolver,
            verbose: Arc::new(Mutex::new(options.verbose)),
        };

        // Pre-initialize mempool if requested (for server use)
        if options.preload_mempool {
            let mempool_preload_start = std::time::Instant::now();
            // Best-effort: a failed preload leaves the manager fully usable.
            if let Err(e) = manager.load_mempool() {
                log::debug!("[BundleManager] Mempool preload failed: {}", e);
            } else {
                let mempool_preload_time = mempool_preload_start.elapsed();
                let mempool_preload_ms = mempool_preload_time.as_secs_f64() * 1000.0;
                if let Ok(stats) = manager.get_mempool_stats()
                    && stats.count > 0
                {
                    log::debug!(
                        "[BundleManager] Pre-loaded mempool: {} operations for bundle {} ({:.3}ms)",
                        stats.count,
                        stats.target_bundle,
                        mempool_preload_ms
                    );
                }
            }
        }

        let total_elapsed = init_start.elapsed();
        let total_elapsed_ms = total_elapsed.as_secs_f64() * 1000.0;
        log::debug!(
            "[BundleManager] BundleManager initialized successfully ({:.3}ms total)",
            total_elapsed_ms
        );
        Ok(manager)
    }
285
286 /// Ensure DID index is loaded (lazy initialization)
287 pub fn ensure_did_index(&self) -> Result<()> {
288 let mut did_index_guard = self.did_index.write().unwrap();
289 if did_index_guard.is_none() {
290 let did_index_start = std::time::Instant::now();
291 log::debug!("[BundleManager] Loading DID index...");
292 let did_index = did_index::Manager::new(self.directory.clone())?;
293 let did_index_elapsed = did_index_start.elapsed();
294 let did_index_elapsed_ms = did_index_elapsed.as_secs_f64() * 1000.0;
295 log::debug!(
296 "[BundleManager] DID index loaded ({:.3}ms)",
297 did_index_elapsed_ms
298 );
299 *did_index_guard = Some(did_index);
300 }
301 Ok(())
302 }
303
304 /// Get a clone of the verbose state Arc for external access
305 pub fn verbose_handle(&self) -> Arc<Mutex<bool>> {
306 self.verbose.clone()
307 }
308
309 // === Smart Loading ===
310 /// Load a bundle's operations with optional caching and filtering
311 ///
312 /// Uses on-disk compressed JSONL and supports frame-based random access
313 /// when available, falling back to sequential scan for legacy bundles.
314 ///
315 /// # Arguments
316 /// - `num` Bundle number to load
317 /// - `options` Loading options (cache, decompress, filter, limit)
318 ///
319 /// # Returns
320 /// A `LoadResult` containing operations and optional metadata
321 pub fn load_bundle(&self, num: u32, options: LoadOptions) -> Result<LoadResult> {
322 self.stats.write().unwrap().bundles_loaded += 1;
323
324 let bundle_path = constants::bundle_path(&self.directory, num);
325 let operations = self.load_bundle_from_disk(&bundle_path)?;
326
327 Ok(self.filter_load_result(operations, &options))
328 }
329
330 // === Single Operation Access ===
331
332 /// Get a single operation as raw JSON (fastest, preserves field order)
333 ///
334 /// This method uses frame-based access for efficient random reads.
335 /// Falls back to legacy sequential scan if no frame index is available.
336 pub fn get_operation_raw(&self, bundle_num: u32, position: usize) -> Result<String> {
337 let bundle_path = constants::bundle_path(&self.directory, bundle_num);
338
339 if !bundle_path.exists() {
340 anyhow::bail!("Bundle {} not found", bundle_num);
341 }
342
343 // Try frame-based access first (new format)
344 match self.get_operation_raw_with_frames(&bundle_path, position) {
345 Ok(json) => Ok(json),
346 Err(e) => {
347 // Fall back to legacy sequential scan
348 // This happens for old bundles without frame index
349 if let Ok(json) = self.get_operation_raw_legacy(&bundle_path, position) {
350 Ok(json)
351 } else {
352 Err(e)
353 }
354 }
355 }
356 }
357
    /// Frame-based operation access (new format with metadata)
    ///
    /// Reads the skippable metadata frame at the start of the bundle file
    /// (magic + size + JSON payload), then hands the recorded frame offsets
    /// to the frame-based loader to fetch the operation at `position`.
    ///
    /// Fails when the file has no metadata frame or the metadata carries no
    /// frame offsets (legacy bundles) — callers then fall back to a
    /// sequential scan.
    fn get_operation_raw_with_frames(
        &self,
        bundle_path: &std::path::Path,
        position: usize,
    ) -> Result<String> {
        use crate::bundle_format;
        use std::io::{Read, Seek, SeekFrom};

        // Open file and read actual metadata frame size
        let mut file = std::fs::File::open(bundle_path)?;

        // Read magic (4 bytes, little-endian)
        let mut magic_buf = [0u8; 4];
        file.read_exact(&mut magic_buf)?;
        let magic = u32::from_le_bytes(magic_buf);

        if magic != bundle_format::SKIPPABLE_MAGIC_METADATA {
            anyhow::bail!("No metadata frame at start of bundle");
        }

        // Read frame size (4 bytes, little-endian)
        let mut size_buf = [0u8; 4];
        file.read_exact(&mut size_buf)?;
        let frame_data_size = u32::from_le_bytes(size_buf) as i64;

        // Metadata frame total size = magic(4) + size(4) + data
        let metadata_frame_size = 8 + frame_data_size;

        // Read the actual metadata
        let mut metadata_data = vec![0u8; frame_data_size as usize];
        file.read_exact(&mut metadata_data)?;
        let metadata: bundle_format::BundleMetadata = sonic_rs::from_slice(&metadata_data)?;

        if metadata.frame_offsets.is_empty() {
            anyhow::bail!("No frame offsets in metadata");
        }

        // Now seek back to start and use the frame-based loader
        file.seek(SeekFrom::Start(0))?;
        bundle_format::load_operation_at_position(
            &mut file,
            position,
            &metadata.frame_offsets,
            metadata_frame_size,
        )
    }
405
406 /// Legacy sequential scan (for old bundles without frame index)
407 fn get_operation_raw_legacy(
408 &self,
409 bundle_path: &std::path::Path,
410 position: usize,
411 ) -> Result<String> {
412 let file = std::fs::File::open(bundle_path)?;
413 let decoder = zstd::Decoder::new(file)?;
414 let reader = std::io::BufReader::new(decoder);
415
416 use std::io::BufRead;
417
418 for (idx, line_result) in reader.lines().enumerate() {
419 if idx == position {
420 return Ok(line_result?);
421 }
422 }
423
424 anyhow::bail!("Operation position {} out of bounds", position)
425 }
426
427 /// Get a single operation as parsed struct
428 ///
429 /// This method retrieves the raw JSON and parses it into an Operation struct.
430 /// Uses sonic_rs for parsing (not serde) to avoid compatibility issues.
431 /// Use `get_operation_raw()` if you only need the JSON.
432 pub fn get_operation(&self, bundle_num: u32, position: usize) -> Result<Operation> {
433 let json = self.get_operation_raw(bundle_num, position)?;
434 let op = Operation::from_json(&json).with_context(|| {
435 format!(
436 "Failed to parse operation JSON (bundle {}, position {})",
437 bundle_num, position
438 )
439 })?;
440 Ok(op)
441 }
442
443 /// Get operation with timing statistics (for CLI verbose mode)
444 pub fn get_operation_with_stats(
445 &self,
446 bundle_num: u32,
447 position: usize,
448 ) -> Result<OperationResult> {
449 let start = std::time::Instant::now();
450 let json = self.get_operation_raw(bundle_num, position)?;
451 let duration = start.elapsed();
452
453 // Update stats
454 {
455 let mut stats = self.stats.write().unwrap();
456 stats.operations_read += 1;
457 }
458
459 Ok(OperationResult {
460 raw_json: json.clone(),
461 size_bytes: json.len(),
462 load_duration: duration,
463 })
464 }
465
466 // === Batch Operations ===
467 /// Batch fetch operations across multiple bundles using match requests
468 ///
469 /// Groups requests by bundle for efficient loading and returns
470 /// operations that match each request's optional filter.
471 pub fn get_operations_batch(&self, requests: Vec<OperationRequest>) -> Result<Vec<Operation>> {
472 let mut results = Vec::new();
473
474 let mut by_bundle: HashMap<u32, Vec<&OperationRequest>> = HashMap::new();
475 for req in &requests {
476 by_bundle.entry(req.bundle).or_default().push(req);
477 }
478
479 for (bundle_num, reqs) in by_bundle {
480 let load_result = self.load_bundle(bundle_num, LoadOptions::default())?;
481
482 for req in reqs {
483 for op in &load_result.operations {
484 if self.matches_request(op, req) {
485 results.push(op.clone());
486 }
487 }
488 }
489 }
490
491 Ok(results)
492 }
493
    /// Create an iterator over operations across a bundle range
    ///
    /// Returns a `RangeIterator` that lazily loads bundles between
    /// `start` and `end` and yields operations optionally filtered.
    ///
    /// The iterator owns an independent handle to the manager's shared
    /// state (via `clone_for_arc`), so it outlives this borrow.
    pub fn get_operations_range(
        &self,
        start: u32,
        end: u32,
        filter: Option<OperationFilter>,
    ) -> RangeIterator {
        RangeIterator::new(Arc::new(self.clone_for_arc()), start, end, filter)
    }
506
    // === DID Operations ===
    /// Get all operations for a DID from both bundles and mempool
    ///
    /// Merges operations found via the DID index with those in the in-memory
    /// mempool, then sorts by the `created_at` timestamp (unparseable
    /// timestamps sort as the Unix epoch).
    ///
    /// # Arguments
    /// * `did` - The DID to look up
    /// * `include_locations` - If true, also return operations with location information
    /// * `include_stats` - If true, include detailed lookup statistics
    pub fn get_did_operations(
        &self,
        did: &str,
        include_locations: bool,
        include_stats: bool,
    ) -> Result<DIDOperationsResult> {
        use chrono::DateTime;
        use std::time::Instant;

        self.ensure_did_index()?;

        let index_start = Instant::now();
        let (locations, shard_stats, shard_num, lookup_timings) = if include_stats {
            let did_index = self.did_index.read().unwrap();
            did_index
                .as_ref()
                .unwrap()
                .get_did_locations_with_stats(did)?
        } else {
            // Stats not requested: use the cheaper lookup and fill defaults.
            let did_index = self.did_index.read().unwrap();
            let locs = did_index.as_ref().unwrap().get_did_locations(did)?;
            (
                locs,
                did_index::DIDLookupStats::default(),
                0,
                did_index::DIDLookupTimings::default(),
            )
        };
        let _index_time = index_start.elapsed();

        // Get operations from bundles
        let (bundle_ops_with_loc, load_time) = self.collect_operations_for_locations(&locations)?;
        let mut bundle_operations: Vec<Operation> = bundle_ops_with_loc
            .iter()
            .map(|owl| owl.operation.clone())
            .collect();

        // Get operations from mempool (only once)
        let (mempool_ops, _mempool_load_time) = self.get_did_operations_from_mempool(did)?;

        // Merge bundle and mempool operations
        bundle_operations.extend(mempool_ops.clone());

        // Sort by created_at timestamp; unparseable timestamps fall back to epoch
        bundle_operations.sort_by(|a, b| {
            let time_a = DateTime::parse_from_rfc3339(&a.created_at)
                .unwrap_or_else(|_| DateTime::parse_from_rfc3339("1970-01-01T00:00:00Z").unwrap());
            let time_b = DateTime::parse_from_rfc3339(&b.created_at)
                .unwrap_or_else(|_| DateTime::parse_from_rfc3339("1970-01-01T00:00:00Z").unwrap());
            time_a.cmp(&time_b)
        });

        // Build operations_with_locations if requested
        let operations_with_locations = if include_locations {
            let mut ops_with_loc = bundle_ops_with_loc;

            // Add mempool operations with bundle=0
            for (idx, op) in mempool_ops.iter().enumerate() {
                ops_with_loc.push(OperationWithLocation {
                    operation: op.clone(),
                    bundle: 0, // Use 0 to indicate mempool
                    position: idx,
                    nullified: op.nullified,
                });
            }

            // Sort all operations by timestamp (same epoch fallback as above)
            ops_with_loc.sort_by(|a, b| {
                let time_a =
                    DateTime::parse_from_rfc3339(&a.operation.created_at).unwrap_or_else(|_| {
                        DateTime::parse_from_rfc3339("1970-01-01T00:00:00Z").unwrap()
                    });
                let time_b =
                    DateTime::parse_from_rfc3339(&b.operation.created_at).unwrap_or_else(|_| {
                        DateTime::parse_from_rfc3339("1970-01-01T00:00:00Z").unwrap()
                    });
                time_a.cmp(&time_b)
            });

            Some(ops_with_loc)
        } else {
            None
        };

        Ok(DIDOperationsResult {
            operations: bundle_operations,
            operations_with_locations,
            stats: if include_stats {
                Some(shard_stats)
            } else {
                None
            },
            shard_num: if include_stats { Some(shard_num) } else { None },
            lookup_timings: if include_stats {
                Some(lookup_timings)
            } else {
                None
            },
            load_time: if include_stats { Some(load_time) } else { None },
        })
    }
615
    /// Sample random DIDs directly from the DID index without reading bundles.
    ///
    /// Loads the DID index on first use. `seed` is forwarded to the index
    /// sampler — presumably for reproducible sampling (see `did_index`).
    pub fn sample_random_dids(&self, count: usize, seed: Option<u64>) -> Result<Vec<String>> {
        self.ensure_did_index()?;
        let did_index = self.did_index.read().unwrap();
        did_index.as_ref().unwrap().sample_random_dids(count, seed)
    }
622
623 /// Get DID operations from mempool (internal helper)
624 /// Mempool should be preloaded at initialization, so this is just a fast in-memory lookup
625 /// Returns (operations, load_time) where load_time is always ZERO (no lazy loading)
626 fn get_did_operations_from_mempool(
627 &self,
628 did: &str,
629 ) -> Result<(Vec<Operation>, std::time::Duration)> {
630 use std::time::Instant;
631
632 let mempool_start = Instant::now();
633
634 // Mempool should be preloaded at initialization (no lazy loading)
635 let mempool_guard = self.mempool.read().unwrap();
636 match mempool_guard.as_ref() {
637 Some(mp) => {
638 // Mempool is initialized, use it directly (fast HashMap lookup)
639 let ops = mp.find_did_operations(did);
640 let mempool_elapsed = mempool_start.elapsed();
641 log::debug!(
642 "[Mempool] Found {} operations for DID {} in {:?}",
643 ops.len(),
644 did,
645 mempool_elapsed
646 );
647 Ok((ops, std::time::Duration::ZERO))
648 }
649 None => {
650 // Mempool not initialized (wasn't preloaded and doesn't exist)
651 let mempool_elapsed = mempool_start.elapsed();
652 log::debug!(
653 "[Mempool] No mempool initialized (checked in {:?})",
654 mempool_elapsed
655 );
656 Ok((Vec::new(), std::time::Duration::ZERO))
657 }
658 }
659 }
660
661 fn get_latest_did_operation_from_mempool(
662 &self,
663 did: &str,
664 ) -> Result<(Option<(Operation, usize)>, std::time::Duration)> {
665 use std::time::Instant;
666
667 let mempool_start = Instant::now();
668
669 // Mempool should be preloaded at initialization (no lazy loading)
670 let mempool_guard = self.mempool.read().unwrap();
671 let result = match mempool_guard.as_ref() {
672 Some(mp) => {
673 // Use mempool's method to find latest non-nullified operation (by index, operations are sorted)
674 mp.find_latest_did_operation(did)
675 }
676 None => {
677 // Mempool not initialized
678 None
679 }
680 };
681
682 let mempool_elapsed = mempool_start.elapsed();
683 log::debug!(
684 "[Mempool] Latest operation lookup for DID {} in {:?}",
685 did,
686 mempool_elapsed
687 );
688
689 Ok((result, std::time::Duration::ZERO))
690 }
691
    /// Resolve DID to current W3C DID Document with detailed timing statistics
    /// Returns the latest non-nullified DID document.
    /// If mempool has operations, uses the latest from mempool and skips bundle/index lookup.
    ///
    /// # Errors
    /// Fails when the DID format is invalid, or when no non-nullified
    /// operation exists in either the mempool or the bundles.
    pub fn resolve_did(&self, did: &str) -> Result<ResolveResult> {
        use chrono::DateTime;
        use std::time::Instant;

        let total_start = Instant::now();

        // Validate DID format
        crate::resolver::validate_did_format(did)?;

        // Check mempool first (most recent operations)
        log::debug!("[Resolve] Checking mempool first for DID: {}", did);
        let mempool_start = Instant::now();
        let (latest_mempool_op, mempool_load_time) =
            self.get_latest_did_operation_from_mempool(did)?;
        let mempool_time = mempool_start.elapsed();
        log::debug!(
            "[Resolve] Mempool check: found latest operation in {:?} (load: {:?})",
            mempool_time,
            mempool_load_time
        );

        // If mempool has a non-nullified operation, use it and skip bundle lookup
        if let Some((operation, position)) = latest_mempool_op {
            let load_start = Instant::now();
            log::debug!(
                "[Resolve] Found latest non-nullified operation in mempool, skipping bundle lookup"
            );

            // Build document from latest mempool operation
            let document =
                crate::resolver::resolve_did_document(did, std::slice::from_ref(&operation))?;
            let load_time = load_start.elapsed();

            return Ok(ResolveResult {
                document,
                operation,
                bundle_number: 0, // bundle=0 for mempool
                position,
                mempool_time,
                mempool_load_time,
                index_time: std::time::Duration::ZERO,
                load_time,
                total_time: total_start.elapsed(),
                locations_found: 1, // Found one operation in mempool
                shard_num: 0, // No shard for mempool
                shard_stats: None,
                lookup_timings: None,
            });
        }

        // Mempool is empty or all nullified - check bundles
        log::debug!(
            "[Resolve] No non-nullified operations in mempool, checking bundles for DID: {}",
            did
        );
        self.ensure_did_index()?;
        let index_start = Instant::now();
        let did_index = self.did_index.read().unwrap();
        let (locations, shard_stats, shard_num, lookup_timings) = did_index
            .as_ref()
            .unwrap()
            .get_did_locations_with_stats(did)?;
        let index_time = index_start.elapsed();
        log::debug!(
            "[Resolve] Bundle index lookup: {} locations found in {:?}",
            locations.len(),
            index_time
        );

        // Find latest non-nullified operation from bundles
        let load_start = Instant::now();
        let mut latest_operation: Option<(Operation, u32, usize)> = None;
        let mut latest_time = DateTime::parse_from_rfc3339("1970-01-01T00:00:00Z").unwrap();

        // Scan every candidate location; the let-chain skips a candidate on
        // load failure or an unparseable timestamp instead of erroring out.
        for loc in &locations {
            if !loc.nullified()
                && let Ok(op) = self.get_operation(loc.bundle() as u32, loc.position() as usize)
                && let Ok(op_time) = DateTime::parse_from_rfc3339(&op.created_at)
                && op_time > latest_time
            {
                latest_time = op_time;
                latest_operation = Some((op, loc.bundle() as u32, loc.position() as usize));
            }
        }
        let load_time = load_start.elapsed();

        let (operation, bundle_number, position) = latest_operation.ok_or_else(|| {
            anyhow::anyhow!("DID not found: {} (checked bundles and mempool)", did)
        })?;

        // Build document from latest bundle operation
        let document =
            crate::resolver::resolve_did_document(did, std::slice::from_ref(&operation))?;

        Ok(ResolveResult {
            document,
            operation: operation.clone(),
            bundle_number,
            position,
            mempool_time,
            mempool_load_time,
            index_time,
            load_time,
            total_time: total_start.elapsed(),
            locations_found: locations.len(),
            shard_num,
            shard_stats: Some(shard_stats),
            lookup_timings: Some(lookup_timings),
        })
    }
805
806 fn collect_operations_for_locations(
807 &self,
808 locations: &[did_index::OpLocation],
809 ) -> Result<(Vec<OperationWithLocation>, std::time::Duration)> {
810 use std::time::Instant;
811
812 let load_start = Instant::now();
813 let mut ops_with_loc = Vec::with_capacity(locations.len());
814 for loc in locations {
815 let bundle_num = loc.bundle() as u32;
816 let position = loc.position() as usize;
817
818 match self.get_operation(bundle_num, position) {
819 Ok(op) => {
820 ops_with_loc.push(OperationWithLocation {
821 operation: op,
822 bundle: bundle_num,
823 position,
824 nullified: loc.nullified(),
825 });
826 }
827 Err(e) => {
828 log::warn!(
829 "Failed to load operation at bundle {} position {}: {}",
830 bundle_num,
831 position,
832 e
833 );
834 }
835 }
836 }
837
838 ops_with_loc.sort_by_key(|owl| bundle_position_to_global(owl.bundle, owl.position));
839
840 Ok((ops_with_loc, load_start.elapsed()))
841 }
842
843 /// Resolve multiple DIDs to their operations (bundles + mempool)
844 ///
845 /// Returns a map of DID → operations, without location metadata or stats.
846 pub fn batch_resolve_dids(&self, dids: Vec<String>) -> Result<HashMap<String, Vec<Operation>>> {
847 let mut results = HashMap::new();
848
849 for did in dids {
850 let result = self.get_did_operations(&did, false, false)?;
851 results.insert(did, result.operations);
852 }
853
854 Ok(results)
855 }
856
    // === Query/Export ===
    /// Execute a query over bundles with optional filters and modes
    ///
    /// Increments the `queries_executed` counter and returns a
    /// `QueryIterator` that yields serialized records matching the query
    /// specification. The iterator owns an independent handle to the
    /// manager's shared state (via `clone_for_arc`).
    pub fn query(&self, spec: QuerySpec) -> QueryIterator {
        self.stats.write().unwrap().queries_executed += 1;
        QueryIterator::new(Arc::new(self.clone_for_arc()), spec)
    }
866
    /// Create an export iterator for streaming results to a sink
    ///
    /// Supports JSONL format. The iterator owns an independent handle to
    /// the manager's shared state (via `clone_for_arc`).
    pub fn export(&self, spec: ExportSpec) -> ExportIterator {
        ExportIterator::new(Arc::new(self.clone_for_arc()), spec)
    }
873
874 /// Export results to a provided writer factory and return statistics
875 ///
876 /// The `writer_fn` is invoked to obtain a fresh `Write` for streaming.
877 pub fn export_to_writer<F>(&self, spec: ExportSpec, mut writer_fn: F) -> Result<ExportStats>
878 where
879 F: FnMut() -> Box<dyn Write>,
880 {
881 let mut writer = writer_fn();
882 let mut stats = ExportStats::default();
883
884 for item in self.export(spec) {
885 let data = item?;
886 writer.write_all(data.as_bytes())?;
887 writer.write_all(b"\n")?;
888 stats.records_written += 1;
889 stats.bytes_written += data.len() as u64 + 1;
890 }
891
892 Ok(stats)
893 }
894
    // === Verification ===
    /// Verify a single bundle's integrity and metadata
    ///
    /// Looks up the bundle's metadata in the index and delegates the actual
    /// check to `verification::verify_bundle`.
    ///
    /// # Errors
    /// Fails if the bundle is not present in the index, or if the
    /// verification itself fails.
    pub fn verify_bundle(&self, num: u32, spec: VerifySpec) -> Result<VerifyResult> {
        let index = self.index.read().unwrap();
        let metadata = index
            .get_bundle(num)
            .ok_or_else(|| anyhow::anyhow!("Bundle {} not in index", num))?;

        verification::verify_bundle(&self.directory, metadata, spec)
    }
905
    /// Verify chain linkage and optional parent relationships across bundles
    ///
    /// Holds the index read lock for the duration of the verification.
    pub fn verify_chain(&self, spec: ChainVerifySpec) -> Result<ChainVerifyResult> {
        verification::verify_chain(&self.directory, &self.index.read().unwrap(), spec)
    }
910
911 // === Multi-info ===
912 /// Get consolidated bundle information with optional operation and size details
913 pub fn get_bundle_info(&self, num: u32, flags: InfoFlags) -> Result<BundleInfo> {
914 let index = self.index.read().unwrap();
915 let metadata = index
916 .get_bundle(num)
917 .ok_or_else(|| anyhow::anyhow!("Bundle {} not found", num))?;
918
919 let mut info = BundleInfo {
920 metadata: metadata.clone(),
921 exists: constants::bundle_path(&self.directory, num).exists(),
922 operations: None,
923 size_info: None,
924 };
925
926 if flags.include_operations {
927 let result = self.load_bundle(num, LoadOptions::default())?;
928 info.operations = Some(result.operations);
929 }
930
931 if flags.include_size_info {
932 info.size_info = Some(SizeInfo {
933 compressed: metadata.compressed_size,
934 uncompressed: metadata.uncompressed_size,
935 });
936 }
937
938 Ok(info)
939 }
940
941 // === Rollback ===
942 /// Plan a rollback to a target bundle and report estimated impact
943 pub fn rollback_plan(&self, spec: RollbackSpec) -> Result<RollbackPlan> {
944 let affected_bundles: Vec<u32> = (spec.target_bundle..=self.get_last_bundle()).collect();
945
946 let mut affected_operations = 0;
947 let mut affected_dids = std::collections::HashSet::new();
948
949 for bundle_num in &affected_bundles {
950 if let Ok(result) = self.load_bundle(*bundle_num, LoadOptions::default()) {
951 affected_operations += result.operations.len();
952 for op in result.operations {
953 affected_dids.insert(op.did);
954 }
955 }
956 }
957
958 Ok(RollbackPlan {
959 target_bundle: spec.target_bundle,
960 affected_bundles: affected_bundles.clone(), // Clone here
961 affected_operations,
962 affected_dids: affected_dids.len(),
963 estimated_time_ms: affected_bundles.len() as u64 * 10,
964 })
965 }
966
967 /// Execute a rollback to the target bundle, optionally as a dry run
968 pub fn rollback(&self, spec: RollbackSpec) -> Result<RollbackResult> {
969 let plan = self.rollback_plan(spec.clone())?;
970
971 if spec.dry_run {
972 return Ok(RollbackResult {
973 success: true,
974 bundles_removed: 0,
975 plan: Some(plan),
976 });
977 }
978
979 for bundle_num in &plan.affected_bundles {
980 let path = constants::bundle_path(&self.directory, *bundle_num);
981 if path.exists() {
982 std::fs::remove_file(path)?;
983 }
984 }
985
986 let mut index = self.index.write().unwrap();
987 index.last_bundle = spec.target_bundle;
988 index
989 .bundles
990 .retain(|b| b.bundle_number <= spec.target_bundle);
991
992 // Use default flush interval for rollback
993 self.build_did_index(
994 crate::constants::DID_INDEX_FLUSH_INTERVAL,
995 None::<fn(u32, u32, u64, u64)>,
996 None,
997 None,
998 )?;
999
1000 Ok(RollbackResult {
1001 success: true,
1002 bundles_removed: plan.affected_bundles.len(),
1003 plan: Some(plan),
1004 })
1005 }
1006
    // === Cache Hints ===
    /// Hint that the given bundles will be needed soon.
    ///
    /// Currently a no-op placeholder: the argument is ignored and `Ok(())`
    /// is always returned.
    pub fn prefetch_bundles(&self, _nums: Vec<u32>) -> Result<()> { Ok(()) }
1009
    /// Preload specified bundles into the cache for faster subsequent access
    ///
    /// Currently a no-op placeholder: the spec is ignored and `Ok(())` is
    /// always returned.
    pub fn warm_up(&self, _spec: WarmUpSpec) -> Result<()> { Ok(()) }
1012
    // === DID Index ===
    /// Rebuild the DID index from scratch over bundles `1..=last_bundle`.
    ///
    /// Destructive: a fresh index is created first (discarding any existing
    /// one), then every bundle is streamed through a two-stage build
    /// (Stage 1: process bundles, Stage 2: consolidate shards).
    ///
    /// * `flush_interval` — flush partial results every N bundles
    ///   (0 = only at end, which maximizes memory usage).
    /// * `progress_cb` — optional `(current, total, bytes_processed,
    ///   total_bytes)` callback, invoked for both stages.
    /// * `num_threads` — worker threads; `None` means auto-detect.
    /// * `interrupted` — optional flag the builder can check to abort early.
    ///
    /// Progress and summary text is written to stderr unconditionally.
    pub fn build_did_index<F>(
        &self,
        flush_interval: u32,
        progress_cb: Option<F>,
        num_threads: Option<usize>,
        interrupted: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
    ) -> Result<RebuildStats>
    where
        F: Fn(u32, u32, u64, u64) + Send + Sync, // (current, total, bytes_processed, total_bytes)
    {
        use std::time::Instant;

        let actual_threads = num_threads.unwrap_or(0); // 0 = auto-detect

        let last_bundle = self.get_last_bundle();
        let mut stats = RebuildStats::default();

        // Create new index (this clears any existing index)
        let new_index = did_index::Manager::new(self.directory.clone())?;
        *self.did_index.write().unwrap() = Some(new_index);

        self.ensure_did_index()?;

        // Get total uncompressed size for progress tracking
        let index = self.index.read().unwrap();
        let bundle_numbers: Vec<u32> = (1..=last_bundle).collect();
        let total_uncompressed_bytes = index.total_uncompressed_size_for_bundles(&bundle_numbers);
        drop(index); // release the read lock before the long-running build

        eprintln!("\n📦 Building DID Index");
        eprintln!(" Strategy: Streaming (memory-efficient)");
        eprintln!(" Bundles: {}", last_bundle);
        if flush_interval > 0 {
            if flush_interval == crate::constants::DID_INDEX_FLUSH_INTERVAL {
                // Default value - show with tuning hint
                eprintln!(
                    " Flush: Every {} bundles (tune with --flush-interval)",
                    flush_interval
                );
            } else {
                // Non-default value - show with tuning hint
                eprintln!(
                    " Flush: {} bundles (you can tune with --flush-interval)",
                    flush_interval
                );
            }
        } else {
            eprintln!(" Flush: Only at end (maximum memory usage)");
        }
        eprintln!();
        eprintln!("📊 Stage 1: Processing bundles...");

        let build_start = Instant::now();

        // Call the streaming build method in did_index
        let (total_operations, _bundles_processed, stage1_duration, stage2_duration) = {
            let did_index_guard = self.did_index.read().unwrap();
            if let Some(ref idx) = *did_index_guard {
                idx.build_from_scratch(
                    &self.directory,
                    last_bundle,
                    flush_interval,
                    // Adapt the caller's 4-arg callback to the builder's
                    // (current, total, bytes, stage) shape.
                    progress_cb.map(|cb| {
                        move |current: u32, total: u32, bytes: u64, stage: Option<String>| {
                            // Always call the callback - let it handle stage detection
                            // For stage 1, use bytes tracking; for stage 2, use shard count
                            if let Some(ref stage_name) = stage {
                                if stage_name.contains("Stage 2") {
                                    // For consolidation, we don't have byte tracking, so just pass 0
                                    // The progress bar will show shard progress
                                    cb(current, total, 0, total_uncompressed_bytes);
                                } else {
                                    // Stage 1 or unknown - use bytes
                                    cb(current, total, bytes, total_uncompressed_bytes);
                                }
                            } else {
                                // Fallback for backward compatibility
                                cb(current, total, bytes, total_uncompressed_bytes);
                            }
                        }
                    }),
                    actual_threads,
                    interrupted,
                )?
            } else {
                // Should be unreachable after ensure_did_index above, but
                // guard anyway rather than panic.
                return Err(anyhow::anyhow!("DID index not initialized"));
            }
        };

        stats.bundles_processed = last_bundle;
        stats.operations_indexed = total_operations;

        let total_duration = build_start.elapsed();

        eprintln!("\n");
        eprintln!("✅ Index Build Complete");
        eprintln!(
            " Time: {:.1}s (Stage 1: {:.1}s, Stage 2: {:.1}s)",
            total_duration.as_secs_f64(),
            stage1_duration.as_secs_f64(),
            stage2_duration.as_secs_f64()
        );
        eprintln!(
            " Operations: {}",
            crate::format::format_number(total_operations)
        );

        // Get final stats
        let final_stats = self.get_did_index_stats();
        let total_dids = final_stats
            .get("total_dids")
            .and_then(|v| v.as_i64())
            .unwrap_or(0);

        eprintln!(
            " Total DIDs: {}",
            crate::format::format_number(total_dids as u64)
        );

        Ok(stats)
    }
1135
1136 /// Get DID index statistics as a JSON-compatible map
1137 ///
1138 /// Returns keys like `exists`, `total_dids`, `last_bundle`, `delta_segments`, `shard_count` when available.
1139 pub fn get_did_index_stats(&self) -> HashMap<String, serde_json::Value> {
1140 self.ensure_did_index().ok(); // Stats might be called even if index doesn't exist
1141 self.did_index
1142 .read()
1143 .unwrap()
1144 .as_ref()
1145 .map(|idx| idx.get_stats())
1146 .unwrap_or_default()
1147 }
1148
1149 /// Get DID index stats as struct (legacy format)
1150 pub fn get_did_index_stats_struct(&self) -> DIDIndexStats {
1151 let stats_map = self.get_did_index_stats();
1152
1153 // Convert to old format
1154 DIDIndexStats {
1155 total_dids: stats_map
1156 .get("total_dids")
1157 .and_then(|v| v.as_i64())
1158 .unwrap_or(0) as usize,
1159 total_entries: 0, // Not tracked in new version
1160 avg_operations_per_did: 0.0, // Not tracked in new version
1161 }
1162 }
1163
    /// Shared handle to the DID index slot (may be `None` until initialized).
    pub fn get_did_index(&self) -> Arc<RwLock<Option<did_index::Manager>>> {
        Arc::clone(&self.did_index)
    }
1167
1168 /// Verify DID index and return detailed result
1169 ///
1170 /// Performs standard integrity check by default. If `full` is true, also rebuilds
1171 /// the index in a temporary directory and compares with the existing index.
1172 ///
1173 /// For server startup checks, call with `full=false` and check `verify_result.missing_base_shards`
1174 /// and `verify_result.missing_delta_segments` to determine if the index is corrupted.
1175 pub fn verify_did_index<F>(
1176 &self,
1177 verbose: bool,
1178 flush_interval: u32,
1179 full: bool,
1180 progress_callback: Option<F>,
1181 ) -> Result<did_index::VerifyResult>
1182 where
1183 F: Fn(u32, u32, u64, u64) + Send + Sync, // (current, total, bytes_processed, total_bytes)
1184 {
1185 self.ensure_did_index()?;
1186
1187 let did_index = self.did_index.read().unwrap();
1188 let idx = did_index
1189 .as_ref()
1190 .ok_or_else(|| anyhow::anyhow!("DID index not initialized"))?;
1191
1192 let last_bundle = self.get_last_bundle();
1193 let mut verify_result = idx.verify_integrity(last_bundle)?;
1194
1195 // If full verification requested, rebuild and compare
1196 if full {
1197 // Adapt callback for build_from_scratch which expects Option<String> as 4th param
1198 let build_callback = progress_callback.map(|cb| {
1199 move |current: u32, total: u32, bytes: u64, _stage: Option<String>| {
1200 cb(current, total, bytes, bytes); // Use bytes as total_bytes for now
1201 }
1202 });
1203 let rebuild_result = idx.verify_full(
1204 self.directory(),
1205 last_bundle,
1206 verbose,
1207 flush_interval,
1208 build_callback,
1209 )?;
1210 verify_result.errors += rebuild_result.errors;
1211 verify_result
1212 .error_categories
1213 .extend(rebuild_result.error_categories);
1214 }
1215
1216 Ok(verify_result)
1217 }
1218
1219 /// Repair DID index - intelligently rebuilds or updates as needed
1220 pub fn repair_did_index<F>(
1221 &self,
1222 num_threads: usize,
1223 flush_interval: u32,
1224 progress_callback: Option<F>,
1225 ) -> Result<did_index::RepairResult>
1226 where
1227 F: Fn(u32, u32, u64, u64) + Send + Sync, // (current, total, bytes_processed, total_bytes)
1228 {
1229 self.ensure_did_index()?;
1230
1231 let last_bundle = self.get_last_bundle();
1232
1233 // Create bundle loader closure
1234 let bundle_loader = |bundle_num: u32| -> Result<Vec<(String, bool)>> {
1235 let result = self.load_bundle(bundle_num, LoadOptions::default())?;
1236 Ok(result
1237 .operations
1238 .iter()
1239 .map(|op| (op.did.clone(), op.nullified))
1240 .collect())
1241 };
1242
1243 let mut did_index = self.did_index.write().unwrap();
1244 let idx = did_index
1245 .as_mut()
1246 .ok_or_else(|| anyhow::anyhow!("DID index not initialized"))?;
1247
1248 let mut repair_result = idx.repair(last_bundle, bundle_loader)?;
1249
1250 // If repair indicates full rebuild is needed, do it
1251 if repair_result.repaired && repair_result.bundles_processed == 0 {
1252 drop(did_index);
1253
1254 // Adapt callback signature for build_did_index
1255 let build_callback = progress_callback.map(|cb| {
1256 move |current: u32, total: u32, bytes: u64, total_bytes: u64| {
1257 cb(current, total, bytes, total_bytes);
1258 }
1259 });
1260 self.build_did_index(flush_interval, build_callback, Some(num_threads), None)?;
1261
1262 repair_result.bundles_processed = last_bundle;
1263 }
1264
1265 Ok(repair_result)
1266 }
1267
    // === Observability ===
    /// Return a snapshot (clone) of the manager's runtime statistics.
    pub fn get_stats(&self) -> ManagerStats {
        self.stats.read().unwrap().clone()
    }
1272
1273 pub fn clear_caches(&self) {
1274 self.stats.write().unwrap().cache_hits = 0;
1275 self.stats.write().unwrap().cache_misses = 0;
1276 }
1277
1278 // === Mempool Management ===
1279
1280 /// Check if the mempool is loaded (does not load it)
1281 ///
1282 /// Returns `Ok(())` if mempool is loaded, error otherwise.
1283 pub fn get_mempool(&self) -> Result<()> {
1284 let mempool_guard = self.mempool.read().unwrap();
1285 if mempool_guard.is_some() {
1286 Ok(())
1287 } else {
1288 anyhow::bail!("Mempool not loaded. Call load_mempool() first.")
1289 }
1290 }
1291
    /// Explicitly load mempool from disk (or create empty if file doesn't exist)
    ///
    /// Intended for initialization/preload, not lazy loading.
    ///
    /// Idempotent: returns immediately when a mempool is already present.
    /// Uses a double-checked pattern (read lock, then write lock) so
    /// concurrent callers don't both construct a mempool.
    pub fn load_mempool(&self) -> Result<()> {
        // Check if already loaded (cheap read lock, released at scope end)
        {
            let mempool_guard = self.mempool.read().unwrap();
            if mempool_guard.is_some() {
                return Ok(()); // Already loaded
            }
        }

        // Acquire write lock to load
        let mut mempool_guard = self.mempool.write().unwrap();

        // Double-check after acquiring write lock
        if mempool_guard.is_some() {
            return Ok(());
        }

        // Load mempool for the next bundle after the last committed one
        let last_bundle = self.get_last_bundle();
        let target_bundle = last_bundle + 1;

        // Get min timestamp from last bundle's last operation
        let min_timestamp = self.get_last_bundle_timestamp()?;

        // Mempool::new will check if file exists and load it if it does
        // If file doesn't exist, it creates an empty mempool
        match mempool::Mempool::new(
            &self.directory,
            target_bundle,
            min_timestamp,
            *self.verbose.lock().unwrap(),
        ) {
            Ok(mp) => {
                // Mempool loaded (either from file or empty)
                *mempool_guard = Some(mp);
                *self.mempool_checked.write().unwrap() = true;
            }
            Err(e) => {
                // Mempool file doesn't exist or error loading
                // Mark as checked so we don't try again
                *self.mempool_checked.write().unwrap() = true;
                // Return error only if it's not a "file not found" type error
                // NOTE(review): this classifies errors by matching the display
                // string, which is brittle — confirm whether a typed check
                // (e.g. io::ErrorKind::NotFound downcast) is possible here.
                if e.to_string().contains("No such file") || e.to_string().contains("not found") {
                    // File doesn't exist, that's fine - just return Ok with None mempool
                    return Ok(());
                }
                return Err(e);
            }
        }

        Ok(())
    }
1347
1348 /// Get mempool statistics including counts and time bounds
1349 pub fn get_mempool_stats(&self) -> Result<mempool::MempoolStats> {
1350 let mempool_guard = self.mempool.read().unwrap();
1351
1352 match mempool_guard.as_ref() {
1353 Some(mp) => Ok(mp.stats()),
1354 None => {
1355 // Return empty stats if no mempool
1356 let last_bundle = self.get_last_bundle();
1357 let min_timestamp = self.get_last_bundle_timestamp()?;
1358 Ok(mempool::MempoolStats {
1359 count: 0,
1360 can_create_bundle: false,
1361 target_bundle: last_bundle + 1,
1362 min_timestamp,
1363 validated: false,
1364 first_time: None,
1365 last_time: None,
1366 size_bytes: None,
1367 did_count: None,
1368 })
1369 }
1370 }
1371 }
1372
1373 /// Get all operations currently in the mempool
1374 pub fn get_mempool_operations(&self) -> Result<Vec<Operation>> {
1375 let mempool_guard = self.mempool.read().unwrap();
1376
1377 match mempool_guard.as_ref() {
1378 Some(mp) => Ok(mp.get_operations().to_vec()),
1379 None => Ok(Vec::new()),
1380 }
1381 }
1382
1383 /// Clear all mempool data and remove on-disk mempool files
1384 pub fn clear_mempool(&self) -> Result<()> {
1385 let mut mempool_guard = self.mempool.write().unwrap();
1386
1387 if let Some(mp) = mempool_guard.as_mut() {
1388 mp.clear();
1389 mp.save()?;
1390 }
1391
1392 // Also delete all mempool files to prevent stale data from previous bundles
1393 if let Ok(entries) = std::fs::read_dir(&self.directory) {
1394 for entry in entries.flatten() {
1395 if let Some(name) = entry.file_name().to_str()
1396 && name.starts_with(constants::MEMPOOL_FILE_PREFIX)
1397 && name.ends_with(".jsonl")
1398 {
1399 let _ = std::fs::remove_file(entry.path());
1400 }
1401 }
1402 }
1403
1404 Ok(())
1405 }
1406
1407 /// Add operations to mempool, returning number added
1408 ///
1409 /// Mempool must be loaded first (call `load_mempool()`).
1410 pub fn add_to_mempool(&self, ops: Vec<Operation>, collect_cids: bool) -> Result<(usize, Vec<String>)> {
1411 self.get_mempool()?;
1412 let mut mempool_guard = self.mempool.write().unwrap();
1413 if let Some(mp) = mempool_guard.as_mut() {
1414 let result = if collect_cids { mp.add_and_collect_cids(ops)? } else { (mp.add(ops)?, Vec::new()) };
1415 mp.save_if_needed()?;
1416 Ok(result)
1417 } else {
1418 anyhow::bail!("Mempool not initialized")
1419 }
1420 }
1421
1422 /// Get the last bundle's last operation timestamp
1423 fn get_last_bundle_timestamp(&self) -> Result<DateTime<Utc>> {
1424 let last_bundle = self.get_last_bundle();
1425
1426 if last_bundle == 0 {
1427 // No bundles yet, use epoch
1428 return Ok(DateTime::from_timestamp(0, 0).unwrap());
1429 }
1430
1431 // Load last bundle and get last operation's timestamp
1432 let result = self.load_bundle(last_bundle, LoadOptions::default())?;
1433
1434 if let Some(last_op) = result.operations.last() {
1435 let timestamp = DateTime::parse_from_rfc3339(&last_op.created_at)?.with_timezone(&Utc);
1436 Ok(timestamp)
1437 } else {
1438 // Bundle is empty (shouldn't happen), use epoch
1439 Ok(DateTime::from_timestamp(0, 0).unwrap())
1440 }
1441 }
1442
    // === Sync Operations ===

    /// Validate and clean repository state before sync
    ///
    /// Housekeeping performed, in order:
    /// 1. Deletes on-disk mempool files belonging to already-completed (or
    ///    far-future) bundles.
    /// 2. Empty-repository case: any leftover mempool is cleared so the first
    ///    bundle starts from the beginning.
    /// 3. Clears the mempool when its operations predate the last bundle
    ///    (definitely stale), or are suspiciously close to it while the
    ///    mempool file is older than one hour (likely a failed attempt; a
    ///    recent file is treated as a legitimate resume).
    /// 4. Clears the mempool if it holds more operations than one bundle can
    ///    contain (also a failed-attempt symptom).
    fn validate_sync_state(&self) -> Result<()> {
        let last_bundle = self.get_last_bundle();
        let next_bundle_num = last_bundle + 1;

        // Check for and delete mempool files for already-completed bundles
        let mut found_stale_files = false;
        if let Ok(entries) = std::fs::read_dir(&self.directory) {
            for entry in entries.flatten() {
                if let Some(name) = entry.file_name().to_str()
                    && name.starts_with(constants::MEMPOOL_FILE_PREFIX)
                    && name.ends_with(".jsonl")
                {
                    // Extract bundle number from filename: plc_mempool_NNNNNN.jsonl
                    if let Some(num_str) = name
                        .strip_prefix(constants::MEMPOOL_FILE_PREFIX)
                        .and_then(|s| s.strip_suffix(".jsonl"))
                        && let Ok(bundle_num) = num_str.parse::<u32>()
                    {
                        // Delete mempool files for completed bundles or way future bundles
                        if bundle_num <= last_bundle || bundle_num > next_bundle_num {
                            log::warn!("Removing stale mempool file for bundle {:06}", bundle_num);
                            let _ = std::fs::remove_file(entry.path());
                            found_stale_files = true;
                        }
                    }
                }
            }
        }

        if found_stale_files {
            log::info!("Cleaned up stale mempool files");
        }

        let mempool_stats = self.get_mempool_stats()?;

        if mempool_stats.count == 0 {
            return Ok(()); // Empty mempool is always valid
        }

        // Check if mempool operations are for the correct bundle
        let mempool_ops = self.get_mempool_operations()?;
        if mempool_ops.is_empty() {
            return Ok(());
        }

        // Get the last operation from the previous bundle
        // (None when this is the first bundle or the bundle fails to load)
        let last_bundle_time = if next_bundle_num > 1
            && let Ok(last_bundle_result) =
                self.load_bundle(next_bundle_num - 1, LoadOptions::default())
        {
            last_bundle_result.operations.last().and_then(|last_op| {
                chrono::DateTime::parse_from_rfc3339(&last_op.created_at)
                    .ok()
                    .map(|dt| dt.with_timezone(&chrono::Utc))
            })
        } else {
            None
        };

        // Special case: When creating the first bundle (next_bundle_num == 1, meaning
        // last_bundle == 0, i.e., empty repository), any existing mempool is likely stale
        // from a previous sync attempt. Clear it to start fresh from the beginning.
        if next_bundle_num == 1 && mempool_stats.count > 0 {
            log::warn!(
                "Starting first bundle (empty repository), but mempool has {} operations",
                mempool_stats.count
            );
            if let Some(first_time) = mempool_stats.first_time {
                log::warn!(
                    "Mempool operations start at: {}",
                    first_time.format("%Y-%m-%d %H:%M:%S")
                );
            }
            log::warn!("Clearing mempool to start fresh from the beginning...");
            self.clear_mempool()?;
            return Ok(());
        }

        // Check if mempool operations are chronologically valid relative to last bundle
        if let Some(last_time) = last_bundle_time
            && let Some(first_mempool_time) = mempool_stats.first_time
        {
            // Case 1: Mempool operations are BEFORE the last bundle (definitely stale)
            if first_mempool_time < last_time {
                log::warn!("Detected stale mempool data (operations before last bundle)");
                log::warn!(
                    "First mempool op: {}, Last bundle op: {}",
                    first_mempool_time.format("%Y-%m-%d %H:%M:%S"),
                    last_time.format("%Y-%m-%d %H:%M:%S")
                );
                log::warn!("Clearing mempool to start fresh...");
                self.clear_mempool()?;
                return Ok(());
            }

            // Case 2: Mempool operations are slightly after last bundle, but way too close
            // This indicates they're from a previous failed attempt at this bundle
            // BUT: Only clear if the mempool file is old (modified > 1 hour ago)
            // If it's recent, it might be a legitimate resume of a slow sync
            let time_diff = first_mempool_time.signed_duration_since(last_time);
            if time_diff < chrono::Duration::seconds(constants::MIN_BUNDLE_CREATION_INTERVAL_SECS)
                && mempool_stats.count < constants::BUNDLE_SIZE
            {
                // Check mempool file modification time
                let mempool_filename = format!(
                    "{}{:06}.jsonl",
                    constants::MEMPOOL_FILE_PREFIX,
                    next_bundle_num
                );
                let mempool_path = self.directory.join(mempool_filename);

                let is_stale = if let Ok(metadata) = std::fs::metadata(&mempool_path) {
                    if let Ok(modified) = metadata.modified() {
                        let modified_time = std::time::SystemTime::now()
                            .duration_since(modified)
                            .unwrap_or(std::time::Duration::from_secs(0));
                        modified_time > std::time::Duration::from_secs(3600) // 1 hour
                    } else {
                        false // Can't get modified time, assume not stale
                    }
                } else {
                    false // File doesn't exist, assume not stale
                };

                if is_stale {
                    log::warn!(
                        "Detected potentially stale mempool data (too close to last bundle timestamp)"
                    );
                    log::warn!(
                        "Time difference: {}s, Operations: {}/{}",
                        time_diff.num_seconds(),
                        mempool_stats.count,
                        constants::BUNDLE_SIZE
                    );
                    log::warn!(
                        "This likely indicates a previous failed sync attempt. Clearing mempool..."
                    );
                    self.clear_mempool()?;
                } else if *self.verbose.lock().unwrap() {
                    log::debug!("Mempool appears recent, allowing resume despite close timestamp");
                }
                return Ok(());
            }
        }

        // Check if mempool has way too many operations (likely from failed previous attempt)
        if mempool_stats.count > constants::BUNDLE_SIZE {
            log::warn!(
                "Mempool has {} operations (expected max {})",
                mempool_stats.count,
                constants::BUNDLE_SIZE
            );
            log::warn!("This indicates a previous sync attempt failed. Clearing mempool...");
            self.clear_mempool()?;
            return Ok(());
        }

        Ok(())
    }
1605
1606 /// Batch update DID index for a range of bundles (for initial sync optimization)
1607 ///
1608 /// IMPORTANT: This method performs heavy blocking I/O and should be called from async
1609 /// contexts using spawn_blocking to avoid freezing the async runtime (and HTTP server).
1610 pub fn batch_update_did_index(
1611 &self,
1612 start_bundle: u32,
1613 end_bundle: u32,
1614 compact: bool,
1615 ) -> Result<()> {
1616 use std::time::Instant;
1617
1618 if start_bundle > end_bundle {
1619 return Ok(());
1620 }
1621
1622 let total_start = Instant::now();
1623 let bundle_count = end_bundle - start_bundle + 1;
1624 if bundle_count > 10 {
1625 use std::time::Instant;
1626 eprintln!(
1627 "[DID Index] Rebuild triggered for {} bundles ({} → {})",
1628 bundle_count, start_bundle, end_bundle
1629 );
1630 let rebuild_start = Instant::now();
1631 let _ = self.build_did_index(
1632 crate::constants::DID_INDEX_FLUSH_INTERVAL,
1633 Some(
1634 |current: u32, total: u32, bytes_processed: u64, total_bytes: u64| {
1635 let percent = if total_bytes > 0 {
1636 (bytes_processed as f64 / total_bytes as f64) * 100.0
1637 } else {
1638 0.0
1639 };
1640 eprintln!(
1641 "[DID Index] Rebuild progress: {}/{} ({:.1}%)",
1642 current, total, percent
1643 );
1644 },
1645 ),
1646 None,
1647 None,
1648 )?;
1649 let dur = rebuild_start.elapsed();
1650 eprintln!("[DID Index] Rebuild complete in {:.1}s", dur.as_secs_f64());
1651 return Ok(());
1652 }
1653
1654 if *self.verbose.lock().unwrap() {
1655 log::info!(
1656 "Batch updating DID index for bundles {:06} to {:06}... ({} bundles)",
1657 start_bundle,
1658 end_bundle,
1659 bundle_count
1660 );
1661 }
1662
1663 // Process bundles incrementally (avoid loading all into memory)
1664 let load_start = Instant::now();
1665 let mut total_operations = 0usize;
1666 let mut bundles_processed = 0usize;
1667
1668 // Update DID index for each bundle as we load it (memory efficient)
1669 self.ensure_did_index()?;
1670 let update_start = Instant::now();
1671 for bundle_num in start_bundle..=end_bundle {
1672 if let Ok(result) = self.load_bundle(bundle_num, LoadOptions::default()) {
1673 total_operations += result.operations.len();
1674 let operations: Vec<(String, bool)> = result
1675 .operations
1676 .iter()
1677 .map(|op| (op.did.clone(), op.nullified))
1678 .collect();
1679
1680 // Process immediately instead of accumulating
1681 let _ = self
1682 .did_index
1683 .write()
1684 .unwrap()
1685 .as_mut()
1686 .unwrap()
1687 .update_for_bundle(bundle_num, operations)?;
1688 bundles_processed += 1;
1689 }
1690 }
1691 let load_duration = load_start.elapsed();
1692 let update_duration = update_start.elapsed();
1693
1694 if bundles_processed == 0 {
1695 return Ok(());
1696 }
1697
1698 log::debug!(
1699 "[Batch DID Index] Processed {} bundles ({} operations) in {:.3}s ({:.0} ops/sec)",
1700 bundles_processed,
1701 total_operations,
1702 update_duration.as_secs_f64(),
1703 total_operations as f64 / update_duration.as_secs_f64()
1704 );
1705
1706 // Optionally compact all shards immediately to avoid leaving delta segments
1707 if compact {
1708 let idx_guard = self.did_index.read().unwrap();
1709 if let Some(idx) = idx_guard.as_ref() {
1710 idx.compact_pending_segments(None)?;
1711 }
1712 }
1713
1714 let total_duration = total_start.elapsed();
1715
1716 if *self.verbose.lock().unwrap() {
1717 log::info!(
1718 "✓ DID index updated for bundles {:06} to {:06} in {:.3}s (load={:.1}s, update={:.1}s, {:.0} ops/sec overall)",
1719 start_bundle,
1720 end_bundle,
1721 total_duration.as_secs_f64(),
1722 load_duration.as_secs_f64(),
1723 update_duration.as_secs_f64(),
1724 total_operations as f64 / total_duration.as_secs_f64()
1725 );
1726 }
1727
1728 Ok(())
1729 }
1730
1731 /// Async wrapper for batch_update_did_index that runs in a blocking task
1732 ///
1733 /// This prevents blocking the async runtime (and HTTP server) during heavy I/O operations.
1734 pub async fn batch_update_did_index_async(
1735 &self,
1736 start_bundle: u32,
1737 end_bundle: u32,
1738 compact: bool,
1739 ) -> Result<()> {
1740 let manager = self.clone_for_arc();
1741
1742 // First perform the batch update in a blocking task
1743 let _ = tokio::task::spawn_blocking(move || {
1744 manager.batch_update_did_index(start_bundle, end_bundle, compact)
1745 })
1746 .await
1747 .map_err(|e| anyhow::anyhow!("Batch DID index update task failed: {}", e))?;
1748
1749 Ok(())
1750 }
1751
1752 /// Fetch and save next bundle from PLC directory
1753 /// DID index is updated on every bundle (fast with delta segments)
1754 pub async fn sync_next_bundle(
1755 &self,
1756 client: &crate::plc_client::PLCClient,
1757 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>,
1758 update_did_index: bool,
1759 fetch_log: bool,
1760 safety_lag: Option<std::time::Duration>,
1761 ) -> Result<SyncResult> {
1762 use crate::sync::{get_boundary_cids, strip_boundary_duplicates};
1763 use std::time::Instant;
1764
1765 // Validate repository state before starting
1766 self.validate_sync_state()?;
1767
1768 let next_bundle_num = self.get_last_bundle() + 1;
1769
1770 // ALWAYS get boundaries from last bundle initially
1771 let (mut after_time, mut prev_boundary) = if next_bundle_num > 1 {
1772 let last = self.load_bundle(
1773 next_bundle_num - 1,
1774 LoadOptions {
1775 cache: false,
1776 decompress: true,
1777 filter: None,
1778 limit: None,
1779 },
1780 )?;
1781 let boundary = get_boundary_cids(&last.operations);
1782 let cursor = last
1783 .operations
1784 .last()
1785 .map(|op| op.created_at.clone())
1786 .unwrap_or_default();
1787
1788 if *self.verbose.lock().unwrap() {
1789 log::info!(
1790 "Loaded {} boundary CIDs from bundle {:06} (at {})",
1791 boundary.len(),
1792 next_bundle_num - 1,
1793 cursor
1794 );
1795 }
1796
1797 (cursor, boundary)
1798 } else {
1799 ("1970-01-01T00:00:00Z".to_string(), HashSet::new())
1800 };
1801
1802 // If mempool has operations, update cursor AND boundaries from mempool
1803 // (mempool operations already had boundary dedup applied when they were added)
1804 let mempool_stats = self.get_mempool_stats()?;
1805 if mempool_stats.count > 0
1806 && let Some(last_time) = mempool_stats.last_time
1807 {
1808 if *self.verbose.lock().unwrap() {
1809 log::debug!(
1810 "Mempool has {} ops, resuming from {}",
1811 mempool_stats.count,
1812 last_time.format("%Y-%m-%dT%H:%M:%S")
1813 );
1814 }
1815 after_time = last_time.to_rfc3339();
1816
1817 // Calculate boundaries from MEMPOOL for next fetch
1818 let mempool_ops = self.get_mempool_operations()?;
1819 if !mempool_ops.is_empty() {
1820 prev_boundary = get_boundary_cids(&mempool_ops);
1821 if *self.verbose.lock().unwrap() {
1822 log::info!("Using {} boundary CIDs from mempool", prev_boundary.len());
1823 }
1824 }
1825 }
1826
1827 log::debug!(
1828 "Preparing bundle {:06} (mempool: {} ops)...",
1829 next_bundle_num,
1830 mempool_stats.count
1831 );
1832 log::debug!(
1833 "Starting cursor: {}",
1834 if after_time.is_empty() || after_time == "1970-01-01T00:00:00Z" {
1835 ""
1836 } else {
1837 &after_time
1838 }
1839 );
1840
1841 if !prev_boundary.is_empty() && *self.verbose.lock().unwrap() && mempool_stats.count == 0 {
1842 log::info!(
1843 " Starting with {} boundary CIDs from previous bundle",
1844 prev_boundary.len()
1845 );
1846 }
1847
1848 // Ensure mempool is loaded (load if needed)
1849 self.load_mempool()?;
1850
1851 // Fetch until we have 10,000 operations
1852 let mut fetch_num = 0;
1853 let mut total_fetched = 0;
1854 let mut total_dupes = 0;
1855 let mut total_boundary_dupes = 0;
1856 let fetch_start = Instant::now();
1857 let mut caught_up = false;
1858 const MAX_ATTEMPTS: usize = 50;
1859 let mut total_wait = std::time::Duration::from_secs(0);
1860 let mut total_http = std::time::Duration::from_secs(0);
1861
1862 // Cutoff time will be calculated per-request based on server time
1863 // (removed static cutoff calculation)
1864
1865 while fetch_num < MAX_ATTEMPTS {
1866 let stats = self.get_mempool_stats()?;
1867
1868 if stats.count >= constants::BUNDLE_SIZE {
1869 break;
1870 }
1871
1872 fetch_num += 1;
1873 let needed = constants::BUNDLE_SIZE - stats.count;
1874
1875 // Smart batch sizing - request more than exact amount to account for duplicates
1876 let request_count = match needed {
1877 n if n <= 50 => 50,
1878 n if n <= 100 => 100,
1879 n if n <= 500 => 200,
1880 _ => 1000,
1881 };
1882
1883 if *self.verbose.lock().unwrap() {
1884 log::info!(
1885 " Fetch #{}: requesting {} (need {} more, have {}/{})",
1886 fetch_num,
1887 request_count,
1888 needed,
1889 stats.count,
1890 constants::BUNDLE_SIZE
1891 );
1892 }
1893
1894 let fetch_op_start = Instant::now();
1895 if let Some(ref rx) = shutdown_rx
1896 && *rx.borrow()
1897 {
1898 anyhow::bail!("Shutdown requested");
1899 }
1900 let (plc_ops, wait_dur, http_dur, raw_capture_opt, server_time) = if fetch_log {
1901 if let Some(rx) = shutdown_rx.clone() {
1902 let (ops, w, h, capture_opt, st) = client
1903 .fetch_operations(&after_time, request_count, Some(rx), true)
1904 .await?;
1905 (ops, w, h, capture_opt, st)
1906 } else {
1907 let (ops, w, h, capture_opt, st) = client
1908 .fetch_operations(&after_time, request_count, None, true)
1909 .await?;
1910 (ops, w, h, capture_opt, st)
1911 }
1912 } else {
1913 if let Some(rx) = shutdown_rx.clone() {
1914 let (ops, w, h, _, st) = client
1915 .fetch_operations(&after_time, request_count, Some(rx), false)
1916 .await?;
1917 (ops, w, h, None, st)
1918 } else {
1919 let (ops, w, h, _, st) =
1920 client.fetch_operations(&after_time, request_count, None, false).await?;
1921 (ops, w, h, None, st)
1922 }
1923 };
1924 total_wait += wait_dur;
1925 total_http += http_dur;
1926
1927 let fetched_count = plc_ops.len();
1928
1929 // Check for incomplete batch (indicates caught up)
1930 let got_incomplete_batch = fetched_count > 0 && fetched_count < request_count;
1931
1932 if plc_ops.is_empty() || got_incomplete_batch {
1933 caught_up = true;
1934 if *self.verbose.lock().unwrap() && fetch_num > 0 {
1935 log::debug!("Caught up to latest PLC data");
1936 }
1937 if plc_ops.is_empty() {
1938 break;
1939 }
1940 }
1941
1942 total_fetched += fetched_count;
1943
1944 // Calculate cutoff time based on server time if available, otherwise local time
1945 let cutoff_time = if let Some(lag) = safety_lag {
1946 let base_time = server_time.unwrap_or_else(chrono::Utc::now);
1947 let cutoff = base_time - chrono::Duration::from_std(lag).unwrap_or(chrono::Duration::seconds(0));
1948
1949 // Only log if we're using server time (to avoid spamming logs) or if verbose
1950 if *self.verbose.lock().unwrap() {
1951 let source = if server_time.is_some() { "server" } else { "local" };
1952 log::debug!(
1953 "Safety lag cutoff: {} (source: {}, lag: {:?})",
1954 cutoff.to_rfc3339(),
1955 source,
1956 lag
1957 );
1958 }
1959 Some(cutoff)
1960 } else {
1961 None
1962 };
1963
1964 // Convert to operations
1965 let ops_pre_raw: Vec<Operation> = plc_ops.into_iter().map(Into::into).collect();
1966
1967 // Apply safety lag filtering
1968 let (ops_pre, filtered_count) = if let Some(cutoff) = cutoff_time {
1969 let mut kept = Vec::with_capacity(ops_pre_raw.len());
1970 let mut filtered = 0;
1971 for op in ops_pre_raw {
1972 if let Ok(op_time) = chrono::DateTime::parse_from_rfc3339(&op.created_at) {
1973 if op_time <= cutoff {
1974 kept.push(op);
1975 } else {
1976 filtered += 1;
1977 }
1978 } else {
1979 // If we can't parse the time, keep it (safe default? or unsafe?)
1980 // Keeping it is safer for data availability, but risky for consistency.
1981 // Given the issue is about race conditions, keeping it might be risky.
1982 // But failing to parse is a bigger issue. Let's keep it and log warning.
1983 log::warn!("Failed to parse timestamp for op {}, keeping it", op.did);
1984 kept.push(op);
1985 }
1986 }
1987 (kept, filtered)
1988 } else {
1989 (ops_pre_raw, 0)
1990 };
1991
1992 if filtered_count > 0 {
1993 if *self.verbose.lock().unwrap() {
1994 log::info!(
1995 " Safety lag: filtered {} operations newer than cutoff",
1996 filtered_count
1997 );
1998 }
1999 // If we filtered any operations, we must consider ourselves "caught up"
2000 // because we can't proceed past the cutoff time safely.
2001 // We also stop fetching in this cycle.
2002 caught_up = true;
2003 }
2004
2005 let mut all_cids_pre: Vec<String> = Vec::new();
2006 if fetch_log {
2007 all_cids_pre = ops_pre
2008 .iter()
2009 .filter_map(|op| op.cid.clone())
2010 .collect();
2011 }
2012 // Deduplicate against boundary
2013 let before_dedup = ops_pre.len();
2014 let ops: Vec<Operation> = strip_boundary_duplicates(ops_pre.clone(), &prev_boundary);
2015 let after_dedup = ops.len();
2016
2017 let boundary_removed = before_dedup - after_dedup;
2018 if boundary_removed > 0 {
2019 total_boundary_dupes += boundary_removed;
2020 if *self.verbose.lock().unwrap() {
2021 log::info!(
2022 " Stripped {} boundary duplicates from fetch",
2023 boundary_removed
2024 );
2025 }
2026 }
2027
2028 let export_url = if fetch_log {
2029 client.build_export_url(&after_time, request_count)
2030 } else {
2031 String::new()
2032 };
2033
2034 let mut all_cids: Vec<String> = Vec::new();
2035 if fetch_log {
2036 all_cids = all_cids_pre;
2037 }
2038
2039 let (added, added_cids) = if !ops.is_empty() {
2040 self.add_to_mempool(ops, fetch_log)?
2041 } else {
2042 (0, Vec::new())
2043 };
2044
2045 if fetch_log {
2046 use serde_json::json;
2047 let log_dir = self.directory.join(constants::DID_INDEX_DIR).join("logs");
2048 let _ = std::fs::create_dir_all(&log_dir);
2049 let log_path = log_dir.join(format!("{:06}.json", next_bundle_num));
2050 let added_set: std::collections::HashSet<String> =
2051 added_cids.iter().cloned().collect();
2052 let skipped: Vec<String> = all_cids
2053 .iter()
2054 .filter(|c| !added_set.contains(*c))
2055 .cloned()
2056 .collect();
2057 let entry = json!({
2058 "time": chrono::Utc::now().to_rfc3339(),
2059 "url": export_url,
2060 "count": fetched_count,
2061 "cids": all_cids,
2062 "skipped": skipped,
2063 "http_start": raw_capture_opt.as_ref().map(|c| c.http_start.clone()).unwrap_or_default(),
2064 });
2065 let mut file = std::fs::OpenOptions::new()
2066 .create(true)
2067 .append(true)
2068 .open(log_path)?;
2069 use std::io::Write;
2070 writeln!(file, "{}", entry.to_string())?;
2071
2072 if let Some(capture) = raw_capture_opt.as_ref() {
2073 let raw_path = log_dir.join(format!("{:06}-{}", next_bundle_num, after_time));
2074 let mut raw_file = std::fs::OpenOptions::new()
2075 .create(true)
2076 .write(true)
2077 .truncate(true)
2078 .open(raw_path)?;
2079 writeln!(raw_file, "Status: {}", capture.status)?;
2080 for (name, value) in &capture.headers {
2081 writeln!(raw_file, "{}: {}", name, value)?;
2082 }
2083 writeln!(raw_file)?;
2084 write!(raw_file, "{}", capture.body)?;
2085 }
2086 }
2087
2088 let dupes_in_fetch = after_dedup - added;
2089 total_dupes += dupes_in_fetch;
2090
2091 let fetch_duration = fetch_op_start.elapsed();
2092 let new_stats = self.get_mempool_stats()?;
2093 let ops_per_sec = if fetch_duration.as_secs_f64() > 0.0 {
2094 added as f64 / fetch_duration.as_secs_f64()
2095 } else {
2096 0.0
2097 };
2098
2099 if *self.verbose.lock().unwrap() {
2100 if boundary_removed > 0 || dupes_in_fetch > 0 {
2101 log::info!(
2102 " → +{} unique ({} dupes, {} boundary) in {:.9}s • Running: {}/{} ({:.0} ops/sec)",
2103 added,
2104 dupes_in_fetch,
2105 boundary_removed,
2106 fetch_duration.as_secs_f64(),
2107 new_stats.count,
2108 constants::BUNDLE_SIZE,
2109 ops_per_sec
2110 );
2111 } else {
2112 log::info!(
2113 " → +{} unique in {:.9}s • Running: {}/{} ({:.0} ops/sec)",
2114 added,
2115 fetch_duration.as_secs_f64(),
2116 new_stats.count,
2117 constants::BUNDLE_SIZE,
2118 ops_per_sec
2119 );
2120 }
2121 }
2122
2123 // Update cursor
2124 if let Some(last_time) = new_stats.last_time {
2125 after_time = last_time.to_rfc3339();
2126 }
2127
2128 // Stop if we got an incomplete batch or made no progress
2129 // Also stop if we filtered operations due to safety lag (caught_up is set above)
2130 if got_incomplete_batch || added == 0 || (filtered_count > 0 && caught_up) {
2131 caught_up = true;
2132 if *self.verbose.lock().unwrap() {
2133 if filtered_count > 0 {
2134 log::debug!("Caught up to safety lag cutoff");
2135 } else {
2136 log::debug!("Caught up to latest PLC data");
2137 }
2138 }
2139 break;
2140 }
2141 }
2142
2143 let fetch_total_duration = fetch_start.elapsed();
2144 let dedup_pct = if total_fetched > 0 {
2145 (total_dupes + total_boundary_dupes) as f64 / total_fetched as f64 * 100.0
2146 } else {
2147 0.0
2148 };
2149
2150 let final_stats = self.get_mempool_stats()?;
2151
2152 // Bundles must contain exactly BUNDLE_SIZE operations (no partial bundles allowed)
2153 if final_stats.count < constants::BUNDLE_SIZE {
2154 if caught_up {
2155 // Caught up to latest PLC data without enough ops for a full bundle
2156 // Return CaughtUp result instead of error
2157 return Ok(SyncResult::CaughtUp {
2158 next_bundle: next_bundle_num,
2159 mempool_count: final_stats.count,
2160 new_ops: total_fetched - total_dupes - total_boundary_dupes,
2161 fetch_duration_ms: fetch_total_duration.as_millis() as u64,
2162 });
2163 } else {
2164 anyhow::bail!(
2165 "Insufficient operations: have {}, need exactly {} (max attempts reached)",
2166 final_stats.count,
2167 constants::BUNDLE_SIZE
2168 );
2169 }
2170 }
2171
2172 if *self.verbose.lock().unwrap() {
2173 log::info!(
2174 " ✓ Collected {} unique ops from {} fetches ({:.1}% dedup)",
2175 final_stats.count,
2176 fetch_num,
2177 dedup_pct
2178 );
2179 }
2180
2181 // Take operations and create bundle
2182 log::debug!(
2183 "Calling operations.SaveBundle with bundle={}",
2184 next_bundle_num
2185 );
2186
2187 let operations = {
2188 let mut mempool = self.mempool.write().unwrap();
2189 let mem = mempool
2190 .as_mut()
2191 .ok_or_else(|| anyhow::anyhow!("Mempool not initialized"))?;
2192 // Take up to BUNDLE_SIZE operations (or all if less)
2193 let count = mem.count().min(constants::BUNDLE_SIZE);
2194 mem.take(count)?
2195 };
2196
2197 if operations.is_empty() {
2198 anyhow::bail!("No operations to create bundle");
2199 }
2200
2201 // Bundles must contain exactly BUNDLE_SIZE operations
2202 if operations.len() != constants::BUNDLE_SIZE {
2203 anyhow::bail!(
2204 "Invalid operation count: expected exactly {}, got {}",
2205 constants::BUNDLE_SIZE,
2206 operations.len()
2207 );
2208 }
2209
2210 log::debug!("SaveBundle SUCCESS, setting bundle fields");
2211
2212 // CRITICAL: Clear mempool BEFORE saving to ensure atomicity
2213 // If interrupted after this point, the operations are no longer in mempool
2214 // and won't be re-fetched on restart, preventing duplicate/inconsistent bundles.
2215 // If save fails after clearing, we bail out and the operations are lost,
2216 // but this is better than creating bundles with inconsistent content.
2217 self.clear_mempool()?;
2218
2219 // Save bundle to disk with timing breakdown
2220 // Save bundle and update DID index (now fast with delta segments)
2221 let save_start = Instant::now();
2222 let (
2223 serialize_time,
2224 compress_time,
2225 hash_time,
2226 did_index_time,
2227 index_write_time,
2228 did_index_compacted,
2229 ) = self
2230 .save_bundle_with_timing(next_bundle_num, operations, update_did_index)
2231 .await?;
2232 let save_duration = save_start.elapsed();
2233
2234 // Show timing breakdown in verbose mode only
2235 if *self.verbose.lock().unwrap() {
2236 log::debug!(
2237 " Save timing: serialize={:.3}ms, compress={:.3}ms, hash={:.3}ms, did_index={:.3}ms, index_write={:.3}ms, total={:.1}ms",
2238 serialize_time.as_secs_f64() * 1000.0,
2239 compress_time.as_secs_f64() * 1000.0,
2240 hash_time.as_secs_f64() * 1000.0,
2241 did_index_time.as_secs_f64() * 1000.0,
2242 index_write_time.as_secs_f64() * 1000.0,
2243 save_duration.as_secs_f64() * 1000.0
2244 );
2245 }
2246
2247 log::debug!("Adding bundle {} to index", next_bundle_num);
2248 log::debug!("Index now has {} bundles", next_bundle_num);
2249 log::debug!("Index saved, last bundle = {}", next_bundle_num);
2250
2251 // Get bundle info for display
2252 let (short_hash, age_str, unique_dids, size_bytes) = {
2253 let index = self.index.read().unwrap();
2254 let bundle_meta = index.get_bundle(next_bundle_num).unwrap();
2255 // Use chain hash (first 7 chars) for display
2256 let hash = bundle_meta.hash[..7].to_string();
2257
2258 // Calculate age
2259 let created_time = chrono::DateTime::parse_from_rfc3339(&bundle_meta.start_time)
2260 .unwrap()
2261 .with_timezone(&chrono::Utc);
2262 let now = chrono::Utc::now();
2263 let age = now.signed_duration_since(created_time);
2264 let age_str = format_age(age);
2265
2266 (
2267 hash,
2268 age_str,
2269 bundle_meta.did_count,
2270 bundle_meta.compressed_size,
2271 )
2272 };
2273
2274 // Get mempool count after clearing (should be 0, but check anyway)
2275 let mempool_count = self.get_mempool_stats().map(|s| s.count).unwrap_or(0);
2276 let total_duration_ms = (fetch_total_duration + save_duration).as_millis() as u64;
2277 let fetch_duration_ms = fetch_total_duration.as_millis() as u64;
2278
2279 // Calculate separate timings: bundle save vs index write/DID index
2280 let (bundle_save_ms, index_ms) = if update_did_index {
2281 (
2282 (serialize_time + compress_time + hash_time).as_millis() as u64,
2283 (did_index_time + index_write_time).as_millis() as u64,
2284 )
2285 } else {
2286 (
2287 (serialize_time + compress_time + hash_time + index_write_time).as_millis() as u64,
2288 0,
2289 )
2290 };
2291
2292 // Only log detailed info in verbose mode
2293 if *self.verbose.lock().unwrap() {
2294 log::info!(
2295 "→ Bundle {:06} | {} | fetch: {:.3}s ({} reqs) | {}",
2296 next_bundle_num,
2297 short_hash,
2298 fetch_total_duration.as_secs_f64(),
2299 fetch_num,
2300 age_str
2301 );
2302 log::debug!(
2303 "Bundle done = {}, finish duration = {:.3}ms",
2304 next_bundle_num,
2305 save_duration.as_secs_f64() * 1000.0
2306 );
2307 }
2308
2309 Ok(SyncResult::BundleCreated {
2310 bundle_num: next_bundle_num,
2311 mempool_count,
2312 duration_ms: total_duration_ms,
2313 fetch_duration_ms,
2314 bundle_save_ms,
2315 index_ms,
2316 fetch_requests: fetch_num,
2317 hash: short_hash,
2318 age: age_str,
2319 did_index_compacted,
2320 unique_dids,
2321 size_bytes,
2322 fetch_wait_ms: total_wait.as_millis() as u64,
2323 fetch_http_ms: total_http.as_millis() as u64,
2324 })
2325 }
2326
2327 /// Run single sync cycle
2328 ///
2329 /// If max_bundles is Some(n), stop after syncing n bundles
2330 /// If max_bundles is None, sync until caught up
2331 pub async fn sync_once(
2332 &self,
2333 client: &crate::plc_client::PLCClient,
2334 max_bundles: Option<usize>,
2335 ) -> Result<usize> {
2336 let mut synced = 0;
2337
2338 loop {
2339 match self.sync_next_bundle(client, None, true, false, None).await {
2340 Ok(SyncResult::BundleCreated { .. }) => {
2341 synced += 1;
2342
2343 // Check if we've reached the limit
2344 if let Some(max) = max_bundles
2345 && synced >= max
2346 {
2347 break;
2348 }
2349 }
2350 Ok(SyncResult::CaughtUp { .. }) => {
2351 // Caught up to latest PLC data
2352 break;
2353 }
2354 Err(e) => return Err(e),
2355 }
2356
2357 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
2358 }
2359
2360 Ok(synced)
2361 }
2362
2363 /// Save bundle to disk with compression and index updates (with timing)
2364 async fn save_bundle_with_timing(
2365 &self,
2366 bundle_num: u32,
2367 operations: Vec<Operation>,
2368 update_did_index: bool,
2369 ) -> Result<(
2370 std::time::Duration,
2371 std::time::Duration,
2372 std::time::Duration,
2373 std::time::Duration,
2374 std::time::Duration,
2375 bool,
2376 )> {
2377 use anyhow::Context;
2378 use std::collections::HashSet;
2379 use std::fs::File;
2380 use std::io::Write;
2381 use std::time::Instant;
2382
2383 if operations.is_empty() {
2384 anyhow::bail!("Cannot save empty bundle");
2385 }
2386
2387 // Extract metadata
2388 let start_time = operations.first().unwrap().created_at.clone();
2389 let end_time = operations.last().unwrap().created_at.clone();
2390 let operation_count = operations.len() as u32;
2391
2392 // Count unique DIDs
2393 let unique_dids: HashSet<String> = operations.iter().map(|op| op.did.clone()).collect();
2394 let did_count = unique_dids.len() as u32;
2395
2396 // Use multi-frame compression for better performance on large bundles
2397
2398 // Compress operations to frames using parallel compression
2399 let compress_result =
2400 crate::bundle_format::compress_operations_to_frames_parallel(&operations)?;
2401
2402 let serialize_time =
2403 std::time::Duration::from_secs_f64(compress_result.serialize_time_ms / 1000.0);
2404 let compress_time =
2405 std::time::Duration::from_secs_f64(compress_result.compress_time_ms / 1000.0);
2406
2407 let uncompressed_size = compress_result.uncompressed_size;
2408 let compressed_size = compress_result.compressed_size;
2409 let frame_count = compress_result.compressed_frames.len();
2410 let frame_offsets = compress_result.frame_offsets;
2411 let compressed_frames = compress_result.compressed_frames;
2412
2413 // Calculate content hash from uncompressed data
2414 let hash_start = Instant::now();
2415 let content_hash = {
2416 use sha2::{Digest, Sha256};
2417 let mut hasher = Sha256::new();
2418 let mut missing_raw_json = 0;
2419
2420 // Hash all operations in order (reconstructing uncompressed JSONL)
2421 for op in &operations {
2422 let json = if let Some(raw) = &op.raw_json {
2423 raw.clone()
2424 } else {
2425 missing_raw_json += 1;
2426 if missing_raw_json == 1 && *self.verbose.lock().unwrap() {
2427 log::warn!(
2428 "⚠️ Bundle {}: Operation missing raw_json, using re-serialized JSON (may cause hash mismatch!)",
2429 bundle_num
2430 );
2431 }
2432 sonic_rs::to_string(op)?
2433 };
2434 hasher.update(json.as_bytes());
2435 hasher.update(b"\n");
2436 }
2437
2438 if missing_raw_json > 0 && *self.verbose.lock().unwrap() {
2439 log::warn!(
2440 "⚠️ Bundle {}: {} operations missing raw_json (content hash may be incorrect!)",
2441 bundle_num,
2442 missing_raw_json
2443 );
2444 }
2445
2446 format!("{:x}", hasher.finalize())
2447 };
2448
2449 // Calculate compressed hash - will be calculated after writing the file
2450 // because it needs to include the metadata frame (verification hashes entire file)
2451 // We'll calculate it after the file is written
2452 let hash_time = hash_start.elapsed();
2453
2454 // Calculate chain hash per spec (Section 6.3)
2455 // Genesis bundle: SHA256("plcbundle:genesis:" + content_hash)
2456 // Subsequent: SHA256(parent_chain_hash + ":" + current_content_hash)
2457 let (parent, chain_hash) = if bundle_num > 1 {
2458 use sha2::{Digest, Sha256};
2459 let parent_chain_hash = self
2460 .index
2461 .read()
2462 .unwrap()
2463 .get_bundle(bundle_num - 1)
2464 .map(|b| b.hash.clone())
2465 .unwrap_or_default();
2466
2467 // Debug logging for hash calculation issues
2468 if parent_chain_hash.is_empty() {
2469 log::warn!(
2470 "⚠️ Bundle {}: Parent bundle {} not found in index! Using empty parent hash.",
2471 bundle_num,
2472 bundle_num - 1
2473 );
2474 } else if *self.verbose.lock().unwrap() {
2475 log::debug!(
2476 "Bundle {}: Parent hash from bundle {}: {}",
2477 bundle_num,
2478 bundle_num - 1,
2479 &parent_chain_hash[..16]
2480 );
2481 log::debug!(
2482 "Bundle {}: Content hash: {}",
2483 bundle_num,
2484 &content_hash[..16]
2485 );
2486 }
2487
2488 let chain_input = format!("{}:{}", parent_chain_hash, content_hash);
2489 let mut hasher = Sha256::new();
2490 hasher.update(chain_input.as_bytes());
2491 let hash = format!("{:x}", hasher.finalize());
2492
2493 if *self.verbose.lock().unwrap() {
2494 log::debug!("Bundle {}: Chain hash: {}", bundle_num, &hash[..16]);
2495 }
2496
2497 (parent_chain_hash, hash)
2498 } else {
2499 // Genesis bundle
2500 use sha2::{Digest, Sha256};
2501 let chain_input = format!("plcbundle:genesis:{}", content_hash);
2502 let mut hasher = Sha256::new();
2503 hasher.update(chain_input.as_bytes());
2504 let hash = format!("{:x}", hasher.finalize());
2505
2506 (String::new(), hash)
2507 };
2508
2509 // Get cursor (end_time of previous bundle per spec)
2510 // For the first bundle, cursor is empty string
2511 let cursor = if bundle_num > 1 {
2512 let prev_end_time = self
2513 .index
2514 .read()
2515 .unwrap()
2516 .get_bundle(bundle_num - 1)
2517 .map(|b| b.end_time.clone())
2518 .unwrap_or_default();
2519
2520 // Validate cursor matches previous bundle's end_time
2521 if prev_end_time.is_empty() {
2522 log::warn!(
2523 "⚠️ Bundle {}: Previous bundle {} has empty end_time, cursor will be empty",
2524 bundle_num,
2525 bundle_num - 1
2526 );
2527 }
2528
2529 prev_end_time
2530 } else {
2531 String::new()
2532 };
2533
2534 // Validate cursor correctness (for non-genesis bundles)
2535 if bundle_num > 1 {
2536 let expected_cursor = {
2537 let index = self.index.read().unwrap();
2538 index
2539 .get_bundle(bundle_num - 1)
2540 .map(|b| b.end_time.clone())
2541 .unwrap_or_default()
2542 };
2543 if cursor != expected_cursor {
2544 anyhow::bail!(
2545 "Cursor validation failed for bundle {}: expected {} (previous bundle end_time), got {}",
2546 bundle_num,
2547 expected_cursor,
2548 cursor
2549 );
2550 }
2551 } else if !cursor.is_empty() {
2552 anyhow::bail!(
2553 "Cursor validation failed for bundle {} (genesis): cursor should be empty, got {}",
2554 bundle_num,
2555 cursor
2556 );
2557 }
2558
2559 // Prepare bundle metadata for skippable frame
2560 let bundle_metadata_frame = crate::bundle_format::BundleMetadata {
2561 format: "plcbundle/1.0".to_string(),
2562 bundle_number: bundle_num,
2563 origin: self.index.read().unwrap().origin.clone(),
2564 content_hash: content_hash.clone(),
2565 parent_hash: if !parent.is_empty() {
2566 Some(parent.clone())
2567 } else {
2568 None
2569 },
2570 uncompressed_size: Some(uncompressed_size),
2571 compressed_size: Some(compressed_size),
2572 operation_count: operation_count as usize,
2573 did_count: did_count as usize,
2574 start_time: start_time.clone(),
2575 end_time: end_time.clone(),
2576 created_at: chrono::Utc::now().to_rfc3339(),
2577 created_by: constants::created_by(),
2578 frame_count,
2579 frame_size: constants::FRAME_SIZE,
2580 frame_offsets: frame_offsets.clone(),
2581 };
2582
2583 // Write to disk with metadata skippable frame (move to blocking task to avoid blocking async runtime)
2584 // CRITICAL: We need to calculate compressed_hash from the entire file (including metadata frame)
2585 // because verification hashes the entire file. So we write the file first, then read it back to calculate the hash.
2586 let bundle_path = constants::bundle_path(&self.directory, bundle_num);
2587 let bundle_path_clone = bundle_path.clone();
2588 let bundle_metadata_frame_clone = bundle_metadata_frame.clone();
2589 let compressed_frames_clone = compressed_frames.clone();
2590
2591 // Write file first (metadata frame doesn't contain compressed_hash, so we can write it)
2592 tokio::task::spawn_blocking({
2593 let bundle_path_clone = bundle_path_clone.clone();
2594 let bundle_metadata_frame_clone = bundle_metadata_frame_clone.clone();
2595 let compressed_frames_clone = compressed_frames_clone.clone();
2596 move || {
2597 let mut file = File::create(&bundle_path_clone).with_context(|| {
2598 format!(
2599 "Failed to create bundle file: {}",
2600 bundle_path_clone.display()
2601 )
2602 })?;
2603
2604 // Write metadata as skippable frame first
2605 crate::bundle_format::write_metadata_frame(&mut file, &bundle_metadata_frame_clone)
2606 .with_context(|| {
2607 format!(
2608 "Failed to write metadata frame to: {}",
2609 bundle_path_clone.display()
2610 )
2611 })?;
2612
2613 // Write all compressed frames
2614 for frame in &compressed_frames_clone {
2615 file.write_all(frame).with_context(|| {
2616 format!(
2617 "Failed to write compressed frame to: {}",
2618 bundle_path_clone.display()
2619 )
2620 })?;
2621 }
2622 file.flush().with_context(|| {
2623 format!(
2624 "Failed to flush bundle file: {}",
2625 bundle_path_clone.display()
2626 )
2627 })?;
2628
2629 Ok::<(), anyhow::Error>(())
2630 }
2631 })
2632 .await
2633 .context("Bundle file write task failed")??;
2634
2635 // Now calculate compressed_hash from the entire file (as verification does)
2636 let compressed_hash = tokio::task::spawn_blocking({
2637 let bundle_path_clone = bundle_path_clone.clone();
2638 move || {
2639 use sha2::{Digest, Sha256};
2640 let file_data = std::fs::read(&bundle_path_clone).with_context(|| {
2641 format!(
2642 "Failed to read bundle file for hash: {}",
2643 bundle_path_clone.display()
2644 )
2645 })?;
2646
2647 let mut hasher = Sha256::new();
2648 hasher.update(&file_data);
2649 Ok::<String, anyhow::Error>(format!("{:x}", hasher.finalize()))
2650 }
2651 })
2652 .await
2653 .context("Compressed hash calculation task failed")??;
2654
2655 if *self.verbose.lock().unwrap() {
2656 log::debug!(
2657 "Saved bundle {} ({} ops, {} DIDs, {} → {} bytes, {:.1}% compression)",
2658 bundle_num,
2659 operation_count,
2660 did_count,
2661 uncompressed_size,
2662 compressed_size,
2663 100.0 * (1.0 - compressed_size as f64 / uncompressed_size as f64)
2664 );
2665 }
2666
2667 let (did_index_time, did_index_compacted) = if update_did_index {
2668 let did_index_start = Instant::now();
2669 let did_ops: Vec<(String, bool)> = operations
2670 .iter()
2671 .map(|op| (op.did.clone(), op.nullified))
2672 .collect();
2673
2674 self.ensure_did_index()?;
2675 let compacted = self
2676 .did_index
2677 .write()
2678 .unwrap()
2679 .as_mut()
2680 .unwrap()
2681 .update_for_bundle(bundle_num, did_ops)?;
2682 (did_index_start.elapsed(), compacted)
2683 } else {
2684 (std::time::Duration::from_millis(0), false)
2685 };
2686
2687 // Update main index
2688 let index_write_start = Instant::now();
2689 let bundle_metadata = crate::index::BundleMetadata {
2690 bundle_number: bundle_num,
2691 start_time,
2692 end_time,
2693 operation_count,
2694 did_count,
2695 hash: chain_hash, // Chain hash per spec
2696 content_hash,
2697 parent,
2698 compressed_hash,
2699 compressed_size,
2700 uncompressed_size,
2701 cursor,
2702 created_at: chrono::Utc::now().to_rfc3339(),
2703 };
2704
2705 // Add to index
2706 // CRITICAL: Clone index data while holding lock briefly, then release lock
2707 // before doing expensive serialization and file I/O in spawn_blocking
2708 let index_clone = {
2709 let mut index = self.index.write().unwrap();
2710 index.bundles.push(bundle_metadata);
2711 index.last_bundle = bundle_num;
2712 index.updated_at = chrono::Utc::now().to_rfc3339();
2713 index.total_size_bytes += compressed_size;
2714 index.total_uncompressed_size_bytes += uncompressed_size;
2715
2716 // Clone the index for serialization outside the lock
2717 // This prevents blocking the async runtime while holding the lock
2718 index.clone()
2719 };
2720
2721 // Serialize and write index in blocking task to avoid blocking async runtime
2722 // Use Index::save() which does atomic write (temp file + rename)
2723 let directory = self.directory.clone();
2724 tokio::task::spawn_blocking(move || index_clone.save(directory))
2725 .await
2726 .context("Index write task failed")??;
2727 let index_write_time = index_write_start.elapsed();
2728
2729 Ok((
2730 serialize_time,
2731 compress_time,
2732 hash_time,
2733 did_index_time,
2734 index_write_time,
2735 did_index_compacted,
2736 ))
2737 }
2738
    /// Migrate a bundle to multi-frame format
    ///
    /// This method loads a bundle and re-saves it with multi-frame compression
    /// (100 operations per frame) with frame offsets for efficient random access.
    ///
    /// Safety protocol: the original file is backed up (`.jsonl.zst.bak`) before
    /// being overwritten, the chain/parent hashes are re-verified against the
    /// index before anything is written, post-write verification restores the
    /// backup on failure, and the backup is removed only after the index has
    /// been successfully updated and saved.
    ///
    /// Returns: (size_diff, new_uncompressed_size, new_compressed_size)
    pub fn migrate_bundle(&self, bundle_num: u32) -> Result<(i64, u64, u64)> {
        use anyhow::Context;
        use std::collections::HashSet;
        use std::fs::File;

        // Get existing bundle metadata (fails if the bundle is not in the index)
        let meta = self
            .get_bundle_metadata(bundle_num)?
            .ok_or_else(|| anyhow::anyhow!("Bundle {} not in index", bundle_num))?;

        let old_size = meta.compressed_size;

        // Load bundle operations (fully decompressed, bypassing the cache)
        let load_result = self.load_bundle(
            bundle_num,
            LoadOptions {
                decompress: true,
                cache: false,
                filter: None,
                limit: None,
            },
        )?;

        let operations = load_result.operations;
        if operations.is_empty() {
            anyhow::bail!("Bundle {} has no operations", bundle_num);
        }

        // Extract metadata from the first/last operation (operations are stored
        // in creation order within a bundle — TODO confirm)
        let start_time = operations.first().unwrap().created_at.clone();
        let end_time = operations.last().unwrap().created_at.clone();
        let operation_count = operations.len() as u32;

        // Count unique DIDs
        let unique_dids: HashSet<String> = operations.iter().map(|op| op.did.clone()).collect();
        let did_count = unique_dids.len() as u32;

        // Compress operations into frames using parallel compression
        let frame_result =
            crate::bundle_format::compress_operations_to_frames_parallel(&operations)?;
        let compressed_size = frame_result.compressed_size;
        let uncompressed_size = frame_result.uncompressed_size;

        // Calculate hashes using library functions
        let content_hash = crate::bundle_format::calculate_content_hash(&operations)?;

        // Compressed hash will be calculated after writing the file
        // because it needs to include the metadata frame (verification hashes entire file)

        // Recalculate chain hash to verify correctness.
        // Genesis: SHA256("plcbundle:genesis:" + content_hash)
        // Subsequent: SHA256(parent_chain_hash + ":" + content_hash)
        let (expected_parent, recalculated_chain_hash) = if bundle_num > 1 {
            use sha2::{Digest, Sha256};
            let parent_chain_hash = self
                .index
                .read()
                .unwrap()
                .get_bundle(bundle_num - 1)
                .map(|b| b.hash.clone())
                .unwrap_or_default();

            let chain_input = format!("{}:{}", parent_chain_hash, content_hash);
            let mut hasher = Sha256::new();
            hasher.update(chain_input.as_bytes());
            let hash = format!("{:x}", hasher.finalize());

            (parent_chain_hash, hash)
        } else {
            use sha2::{Digest, Sha256};
            let chain_input = format!("plcbundle:genesis:{}", content_hash);
            let mut hasher = Sha256::new();
            hasher.update(chain_input.as_bytes());
            let hash = format!("{:x}", hasher.finalize());

            (String::new(), hash)
        };

        // Verify chain hash matches original — abort before touching the file
        // if the loaded content does not reproduce the recorded chain hash
        if recalculated_chain_hash != meta.hash {
            anyhow::bail!(
                "Chain hash mismatch in bundle {}: original={}, recalculated={}\n\
                 This indicates the original bundle content may be corrupted or the chain was broken.",
                bundle_num,
                meta.hash,
                recalculated_chain_hash
            );
        }

        // Verify parent hash matches
        if expected_parent != meta.parent {
            anyhow::bail!(
                "Parent hash mismatch in bundle {}: original={}, expected={}\n\
                 This indicates the chain linkage is broken.",
                bundle_num,
                meta.parent,
                expected_parent
            );
        }

        // Use verified hashes from original bundle
        let chain_hash = meta.hash.clone();
        let parent = meta.parent.clone();

        // Get cursor (end_time of previous bundle per spec)
        // For the first bundle, cursor is empty string
        let cursor = if bundle_num > 1 {
            let prev_end_time = self
                .index
                .read()
                .unwrap()
                .get_bundle(bundle_num - 1)
                .map(|b| b.end_time.clone())
                .unwrap_or_default();

            // Validate cursor matches previous bundle's end_time
            if prev_end_time.is_empty() {
                log::warn!(
                    "⚠️ Bundle {}: Previous bundle {} has empty end_time, cursor will be empty",
                    bundle_num,
                    bundle_num - 1
                );
            }

            prev_end_time
        } else {
            String::new()
        };

        // Validate cursor correctness (for non-genesis bundles).
        // NOTE(review): expected_cursor is derived exactly the same way as cursor
        // above, so this can only fail if the index changed between the two reads;
        // it is a race guard, not an independent check.
        if bundle_num > 1 {
            let expected_cursor = {
                let index = self.index.read().unwrap();
                index
                    .get_bundle(bundle_num - 1)
                    .map(|b| b.end_time.clone())
                    .unwrap_or_default()
            };
            if cursor != expected_cursor {
                anyhow::bail!(
                    "Cursor validation failed for bundle {}: expected {} (previous bundle end_time), got {}",
                    bundle_num,
                    expected_cursor,
                    cursor
                );
            }
        } else if !cursor.is_empty() {
            anyhow::bail!(
                "Cursor validation failed for bundle {} (genesis): cursor should be empty, got {}",
                bundle_num,
                cursor
            );
        }

        let origin = self.index.read().unwrap().origin.clone();

        // Create bundle metadata using library function.
        // NOTE(review): frame count is derived as frame_offsets.len() - 1 here,
        // while the save path uses compressed_frames.len() — confirm frame_offsets
        // always carries one trailing end offset so the two agree. Also underflows
        // (panics) if frame_offsets is ever empty.
        let bundle_metadata_frame = crate::bundle_format::create_bundle_metadata(
            bundle_num,
            &origin,
            &content_hash,
            if !parent.is_empty() {
                Some(&parent)
            } else {
                None
            },
            Some(uncompressed_size),
            Some(compressed_size),
            operation_count as usize,
            did_count as usize,
            &start_time,
            &end_time,
            frame_result.frame_offsets.len() - 1,
            constants::FRAME_SIZE,
            &frame_result.frame_offsets,
        );

        // Create backup path
        let bundle_path = constants::bundle_path(&self.directory, bundle_num);
        let backup_path = bundle_path.with_extension("jsonl.zst.bak");

        // Backup existing file
        if bundle_path.exists() {
            std::fs::copy(&bundle_path, &backup_path)
                .with_context(|| format!("Failed to backup bundle: {}", bundle_path.display()))?;
        }

        // Write new bundle with multi-frame format using library function
        let mut file = File::create(&bundle_path)
            .with_context(|| format!("Failed to create bundle file: {}", bundle_path.display()))?;

        crate::bundle_format::write_bundle_with_frames(
            &mut file,
            &bundle_metadata_frame,
            &frame_result.compressed_frames,
        )
        .with_context(|| format!("Failed to write bundle: {}", bundle_path.display()))?;

        // Verify metadata was written correctly by re-reading it from disk
        let embedded_meta = crate::bundle_format::extract_metadata_from_file(&bundle_path)
            .with_context(|| "Failed to extract embedded metadata after migration")?;

        if embedded_meta.frame_offsets.is_empty() {
            // Restore backup on failure
            if backup_path.exists() {
                std::fs::rename(&backup_path, &bundle_path)?;
            }
            anyhow::bail!("Frame offsets missing in metadata after migration");
        }

        // Verify content hash matches
        if embedded_meta.content_hash != content_hash {
            // Restore backup on failure
            if backup_path.exists() {
                std::fs::rename(&backup_path, &bundle_path)?;
            }
            anyhow::bail!("Content hash mismatch after migration");
        }

        // Calculate compressed_hash from the entire file (as verification does)
        // This must be done AFTER writing the file because it includes the metadata frame
        use sha2::{Digest, Sha256};
        let file_data = std::fs::read(&bundle_path).with_context(|| {
            format!(
                "Failed to read bundle file for hash: {}",
                bundle_path.display()
            )
        })?;

        let mut hasher = Sha256::new();
        hasher.update(&file_data);
        let compressed_hash = format!("{:x}", hasher.finalize());

        // Update index BEFORE removing backup (so if interrupted, index is consistent with file)
        let bundle_metadata = crate::index::BundleMetadata {
            bundle_number: bundle_num,
            start_time,
            end_time,
            operation_count,
            did_count,
            hash: chain_hash,
            content_hash,
            parent,
            compressed_hash,
            compressed_size,
            uncompressed_size,
            cursor,
            created_at: chrono::Utc::now().to_rfc3339(),
        };

        {
            let mut index = self.index.write().unwrap();
            // Update existing bundle metadata (in-place replace; push if absent)
            if let Some(existing) = index
                .bundles
                .iter_mut()
                .find(|b| b.bundle_number == bundle_num)
            {
                *existing = bundle_metadata.clone();
            } else {
                index.bundles.push(bundle_metadata.clone());
            }

            // Recalculate totals (sizes may have changed after recompression)
            index.total_size_bytes = index.bundles.iter().map(|b| b.compressed_size).sum();
            index.total_uncompressed_size_bytes =
                index.bundles.iter().map(|b| b.uncompressed_size).sum();
            index.updated_at = chrono::Utc::now().to_rfc3339();

            // Save index to disk using Index::save() (atomic write)
            index.save(&self.directory)?;
        }

        // Remove backup only after index is successfully updated
        if backup_path.exists() {
            std::fs::remove_file(&backup_path)
                .with_context(|| format!("Failed to remove backup: {}", backup_path.display()))?;
        }

        // Negative when the new multi-frame file is smaller than the original
        let size_diff = compressed_size as i64 - old_size as i64;
        Ok((size_diff, uncompressed_size, compressed_size))
    }
3025
3026 // === Helpers ===
3027 pub fn get_last_bundle(&self) -> u32 {
3028 self.index.read().unwrap().last_bundle
3029 }
3030
    /// Repository root directory this manager operates on.
    pub fn directory(&self) -> &PathBuf {
        &self.directory
    }
3034
3035 /// Get a copy of the current index
3036 pub fn get_index(&self) -> Index {
3037 self.index.read().unwrap().clone()
3038 }
3039
3040 pub fn bundle_count(&self) -> usize {
3041 self.index.read().unwrap().bundles.len()
3042 }
3043
3044 pub fn get_mempool_operations_from(&self, start: usize) -> Result<Vec<Operation>> {
3045 let mempool_guard = self.mempool.read().unwrap();
3046 match mempool_guard.as_ref() {
3047 Some(mp) => {
3048 let ops = mp.get_operations();
3049 if start >= ops.len() {
3050 Ok(Vec::new())
3051 } else {
3052 Ok(ops[start..].to_vec())
3053 }
3054 }
3055 None => Ok(Vec::new()),
3056 }
3057 }
3058
3059 // === Remote Access ===
3060
3061 /// Fetch index from remote URL or local file path
3062 ///
3063 /// This is an async method that requires a tokio runtime.
3064 /// For synchronous usage, use the remote module functions directly.
3065 pub async fn fetch_remote_index(&self, target: &str) -> Result<Index> {
3066 if target.starts_with("http://") || target.starts_with("https://") {
3067 let client = crate::remote::RemoteClient::new(target)?;
3068 client.fetch_index().await
3069 } else {
3070 crate::remote::load_local_index(target)
3071 }
3072 }
3073
3074 /// Fetch bundle operations from remote URL
3075 ///
3076 /// This is an async method that requires a tokio runtime.
3077 pub async fn fetch_remote_bundle(
3078 &self,
3079 base_url: &str,
3080 bundle_num: u32,
3081 ) -> Result<Vec<Operation>> {
3082 let client = crate::remote::RemoteClient::new(base_url)?;
3083 client.fetch_bundle_operations(bundle_num).await
3084 }
3085
3086 /// Fetch a single operation from remote URL
3087 ///
3088 /// This is an async method that requires a tokio runtime.
3089 pub async fn fetch_remote_operation(
3090 &self,
3091 base_url: &str,
3092 bundle_num: u32,
3093 position: usize,
3094 ) -> Result<String> {
3095 let client = crate::remote::RemoteClient::new(base_url)?;
3096 client.fetch_operation(bundle_num, position).await
3097 }
3098
    /// Rollback repository to a specific bundle
    ///
    /// Drops every index entry with a bundle number greater than
    /// `target_bundle`, recomputes the aggregate size counters, and persists
    /// the index. Bundle files on disk are NOT removed here — see
    /// `delete_bundle_files` for that.
    pub fn rollback_to_bundle(&mut self, target_bundle: u32) -> Result<()> {
        let mut index = self.index.write().unwrap();

        // Keep only bundles up to target
        index.bundles.retain(|b| b.bundle_number <= target_bundle);
        index.last_bundle = target_bundle;
        index.updated_at = chrono::Utc::now().to_rfc3339();

        // Recalculate total sizes
        index.total_size_bytes = index.bundles.iter().map(|b| b.compressed_size).sum();
        index.total_uncompressed_size_bytes =
            index.bundles.iter().map(|b| b.uncompressed_size).sum();

        // Save updated index using Index::save() (atomic write)
        index.save(&self.directory)?;

        Ok(())
    }
3118
3119 /// Get bundle metadata from index
3120 pub fn get_bundle_metadata(
3121 &self,
3122 bundle_num: u32,
3123 ) -> Result<Option<crate::index::BundleMetadata>> {
3124 let index = self.index.read().unwrap();
3125 Ok(index.get_bundle(bundle_num).cloned())
3126 }
3127
3128 /// Get embedded metadata from bundle's skippable frame
3129 pub fn get_embedded_metadata(
3130 &self,
3131 bundle_num: u32,
3132 ) -> Result<Option<crate::bundle_format::BundleMetadata>> {
3133 let bundle_path = constants::bundle_path(&self.directory, bundle_num);
3134
3135 if !bundle_path.exists() {
3136 return Ok(None);
3137 }
3138
3139 match crate::bundle_format::extract_metadata_from_file(&bundle_path) {
3140 Ok(meta) => Ok(Some(meta)),
3141 Err(_) => Ok(None), // Bundle may not have embedded metadata
3142 }
3143 }
3144
3145 /// Delete bundle files from disk
3146 pub fn delete_bundle_files(&self, bundle_numbers: &[u32]) -> Result<RollbackFileStats> {
3147 let mut deleted = 0;
3148 let mut failed = 0;
3149 let mut deleted_size = 0u64;
3150
3151 for &bundle_num in bundle_numbers {
3152 let bundle_path = constants::bundle_path(&self.directory, bundle_num);
3153
3154 // Get file size before deletion
3155 if let Ok(metadata) = std::fs::metadata(&bundle_path) {
3156 deleted_size += metadata.len();
3157 }
3158
3159 match std::fs::remove_file(&bundle_path) {
3160 Ok(_) => deleted += 1,
3161 Err(e) if e.kind() == std::io::ErrorKind::NotFound => deleted += 1,
3162 Err(_) => failed += 1,
3163 }
3164 }
3165
3166 Ok(RollbackFileStats {
3167 deleted,
3168 failed,
3169 deleted_size,
3170 })
3171 }
3172
3173 /// Preview what files would be cleaned without actually deleting them
3174 ///
3175 /// Scans for all `.tmp` files in:
3176 /// - Repository root directory (e.g., `plc_bundles.json.tmp`)
3177 /// - DID index directory `.plcbundle/` (e.g., `config.json.tmp`)
3178 /// - DID index shards directory `.plcbundle/shards/` (e.g., `00.tmp`, `01.tmp`, etc.)
3179 ///
3180 /// # Returns
3181 /// A preview of files that would be removed
3182 pub fn clean_preview(&self) -> Result<CleanPreview> {
3183 use std::fs;
3184
3185 let mut files = Vec::new();
3186 let mut total_size = 0u64;
3187
3188 // Scan repository root directory
3189 let root_dir = &self.directory;
3190 if let Ok(entries) = fs::read_dir(root_dir) {
3191 for entry in entries {
3192 let entry = match entry {
3193 Ok(e) => e,
3194 Err(_) => continue,
3195 };
3196
3197 let path = entry.path();
3198 if !path.is_file() {
3199 continue;
3200 }
3201
3202 if path.extension().is_some_and(|ext| ext == "tmp") {
3203 let file_size = match fs::metadata(&path) {
3204 Ok(meta) => meta.len(),
3205 Err(_) => 0,
3206 };
3207 total_size += file_size;
3208 files.push(CleanPreviewFile {
3209 path,
3210 size: file_size,
3211 });
3212 }
3213 }
3214 }
3215
3216 // Scan DID index directory (.plcbundle/)
3217 let did_index_dir = root_dir.join(constants::DID_INDEX_DIR);
3218 if did_index_dir.exists() {
3219 // Check config.json.tmp
3220 let config_tmp = did_index_dir.join(format!("{}.tmp", constants::DID_INDEX_CONFIG));
3221 if config_tmp.exists() {
3222 let file_size = match fs::metadata(&config_tmp) {
3223 Ok(meta) => meta.len(),
3224 Err(_) => 0,
3225 };
3226 total_size += file_size;
3227 files.push(CleanPreviewFile {
3228 path: config_tmp,
3229 size: file_size,
3230 });
3231 }
3232
3233 // Scan shards directory (.plcbundle/shards/)
3234 let shards_dir = did_index_dir.join(constants::DID_INDEX_SHARDS);
3235 if shards_dir.exists()
3236 && let Ok(entries) = fs::read_dir(&shards_dir)
3237 {
3238 for entry in entries {
3239 let entry = match entry {
3240 Ok(e) => e,
3241 Err(_) => continue,
3242 };
3243
3244 let path = entry.path();
3245 if !path.is_file() {
3246 continue;
3247 }
3248
3249 if path.extension().is_some_and(|ext| ext == "tmp") {
3250 let file_size = match fs::metadata(&path) {
3251 Ok(meta) => meta.len(),
3252 Err(_) => 0,
3253 };
3254 total_size += file_size;
3255 files.push(CleanPreviewFile {
3256 path,
3257 size: file_size,
3258 });
3259 }
3260 }
3261 }
3262 }
3263
3264 Ok(CleanPreview { files, total_size })
3265 }
3266
3267 /// Clean up all temporary files from the repository
3268 ///
3269 /// Removes all `.tmp` files from:
3270 /// - Repository root directory (e.g., `plc_bundles.json.tmp`)
3271 /// - DID index directory `.plcbundle/` (e.g., `config.json.tmp`)
3272 /// - DID index shards directory `.plcbundle/shards/` (e.g., `00.tmp`, `01.tmp`, etc.)
3273 ///
3274 /// # Returns
3275 /// Statistics about the cleanup operation
3276 pub fn clean(&self) -> Result<CleanResult> {
3277 use std::fs;
3278
3279 let verbose = *self.verbose.lock().unwrap();
3280
3281 if verbose {
3282 log::info!("Starting repository cleanup...");
3283 }
3284
3285 let mut files_removed = 0;
3286 let mut bytes_freed = 0u64;
3287 let mut errors = Vec::new();
3288
3289 // Clean repository root directory
3290 let root_dir = &self.directory;
3291 if verbose {
3292 log::info!("Scanning repository root directory: {}", root_dir.display());
3293 }
3294
3295 if let Ok(entries) = fs::read_dir(root_dir) {
3296 for entry in entries {
3297 let entry = match entry {
3298 Ok(e) => e,
3299 Err(e) => {
3300 errors.push(format!("Failed to read directory entry: {}", e));
3301 continue;
3302 }
3303 };
3304
3305 let path = entry.path();
3306 if !path.is_file() {
3307 continue;
3308 }
3309
3310 if path.extension().is_some_and(|ext| ext == "tmp") {
3311 let file_size = match fs::metadata(&path) {
3312 Ok(meta) => {
3313 let size = meta.len();
3314 bytes_freed += size;
3315 size
3316 }
3317 Err(_) => 0,
3318 };
3319
3320 match fs::remove_file(&path) {
3321 Ok(_) => {
3322 files_removed += 1;
3323 if verbose {
3324 log::info!(
3325 " ✓ Removed: {} ({})",
3326 path.file_name().and_then(|n| n.to_str()).unwrap_or("?"),
3327 crate::format::format_bytes(file_size)
3328 );
3329 }
3330 }
3331 Err(e) => {
3332 let error_msg = format!("Failed to remove {}: {}", path.display(), e);
3333 errors.push(error_msg.clone());
3334 if verbose {
3335 log::warn!(" ✗ {}", error_msg);
3336 }
3337 }
3338 }
3339 }
3340 }
3341 }
3342
3343 // Clean DID index directory (.plcbundle/)
3344 let did_index_dir = root_dir.join(constants::DID_INDEX_DIR);
3345 if did_index_dir.exists() {
3346 if verbose {
3347 log::info!("Scanning DID index directory: {}", did_index_dir.display());
3348 }
3349
3350 // Clean config.json.tmp
3351 let config_tmp = did_index_dir.join(format!("{}.tmp", constants::DID_INDEX_CONFIG));
3352 if config_tmp.exists() {
3353 let file_size = match fs::metadata(&config_tmp) {
3354 Ok(meta) => {
3355 let size = meta.len();
3356 bytes_freed += size;
3357 size
3358 }
3359 Err(_) => 0,
3360 };
3361
3362 match fs::remove_file(&config_tmp) {
3363 Ok(_) => {
3364 files_removed += 1;
3365 if verbose {
3366 log::info!(
3367 " ✓ Removed: {} ({})",
3368 config_tmp
3369 .file_name()
3370 .and_then(|n| n.to_str())
3371 .unwrap_or("?"),
3372 crate::format::format_bytes(file_size)
3373 );
3374 }
3375 }
3376 Err(e) => {
3377 let error_msg = format!("Failed to remove {}: {}", config_tmp.display(), e);
3378 errors.push(error_msg.clone());
3379 if verbose {
3380 log::warn!(" ✗ {}", error_msg);
3381 }
3382 }
3383 }
3384 }
3385
3386 // Clean shards directory (.plcbundle/shards/)
3387 let shards_dir = did_index_dir.join(constants::DID_INDEX_SHARDS);
3388 if shards_dir.exists() {
3389 if verbose {
3390 log::info!("Scanning shards directory: {}", shards_dir.display());
3391 }
3392 if let Ok(entries) = fs::read_dir(&shards_dir) {
3393 for entry in entries {
3394 let entry = match entry {
3395 Ok(e) => e,
3396 Err(e) => {
3397 errors
3398 .push(format!("Failed to read shards directory entry: {}", e));
3399 continue;
3400 }
3401 };
3402
3403 let path = entry.path();
3404 if !path.is_file() {
3405 continue;
3406 }
3407
3408 if path.extension().is_some_and(|ext| ext == "tmp") {
3409 let file_size = match fs::metadata(&path) {
3410 Ok(meta) => {
3411 let size = meta.len();
3412 bytes_freed += size;
3413 size
3414 }
3415 Err(_) => 0,
3416 };
3417
3418 match fs::remove_file(&path) {
3419 Ok(_) => {
3420 files_removed += 1;
3421 if verbose {
3422 log::info!(
3423 " ✓ Removed: {} ({})",
3424 path.file_name()
3425 .and_then(|n| n.to_str())
3426 .unwrap_or("?"),
3427 crate::format::format_bytes(file_size)
3428 );
3429 }
3430 }
3431 Err(e) => {
3432 let error_msg =
3433 format!("Failed to remove {}: {}", path.display(), e);
3434 errors.push(error_msg.clone());
3435 if verbose {
3436 log::warn!(" ✗ {}", error_msg);
3437 }
3438 }
3439 }
3440 }
3441 }
3442 }
3443 } else if verbose {
3444 log::debug!("Shards directory does not exist: {}", shards_dir.display());
3445 }
3446 } else if verbose {
3447 log::debug!(
3448 "DID index directory does not exist: {}",
3449 did_index_dir.display()
3450 );
3451 }
3452
3453 // Summary logging
3454 if verbose {
3455 if files_removed > 0 {
3456 log::info!(
3457 "Cleanup complete: removed {} file(s), freed {}",
3458 files_removed,
3459 crate::format::format_bytes(bytes_freed)
3460 );
3461 } else {
3462 log::info!("Cleanup complete: no temporary files found");
3463 }
3464
3465 if !errors.is_empty() {
3466 log::warn!("Encountered {} error(s) during cleanup", errors.len());
3467 }
3468 }
3469
3470 Ok(CleanResult {
3471 files_removed,
3472 bytes_freed,
3473 errors: if errors.is_empty() {
3474 None
3475 } else {
3476 Some(errors)
3477 },
3478 })
3479 }
3480
3481 // === Server API Methods ===
3482
3483 /// Get PLC origin from index
3484 pub fn get_plc_origin(&self) -> String {
3485 self.index.read().unwrap().origin.clone()
3486 }
3487
3488 /// Stream bundle raw (compressed) data
3489 /// Returns a reader that can be used to stream the compressed bundle file
3490 pub fn stream_bundle_raw(&self, bundle_num: u32) -> Result<std::fs::File> {
3491 // Validate bundle exists in index first
3492 if self.get_bundle_metadata(bundle_num)?.is_none() {
3493 anyhow::bail!("Bundle {} not found in index", bundle_num);
3494 }
3495
3496 let bundle_path = constants::bundle_path(&self.directory, bundle_num);
3497 if !bundle_path.exists() {
3498 anyhow::bail!(
3499 "Bundle {} file not found (exists in index but missing on disk)",
3500 bundle_num
3501 );
3502 }
3503 Ok(std::fs::File::open(bundle_path)?)
3504 }
3505
3506 /// Stream bundle decompressed (JSONL) data
3507 /// Returns a reader that decompresses the bundle on-the-fly
3508 pub fn stream_bundle_decompressed(
3509 &self,
3510 bundle_num: u32,
3511 ) -> Result<Box<dyn std::io::Read + Send>> {
3512 let file = self.stream_bundle_raw(bundle_num)?;
3513 Ok(Box::new(zstd::Decoder::new(file)?))
3514 }
3515
3516 /// Get current cursor (global position of last operation)
3517 /// Cursor = (last_bundle * BUNDLE_SIZE) + mempool_ops_count
3518 pub fn get_current_cursor(&self) -> u64 {
3519 let index = self.index.read().unwrap();
3520 let bundled_ops = total_operations_from_bundles(index.last_bundle);
3521
3522 // Add mempool operations if available
3523 let mempool_guard = self.mempool.read().unwrap();
3524 let mempool_ops = if let Some(mp) = mempool_guard.as_ref() {
3525 mp.get_operations().len() as u64
3526 } else {
3527 0
3528 };
3529
3530 bundled_ops + mempool_ops
3531 }
3532
3533 /// Resolve handle to DID or validate DID format (async version)
3534 /// Returns (did, handle_resolve_time_ms)
3535 /// Use this version when calling from async code (e.g., server handlers)
3536 pub async fn resolve_handle_or_did_async(&self, input: &str) -> Result<(String, u64)> {
3537 use std::time::Instant;
3538
3539 let input = input.trim();
3540
3541 // Normalize handle format (remove at://, @ prefixes)
3542 let normalized = if !input.starts_with("did:") {
3543 handle_resolver::normalize_handle(input)
3544 } else {
3545 input.to_string()
3546 };
3547
3548 // If already a DID, validate and return
3549 if normalized.starts_with("did:plc:") {
3550 crate::resolver::validate_did_format(&normalized)?;
3551 return Ok((normalized, 0));
3552 }
3553
3554 // Support did:web too
3555 if normalized.starts_with("did:web:") {
3556 return Ok((normalized, 0));
3557 }
3558
3559 // It's a handle - need resolver
3560 let resolver = match &self.handle_resolver {
3561 Some(r) => r,
3562 None => {
3563 anyhow::bail!(
3564 "Input '{}' appears to be a handle, but handle resolver is not configured\n\n\
3565 Configure resolver with:\n\
3566 plcbundle --handle-resolver {} did resolve {}\n\n\
3567 Or set default in config",
3568 normalized,
3569 constants::DEFAULT_HANDLE_RESOLVER_URL,
3570 normalized
3571 );
3572 }
3573 };
3574
3575 // Resolve handle (async operation)
3576 let resolve_start = Instant::now();
3577 let did = resolver.resolve_handle(&normalized).await?;
3578 let resolve_time = resolve_start.elapsed();
3579
3580 Ok((did, resolve_time.as_millis() as u64))
3581 }
3582
3583 /// Resolve handle to DID or validate DID format
3584 /// Returns (did, handle_resolve_time_ms)
3585 /// This is a synchronous wrapper that uses tokio runtime for async resolution
3586 /// For async code, use resolve_handle_or_did_async instead
3587 pub fn resolve_handle_or_did(&self, input: &str) -> Result<(String, u64)> {
3588 use std::time::Instant;
3589
3590 let input = input.trim();
3591
3592 // Normalize handle format (remove at://, @ prefixes)
3593 let normalized = if !input.starts_with("did:") {
3594 handle_resolver::normalize_handle(input)
3595 } else {
3596 input.to_string()
3597 };
3598
3599 // If already a DID, validate and return
3600 if normalized.starts_with("did:plc:") {
3601 crate::resolver::validate_did_format(&normalized)?;
3602 return Ok((normalized, 0));
3603 }
3604
3605 // Support did:web too
3606 if normalized.starts_with("did:web:") {
3607 return Ok((normalized, 0));
3608 }
3609
3610 // It's a handle - need resolver
3611 let resolver = match &self.handle_resolver {
3612 Some(r) => r,
3613 None => {
3614 anyhow::bail!(
3615 "Input '{}' appears to be a handle, but handle resolver is not configured\n\n\
3616 Configure resolver with:\n\
3617 plcbundle --handle-resolver {} did resolve {}\n\n\
3618 Or set default in config",
3619 normalized,
3620 constants::DEFAULT_HANDLE_RESOLVER_URL,
3621 normalized
3622 );
3623 }
3624 };
3625
3626 // Use tokio runtime to resolve handle (async operation)
3627 // Not in a runtime - safe to create one and use block_on
3628 let resolve_start = Instant::now();
3629 let runtime = tokio::runtime::Runtime::new()
3630 .map_err(|e| anyhow::anyhow!("Failed to create tokio runtime: {}", e))?;
3631 let did = runtime.block_on(resolver.resolve_handle(&normalized))?;
3632 let resolve_time = resolve_start.elapsed();
3633
3634 Ok((did, resolve_time.as_millis() as u64))
3635 }
3636
    /// Get resolver statistics
    /// Returns a HashMap with resolver performance metrics
    ///
    /// Currently a stub: always returns an empty map until statistics
    /// tracking is implemented.
    pub fn get_resolver_stats(&self) -> HashMap<String, serde_json::Value> {
        // For now, return empty stats
        // TODO: Track resolver statistics
        HashMap::new()
    }
3644
3645 /// Get handle resolver base URL
3646 /// Returns None if handle resolver is not configured
3647 pub fn get_handle_resolver_base_url(&self) -> Option<String> {
3648 self.handle_resolver
3649 .as_ref()
3650 .map(|r| r.get_base_url().to_string())
3651 }
3652
3653 /// Get a reference to the handle resolver
3654 /// Returns None if handle resolver is not configured
3655 pub fn get_handle_resolver(&self) -> Option<Arc<handle_resolver::HandleResolver>> {
3656 self.handle_resolver.clone()
3657 }
3658
3659 /// Create a shallow clone suitable for `Arc` sharing
3660 pub fn clone_for_arc(&self) -> Self {
3661 Self {
3662 directory: self.directory.clone(),
3663 index: Arc::clone(&self.index),
3664 did_index: Arc::clone(&self.did_index),
3665 stats: Arc::clone(&self.stats),
3666 mempool: Arc::clone(&self.mempool),
3667 mempool_checked: Arc::clone(&self.mempool_checked),
3668 handle_resolver: self.handle_resolver.clone(),
3669 verbose: Arc::clone(&self.verbose),
3670 }
3671 }
3672 fn load_bundle_from_disk(&self, path: &PathBuf) -> Result<Vec<Operation>> {
3673 use std::io::BufRead;
3674
3675 let file = std::fs::File::open(path)?;
3676 let decoder = zstd::Decoder::new(file)?;
3677 let reader = std::io::BufReader::new(decoder);
3678
3679 let mut operations = Vec::new();
3680 for line in reader.lines() {
3681 let line = line?;
3682 if line.is_empty() {
3683 continue;
3684 }
3685 // CRITICAL: Preserve raw JSON for content hash calculation
3686 // This is required by the V1 specification (docs/specification.md § 4.2)
3687 // to ensure content_hash remains reproducible during migration.
3688 // Without this, re-serialization would change the hash.
3689 // Use Operation::from_json (sonic_rs) instead of serde deserialization
3690 let op = Operation::from_json(&line)?;
3691 operations.push(op);
3692 }
3693
3694 Ok(operations)
3695 }
3696
3697 fn filter_load_result(&self, operations: Vec<Operation>, options: &LoadOptions) -> LoadResult {
3698 let mut filtered = operations;
3699
3700 if let Some(ref filter) = options.filter {
3701 filtered.retain(|op| self.matches_filter(op, filter));
3702 }
3703
3704 if let Some(limit) = options.limit {
3705 filtered.truncate(limit);
3706 }
3707
3708 LoadResult {
3709 bundle_number: 0,
3710 operations: filtered,
3711 metadata: None,
3712 }
3713 }
3714
3715 fn matches_filter(&self, op: &Operation, filter: &OperationFilter) -> bool {
3716 if let Some(ref did) = filter.did
3717 && &op.did != did
3718 {
3719 return false;
3720 }
3721
3722 if let Some(ref op_type) = filter.operation_type
3723 && &op.operation != op_type
3724 {
3725 return false;
3726 }
3727
3728 if !filter.include_nullified && op.nullified {
3729 return false;
3730 }
3731
3732 true
3733 }
3734
3735 fn matches_request(&self, op: &Operation, req: &OperationRequest) -> bool {
3736 if let Some(ref filter) = req.filter {
3737 return self.matches_filter(op, filter);
3738 }
3739 true
3740 }
3741
3742 // === Repository Management ===
3743
    /// Initialize a new repository with an empty index
    ///
    /// This is a static method that doesn't require an existing BundleManager.
    /// Creates all necessary directories and an empty index file.
    /// Thin delegating wrapper around [`Index::init`].
    ///
    /// # Arguments
    /// * `directory` - Directory to initialize
    /// * `origin` - PLC directory URL or origin identifier
    /// * `force` - Whether to reinitialize if already exists
    ///
    /// # Returns
    /// True if initialized (created new), False if already existed and force=false
    pub fn init_repository<P: AsRef<std::path::Path>>(
        directory: P,
        origin: String,
        force: bool,
    ) -> Result<bool> {
        Index::init(directory, origin, force)
    }
3763
    /// Rebuild index from existing bundle files
    ///
    /// This is a static method that doesn't require an existing BundleManager.
    /// It scans all .jsonl.zst files in the directory and reconstructs the index
    /// by extracting embedded metadata from each bundle's skippable frame.
    ///
    /// # Arguments
    /// * `directory` - Directory containing bundle files
    /// * `origin` - Optional origin URL (auto-detected from first bundle if None)
    /// * `progress_cb` - Optional progress callback (current, total)
    ///
    /// # Returns
    /// The reconstructed index (already saved to disk)
    pub fn rebuild_index<P: AsRef<std::path::Path>, F>(
        directory: P,
        origin: Option<String>,
        progress_cb: Option<F>,
    ) -> Result<Index>
    where
        F: Fn(usize, usize, u64, u64) + Send + Sync, // (current, total, bytes_processed, total_bytes)
    {
        // Rebuild, then persist before returning so the caller never sees an
        // index that exists only in memory.
        let index = Index::rebuild_from_bundles(&directory, origin, progress_cb)?;
        index.save(&directory)?;
        Ok(index)
    }
3789
    /// Clone repository from a remote plcbundle instance
    ///
    /// Downloads bundles from a remote instance and reconstructs the repository.
    /// This is a static async method that doesn't require an existing BundleManager.
    ///
    /// Concurrency is capped at 4 simultaneous downloads via a semaphore, and
    /// each bundle gets up to 3 attempts with exponential backoff (2s, 4s
    /// between retries). A bundle that downloads but fails to write locally
    /// counts as failed and is not retried.
    ///
    /// # Arguments
    /// * `remote_url` - URL of the remote plcbundle instance
    /// * `target_dir` - Directory to clone into
    /// * `remote_index` - Already fetched remote index
    /// * `bundles_to_download` - List of bundle numbers to download
    /// * `progress_callback` - Optional callback for progress updates (bundle_num, count, total_count, bytes)
    ///
    /// # Returns
    /// Tuple of (successful_downloads, failed_downloads)
    pub async fn clone_from_remote<P, F>(
        remote_url: String,
        target_dir: P,
        remote_index: &Index,
        bundles_to_download: Vec<u32>,
        progress_callback: Option<F>,
    ) -> Result<(usize, usize)>
    where
        P: AsRef<std::path::Path> + Send + Sync,
        F: Fn(u32, usize, usize, u64) + Send + Sync + 'static,
    {
        use crate::remote::RemoteClient;
        use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

        let target_dir = target_dir.as_ref();

        // Save index first
        remote_index.save(target_dir)?;

        // Progress tracking (shared across spawned download tasks)
        let downloaded = Arc::new(AtomicUsize::new(0));
        let failed = Arc::new(AtomicUsize::new(0));
        let bytes_downloaded = Arc::new(AtomicU64::new(0));
        let total_count = bundles_to_download.len();

        // Parallel download with semaphore (4 concurrent downloads)
        let semaphore = Arc::new(tokio::sync::Semaphore::new(4));
        let progress_cb = progress_callback.map(Arc::new);

        let mut tasks = Vec::new();

        for bundle_num in bundles_to_download {
            // One client per task; construction failure aborts the whole clone.
            let client = RemoteClient::new(&remote_url)?;
            let target_dir = target_dir.to_path_buf();
            let downloaded = Arc::clone(&downloaded);
            let failed = Arc::clone(&failed);
            let bytes_downloaded = Arc::clone(&bytes_downloaded);
            let semaphore = Arc::clone(&semaphore);
            let progress_cb = progress_cb.clone();

            let task = tokio::spawn(async move {
                // Permit bounds concurrency; released when the task returns.
                let _permit = semaphore.acquire().await.unwrap();

                // Retry logic with exponential backoff
                let max_retries = 3;
                for attempt in 0..max_retries {
                    if attempt > 0 {
                        // 2s after the first failure, 4s after the second.
                        let delay = std::time::Duration::from_secs(1 << attempt);
                        tokio::time::sleep(delay).await;
                    }

                    match client.download_bundle_file(bundle_num).await {
                        Ok(data) => {
                            let data_len = data.len() as u64;

                            // Write bundle file; a local write failure is
                            // terminal (not retried).
                            let bundle_path = constants::bundle_path(&target_dir, bundle_num);
                            if let Err(_e) = std::fs::write(&bundle_path, data) {
                                failed.fetch_add(1, Ordering::SeqCst);
                                return;
                            }

                            let count = downloaded.fetch_add(1, Ordering::SeqCst) + 1;
                            let bytes =
                                bytes_downloaded.fetch_add(data_len, Ordering::SeqCst) + data_len;

                            // Call progress callback
                            if let Some(ref cb) = progress_cb {
                                cb(bundle_num, count, total_count, bytes);
                            }
                            return;
                        }
                        Err(_) => {
                            continue; // Retry
                        }
                    }
                }

                // All retries failed
                failed.fetch_add(1, Ordering::SeqCst);
            });

            tasks.push(task);
        }

        // Wait for all downloads
        for task in tasks {
            let _ = task.await;
        }

        let downloaded_count = downloaded.load(Ordering::SeqCst);
        let failed_count = failed.load(Ordering::SeqCst);

        Ok((downloaded_count, failed_count))
    }
3899
3900 /// Deletes a bundle file from the repository.
3901 ///
3902 /// This method removes a bundle file from the repository directory.
3903 ///
3904 /// # Arguments
3905 /// * `bundle_num` - The number of the bundle to delete.
3906 ///
3907 /// # Returns
3908 /// A `Result` indicating whether the operation was successful.
3909 pub fn delete_bundle_file(&self, bundle_num: u32) -> Result<()> {
3910 let bundle_path = constants::bundle_path(&self.directory, bundle_num);
3911 if bundle_path.exists() {
3912 std::fs::remove_file(bundle_path)?;
3913 }
3914 Ok(())
3915 }
3916}
3917
3918// Supporting types moved here
/// Options controlling bundle loading behavior
#[derive(Debug, Clone)]
pub struct LoadOptions {
    /// Enable caching of the loaded bundle (default: true; see `Default`).
    /// NOTE(review): consumption not visible in this file section — confirm.
    pub cache: bool,
    /// Decompress the bundle on load (default: true; see `Default`).
    /// NOTE(review): consumption not visible in this file section — confirm.
    pub decompress: bool,
    /// Optional per-operation filter applied after load (see `matches_filter`).
    pub filter: Option<OperationFilter>,
    /// Maximum number of operations to keep; `None` means unlimited.
    pub limit: Option<usize>,
}
3927
impl Default for LoadOptions {
    /// Defaults: caching and decompression enabled, no filter, no limit.
    fn default() -> Self {
        Self {
            cache: true,
            decompress: true,
            filter: None,
            limit: None,
        }
    }
}
3938
/// Result from a bundle load operation
#[derive(Debug)]
pub struct LoadResult {
    /// Bundle the operations came from (0 when produced by
    /// `filter_load_result`, which does not know the bundle number).
    pub bundle_number: u32,
    /// Operations after any filtering/limiting.
    pub operations: Vec<Operation>,
    /// Index metadata for the bundle, when available.
    pub metadata: Option<BundleMetadata>,
}
3946
/// Result for single-operation fetch with timing
#[derive(Debug)]
pub struct OperationResult {
    /// The operation's raw JSON text, unparsed.
    pub raw_json: String,
    /// Size of the fetched operation in bytes — presumably the length of
    /// `raw_json`; confirm at the producer.
    pub size_bytes: usize,
    /// Wall-clock time spent loading the operation.
    pub load_duration: std::time::Duration,
}
3954
/// Specification for querying bundles
#[derive(Debug, Clone)]
pub struct QuerySpec {
    /// Which bundles to search.
    pub bundles: BundleRange,
    /// Optional per-operation filter applied to candidates.
    pub filter: Option<OperationFilter>,
    /// The query text; its interpretation depends on `mode`.
    pub query: String,
    /// How `query` is matched (see `QueryMode`).
    pub mode: QueryMode,
}
3963
3964// Helper function to format age duration
3965fn format_age(duration: chrono::Duration) -> String {
3966 let days = duration.num_days();
3967 if days >= 365 {
3968 let years = days as f64 / 365.25;
3969 format!("{:.1} years ago", years)
3970 } else if days >= 30 {
3971 let months = days as f64 / 30.0;
3972 format!("{:.1} months ago", months)
3973 } else if days > 0 {
3974 format!("{} days ago", days)
3975 } else {
3976 let hours = duration.num_hours();
3977 if hours > 0 {
3978 format!("{} hours ago", hours)
3979 } else {
3980 let mins = duration.num_minutes();
3981 if mins > 0 {
3982 format!("{} minutes ago", mins)
3983 } else {
3984 "just now".to_string()
3985 }
3986 }
3987 }
3988}
3989
/// Bundle selection for queries, exports, and verification
#[derive(Debug, Clone)]
pub enum BundleRange {
    /// Every bundle in the index.
    All,
    /// Exactly one bundle.
    Single(u32),
    /// A (start, end) range of bundle numbers — inclusivity of `end` is
    /// decided by the consuming iterator; confirm there.
    Range(u32, u32),
    /// An explicit list of bundle numbers.
    List(Vec<u32>),
}
3998
/// Specification for export operations
#[derive(Debug, Clone)]
pub struct ExportSpec {
    /// Which bundles to export from.
    pub bundles: BundleRange,
    /// Output format (currently only JSON Lines).
    pub format: ExportFormat,
    /// Optional per-operation filter.
    pub filter: Option<OperationFilter>,
    /// Maximum number of records to export; `None` means all.
    pub count: Option<usize>,
    /// Only export operations after this timestamp — presumably an RFC 3339
    /// string like the index timestamps; confirm at the usage site.
    pub after_timestamp: Option<String>,
}
4008
/// Output format for export
#[derive(Debug, Clone)]
pub enum ExportFormat {
    /// One JSON document per line (JSONL).
    JsonLines,
}
4014
/// Statistics collected during export
#[derive(Debug, Default)]
pub struct ExportStats {
    /// Number of records written to the output.
    pub records_written: u64,
    /// Number of bytes written to the output.
    pub bytes_written: u64,
}
4021
/// Specification for bundle verification
#[derive(Debug, Clone)]
pub struct VerifySpec {
    /// Verify the bundle's (chain) hash.
    pub check_hash: bool,
    /// Verify the content hash.
    pub check_content_hash: bool,
    /// Verify individual operations.
    pub check_operations: bool,
    pub fast: bool, // Fast mode: only check metadata frame, skip hash calculations
}
4030
/// Result of verifying a single bundle
#[derive(Debug)]
pub struct VerifyResult {
    /// True when no verification errors were found.
    pub valid: bool,
    /// Human-readable descriptions of every failure encountered.
    pub errors: Vec<String>,
}
4037
/// Specification for chain verification
#[derive(Debug, Clone)]
pub struct ChainVerifySpec {
    /// First bundle to verify.
    pub start_bundle: u32,
    /// Last bundle to verify; `None` presumably means "through the latest" —
    /// confirm at the implementation.
    pub end_bundle: Option<u32>,
    /// Also verify each bundle's parent-hash linkage.
    pub check_parent_links: bool,
}
4045
/// Result of chain verification across multiple bundles
#[derive(Debug)]
pub struct ChainVerifyResult {
    /// True when the whole checked chain verified cleanly.
    pub valid: bool,
    /// How many bundles were examined.
    pub bundles_checked: u32,
    /// (bundle_number, description) for each failure.
    pub errors: Vec<(u32, String)>,
}
4053
/// Aggregated bundle information with optional details
#[derive(Debug)]
pub struct BundleInfo {
    /// Index metadata for the bundle.
    pub metadata: BundleMetadata,
    /// Whether the bundle file is present on disk.
    pub exists: bool,
    /// Full operation list, when requested (see `InfoFlags`).
    pub operations: Option<Vec<Operation>>,
    /// Size details, when requested (see `InfoFlags`).
    pub size_info: Option<SizeInfo>,
}
4062
/// Size information (compressed and uncompressed) for a bundle
#[derive(Debug)]
pub struct SizeInfo {
    /// On-disk (compressed) size in bytes.
    pub compressed: u64,
    /// Decompressed size in bytes.
    pub uncompressed: u64,
}
4069
/// Flags controlling `get_bundle_info` detail inclusion
#[derive(Debug, Clone)]
pub struct InfoFlags {
    /// Populate `BundleInfo::operations` (requires loading the bundle).
    pub include_operations: bool,
    /// Populate `BundleInfo::size_info`.
    pub include_size_info: bool,
}
4076
/// Specification for rollback execution
#[derive(Debug, Clone)]
pub struct RollbackSpec {
    /// Last bundle to keep; everything after it is rolled back.
    pub target_bundle: u32,
    /// When true, plan/report only — make no changes.
    pub dry_run: bool,
}
4083
/// Plan produced by `rollback_plan`
#[derive(Debug)]
pub struct RollbackPlan {
    /// Last bundle that would be kept.
    pub target_bundle: u32,
    /// Bundles that would be removed.
    pub affected_bundles: Vec<u32>,
    /// Number of operations contained in the affected bundles.
    pub affected_operations: usize,
    /// Number of DIDs touched by the affected bundles.
    pub affected_dids: usize,
    /// Rough time estimate for executing the rollback, in milliseconds.
    pub estimated_time_ms: u64,
}
4093
/// Result returned by `rollback`
#[derive(Debug)]
pub struct RollbackResult {
    /// True when the rollback completed.
    pub success: bool,
    /// How many bundles were removed.
    pub bundles_removed: usize,
    /// The plan that was (or would have been) executed, when available.
    pub plan: Option<RollbackPlan>,
}
4101
/// Specification for cache warm-up
#[derive(Debug, Clone)]
pub struct WarmUpSpec {
    /// Which bundles to pre-load (see `WarmUpStrategy`).
    pub strategy: WarmUpStrategy,
}
4107
/// Strategy selection for warm-up
#[derive(Debug, Clone)]
pub enum WarmUpStrategy {
    /// Warm the N most recent bundles.
    Recent(u32),
    /// Warm a (start, end) range of bundle numbers.
    Range(u32, u32),
    /// Warm every bundle.
    All,
}
4115
/// Statistics from DID index rebuild
#[derive(Debug, Default, Clone)]
pub struct RebuildStats {
    /// Number of bundles scanned during the rebuild.
    pub bundles_processed: u32,
    /// Total operations indexed across those bundles.
    pub operations_indexed: u64,
}