forked from
atscan.net/plcbundle-rs
High-performance implementation of plcbundle written in Rust
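// src/bundle_format.rs
//! Bundle file format implementation using a zstd skippable frame for metadata and
//! multi-frame zstd compression for the operation data.
//!
//! Layout of a bundle file:
//! - A skippable frame holding the JSON-encoded bundle metadata (magic `0x184D2A50`).
//! - A sequence of independent zstd frames, each holding up to 100 operations as JSONL.
//! - The frame offsets recorded in the metadata allow random access to a single frame
//!   without decompressing the others.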
8
9use crate::constants;
10use anyhow::Result;
11use serde::{Deserialize, Serialize};
12use std::io::{Read, Seek, SeekFrom, Write};
13
/// Magic number of the zstd skippable frame that carries the bundle metadata.
///
/// zstd reserves `0x184D2A50..=0x184D2A5F` for skippable frames; metadata uses the first
/// value of that range.
pub const SKIPPABLE_MAGIC_METADATA: u32 = 0x184D_2A50;
16
17/// Bundle metadata stored in skippable frame
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct BundleMetadata {
20 /// Format version
21 pub format: String,
22
23 /// Bundle number
24 pub bundle_number: u32,
25
26 /// Source origin
27 pub origin: String,
28
29 /// Content hash (SHA256 of uncompressed JSONL)
30 pub content_hash: String,
31
32 /// Parent bundle hash (for chaining)
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub parent_hash: Option<String>,
35
36 /// Total uncompressed size in bytes
37 #[serde(default)]
38 pub uncompressed_size: Option<u64>,
39
40 /// Total compressed size in bytes (excluding metadata frame)
41 #[serde(default)]
42 pub compressed_size: Option<u64>,
43
44 /// Number of operations
45 pub operation_count: usize,
46
47 /// Number of unique DIDs
48 pub did_count: usize,
49
50 /// First operation timestamp
51 pub start_time: String,
52
53 /// Last operation timestamp
54 pub end_time: String,
55
56 /// Creation timestamp
57 pub created_at: String,
58
59 /// Creator version (e.g., "plcbundle/0.9.0")
60 pub created_by: String,
61
62 /// Number of frames
63 pub frame_count: usize,
64
65 /// Operations per frame
66 pub frame_size: usize,
67
68 /// Frame byte offsets (RELATIVE to first data frame)
69 pub frame_offsets: Vec<i64>,
70}
71
/// Write a zstd skippable frame.
///
/// The frame layout is: 4-byte little-endian `magic`, a 4-byte little-endian payload length,
/// then the payload bytes. Returns the total number of bytes written (`8 + data.len()`).
///
/// # Errors
///
/// Fails with [`std::io::ErrorKind::InvalidInput`] if `data` is longer than `u32::MAX` bytes.
/// The length field is only 32 bits wide, so such a payload cannot be represented. The check
/// runs before anything is written, so an oversized payload leaves `writer` untouched. Any
/// error from `writer` is propagated.
pub fn write_skippable_frame<W: Write>(writer: &mut W, magic: u32, data: &[u8]) -> Result<usize> {
    // A plain `as u32` cast would silently truncate, producing a header whose declared
    // length disagrees with the payload that follows it (a corrupt file).
    let frame_size = u32::try_from(data.len()).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!(
                "skippable frame payload too large: {} bytes (max {})",
                data.len(),
                u32::MAX
            ),
        )
    })?;

    writer.write_all(&magic.to_le_bytes())?;
    writer.write_all(&frame_size.to_le_bytes())?;
    writer.write_all(data)?;

    // magic(4) + size(4) + payload
    Ok(8 + data.len())
}
87
/// Read one zstd skippable frame from `reader` and return `(magic, payload)`.
///
/// Exactly `8 + payload.len()` bytes are consumed on success.
///
/// # Errors
///
/// - [`std::io::ErrorKind::InvalidData`] ("Not a skippable frame: ...") when the magic is
///   outside the skippable range `0x184D2A50..=0x184D2A5F`.
/// - [`std::io::ErrorKind::UnexpectedEof`] when the header or the declared payload is
///   truncated.
/// - Any other I/O error from `reader`.
pub fn read_skippable_frame<R: Read>(reader: &mut R) -> Result<(u32, Vec<u8>)> {
    let mut word = [0u8; 4];

    // Magic number (little-endian). It is checked before the size is read, so a
    // non-skippable frame is reported as such even if the stream ends right after it.
    reader.read_exact(&mut word)?;
    let magic = u32::from_le_bytes(word);
    if !(0x184D2A50..=0x184D2A5F).contains(&magic) {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("Not a skippable frame: magic=0x{:08X}", magic),
        )
        .into());
    }

    // Declared payload size (little-endian).
    reader.read_exact(&mut word)?;
    let frame_size = u32::from_le_bytes(word);

    // Do not trust the header for an up-front allocation: a corrupt size field could
    // demand ~4 GiB. `take` + `read_to_end` grows the buffer only as bytes actually arrive.
    let mut data = Vec::with_capacity((frame_size as usize).min(64 * 1024));
    reader
        .by_ref()
        .take(u64::from(frame_size))
        .read_to_end(&mut data)?;

    if data.len() as u64 != u64::from(frame_size) {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            format!(
                "skippable frame truncated: expected {} bytes, got {}",
                frame_size,
                data.len()
            ),
        )
        .into());
    }

    Ok((magic, data))
}
111
112/// Write metadata as skippable frame
113pub fn write_metadata_frame<W: Write>(writer: &mut W, metadata: &BundleMetadata) -> Result<usize> {
114 let json_data = sonic_rs::to_vec(metadata)?;
115 write_skippable_frame(writer, SKIPPABLE_MAGIC_METADATA, &json_data)
116}
117
118/// Read metadata from skippable frame
119pub fn read_metadata_frame<R: Read>(reader: &mut R) -> Result<BundleMetadata> {
120 let (magic, data) = read_skippable_frame(reader)?;
121
122 if magic != SKIPPABLE_MAGIC_METADATA {
123 anyhow::bail!(
124 "Unexpected magic: 0x{:08X} (expected 0x{:08X})",
125 magic,
126 SKIPPABLE_MAGIC_METADATA
127 );
128 }
129
130 let metadata: BundleMetadata = sonic_rs::from_slice(&data)?;
131 Ok(metadata)
132}
133
134/// Extract metadata from bundle file without decompressing data
135pub fn extract_metadata_from_file(path: &std::path::Path) -> Result<BundleMetadata> {
136 let file = std::fs::File::open(path)?;
137 let mut reader = std::io::BufReader::new(file);
138 read_metadata_frame(&mut reader)
139}
140
141/// Load single operation using frame index for efficient access
142pub fn load_operation_at_position<R: Read + Seek>(
143 reader: &mut R,
144 position: usize,
145 frame_offsets: &[i64],
146 metadata_frame_size: i64,
147) -> Result<String> {
148 let frame_index = position / constants::FRAME_SIZE;
149 let line_in_frame = position % constants::FRAME_SIZE;
150
151 if frame_index >= frame_offsets.len() - 1 {
152 anyhow::bail!(
153 "Position {} out of bounds (frame {}, total frames {})",
154 position,
155 frame_index,
156 frame_offsets.len() - 1
157 );
158 }
159
160 // Convert relative offsets to absolute
161 let start_offset = metadata_frame_size + frame_offsets[frame_index];
162 let end_offset = metadata_frame_size + frame_offsets[frame_index + 1];
163 let frame_length = end_offset - start_offset;
164
165 if frame_length <= 0 || frame_length > 10 * 1024 * 1024 {
166 anyhow::bail!(
167 "Invalid frame length: {} (offsets: {}-{})",
168 frame_length,
169 start_offset,
170 end_offset
171 );
172 }
173
174 // Seek to frame start
175 reader.seek(SeekFrom::Start(start_offset as u64))?;
176
177 // Read compressed frame
178 let mut compressed_frame = vec![0u8; frame_length as usize];
179 reader.read_exact(&mut compressed_frame)?;
180
181 // Decompress frame
182 let decompressed = zstd::bulk::decompress(&compressed_frame, 10 * 1024 * 1024)?;
183
184 // Find the line we want
185 use std::io::BufRead;
186 let cursor = std::io::Cursor::new(decompressed);
187 let lines = cursor.lines();
188
189 for (idx, line_result) in lines.enumerate() {
190 if idx == line_in_frame {
191 return Ok(line_result?);
192 }
193 }
194
195 anyhow::bail!("Position {} not found in frame {}", position, frame_index)
196}
197
/// Output of splitting operations into frames and compressing them: the frames themselves
/// plus the bookkeeping needed to write and later index them.
#[derive(Debug)]
pub struct FrameCompressionResult {
    /// Compressed frames in order, one per chunk of `FRAME_SIZE` operations.
    pub compressed_frames: Vec<Vec<u8>>,
    /// Frame boundary offsets, relative to the start of the first data frame.
    pub frame_offsets: Vec<i64>,
    /// Sum of the uncompressed frame sizes, in bytes.
    pub uncompressed_size: u64,
    /// Sum of the compressed frame sizes, in bytes.
    pub compressed_size: u64,
    /// Time spent serializing, in milliseconds.
    pub serialize_time_ms: f64,
    /// Time spent compressing, in milliseconds.
    pub compress_time_ms: f64,
}
214
215/// Compress operations into multiple frames using parallel compression
216///
217/// Each frame contains FRAME_SIZE operations (except possibly the last frame).
218/// Returns the compressed frames and their relative offsets.
219/// Uses rayon to compress multiple frames in parallel for better performance.
220pub fn compress_operations_to_frames_parallel(
221 operations: &[crate::operations::Operation],
222) -> anyhow::Result<FrameCompressionResult> {
223 use rayon::prelude::*;
224 use std::time::Instant;
225
226 let num_frames = operations.len().div_ceil(constants::FRAME_SIZE);
227
228 // Process all frames in parallel
229 let frame_results: Vec<_> = (0..num_frames)
230 .into_par_iter()
231 .map(|frame_idx| {
232 let frame_start = frame_idx * constants::FRAME_SIZE;
233 let frame_end = (frame_start + constants::FRAME_SIZE).min(operations.len());
234 let frame_ops = &operations[frame_start..frame_end];
235
236 // Serialize frame to JSONL
237 let serialize_start = Instant::now();
238 let mut frame_data = Vec::new();
239 for op in frame_ops {
240 let json = if let Some(raw) = &op.raw_json {
241 raw.clone()
242 } else {
243 sonic_rs::to_string(op)?
244 };
245 frame_data.extend_from_slice(json.as_bytes());
246 frame_data.push(b'\n');
247 }
248 let serialize_time = serialize_start.elapsed();
249 let uncompressed_size = frame_data.len() as u64;
250
251 // Compress frame with content size and checksum
252 let compress_start = Instant::now();
253 let mut compressed_frame = Vec::new();
254 {
255 let mut encoder =
256 zstd::Encoder::new(&mut compressed_frame, constants::ZSTD_COMPRESSION_LEVEL)?;
257 encoder.set_pledged_src_size(Some(frame_data.len() as u64))?;
258 encoder.include_contentsize(true)?;
259 encoder.include_checksum(true)?; // Enable XXH64 checksum
260 std::io::copy(&mut frame_data.as_slice(), &mut encoder)?;
261 encoder.finish()?;
262 }
263 let compress_time = compress_start.elapsed();
264
265 Ok::<_, anyhow::Error>((
266 compressed_frame,
267 uncompressed_size,
268 serialize_time,
269 compress_time,
270 ))
271 })
272 .collect::<Result<Vec<_>, _>>()?;
273
274 // Calculate offsets sequentially (must be done in order)
275 let mut frame_offsets = Vec::with_capacity(num_frames + 1);
276 let mut compressed_frames: Vec<Vec<u8>> = Vec::with_capacity(num_frames);
277 let mut total_uncompressed = 0u64;
278 let mut total_serialize_time = std::time::Duration::ZERO;
279 let mut total_compress_time = std::time::Duration::ZERO;
280
281 for (compressed_frame, uncompressed_size, serialize_time, compress_time) in frame_results {
282 let offset = if frame_offsets.is_empty() {
283 0i64
284 } else {
285 let prev_frame_size = compressed_frames.last().unwrap().len() as i64;
286 frame_offsets.last().unwrap() + prev_frame_size
287 };
288 frame_offsets.push(offset);
289 compressed_frames.push(compressed_frame);
290 total_uncompressed += uncompressed_size;
291 total_serialize_time += serialize_time;
292 total_compress_time += compress_time;
293 }
294
295 // Add final offset (end of last frame)
296 if let Some(last_frame) = compressed_frames.last() {
297 let final_offset = frame_offsets.last().unwrap() + last_frame.len() as i64;
298 frame_offsets.push(final_offset);
299 }
300
301 let compressed_size: u64 = compressed_frames.iter().map(|f| f.len() as u64).sum();
302
303 Ok(FrameCompressionResult {
304 compressed_frames,
305 frame_offsets,
306 uncompressed_size: total_uncompressed,
307 compressed_size,
308 serialize_time_ms: total_serialize_time.as_secs_f64() * 1000.0,
309 compress_time_ms: total_compress_time.as_secs_f64() * 1000.0,
310 })
311}
312
313/// Serialize operations to JSONL (uncompressed)
314///
315/// CRITICAL: This function implements the V1 specification requirement (docs/specification.md § 4.2)
316/// for deterministic content hash calculation. It MUST use the raw JSON bytes when available
317/// to preserve exact byte content, including field order and whitespace.
318pub fn serialize_operations_to_jsonl(
319 operations: &[crate::operations::Operation],
320) -> anyhow::Result<Vec<u8>> {
321 let mut data = Vec::new();
322 for op in operations {
323 // CRITICAL: Use raw_json if available to preserve exact byte content
324 // This is required for deterministic content_hash calculation.
325 // Re-serialization would change field order/whitespace and break hash verification.
326 let json = if let Some(raw) = &op.raw_json {
327 raw.clone()
328 } else {
329 // Fallback: Re-serialize if raw_json is not available
330 // WARNING: This may produce different content_hash than the original!
331 sonic_rs::to_string(op)?
332 };
333 data.extend_from_slice(json.as_bytes());
334 data.push(b'\n');
335 }
336 Ok(data)
337}
338
339/// Calculate content hash (SHA256 of uncompressed JSONL)
340pub fn calculate_content_hash(
341 operations: &[crate::operations::Operation],
342) -> anyhow::Result<String> {
343 use sha2::{Digest, Sha256};
344
345 let jsonl_data = serialize_operations_to_jsonl(operations)?;
346 let mut hasher = Sha256::new();
347 hasher.update(&jsonl_data);
348 Ok(format!("{:x}", hasher.finalize()))
349}
350
351/// Create bundle metadata structure
352#[allow(clippy::too_many_arguments)]
353pub fn create_bundle_metadata(
354 bundle_number: u32,
355 origin: &str,
356 content_hash: &str,
357 parent_hash: Option<&str>,
358 uncompressed_size: Option<u64>,
359 compressed_size: Option<u64>,
360 operation_count: usize,
361 did_count: usize,
362 start_time: &str,
363 end_time: &str,
364 frame_count: usize,
365 frame_size: usize,
366 frame_offsets: &[i64],
367) -> BundleMetadata {
368 BundleMetadata {
369 format: "plcbundle/1.0".to_string(),
370 bundle_number,
371 origin: origin.to_string(),
372 content_hash: content_hash.to_string(),
373 parent_hash: parent_hash.map(|s| s.to_string()),
374 uncompressed_size,
375 compressed_size,
376 operation_count,
377 did_count,
378 start_time: start_time.to_string(),
379 end_time: end_time.to_string(),
380 created_at: chrono::Utc::now().to_rfc3339(),
381 created_by: crate::constants::created_by(),
382 frame_count,
383 frame_size,
384 frame_offsets: frame_offsets.to_vec(),
385 }
386}
387
388/// Write bundle to file with multi-frame format
389///
390/// Writes metadata frame followed by compressed data frames.
391pub fn write_bundle_with_frames<W: Write>(
392 writer: &mut W,
393 metadata: &BundleMetadata,
394 compressed_frames: &[Vec<u8>],
395) -> anyhow::Result<()> {
396 // Write metadata as skippable frame first
397 write_metadata_frame(writer, metadata)?;
398
399 // Write all compressed frames
400 for frame in compressed_frames {
401 writer.write_all(frame)?;
402 }
403
404 writer.flush()?;
405 Ok(())
406}
407
#[cfg(test)]
mod tests {
    use super::*;

    /// Magic number that opens every regular zstd frame (RFC 8878, section 3.1.1).
    const ZSTD_FRAME_MAGIC: u32 = 0xFD2F_B528;

    /// Return the Frame_Header_Descriptor byte (the byte right after the magic) of a zstd frame.
    fn frame_header_descriptor(frame: &[u8]) -> u8 {
        assert!(frame.len() > 4, "frame too short to contain a header");
        let magic = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]);
        assert_eq!(magic, ZSTD_FRAME_MAGIC, "not a zstd frame");
        frame[4]
    }

    #[test]
    fn test_zstd_content_size_and_checksum() {
        // Our frames are compressed with content size and checksum enabled. Verify both flags
        // directly in the frame header instead of writing temp files for manual `zstd -l`.
        let test_data = b"Test data for zstd compression with content size and checksum";

        // NEW method: with content size and checksum
        let mut compressed_new = Vec::new();
        {
            let mut encoder = zstd::Encoder::new(&mut compressed_new, 3).unwrap();
            encoder
                .set_pledged_src_size(Some(test_data.len() as u64))
                .unwrap();
            encoder.include_contentsize(true).unwrap();
            encoder.include_checksum(true).unwrap();
            std::io::copy(&mut &test_data[..], &mut encoder).unwrap();
            encoder.finish().unwrap();
        }

        // OLD method: plain encode_all, kept for comparison of decoding.
        let compressed_old = zstd::encode_all(&test_data[..], 3).unwrap();

        let descriptor = frame_header_descriptor(&compressed_new);
        // Bit 2: Content_Checksum_flag.
        assert_ne!(descriptor & 0b0000_0100, 0, "checksum flag not set");
        // Frame_Content_Size is present when FCS_Flag (bits 7-6) is non-zero or the
        // Single_Segment_flag (bit 5) is set.
        let has_content_size = (descriptor >> 6) != 0 || descriptor & 0b0010_0000 != 0;
        assert!(has_content_size, "content size not recorded in frame header");

        // Both variants must still decompress to the original bytes.
        assert_eq!(zstd::decode_all(&compressed_new[..]).unwrap(), test_data);
        assert_eq!(zstd::decode_all(&compressed_old[..]).unwrap(), test_data);
    }

    #[test]
    fn test_skippable_frame_roundtrip() {
        let data = b"test data";
        let mut buffer = Vec::new();

        write_skippable_frame(&mut buffer, SKIPPABLE_MAGIC_METADATA, data).unwrap();

        let mut cursor = std::io::Cursor::new(&buffer);
        let (magic, read_data) = read_skippable_frame(&mut cursor).unwrap();

        assert_eq!(magic, SKIPPABLE_MAGIC_METADATA);
        assert_eq!(read_data, data);
    }

    #[test]
    fn test_skippable_frame_byte_layout() {
        let mut buffer = Vec::new();
        write_skippable_frame(&mut buffer, SKIPPABLE_MAGIC_METADATA, b"ab").unwrap();

        // magic (LE) + size (LE) + payload
        assert_eq!(buffer, vec![0x50u8, 0x2A, 0x4D, 0x18, 2, 0, 0, 0, b'a', b'b']);
    }

    #[test]
    fn test_skippable_frame_truncated_payload() {
        // Header declares 64 bytes but only 4 follow: reading must fail, not return short data.
        let mut buffer = Vec::new();
        buffer.extend_from_slice(&SKIPPABLE_MAGIC_METADATA.to_le_bytes());
        buffer.extend_from_slice(&64u32.to_le_bytes());
        buffer.extend_from_slice(b"abcd");

        let mut cursor = std::io::Cursor::new(&buffer);
        assert!(read_skippable_frame(&mut cursor).is_err());
    }

    #[test]
    fn test_metadata_frame_roundtrip() {
        let metadata = BundleMetadata {
            format: "plcbundle-v1".to_string(),
            bundle_number: 42,
            origin: constants::DEFAULT_PLC_DIRECTORY_URL.to_string(),
            content_hash: "abc123".to_string(),
            parent_hash: None,
            uncompressed_size: None,
            compressed_size: None,
            operation_count: 10000,
            did_count: 5000,
            start_time: "2024-01-01T00:00:00Z".to_string(),
            end_time: "2024-01-01T23:59:59Z".to_string(),
            created_at: "2024-01-02T00:00:00Z".to_string(),
            created_by: crate::constants::created_by(),
            frame_count: 100,
            frame_size: 100,
            frame_offsets: vec![0, 1000, 2000],
        };

        let mut buffer = Vec::new();
        write_metadata_frame(&mut buffer, &metadata).unwrap();

        let mut cursor = std::io::Cursor::new(&buffer);
        let read_metadata = read_metadata_frame(&mut cursor).unwrap();

        assert_eq!(read_metadata.bundle_number, 42);
        assert_eq!(read_metadata.frame_count, 100);
    }

    #[test]
    fn test_skippable_frame_empty_data() {
        let data = b"";
        let mut buffer = Vec::new();

        write_skippable_frame(&mut buffer, SKIPPABLE_MAGIC_METADATA, data).unwrap();

        let mut cursor = std::io::Cursor::new(&buffer);
        let (magic, read_data) = read_skippable_frame(&mut cursor).unwrap();

        assert_eq!(magic, SKIPPABLE_MAGIC_METADATA);
        assert_eq!(read_data, data);
        assert_eq!(read_data.len(), 0);
    }

    #[test]
    fn test_skippable_frame_large_data() {
        let data = vec![0x42u8; 10000];
        let mut buffer = Vec::new();

        write_skippable_frame(&mut buffer, SKIPPABLE_MAGIC_METADATA, &data).unwrap();

        let mut cursor = std::io::Cursor::new(&buffer);
        let (magic, read_data) = read_skippable_frame(&mut cursor).unwrap();

        assert_eq!(magic, SKIPPABLE_MAGIC_METADATA);
        assert_eq!(read_data, data);
    }

    #[test]
    fn test_skippable_frame_invalid_magic() {
        let data = b"test";
        let mut buffer = Vec::new();

        // Write with invalid magic (not in skippable range)
        buffer.write_all(&0x12345678u32.to_le_bytes()).unwrap();
        buffer
            .write_all(&(data.len() as u32).to_le_bytes())
            .unwrap();
        buffer.write_all(data).unwrap();

        let mut cursor = std::io::Cursor::new(&buffer);
        let result = read_skippable_frame(&mut cursor);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Not a skippable frame")
        );
    }

    #[test]
    fn test_skippable_frame_valid_magic_range() {
        // Test all valid magic values in range 0x184D2A50 - 0x184D2A5F
        for magic_val in 0x184D2A50..=0x184D2A5F {
            let data = b"test";
            let mut buffer = Vec::new();

            write_skippable_frame(&mut buffer, magic_val, data).unwrap();

            let mut cursor = std::io::Cursor::new(&buffer);
            let (read_magic, read_data) = read_skippable_frame(&mut cursor).unwrap();

            assert_eq!(read_magic, magic_val);
            assert_eq!(read_data, data);
        }
    }

    #[test]
    fn test_metadata_frame_with_parent_hash() {
        let metadata = BundleMetadata {
            format: "plcbundle-v1".to_string(),
            bundle_number: 2,
            origin: "test".to_string(),
            content_hash: "hash2".to_string(),
            parent_hash: Some("hash1".to_string()),
            uncompressed_size: Some(1000000),
            compressed_size: Some(500000),
            operation_count: 10000,
            did_count: 5000,
            start_time: "2024-01-01T00:00:00Z".to_string(),
            end_time: "2024-01-01T23:59:59Z".to_string(),
            created_at: "2024-01-02T00:00:00Z".to_string(),
            created_by: "test/1.0".to_string(),
            frame_count: 100,
            frame_size: 100,
            frame_offsets: vec![0, 1000, 2000, 3000],
        };

        let mut buffer = Vec::new();
        write_metadata_frame(&mut buffer, &metadata).unwrap();

        let mut cursor = std::io::Cursor::new(&buffer);
        let read_metadata = read_metadata_frame(&mut cursor).unwrap();

        assert_eq!(read_metadata.bundle_number, 2);
        assert_eq!(read_metadata.parent_hash, Some("hash1".to_string()));
        assert_eq!(read_metadata.uncompressed_size, Some(1000000));
        assert_eq!(read_metadata.compressed_size, Some(500000));
        assert_eq!(read_metadata.frame_offsets.len(), 4);
    }

    #[test]
    fn test_metadata_frame_wrong_magic() {
        let metadata = BundleMetadata {
            format: "plcbundle-v1".to_string(),
            bundle_number: 1,
            origin: "test".to_string(),
            content_hash: "hash1".to_string(),
            parent_hash: None,
            uncompressed_size: None,
            compressed_size: None,
            operation_count: 100,
            did_count: 50,
            start_time: "2024-01-01T00:00:00Z".to_string(),
            end_time: "2024-01-01T23:59:59Z".to_string(),
            created_at: "2024-01-02T00:00:00Z".to_string(),
            created_by: "test/1.0".to_string(),
            frame_count: 1,
            frame_size: 100,
            frame_offsets: vec![0, 1000],
        };

        let mut buffer = Vec::new();
        // Write with wrong magic (still a valid skippable magic, just not the metadata one)
        write_skippable_frame(
            &mut buffer,
            0x184D2A51,
            &sonic_rs::to_vec(&metadata).unwrap(),
        )
        .unwrap();

        let mut cursor = std::io::Cursor::new(&buffer);
        let result = read_metadata_frame(&mut cursor);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Unexpected magic"));
    }

    #[test]
    fn test_skippable_frame_size_calculation() {
        let data = b"hello world";
        let mut buffer = Vec::new();

        let written = write_skippable_frame(&mut buffer, SKIPPABLE_MAGIC_METADATA, data).unwrap();

        // Should be: magic(4) + size(4) + data(11) = 19
        assert_eq!(written, 8 + data.len());
        assert_eq!(buffer.len(), written);
    }
}