High-performance implementation of plcbundle, written in Rust.
(Source: `main` branch — 1,937 lines, 67 kB.)
// DID Index CLI commands
//
// Clap definitions and dispatch for the `index` subcommand family
// (build / repair / status / verify / debug / compact).
// NOTE: doc comments (`///`) on clap-derive items feed into --help output,
// so explanatory notes here use plain `//` comments to avoid changing the CLI.
use super::utils;
use anyhow::Result;
use clap::{Args, Subcommand};
use plcbundle::{BundleManager, constants};
use std::path::{Path, PathBuf};
use std::time::Instant;

#[derive(Args)]
#[command(
    about = "DID index management",
    long_about = "Manage the DID position index, a critical data structure that maps DIDs to
their bundle locations. This index enables fast O(1) DID lookups and is required
for efficient DID resolution, operation history queries, and many other DID-based
operations.

The index is organized into 256 shards based on DID identifier hashes, allowing
for efficient distribution and parallel access. The 'build' subcommand creates
the index from scratch by scanning all bundles. The 'repair' subcommand intelligently
updates the index when new bundles are added or when the index becomes corrupted.

Use 'stats' to see index statistics including total DIDs, shard distribution,
and storage usage. Use 'verify' to validate index integrity by rebuilding it
in memory and comparing with the existing index. Use 'compact' to merge delta
segments and optimize index performance.

Regular index maintenance ensures optimal performance for DID lookups. The index
is automatically updated during sync operations, but manual repair or compaction
may be needed for optimal performance.",
    help_template = crate::clap_help!(
        examples: " # Build DID position index\n \
        {bin} index build\n\n \
        # Repair DID index (rebuild from bundles)\n \
        {bin} index repair\n\n \
        # Show DID index statistics\n \
        {bin} index stats\n\n \
        # Verify DID index integrity\n \
        {bin} index verify\n\n \
        # Compact index segments\n \
        {bin} index compact"
    )
)]
pub struct IndexCommand {
    // Which of the `index` subcommands to run.
    #[command(subcommand)]
    pub command: IndexCommands,
}

#[derive(Subcommand)]
pub enum IndexCommands {
    /// Build DID position index
    #[command(help_template = crate::clap_help!(
        examples: " # Build index (default: flush every 64 bundles)\n \
        {bin} index build\n\n \
        # Force rebuild from scratch\n \
        {bin} index build --force\n\n \
        # Use 8 parallel threads\n \
        {bin} index build -j 8\n\n \
        # Flush every 100 bundles (reduce memory usage)\n \
        {bin} index build --flush-interval 100\n\n \
        # No intermediate flushes (maximum speed, high memory)\n \
        {bin} index build --flush-interval 0"
    ))]
    Build {
        /// Rebuild even if index exists
        #[arg(short, long)]
        force: bool,

        /// Number of threads to use (0 = auto-detect)
        #[arg(short = 'j', long, default_value = "0")]
        threads: usize,

        /// Flush to disk every N bundles (0 = only at end, default = 64)
        #[arg(long, default_value_t = constants::DID_INDEX_FLUSH_INTERVAL)]
        flush_interval: u32,
    },

    /// Repair DID index (incremental update + compaction)
    #[command(
        alias = "rebuild",
        help_template = crate::clap_help!(
            before: "Intelligently repairs the DID index by:\n \
            - Incrementally updating missing bundles (if < 1000 behind)\n \
            - Performing full rebuild (if > 1000 bundles behind)\n \
            - Compacting delta segments (if > 50 segments)\n\n \
            Use this after syncing new bundles or if index is corrupted.",
            examples: " {bin} index repair\n \
            {bin} index repair -j 8\n \
            {bin} index repair --flush-interval 100"
        )
    )]
    Repair {
        /// Number of threads to use (0 = auto-detect)
        #[arg(short = 'j', long, default_value = "0")]
        threads: usize,

        /// Flush to disk every N bundles (0 = only at end, default = 64)
        #[arg(long, default_value_t = constants::DID_INDEX_FLUSH_INTERVAL)]
        flush_interval: u32,
    },

    /// Show DID index status
    #[command(aliases = ["stats", "info"])]
    Status {
        /// Output as JSON
        #[arg(long)]
        json: bool,
    },

    /// Verify DID index integrity
    #[command(alias = "check")]
    Verify {
        /// Flush to disk every N bundles (0 = only at end, default = 64)
        #[arg(long, default_value_t = constants::DID_INDEX_FLUSH_INTERVAL)]
        flush_interval: u32,

        /// Perform full verification by rebuilding index in memory and comparing
        #[arg(long)]
        full: bool,
    },

    /// Debug and inspect DID index internals
    #[command(alias = "inspect")]
    Debug {
        /// Show specific shard (0-255 or hex like 0xac)
        #[arg(short, long)]
        shard: Option<String>,

        /// Lookup DID or handle in index
        #[arg(long)]
        did: Option<String>,

        /// Output as JSON
        #[arg(long)]
        json: bool,
    },

    /// Compact delta segments in DID index
    #[command(help_template = crate::clap_help!(
        examples: " # Compact all shards\n \
        {bin} index compact\n\n \
        # Compact specific shards\n \
        {bin} index compact --shards 0xac 0x12 0xff"
    ))]
    Compact {
        /// Specific shards to compact (0-255 or hex like 0xac)
        #[arg(short, long, value_delimiter = ' ')]
        shards: Option<Vec<String>>,
    },
}

/// Dispatch an `index` subcommand to its handler.
///
/// `dir` is the repository directory; `global_verbose` is forwarded only to
/// `verify`, which is the one handler that changes its reporting detail.
pub fn run(cmd: IndexCommand, dir: PathBuf, global_verbose: bool) -> Result<()> {
    match cmd.command {
        IndexCommands::Build {
            force,
            threads,
            flush_interval,
        } => {
            cmd_index_build(dir, force, threads, flush_interval)?;
159 } 160 IndexCommands::Repair { 161 threads, 162 flush_interval, 163 } => { 164 cmd_index_repair(dir, threads, flush_interval)?; 165 } 166 IndexCommands::Status { json } => { 167 cmd_index_stats(dir, json)?; 168 } 169 IndexCommands::Verify { 170 flush_interval, 171 full, 172 } => { 173 cmd_index_verify(dir, global_verbose, flush_interval, full)?; 174 } 175 IndexCommands::Debug { shard, did, json } => { 176 let shard_num = shard.map(|s| parse_shard(&s)).transpose()?; 177 cmd_index_debug(dir, shard_num, did, json)?; 178 } 179 IndexCommands::Compact { shards } => { 180 let shard_nums = shards 181 .map(|shard_list| { 182 shard_list 183 .iter() 184 .map(|s| parse_shard(s)) 185 .collect::<Result<Vec<u8>>>() 186 }) 187 .transpose()?; 188 cmd_index_compact(dir, shard_nums)?; 189 } 190 } 191 Ok(()) 192} 193 194/// Parse shard number from string (supports hex 0xac or decimal) 195fn parse_shard(s: &str) -> Result<u8> { 196 if s.starts_with("0x") || s.starts_with("0X") { 197 u8::from_str_radix(&s[2..], 16).map_err(|_| anyhow::anyhow!("Invalid shard number: {}", s)) 198 } else { 199 s.parse::<u8>() 200 .map_err(|_| anyhow::anyhow!("Invalid shard number: {}", s)) 201 } 202} 203 204pub fn cmd_index_build( 205 dir: PathBuf, 206 force: bool, 207 threads: usize, 208 flush_interval: u32, 209) -> Result<()> { 210 // Set thread pool size 211 let num_threads = super::utils::get_worker_threads(threads, 4); 212 rayon::ThreadPoolBuilder::new() 213 .num_threads(num_threads) 214 .build_global() 215 .ok(); // Ignore error if already initialized 216 217 let manager = super::utils::create_manager(dir.clone(), false, false, false)?; 218 219 // Check if index exists 220 let did_index = manager.get_did_index_stats(); 221 let total_dids = did_index 222 .get("total_dids") 223 .and_then(|v| v.as_i64()) 224 .unwrap_or(0); 225 let index_exists = did_index 226 .get("exists") 227 .and_then(|v| v.as_bool()) 228 .unwrap_or(false); 229 230 if index_exists && total_dids > 0 && !force { 231 let last_bundle 
= manager.get_last_bundle(); 232 let index_last_bundle = did_index 233 .get("last_bundle") 234 .and_then(|v| v.as_i64()) 235 .unwrap_or(0) as u32; 236 let shards_with_data = did_index 237 .get("shards_with_data") 238 .and_then(|v| v.as_i64()) 239 .unwrap_or(0); 240 241 eprintln!("\n✅ DID Index Already Built"); 242 eprintln!( 243 " Total DIDs: {}", 244 utils::format_number(total_dids as u64) 245 ); 246 eprintln!(" Last bundle: {} / {}", index_last_bundle, last_bundle); 247 eprintln!(" Shards: {} with data", shards_with_data); 248 eprintln!( 249 " Location: {}/{}/", 250 utils::display_path(&dir).display(), 251 constants::DID_INDEX_DIR 252 ); 253 254 if index_last_bundle < last_bundle { 255 let missing = last_bundle - index_last_bundle; 256 eprintln!(); 257 eprintln!(" ⚠️ Index is {} bundles behind", missing); 258 eprintln!(" 💡 Run: {} index repair", constants::BINARY_NAME); 259 } else if index_last_bundle == last_bundle { 260 eprintln!(); 261 eprintln!(" ✓ Index is up-to-date"); 262 } 263 264 eprintln!(); 265 eprintln!(" 💡 Use --force to rebuild from scratch"); 266 return Ok(()); 267 } 268 269 if super::utils::is_repository_empty(&manager) { 270 log::info!("No bundles to index"); 271 return Ok(()); 272 } 273 274 // Get total uncompressed size for progress tracking 275 let index = manager.get_index(); 276 let bundle_numbers: Vec<u32> = (1..=manager.get_last_bundle()).collect(); 277 let total_bytes = index.total_uncompressed_size_for_bundles(&bundle_numbers); 278 279 // Create two-stage progress bar 280 use super::progress::TwoStageProgress; 281 use std::sync::Arc; 282 let last_bundle = manager.get_last_bundle() as u32; 283 let progress = TwoStageProgress::new(last_bundle, total_bytes); 284 285 // Set up cleanup guard to ensure temp files are deleted on CTRL+C or panic 286 // Use Arc to share manager for cleanup 287 let manager_arc = Arc::new(manager); 288 let manager_for_cleanup = manager_arc.clone(); 289 290 struct IndexBuildCleanup { 291 manager: Arc<BundleManager>, 
292 } 293 294 impl Drop for IndexBuildCleanup { 295 fn drop(&mut self) { 296 // Cleanup temp files on drop (CTRL+C, panic, or normal exit) 297 let did_index = self.manager.get_did_index(); 298 if let Some(idx) = did_index.read().unwrap().as_ref() 299 && let Err(e) = idx.cleanup_temp_files() 300 { 301 log::warn!("[Index Build] Failed to cleanup temp files: {}", e); 302 } 303 } 304 } 305 306 let _cleanup_guard = IndexBuildCleanup { 307 manager: manager_for_cleanup.clone(), 308 }; 309 310 // Wrap build call - cleanup guard will handle CTRL+C and panics 311 let build_result = manager_arc.build_did_index( 312 flush_interval, 313 Some(progress.callback_for_build_did_index()), 314 Some(num_threads), 315 Some(progress.interrupted()), 316 ); 317 318 // Handle build result - ensure cleanup happens on error 319 match build_result { 320 Ok(_) => { 321 // Success - continue 322 } 323 Err(e) => { 324 // Error occurred - explicitly cleanup temp files before returning 325 let did_index = manager_arc.get_did_index(); 326 if let Some(idx) = did_index.read().unwrap().as_ref() 327 && let Err(cleanup_err) = idx.cleanup_temp_files() 328 { 329 log::warn!( 330 "[Index Build] Failed to cleanup temp files after error: {}", 331 cleanup_err 332 ); 333 } 334 return Err(e); 335 } 336 } 337 338 // Finish the progress bars 339 progress.finish(); 340 341 eprintln!(); 342 343 Ok(()) 344} 345 346pub fn cmd_index_repair(dir: PathBuf, threads: usize, flush_interval: u32) -> Result<()> { 347 // Set thread pool size 348 let num_threads = super::utils::get_worker_threads(threads, 4); 349 rayon::ThreadPoolBuilder::new() 350 .num_threads(num_threads) 351 .build_global() 352 .ok(); // Ignore error if already initialized 353 354 let manager = super::utils::create_manager(dir.clone(), false, false, false)?; 355 356 // Check if index config exists (even if corrupted) 357 let stats_map = manager.get_did_index_stats(); 358 let index_exists = stats_map 359 .get("exists") 360 .and_then(|v| v.as_bool()) 361 
.unwrap_or(false); 362 363 if !index_exists { 364 log::error!("DID index does not exist"); 365 log::info!("Use: {} index build", constants::BINARY_NAME); 366 return Ok(()); 367 } 368 369 // Check if there are bundles to index 370 if super::utils::is_repository_empty(&manager) { 371 log::info!("No bundles to index"); 372 return Ok(()); 373 } 374 375 log::info!("Analyzing DID index for repair...\n"); 376 377 let last_bundle = manager.get_last_bundle(); 378 let index_last_bundle = stats_map 379 .get("last_bundle") 380 .and_then(|v| v.as_i64()) 381 .unwrap_or(0) as u32; 382 let delta_segments = stats_map 383 .get("delta_segments") 384 .and_then(|v| v.as_u64()) 385 .unwrap_or(0); 386 387 // Check if repair is needed before setting up progress bar 388 let did_index = manager.get_did_index(); 389 let idx = did_index.read().unwrap(); 390 let idx_ref = idx 391 .as_ref() 392 .ok_or_else(|| anyhow::anyhow!("DID index not initialized"))?; 393 let repair_info = idx_ref.get_repair_info(last_bundle)?; 394 let needs_work = repair_info.needs_rebuild || repair_info.needs_compact; 395 drop(idx); 396 397 // Use BundleManager API for repair 398 let start = Instant::now(); 399 400 // Set up progress callback only if work is needed 401 use super::progress::TwoStageProgress; 402 let progress = if needs_work { 403 let index = manager.get_index(); 404 let bundle_numbers: Vec<u32> = (1..=last_bundle).collect(); 405 let total_bytes = index.total_uncompressed_size_for_bundles(&bundle_numbers); 406 Some(TwoStageProgress::new(last_bundle, total_bytes)) 407 } else { 408 None 409 }; 410 411 let repair_result = manager.repair_did_index( 412 num_threads, 413 flush_interval, 414 progress.as_ref().map(|p| p.callback_for_build_did_index()), 415 )?; 416 417 if let Some(progress) = progress { 418 progress.finish(); 419 } 420 421 let elapsed = start.elapsed(); 422 let stats = manager.get_did_index_stats(); 423 let final_delta_segments = stats 424 .get("delta_segments") 425 .and_then(|v| v.as_u64()) 426 
.unwrap_or(0); 427 let total_dids = stats 428 .get("total_dids") 429 .and_then(|v| v.as_i64()) 430 .unwrap_or(0); 431 let shard_count = stats 432 .get("shard_count") 433 .and_then(|v| v.as_i64()) 434 .unwrap_or(0); 435 436 // Print success message and summary 437 use super::utils::colors; 438 eprintln!(); 439 if repair_result.repaired || repair_result.compacted { 440 eprintln!( 441 "{}{} Index repair completed successfully in {:?}", 442 colors::GREEN, 443 colors::RESET, 444 elapsed 445 ); 446 eprintln!(); 447 eprintln!("📊 Index Statistics:"); 448 eprintln!( 449 " Total DIDs: {}", 450 utils::format_number(total_dids as u64) 451 ); 452 eprintln!(" Last bundle: {}", last_bundle); 453 eprintln!(" Shards: {}", shard_count); 454 eprintln!(" Delta segments: {}", final_delta_segments); 455 456 // Show what was fixed 457 eprintln!(); 458 eprintln!("🔧 Repairs performed:"); 459 if repair_result.repaired { 460 eprintln!(" • Processed {} bundles", repair_result.bundles_processed); 461 if repair_result.segments_rebuilt > 0 { 462 eprintln!( 463 " • Rebuilt {} delta segment(s)", 464 repair_result.segments_rebuilt 465 ); 466 } 467 } 468 if repair_result.compacted { 469 eprintln!(" • Compacted delta segments"); 470 } 471 472 // Show compaction recommendation if needed 473 if (50..100).contains(&final_delta_segments) { 474 eprintln!(); 475 eprintln!( 476 "💡 Tip: Consider running '{} index compact' to optimize performance", 477 constants::BINARY_NAME 478 ); 479 } 480 } else { 481 eprintln!( 482 "{}{} Index is up-to-date and optimized", 483 colors::GREEN, 484 colors::RESET 485 ); 486 eprintln!(" Last bundle: {}", index_last_bundle); 487 eprintln!(" Delta segments: {}", delta_segments); 488 } 489 490 Ok(()) 491} 492 493pub fn cmd_index_stats(dir: PathBuf, json: bool) -> Result<()> { 494 let manager = super::utils::create_manager(dir.clone(), false, false, false)?; 495 496 // Get raw stats from did_index 497 let stats_map = manager.get_did_index_stats(); 498 499 // Check if index has 
been built 500 let is_built = stats_map 501 .get("exists") 502 .and_then(|v| v.as_bool()) 503 .unwrap_or(false); 504 505 if json { 506 // Add built status to JSON output 507 use serde_json::json; 508 let mut json_output = serde_json::Map::new(); 509 for (k, v) in stats_map { 510 json_output.insert(k, v); 511 } 512 json_output.insert("is_built".to_string(), json!(is_built)); 513 let json_str = serde_json::to_string_pretty(&json_output)?; 514 println!("{}", json_str); 515 return Ok(()); 516 } 517 518 // Check if index exists 519 if !stats_map 520 .get("exists") 521 .and_then(|v| v.as_bool()) 522 .unwrap_or(false) 523 { 524 log::error!("DID index does not exist"); 525 log::info!("Run: {} index build", constants::BINARY_NAME); 526 return Ok(()); 527 } 528 529 let total_dids = stats_map 530 .get("total_dids") 531 .and_then(|v| v.as_i64()) 532 .unwrap_or(0); 533 let shard_count = stats_map 534 .get("shard_count") 535 .and_then(|v| v.as_i64()) 536 .unwrap_or(0); 537 let shards_with_data = stats_map 538 .get("shards_with_data") 539 .and_then(|v| v.as_i64()) 540 .unwrap_or(0); 541 let shards_with_segments = stats_map 542 .get("shards_with_segments") 543 .and_then(|v| v.as_i64()) 544 .unwrap_or(0); 545 let max_segments_per_shard = stats_map 546 .get("max_segments_per_shard") 547 .and_then(|v| v.as_i64()) 548 .unwrap_or(0); 549 let last_bundle = stats_map 550 .get("last_bundle") 551 .and_then(|v| v.as_i64()) 552 .unwrap_or(0); 553 let cached_shards = stats_map 554 .get("cached_shards") 555 .and_then(|v| v.as_i64()) 556 .unwrap_or(0); 557 let cache_limit = stats_map 558 .get("cache_limit") 559 .and_then(|v| v.as_i64()) 560 .unwrap_or(0); 561 let cache_hit_rate = stats_map 562 .get("cache_hit_rate") 563 .and_then(|v| v.as_f64()) 564 .unwrap_or(0.0); 565 let total_lookups = stats_map 566 .get("total_lookups") 567 .and_then(|v| v.as_i64()) 568 .unwrap_or(0); 569 let delta_segments = stats_map 570 .get("delta_segments") 571 .and_then(|v| v.as_u64()) 572 .unwrap_or(0); 573 let 
total_shard_size = stats_map 574 .get("total_shard_size_bytes") 575 .and_then(|v| v.as_u64()) 576 .unwrap_or(0); 577 let total_delta_size = stats_map 578 .get("total_delta_size_bytes") 579 .and_then(|v| v.as_u64()) 580 .unwrap_or(0); 581 let compaction_strategy = stats_map 582 .get("compaction_strategy") 583 .and_then(|v| v.as_str()) 584 .unwrap_or("manual"); 585 586 println!("\nDID Index Statistics"); 587 println!("════════════════════\n"); 588 println!( 589 " Location: {}/{}/", 590 utils::display_path(&dir).display(), 591 constants::DID_INDEX_DIR 592 ); 593 use super::utils::colors; 594 let status = if is_built { 595 format!("{}✓ Built{}", colors::GREEN, colors::RESET) 596 } else { 597 format!("{}✗ Not built{}", colors::RED, colors::RESET) 598 }; 599 println!(" Status: {}", status); 600 println!( 601 " Total DIDs: {}", 602 utils::format_number(total_dids as u64) 603 ); 604 println!( 605 " Shard count: {} ({} with data, {} with segments)", 606 shard_count, shards_with_data, shards_with_segments 607 ); 608 println!(" Last bundle: {}", last_bundle); 609 println!(); 610 println!(" Storage:"); 611 println!( 612 " Base shards: {} ({})", 613 shards_with_data, 614 utils::format_bytes(total_shard_size) 615 ); 616 println!( 617 " Delta segs: {} ({})", 618 delta_segments, 619 utils::format_bytes(total_delta_size) 620 ); 621 println!( 622 " Total size: {}", 623 utils::format_bytes(total_shard_size + total_delta_size) 624 ); 625 if max_segments_per_shard > 0 { 626 println!(" Max seg/shard: {}", max_segments_per_shard); 627 } 628 println!(" Strategy: {}", compaction_strategy); 629 println!(); 630 println!(" Cache:"); 631 println!(" Cached: {} / {}", cached_shards, cache_limit); 632 if total_lookups > 0 { 633 println!(" Hit rate: {:.1}%", cache_hit_rate * 100.0); 634 println!( 635 " Total lookups: {}", 636 utils::format_number(total_lookups as u64) 637 ); 638 } 639 640 println!(); 641 642 Ok(()) 643} 644 645pub fn cmd_index_verify( 646 dir: PathBuf, 647 verbose: bool, 648 
    let manager = super::utils::create_manager(dir.clone(), false, false, false)?;

    let stats_map = manager.get_did_index_stats();

    // Bail out early (successfully) when there is no index to verify.
    if !stats_map
        .get("exists")
        .and_then(|v| v.as_bool())
        .unwrap_or(false)
    {
        log::error!("DID index does not exist");
        log::info!("Run: {} index build", constants::BINARY_NAME);
        return Ok(());
    }

    log::info!("Verifying DID index...\n");

    let total_dids = stats_map
        .get("total_dids")
        .and_then(|v| v.as_i64())
        .unwrap_or(0);
    let last_bundle = stats_map
        .get("last_bundle")
        .and_then(|v| v.as_i64())
        .unwrap_or(0) as u32;

    // Use BundleManager API for verification
    let verify_result = if full {
        // Full verification: show progress bar and header info
        use super::progress::TwoStageProgress;
        let index = manager.get_index();
        let bundle_numbers: Vec<u32> = (1..=last_bundle).collect();
        let total_bytes = index.total_uncompressed_size_for_bundles(&bundle_numbers);
        let progress = TwoStageProgress::new(last_bundle, total_bytes);

        // Print header info like build command
        eprintln!("\n📦 Building Temporary DID Index (for verification)");
        eprintln!(" Strategy: Streaming (memory-efficient)");
        eprintln!(" Bundles: {}", last_bundle);
        if flush_interval > 0 {
            if flush_interval == constants::DID_INDEX_FLUSH_INTERVAL {
                // Default value - show with tuning hint
                eprintln!(
                    " Flush: Every {} bundles (tune with --flush-interval)",
                    flush_interval
                );
            } else {
                // Non-default value - show with tuning hint
                eprintln!(
                    " Flush: {} bundles (you can tune with --flush-interval)",
                    flush_interval
                );
            }
        } else {
            eprintln!(" Flush: Only at end (maximum memory usage)");
        }
        eprintln!();
        eprintln!("📊 Stage 1: Processing bundles...");

        let result = manager.verify_did_index(
            verbose,
            flush_interval,
            full,
            Some(progress.callback_for_build_did_index()),
        )?;

        progress.finish();
        eprintln!("\n");
        result
    } else {
        // Standard verification: no progress bar. The turbofish pins the
        // (unused) progress-callback type parameter for the `None` case.
        manager.verify_did_index::<fn(u32, u32, u64, u64)>(verbose, flush_interval, full, None)?
    };

    let errors = verify_result.errors;
    let warnings = verify_result.warnings;
    let missing_base_shards = verify_result.missing_base_shards;
    let missing_delta_segments = verify_result.missing_delta_segments;
    let shards_checked = verify_result.shards_checked;
    let segments_checked = verify_result.segments_checked;
    let error_categories = verify_result.error_categories;

    // Display verification results
    if verbose {
        // Check 1: Last bundle consistency
        log::info!("Checking bundle consistency...");
        let manager_last = manager.get_last_bundle();
        if last_bundle < manager_last {
            log::warn!(
                " ⚠️ Index is behind (has bundle {}, repo has {})",
                last_bundle,
                manager_last
            );
            log::info!(" Run: {} index repair", constants::BINARY_NAME);
        } else {
            log::info!(" ✓ Last bundle matches: {}", last_bundle);
        }

        // Check 2: Shard files
        log::info!("Checking shard files...");
        if missing_base_shards > 0 {
            log::info!(" ✗ Missing {} base shard(s)", missing_base_shards);
        }
        if missing_delta_segments > 0 {
            log::info!(" ✗ Missing {} delta segment(s)", missing_delta_segments);
        }
        if missing_base_shards == 0 && missing_delta_segments == 0 {
            log::info!(
                " ✓ All shard files exist ({} shards, {} segments)",
                shards_checked,
                segments_checked
            );
        }

        // Check 3: Index configuration (the index is sharded 256 ways).
        log::info!("Checking index configuration...");
        let shard_count = stats_map
            .get("shard_count")
            .and_then(|v| v.as_i64())
            .unwrap_or(0);
        if shard_count != 256 {
            log::warn!(
                " ⚠️ Unexpected shard count: {} (expected 256)",
                shard_count
            );
        } else {
            log::info!(" ✓ Shard count: {}", shard_count);
        }

        // Check 4: Delta segments — warn at 50, error at 100.
        log::info!("Checking delta segments...");
        let delta_segments = stats_map
            .get("delta_segments")
            .and_then(|v| v.as_u64())
            .unwrap_or(0);
        let max_segments_per_shard = stats_map
            .get("max_segments_per_shard")
            .and_then(|v| v.as_i64())
            .unwrap_or(0);
        const SEGMENT_WARNING_THRESHOLD: u64 = 50;
        const SEGMENT_ERROR_THRESHOLD: u64 = 100;
        if delta_segments >= SEGMENT_ERROR_THRESHOLD {
            log::info!(
                " ✗ Too many delta segments: {} (performance will degrade)",
                delta_segments
            );
            log::info!(" Run: {} index compact", constants::BINARY_NAME);
        } else if delta_segments >= SEGMENT_WARNING_THRESHOLD {
            log::warn!(
                " ⚠️ Many delta segments: {} (consider compacting)",
                delta_segments
            );
            log::info!(" Run: {} index compact", constants::BINARY_NAME);
        } else {
            log::info!(
                " ✓ Delta segments: {} (max per shard: {})",
                delta_segments,
                max_segments_per_shard
            );
        }

        if !full {
            log::info!(" (Skipping full rebuild verification - use --full to enable)");
        }
    } else {
        // Non-verbose: show summary
        if missing_base_shards > 0 {
            log::info!(" ✗ Missing {} base shard(s)", missing_base_shards);
        }
        if missing_delta_segments > 0 {
            log::info!(" ✗ Missing {} delta segment(s)", missing_delta_segments);
        }
        if missing_base_shards == 0 && missing_delta_segments == 0 {
            log::info!(
                " ✓ All shard files exist ({} shards, {} segments)",
                shards_checked,
                segments_checked
            );
        }
        eprintln!();
    }

    // Summary: hard failure exits with status 1 for scripting use.
    if errors > 0 {
        use super::utils::colors;
        eprintln!(
            "{}{} Index verification failed",
            colors::RED,
            colors::RESET
        );
        eprintln!(" Errors: {}", errors);
        eprintln!(" Warnings: {}", warnings);

        // Print error breakdown
        if missing_base_shards > 0 {
            eprintln!(" • Missing base shards: {}", missing_base_shards);
        }
        if missing_delta_segments > 0 {
            eprintln!(" • Missing delta segments: {}", missing_delta_segments);
        }

        // Print other error categories
        for (category, count) in &error_categories {
            eprintln!("{}: {}", category, count);
        }

        eprintln!("\n Run: {} index repair", constants::BINARY_NAME);
        std::process::exit(1);
    } else if warnings > 0 {
        use super::utils::colors;
        use super::utils::format_number;
        // \x1b[33m is yellow; only the reset comes from the colors module here.
        eprintln!(
            "\x1b[33m⚠️{} Index verification passed with warnings",
            colors::RESET
        );
        eprintln!(" Warnings: {}", warnings);
        eprintln!(" Total DIDs: {}", format_number(total_dids as u64));
        eprintln!(" Last bundle: {}", format_number(last_bundle as u64));
    } else {
        use super::utils::colors;
        use super::utils::format_number;
        let delta_segments = stats_map
            .get("delta_segments")
            .and_then(|v| v.as_u64())
            .unwrap_or(0);
        const SEGMENT_WARNING_THRESHOLD: u64 = 50;
        eprintln!("{}{} DID index is valid", colors::GREEN, colors::RESET);
        eprintln!(" Total DIDs: {}", format_number(total_dids as u64));
        eprintln!(" Last bundle: {}", format_number(last_bundle as u64));
        eprintln!(" Shards: {}", format_number(shards_checked as u64));
        eprintln!(" Delta segments: {}", format_number(delta_segments));
        if delta_segments > 0 && delta_segments < SEGMENT_WARNING_THRESHOLD {
            eprintln!(" (compaction not needed)");
        }
    }

    Ok(())
}

/// Get raw shard data as JSON
///
/// Reads the on-disk base shard file for `shard_num` and decodes its header
/// plus (at most) the first 10 entries. Returns JSON `null` when the shard
/// file does not exist.
fn get_raw_shard_data_json(dir: &Path, shard_num: u8) -> Result<serde_json::Value> {
    use plcbundle::constants;
    use plcbundle::did_index::OpLocation;
    use serde_json::json;
    use std::fs;

    // Fixed width of the DID identifier field in each entry (NUL-padded).
    const DID_IDENTIFIER_LEN: usize = 24;

    let shard_path = dir
        .join(constants::DID_INDEX_DIR)
        .join("shards")
        .join(format!("{:02x}.idx", shard_num));

    if !shard_path.exists() {
        return Ok(json!(null));
    }

    let data = fs::read(&shard_path)?;
fs::read(&shard_path)?; 908 if data.len() < 1056 { 909 return Ok(json!({ 910 "error": "Shard file too small", 911 "size_bytes": data.len() 912 })); 913 } 914 915 // Parse header 916 let entry_count = u32::from_le_bytes([data[9], data[10], data[11], data[12]]) as usize; 917 let offset_table_start = 1056; 918 919 let shard_filename = format!("{:02x}.idx", shard_num); 920 921 // Parse entries 922 let max_entries_to_show = 10; 923 let entries_to_show = entry_count.min(max_entries_to_show); 924 let mut entries = Vec::new(); 925 926 for i in 0..entries_to_show { 927 let offset_pos = offset_table_start + (i * 4); 928 if offset_pos + 4 > data.len() { 929 break; 930 } 931 932 let entry_offset = u32::from_le_bytes([ 933 data[offset_pos], 934 data[offset_pos + 1], 935 data[offset_pos + 2], 936 data[offset_pos + 3], 937 ]) as usize; 938 939 if entry_offset + DID_IDENTIFIER_LEN + 2 > data.len() { 940 continue; 941 } 942 943 // Read identifier 944 let mut current_offset = entry_offset; 945 let identifier_bytes = &data[current_offset..current_offset + DID_IDENTIFIER_LEN]; 946 let identifier = String::from_utf8_lossy(identifier_bytes); 947 let identifier_clean = identifier.trim_end_matches('\0').to_string(); 948 let full_did = format!("did:plc:{}", identifier_clean); 949 current_offset += DID_IDENTIFIER_LEN; 950 951 // Read location count 952 let loc_count = 953 u16::from_le_bytes([data[current_offset], data[current_offset + 1]]) as usize; 954 current_offset += 2; 955 956 // Read locations 957 let mut locations = Vec::new(); 958 for _j in 0..loc_count { 959 if current_offset + 4 > data.len() { 960 break; 961 } 962 let packed = u32::from_le_bytes([ 963 data[current_offset], 964 data[current_offset + 1], 965 data[current_offset + 2], 966 data[current_offset + 3], 967 ]); 968 let loc = OpLocation::from_u32(packed); 969 locations.push(json!({ 970 "bundle": loc.bundle(), 971 "position": loc.position(), 972 "global_position": loc.global_position(), 973 "nullified": loc.nullified() 974 
})); 975 current_offset += 4; 976 } 977 978 entries.push(json!({ 979 "identifier": identifier_clean, 980 "did": full_did, 981 "locations": locations 982 })); 983 } 984 985 Ok(json!({ 986 "file": shard_filename, 987 "file_size_bytes": data.len(), 988 "entry_count": entry_count, 989 "offset_table_start": format!("0x{:04x}", offset_table_start), 990 "entries_shown": entries_to_show, 991 "entries": entries 992 })) 993} 994 995/// Get raw delta segment data as JSON 996fn get_raw_segment_data_json( 997 dir: &Path, 998 shard_num: u8, 999 file_name: &str, 1000) -> Result<serde_json::Value> { 1001 use plcbundle::constants; 1002 use plcbundle::did_index::OpLocation; 1003 use serde_json::json; 1004 use std::fs; 1005 1006 const DID_IDENTIFIER_LEN: usize = 24; 1007 1008 let segment_path = dir 1009 .join(constants::DID_INDEX_DIR) 1010 .join("delta") 1011 .join(format!("{:02x}", shard_num)) 1012 .join(file_name); 1013 1014 if !segment_path.exists() { 1015 return Ok(json!(null)); 1016 } 1017 1018 let data = fs::read(&segment_path)?; 1019 if data.len() < 32 { 1020 return Ok(json!({ 1021 "error": "Segment file too small", 1022 "size_bytes": data.len() 1023 })); 1024 } 1025 1026 // Delta segments use the same format as base shards 1027 let entry_count = u32::from_le_bytes([data[9], data[10], data[11], data[12]]) as usize; 1028 let offset_table_start = 1056; 1029 1030 // Parse entries 1031 let max_entries_to_show = 10; 1032 let entries_to_show = entry_count.min(max_entries_to_show); 1033 let mut entries = Vec::new(); 1034 1035 for i in 0..entries_to_show { 1036 let offset_pos = offset_table_start + (i * 4); 1037 if offset_pos + 4 > data.len() { 1038 break; 1039 } 1040 1041 let entry_offset = u32::from_le_bytes([ 1042 data[offset_pos], 1043 data[offset_pos + 1], 1044 data[offset_pos + 2], 1045 data[offset_pos + 3], 1046 ]) as usize; 1047 1048 if entry_offset + DID_IDENTIFIER_LEN + 2 > data.len() { 1049 continue; 1050 } 1051 1052 // Read identifier 1053 let mut current_offset = 
        // Identifier: 24 NUL-padded bytes, rendered as a did:plc DID.
        let identifier_bytes = &data[current_offset..current_offset + DID_IDENTIFIER_LEN];
        let identifier = String::from_utf8_lossy(identifier_bytes);
        let identifier_clean = identifier.trim_end_matches('\0').to_string();
        let full_did = format!("did:plc:{}", identifier_clean);
        current_offset += DID_IDENTIFIER_LEN;

        // Read location count
        let loc_count =
            u16::from_le_bytes([data[current_offset], data[current_offset + 1]]) as usize;
        current_offset += 2;

        // Read locations
        let mut locations = Vec::new();
        for _j in 0..loc_count {
            if current_offset + 4 > data.len() {
                break;
            }
            let packed = u32::from_le_bytes([
                data[current_offset],
                data[current_offset + 1],
                data[current_offset + 2],
                data[current_offset + 3],
            ]);
            let loc = OpLocation::from_u32(packed);
            locations.push(json!({
                "bundle": loc.bundle(),
                "position": loc.position(),
                "global_position": loc.global_position(),
                "nullified": loc.nullified()
            }));
            current_offset += 4;
        }

        entries.push(json!({
            "identifier": identifier_clean,
            "did": full_did,
            "locations": locations
        }));
    }

    Ok(json!({
        "file": file_name,
        "file_size_bytes": data.len(),
        "entry_count": entry_count,
        "offset_table_start": format!("0x{:04x}", offset_table_start),
        "entries_shown": entries_to_show,
        "entries": entries
    }))
}

/// Display raw shard data in a readable format
fn display_raw_shard_data(dir: &Path, shard_num: u8) -> Result<()> {
    use plcbundle::constants;
    use plcbundle::did_index::OpLocation;
    use std::fs;

    // Fixed width of the DID identifier field in each entry (NUL-padded).
    const DID_IDENTIFIER_LEN: usize = 24;

    let shard_path = dir
        .join(constants::DID_INDEX_DIR)
        .join("shards")
        .join(format!("{:02x}.idx", shard_num));

    // Missing shard is reported, not treated as an error.
    if !shard_path.exists() {
        println!(" Shard file does not exist");
        return Ok(());
} 1121 1122 let data = fs::read(&shard_path)?; 1123 if data.len() < 1056 { 1124 println!(" Shard file too small ({} bytes)", data.len()); 1125 return Ok(()); 1126 } 1127 1128 // Parse header 1129 let entry_count = u32::from_le_bytes([data[9], data[10], data[11], data[12]]) as usize; 1130 let offset_table_start = 1056; 1131 1132 let shard_filename = format!("{:02x}.idx", shard_num); 1133 println!(" File: {}", shard_filename); 1134 println!(" File size: {} bytes", data.len()); 1135 println!(" Entry count: {}", entry_count); 1136 println!(" Offset table starts at: 0x{:04x}", offset_table_start); 1137 1138 // Show first few entries 1139 let max_entries_to_show = 10; 1140 let entries_to_show = entry_count.min(max_entries_to_show); 1141 1142 if entries_to_show > 0 { 1143 println!("\n First {} entries:", entries_to_show); 1144 println!(" ───────────────────────────────────────"); 1145 1146 for i in 0..entries_to_show { 1147 let offset_pos = offset_table_start + (i * 4); 1148 if offset_pos + 4 > data.len() { 1149 break; 1150 } 1151 1152 let entry_offset = u32::from_le_bytes([ 1153 data[offset_pos], 1154 data[offset_pos + 1], 1155 data[offset_pos + 2], 1156 data[offset_pos + 3], 1157 ]) as usize; 1158 1159 if entry_offset + DID_IDENTIFIER_LEN + 2 > data.len() { 1160 println!(" Entry {}: Invalid offset (0x{:04x})", i, entry_offset); 1161 continue; 1162 } 1163 1164 // Read identifier 1165 let mut current_offset = entry_offset; 1166 let identifier_bytes = &data[current_offset..current_offset + DID_IDENTIFIER_LEN]; 1167 let identifier = String::from_utf8_lossy(identifier_bytes); 1168 current_offset += DID_IDENTIFIER_LEN; 1169 1170 // Read location count 1171 let loc_count = 1172 u16::from_le_bytes([data[current_offset], data[current_offset + 1]]) as usize; 1173 current_offset += 2; 1174 1175 // Read locations 1176 let mut locations = Vec::new(); 1177 for _j in 0..loc_count { 1178 if current_offset + 4 > data.len() { 1179 break; 1180 } 1181 let packed = u32::from_le_bytes([ 1182 
data[current_offset], 1183 data[current_offset + 1], 1184 data[current_offset + 2], 1185 data[current_offset + 3], 1186 ]); 1187 locations.push(OpLocation::from_u32(packed)); 1188 current_offset += 4; 1189 } 1190 1191 println!(" Entry {}:", i); 1192 1193 let identifier_clean = identifier.trim_end_matches('\0'); 1194 let full_did = format!("did:plc:{}", identifier_clean); 1195 println!( 1196 " Identifier: {} [did={}]", 1197 identifier_clean, full_did 1198 ); 1199 1200 print!(" Locations ({}): [ ", loc_count); 1201 for (idx, loc) in locations.iter().enumerate() { 1202 if idx > 0 { 1203 print!(", "); 1204 } 1205 print!("{}", loc.global_position()); 1206 if loc.nullified() { 1207 print!(" (nullified)"); 1208 } 1209 } 1210 println!(" ]\n"); 1211 } 1212 1213 if entry_count > max_entries_to_show { 1214 println!( 1215 "\n ... ({} more entries)", 1216 entry_count - max_entries_to_show 1217 ); 1218 } 1219 } else { 1220 println!(" No entries in shard"); 1221 } 1222 1223 Ok(()) 1224} 1225 1226/// Display raw delta segment data in a readable format 1227fn display_raw_segment_data(dir: &Path, shard_num: u8, file_name: &str) -> Result<()> { 1228 use crate::did_index::OpLocation; 1229 use plcbundle::constants; 1230 use std::fs; 1231 1232 const DID_IDENTIFIER_LEN: usize = 24; 1233 1234 let segment_path = dir 1235 .join(constants::DID_INDEX_DIR) 1236 .join("delta") 1237 .join(format!("{:02x}", shard_num)) 1238 .join(file_name); 1239 1240 if !segment_path.exists() { 1241 println!(" Segment file does not exist"); 1242 return Ok(()); 1243 } 1244 1245 let data = fs::read(&segment_path)?; 1246 if data.len() < 32 { 1247 println!(" Segment file too small ({} bytes)", data.len()); 1248 return Ok(()); 1249 } 1250 1251 // Delta segments use the same format as base shards 1252 let entry_count = u32::from_le_bytes([data[9], data[10], data[11], data[12]]) as usize; 1253 let offset_table_start = 1056; 1254 1255 println!(" File size: {} bytes", data.len()); 1256 println!(" Entry count: {}", 
entry_count); 1257 println!(" Offset table starts at: 0x{:04x}", offset_table_start); 1258 1259 // Show first few entries 1260 let max_entries_to_show = 10; 1261 let entries_to_show = entry_count.min(max_entries_to_show); 1262 1263 if entries_to_show > 0 { 1264 println!("\n First {} entries:", entries_to_show); 1265 println!(" ───────────────────────────────────────"); 1266 1267 for i in 0..entries_to_show { 1268 let offset_pos = offset_table_start + (i * 4); 1269 if offset_pos + 4 > data.len() { 1270 break; 1271 } 1272 1273 let entry_offset = u32::from_le_bytes([ 1274 data[offset_pos], 1275 data[offset_pos + 1], 1276 data[offset_pos + 2], 1277 data[offset_pos + 3], 1278 ]) as usize; 1279 1280 if entry_offset + DID_IDENTIFIER_LEN + 2 > data.len() { 1281 println!(" Entry {}: Invalid offset (0x{:04x})", i, entry_offset); 1282 continue; 1283 } 1284 1285 // Read identifier 1286 let mut current_offset = entry_offset; 1287 let identifier_bytes = &data[current_offset..current_offset + DID_IDENTIFIER_LEN]; 1288 let identifier = String::from_utf8_lossy(identifier_bytes); 1289 current_offset += DID_IDENTIFIER_LEN; 1290 1291 // Read location count 1292 let loc_count = 1293 u16::from_le_bytes([data[current_offset], data[current_offset + 1]]) as usize; 1294 current_offset += 2; 1295 1296 // Read locations 1297 let mut locations = Vec::new(); 1298 for _j in 0..loc_count { 1299 if current_offset + 4 > data.len() { 1300 break; 1301 } 1302 let packed = u32::from_le_bytes([ 1303 data[current_offset], 1304 data[current_offset + 1], 1305 data[current_offset + 2], 1306 data[current_offset + 3], 1307 ]); 1308 locations.push(OpLocation::from_u32(packed)); 1309 current_offset += 4; 1310 } 1311 1312 println!(" Entry {}:", i); 1313 1314 let identifier_clean = identifier.trim_end_matches('\0'); 1315 let full_did = format!("did:plc:{}", identifier_clean); 1316 println!( 1317 " Identifier: {} [did={}]", 1318 identifier_clean, full_did 1319 ); 1320 1321 print!(" Locations ({}): [ ", loc_count); 
1322 for (idx, loc) in locations.iter().enumerate() { 1323 if idx > 0 { 1324 print!(", "); 1325 } 1326 print!("{}", loc.global_position()); 1327 if loc.nullified() { 1328 print!(" (nullified)"); 1329 } 1330 } 1331 println!(" ]\n"); 1332 } 1333 1334 if entry_count > max_entries_to_show { 1335 println!( 1336 "\n ... ({} more entries)", 1337 entry_count - max_entries_to_show 1338 ); 1339 } 1340 } else { 1341 println!(" No entries in segment"); 1342 } 1343 1344 Ok(()) 1345} 1346 1347/// Debug lookup for a specific DID or handle 1348fn cmd_index_debug_did_lookup(dir: PathBuf, input: String, json: bool) -> Result<()> { 1349 use crate::constants; 1350 use plcbundle::BundleManager; 1351 use serde_json::json; 1352 use std::time::Instant; 1353 1354 // Initialize manager with handle resolver 1355 let resolver_url = Some(constants::DEFAULT_HANDLE_RESOLVER_URL.to_string()); 1356 let options = plcbundle::ManagerOptions { 1357 handle_resolver_url: resolver_url, 1358 preload_mempool: false, 1359 verbose: false, 1360 }; 1361 let manager = BundleManager::new(dir.clone(), options)?; 1362 1363 // Resolve handle to DID if needed 1364 let (did, handle_resolve_time) = manager.resolve_handle_or_did(&input)?; 1365 1366 let is_handle = did != input; 1367 if is_handle { 1368 log::info!("Resolved handle: {} → {}", input, did); 1369 } 1370 1371 // Ensure DID index exists and is loaded 1372 let stats_map = manager.get_did_index_stats(); 1373 if !stats_map 1374 .get("exists") 1375 .and_then(|v| v.as_bool()) 1376 .unwrap_or(false) 1377 { 1378 anyhow::bail!( 1379 "DID index does not exist. 
Run: {} index build", 1380 constants::BINARY_NAME 1381 ); 1382 } 1383 1384 // Ensure the index is loaded 1385 manager.ensure_did_index()?; 1386 1387 // Get DID index and lookup DID with stats 1388 let did_index = manager.get_did_index(); 1389 let lookup_start = Instant::now(); 1390 let (locations, shard_stats, shard_num, lookup_timings) = { 1391 let index_guard = did_index.read().unwrap(); 1392 let index = index_guard.as_ref().ok_or_else(|| { 1393 anyhow::anyhow!("DID index not loaded. This is a bug - please report it.") 1394 })?; 1395 index.get_did_locations_with_stats(&did)? 1396 }; 1397 let lookup_elapsed = lookup_start.elapsed(); 1398 1399 // Extract identifier for shard calculation 1400 let identifier = if let Some(id) = did.strip_prefix("did:plc:") { 1401 id 1402 } else { 1403 anyhow::bail!("Only did:plc: DIDs are supported"); 1404 }; 1405 1406 if json { 1407 let mut locations_json = Vec::new(); 1408 for loc in &locations { 1409 locations_json.push(json!({ 1410 "bundle": loc.bundle(), 1411 "position": loc.position(), 1412 "nullified": loc.nullified(), 1413 "global_position": loc.global_position() 1414 })); 1415 } 1416 1417 let result = json!({ 1418 "input": input, 1419 "did": did, 1420 "identifier": identifier, 1421 "shard": format!("{:02x}", shard_num), 1422 "shard_num": shard_num, 1423 "locations_count": locations.len(), 1424 "locations": locations_json, 1425 "lookup_stats": { 1426 "shard_size": shard_stats.shard_size, 1427 "total_entries": shard_stats.total_entries, 1428 "prefix_narrowed_to": shard_stats.prefix_narrowed_to, 1429 "binary_search_attempts": shard_stats.binary_search_attempts, 1430 "locations_found": shard_stats.locations_found 1431 }, 1432 "lookup_timings": { 1433 "extract_identifier_ms": lookup_timings.extract_identifier.as_secs_f64() * 1000.0, 1434 "calculate_shard_ms": lookup_timings.calculate_shard.as_secs_f64() * 1000.0, 1435 "load_shard_ms": lookup_timings.load_shard.as_secs_f64() * 1000.0, 1436 "search_ms": 
lookup_timings.search.as_secs_f64() * 1000.0, 1437 "cache_hit": lookup_timings.cache_hit, 1438 "base_search_time_ms": lookup_timings.base_search_time.map(|t| t.as_secs_f64() * 1000.0), 1439 "delta_segment_times_ms": lookup_timings.delta_segment_times.iter().map(|(name, t)| json!({ 1440 "name": name, 1441 "time_ms": t.as_secs_f64() * 1000.0 1442 })).collect::<Vec<_>>(), 1443 "merge_time_ms": lookup_timings.merge_time.as_secs_f64() * 1000.0 1444 }, 1445 "total_lookup_time_ms": lookup_elapsed.as_secs_f64() * 1000.0, 1446 "handle_resolve_time_ms": handle_resolve_time as f64 1447 }); 1448 1449 let json_str = serde_json::to_string_pretty(&result)?; 1450 println!("{}", json_str); 1451 return Ok(()); 1452 } 1453 1454 // Text output 1455 println!("\nDID Index Lookup"); 1456 println!("═══════════════════════════════════════\n"); 1457 1458 if is_handle { 1459 println!(" Input: {} (handle)", input); 1460 println!(" DID: {}", did); 1461 } else { 1462 println!(" DID: {}", did); 1463 } 1464 println!(" Identifier: {}", identifier); 1465 println!(" Shard: {:02x} ({})", shard_num, shard_num); 1466 println!(); 1467 1468 if locations.is_empty() { 1469 println!(" ⚠️ No locations found in index"); 1470 println!(); 1471 return Ok(()); 1472 } 1473 1474 println!(" Locations ({}):", locations.len()); 1475 println!(" ───────────────────────────────────────"); 1476 1477 // Group locations for more compact display 1478 let mut current_line = String::new(); 1479 for (idx, loc) in locations.iter().enumerate() { 1480 if idx > 0 && idx % 4 == 0 { 1481 println!(" {}", current_line); 1482 current_line.clear(); 1483 } 1484 1485 if !current_line.is_empty() { 1486 current_line.push_str(", "); 1487 } 1488 1489 let mut loc_str = format!("{}", loc.global_position()); 1490 if loc.nullified() { 1491 loc_str.push_str(" (nullified)"); 1492 } 1493 current_line.push_str(&loc_str); 1494 } 1495 1496 if !current_line.is_empty() { 1497 println!(" {}", current_line); 1498 } 1499 println!(); 1500 1501 println!(" 
Lookup Statistics:"); 1502 println!(" ───────────────────────────────────────"); 1503 println!( 1504 " Shard size: {} bytes", 1505 utils::format_bytes(shard_stats.shard_size as u64) 1506 ); 1507 println!( 1508 " Total entries: {}", 1509 utils::format_number(shard_stats.total_entries as u64) 1510 ); 1511 println!( 1512 " Binary search attempts: {}", 1513 shard_stats.binary_search_attempts 1514 ); 1515 println!( 1516 " Prefix narrowed to: {}", 1517 shard_stats.prefix_narrowed_to 1518 ); 1519 println!( 1520 " Locations found: {}", 1521 shard_stats.locations_found 1522 ); 1523 println!(); 1524 1525 println!(" Timing:"); 1526 println!(" ───────────────────────────────────────"); 1527 println!( 1528 " Extract identifier: {:.3}ms", 1529 lookup_timings.extract_identifier.as_secs_f64() * 1000.0 1530 ); 1531 println!( 1532 " Calculate shard: {:.3}ms", 1533 lookup_timings.calculate_shard.as_secs_f64() * 1000.0 1534 ); 1535 println!( 1536 " Load shard: {:.3}ms ({})", 1537 lookup_timings.load_shard.as_secs_f64() * 1000.0, 1538 if lookup_timings.cache_hit { 1539 "cache hit" 1540 } else { 1541 "cache miss" 1542 } 1543 ); 1544 println!( 1545 " Search: {:.3}ms", 1546 lookup_timings.search.as_secs_f64() * 1000.0 1547 ); 1548 if let Some(base_time) = lookup_timings.base_search_time { 1549 println!( 1550 " Base search: {:.3}ms", 1551 base_time.as_secs_f64() * 1000.0 1552 ); 1553 } 1554 if !lookup_timings.delta_segment_times.is_empty() { 1555 println!( 1556 " Delta segments: {} segments", 1557 lookup_timings.delta_segment_times.len() 1558 ); 1559 for (idx, (name, time)) in lookup_timings.delta_segment_times.iter().enumerate() { 1560 println!( 1561 " Segment {} ({:20}): {:.3}ms", 1562 idx + 1, 1563 name, 1564 time.as_secs_f64() * 1000.0 1565 ); 1566 } 1567 } 1568 println!( 1569 " Merge: {:.3}ms", 1570 lookup_timings.merge_time.as_secs_f64() * 1000.0 1571 ); 1572 if handle_resolve_time > 0 { 1573 println!(" Handle resolve: {}ms", handle_resolve_time); 1574 } 1575 println!( 1576 " Total: 
{:.3}ms", 1577 lookup_elapsed.as_secs_f64() * 1000.0 1578 ); 1579 println!(); 1580 1581 Ok(()) 1582} 1583 1584pub fn cmd_index_debug( 1585 dir: PathBuf, 1586 shard: Option<u8>, 1587 did: Option<String>, 1588 json: bool, 1589) -> Result<()> { 1590 let manager = super::utils::create_manager(dir.clone(), false, false, false)?; 1591 1592 let stats_map = manager.get_did_index_stats(); 1593 1594 if !stats_map 1595 .get("exists") 1596 .and_then(|v| v.as_bool()) 1597 .unwrap_or(false) 1598 { 1599 log::error!("DID index does not exist"); 1600 log::info!("Run: {} index build", constants::BINARY_NAME); 1601 return Ok(()); 1602 } 1603 1604 // Handle DID lookup 1605 if let Some(did_input) = did { 1606 return cmd_index_debug_did_lookup(dir, did_input, json); 1607 } 1608 1609 let did_index = manager.get_did_index(); 1610 let mut shard_details = did_index 1611 .read() 1612 .unwrap() 1613 .as_ref() 1614 .unwrap() 1615 .get_shard_details(shard)?; 1616 1617 if json { 1618 // Add raw data to JSON output if a specific shard is requested 1619 if let Some(shard_num) = shard 1620 && let Some(detail) = shard_details.first_mut() 1621 { 1622 let base_exists = detail 1623 .get("base_exists") 1624 .and_then(|v| v.as_bool()) 1625 .unwrap_or(false); 1626 1627 if base_exists && let Ok(raw_data) = get_raw_shard_data_json(&dir, shard_num) { 1628 detail.insert("raw_data".to_string(), raw_data); 1629 } 1630 1631 // Add raw data for delta segments 1632 if let Some(segments) = detail.get_mut("segments").and_then(|v| v.as_array_mut()) { 1633 for seg in segments { 1634 let file_name = seg.get("file_name").and_then(|v| v.as_str()).unwrap_or(""); 1635 let exists = seg.get("exists").and_then(|v| v.as_bool()).unwrap_or(false); 1636 if exists 1637 && !file_name.is_empty() 1638 && let Ok(raw_data) = get_raw_segment_data_json(&dir, shard_num, file_name) 1639 { 1640 seg.as_object_mut() 1641 .unwrap() 1642 .insert("raw_data".to_string(), raw_data); 1643 } 1644 } 1645 } 1646 } 1647 1648 let json_str = 
serde_json::to_string_pretty(&shard_details)?; 1649 println!("{}", json_str); 1650 return Ok(()); 1651 } 1652 1653 if let Some(shard_num) = shard { 1654 // Show single shard details 1655 if let Some(detail) = shard_details.first() { 1656 println!("\nShard {:02x} Details", shard_num); 1657 println!("═══════════════════════════════════════\n"); 1658 1659 let did_count = detail 1660 .get("did_count") 1661 .and_then(|v| v.as_u64()) 1662 .unwrap_or(0); 1663 let segment_count = detail 1664 .get("segment_count") 1665 .and_then(|v| v.as_u64()) 1666 .unwrap_or(0); 1667 let base_exists = detail 1668 .get("base_exists") 1669 .and_then(|v| v.as_bool()) 1670 .unwrap_or(false); 1671 let base_size = detail 1672 .get("base_size_bytes") 1673 .and_then(|v| v.as_u64()) 1674 .unwrap_or(0); 1675 let total_segment_size = detail 1676 .get("total_segment_size_bytes") 1677 .and_then(|v| v.as_u64()) 1678 .unwrap_or(0); 1679 let total_size = detail 1680 .get("total_size_bytes") 1681 .and_then(|v| v.as_u64()) 1682 .unwrap_or(0); 1683 let next_segment_id = detail 1684 .get("next_segment_id") 1685 .and_then(|v| v.as_u64()) 1686 .unwrap_or(0); 1687 1688 println!(" DIDs: {}", utils::format_number(did_count)); 1689 println!( 1690 " Base shard: {} ({})", 1691 if base_exists { "exists" } else { "missing" }, 1692 utils::format_bytes(base_size) 1693 ); 1694 println!(" Delta segments: {}", segment_count); 1695 println!( 1696 " Segment size: {}", 1697 utils::format_bytes(total_segment_size) 1698 ); 1699 println!(" Total size: {}", utils::format_bytes(total_size)); 1700 println!(" Next segment ID: {}", next_segment_id); 1701 1702 if let Some(segments) = detail.get("segments").and_then(|v| v.as_array()) 1703 && !segments.is_empty() 1704 { 1705 println!("\n Delta Segments:"); 1706 println!(" ───────────────────────────────────────"); 1707 for (idx, seg) in segments.iter().enumerate() { 1708 let file_name = seg.get("file_name").and_then(|v| v.as_str()).unwrap_or("?"); 1709 let exists = 
seg.get("exists").and_then(|v| v.as_bool()).unwrap_or(false); 1710 let size = seg.get("size_bytes").and_then(|v| v.as_u64()).unwrap_or(0); 1711 let did_count = seg.get("did_count").and_then(|v| v.as_u64()).unwrap_or(0); 1712 let bundle_start = seg 1713 .get("bundle_start") 1714 .and_then(|v| v.as_u64()) 1715 .unwrap_or(0); 1716 let bundle_end = seg.get("bundle_end").and_then(|v| v.as_u64()).unwrap_or(0); 1717 1718 println!( 1719 " [{:2}] {} {} ({})", 1720 idx + 1, 1721 if exists { "" } else { "" }, 1722 file_name, 1723 utils::format_bytes(size) 1724 ); 1725 println!( 1726 " Bundles: {}-{}, DIDs: {}, Locations: {}", 1727 bundle_start, 1728 bundle_end, 1729 utils::format_number(did_count), 1730 seg.get("location_count") 1731 .and_then(|v| v.as_u64()) 1732 .unwrap_or(0) 1733 ); 1734 } 1735 } 1736 1737 // Show raw shard data 1738 if base_exists { 1739 let shard_filename = format!("{:02x}.idx", shard_num); 1740 println!("\n Raw Shard Data: {}", shard_filename); 1741 println!(" ───────────────────────────────────────"); 1742 if let Err(e) = display_raw_shard_data(&dir, shard_num) { 1743 println!(" Error reading shard data: {}", e); 1744 } 1745 } 1746 1747 // Show raw delta segment data 1748 if let Some(segments) = detail.get("segments").and_then(|v| v.as_array()) { 1749 for seg in segments { 1750 let file_name = seg.get("file_name").and_then(|v| v.as_str()).unwrap_or("?"); 1751 let exists = seg.get("exists").and_then(|v| v.as_bool()).unwrap_or(false); 1752 if exists { 1753 println!("\n Raw Delta Segment Data: {}", file_name); 1754 println!(" ───────────────────────────────────────"); 1755 if let Err(e) = display_raw_segment_data(&dir, shard_num, file_name) { 1756 println!(" Error reading segment data: {}", e); 1757 } 1758 } 1759 } 1760 } 1761 1762 println!(); 1763 } 1764 } else { 1765 // Show summary of all shards 1766 println!("\nShard Summary"); 1767 println!("═══════════════════════════════════════\n"); 1768 1769 let mut shards_with_data = 0; 1770 let mut 
shards_with_segments = 0; 1771 let mut total_dids = 0u64; 1772 let mut total_base_size = 0u64; 1773 let mut total_segment_size = 0u64; 1774 let mut max_segments = 0; 1775 1776 for detail in &shard_details { 1777 let did_count = detail 1778 .get("did_count") 1779 .and_then(|v| v.as_u64()) 1780 .unwrap_or(0); 1781 let segment_count = detail 1782 .get("segment_count") 1783 .and_then(|v| v.as_u64()) 1784 .unwrap_or(0); 1785 let base_size = detail 1786 .get("base_size_bytes") 1787 .and_then(|v| v.as_u64()) 1788 .unwrap_or(0); 1789 let seg_size = detail 1790 .get("total_segment_size_bytes") 1791 .and_then(|v| v.as_u64()) 1792 .unwrap_or(0); 1793 1794 if did_count > 0 { 1795 shards_with_data += 1; 1796 } 1797 if segment_count > 0 { 1798 shards_with_segments += 1; 1799 } 1800 total_dids += did_count; 1801 total_base_size += base_size; 1802 total_segment_size += seg_size; 1803 max_segments = max_segments.max(segment_count); 1804 } 1805 1806 println!( 1807 " Shards with data: {} / {}", 1808 shards_with_data, 1809 shard_details.len() 1810 ); 1811 println!( 1812 " Shards with segments: {} / {}", 1813 shards_with_segments, 1814 shard_details.len() 1815 ); 1816 println!( 1817 " Total DIDs: {}", 1818 utils::format_number(total_dids) 1819 ); 1820 println!( 1821 " Base shard size: {}", 1822 utils::format_bytes(total_base_size) 1823 ); 1824 println!( 1825 " Delta segment size: {}", 1826 utils::format_bytes(total_segment_size) 1827 ); 1828 println!( 1829 " Total size: {}", 1830 utils::format_bytes(total_base_size + total_segment_size) 1831 ); 1832 println!(" Max segments/shard: {}", max_segments); 1833 println!(); 1834 1835 // Show top 10 shards by DID count 1836 let mut sorted_shards: Vec<_> = shard_details 1837 .iter() 1838 .filter(|d| d.get("did_count").and_then(|v| v.as_u64()).unwrap_or(0) > 0) 1839 .collect(); 1840 sorted_shards.sort_by_key(|d| { 1841 std::cmp::Reverse(d.get("did_count").and_then(|v| v.as_u64()).unwrap_or(0)) 1842 }); 1843 1844 if !sorted_shards.is_empty() { 
1845 println!(" Top 10 Shards by DID Count:"); 1846 println!(" ───────────────────────────────────────"); 1847 for (idx, detail) in sorted_shards.iter().take(10).enumerate() { 1848 let shard_hex = detail 1849 .get("shard_hex") 1850 .and_then(|v| v.as_str()) 1851 .unwrap_or("??"); 1852 let did_count = detail 1853 .get("did_count") 1854 .and_then(|v| v.as_u64()) 1855 .unwrap_or(0); 1856 let segment_count = detail 1857 .get("segment_count") 1858 .and_then(|v| v.as_u64()) 1859 .unwrap_or(0); 1860 let total_size = detail 1861 .get("total_size_bytes") 1862 .and_then(|v| v.as_u64()) 1863 .unwrap_or(0); 1864 1865 println!( 1866 " [{:2}] Shard {:2}: {} DIDs, {} segments, {}", 1867 idx + 1, 1868 shard_hex, 1869 utils::format_number(did_count), 1870 segment_count, 1871 utils::format_bytes(total_size) 1872 ); 1873 } 1874 println!(); 1875 } 1876 } 1877 1878 Ok(()) 1879} 1880 1881pub fn cmd_index_compact(dir: PathBuf, shards: Option<Vec<u8>>) -> Result<()> { 1882 let manager = super::utils::create_manager(dir.clone(), false, false, false)?; 1883 1884 let stats_map = manager.get_did_index_stats(); 1885 1886 if !stats_map 1887 .get("exists") 1888 .and_then(|v| v.as_bool()) 1889 .unwrap_or(false) 1890 { 1891 eprintln!("Error: DID index does not exist"); 1892 eprintln!("Run: {} index build", constants::BINARY_NAME); 1893 return Ok(()); 1894 } 1895 1896 let delta_segments_before = stats_map 1897 .get("delta_segments") 1898 .and_then(|v| v.as_u64()) 1899 .unwrap_or(0); 1900 1901 if delta_segments_before == 0 { 1902 eprintln!("No pending delta segments to compact"); 1903 return Ok(()); 1904 } 1905 1906 eprintln!("Compacting delta segments..."); 1907 if let Some(ref shard_list) = shards { 1908 eprintln!(" Targeting {} specific shard(s)", shard_list.len()); 1909 } else { 1910 eprintln!(" Compacting all shards"); 1911 } 1912 1913 let did_index = manager.get_did_index(); 1914 let start = Instant::now(); 1915 did_index 1916 .write() 1917 .unwrap() 1918 .as_mut() 1919 .unwrap() 1920 
.compact_pending_segments(shards)?; 1921 let elapsed = start.elapsed(); 1922 1923 let stats_map_after = manager.get_did_index_stats(); 1924 let delta_segments_after = stats_map_after 1925 .get("delta_segments") 1926 .and_then(|v| v.as_u64()) 1927 .unwrap_or(0); 1928 1929 let segments_compacted = delta_segments_before.saturating_sub(delta_segments_after); 1930 1931 eprintln!("\n✓ Compaction complete in {:?}", elapsed); 1932 eprintln!(" Delta segments before: {}", delta_segments_before); 1933 eprintln!(" Delta segments after: {}", delta_segments_after); 1934 eprintln!(" Segments compacted: {}", segments_compacted); 1935 1936 Ok(()) 1937}