High-performance implementation of plcbundle written in Rust

Refactor for readability and clippy compliance: adopt let-chains, replace manual range checks with `RangeInclusive::contains`, use `div_ceil` for frame counts, take `&Path` instead of `&PathBuf`, rewrite index loops as iterator chains (`windows`, `take`, `strip_prefix`), collapse nested else-if blocks, and lint with `cargo clippy --all-features`

+650 -720
+1 -1
Makefile
··· 38 38 cargo fmt 39 39 40 40 lint: ## Run clippy linter 41 - cargo clippy -- -D warnings 41 + cargo clippy --all-features -- -D warnings 42 42 43 43 check: ## Check without building 44 44 cargo check
+5 -4
src/bundle_format.rs
··· 93 93 let magic = u32::from_le_bytes(magic_buf); 94 94 95 95 // Verify it's a skippable frame (0x184D2A50 - 0x184D2A5F) 96 - if magic < 0x184D2A50 || magic > 0x184D2A5F { 96 + if !(0x184D2A50..=0x184D2A5F).contains(&magic) { 97 97 anyhow::bail!("Not a skippable frame: magic=0x{:08X}", magic); 98 98 } 99 99 ··· 223 223 use rayon::prelude::*; 224 224 use std::time::Instant; 225 225 226 - let num_frames = (operations.len() + constants::FRAME_SIZE - 1) / constants::FRAME_SIZE; 226 + let num_frames = operations.len().div_ceil(constants::FRAME_SIZE); 227 227 228 228 // Process all frames in parallel 229 229 let frame_results: Vec<_> = (0..num_frames) ··· 327 327 let mut total_compress_time = std::time::Duration::ZERO; 328 328 329 329 // Process operations in frames of FRAME_SIZE 330 - let num_frames = (operations.len() + constants::FRAME_SIZE - 1) / constants::FRAME_SIZE; 330 + let num_frames = operations.len().div_ceil(constants::FRAME_SIZE); 331 331 332 332 for frame_idx in 0..num_frames { 333 333 let frame_start = frame_idx * constants::FRAME_SIZE; ··· 462 462 reader.read_exact(&mut magic_buf)?; 463 463 let magic = u32::from_le_bytes(magic_buf); 464 464 465 - if magic >= 0x184D2A50 && magic <= 0x184D2A5F { 465 + if (0x184D2A50..=0x184D2A5F).contains(&magic) { 466 466 // Skip metadata frame 467 467 let mut size_buf = [0u8; 4]; 468 468 reader.read_exact(&mut size_buf)?; ··· 486 486 } 487 487 488 488 /// Create bundle metadata structure 489 + #[allow(clippy::too_many_arguments)] 489 490 pub fn create_bundle_metadata( 490 491 bundle_number: u32, 491 492 origin: &str,
+2 -4
src/cache.rs
··· 22 22 23 23 pub fn insert(&self, bundle: u32, ops: Vec<Operation>) { 24 24 let mut cache = self.cache.write().unwrap(); 25 - if cache.len() >= self.capacity { 26 - if let Some(&key) = cache.keys().next() { 27 - cache.remove(&key); 28 - } 25 + if cache.len() >= self.capacity && let Some(&key) = cache.keys().next() { 26 + cache.remove(&key); 29 27 } 30 28 cache.insert(bundle, ops); 31 29 }
+3 -4
src/cli/cmd_bench.rs
··· 342 342 }; 343 343 344 344 let mut processed = 0; 345 - for (_i, &bundle_num) in bundles.iter().enumerate() { 345 + for &bundle_num in bundles.iter() { 346 346 let size = match bundle_compressed_size(manager, bundle_num)? { 347 347 Some(size) => size, 348 348 None => continue, ··· 402 402 // Load bundles to get operation counts 403 403 let mut bundle_op_counts = Vec::with_capacity(bundles.len()); 404 404 for &bundle_num in &bundles { 405 - if let Ok(bundle) = manager.load_bundle(bundle_num, LoadOptions::default()) { 406 - if !bundle.operations.is_empty() { 405 + if let Ok(bundle) = manager.load_bundle(bundle_num, LoadOptions::default()) 406 + && !bundle.operations.is_empty() { 407 407 bundle_op_counts.push((bundle_num, bundle.operations.len())); 408 408 } 409 - } 410 409 } 411 410 412 411 if bundle_op_counts.is_empty() {
+3 -4
src/cli/cmd_clean.rs
··· 81 81 } 82 82 83 83 // Show errors if any 84 - if let Some(errors) = &result.errors { 85 - if !errors.is_empty() { 84 + if let Some(errors) = &result.errors 85 + && !errors.is_empty() { 86 86 eprintln!("\n⚠ Warning: Some errors occurred during cleanup:"); 87 87 for error in errors { 88 - eprintln!(" • {}", error); 88 + eprintln!(" - {}", error); 89 89 } 90 90 } 91 - } 92 91 93 92 Ok(()) 94 93 }
+66 -83
src/cli/cmd_compare.rs
··· 4 4 use plcbundle::{BundleManager, constants, remote}; 5 5 use sonic_rs::JsonValueTrait; 6 6 use std::collections::HashMap; 7 - use std::path::PathBuf; 7 + use std::path::{Path, PathBuf}; 8 8 use tokio::runtime::Runtime; 9 9 10 10 use super::utils::colors; ··· 96 96 97 97 async fn diff_indexes( 98 98 manager: &BundleManager, 99 - dir: &PathBuf, 99 + dir: &Path, 100 100 target: &str, 101 101 verbose: bool, 102 102 binary_name: &str, ··· 322 322 target_total_size: u64, 323 323 local_updated: String, 324 324 target_updated: String, 325 + #[allow(dead_code)] 325 326 origins_match: bool, 326 327 } 327 328 ··· 352 353 target: &plcbundle::index::Index, 353 354 origins_match: bool, 354 355 ) -> IndexComparison { 355 - let mut comparison = IndexComparison::default(); 356 - comparison.origins_match = origins_match; 356 + let mut comparison = IndexComparison { 357 + origins_match, 358 + ..Default::default() 359 + }; 357 360 358 361 let local_map: HashMap<u32, &plcbundle::index::BundleMetadata> = 359 362 local.bundles.iter().map(|b| (b.bundle_number, b)).collect(); ··· 594 597 eprintln!(); 595 598 if !c.has_differences() { 596 599 eprintln!("✅ Indexes are identical"); 600 + } else if !origins_match { 601 + // Different origins - all differences are expected 602 + eprintln!("ℹ️ Indexes differ (EXPECTED - comparing different origins)"); 603 + eprintln!(" All differences shown above are normal when comparing"); 604 + eprintln!(" repositories from different PLC directory sources."); 597 605 } else { 598 - if !origins_match { 599 - // Different origins - all differences are expected 600 - eprintln!("ℹ️ Indexes differ (EXPECTED - comparing different origins)"); 601 - eprintln!(" All differences shown above are normal when comparing"); 602 - eprintln!(" repositories from different PLC directory sources."); 606 + // Same origins - differences may be critical 607 + if !c.hash_mismatches.is_empty() { 608 + eprintln!("❌ Indexes differ (CRITICAL: hash mismatches detected)"); 609 + 
eprintln!("\n⚠️ WARNING: Chain hash mismatches indicate different bundle content"); 610 + eprintln!(" or chain integrity issues. This requires investigation."); 603 611 } else { 604 - // Same origins - differences may be critical 605 - if !c.hash_mismatches.is_empty() { 606 - eprintln!("❌ Indexes differ (CRITICAL: hash mismatches detected)"); 607 - eprintln!("\n⚠️ WARNING: Chain hash mismatches indicate different bundle content"); 608 - eprintln!(" or chain integrity issues. This requires investigation."); 609 - } else { 610 - // Just missing/extra bundles - not critical 611 - eprintln!("ℹ️ Indexes differ (missing or extra bundles, but hashes match)"); 612 - eprintln!( 613 - " This is normal when comparing repositories at different sync states." 614 - ); 615 - } 612 + // Just missing/extra bundles - not critical 613 + eprintln!("ℹ️ Indexes differ (missing or extra bundles, but hashes match)"); 614 + eprintln!( 615 + " This is normal when comparing repositories at different sync states." 616 + ); 616 617 } 617 618 } 618 619 } ··· 632 633 mismatches.len() 633 634 }; 634 635 635 - for i in 0..display_count { 636 - let m = &mismatches[i]; 636 + for m in mismatches.iter().take(display_count) { 637 637 eprintln!(" Bundle {}:", m.bundle_number); 638 638 eprintln!(" Chain Hash:"); 639 639 eprintln!(" Local: {}", m.local_hash); ··· 677 677 let first_break = mismatches[0].bundle_number; 678 678 679 679 // Determine last good bundle (one before first break) 680 - let last_good = if first_break > 1 { first_break - 1 } else { 0 }; 680 + let last_good = first_break.saturating_sub(1); 681 681 682 682 eprintln!(" Chain Status:"); 683 683 if last_good > 0 { ··· 692 692 693 693 // Count consecutive breaks 694 694 let mut consecutive_breaks = 1; 695 - for i in 1..mismatches.len() { 696 - if mismatches[i].bundle_number == first_break + consecutive_breaks { 695 + for window in mismatches.windows(2) { 696 + if window[1].bundle_number == first_break + consecutive_breaks { 697 697 
consecutive_breaks += 1; 698 698 } else { 699 699 break; ··· 765 765 mismatches.len() 766 766 }; 767 767 768 - for i in 0..display_count { 769 - let m = &mismatches[i]; 768 + for m in mismatches.iter().take(display_count) { 770 769 let local = if m.local_cursor.is_empty() { 771 770 "(empty)" 772 771 } else { ··· 797 796 bundles.len() 798 797 }; 799 798 800 - for i in 0..display_count { 801 - eprintln!(" {}", bundles[i]); 799 + for bundle in bundles.iter().take(display_count) { 800 + eprintln!(" {}", bundle); 802 801 } 803 802 804 803 if bundles.len() > display_count { ··· 823 822 bundles.len() 824 823 }; 825 824 826 - for i in 0..display_count { 827 - eprintln!(" {}", bundles[i]); 825 + for bundle in bundles.iter().take(display_count) { 826 + eprintln!(" {}", bundle); 828 827 } 829 828 830 829 if bundles.len() > display_count { ··· 847 846 let mut range_end = bundles[0]; 848 847 let mut ranges = Vec::new(); 849 848 850 - for i in 1..bundles.len() { 851 - if bundles[i] == range_end + 1 { 852 - range_end = bundles[i]; 849 + for window in bundles.windows(2) { 850 + if window[1] == range_end + 1 { 851 + range_end = window[1]; 853 852 } else { 854 853 ranges.push((range_start, range_end)); 855 - range_start = bundles[i]; 856 - range_end = bundles[i]; 854 + range_start = window[1]; 855 + range_end = window[1]; 857 856 } 858 857 } 859 858 860 859 ranges.push((range_start, range_end)); 861 860 862 861 // Display all ranges except the last one 863 - for i in 0..ranges.len().saturating_sub(1) { 864 - let (start, end) = ranges[i]; 862 + for (start, end) in ranges.iter().take(ranges.len().saturating_sub(1)) { 865 863 if start == end { 866 864 eprintln!(" {}", start); 867 865 } else { ··· 882 880 } else { 883 881 eprintln!(" {}", start); 884 882 } 883 + } else if let Some(count) = total_count { 884 + eprintln!( 885 + " {} - {} ({} bundle{})", 886 + start, 887 + end, 888 + count, 889 + if count == 1 { "" } else { "s" } 890 + ); 885 891 } else { 886 - if let Some(count) = 
total_count { 887 - eprintln!( 888 - " {} - {} ({} bundle{})", 889 - start, 890 - end, 891 - count, 892 - if count == 1 { "" } else { "s" } 893 - ); 894 - } else { 895 - eprintln!(" {} - {}", start, end); 896 - } 892 + eprintln!(" {} - {}", start, end); 897 893 } 898 894 } 899 895 } ··· 1093 1089 missing_in_local.len() 1094 1090 ); 1095 1091 let display_count = sample_size.min(missing_in_local.len()); 1096 - for i in 0..display_count { 1097 - let (cid, pos) = &missing_in_local[i]; 1092 + for (cid, pos) in missing_in_local.iter().take(display_count) { 1098 1093 // Find the operation in remote_ops to get details 1099 1094 if let Some(remote_op) = remote_ops.get(*pos) { 1100 1095 eprintln!(" - [{:04}] {}", pos, cid); ··· 1116 1111 } 1117 1112 1118 1113 // Add hints for exploring missing operations 1119 - if let Some((first_cid, first_pos)) = missing_in_local.first() { 1120 - if target.starts_with("http") { 1121 - let base_url = if target.ends_with('/') { 1122 - &target[..target.len() - 1] 1114 + if let Some((first_cid, first_pos)) = missing_in_local.first() 1115 + && target.starts_with("http") { 1116 + let base_url = if let Some(stripped) = target.strip_suffix('/') { 1117 + stripped 1123 1118 } else { 1124 1119 target 1125 1120 }; ··· 1133 1128 " • View in remote: curl '{}/op/{}' | grep '{}' | jq .", 1134 1129 base_url, global_pos, first_cid 1135 1130 ); 1136 - } 1137 1131 } 1138 1132 eprintln!(); 1139 1133 } ··· 1144 1138 missing_in_remote.len() 1145 1139 ); 1146 1140 let display_count = sample_size.min(missing_in_remote.len()); 1147 - for i in 0..display_count { 1148 - let (cid, pos) = &missing_in_remote[i]; 1141 + for (cid, pos) in missing_in_remote.iter().take(display_count) { 1149 1142 // Find the operation in local_ops to get details 1150 1143 if let Some(local_op) = local_ops.get(*pos) { 1151 1144 eprintln!(" + [{:04}] {}", pos, cid); ··· 1191 1184 position_mismatches.len() 1192 1185 ); 1193 1186 let display_count = sample_size.min(position_mismatches.len()); 
1194 - for i in 0..display_count { 1195 - let (cid, local_pos, remote_pos) = &position_mismatches[i]; 1187 + for (cid, local_pos, remote_pos) in position_mismatches.iter().take(display_count) { 1196 1188 eprintln!(" ~ {}", cid); 1197 1189 eprintln!(" Local: position {:04}", local_pos); 1198 1190 eprintln!(" Remote: position {:04}", remote_pos); ··· 1218 1210 sample_size.min(local_ops.len()) 1219 1211 ); 1220 1212 eprintln!("────────────────────────────────"); 1221 - for i in 0..sample_size.min(local_ops.len()) { 1222 - let op = &local_ops[i]; 1213 + for (i, op) in local_ops.iter().enumerate().take(sample_size.min(local_ops.len())) { 1223 1214 let remote_match = if let Some(ref cid) = op.cid { 1224 1215 if let Some(&remote_pos) = remote_cids.get(cid) { 1225 1216 if remote_pos == i { ··· 1266 1257 " Content Hash: {}", 1267 1258 if content_match { 1268 1259 "✅" 1260 + } else if origins_match { 1261 + "❌" 1269 1262 } else { 1270 - if origins_match { 1271 - "❌" 1272 - } else { 1273 - "ℹ️ (expected)" 1274 - } 1263 + "ℹ️ (expected)" 1275 1264 } 1276 1265 ); 1277 1266 eprintln!(" Local: {}", local_meta.content_hash); ··· 1291 1280 " Compressed Hash: {}", 1292 1281 if comp_match { 1293 1282 "✅" 1283 + } else if origins_match { 1284 + "❌" 1294 1285 } else { 1295 - if origins_match { 1296 - "❌" 1297 - } else { 1298 - "ℹ️ (expected)" 1299 - } 1286 + "ℹ️ (expected)" 1300 1287 } 1301 1288 ); 1302 1289 eprintln!(" Local: {}", local_meta.compressed_hash); ··· 1314 1301 " Chain Hash: {}", 1315 1302 if chain_match { 1316 1303 "✅" 1304 + } else if origins_match { 1305 + "❌" 1317 1306 } else { 1318 - if origins_match { 1319 - "❌" 1320 - } else { 1321 - "ℹ️ (expected)" 1322 - } 1307 + "ℹ️ (expected)" 1323 1308 } 1324 1309 ); 1325 1310 eprintln!(" Local: {}", local_meta.hash); ··· 1332 1317 " Parent: {}", 1333 1318 if parent_match { 1334 1319 "✅" 1320 + } else if origins_match { 1321 + "❌" 1335 1322 } else { 1336 - if origins_match { 1337 - "❌" 1338 - } else { 1339 - "ℹ️ (expected)" 
1340 - } 1323 + "ℹ️ (expected)" 1341 1324 } 1342 1325 ); 1343 1326 eprintln!(" Local: {}", local_meta.parent);
+33 -49
src/cli/cmd_did.rs
··· 294 294 let origin = &local_index.origin; 295 295 296 296 // If origin is "local" or empty, use default PLC directory 297 - let url = if origin == "local" || origin.is_empty() { 297 + if origin == "local" || origin.is_empty() { 298 298 if verbose { 299 299 log::info!("Origin is '{}', using default PLC directory: {}", origin, constants::DEFAULT_PLC_DIRECTORY_URL); 300 300 } ··· 304 304 log::info!("Using repository origin as PLC directory: {}", origin); 305 305 } 306 306 origin.clone() 307 - }; 308 - url 307 + } 309 308 } 310 309 }; 311 310 ··· 345 344 } 346 345 347 346 // Get DID index for shard calculation (only for PLC DIDs) 348 - if did.starts_with("did:plc:") { 349 - let identifier = &did[8..]; // Strip "did:plc:" prefix 347 + if let Some(identifier) = did.strip_prefix("did:plc:") { 350 348 let shard_num = calculate_shard_for_display(identifier); 351 349 log::debug!( 352 350 "DID {} -> identifier '{}' -> shard {:02x}", ··· 732 730 733 731 // DID LOOKUP - Find all operations for a DID 734 732 733 + #[allow(clippy::too_many_arguments)] 735 734 pub fn cmd_did_lookup( 736 735 dir: PathBuf, 737 736 input: String, ··· 771 770 } 772 771 773 772 // Get DID index for shard calculation (only for PLC DIDs) 774 - if did.starts_with("did:plc:") { 775 - let identifier = &did[8..]; // Strip "did:plc:" prefix 773 + if let Some(identifier) = did.strip_prefix("did:plc:") { 776 774 let shard_num = calculate_shard_for_display(identifier); 777 775 log::debug!( 778 776 "DID {} -> identifier '{}' -> shard {:02x}", ··· 997 995 Ok(()) 998 996 } 999 997 998 + #[allow(clippy::too_many_arguments)] 1000 999 fn display_lookup_results( 1001 1000 did: &str, 1002 1001 bundled_ops: &[plcbundle::OperationWithLocation], ··· 1095 1094 } 1096 1095 if let Some(handle) = op_val.get("handle").and_then(|v| v.as_str()) { 1097 1096 eprintln!(" handle: {}", handle); 1098 - } else { 1099 - if let Some(aka) = op_val.get("alsoKnownAs").and_then(|v| v.as_array()) { 1100 - if let Some(aka_str) = 
aka.first().and_then(|v| v.as_str()) { 1101 - let handle = aka_str.strip_prefix("at://").unwrap_or(aka_str); 1102 - eprintln!(" handle: {}", handle); 1103 - } 1104 - } 1097 + } else if let Some(aka) = op_val.get("alsoKnownAs").and_then(|v| v.as_array()) 1098 + && let Some(aka_str) = aka.first().and_then(|v| v.as_str()) { 1099 + let handle = aka_str.strip_prefix("at://").unwrap_or(aka_str); 1100 + eprintln!(" handle: {}", handle); 1105 1101 } 1106 1102 } 1107 1103 } ··· 1176 1172 } 1177 1173 } 1178 1174 "cid" => { 1179 - owl.operation.cid.as_ref() 1180 - .map(|c| c.clone()) 1181 - .unwrap_or_else(|| "".to_string()) 1175 + owl.operation.cid.clone() 1176 + .unwrap_or_default() 1182 1177 } 1183 1178 "created_at" | "created" | "date" | "time" => owl.operation.created_at.clone(), 1184 1179 "nullified" => { ··· 1213 1208 "global" | "global_pos" => "".to_string(), 1214 1209 "status" => "✓".to_string(), 1215 1210 "cid" => { 1216 - op.cid.as_ref() 1217 - .map(|c| c.clone()) 1218 - .unwrap_or_else(|| "".to_string()) 1211 + op.cid.clone() 1212 + .unwrap_or_default() 1219 1213 } 1220 1214 "created_at" | "created" | "date" | "time" => op.created_at.clone(), 1221 1215 "nullified" => "false".to_string(), ··· 1650 1644 } 1651 1645 1652 1646 // Update rotation keys if this operation changes them 1653 - if let Some(new_rotation_keys) = entry.operation.rotation_keys() { 1654 - if new_rotation_keys != &current_rotation_keys { 1647 + if let Some(new_rotation_keys) = entry.operation.rotation_keys() 1648 + && new_rotation_keys != current_rotation_keys { 1655 1649 if verbose { 1656 1650 println!(" 🔄 Rotation keys updated by this operation"); 1657 1651 println!(" Old keys: {}", current_rotation_keys.len()); ··· 1661 1655 } 1662 1656 } 1663 1657 current_rotation_keys = new_rotation_keys.to_vec(); 1664 - } 1665 1658 } 1666 1659 } 1667 1660 ··· 1737 1730 let mut current_cid = genesis.cid.clone(); 1738 1731 1739 1732 // Follow the chain, preferring non-nullified operations 1740 - loop { 1741 - 
if let Some(indices) = prev_to_indices.get(&current_cid) { 1742 - // Find the first non-nullified operation 1743 - if let Some(&next_idx) = indices.iter().find(|&&idx| !audit_log[idx].nullified) { 1733 + while let Some(indices) = prev_to_indices.get(&current_cid) { 1734 + // Find the first non-nullified operation 1735 + if let Some(&next_idx) = indices.iter().find(|&&idx| !audit_log[idx].nullified) { 1736 + canonical.push(next_idx); 1737 + current_cid = audit_log[next_idx].cid.clone(); 1738 + } else { 1739 + // All operations at this point are nullified - try to find any operation 1740 + if let Some(&next_idx) = indices.first() { 1744 1741 canonical.push(next_idx); 1745 1742 current_cid = audit_log[next_idx].cid.clone(); 1746 1743 } else { 1747 - // All operations at this point are nullified - try to find any operation 1748 - if let Some(&next_idx) = indices.first() { 1749 - canonical.push(next_idx); 1750 - current_cid = audit_log[next_idx].cid.clone(); 1751 - } else { 1752 - break; 1753 - } 1744 + break; 1754 1745 } 1755 - } else { 1756 - // No more operations 1757 - break; 1758 1746 } 1759 1747 } 1760 1748 ··· 2017 2005 /// Find which rotation key signed an operation 2018 2006 fn find_signing_key(operation: &Operation, rotation_keys: &[String]) -> (Option<usize>, Option<String>) { 2019 2007 for (index, key_did) in rotation_keys.iter().enumerate() { 2020 - if let Ok(verifying_key) = VerifyingKey::from_did_key(key_did) { 2021 - if operation.verify(&[verifying_key]).is_ok() { 2008 + if let Ok(verifying_key) = VerifyingKey::from_did_key(key_did) 2009 + && operation.verify(&[verifying_key]).is_ok() { 2022 2010 return (Some(index), Some(key_did.clone())); 2023 - } 2024 2011 } 2025 2012 } 2026 2013 (None, None) ··· 2074 2061 let prev = entry.operation.prev(); 2075 2062 2076 2063 // Check if this operation is part of a fork 2077 - if let Some(_prev_cid) = prev { 2078 - if let Some(fork) = fork_map.get(&entry.cid) { 2064 + if let Some(_prev_cid) = prev 2065 + && let 
Some(fork) = fork_map.get(&entry.cid) { 2079 2066 // This is a fork point 2080 2067 if !processed_forks.contains(&fork.prev_cid) { 2081 2068 processed_forks.insert(fork.prev_cid.clone()); ··· 2101 2088 2102 2089 if let Some(key_idx) = fork_op.signing_key_index { 2103 2090 println!(" │ Signed by: rotation_key[{}]", key_idx); 2104 - if verbose { 2105 - if let Some(key) = &fork_op.signing_key { 2091 + if verbose 2092 + && let Some(key) = &fork_op.signing_key { 2106 2093 println!(" │ Key: {}", truncate_cid(key)); 2107 - } 2108 2094 } 2109 2095 } 2110 2096 ··· 2128 2114 println!(); 2129 2115 } 2130 2116 continue; 2131 - } 2132 2117 } 2133 2118 2134 2119 // Regular operation (not part of a fork) ··· 2136 2121 println!("🌱 Genesis"); 2137 2122 println!(" CID: {}", truncate_cid(&entry.cid)); 2138 2123 println!(" Timestamp: {}", entry.created_at); 2139 - if verbose { 2140 - if let Operation::PlcOperation { rotation_keys, .. } = &entry.operation { 2124 + if verbose 2125 + && let Operation::PlcOperation { rotation_keys, .. } = &entry.operation { 2141 2126 println!(" Rotation keys: {}", rotation_keys.len()); 2142 - } 2143 2127 } 2144 2128 println!(); 2145 2129 }
+6 -9
src/cli/cmd_export.rs
··· 178 178 // Process bundles through BundleManager API (follows RULES.md) 179 179 for bundle_num in bundle_numbers { 180 180 // Check count limit 181 - if let Some(limit) = count { 182 - if exported_count >= limit { 181 + if let Some(limit) = count 182 + && exported_count >= limit { 183 183 break; 184 - } 185 184 } 186 185 187 186 // Use BundleManager API to get decompressed stream ··· 226 225 } 227 226 228 227 // Check count limit 229 - if let Some(limit) = count { 230 - if exported_count >= limit { 228 + if let Some(limit) = count 229 + && exported_count >= limit { 231 230 break; 232 - } 233 231 } 234 232 235 233 output_buffer.push_str(&line); ··· 247 245 } 248 246 249 247 // Progress update (operations count in message, but bundles in progress bar) 250 - if let Some(ref pb) = pb { 251 - if exported_count % BATCH_SIZE == 0 || exported_count == 1 { 248 + if let Some(ref pb) = pb 249 + && (exported_count % BATCH_SIZE == 0 || exported_count == 1) { 252 250 let bytes = bytes_written.lock().unwrap(); 253 251 let total_bytes = *bytes + output_buffer.len() as u64; 254 252 drop(bytes); 255 253 pb.set_with_bytes(bundles_processed, total_bytes); 256 254 pb.set_message(format!("{} ops", utils::format_number(exported_count as u64))); 257 - } 258 255 } 259 256 } 260 257
+21 -27
src/cli/cmd_index.rs
··· 3 3 use anyhow::Result; 4 4 use clap::{Args, Subcommand}; 5 5 use plcbundle::{BundleManager, constants}; 6 - use std::path::PathBuf; 6 + use std::path::{Path, PathBuf}; 7 7 use std::time::Instant; 8 8 9 9 #[derive(Args)] ··· 274 274 fn drop(&mut self) { 275 275 // Cleanup temp files on drop (CTRL+C, panic, or normal exit) 276 276 let did_index = self.manager.get_did_index(); 277 - if let Some(idx) = did_index.read().unwrap().as_ref() { 278 - if let Err(e) = idx.cleanup_temp_files() { 277 + if let Some(idx) = did_index.read().unwrap().as_ref() 278 + && let Err(e) = idx.cleanup_temp_files() { 279 279 log::warn!("[Index Build] Failed to cleanup temp files: {}", e); 280 - } 281 280 } 282 281 } 283 282 } ··· 302 301 Err(e) => { 303 302 // Error occurred - explicitly cleanup temp files before returning 304 303 let did_index = manager_arc.get_did_index(); 305 - if let Some(idx) = did_index.read().unwrap().as_ref() { 306 - if let Err(cleanup_err) = idx.cleanup_temp_files() { 304 + if let Some(idx) = did_index.read().unwrap().as_ref() 305 + && let Err(cleanup_err) = idx.cleanup_temp_files() { 307 306 log::warn!("[Index Build] Failed to cleanup temp files after error: {}", cleanup_err); 308 - } 309 307 } 310 308 return Err(e); 311 309 } ··· 433 431 } 434 432 435 433 // Show compaction recommendation if needed 436 - if final_delta_segments >= 50 && final_delta_segments < 100 { 434 + if (50..100).contains(&final_delta_segments) { 437 435 eprintln!(); 438 436 eprintln!("💡 Tip: Consider running '{} index compact' to optimize performance", constants::BINARY_NAME); 439 437 } ··· 805 803 } else if warnings > 0 { 806 804 use super::utils::colors; 807 805 use super::utils::format_number; 808 - eprintln!("{}⚠️{} Index verification passed with warnings", "\x1b[33m", colors::RESET); 806 + eprintln!("\x1b[33m⚠️{} Index verification passed with warnings", colors::RESET); 809 807 eprintln!(" Warnings: {}", warnings); 810 808 eprintln!(" Total DIDs: {}", format_number(total_dids as 
u64)); 811 809 eprintln!(" Last bundle: {}", format_number(last_bundle as u64)); ··· 831 829 } 832 830 833 831 /// Get raw shard data as JSON 834 - fn get_raw_shard_data_json(dir: &PathBuf, shard_num: u8) -> Result<serde_json::Value> { 832 + fn get_raw_shard_data_json(dir: &Path, shard_num: u8) -> Result<serde_json::Value> { 835 833 use std::fs; 836 834 use crate::constants; 837 835 use crate::did_index::OpLocation; ··· 936 934 } 937 935 938 936 /// Get raw delta segment data as JSON 939 - fn get_raw_segment_data_json(dir: &PathBuf, shard_num: u8, file_name: &str) -> Result<serde_json::Value> { 937 + fn get_raw_segment_data_json(dir: &Path, shard_num: u8, file_name: &str) -> Result<serde_json::Value> { 940 938 use std::fs; 941 939 use crate::constants; 942 940 use crate::did_index::OpLocation; ··· 1040 1038 } 1041 1039 1042 1040 /// Display raw shard data in a readable format 1043 - fn display_raw_shard_data(dir: &PathBuf, shard_num: u8) -> Result<()> { 1041 + fn display_raw_shard_data(dir: &Path, shard_num: u8) -> Result<()> { 1044 1042 use std::fs; 1045 1043 use crate::constants; 1046 1044 use crate::did_index::OpLocation; ··· 1155 1153 } 1156 1154 1157 1155 /// Display raw delta segment data in a readable format 1158 - fn display_raw_segment_data(dir: &PathBuf, shard_num: u8, file_name: &str) -> Result<()> { 1156 + fn display_raw_segment_data(dir: &Path, shard_num: u8, file_name: &str) -> Result<()> { 1159 1157 use std::fs; 1160 1158 use crate::constants; 1161 1159 use crate::did_index::OpLocation; ··· 1318 1316 let lookup_elapsed = lookup_start.elapsed(); 1319 1317 1320 1318 // Extract identifier for shard calculation 1321 - let identifier = if did.starts_with("did:plc:") { 1322 - &did[8..] 
1319 + let identifier = if let Some(id) = did.strip_prefix("did:plc:") { 1320 + id 1323 1321 } else { 1324 1322 anyhow::bail!("Only did:plc: DIDs are supported"); 1325 1323 }; ··· 1480 1478 1481 1479 if json { 1482 1480 // Add raw data to JSON output if a specific shard is requested 1483 - if let Some(shard_num) = shard { 1484 - if let Some(detail) = shard_details.first_mut() { 1481 + if let Some(shard_num) = shard 1482 + && let Some(detail) = shard_details.first_mut() { 1485 1483 let base_exists = detail 1486 1484 .get("base_exists") 1487 1485 .and_then(|v| v.as_bool()) 1488 1486 .unwrap_or(false); 1489 1487 1490 - if base_exists { 1491 - if let Ok(raw_data) = get_raw_shard_data_json(&dir, shard_num) { 1488 + if base_exists 1489 + && let Ok(raw_data) = get_raw_shard_data_json(&dir, shard_num) { 1492 1490 detail.insert("raw_data".to_string(), raw_data); 1493 - } 1494 1491 } 1495 1492 1496 1493 // Add raw data for delta segments ··· 1498 1495 for seg in segments { 1499 1496 let file_name = seg.get("file_name").and_then(|v| v.as_str()).unwrap_or(""); 1500 1497 let exists = seg.get("exists").and_then(|v| v.as_bool()).unwrap_or(false); 1501 - if exists && !file_name.is_empty() { 1502 - if let Ok(raw_data) = get_raw_segment_data_json(&dir, shard_num, file_name) { 1498 + if exists && !file_name.is_empty() 1499 + && let Ok(raw_data) = get_raw_segment_data_json(&dir, shard_num, file_name) { 1503 1500 seg.as_object_mut().unwrap().insert("raw_data".to_string(), raw_data); 1504 - } 1505 1501 } 1506 1502 } 1507 1503 } 1508 - } 1509 1504 } 1510 1505 1511 1506 let json_str = serde_json::to_string_pretty(&shard_details)?; ··· 1562 1557 println!(" Total size: {}", utils::format_bytes(total_size)); 1563 1558 println!(" Next segment ID: {}", next_segment_id); 1564 1559 1565 - if let Some(segments) = detail.get("segments").and_then(|v| v.as_array()) { 1566 - if !segments.is_empty() { 1560 + if let Some(segments) = detail.get("segments").and_then(|v| v.as_array()) 1561 + && 
!segments.is_empty() { 1567 1562 println!("\n Delta Segments:"); 1568 1563 println!(" ───────────────────────────────────────"); 1569 1564 for (idx, seg) in segments.iter().enumerate() { ··· 1596 1591 .unwrap_or(0) 1597 1592 ); 1598 1593 } 1599 - } 1600 1594 } 1601 1595 1602 1596 // Show raw shard data
+2 -2
src/cli/cmd_init.rs
··· 1 1 use anyhow::Result; 2 2 use clap::{Args, ValueHint}; 3 3 use plcbundle::{constants, BundleManager}; 4 - use std::path::PathBuf; 4 + use std::path::{Path, PathBuf}; 5 5 6 6 #[derive(Args)] 7 7 #[command( ··· 119 119 } 120 120 121 121 /// Create an error for when repository is already initialized 122 - fn already_initialized_error(dir: &PathBuf) -> anyhow::Error { 122 + fn already_initialized_error(dir: &Path) -> anyhow::Error { 123 123 anyhow::anyhow!( 124 124 "Repository already initialized at: {}\n\nUse --force to reinitialize", 125 125 dir.display()
+18 -24
src/cli/cmd_inspect.rs
··· 379 379 // Handle analysis 380 380 if let Some(aka) = op_val.get("alsoKnownAs").and_then(|v| v.as_array()) { 381 381 for item in aka.iter() { 382 - if let Some(aka_str) = item.as_str() { 383 - if aka_str.starts_with("at://") { 382 + if let Some(aka_str) = item.as_str() 383 + && aka_str.starts_with("at://") { 384 384 total_handles += 1; 385 385 386 386 // Extract domain ··· 402 402 if handle.contains('_') { 403 403 invalid_handles += 1; 404 404 } 405 - } 406 405 } 407 406 } 408 407 } ··· 412 411 total_services += services.len(); 413 412 414 413 // Extract PDS endpoints 415 - if let Some(pds_val) = op_val.get("services").and_then(|v| v.get("atproto_pds")) { 416 - if let Some(_pds) = pds_val.as_object() { 417 - if let Some(endpoint) = pds_val.get("endpoint").and_then(|v| v.as_str()) { 418 - // Normalize endpoint 419 - let endpoint = endpoint 420 - .strip_prefix("https://") 421 - .or_else(|| endpoint.strip_prefix("http://")) 422 - .unwrap_or(endpoint); 423 - let endpoint = endpoint.split('/').next().unwrap_or(endpoint); 424 - *endpoint_counts.entry(endpoint.to_string()).or_insert(0) += 1; 425 - } 426 - } 414 + if let Some(pds_val) = op_val.get("services").and_then(|v| v.get("atproto_pds")) 415 + && let Some(_pds) = pds_val.as_object() 416 + && let Some(endpoint) = pds_val.get("endpoint").and_then(|v| v.as_str()) { 417 + // Normalize endpoint 418 + let endpoint = endpoint 419 + .strip_prefix("https://") 420 + .or_else(|| endpoint.strip_prefix("http://")) 421 + .unwrap_or(endpoint); 422 + let endpoint = endpoint.split('/').next().unwrap_or(endpoint); 423 + *endpoint_counts.entry(endpoint.to_string()).or_insert(0) += 1; 427 424 } 428 425 } 429 426 } ··· 666 663 println!(); 667 664 668 665 // Embedded metadata (if available and not skipped) 669 - if !cmd.skip_metadata && result.has_metadata_frame { 670 - if let Some(ref meta) = result.embedded_metadata { 666 + if !cmd.skip_metadata && result.has_metadata_frame 667 + && let Some(ref meta) = result.embedded_metadata { 
671 668 println!("📋 Embedded Metadata (Skippable Frame)"); 672 669 println!("───────────────────────────────────────"); 673 670 println!(" Format: {}", meta.format); ··· 710 707 711 708 println!("\n Integrity:"); 712 709 println!(" Content hash: {}", meta.content_hash); 713 - if let Some(ref parent) = meta.parent_hash { 714 - if !parent.is_empty() { 710 + if let Some(ref parent) = meta.parent_hash 711 + && !parent.is_empty() { 715 712 println!(" Parent hash: {}", parent); 716 - } 717 713 } 718 714 719 715 // Index metadata for chain info ··· 750 746 751 747 println!(); 752 748 } 753 - } 754 749 755 750 // Operations breakdown 756 751 println!("📊 Operations Analysis"); ··· 816 811 println!("🏷️ Handle Statistics"); 817 812 println!("────────────────────"); 818 813 println!(" Total handles: {}", format_number(total_handles)); 819 - if let Some(invalid) = result.invalid_handles { 820 - if invalid > 0 { 814 + if let Some(invalid) = result.invalid_handles 815 + && invalid > 0 { 821 816 println!( 822 817 " Invalid patterns: {} ({:.1}%)", 823 818 format_number(invalid), 824 819 (invalid as f64 / total_handles as f64 * 100.0) 825 820 ); 826 - } 827 821 } 828 822 829 823 if !result.top_domains.is_empty() {
+4 -4
src/cli/cmd_ls.rs
··· 246 246 247 247 "size" | "compressed" => { 248 248 if human_readable { 249 - format_bytes_compact(meta.compressed_size as u64) 249 + format_bytes_compact(meta.compressed_size) 250 250 } else { 251 251 format!("{}", meta.compressed_size) 252 252 } 253 253 }, 254 254 "size_mb" => format!("{:.2}", meta.compressed_size as f64 / (1024.0 * 1024.0)), 255 - "size_h" | "size_human" => format_bytes_compact(meta.compressed_size as u64), 255 + "size_h" | "size_human" => format_bytes_compact(meta.compressed_size), 256 256 257 257 "uncompressed" => { 258 258 if human_readable { 259 - format_bytes_compact(meta.uncompressed_size as u64) 259 + format_bytes_compact(meta.uncompressed_size) 260 260 } else { 261 261 format!("{}", meta.uncompressed_size) 262 262 } 263 263 }, 264 264 "uncompressed_mb" => format!("{:.2}", meta.uncompressed_size as f64 / (1024.0 * 1024.0)), 265 - "uncompressed_h" | "uncompressed_human" => format_bytes_compact(meta.uncompressed_size as u64), 265 + "uncompressed_h" | "uncompressed_human" => format_bytes_compact(meta.uncompressed_size), 266 266 267 267 "ratio" => { 268 268 if meta.compressed_size > 0 {
+3 -2
src/cli/cmd_mempool.rs
··· 4 4 use anyhow::Result; 5 5 use clap::{Args, Subcommand, ValueHint}; 6 6 use plcbundle::format::format_number; 7 + use std::path::Path; 7 8 use plcbundle::{BundleManager, constants}; 8 9 use std::io::{self, Write}; 9 10 use std::path::PathBuf; ··· 94 95 } 95 96 } 96 97 97 - fn show_status(manager: &BundleManager, dir: &PathBuf, verbose: bool) -> Result<()> { 98 + fn show_status(manager: &BundleManager, dir: &Path, verbose: bool) -> Result<()> { 98 99 manager.load_mempool()?; 99 100 let stats: plcbundle::MempoolStats = manager.get_mempool_stats()?; 100 101 ··· 205 206 Ok(()) 206 207 } 207 208 208 - fn clear(manager: &BundleManager, dir: &PathBuf, force: bool) -> Result<()> { 209 + fn clear(manager: &BundleManager, dir: &Path, force: bool) -> Result<()> { 209 210 manager.load_mempool()?; 210 211 let stats = manager.get_mempool_stats()?; 211 212 let count = stats.count;
+10 -8
src/cli/cmd_migrate.rs
··· 101 101 102 102 for meta in bundles { 103 103 // Filter by bundle range if specified 104 - if let Some(ref target_set) = target_bundles { 105 - if !target_set.contains(&meta.bundle_number) { 104 + if let Some(ref target_set) = target_bundles 105 + && !target_set.contains(&meta.bundle_number) { 106 106 continue; 107 - } 108 107 } 109 108 110 109 let embedded_meta = manager.get_embedded_metadata(meta.bundle_number)?; ··· 239 238 let total_bytes = bytes_atomic.fetch_add(info.old_size, Ordering::Relaxed) + info.old_size; 240 239 241 240 // Only update progress bar periodically to reduce lock contention 242 - if current_count % update_interval == 0 || current_count == 1 { 241 + if current_count.is_multiple_of(update_interval) || current_count == 1 { 243 242 let prog = progress_arc.lock().unwrap(); 244 243 prog.set_with_bytes(current_count, total_bytes); 245 244 } ··· 372 371 size_change, 373 372 size_diff as f64 / total_old_size as f64 * 100.0 374 373 ); 374 + let old_ratio_str = format!("{:.3}x", old_ratio); 375 + let new_ratio_str = format!("{:.3}x", new_ratio); 376 + let ratio_diff_str = format!("{:+.3}x", ratio_diff); 375 377 eprintln!( 376 378 "Ratio {:<13} {:<13} {}", 377 - format!("{:.3}x", old_ratio), 378 - format!("{:.3}x", new_ratio), 379 - format!("{:+.3}x", ratio_diff) 379 + old_ratio_str, 380 + new_ratio_str, 381 + ratio_diff_str 380 382 ); 381 383 let avg_change = size_diff / success as i64; 382 384 let avg_change_str = if avg_change >= 0 { ··· 482 484 483 485 // Check if it's a skippable frame 484 486 let magic = u32::from_le_bytes([header[0], header[1], header[2], header[3]]); 485 - if magic < 0x184D2A50 || magic > 0x184D2A5F { 487 + if !(0x184D2A50..=0x184D2A5F).contains(&magic) { 486 488 return Ok(0); // No metadata frame 487 489 } 488 490
+1 -1
src/cli/cmd_op.rs
··· 406 406 }; 407 407 408 408 for (i, op) in result.operations.iter().enumerate() { 409 - let cid_matches = op.cid.as_ref().map_or(false, |c| c == &cid); 409 + let cid_matches = op.cid.as_ref() == Some(&cid); 410 410 411 411 if cid_matches { 412 412 let global_pos =
+1 -5
src/cli/cmd_query.rs
··· 177 177 178 178 // Track bundle count separately since callback gives increment, not total 179 179 let bundle_count = Arc::new(Mutex::new(0usize)); 180 - let pb_arc = if let Some(ref pb) = pb { 181 - Some(Arc::new(Mutex::new(pb))) 182 - } else { 183 - None 184 - }; 180 + let pb_arc = pb.as_ref().map(|pb| Arc::new(Mutex::new(pb))); 185 181 186 182 let stats = processor.process( 187 183 &bundle_numbers,
+3 -3
src/cli/cmd_rollback.rs
··· 3 3 use clap::Args; 4 4 use plcbundle::BundleManager; 5 5 use std::io::{self, Write}; 6 - use std::path::PathBuf; 6 + use std::path::{Path, PathBuf}; 7 7 8 8 #[derive(Args)] 9 9 #[command( ··· 106 106 bail!("cannot use both --to and --last together"); 107 107 } 108 108 109 - if super::utils::is_repository_empty(&manager) { 109 + if super::utils::is_repository_empty(manager) { 110 110 bail!("no bundles to rollback"); 111 111 } 112 112 ··· 194 194 }) 195 195 } 196 196 197 - fn display_rollback_plan(dir: &PathBuf, plan: &RollbackPlan) -> Result<()> { 197 + fn display_rollback_plan(dir: &Path, plan: &RollbackPlan) -> Result<()> { 198 198 println!("╔════════════════════════════════════════════════════════════════╗"); 199 199 println!("║ ROLLBACK PLAN ║"); 200 200 println!("╚════════════════════════════════════════════════════════════════╝\n");
+1 -1
src/cli/cmd_server.rs
··· 155 155 let callback = Box::new(move |current: u32, _total: u32, bytes_processed: u64, _total_bytes: u64| { 156 156 let pb = progress_clone.lock().unwrap(); 157 157 pb.set_with_bytes(current as usize, bytes_processed); 158 - if verbose && current % 100 == 0 { 158 + if verbose && current.is_multiple_of(100) { 159 159 log::debug!("[DIDResolver] Index progress: {}/{} bundles", current, _total); 160 160 } 161 161 }) as Box<dyn Fn(u32, u32, u64, u64) + Send + Sync>;
+1 -1
src/cli/cmd_stats.rs
··· 328 328 .max() 329 329 .cloned(); 330 330 331 - let time_span_days = if let (Some(ref earliest), Some(ref latest)) = (earliest_time.as_ref(), latest_time.as_ref()) { 331 + let time_span_days = if let (Some(earliest), Some(latest)) = (earliest_time.as_ref(), latest_time.as_ref()) { 332 332 if let (Ok(e), Ok(l)) = ( 333 333 chrono::DateTime::parse_from_rfc3339(earliest), 334 334 chrono::DateTime::parse_from_rfc3339(latest),
+8 -10
src/cli/cmd_status.rs
··· 1 1 use anyhow::Result; 2 2 use clap::Parser; 3 3 use plcbundle::*; 4 - use std::path::PathBuf; 4 + use std::path::{Path, PathBuf}; 5 5 6 6 use super::utils; 7 7 ··· 63 63 fn print_human_status( 64 64 manager: &BundleManager, 65 65 index: &Index, 66 - dir: &PathBuf, 66 + dir: &Path, 67 67 detailed: bool 68 68 ) -> Result<()> { 69 69 let dir_display = utils::display_path(dir); ··· 187 187 suggestions.push("Build DID index for fast lookups: plcbundle index build"); 188 188 } 189 189 190 - if let Ok(mempool_stats) = manager.get_mempool_stats() { 191 - if mempool_stats.count >= constants::BUNDLE_SIZE { 190 + if let Ok(mempool_stats) = manager.get_mempool_stats() 191 + && mempool_stats.count >= constants::BUNDLE_SIZE { 192 192 suggestions.push("Mempool ready to create new bundle: plcbundle sync"); 193 - } 194 193 } 195 194 196 195 // Add general hints ··· 231 230 fn print_json_status( 232 231 manager: &BundleManager, 233 232 index: &Index, 234 - dir: &PathBuf, 233 + dir: &Path, 235 234 ) -> Result<()> { 236 235 use serde_json::json; 237 236 ··· 290 289 if !is_empty && !did_index_exists { 291 290 health.push("did_index_not_built"); 292 291 } 293 - if let Ok(mempool_stats) = manager.get_mempool_stats() { 294 - if mempool_stats.count >= constants::BUNDLE_SIZE { 292 + if let Ok(mempool_stats) = manager.get_mempool_stats() 293 + && mempool_stats.count >= constants::BUNDLE_SIZE { 295 294 health.push("mempool_ready_to_bundle"); 296 - } 297 295 } 298 296 status["health"] = json!(health); 299 297 ··· 302 300 Ok(()) 303 301 } 304 302 305 - fn check_did_index_exists(dir: &PathBuf) -> bool { 303 + fn check_did_index_exists(dir: &Path) -> bool { 306 304 let did_index_dir = dir.join(constants::DID_INDEX_DIR).join(constants::DID_INDEX_SHARDS); 307 305 did_index_dir.exists() && did_index_dir.is_dir() 308 306 }
+4 -6
src/cli/cmd_verify.rs
··· 349 349 drop(job_tx); // Close sender, workers will finish 350 350 351 351 // Collect results and update progress in real-time 352 - let mut results: Vec<(usize, u32, VerifyResult)> = Vec::new(); 353 - results.reserve(bundles.len()); 352 + let mut results: Vec<(usize, u32, VerifyResult)> = Vec::with_capacity(bundles.len()); 354 353 355 354 let mut verified_count = 0; 356 355 let mut error_count = 0; ··· 404 403 } 405 404 406 405 // Store first error for summary 407 - if first_error.is_none() { 408 - if let Some(first_err) = errors.first() { 406 + if first_error.is_none() 407 + && let Some(first_err) = errors.first() { 409 408 first_error = Some(anyhow::anyhow!("{}", first_err)); 410 - } 411 409 } 412 410 } else { 413 411 verified_count += 1; ··· 617 615 let bundles_per_sec = total as f64 / elapsed.as_secs_f64(); 618 616 eprintln!(" Throughput: {:.1} bundles/sec", bundles_per_sec); 619 617 620 - let avg_time = elapsed / total as u32; 618 + let avg_time = elapsed / total; 621 619 eprintln!(" Avg/bundle: {:?}", avg_time); 622 620 } 623 621
+2 -3
src/cli/progress.rs
··· 247 247 // Don't set message - progress bar template will show pos/len without extra message 248 248 249 249 // Finish progress bar when stage 2 completes (256/256) 250 - if current == 256 { 251 - if let Some(pb) = stage2_pb_guard.take() { 250 + if current == 256 251 + && let Some(pb) = stage2_pb_guard.take() { 252 252 pb.finish(); 253 - } 254 253 } 255 254 } 256 255 } else {
+1 -1
src/cli/utils.rs
··· 206 206 let mut stat: libc::statvfs = std::mem::zeroed(); 207 207 if libc::statvfs(c_path.as_ptr(), &mut stat) == 0 { 208 208 // Calculate free space: available blocks * block size 209 - let free_bytes = stat.f_bavail as u64 * stat.f_frsize as u64; 209 + let free_bytes = stat.f_bavail as u64 * stat.f_frsize; 210 210 Some(free_bytes) 211 211 } else { 212 212 None
+46 -56
src/did_index.rs
··· 20 20 const DIDINDEX_MAGIC: &[u8; 4] = b"PLCD"; 21 21 const DIDINDEX_VERSION: u32 = 4; 22 22 23 + // Type aliases to simplify complex signatures for Clippy 24 + type ShardEntries = HashMap<u8, Vec<(String, OpLocation)>>; 25 + type ShardProcessResult = Result<(ShardEntries, u64, u64)>; 26 + 23 27 // ============================================================================ 24 28 // OpLocation - Packed 32-bit global position with nullified flag 25 29 // ··· 253 257 fn add(&mut self, identifier: String, loc: OpLocation) { 254 258 self.entries 255 259 .entry(identifier) 256 - .or_insert_with(Vec::new) 260 + .or_default() 257 261 .push(loc); 258 262 } 259 263 260 264 fn merge(&mut self, other: HashMap<String, Vec<OpLocation>>) { 261 265 for (id, locs) in other { 262 - self.entries.entry(id).or_insert_with(Vec::new).extend(locs); 266 + self.entries.entry(id).or_default().extend(locs); 263 267 } 264 268 } 265 269 ··· 513 517 anyhow::bail!("DID index has zero entries."); 514 518 } 515 519 516 - let seed = seed.unwrap_or_else(|| unix_timestamp()); 520 + let seed = seed.unwrap_or_else(unix_timestamp); 517 521 let mut attempts = 0usize; 518 522 let mut results = Vec::with_capacity(count); 519 523 ··· 716 720 fn process_bundle_for_index( 717 721 bundle_dir: &PathBuf, 718 722 bundle_num: u32, 719 - ) -> Result<(HashMap<u8, Vec<(String, OpLocation)>>, u64, u64)> { 723 + ) -> ShardProcessResult { 720 724 use std::fs::File; 721 725 use std::io::BufReader; 722 726 ··· 744 748 fn process_bundle_lines<R: std::io::BufRead>( 745 749 bundle_num: u32, 746 750 reader: R, 747 - ) -> Result<(HashMap<u8, Vec<(String, OpLocation)>>, u64, u64)> { 751 + ) -> ShardProcessResult { 748 752 use sonic_rs::JsonValueTrait; 749 753 750 754 let mut shard_entries: HashMap<u8, Vec<(String, OpLocation)>> = HashMap::new(); ··· 766 770 total_operations += 1; 767 771 768 772 // Parse only the fields we need (did and nullified) 769 - if let Ok(value) = sonic_rs::from_str::<sonic_rs::Value>(&line) { 770 - 
if let Some(did) = value.get("did").and_then(|v| v.as_str()) { 771 - let nullified = value.get("nullified") 772 - .and_then(|v| v.as_bool()) 773 - .unwrap_or(false); 773 + if let Ok(value) = sonic_rs::from_str::<sonic_rs::Value>(&line) 774 + && let Some(did) = value.get("did").and_then(|v| v.as_str()) { 775 + let nullified = value.get("nullified") 776 + .and_then(|v| v.as_bool()) 777 + .unwrap_or(false); 774 778 775 779 // Calculate shard directly from DID bytes (no String allocation) 776 780 if let Some(shard_num) = calculate_shard_from_did(did) { ··· 778 782 let identifier = &did[DID_PREFIX.len()..DID_PREFIX.len() + DID_IDENTIFIER_LEN]; 779 783 let identifier = identifier.to_string(); 780 784 781 - let global_pos = crate::constants::bundle_position_to_global(bundle_num as u32, position as usize) as u32; 785 + let global_pos = crate::constants::bundle_position_to_global(bundle_num, position as usize) as u32; 782 786 let loc = OpLocation::new(global_pos, nullified); 783 787 784 788 shard_entries 785 789 .entry(shard_num) 786 - .or_insert_with(Vec::new) 790 + .or_default() 787 791 .push((identifier, loc)); 788 792 } 789 793 } 790 - } 791 794 792 795 position += 1; 793 796 if position >= constants::BUNDLE_SIZE as u16 { ··· 858 861 for entry in fs::read_dir(shard_dir)? 
{ 859 862 let entry = entry?; 860 863 let path = entry.path(); 861 - if let Some(ext) = path.extension() { 862 - if ext == "tmp" { 863 - fs::remove_file(&path)?; 864 - cleaned += 1; 865 - } 864 + if path.extension().is_some_and(|ext| ext == "tmp") { 865 + fs::remove_file(&path)?; 866 + cleaned += 1; 866 867 } 867 868 } 868 869 if cleaned > 0 { ··· 933 934 let mut total_flush_time = std::time::Duration::ZERO; 934 935 935 936 // Metrics tracking (aggregated every N bundles) 936 - let metrics_interval = 100; // Log metrics every 100 bundles 937 + let metrics_interval = 100u32; // Log metrics every 100 bundles 937 938 let mut metrics_start = Instant::now(); 938 939 let mut metrics_ops = 0u64; 939 940 let mut metrics_bytes = 0u64; ··· 947 948 let batch_end = (batch_start + batch_size).min(bundle_numbers.len()); 948 949 let batch: Vec<u32> = bundle_numbers[batch_start..batch_end].to_vec(); 949 950 950 - let batch_results: Vec<Result<(HashMap<u8, Vec<(String, OpLocation)>>, u64, u64)>> = if use_parallel { 951 + let batch_results: Vec<ShardProcessResult> = if use_parallel { 951 952 // Process batch in parallel 952 953 batch.par_iter() 953 954 .map(|&bundle_num| Self::process_bundle_for_index(bundle_dir, bundle_num)) ··· 996 997 // Merge batch entries into main shard_entries 997 998 for (shard_num, mut entries) in batch_entries.drain() { 998 999 shard_entries.entry(shard_num) 999 - .or_insert_with(Vec::new) 1000 + .or_default() 1000 1001 .append(&mut entries); 1001 1002 } 1002 1003 ··· 1050 1051 1051 1052 log::info!( 1052 1053 "[DID Index] Metrics (bundles {}..{}): {} ops, {:.1} MB | {:.1} ops/sec, {:.1} MB/sec", 1053 - last_bundle_in_batch.saturating_sub(metrics_interval as u32 - 1).max(1), 1054 + last_bundle_in_batch.saturating_sub(metrics_interval - 1).max(1), 1054 1055 last_bundle_in_batch, 1055 1056 metrics_ops, 1056 1057 metrics_bytes as f64 / 1_000_000.0, ··· 1271 1272 1272 1273 valid_dids += 1; 1273 1274 let shard_num = self.calculate_shard(&identifier); 1274 - let 
global_pos = crate::constants::bundle_position_to_global(bundle_num as u32, position as usize) as u32; 1275 + let global_pos = crate::constants::bundle_position_to_global(bundle_num, position) as u32; 1275 1276 let loc = OpLocation::new(global_pos, *nullified); 1276 1277 1277 1278 shard_ops 1278 1279 .entry(shard_num) 1279 - .or_insert_with(HashMap::new) 1280 + .or_default() 1280 1281 .entry(identifier) 1281 - .or_insert_with(Vec::new) 1282 + .or_default() 1282 1283 .push(loc); 1283 1284 } 1284 1285 ··· 2049 2050 2050 2051 impl Drop for TempFileGuard { 2051 2052 fn drop(&mut self) { 2052 - if self.path.exists() { 2053 - if let Err(e) = fs::remove_file(&self.path) { 2054 - log::warn!("[DID Index] Failed to remove temp file {}: {}", self.path.display(), e); 2055 - } 2053 + if self.path.exists() && let Err(e) = fs::remove_file(&self.path) { 2054 + log::warn!("[DID Index] Failed to remove temp file {}: {}", self.path.display(), e); 2056 2055 } 2057 2056 } 2058 2057 } ··· 2456 2455 let start = Instant::now(); 2457 2456 2458 2457 // Ensure shard directory exists 2459 - if let Some(parent) = target.parent() { 2460 - if !parent.exists() { 2461 - fs::create_dir_all(parent)?; 2462 - } 2458 + if let Some(parent) = target.parent() && !parent.exists() { 2459 + fs::create_dir_all(parent)?; 2463 2460 } 2464 2461 2465 2462 if builder.entries.is_empty() { 2466 - fs::write(target, &[])?; 2463 + fs::write(target, [])?; 2467 2464 return Ok(()); 2468 2465 } 2469 2466 ··· 2740 2737 for entry in fs::read_dir(&self.shard_dir)? 
{ 2741 2738 let entry = entry?; 2742 2739 let path = entry.path(); 2743 - if let Some(ext) = path.extension() { 2744 - if ext == "tmp" { 2745 - fs::remove_file(&path)?; 2746 - cleaned += 1; 2747 - } 2740 + if let Some(ext) = path.extension() && ext == "tmp" { 2741 + fs::remove_file(&path)?; 2742 + cleaned += 1; 2748 2743 } 2749 2744 } 2750 2745 if cleaned > 0 { ··· 2890 2885 let exists = seg.get("exists").and_then(|v| v.as_bool()).unwrap_or(false); 2891 2886 if !exists { 2892 2887 missing_delta_segments += 1; 2893 - if let Some(start) = seg.get("bundle_start").and_then(|v| v.as_u64()).map(|v| v as u32) { 2894 - if let Some(end) = seg.get("bundle_end").and_then(|v| v.as_u64()).map(|v| v as u32) { 2888 + if let Some(start) = seg.get("bundle_start").and_then(|v| v.as_u64()).map(|v| v as u32) 2889 + && let Some(end) = seg.get("bundle_end").and_then(|v| v.as_u64()).map(|v| v as u32) { 2895 2890 for bundle_num in start..=end { 2896 2891 missing_segment_bundles.insert(bundle_num); 2897 2892 } 2898 2893 } 2899 - } 2900 2894 } 2901 2895 } 2902 2896 } 2903 2897 } 2904 2898 2905 - let missing_bundles = if index_last_bundle < last_bundle_in_repo { 2906 - last_bundle_in_repo - index_last_bundle 2907 - } else { 2908 - 0 2909 - }; 2899 + let missing_bundles = last_bundle_in_repo.saturating_sub(index_last_bundle); 2910 2900 2911 2901 let needs_rebuild = index_last_bundle < last_bundle_in_repo 2912 2902 || missing_base_shards > 0 ··· 3350 3340 // Test creating OpLocation with nullified = false 3351 3341 let loc = OpLocation::new(0, false); 3352 3342 assert_eq!(loc.global_position(), 0); 3353 - assert_eq!(loc.nullified(), false); 3343 + assert!(!loc.nullified()); 3354 3344 assert_eq!(loc.bundle(), 1); 3355 3345 assert_eq!(loc.position(), 0); 3356 3346 3357 3347 // Test creating OpLocation with nullified = true 3358 3348 let loc = OpLocation::new(0, true); 3359 3349 assert_eq!(loc.global_position(), 0); 3360 - assert_eq!(loc.nullified(), true); 3350 + assert!(loc.nullified()); 3361 
3351 3362 3352 // Test with various global positions 3363 3353 let loc = OpLocation::new(42, false); 3364 3354 assert_eq!(loc.global_position(), 42); 3365 - assert_eq!(loc.nullified(), false); 3355 + assert!(!loc.nullified()); 3366 3356 3367 3357 let loc = OpLocation::new(10000, false); 3368 3358 assert_eq!(loc.global_position(), 10000); ··· 3371 3361 3372 3362 let loc = OpLocation::new(10500, true); 3373 3363 assert_eq!(loc.global_position(), 10500); 3374 - assert_eq!(loc.nullified(), true); 3364 + assert!(loc.nullified()); 3375 3365 assert_eq!(loc.bundle(), 2); 3376 3366 assert_eq!(loc.position(), 500); 3377 3367 } ··· 3412 3402 3413 3403 assert_eq!(loc1.global_position(), loc2.global_position()); 3414 3404 assert_ne!(loc1.nullified(), loc2.nullified()); 3415 - assert_eq!(loc1.nullified(), false); 3416 - assert_eq!(loc2.nullified(), true); 3405 + assert!(!loc1.nullified()); 3406 + assert!(loc2.nullified()); 3417 3407 } 3418 3408 3419 3409 #[test] ··· 3431 3421 let loc2 = OpLocation::from_u32(u32_val); 3432 3422 assert_eq!(loc1.global_position(), loc2.global_position()); 3433 3423 assert_eq!(loc1.nullified(), loc2.nullified()); 3434 - assert_eq!(loc2.nullified(), true); 3424 + assert!(loc2.nullified()); 3435 3425 } 3436 3426 3437 3427 #[test] ··· 3460 3450 // Test maximum u32 value (will overflow bundle/position but should not panic) 3461 3451 let loc = OpLocation::new(u32::MAX >> 1, false); 3462 3452 assert_eq!(loc.global_position(), u32::MAX >> 1); 3463 - assert_eq!(loc.nullified(), false); 3453 + assert!(!loc.nullified()); 3464 3454 3465 3455 // Test with nullified flag set on max value 3466 3456 let loc = OpLocation::new(u32::MAX >> 1, true); 3467 - assert_eq!(loc.nullified(), true); 3457 + assert!(loc.nullified()); 3468 3458 } 3469 3459 3470 3460 #[test]
+96 -10
src/ffi.rs
··· 113 113 // BundleManager lifecycle 114 114 // ============================================================================ 115 115 116 + /// # Safety 117 + /// 118 + /// The caller must ensure `bundle_dir` is a valid, NUL-terminated C string 119 + /// pointer. The returned pointer is owned by the caller and must be freed 120 + /// with `bundle_manager_free` when no longer needed. Passing a null or 121 + /// invalid pointer is undefined behavior. 116 122 #[unsafe(no_mangle)] 117 123 pub unsafe extern "C" fn bundle_manager_new(bundle_dir: *const c_char) -> *mut CBundleManager { 118 124 let bundle_dir = unsafe { ··· 133 139 } 134 140 } 135 141 142 + /// # Safety 143 + /// 144 + /// The caller must ensure `manager` is a pointer previously returned by 145 + /// `bundle_manager_new` and not already freed. Passing invalid or dangling 146 + /// pointers is undefined behavior. 136 147 #[unsafe(no_mangle)] 137 148 pub unsafe extern "C" fn bundle_manager_free(manager: *mut CBundleManager) { 138 149 if !manager.is_null() { ··· 146 157 // Smart Loading 147 158 // ============================================================================ 148 159 160 + /// # Safety 161 + /// 162 + /// The `manager` pointer must be valid. `options` may be NULL to use defaults. 163 + /// `out_result` must be a valid writable pointer to a `CLoadResult` struct. 164 + /// Strings passed to this API must be NUL-terminated. Violating these 165 + /// requirements is undefined behavior. 149 166 #[unsafe(no_mangle)] 150 167 pub unsafe extern "C" fn bundle_manager_load_bundle( 151 168 manager: *const CBundleManager, ··· 198 215 // Batch Operations 199 216 // ============================================================================ 200 217 218 + /// # Safety 219 + /// 220 + /// The `manager` pointer must be valid. `requests` must point to `count` 221 + /// valid `COperationRequest` items. `out_operations` and `out_count` must 222 + /// be valid writable pointers. 
Returned `COperation` arrays must be freed 223 + /// by the caller using `bundle_manager_free_operations` if applicable. 201 224 #[unsafe(no_mangle)] 202 225 pub unsafe extern "C" fn bundle_manager_get_operations_batch( 203 226 manager: *const CBundleManager, ··· 244 267 // DID Operations 245 268 // ============================================================================ 246 269 270 + /// # Safety 271 + /// 272 + /// The `manager` pointer must be valid. `did` must be a valid NUL-terminated 273 + /// C string. `out_operations` and `out_count` must be valid writable pointers 274 + /// to receive results. Returned memory ownership rules are documented in the 275 + /// API and must be followed by the caller. 247 276 #[unsafe(no_mangle)] 248 277 pub unsafe extern "C" fn bundle_manager_get_did_operations( 249 278 manager: *const CBundleManager, ··· 281 310 } 282 311 } 283 312 313 + /// # Safety 314 + /// 315 + /// `manager` must be valid. `dids` must point to `did_count` valid 316 + /// NUL-terminated C string pointers. `callback` will be called from this 317 + /// function and must be a valid function pointer. The caller must ensure the 318 + /// callback and its environment remain valid for the duration of the call. 284 319 #[unsafe(no_mangle)] 285 320 pub unsafe extern "C" fn bundle_manager_batch_resolve_dids( 286 321 manager: *const CBundleManager, ··· 328 363 // Query 329 364 // ============================================================================ 330 365 366 + /// # Safety 367 + /// 368 + /// `manager` must be valid. `query_str` must be a valid NUL-terminated C 369 + /// string. `out_operations` and `out_count` must be valid writable pointers. 370 + /// The caller is responsible for freeing any returned memory according to the 371 + /// API's ownership rules. 
331 372 #[unsafe(no_mangle)] 332 373 pub unsafe extern "C" fn bundle_manager_query( 333 374 manager: *const CBundleManager, ··· 388 429 // Verification 389 430 // ============================================================================ 390 431 432 + /// # Safety 433 + /// 434 + /// `manager` must be a valid pointer. `out_result` must be a valid writable 435 + /// pointer to `CVerifyResult`. Passing invalid or null pointers is undefined 436 + /// behavior. 391 437 #[unsafe(no_mangle)] 392 438 pub unsafe extern "C" fn bundle_manager_verify_bundle( 393 439 manager: *const CBundleManager, ··· 432 478 } 433 479 } 434 480 481 + /// # Safety 482 + /// 483 + /// `manager` must be a valid pointer. Calling this function with invalid 484 + /// pointers is undefined behavior. 435 485 #[unsafe(no_mangle)] 436 486 pub unsafe extern "C" fn bundle_manager_verify_chain( 437 487 manager: *const CBundleManager, ··· 466 516 // Info 467 517 // ============================================================================ 468 518 519 + /// # Safety 520 + /// 521 + /// `manager` must be valid. `out_info` must be a valid writable pointer to 522 + /// `CBundleInfo`. Any returned string pointers must be freed by the caller as 523 + /// documented by the API. 469 524 #[unsafe(no_mangle)] 470 525 pub unsafe extern "C" fn bundle_manager_get_bundle_info( 471 526 manager: *const CBundleManager, ··· 507 562 // Cache Management 508 563 // ============================================================================ 509 564 565 + /// # Safety 566 + /// 567 + /// `manager` must be valid. `bundle_nums` must point to `count` valid u32 568 + /// values. Passing invalid pointers is undefined behavior. 510 569 #[unsafe(no_mangle)] 511 570 pub unsafe extern "C" fn bundle_manager_prefetch_bundles( 512 571 manager: *const CBundleManager, ··· 526 585 } 527 586 } 528 587 588 + /// # Safety 589 + /// 590 + /// `manager` must be valid. Passing invalid pointers is undefined behavior. 
529 591 #[unsafe(no_mangle)] 530 592 pub unsafe extern "C" fn bundle_manager_warm_up( 531 593 manager: *const CBundleManager, ··· 556 618 } 557 619 } 558 620 621 + /// # Safety 622 + /// 623 + /// `manager` must be a valid pointer previously returned by 624 + /// `bundle_manager_new`. 559 625 #[unsafe(no_mangle)] 560 626 pub unsafe extern "C" fn bundle_manager_clear_caches(manager: *const CBundleManager) -> i32 { 561 627 if manager.is_null() { ··· 571 637 // DID Index 572 638 // ============================================================================ 573 639 640 + /// # Safety 641 + /// 642 + /// `manager` must be valid. `out_stats` must be a valid writable pointer if 643 + /// provided. The caller must ensure `progress_callback` is a valid function 644 + /// pointer if passed. 574 645 #[unsafe(no_mangle)] 575 646 pub unsafe extern "C" fn bundle_manager_rebuild_did_index( 576 647 manager: *const CBundleManager, ··· 606 677 } 607 678 } 608 679 680 + /// # Safety 681 + /// 682 + /// `manager` must be valid. `out_stats` must be a valid writable pointer. 609 683 #[unsafe(no_mangle)] 610 684 pub unsafe extern "C" fn bundle_manager_get_did_index_stats( 611 685 manager: *const CBundleManager, ··· 636 710 // Observability 637 711 // ============================================================================ 638 712 713 + /// # Safety 714 + /// 715 + /// `manager` must be valid. `out_stats` must be a valid writable pointer. 639 716 #[unsafe(no_mangle)] 640 717 pub unsafe extern "C" fn bundle_manager_get_stats( 641 718 manager: *const CBundleManager, ··· 703 780 } 704 781 } 705 782 783 + /// # Safety 784 + /// 785 + /// `s` must be a pointer previously returned by this API that is safe to 786 + /// free. Passing invalid pointers is undefined behavior. 
706 787 #[unsafe(no_mangle)] 707 788 pub unsafe extern "C" fn bundle_manager_free_string(s: *mut c_char) { 708 789 if !s.is_null() { ··· 712 793 } 713 794 } 714 795 796 + /// # Safety 797 + /// 798 + /// `op` must be a pointer previously returned by this API and safe to free. 715 799 #[unsafe(no_mangle)] 716 800 pub unsafe extern "C" fn bundle_manager_free_operation(op: *mut COperation) { 717 801 if !op.is_null() { ··· 722 806 } 723 807 } 724 808 809 + /// # Safety 810 + /// 811 + /// `ops` must point to an array of `count` `COperation` previously returned 812 + /// by this API and safe to free. 725 813 #[unsafe(no_mangle)] 726 814 pub unsafe extern "C" fn bundle_manager_free_operations(ops: *mut COperation, count: usize) { 727 815 if !ops.is_null() { ··· 744 832 pub type ExportCallback = 745 833 extern "C" fn(data: *const c_char, len: usize, user_data: *mut std::ffi::c_void) -> i32; 746 834 835 + /// # Safety 836 + /// 837 + /// `manager` must be valid. `spec` must point to a valid `CExportSpec` and 838 + /// `callback` must be a valid function pointer. `out_stats` must be a valid 839 + /// writable pointer if provided. 747 840 #[unsafe(no_mangle)] 748 841 pub unsafe extern "C" fn bundle_manager_export( 749 842 manager: *const CBundleManager, ··· 790 883 }; 791 884 bundle_numbers 792 885 .into_iter() 793 - .filter_map(|num| { 794 - if let Some(meta) = index.get_bundle(num) { 795 - if meta.end_time >= after_ts { 796 - Some(num) 797 - } else { 798 - None 799 - } 800 - } else { 801 - Some(num) 802 - } 886 + .filter(|num| match index.get_bundle(*num) { 887 + Some(meta) => meta.end_time >= after_ts, 888 + None => true, 803 889 }) 804 890 .collect() 805 891 } else {
+18 -23
src/index.rs
··· 220 220 .unwrap_or(""); 221 221 222 222 // Match pattern: NNNNNN.jsonl.zst (16 chars: 6 digits + 10 chars for .jsonl.zst) 223 - if filename.ends_with(".jsonl.zst") && filename.len() == 16 { 224 - if let Ok(bundle_num) = filename[0..6].parse::<u32>() { 225 - bundle_files.push((bundle_num, path)); 226 - } 223 + if filename.ends_with(".jsonl.zst") && filename.len() == 16 224 + && let Ok(bundle_num) = filename[0..6].parse::<u32>() { 225 + bundle_files.push((bundle_num, path)); 227 226 } 228 227 } 229 228 ··· 244 243 } 245 244 246 245 // Validate no gaps in bundle sequence 247 - for i in 0..bundle_files.len() { 246 + for (i, (actual, _)) in bundle_files.iter().enumerate() { 248 247 let expected = (i + 1) as u32; 249 - let actual = bundle_files[i].0; 250 - if actual != expected { 248 + if *actual != expected { 251 249 anyhow::bail!( 252 250 "Gap detected in bundle files: expected {:06}.jsonl.zst, found {:06}.jsonl.zst", 253 251 expected, 254 - actual 252 + *actual 255 253 ); 256 254 } 257 255 } ··· 288 286 let current_count = count_atomic.fetch_add(1, Ordering::Relaxed) + 1; 289 287 290 288 // Update progress periodically 291 - if current_count % update_interval == 0 || current_count == 1 || current_count == bundle_count { 292 - if let Ok(cb_guard) = progress_cb_arc.lock() { 293 - if let Some(ref cb) = *cb_guard { 294 - cb(current_count, bundle_count, bytes_processed, total_bytes); 295 - } 296 - } 289 + if (current_count.is_multiple_of(update_interval) || current_count == 1 || current_count == bundle_count) 290 + && let Ok(cb_guard) = progress_cb_arc.lock() 291 + && let Some(ref cb) = *cb_guard { 292 + cb(current_count, bundle_count, bytes_processed, total_bytes); 297 293 } 298 294 299 295 // Extract embedded metadata from bundle file ··· 311 307 // Verify origin matches 312 308 { 313 309 let origin_guard = detected_origin.lock().unwrap(); 314 - if let Some(ref expected_origin) = *origin_guard { 315 - if embedded.origin != *expected_origin { 316 - anyhow::bail!( 317 
- "Bundle {:06}: origin mismatch (expected '{}', got '{}')", 318 - bundle_num, 319 - expected_origin, 320 - embedded.origin 321 - ); 322 - } 310 + if let Some(ref expected_origin) = *origin_guard 311 + && embedded.origin != *expected_origin { 312 + anyhow::bail!( 313 + "Bundle {:06}: origin mismatch (expected '{}', got '{}')", 314 + bundle_num, 315 + expected_origin, 316 + embedded.origin 317 + ); 323 318 } 324 319 } 325 320
+2 -4
src/iterators.rs
··· 157 157 158 158 fn next(&mut self) -> Option<Self::Item> { 159 159 // Check count limit 160 - if let Some(ref mut remaining) = self.count_remaining { 161 - if *remaining == 0 { 162 - return None; 163 - } 160 + if self.count_remaining == Some(0) { 161 + return None; 164 162 } 165 163 166 164 loop {
+162 -211
src/manager.rs
··· 122 122 } 123 123 124 124 /// Options for configuring BundleManager initialization 125 - #[derive(Debug, Clone)] 125 + #[derive(Debug, Clone, Default)] 126 126 pub struct ManagerOptions { 127 127 /// Optional handle resolver URL for resolving @handle.did identifiers 128 128 pub handle_resolver_url: Option<String>, ··· 130 130 pub preload_mempool: bool, 131 131 /// Whether to enable verbose logging 132 132 pub verbose: bool, 133 - } 134 - 135 - impl Default for ManagerOptions { 136 - fn default() -> Self { 137 - Self { 138 - handle_resolver_url: None, 139 - preload_mempool: false, 140 - verbose: false, 141 - } 142 - } 143 133 } 144 134 145 135 /// Trait to allow passing ManagerOptions or using defaults ··· 226 216 } else { 227 217 let mempool_preload_time = mempool_preload_start.elapsed(); 228 218 let mempool_preload_ms = mempool_preload_time.as_secs_f64() * 1000.0; 229 - if let Ok(stats) = manager.get_mempool_stats() { 230 - if stats.count > 0 { 231 - log::debug!( 232 - "[BundleManager] Pre-loaded mempool: {} operations for bundle {} ({:.3}ms)", 233 - stats.count, 234 - stats.target_bundle, 235 - mempool_preload_ms 236 - ); 237 - } 219 + if let Ok(stats) = manager.get_mempool_stats() && stats.count > 0 { 220 + log::debug!( 221 + "[BundleManager] Pre-loaded mempool: {} operations for bundle {} ({:.3}ms)", 222 + stats.count, 223 + stats.target_bundle, 224 + mempool_preload_ms 225 + ); 238 226 } 239 227 } 240 228 } ··· 301 289 302 290 // Try frame-based access first (new format) 303 291 match self.get_operation_raw_with_frames(&bundle_path, position) { 304 - Ok(json) => return Ok(json), 292 + Ok(json) => Ok(json), 305 293 Err(e) => { 306 294 // Fall back to legacy sequential scan 307 295 // This happens for old bundles without frame index 308 296 if let Ok(json) = self.get_operation_raw_legacy(&bundle_path, position) { 309 - return Ok(json); 297 + Ok(json) 298 + } else { 299 + Err(e) 310 300 } 311 - return Err(e); 312 301 } 313 302 } 314 303 } ··· 632 621 
log::debug!("[Resolve] Found latest non-nullified operation in mempool, skipping bundle lookup"); 633 622 634 623 // Build document from latest mempool operation 635 - let document = crate::resolver::resolve_did_document(did, &[operation.clone()])?; 624 + let document = crate::resolver::resolve_did_document(did, std::slice::from_ref(&operation))?; 636 625 let load_time = load_start.elapsed(); 637 626 638 627 return Ok(ResolveResult { ··· 668 657 let mut latest_time = DateTime::parse_from_rfc3339("1970-01-01T00:00:00Z").unwrap(); 669 658 670 659 for loc in &locations { 671 - if !loc.nullified() { 672 - if let Ok(op) = self.get_operation(loc.bundle() as u32, loc.position() as usize) { 673 - if let Ok(op_time) = DateTime::parse_from_rfc3339(&op.created_at) { 674 - if op_time > latest_time { 675 - latest_time = op_time; 676 - latest_operation = Some((op, loc.bundle() as u32, loc.position() as usize)); 677 - } 678 - } 679 - } 660 + if !loc.nullified() 661 + && let Ok(op) = self.get_operation(loc.bundle() as u32, loc.position() as usize) 662 + && let Ok(op_time) = DateTime::parse_from_rfc3339(&op.created_at) 663 + && op_time > latest_time 664 + { 665 + latest_time = op_time; 666 + latest_operation = Some((op, loc.bundle() as u32, loc.position() as usize)); 680 667 } 681 668 } 682 669 let load_time = load_start.elapsed(); ··· 685 672 .ok_or_else(|| anyhow::anyhow!("DID not found: {} (checked bundles and mempool)", did))?; 686 673 687 674 // Build document from latest bundle operation 688 - let document = crate::resolver::resolve_did_document(did, &[operation.clone()])?; 675 + let document = crate::resolver::resolve_did_document(did, std::slice::from_ref(&operation))?; 689 676 690 677 Ok(ResolveResult { 691 678 document, ··· 1261 1248 // Also delete all mempool files to prevent stale data from previous bundles 1262 1249 if let Ok(entries) = std::fs::read_dir(&self.directory) { 1263 1250 for entry in entries.flatten() { 1264 - if let Some(name) = entry.file_name().to_str() 
{ 1265 - if name.starts_with(constants::MEMPOOL_FILE_PREFIX) && name.ends_with(".jsonl") 1266 - { 1267 - let _ = std::fs::remove_file(entry.path()); 1268 - } 1251 + if let Some(name) = entry.file_name().to_str() 1252 + && name.starts_with(constants::MEMPOOL_FILE_PREFIX) && name.ends_with(".jsonl") 1253 + { 1254 + let _ = std::fs::remove_file(entry.path()); 1269 1255 } 1270 1256 } 1271 1257 } ··· 1321 1307 let mut found_stale_files = false; 1322 1308 if let Ok(entries) = std::fs::read_dir(&self.directory) { 1323 1309 for entry in entries.flatten() { 1324 - if let Some(name) = entry.file_name().to_str() { 1325 - if name.starts_with(constants::MEMPOOL_FILE_PREFIX) && name.ends_with(".jsonl") 1326 - { 1327 - // Extract bundle number from filename: plc_mempool_NNNNNN.jsonl 1328 - if let Some(num_str) = name 1329 - .strip_prefix(constants::MEMPOOL_FILE_PREFIX) 1330 - .and_then(|s| s.strip_suffix(".jsonl")) 1331 - { 1332 - if let Ok(bundle_num) = num_str.parse::<u32>() { 1333 - // Delete mempool files for completed bundles or way future bundles 1334 - if bundle_num <= last_bundle || bundle_num > next_bundle_num { 1335 - log::warn!( 1336 - "Removing stale mempool file for bundle {:06}", 1337 - bundle_num 1338 - ); 1339 - let _ = std::fs::remove_file(entry.path()); 1340 - found_stale_files = true; 1341 - } 1342 - } 1310 + if let Some(name) = entry.file_name().to_str() 1311 + && name.starts_with(constants::MEMPOOL_FILE_PREFIX) && name.ends_with(".jsonl") 1312 + { 1313 + // Extract bundle number from filename: plc_mempool_NNNNNN.jsonl 1314 + if let Some(num_str) = name 1315 + .strip_prefix(constants::MEMPOOL_FILE_PREFIX) 1316 + .and_then(|s| s.strip_suffix(".jsonl")) 1317 + && let Ok(bundle_num) = num_str.parse::<u32>() { 1318 + // Delete mempool files for completed bundles or way future bundles 1319 + if bundle_num <= last_bundle || bundle_num > next_bundle_num { 1320 + log::warn!( 1321 + "Removing stale mempool file for bundle {:06}", 1322 + bundle_num 1323 + ); 1324 + let 
_ = std::fs::remove_file(entry.path()); 1325 + found_stale_files = true; 1343 1326 } 1344 1327 } 1345 1328 } ··· 1363 1346 } 1364 1347 1365 1348 // Get the last operation from the previous bundle 1366 - let last_bundle_time = if next_bundle_num > 1 { 1367 - let last_bundle_result = 1368 - self.load_bundle(next_bundle_num - 1, LoadOptions::default())?; 1369 - if let Some(last_op) = last_bundle_result.operations.last() { 1370 - chrono::DateTime::parse_from_rfc3339(&last_op.created_at) 1371 - .ok() 1372 - .map(|dt| dt.with_timezone(&chrono::Utc)) 1349 + let last_bundle_time = if next_bundle_num > 1 1350 + && let Ok(last_bundle_result) = self.load_bundle(next_bundle_num - 1, LoadOptions::default()) { 1351 + last_bundle_result.operations.last().and_then(|last_op| { 1352 + chrono::DateTime::parse_from_rfc3339(&last_op.created_at) 1353 + .ok() 1354 + .map(|dt| dt.with_timezone(&chrono::Utc)) 1355 + }) 1373 1356 } else { 1374 1357 None 1375 - } 1376 - } else { 1377 - None 1378 - }; 1358 + }; 1379 1359 1380 1360 // Special case: When creating the first bundle (next_bundle_num == 1, meaning 1381 1361 // last_bundle == 0, i.e., empty repository), any existing mempool is likely stale ··· 1397 1377 } 1398 1378 1399 1379 // Check if mempool operations are chronologically valid relative to last bundle 1400 - if let Some(last_time) = last_bundle_time { 1401 - if let Some(first_mempool_time) = mempool_stats.first_time { 1380 + if let Some(last_time) = last_bundle_time 1381 + && let Some(first_mempool_time) = mempool_stats.first_time { 1402 1382 // Case 1: Mempool operations are BEFORE the last bundle (definitely stale) 1403 1383 if first_mempool_time < last_time { 1404 1384 log::warn!("Detected stale mempool data (operations before last bundle)"); ··· 1456 1436 "This likely indicates a previous failed sync attempt. Clearing mempool..." 
1457 1437 ); 1458 1438 self.clear_mempool()?; 1459 - } else { 1460 - if *self.verbose.lock().unwrap() { 1461 - log::debug!( 1462 - "Mempool appears recent, allowing resume despite close timestamp" 1463 - ); 1464 - } 1439 + } else if *self.verbose.lock().unwrap() { 1440 + log::debug!( 1441 + "Mempool appears recent, allowing resume despite close timestamp" 1442 + ); 1465 1443 } 1466 1444 return Ok(()); 1467 1445 } 1468 - } 1469 1446 } 1470 1447 1471 1448 // Check if mempool has way too many operations (likely from failed previous attempt) ··· 1621 1598 // If mempool has operations, update cursor AND boundaries from mempool 1622 1599 // (mempool operations already had boundary dedup applied when they were added) 1623 1600 let mempool_stats = self.get_mempool_stats()?; 1624 - if mempool_stats.count > 0 { 1625 - if let Some(last_time) = mempool_stats.last_time { 1626 - if *self.verbose.lock().unwrap() { 1627 - log::debug!( 1628 - "Mempool has {} ops, resuming from {}", 1629 - mempool_stats.count, 1630 - last_time.format("%Y-%m-%dT%H:%M:%S") 1631 - ); 1632 - } 1633 - after_time = last_time.to_rfc3339(); 1601 + if mempool_stats.count > 0 && let Some(last_time) = mempool_stats.last_time { 1602 + if *self.verbose.lock().unwrap() { 1603 + log::debug!( 1604 + "Mempool has {} ops, resuming from {}", 1605 + mempool_stats.count, 1606 + last_time.format("%Y-%m-%dT%H:%M:%S") 1607 + ); 1608 + } 1609 + after_time = last_time.to_rfc3339(); 1634 1610 1635 - // Calculate boundaries from MEMPOOL for next fetch 1636 - let mempool_ops = self.get_mempool_operations()?; 1637 - if !mempool_ops.is_empty() { 1638 - prev_boundary = get_boundary_cids(&mempool_ops); 1639 - if *self.verbose.lock().unwrap() { 1640 - log::info!("Using {} boundary CIDs from mempool", prev_boundary.len()); 1641 - } 1611 + // Calculate boundaries from MEMPOOL for next fetch 1612 + let mempool_ops = self.get_mempool_operations()?; 1613 + if !mempool_ops.is_empty() { 1614 + prev_boundary = 
get_boundary_cids(&mempool_ops); 1615 + if *self.verbose.lock().unwrap() { 1616 + log::info!("Using {} boundary CIDs from mempool", prev_boundary.len()); 1642 1617 } 1643 1618 } 1644 1619 } ··· 1989 1964 synced += 1; 1990 1965 1991 1966 // Check if we've reached the limit 1992 - if let Some(max) = max_bundles { 1993 - if synced >= max { 1994 - break; 1995 - } 1967 + if let Some(max) = max_bundles && synced >= max { 1968 + break; 1996 1969 } 1997 1970 } 1998 1971 Ok(SyncResult::CaughtUp { .. }) => { ··· 2041 2014 let did_count = unique_dids.len() as u32; 2042 2015 2043 2016 // Use multi-frame compression for better performance on large bundles 2044 - let serialize_time; 2045 - let compress_time; 2046 - let uncompressed_size; 2047 - let compressed_size; 2048 - let frame_count; 2049 - let frame_offsets; 2050 - let compressed_frames; 2051 - let content_hash; 2052 2017 2053 2018 // Compress operations to frames using parallel compression 2054 - let compress_result = { 2055 - let result = crate::bundle_format::compress_operations_to_frames_parallel(&operations)?; 2056 - serialize_time = std::time::Duration::from_secs_f64(result.serialize_time_ms / 1000.0); 2057 - compress_time = std::time::Duration::from_secs_f64(result.compress_time_ms / 1000.0); 2058 - result 2059 - }; 2019 + let compress_result = crate::bundle_format::compress_operations_to_frames_parallel(&operations)?; 2020 + 2021 + let serialize_time = std::time::Duration::from_secs_f64(compress_result.serialize_time_ms / 1000.0); 2022 + let compress_time = std::time::Duration::from_secs_f64(compress_result.compress_time_ms / 1000.0); 2060 2023 2061 - uncompressed_size = compress_result.uncompressed_size; 2062 - compressed_size = compress_result.compressed_size; 2063 - frame_count = compress_result.compressed_frames.len(); 2064 - frame_offsets = compress_result.frame_offsets; 2065 - compressed_frames = compress_result.compressed_frames; 2024 + let uncompressed_size = compress_result.uncompressed_size; 2025 + let 
compressed_size = compress_result.compressed_size; 2026 + let frame_count = compress_result.compressed_frames.len(); 2027 + let frame_offsets = compress_result.frame_offsets; 2028 + let compressed_frames = compress_result.compressed_frames; 2066 2029 2067 2030 // Calculate content hash from uncompressed data 2068 2031 let hash_start = Instant::now(); 2069 - content_hash = { 2032 + let content_hash = { 2070 2033 use sha2::{Digest, Sha256}; 2071 2034 let mut hasher = Sha256::new(); 2072 2035 let mut missing_raw_json = 0; ··· 2833 2796 continue; 2834 2797 } 2835 2798 2836 - if let Some(ext) = path.extension() { 2837 - if ext == "tmp" { 2838 - let file_size = match fs::metadata(&path) { 2839 - Ok(meta) => meta.len(), 2840 - Err(_) => 0, 2841 - }; 2842 - total_size += file_size; 2843 - files.push(CleanPreviewFile { 2844 - path, 2845 - size: file_size, 2846 - }); 2847 - } 2799 + if path.extension().is_some_and(|ext| ext == "tmp") { 2800 + let file_size = match fs::metadata(&path) { 2801 + Ok(meta) => meta.len(), 2802 + Err(_) => 0, 2803 + }; 2804 + total_size += file_size; 2805 + files.push(CleanPreviewFile { 2806 + path, 2807 + size: file_size, 2808 + }); 2848 2809 } 2849 2810 } 2850 2811 } ··· 2868 2829 2869 2830 // Scan shards directory (.plcbundle/shards/) 2870 2831 let shards_dir = did_index_dir.join(constants::DID_INDEX_SHARDS); 2871 - if shards_dir.exists() { 2872 - if let Ok(entries) = fs::read_dir(&shards_dir) { 2873 - for entry in entries { 2874 - let entry = match entry { 2875 - Ok(e) => e, 2876 - Err(_) => continue, 2877 - }; 2832 + if shards_dir.exists() && let Ok(entries) = fs::read_dir(&shards_dir) { 2833 + for entry in entries { 2834 + let entry = match entry { 2835 + Ok(e) => e, 2836 + Err(_) => continue, 2837 + }; 2878 2838 2879 - let path = entry.path(); 2880 - if !path.is_file() { 2881 - continue; 2882 - } 2839 + let path = entry.path(); 2840 + if !path.is_file() { 2841 + continue; 2842 + } 2883 2843 2884 - if let Some(ext) = path.extension() { 2885 - 
if ext == "tmp" { 2886 - let file_size = match fs::metadata(&path) { 2887 - Ok(meta) => meta.len(), 2888 - Err(_) => 0, 2889 - }; 2890 - total_size += file_size; 2891 - files.push(CleanPreviewFile { 2892 - path, 2893 - size: file_size, 2894 - }); 2895 - } 2896 - } 2844 + if path.extension().is_some_and(|ext| ext == "tmp") { 2845 + let file_size = match fs::metadata(&path) { 2846 + Ok(meta) => meta.len(), 2847 + Err(_) => 0, 2848 + }; 2849 + total_size += file_size; 2850 + files.push(CleanPreviewFile { 2851 + path, 2852 + size: file_size, 2853 + }); 2897 2854 } 2898 2855 } 2899 2856 } ··· 2945 2902 continue; 2946 2903 } 2947 2904 2948 - if let Some(ext) = path.extension() { 2949 - if ext == "tmp" { 2950 - let file_size = match fs::metadata(&path) { 2951 - Ok(meta) => { 2952 - let size = meta.len(); 2953 - bytes_freed += size; 2954 - size 2955 - } 2956 - Err(_) => 0 2957 - }; 2905 + if path.extension().is_some_and(|ext| ext == "tmp") { 2906 + let file_size = match fs::metadata(&path) { 2907 + Ok(meta) => { 2908 + let size = meta.len(); 2909 + bytes_freed += size; 2910 + size 2911 + } 2912 + Err(_) => 0 2913 + }; 2958 2914 2959 - match fs::remove_file(&path) { 2960 - Ok(_) => { 2961 - files_removed += 1; 2962 - if verbose { 2963 - log::info!(" ✓ Removed: {} ({})", 2964 - path.file_name().and_then(|n| n.to_str()).unwrap_or("?"), 2965 - crate::format::format_bytes(file_size)); 2966 - } 2915 + match fs::remove_file(&path) { 2916 + Ok(_) => { 2917 + files_removed += 1; 2918 + if verbose { 2919 + log::info!(" ✓ Removed: {} ({})", 2920 + path.file_name().and_then(|n| n.to_str()).unwrap_or("?"), 2921 + crate::format::format_bytes(file_size)); 2967 2922 } 2968 - Err(e) => { 2969 - let error_msg = format!("Failed to remove {}: {}", path.display(), e); 2970 - errors.push(error_msg.clone()); 2971 - if verbose { 2972 - log::warn!(" ✗ {}", error_msg); 2973 - } 2923 + } 2924 + Err(e) => { 2925 + let error_msg = format!("Failed to remove {}: {}", path.display(), e); 2926 + 
errors.push(error_msg.clone()); 2927 + if verbose { 2928 + log::warn!(" ✗ {}", error_msg); 2974 2929 } 2975 2930 } 2976 2931 } ··· 3037 2992 continue; 3038 2993 } 3039 2994 3040 - if let Some(ext) = path.extension() { 3041 - if ext == "tmp" { 3042 - let file_size = match fs::metadata(&path) { 3043 - Ok(meta) => { 3044 - let size = meta.len(); 3045 - bytes_freed += size; 3046 - size 3047 - } 3048 - Err(_) => 0 3049 - }; 2995 + if path.extension().is_some_and(|ext| ext == "tmp") { 2996 + let file_size = match fs::metadata(&path) { 2997 + Ok(meta) => { 2998 + let size = meta.len(); 2999 + bytes_freed += size; 3000 + size 3001 + } 3002 + Err(_) => 0 3003 + }; 3050 3004 3051 - match fs::remove_file(&path) { 3052 - Ok(_) => { 3053 - files_removed += 1; 3054 - if verbose { 3055 - log::info!(" ✓ Removed: {} ({})", 3056 - path.file_name().and_then(|n| n.to_str()).unwrap_or("?"), 3057 - crate::format::format_bytes(file_size)); 3058 - } 3005 + match fs::remove_file(&path) { 3006 + Ok(_) => { 3007 + files_removed += 1; 3008 + if verbose { 3009 + log::info!(" ✓ Removed: {} ({})", 3010 + path.file_name().and_then(|n| n.to_str()).unwrap_or("?"), 3011 + crate::format::format_bytes(file_size)); 3059 3012 } 3060 - Err(e) => { 3061 - let error_msg = format!("Failed to remove {}: {}", path.display(), e); 3062 - errors.push(error_msg.clone()); 3063 - if verbose { 3064 - log::warn!(" ✗ {}", error_msg); 3065 - } 3013 + } 3014 + Err(e) => { 3015 + let error_msg = format!("Failed to remove {}: {}", path.display(), e); 3016 + errors.push(error_msg.clone()); 3017 + if verbose { 3018 + log::warn!(" ✗ {}", error_msg); 3066 3019 } 3067 3020 } 3068 3021 } ··· 3333 3286 } 3334 3287 3335 3288 fn matches_filter(&self, op: &Operation, filter: &OperationFilter) -> bool { 3336 - if let Some(ref did) = filter.did { 3337 - if &op.did != did { 3338 - return false; 3339 - } 3289 + if let Some(ref did) = filter.did 3290 + && &op.did != did { 3291 + return false; 3340 3292 } 3341 3293 3342 - if let Some(ref 
op_type) = filter.operation_type { 3343 - if &op.operation != op_type { 3344 - return false; 3345 - } 3294 + if let Some(ref op_type) = filter.operation_type 3295 + && &op.operation != op_type { 3296 + return false; 3346 3297 } 3347 3298 3348 3299 if !filter.include_nullified && op.nullified {
+5 -5
src/mempool.rs
··· 152 152 // Update DID index for new operations 153 153 for (offset, op) in self.operations[start_idx..].iter().enumerate() { 154 154 let idx = start_idx + offset; 155 - self.did_index.entry(op.did.clone()).or_insert_with(Vec::new).push(idx); 155 + self.did_index.entry(op.did.clone()).or_default().push(idx); 156 156 } 157 157 158 158 self.validated = true; ··· 451 451 452 452 // Update DID index 453 453 let did = self.operations[idx].did.clone(); 454 - self.did_index.entry(did).or_insert_with(Vec::new).push(idx); 454 + self.did_index.entry(did).or_default().push(idx); 455 455 } 456 456 457 457 // Validate loaded data ··· 572 572 fn rebuild_did_index(&mut self) { 573 573 self.did_index.clear(); 574 574 for (idx, op) in self.operations.iter().enumerate() { 575 - self.did_index.entry(op.did.clone()).or_insert_with(Vec::new).push(idx); 575 + self.did_index.entry(op.did.clone()).or_default().push(idx); 576 576 } 577 577 } 578 578 ··· 809 809 810 810 let stats = mempool.stats(); 811 811 assert_eq!(stats.count, 0); 812 - assert_eq!(stats.can_create_bundle, false); 812 + assert!(!stats.can_create_bundle); 813 813 assert_eq!(stats.target_bundle, 1); 814 814 } 815 815 ··· 829 829 830 830 let stats = mempool.stats(); 831 831 assert_eq!(stats.count, 2); 832 - assert_eq!(stats.can_create_bundle, false); // Need BUNDLE_SIZE (10000) ops 832 + assert!(!stats.can_create_bundle); // Need BUNDLE_SIZE (10000) ops 833 833 assert_eq!(stats.target_bundle, 1); 834 834 assert!(stats.first_time.is_some()); 835 835 assert!(stats.last_time.is_some());
+6 -6
src/operations.rs
··· 56 56 57 57 let operation = value.get("operation") 58 58 .cloned() 59 - .unwrap_or_else(|| Value::new()); 59 + .unwrap_or_else(Value::new); 60 60 61 61 // Extract optional fields with defaults 62 62 let cid = value.get("cid") ··· 133 133 134 134 let op = Operation::from_json(json).unwrap(); 135 135 assert_eq!(op.did, "did:plc:abcdefghijklmnopqrstuvwx"); 136 - assert_eq!(op.nullified, false); 136 + assert!(!op.nullified); 137 137 assert_eq!(op.created_at, "2024-01-01T00:00:00Z"); 138 138 assert!(op.cid.is_none()); 139 139 assert!(op.raw_json.is_some()); ··· 152 152 153 153 let op = Operation::from_json(json).unwrap(); 154 154 assert_eq!(op.did, "did:plc:abcdefghijklmnopqrstuvwx"); 155 - assert_eq!(op.nullified, true); 155 + assert!(op.nullified); 156 156 assert_eq!(op.created_at, "2024-01-01T12:34:56Z"); 157 157 assert_eq!(op.cid, Some("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi".to_string())); 158 158 ··· 224 224 }"#; 225 225 226 226 let op = Operation::from_json(json).unwrap(); 227 - assert_eq!(op.nullified, false); 227 + assert!(!op.nullified); 228 228 } 229 229 230 230 #[test] ··· 257 257 assert!(filter.did.is_none()); 258 258 assert!(filter.operation_type.is_none()); 259 259 assert!(filter.time_range.is_none()); 260 - assert_eq!(filter.include_nullified, false); 260 + assert!(!filter.include_nullified); 261 261 } 262 262 263 263 #[test] ··· 298 298 299 299 assert_eq!(op_with_loc.bundle, 1); 300 300 assert_eq!(op_with_loc.position, 42); 301 - assert_eq!(op_with_loc.nullified, true); 301 + assert!(op_with_loc.nullified); 302 302 assert_eq!(op_with_loc.operation.did, "did:plc:test"); 303 303 } 304 304 }
+34 -33
src/plc_client.rs
··· 257 257 fn parse_retry_after(response: &reqwest::Response) -> Duration { 258 258 const MAX_RETRY_SECONDS: u64 = 60; 259 259 260 - if let Some(retry_after_header) = response.headers().get("retry-after") { 261 - if let Ok(retry_after_str) = retry_after_header.to_str() { 262 - // Try parsing as seconds (integer) - most common format 263 - if let Ok(seconds) = retry_after_str.parse::<u64>() { 264 - // Cap at maximum wait time 265 - return Duration::from_secs(seconds.min(MAX_RETRY_SECONDS)); 266 - } 260 + if let Some(retry_after_header) = response.headers().get("retry-after") 261 + && let Ok(retry_after_str) = retry_after_header.to_str() 262 + { 263 + // Try parsing as seconds (integer) - most common format 264 + if let Ok(seconds) = retry_after_str.parse::<u64>() { 265 + // Cap at maximum wait time 266 + return Duration::from_secs(seconds.min(MAX_RETRY_SECONDS)); 267 + } 267 268 268 - // Try parsing as HTTP date (RFC 7231) 269 - // httpdate::parse_http_date returns a SystemTime 270 - if let Ok(http_time) = httpdate::parse_http_date(retry_after_str) { 271 - if let Ok(duration) = http_time.duration_since(std::time::SystemTime::now()) { 272 - // Cap at maximum wait time 273 - return duration.min(Duration::from_secs(MAX_RETRY_SECONDS)); 274 - } 275 - } 269 + // Try parsing as HTTP date (RFC 7231) 270 + // httpdate::parse_http_date returns a SystemTime 271 + if let Ok(http_time) = httpdate::parse_http_date(retry_after_str) 272 + && let Ok(duration) = http_time.duration_since(std::time::SystemTime::now()) 273 + { 274 + // Cap at maximum wait time 275 + return duration.min(Duration::from_secs(MAX_RETRY_SECONDS)); 276 276 } 277 277 } 278 278 ··· 280 280 Duration::from_secs(MAX_RETRY_SECONDS) 281 281 } 282 282 283 - #[cfg(test)] 284 - mod tests { 285 - use super::*; 286 - 287 - #[tokio::test] 288 - async fn test_plc_client_new() { 289 - let client = PLCClient::new("https://plc.directory").unwrap(); 290 - // Verify client was created successfully 291 - 
assert!(client.base_url.contains("plc.directory")); 292 - } 293 - 294 - #[tokio::test] 295 - async fn test_plc_client_new_with_trailing_slash() { 296 - let client = PLCClient::new("https://plc.directory/").unwrap(); 297 - // URL should be stored as-is (no normalization in PLCClient) 298 - assert!(client.base_url.contains("plc.directory")); 299 - } 300 - } 301 283 302 284 /// Simple token bucket rate limiter 303 285 /// Prevents burst requests by starting with 0 permits and refilling at steady rate ··· 346 328 } 347 329 } 348 330 331 + #[cfg(test)] 332 + mod tests { 333 + use super::*; 334 + 335 + #[tokio::test] 336 + async fn test_plc_client_new() { 337 + let client = PLCClient::new("https://plc.directory").unwrap(); 338 + // Verify client was created successfully 339 + assert!(client.base_url.contains("plc.directory")); 340 + } 341 + 342 + #[tokio::test] 343 + async fn test_plc_client_new_with_trailing_slash() { 344 + let client = PLCClient::new("https://plc.directory/").unwrap(); 345 + // URL should be stored as-is (no normalization in PLCClient) 346 + assert!(client.base_url.contains("plc.directory")); 347 + } 348 + } 349 +
+4 -4
src/resolver.rs
··· 154 154 } 155 155 156 156 // Handle legacy handle format 157 - if let Some(handle) = op_data.get("handle").and_then(|v| v.as_str()) { 158 - if state.also_known_as.is_empty() { 159 - state.also_known_as = vec![format!("at://{}", handle)]; 160 - } 157 + if let Some(handle) = op_data.get("handle").and_then(|v| v.as_str()) 158 + && state.also_known_as.is_empty() 159 + { 160 + state.also_known_as = vec![format!("at://{}", handle)]; 161 161 } 162 162 163 163 // Update services
+7 -13
src/runtime.rs
··· 97 97 self.trigger_shutdown(); 98 98 99 99 // Always abort resolver tasks immediately - they're just keep-alive pings 100 - if let Some(resolver_tasks) = resolver_tasks { 101 - if !resolver_tasks.is_empty() { 102 - resolver_tasks.abort_all(); 103 - while let Some(result) = resolver_tasks.join_next().await { 104 - if let Err(e) = result { 105 - if !e.is_cancelled() { 106 - eprintln!("Resolver task error: {}", e); 107 - } 108 - } 100 + if let Some(resolver_tasks) = resolver_tasks && !resolver_tasks.is_empty() { 101 + resolver_tasks.abort_all(); 102 + while let Some(result) = resolver_tasks.join_next().await { 103 + if let Err(e) = result && !e.is_cancelled() { 104 + eprintln!("Resolver task error: {}", e); 109 105 } 110 106 } 111 107 } ··· 118 114 background_tasks.abort_all(); 119 115 // Wait briefly for aborted tasks to finish 120 116 while let Some(result) = background_tasks.join_next().await { 121 - if let Err(e) = result { 122 - if !e.is_cancelled() { 123 - eprintln!("Background task error: {}", e); 124 - } 117 + if let Err(e) = result && !e.is_cancelled() { 118 + eprintln!("Background task error: {}", e); 125 119 } 126 120 } 127 121 } else {
+4 -6
src/server/handle_root.rs
··· 23 23 let mut response = String::new(); 24 24 25 25 // ASCII art banner 26 - response.push_str("\n"); 26 + response.push('\n'); 27 27 response.push_str(&crate::server::get_ascii_art_banner(&state.config.version)); 28 28 response.push_str(&format!(" {} server\n\n", constants::BINARY_NAME)); 29 29 response.push_str("*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*\n"); ··· 34 34 response.push_str("| any time. Do not use this for production systems. |\n"); 35 35 response.push_str("| Please wait for the 1.0 release. |\n"); 36 36 response.push_str("|________________________________________________________________|\n"); 37 - response.push_str("\n"); 37 + response.push('\n'); 38 38 response.push_str("What is PLC Bundle?\n"); 39 39 response.push_str("━━━━━━━━━━━━━━━━━━━━\n"); 40 40 response.push_str("plcbundle archives AT Protocol's DID PLC Directory operations into\n"); ··· 82 82 } 83 83 } 84 84 85 - if state.config.sync_mode { 86 - if let Ok(mempool_stats) = state.manager.get_mempool_stats() { 85 + if state.config.sync_mode && let Ok(mempool_stats) = state.manager.get_mempool_stats() { 87 86 response.push_str("\nMempool\n"); 88 87 response.push_str("━━━━━━━\n"); 89 88 response.push_str(&format!( ··· 123 122 } else { 124 123 response.push_str(" (empty)\n"); 125 124 } 126 - } 127 125 } 128 126 129 127 if state.config.enable_resolver { ··· 164 162 )); 165 163 } 166 164 } 167 - response.push_str("\n"); 165 + response.push('\n'); 168 166 } 169 167 170 168 response.push_str("Server Stats\n");
+1 -3
src/server/handle_status.rs
··· 66 66 response["bundles"]["total_operations"] = json!(total_ops); 67 67 } 68 68 69 - if state.config.sync_mode { 70 - if let Ok(mempool_stats) = state.manager.get_mempool_stats() { 69 + if state.config.sync_mode && let Ok(mempool_stats) = state.manager.get_mempool_stats() { 71 70 response["mempool"] = json!({ 72 71 "count": mempool_stats.count, 73 72 "target_bundle": mempool_stats.target_bundle, ··· 76 75 "bundle_size": constants::BUNDLE_SIZE, 77 76 "operations_needed": constants::BUNDLE_SIZE - mempool_stats.count, 78 77 }); 79 - } 80 78 } 81 79 82 80 // DID Index stats (get_stats is fast, but we should still avoid holding lock in async context)
+2 -5
src/server/mod.rs
··· 92 92 /// Returns the ASCII art banner as a string 93 93 #[cfg(feature = "server")] 94 94 pub fn get_ascii_art_banner(_version: &str) -> String { 95 - format!( 96 - r#" 95 + r#" 97 96 ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠀⡀⠀⠀⠀⠀⠀⠀⢀⠀⠀⡀⠀⢀⠀⢀⡀⣤⡢⣤⡤⡀⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 98 97 ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⡄⡄⠐⡀⠈⣀⠀⡠⡠⠀⣢⣆⢌⡾⢙⠺⢽⠾⡋⣻⡷⡫⢵⣭⢦⣴⠦⠀⢠⠀⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 99 98 ⠀⠀⠀⠀⠀⠀⠀⠀⢠⣤⣽⣥⡈⠧⣂⢧⢾⠕⠞⠡⠊⠁⣐⠉⠀⠉⢍⠀⠉⠌⡉⠀⠂⠁⠱⠉⠁⢝⠻⠎⣬⢌⡌⣬⣡⣀⣢⣄⡄⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀ ··· 116 115 ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠉⠂⠂⠏⠿⢻⣥⡪⢽⣳⣳⣥⡶⣫⣍⢐⣥⣻⣾⡻⣅⢭⡴⢭⣿⠕⣧⡭⣞⣻⣣⣻⢿⠟⠛⠙⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 117 116 ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠋⠫⠯⣍⢻⣿⣿⣷⣕⣵⣹⣽⣿⣷⣇⡏⣿⡿⣍⡝⠵⠯⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 118 117 ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠐⠠⠁⠋⢣⠓⡍⣫⠹⣿⣿⣷⡿⠯⠺⠁⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 119 - ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠋⢀⠋⢈⡿⠿⠁⠉⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 120 - "# 121 - ) 118 + ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠋⢀⠋⢈⡿⠿⠁⠉⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀"#.to_string() 122 119 }
+2 -2
src/server/startup.rs
··· 589 589 use crate::server::get_ascii_art_banner; 590 590 591 591 // Print ASCII art banner 592 - eprint!( 593 - "{}\n", 592 + eprintln!( 593 + "{}", 594 594 get_ascii_art_banner(env!("CARGO_PKG_VERSION")) 595 595 ); 596 596 eprintln!("{} HTTP server started", constants::BINARY_NAME);
+16 -34
src/server/utils.rs
··· 55 55 56 56 // Must not have invalid characters 57 57 for c in input.chars() { 58 - if !((c >= 'a' && c <= 'z') 59 - || (c >= 'A' && c <= 'Z') 60 - || (c >= '0' && c <= '9') 61 - || c == '.' 62 - || c == '-') 63 - { 58 + if !(c.is_ascii_lowercase() || c.is_ascii_uppercase() || c.is_ascii_digit() || c == '.' || c == '-') { 64 59 return false; 65 60 } 66 61 } ··· 121 116 122 117 /// Extract base URL from request headers and URI 123 118 pub fn extract_base_url(headers: &HeaderMap, uri: &Uri) -> String { 124 - if let Some(host) = headers.get("host") { 125 - if let Ok(host_str) = host.to_str() { 119 + if let Some(host_str) = headers.get("host").and_then(|h| h.to_str().ok()) { 126 120 // Check if request is HTTPS (from X-Forwarded-Proto or X-Forwarded-Ssl) 127 - let scheme = if headers 121 + let is_https = headers 128 122 .get("x-forwarded-proto") 129 123 .and_then(|v| v.to_str().ok()) 130 124 .map(|s| s == "https") 131 125 .unwrap_or(false) 132 - { 133 - "https" 134 - } else if headers 135 - .get("x-forwarded-ssl") 136 - .and_then(|v| v.to_str().ok()) 137 - .map(|s| s == "on") 138 - .unwrap_or(false) 139 - { 140 - "https" 141 - } else { 142 - "http" 143 - }; 126 + || headers 127 + .get("x-forwarded-ssl") 128 + .and_then(|v| v.to_str().ok()) 129 + .map(|s| s == "on") 130 + .unwrap_or(false); 131 + 132 + let scheme = if is_https { "https" } else { "http" }; 144 133 return format!("{}://{}", scheme, host_str); 145 - } 146 134 } 147 135 148 136 if let Some(authority) = uri.authority() { ··· 170 158 171 159 // Simple parser: "60s", "5m", "1h" 172 160 let s = s.trim(); 173 - if s.ends_with('s') { 174 - let secs: u64 = s[..s.len() - 1] 175 - .parse() 176 - .context("Invalid duration format")?; 161 + if let Some(stripped) = s.strip_suffix('s') { 162 + let secs: u64 = stripped.parse().context("Invalid duration format")?; 177 163 Ok(Duration::from_secs(secs)) 178 - } else if s.ends_with('m') { 179 - let mins: u64 = s[..s.len() - 1] 180 - .parse() 181 - .context("Invalid 
duration format")?; 164 + } else if let Some(stripped) = s.strip_suffix('m') { 165 + let mins: u64 = stripped.parse().context("Invalid duration format")?; 182 166 Ok(Duration::from_secs(mins * 60)) 183 - } else if s.ends_with('h') { 184 - let hours: u64 = s[..s.len() - 1] 185 - .parse() 186 - .context("Invalid duration format")?; 167 + } else if let Some(stripped) = s.strip_suffix('h') { 168 + let hours: u64 = stripped.parse().context("Invalid duration format")?; 187 169 Ok(Duration::from_secs(hours * 3600)) 188 170 } else { 189 171 // Try parsing as seconds
+6 -8
src/server/websocket.rs
··· 83 83 let start_bundle_idx = (start_bundle_num - 1) as usize; 84 84 85 85 if start_bundle_idx < bundles.len() { 86 - for i in start_bundle_idx..bundles.len() { 87 - let bundle_num = bundles[i].bundle_number; 86 + for (i, bundle) in bundles.iter().enumerate().skip(start_bundle_idx) { 87 + let bundle_num = bundle.bundle_number; 88 88 let skip_until = if i == start_bundle_idx { 89 89 start_position 90 90 } else { ··· 197 197 streamed += 1; 198 198 199 199 // Send ping every 1000 operations 200 - if streamed % 1000 == 0 { 201 - if sender.send(Message::Ping(Bytes::new())).await.is_err() { 202 - break; 203 - } 200 + if streamed % 1000 == 0 && sender.send(Message::Ping(Bytes::new())).await.is_err() { 201 + break; 204 202 } 205 203 } 206 204 ··· 227 225 return Ok(()); 228 226 } 229 227 230 - for i in *last_seen_count..mempool_ops.len() { 228 + for (i, op) in mempool_ops.iter().enumerate().skip(*last_seen_count) { 231 229 let record_num = bundle_record_base + i as u64; 232 230 if record_num < start_cursor { 233 231 continue; 234 232 } 235 233 236 234 // Send operation as JSON 237 - let json = match sonic_rs::to_string(&mempool_ops[i]) { 235 + let json = match sonic_rs::to_string(op) { 238 236 Ok(j) => j, 239 237 Err(_) => continue, // Skip invalid operations 240 238 };
+32 -33
src/sync.rs
··· 1 + 1 2 // Sync module - PLC directory synchronization 2 3 use crate::constants; 3 4 use crate::operations::Operation; ··· 19 20 nullified: Option<Value>, 20 21 #[serde(rename = "createdAt")] 21 22 created_at: String, 23 + 24 + 22 25 #[serde(skip)] 23 26 pub raw_json: Option<String>, 24 27 } 25 28 26 29 impl From<PLCOperation> for Operation { 27 30 fn from(plc: PLCOperation) -> Self { 28 - let is_nullified = plc.nullified.as_ref().map_or(false, |v| { 29 - v.as_bool().unwrap_or(false) || v.as_str().map_or(false, |s| !s.is_empty()) 31 + let is_nullified = plc.nullified.as_ref().is_some_and(|v| { 32 + v.as_bool().unwrap_or(false) || v.as_str().is_some_and(|s| !s.is_empty()) 30 33 }); 31 34 32 35 Self { ··· 72 75 operations.retain(|op| { 73 76 op.cid 74 77 .as_ref() 75 - .map_or(true, |cid| !prev_boundary.contains(cid)) 78 + .is_none_or(|cid| !prev_boundary.contains(cid)) 76 79 }); 77 80 78 81 operations ··· 94 97 total_duration_ms: u64, 95 98 fetch_requests: usize, 96 99 did_index_compacted: bool, 100 + 97 101 unique_dids: u32, 98 102 size_bytes: u64, 99 103 }, ··· 156 160 pub trait SyncLogger: Send + Sync { 157 161 fn on_sync_start(&self, interval: Duration); 158 162 163 + #[allow(clippy::too_many_arguments)] 159 164 fn on_bundle_created( 160 165 &self, 161 166 bundle_num: u32, ··· 171 176 size_bytes: u64, 172 177 ); 173 178 179 + // Allow the sync logger to accept multiple arguments for detailed bundle info 180 + // (Removed workaround method; use allow attribute on trait method instead) 181 + 182 + 174 183 fn on_caught_up( 175 184 &self, 176 185 next_bundle: u32, ··· 238 247 } 239 248 } 240 249 241 - 242 250 impl SyncLogger for SyncLoggerImpl { 243 251 fn as_any(&self) -> &dyn Any { 244 252 self ··· 246 254 247 255 fn on_sync_start(&self, interval: Duration) { 248 256 eprintln!("[Sync] Starting initial sync..."); 249 - if let Some(verbose) = &self.verbose { 250 - if *verbose.lock().unwrap() { 251 - eprintln!("[Sync] Sync loop interval: {:?}", interval); 252 - } 
257 + if let Some(verbose) = &self.verbose && *verbose.lock().unwrap() { 258 + eprintln!("[Sync] Sync loop interval: {:?}", interval); 253 259 } 254 260 } 255 261 262 + #[allow(clippy::too_many_arguments)] 256 263 fn on_bundle_created( 257 264 &self, 258 265 bundle_num: u32, ··· 345 352 client: PLCClient, 346 353 config: SyncConfig, 347 354 logger: Option<Box<dyn SyncLogger>>, 355 + #[allow(clippy::type_complexity)] 348 356 event_callback: Option<Box<dyn Fn(&SyncEvent) + Send + Sync>>, 349 357 } 350 358 ··· 471 479 472 480 loop { 473 481 // Check for shutdown if configured 474 - if let Some(ref shutdown_rx) = self.config.shutdown_rx { 475 - if *shutdown_rx.borrow() { 482 + if let Some(ref shutdown_rx) = self.config.shutdown_rx 483 + && *shutdown_rx.borrow() { 476 484 break; 477 485 } 478 - } 479 486 480 487 // Get stats before sync to track compaction 481 488 let stats_before = self.manager.get_did_index_stats(); ··· 519 526 self.show_compaction_if_needed(did_index_compacted, delta_segments_before, index_ms); 520 527 521 528 // Check if we've reached the limit 522 - if let Some(max) = max_bundles { 523 - if synced >= max { 524 - break; 525 - } 529 + if let Some(max) = max_bundles && synced >= max { 530 + break; 526 531 } 527 532 } 528 533 Ok(crate::manager::SyncResult::CaughtUp { ··· 586 591 587 592 loop { 588 593 // Check for shutdown before starting sync 589 - if let Some(ref shutdown_rx) = self.config.shutdown_rx { 590 - if *shutdown_rx.borrow() { 591 - if self.config.verbose { 592 - eprintln!("[Sync] Shutdown requested, stopping..."); 593 - } 594 - break; 594 + if let Some(ref shutdown_rx) = self.config.shutdown_rx && *shutdown_rx.borrow() { 595 + if self.config.verbose { 596 + eprintln!("[Sync] Shutdown requested, stopping..."); 595 597 } 598 + break; 596 599 } 597 600 598 601 // Update DID index on every bundle (now fast with delta segments) ··· 658 661 } 659 662 660 663 // Check for shutdown before sleeping 661 - if let Some(ref shutdown_rx) = 
self.config.shutdown_rx { 662 - if *shutdown_rx.borrow() { 663 - if self.config.verbose { 664 - eprintln!("[Sync] Shutdown requested, stopping..."); 665 - } 666 - break; 664 + if let Some(ref shutdown_rx) = self.config.shutdown_rx && *shutdown_rx.borrow() { 665 + if self.config.verbose { 666 + eprintln!("[Sync] Shutdown requested, stopping..."); 667 667 } 668 + break; 668 669 } 669 670 670 671 // During initial sync, sleep briefly (500ms) to avoid hammering the API ··· 697 698 fetch_duration_ms, 698 699 }) => { 699 700 // Check for shutdown 700 - if let Some(ref shutdown_rx) = self.config.shutdown_rx { 701 - if *shutdown_rx.borrow() { 702 - if self.config.verbose { 703 - eprintln!("[Sync] Shutdown requested, stopping..."); 704 - } 705 - break; 701 + if let Some(ref shutdown_rx) = self.config.shutdown_rx && *shutdown_rx.borrow() { 702 + if self.config.verbose { 703 + eprintln!("[Sync] Shutdown requested, stopping..."); 706 704 } 705 + break; 707 706 } 708 707 709 708 // Caught up to the end of the chain
+1 -1
src/verification.rs
··· 224 224 225 225 let count = reader 226 226 .lines() 227 - .filter_map(|l| l.ok()) 227 + .map_while(Result::ok) 228 228 .filter(|l| !l.is_empty()) 229 229 .count(); 230 230
+6 -6
tests/manager.rs
··· 33 33 // Query DID operations and resolve DID 34 34 let did = "did:plc:aaaaaaaaaaaaaaaaaaaaaaaa"; 35 35 let did_ops = manager.get_did_operations(did, false, false)?; 36 - assert!(did_ops.operations.len() >= 1); 36 + assert!(!did_ops.operations.is_empty()); 37 37 38 38 let resolved = manager.resolve_did(did)?; 39 39 assert_eq!(resolved.document.id, did); ··· 75 75 let arc_mgr = Arc::new(manager.clone_for_arc()); 76 76 77 77 // Range iterator over bundle 1 should yield 10 operations 78 - let mut range_iter = arc_mgr.get_operations_range(1, 1, None); 78 + let range_iter = arc_mgr.get_operations_range(1, 1, None); 79 79 let mut count = 0usize; 80 - while let Some(res) = range_iter.next() { 80 + for res in range_iter { 81 81 let op = res?; 82 82 assert!(op.did.starts_with("did:plc:")); 83 83 count += 1; ··· 93 93 count: Some(5), 94 94 after_timestamp: None, 95 95 }; 96 - let mut export_iter = plcbundle::ExportIterator::new(Arc::clone(&arc_mgr), spec); 96 + let export_iter = plcbundle::ExportIterator::new(Arc::clone(&arc_mgr), spec); 97 97 let mut exported = Vec::new(); 98 - while let Some(item) = export_iter.next() { 98 + for item in export_iter { 99 99 let s = item?; 100 100 exported.push(s); 101 101 } ··· 130 130 // Verify we can query DID operations from the newly built index 131 131 let first_did = "did:plc:aaaaaaaaaaaaaaaaaaaaaaaa"; 132 132 let did_ops2 = manager2.get_did_operations(first_did, false, false)?; 133 - assert!(did_ops2.operations.len() >= 1); 133 + assert!(!did_ops2.operations.is_empty()); 134 134 135 135 Ok(()) 136 136 }
+1 -1
tests/server.rs
··· 106 106 } 107 107 let json: serde_json::Value = serde_json::from_str(&body_text)?; 108 108 assert!(json.is_array()); 109 - assert!(json.as_array().unwrap().len() >= 1); 109 + assert!(!json.as_array().unwrap().is_empty()); 110 110 assert_eq!(json.as_array().unwrap()[0]["did"], "did:plc:aaaaaaaaaaaaaaaaaaaaaaaa"); 111 111 112 112 server_handle.abort();