High-performance implementation of plcbundle written in Rust
at main 648 lines 22 kB view raw
1// src/bundle_format.rs 2//! Bundle file format implementation with zstd skippable frames and multi-frame compression 3//! 4//! Format: 5//! - Skippable frame with metadata (magic 0x184D2A50) 6//! - Multiple zstd frames (100 operations each) 7//! - Frame offsets in metadata allow efficient random access 8 9use crate::constants; 10use anyhow::Result; 11use serde::{Deserialize, Serialize}; 12use std::io::{Read, Seek, SeekFrom, Write}; 13 14/// Skippable frame magic number for metadata 15pub const SKIPPABLE_MAGIC_METADATA: u32 = 0x184D2A50; 16 17/// Bundle metadata stored in skippable frame 18#[derive(Debug, Clone, Serialize, Deserialize)] 19pub struct BundleMetadata { 20 /// Format version 21 pub format: String, 22 23 /// Bundle number 24 pub bundle_number: u32, 25 26 /// Source origin 27 pub origin: String, 28 29 /// Content hash (SHA256 of uncompressed JSONL) 30 pub content_hash: String, 31 32 /// Parent bundle hash (for chaining) 33 #[serde(skip_serializing_if = "Option::is_none")] 34 pub parent_hash: Option<String>, 35 36 /// Total uncompressed size in bytes 37 #[serde(default)] 38 pub uncompressed_size: Option<u64>, 39 40 /// Total compressed size in bytes (excluding metadata frame) 41 #[serde(default)] 42 pub compressed_size: Option<u64>, 43 44 /// Number of operations 45 pub operation_count: usize, 46 47 /// Number of unique DIDs 48 pub did_count: usize, 49 50 /// First operation timestamp 51 pub start_time: String, 52 53 /// Last operation timestamp 54 pub end_time: String, 55 56 /// Creation timestamp 57 pub created_at: String, 58 59 /// Creator version (e.g., "plcbundle/0.9.0") 60 pub created_by: String, 61 62 /// Number of frames 63 pub frame_count: usize, 64 65 /// Operations per frame 66 pub frame_size: usize, 67 68 /// Frame byte offsets (RELATIVE to first data frame) 69 pub frame_offsets: Vec<i64>, 70} 71 72/// Write a zstd skippable frame 73pub fn write_skippable_frame<W: Write>(writer: &mut W, magic: u32, data: &[u8]) -> Result<usize> { 74 let frame_size = data.len() as u32; 75 76 // Write magic number (little-endian) 77 writer.write_all(&magic.to_le_bytes())?; 78 79 // Write frame size (little-endian) 80 writer.write_all(&frame_size.to_le_bytes())?; 81 82 // Write data 83 writer.write_all(data)?; 84 85 Ok(8 + data.len()) // magic(4) + size(4) + data 86} 87 88/// Read a zstd skippable frame 89pub fn read_skippable_frame<R: Read>(reader: &mut R) -> Result<(u32, Vec<u8>)> { 90 // Read magic number 91 let mut magic_buf = [0u8; 4]; 92 reader.read_exact(&mut magic_buf)?; 93 let magic = u32::from_le_bytes(magic_buf); 94 95 // Verify it's a skippable frame (0x184D2A50 - 0x184D2A5F) 96 if !(0x184D2A50..=0x184D2A5F).contains(&magic) { 97 anyhow::bail!("Not a skippable frame: magic=0x{:08X}", magic); 98 } 99 100 // Read frame size 101 let mut size_buf = [0u8; 4]; 102 reader.read_exact(&mut size_buf)?; 103 let frame_size = u32::from_le_bytes(size_buf); 104 105 // Read data 106 let mut data = vec![0u8; frame_size as usize]; 107 reader.read_exact(&mut data)?; 108 109 Ok((magic, data)) 110} 111 112/// Write metadata as skippable frame 113pub fn write_metadata_frame<W: Write>(writer: &mut W, metadata: &BundleMetadata) -> Result<usize> { 114 let json_data = sonic_rs::to_vec(metadata)?; 115 write_skippable_frame(writer, SKIPPABLE_MAGIC_METADATA, &json_data) 116} 117 118/// Read metadata from skippable frame 119pub fn read_metadata_frame<R: Read>(reader: &mut R) -> Result<BundleMetadata> { 120 let (magic, data) = read_skippable_frame(reader)?; 121 122 if magic != SKIPPABLE_MAGIC_METADATA { 123 anyhow::bail!( 124 "Unexpected magic: 0x{:08X} (expected 0x{:08X})", 125 magic, 126 SKIPPABLE_MAGIC_METADATA 127 ); 128 } 129 130 let metadata: BundleMetadata = sonic_rs::from_slice(&data)?; 131 Ok(metadata) 132} 133 134/// Extract metadata from bundle file without decompressing data 135pub fn extract_metadata_from_file(path: &std::path::Path) -> Result<BundleMetadata> { 136 let file = std::fs::File::open(path)?; 137 let mut reader = std::io::BufReader::new(file); 138 read_metadata_frame(&mut reader) 139} 140 141/// Load single operation using frame index for efficient access 142pub fn load_operation_at_position<R: Read + Seek>( 143 reader: &mut R, 144 position: usize, 145 frame_offsets: &[i64], 146 metadata_frame_size: i64, 147) -> Result<String> { 148 let frame_index = position / constants::FRAME_SIZE; 149 let line_in_frame = position % constants::FRAME_SIZE; 150 151 if frame_index >= frame_offsets.len() - 1 { 152 anyhow::bail!( 153 "Position {} out of bounds (frame {}, total frames {})", 154 position, 155 frame_index, 156 frame_offsets.len() - 1 157 ); 158 } 159 160 // Convert relative offsets to absolute 161 let start_offset = metadata_frame_size + frame_offsets[frame_index]; 162 let end_offset = metadata_frame_size + frame_offsets[frame_index + 1]; 163 let frame_length = end_offset - start_offset; 164 165 if frame_length <= 0 || frame_length > 10 * 1024 * 1024 { 166 anyhow::bail!( 167 "Invalid frame length: {} (offsets: {}-{})", 168 frame_length, 169 start_offset, 170 end_offset 171 ); 172 } 173 174 // Seek to frame start 175 reader.seek(SeekFrom::Start(start_offset as u64))?; 176 177 // Read compressed frame 178 let mut compressed_frame = vec![0u8; frame_length as usize]; 179 reader.read_exact(&mut compressed_frame)?; 180 181 // Decompress frame 182 let decompressed = zstd::bulk::decompress(&compressed_frame, 10 * 1024 * 1024)?; 183 184 // Find the line we want 185 use std::io::BufRead; 186 let cursor = std::io::Cursor::new(decompressed); 187 let lines = cursor.lines(); 188 189 for (idx, line_result) in lines.enumerate() { 190 if idx == line_in_frame { 191 return Ok(line_result?); 192 } 193 } 194 195 anyhow::bail!("Position {} not found in frame {}", position, frame_index) 196} 197 198/// Result of compressing operations into frames 199#[derive(Debug)] 200pub struct FrameCompressionResult { 201 /// Compressed frames (one per FRAME_SIZE operations) 202 pub compressed_frames: Vec<Vec<u8>>, 203 /// Frame offsets (relative to first data frame) 204 pub frame_offsets: Vec<i64>, 205 /// Total uncompressed size 206 pub uncompressed_size: u64, 207 /// Total compressed size 208 pub compressed_size: u64, 209 /// Time spent serializing (in milliseconds) 210 pub serialize_time_ms: f64, 211 /// Time spent compressing (in milliseconds) 212 pub compress_time_ms: f64, 213} 214 215/// Compress operations into multiple frames using parallel compression 216/// 217/// Each frame contains FRAME_SIZE operations (except possibly the last frame). 218/// Returns the compressed frames and their relative offsets. 219/// Uses rayon to compress multiple frames in parallel for better performance. 220pub fn compress_operations_to_frames_parallel( 221 operations: &[crate::operations::Operation], 222) -> anyhow::Result<FrameCompressionResult> { 223 use rayon::prelude::*; 224 use std::time::Instant; 225 226 let num_frames = operations.len().div_ceil(constants::FRAME_SIZE); 227 228 // Process all frames in parallel 229 let frame_results: Vec<_> = (0..num_frames) 230 .into_par_iter() 231 .map(|frame_idx| { 232 let frame_start = frame_idx * constants::FRAME_SIZE; 233 let frame_end = (frame_start + constants::FRAME_SIZE).min(operations.len()); 234 let frame_ops = &operations[frame_start..frame_end]; 235 236 // Serialize frame to JSONL 237 let serialize_start = Instant::now(); 238 let mut frame_data = Vec::new(); 239 for op in frame_ops { 240 let json = if let Some(raw) = &op.raw_json { 241 raw.clone() 242 } else { 243 sonic_rs::to_string(op)? 244 }; 245 frame_data.extend_from_slice(json.as_bytes()); 246 frame_data.push(b'\n'); 247 } 248 let serialize_time = serialize_start.elapsed(); 249 let uncompressed_size = frame_data.len() as u64; 250 251 // Compress frame with content size and checksum 252 let compress_start = Instant::now(); 253 let mut compressed_frame = Vec::new(); 254 { 255 let mut encoder = 256 zstd::Encoder::new(&mut compressed_frame, constants::ZSTD_COMPRESSION_LEVEL)?; 257 encoder.set_pledged_src_size(Some(frame_data.len() as u64))?; 258 encoder.include_contentsize(true)?; 259 encoder.include_checksum(true)?; // Enable XXH64 checksum 260 std::io::copy(&mut frame_data.as_slice(), &mut encoder)?; 261 encoder.finish()?; 262 } 263 let compress_time = compress_start.elapsed(); 264 265 Ok::<_, anyhow::Error>(( 266 compressed_frame, 267 uncompressed_size, 268 serialize_time, 269 compress_time, 270 )) 271 }) 272 .collect::<Result<Vec<_>, _>>()?; 273 274 // Calculate offsets sequentially (must be done in order) 275 let mut frame_offsets = Vec::with_capacity(num_frames + 1); 276 let mut compressed_frames: Vec<Vec<u8>> = Vec::with_capacity(num_frames); 277 let mut total_uncompressed = 0u64; 278 let mut total_serialize_time = std::time::Duration::ZERO; 279 let mut total_compress_time = std::time::Duration::ZERO; 280 281 for (compressed_frame, uncompressed_size, serialize_time, compress_time) in frame_results { 282 let offset = if frame_offsets.is_empty() { 283 0i64 284 } else { 285 let prev_frame_size = compressed_frames.last().unwrap().len() as i64; 286 frame_offsets.last().unwrap() + prev_frame_size 287 }; 288 frame_offsets.push(offset); 289 compressed_frames.push(compressed_frame); 290 total_uncompressed += uncompressed_size; 291 total_serialize_time += serialize_time; 292 total_compress_time += compress_time; 293 } 294 295 // Add final offset (end of last frame) 296 if let Some(last_frame) = compressed_frames.last() { 297 let final_offset = frame_offsets.last().unwrap() + last_frame.len() as i64; 298 frame_offsets.push(final_offset); 299 } 300 301 let compressed_size: u64 = compressed_frames.iter().map(|f| f.len() as u64).sum(); 302 303 Ok(FrameCompressionResult { 304 compressed_frames, 305 frame_offsets, 306 uncompressed_size: total_uncompressed, 307 compressed_size, 308 serialize_time_ms: total_serialize_time.as_secs_f64() * 1000.0, 309 compress_time_ms: total_compress_time.as_secs_f64() * 1000.0, 310 }) 311} 312 313/// Serialize operations to JSONL (uncompressed) 314/// 315/// CRITICAL: This function implements the V1 specification requirement (docs/specification.md § 4.2) 316/// for deterministic content hash calculation. It MUST use the raw JSON bytes when available 317/// to preserve exact byte content, including field order and whitespace. 318pub fn serialize_operations_to_jsonl( 319 operations: &[crate::operations::Operation], 320) -> anyhow::Result<Vec<u8>> { 321 let mut data = Vec::new(); 322 for op in operations { 323 // CRITICAL: Use raw_json if available to preserve exact byte content 324 // This is required for deterministic content_hash calculation. 325 // Re-serialization would change field order/whitespace and break hash verification. 326 let json = if let Some(raw) = &op.raw_json { 327 raw.clone() 328 } else { 329 // Fallback: Re-serialize if raw_json is not available 330 // WARNING: This may produce different content_hash than the original! 331 sonic_rs::to_string(op)? 332 }; 333 data.extend_from_slice(json.as_bytes()); 334 data.push(b'\n'); 335 } 336 Ok(data) 337} 338 339/// Calculate content hash (SHA256 of uncompressed JSONL) 340pub fn calculate_content_hash( 341 operations: &[crate::operations::Operation], 342) -> anyhow::Result<String> { 343 use sha2::{Digest, Sha256}; 344 345 let jsonl_data = serialize_operations_to_jsonl(operations)?; 346 let mut hasher = Sha256::new(); 347 hasher.update(&jsonl_data); 348 Ok(format!("{:x}", hasher.finalize())) 349} 350 351/// Create bundle metadata structure 352#[allow(clippy::too_many_arguments)] 353pub fn create_bundle_metadata( 354 bundle_number: u32, 355 origin: &str, 356 content_hash: &str, 357 parent_hash: Option<&str>, 358 uncompressed_size: Option<u64>, 359 compressed_size: Option<u64>, 360 operation_count: usize, 361 did_count: usize, 362 start_time: &str, 363 end_time: &str, 364 frame_count: usize, 365 frame_size: usize, 366 frame_offsets: &[i64], 367) -> BundleMetadata { 368 BundleMetadata { 369 format: "plcbundle/1.0".to_string(), 370 bundle_number, 371 origin: origin.to_string(), 372 content_hash: content_hash.to_string(), 373 parent_hash: parent_hash.map(|s| s.to_string()), 374 uncompressed_size, 375 compressed_size, 376 operation_count, 377 did_count, 378 start_time: start_time.to_string(), 379 end_time: end_time.to_string(), 380 created_at: chrono::Utc::now().to_rfc3339(), 381 created_by: crate::constants::created_by(), 382 frame_count, 383 frame_size, 384 frame_offsets: frame_offsets.to_vec(), 385 } 386} 387 388/// Write bundle to file with multi-frame format 389/// 390/// Writes metadata frame followed by compressed data frames. 391pub fn write_bundle_with_frames<W: Write>( 392 writer: &mut W, 393 metadata: &BundleMetadata, 394 compressed_frames: &[Vec<u8>], 395) -> anyhow::Result<()> { 396 // Write metadata as skippable frame first 397 write_metadata_frame(writer, metadata)?; 398 399 // Write all compressed frames 400 for frame in compressed_frames { 401 writer.write_all(frame)?; 402 } 403 404 writer.flush()?; 405 Ok(()) 406} 407 408#[cfg(test)] 409mod tests { 410 use super::*; 411 412 #[test] 413 fn test_zstd_content_size_and_checksum() { 414 // Test that our compression includes content size and checksum in frame header 415 let test_data = b"Test data for zstd compression with content size and checksum"; 416 417 // NEW method: with content size and checksum 418 let mut compressed_new = Vec::new(); 419 { 420 let mut encoder = zstd::Encoder::new(&mut compressed_new, 3).unwrap(); 421 encoder 422 .set_pledged_src_size(Some(test_data.len() as u64)) 423 .unwrap(); 424 encoder.include_contentsize(true).unwrap(); 425 encoder.include_checksum(true).unwrap(); 426 std::io::copy(&mut &test_data[..], &mut encoder).unwrap(); 427 encoder.finish().unwrap(); 428 } 429 430 // OLD method: without content size or checksum (for comparison) 431 let compressed_old = zstd::encode_all(&test_data[..], 3).unwrap(); 432 433 // Write both for comparison 434 let test_file_new = "/tmp/test_with_metadata.zst"; 435 let test_file_old = "/tmp/test_without_metadata.zst"; 436 std::fs::write(test_file_new, &compressed_new).unwrap(); 437 std::fs::write(test_file_old, &compressed_old).unwrap(); 438 439 // Both should decompress correctly 440 let decompressed_new = zstd::decode_all(&compressed_new[..]).unwrap(); 441 let decompressed_old = zstd::decode_all(&compressed_old[..]).unwrap(); 442 assert_eq!(&decompressed_new, test_data); 443 assert_eq!(&decompressed_old, test_data); 444 445 println!("✓ Created test files:"); 446 println!(" With metadata (size+checksum): {}", test_file_new); 447 println!(" Without metadata: {}", test_file_old); 448 println!(); 449 println!("Compare with: zstd -l /tmp/test_with*.zst /tmp/test_without*.zst"); 450 println!("Expected: New file should show 'Uncompressed' size and 'XXH64' check"); 451 } 452 453 #[test] 454 fn test_skippable_frame_roundtrip() { 455 let data = b"test data"; 456 let mut buffer = Vec::new(); 457 458 write_skippable_frame(&mut buffer, SKIPPABLE_MAGIC_METADATA, data).unwrap(); 459 460 let mut cursor = std::io::Cursor::new(&buffer); 461 let (magic, read_data) = read_skippable_frame(&mut cursor).unwrap(); 462 463 assert_eq!(magic, SKIPPABLE_MAGIC_METADATA); 464 assert_eq!(read_data, data); 465 } 466 467 #[test] 468 fn test_metadata_frame_roundtrip() { 469 let metadata = BundleMetadata { 470 format: "plcbundle-v1".to_string(), 471 bundle_number: 42, 472 origin: constants::DEFAULT_PLC_DIRECTORY_URL.to_string(), 473 content_hash: "abc123".to_string(), 474 parent_hash: None, 475 uncompressed_size: None, 476 compressed_size: None, 477 operation_count: 10000, 478 did_count: 5000, 479 start_time: "2024-01-01T00:00:00Z".to_string(), 480 end_time: "2024-01-01T23:59:59Z".to_string(), 481 created_at: "2024-01-02T00:00:00Z".to_string(), 482 created_by: crate::constants::created_by(), 483 frame_count: 100, 484 frame_size: 100, 485 frame_offsets: vec![0, 1000, 2000], 486 }; 487 488 let mut buffer = Vec::new(); 489 write_metadata_frame(&mut buffer, &metadata).unwrap(); 490 491 let mut cursor = std::io::Cursor::new(&buffer); 492 let read_metadata = read_metadata_frame(&mut cursor).unwrap(); 493 494 assert_eq!(read_metadata.bundle_number, 42); 495 assert_eq!(read_metadata.frame_count, 100); 496 } 497 498 #[test] 499 fn test_skippable_frame_empty_data() { 500 let data = b""; 501 let mut buffer = Vec::new(); 502 503 write_skippable_frame(&mut buffer, SKIPPABLE_MAGIC_METADATA, data).unwrap(); 504 505 let mut cursor = std::io::Cursor::new(&buffer); 506 let (magic, read_data) = read_skippable_frame(&mut cursor).unwrap(); 507 508 assert_eq!(magic, SKIPPABLE_MAGIC_METADATA); 509 assert_eq!(read_data, data); 510 assert_eq!(read_data.len(), 0); 511 } 512 513 #[test] 514 fn test_skippable_frame_large_data() { 515 let data = vec![0x42u8; 10000]; 516 let mut buffer = Vec::new(); 517 518 write_skippable_frame(&mut buffer, SKIPPABLE_MAGIC_METADATA, &data).unwrap(); 519 520 let mut cursor = std::io::Cursor::new(&buffer); 521 let (magic, read_data) = read_skippable_frame(&mut cursor).unwrap(); 522 523 assert_eq!(magic, SKIPPABLE_MAGIC_METADATA); 524 assert_eq!(read_data, data); 525 } 526 527 #[test] 528 fn test_skippable_frame_invalid_magic() { 529 let data = b"test"; 530 let mut buffer = Vec::new(); 531 532 // Write with invalid magic (not in skippable range) 533 buffer.write_all(&0x12345678u32.to_le_bytes()).unwrap(); 534 buffer 535 .write_all(&(data.len() as u32).to_le_bytes()) 536 .unwrap(); 537 buffer.write_all(data).unwrap(); 538 539 let mut cursor = std::io::Cursor::new(&buffer); 540 let result = read_skippable_frame(&mut cursor); 541 assert!(result.is_err()); 542 assert!( 543 result 544 .unwrap_err() 545 .to_string() 546 .contains("Not a skippable frame") 547 ); 548 } 549 550 #[test] 551 fn test_skippable_frame_valid_magic_range() { 552 // Test all valid magic values in range 0x184D2A50 - 0x184D2A5F 553 for magic_val in 0x184D2A50..=0x184D2A5F { 554 let data = b"test"; 555 let mut buffer = Vec::new(); 556 557 write_skippable_frame(&mut buffer, magic_val, data).unwrap(); 558 559 let mut cursor = std::io::Cursor::new(&buffer); 560 let (read_magic, read_data) = read_skippable_frame(&mut cursor).unwrap(); 561 562 assert_eq!(read_magic, magic_val); 563 assert_eq!(read_data, data); 564 } 565 } 566 567 #[test] 568 fn test_metadata_frame_with_parent_hash() { 569 let metadata = BundleMetadata { 570 format: "plcbundle-v1".to_string(), 571 bundle_number: 2, 572 origin: "test".to_string(), 573 content_hash: "hash2".to_string(), 574 parent_hash: Some("hash1".to_string()), 575 uncompressed_size: Some(1000000), 576 compressed_size: Some(500000), 577 operation_count: 10000, 578 did_count: 5000, 579 start_time: "2024-01-01T00:00:00Z".to_string(), 580 end_time: "2024-01-01T23:59:59Z".to_string(), 581 created_at: "2024-01-02T00:00:00Z".to_string(), 582 created_by: "test/1.0".to_string(), 583 frame_count: 100, 584 frame_size: 100, 585 frame_offsets: vec![0, 1000, 2000, 3000], 586 }; 587 588 let mut buffer = Vec::new(); 589 write_metadata_frame(&mut buffer, &metadata).unwrap(); 590 591 let mut cursor = std::io::Cursor::new(&buffer); 592 let read_metadata = read_metadata_frame(&mut cursor).unwrap(); 593 594 assert_eq!(read_metadata.bundle_number, 2); 595 assert_eq!(read_metadata.parent_hash, Some("hash1".to_string())); 596 assert_eq!(read_metadata.uncompressed_size, Some(1000000)); 597 assert_eq!(read_metadata.compressed_size, Some(500000)); 598 assert_eq!(read_metadata.frame_offsets.len(), 4); 599 } 600 601 #[test] 602 fn test_metadata_frame_wrong_magic() { 603 let metadata = BundleMetadata { 604 format: "plcbundle-v1".to_string(), 605 bundle_number: 1, 606 origin: "test".to_string(), 607 content_hash: "hash1".to_string(), 608 parent_hash: None, 609 uncompressed_size: None, 610 compressed_size: None, 611 operation_count: 100, 612 did_count: 50, 613 start_time: "2024-01-01T00:00:00Z".to_string(), 614 end_time: "2024-01-01T23:59:59Z".to_string(), 615 created_at: "2024-01-02T00:00:00Z".to_string(), 616 created_by: "test/1.0".to_string(), 617 frame_count: 1, 618 frame_size: 100, 619 frame_offsets: vec![0, 1000], 620 }; 621 622 let mut buffer = Vec::new(); 623 // Write with wrong magic 624 write_skippable_frame( 625 &mut buffer, 626 0x184D2A51, 627 &sonic_rs::to_vec(&metadata).unwrap(), 628 ) 629 .unwrap(); 630 631 let mut cursor = std::io::Cursor::new(&buffer); 632 let result = read_metadata_frame(&mut cursor); 633 assert!(result.is_err()); 634 assert!(result.unwrap_err().to_string().contains("Unexpected magic")); 635 } 636 637 #[test] 638 fn test_skippable_frame_size_calculation() { 639 let data = b"hello world"; 640 let mut buffer = Vec::new(); 641 642 let written = write_skippable_frame(&mut buffer, SKIPPABLE_MAGIC_METADATA, data).unwrap(); 643 644 // Should be: magic(4) + size(4) + data(11) = 19 645 assert_eq!(written, 8 + data.len()); 646 assert_eq!(buffer.len(), written); 647 } 648}