[DEPRECATED] Go implementation of plcbundle
at main 295 lines 7.8 kB view raw
1package storage 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "encoding/json" 7 "fmt" 8 "io" 9 "os" 10 "path/filepath" 11 12 "github.com/valyala/gozstd" 13) 14 15// ============================================================================ 16// ZSTD COMPRESSION ABSTRACTION LAYER 17// ============================================================================ 18 19const ( 20 CompressionLevel = 1 21 FrameSize = 100 22 23 SkippableMagicMetadata = 0x184D2A50 24) 25 26// ============================================================================ 27// SKIPPABLE FRAME FUNCTIONS 28// ============================================================================ 29 30// WriteSkippableFrame writes a skippable frame with the given data 31func WriteSkippableFrame(w io.Writer, magicNumber uint32, data []byte) (int64, error) { 32 frameSize := uint32(len(data)) 33 34 // Write magic number (little-endian) 35 if err := binary.Write(w, binary.LittleEndian, magicNumber); err != nil { 36 return 0, err 37 } 38 39 // Write frame size (little-endian) 40 if err := binary.Write(w, binary.LittleEndian, frameSize); err != nil { 41 return 0, err 42 } 43 44 // Write data 45 n, err := w.Write(data) 46 if err != nil { 47 return 0, err 48 } 49 50 totalBytes := int64(4 + 4 + n) // magic + size + data 51 return totalBytes, nil 52} 53 54// ReadSkippableFrame with debug 55func (ops *Operations) ReadSkippableFrame(r io.Reader) (uint32, []byte, error) { 56 var magic uint32 57 if err := binary.Read(r, binary.LittleEndian, &magic); err != nil { 58 return 0, nil, fmt.Errorf("failed to read magic: %w", err) 59 } 60 61 if magic < 0x184D2A50 || magic > 0x184D2A5F { 62 return 0, nil, fmt.Errorf("not a skippable frame: magic=0x%08X (expected 0x184D2A50-0x184D2A5F)", magic) 63 } 64 65 var frameSize uint32 66 if err := binary.Read(r, binary.LittleEndian, &frameSize); err != nil { 67 return 0, nil, fmt.Errorf("failed to read frame size: %w", err) 68 } 69 70 data := make([]byte, frameSize) 71 if _, err := io.ReadFull(r, data); err != nil { 72 return 0, nil, fmt.Errorf("failed to read frame data: %w", err) 73 } 74 75 return magic, data, nil 76} 77 78// WriteMetadataFrame writes bundle metadata as skippable frame (compact JSON) 79func (op *Operations) WriteMetadataFrame(w io.Writer, meta *BundleMetadata) (int64, error) { 80 jsonData, err := json.Marshal(meta) 81 if err != nil { 82 return 0, fmt.Errorf("failed to marshal metadata: %w", err) 83 } 84 return WriteSkippableFrame(w, SkippableMagicMetadata, jsonData) 85} 86 87// ReadMetadataFrame reads bundle metadata from skippable frame 88func (ops *Operations) ReadMetadataFrame(r io.Reader) (*BundleMetadata, error) { 89 magic, data, err := ops.ReadSkippableFrame(r) 90 if err != nil { 91 return nil, err 92 } 93 94 if magic != SkippableMagicMetadata { 95 return nil, fmt.Errorf("unexpected skippable frame magic: 0x%08X (expected 0x%08X)", 96 magic, SkippableMagicMetadata) 97 } 98 99 var meta BundleMetadata 100 if err := json.Unmarshal(data, &meta); err != nil { 101 return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) 102 } 103 104 return &meta, nil 105} 106 107// ExtractMetadataFromFile reads metadata without decompressing 108func (ops *Operations) ExtractMetadataFromFile(path string) (*BundleMetadata, error) { 109 file, err := os.Open(path) 110 if err != nil { 111 return nil, err 112 } 113 defer file.Close() 114 115 // Check first bytes 116 header := make([]byte, 8) 117 if _, err := file.Read(header); err != nil { 118 return nil, fmt.Errorf("failed to read header: %w", err) 119 } 120 121 // Seek back to start 122 file.Seek(0, io.SeekStart) 123 124 meta, err := ops.ReadMetadataFrame(file) 125 if err != nil { 126 return nil, fmt.Errorf("no metadata frame found: %w", err) 127 } 128 129 return meta, nil 130} 131 132// ExtractFrameIndexFromFile now just reads from metadata 133func (ops *Operations) ExtractFrameIndexFromFile(path string) ([]int64, error) { 134 meta, err := ops.ExtractMetadataFromFile(path) 135 if err != nil { 136 return nil, err 137 } 138 139 if len(meta.FrameOffsets) == 0 { 140 return nil, fmt.Errorf("metadata has no frame offsets") 141 } 142 143 return meta.FrameOffsets, nil 144} 145 146// DebugFrameOffsets extracts and displays frame offset information 147func (ops *Operations) DebugFrameOffsets(path string) error { 148 meta, err := ops.ExtractMetadataFromFile(path) 149 if err != nil { 150 return fmt.Errorf("failed to extract metadata: %w", err) 151 } 152 153 fmt.Printf("Frame Offset Debug for: %s\n\n", filepath.Base(path)) 154 fmt.Printf("Metadata:\n") 155 fmt.Printf(" Bundle: %d\n", meta.BundleNumber) 156 fmt.Printf(" Frames: %d\n", meta.FrameCount) 157 fmt.Printf(" Frame size: %d ops\n", meta.FrameSize) 158 fmt.Printf(" Total ops: %d\n", meta.OperationCount) 159 160 fmt.Printf("\nFrame Offsets (%d total):\n", len(meta.FrameOffsets)) 161 for i, offset := range meta.FrameOffsets { 162 if i < len(meta.FrameOffsets)-1 { 163 nextOffset := meta.FrameOffsets[i+1] 164 frameSize := nextOffset - offset 165 fmt.Printf(" Frame %3d: offset %10d, size %10d bytes\n", i, offset, frameSize) 166 } else { 167 fmt.Printf(" End mark: offset %10d\n", offset) 168 } 169 } 170 171 // Try to verify first frame 172 fmt.Printf("\nVerifying first frame...\n") 173 file, err := os.Open(path) 174 if err != nil { 175 return err 176 } 177 defer file.Close() 178 179 if len(meta.FrameOffsets) < 2 { 180 return fmt.Errorf("not enough frame offsets") 181 } 182 183 startOffset := meta.FrameOffsets[0] 184 endOffset := meta.FrameOffsets[1] 185 frameLength := endOffset - startOffset 186 187 fmt.Printf(" Start: %d, End: %d, Length: %d\n", startOffset, endOffset, frameLength) 188 189 compressedFrame := make([]byte, frameLength) 190 _, err = file.ReadAt(compressedFrame, startOffset) 191 if err != nil { 192 return fmt.Errorf("failed to read: %w", err) 193 } 194 195 decompressed, err := DecompressFrame(compressedFrame) 196 if err != nil { 197 return fmt.Errorf("failed to decompress: %w", err) 198 } 199 200 fmt.Printf(" ✓ Decompressed: %d bytes\n", len(decompressed)) 201 202 // Count lines 203 lines := bytes.Count(decompressed, []byte("\n")) 204 fmt.Printf(" ✓ Lines: %d\n", lines) 205 206 return nil 207} 208 209// ============================================================================ 210// COMPRESSION/DECOMPRESSION 211// ============================================================================ 212 213func CompressFrame(data []byte) ([]byte, error) { 214 compressed := gozstd.CompressLevel(nil, data, CompressionLevel) 215 return compressed, nil 216} 217 218func DecompressAll(compressed []byte) ([]byte, error) { 219 decompressed, err := gozstd.Decompress(nil, compressed) 220 if err != nil { 221 return nil, fmt.Errorf("decompression failed: %w", err) 222 } 223 return decompressed, nil 224} 225 226func DecompressFrame(compressedFrame []byte) ([]byte, error) { 227 return gozstd.Decompress(nil, compressedFrame) 228} 229 230func NewStreamingReader(r io.Reader) (StreamReader, error) { 231 reader := gozstd.NewReader(r) 232 return &gozstdReader{reader: reader}, nil 233} 234 235func NewStreamingWriter(w io.Writer) (StreamWriter, error) { 236 writer := gozstd.NewWriterLevel(w, CompressionLevel) 237 return &gozstdWriter{writer: writer}, nil 238} 239 240// ============================================================================ 241// INTERFACES 242// ============================================================================ 243 244type StreamReader interface { 245 io.Reader 246 io.WriterTo 247 Release() 248} 249 250type StreamWriter interface { 251 io.Writer 252 io.Closer 253 Flush() error 254 Release() 255} 256 257// ============================================================================ 258// WRAPPER TYPES 259// ============================================================================ 260 261type gozstdReader struct { 262 reader *gozstd.Reader 263} 264 265func (r *gozstdReader) Read(p []byte) (int, error) { 266 return r.reader.Read(p) 267} 268 269func (r *gozstdReader) WriteTo(w io.Writer) (int64, error) { 270 return r.reader.WriteTo(w) 271} 272 273func (r *gozstdReader) Release() { 274 r.reader.Release() 275} 276 277type gozstdWriter struct { 278 writer *gozstd.Writer 279} 280 281func (w *gozstdWriter) Write(p []byte) (int, error) { 282 return w.writer.Write(p) 283} 284 285func (w *gozstdWriter) Close() error { 286 return w.writer.Close() 287} 288 289func (w *gozstdWriter) Flush() error { 290 return w.writer.Flush() 291} 292 293func (w *gozstdWriter) Release() { 294 w.writer.Release() 295}