···11package mempool
22+33+import (
44+ "bufio"
55+ "bytes"
66+ "fmt"
77+ "os"
88+ "path/filepath"
99+ "sync"
1010+ "time"
1111+1212+ "github.com/goccy/go-json"
1313+ "tangled.org/atscan.net/plcbundle/internal/types"
1414+ "tangled.org/atscan.net/plcbundle/plcclient"
1515+)
1616+1717+const MEMPOOL_FILE_PREFIX = "plc_mempool_"
1818+1919+// Mempool stores operations waiting to be bundled
2020+// Operations must be strictly chronological
2121+type Mempool struct {
2222+ operations []plcclient.PLCOperation
2323+ targetBundle int // Which bundle number these operations are for
2424+ minTimestamp time.Time // Operations must be after this time
2525+ file string
2626+ mu sync.RWMutex
2727+ logger types.Logger
2828+ validated bool // Track if we've validated chronological order
2929+ dirty bool // Track if mempool changed
3030+}
3131+3232+// NewMempool creates a new mempool for a specific bundle number
3333+func NewMempool(bundleDir string, targetBundle int, minTimestamp time.Time, logger types.Logger) (*Mempool, error) {
3434+ filename := fmt.Sprintf("%s%06d.jsonl", MEMPOOL_FILE_PREFIX, targetBundle)
3535+3636+ m := &Mempool{
3737+ file: filepath.Join(bundleDir, filename),
3838+ targetBundle: targetBundle,
3939+ minTimestamp: minTimestamp,
4040+ operations: make([]plcclient.PLCOperation, 0),
4141+ logger: logger,
4242+ validated: false,
4343+ }
4444+4545+ // Load existing mempool from disk if it exists
4646+ if err := m.Load(); err != nil {
4747+ // If file doesn't exist, that's OK
4848+ if !os.IsNotExist(err) {
4949+ return nil, fmt.Errorf("failed to load mempool: %w", err)
5050+ }
5151+ }
5252+5353+ return m, nil
5454+}
5555+5656+// Add adds operations to the mempool with strict validation
5757+func (m *Mempool) Add(ops []plcclient.PLCOperation) (int, error) {
5858+ m.mu.Lock()
5959+ defer m.mu.Unlock()
6060+6161+ if len(ops) == 0 {
6262+ return 0, nil
6363+ }
6464+6565+ // Build existing CID set
6666+ existingCIDs := make(map[string]bool)
6767+ for _, op := range m.operations {
6868+ existingCIDs[op.CID] = true
6969+ }
7070+7171+ // Validate and add operations
7272+ var newOps []plcclient.PLCOperation
7373+ var lastTime time.Time
7474+7575+ // Start from last operation time if we have any
7676+ if len(m.operations) > 0 {
7777+ lastTime = m.operations[len(m.operations)-1].CreatedAt
7878+ } else {
7979+ lastTime = m.minTimestamp
8080+ }
8181+8282+ for _, op := range ops {
8383+ // Skip duplicates
8484+ if existingCIDs[op.CID] {
8585+ continue
8686+ }
8787+8888+ // CRITICAL: Validate chronological order
8989+ if !op.CreatedAt.After(lastTime) && !op.CreatedAt.Equal(lastTime) {
9090+ return len(newOps), fmt.Errorf(
9191+ "chronological violation: operation %s at %s is not after %s",
9292+ op.CID, op.CreatedAt.Format(time.RFC3339Nano), lastTime.Format(time.RFC3339Nano),
9393+ )
9494+ }
9595+9696+ // Validate operation is after minimum timestamp
9797+ if op.CreatedAt.Before(m.minTimestamp) {
9898+ return len(newOps), fmt.Errorf(
9999+ "operation %s at %s is before minimum timestamp %s (belongs in earlier bundle)",
100100+ op.CID, op.CreatedAt.Format(time.RFC3339Nano), m.minTimestamp.Format(time.RFC3339Nano),
101101+ )
102102+ }
103103+104104+ newOps = append(newOps, op)
105105+ existingCIDs[op.CID] = true
106106+ lastTime = op.CreatedAt
107107+ }
108108+109109+ // Add new operations
110110+ m.operations = append(m.operations, newOps...)
111111+ m.validated = true
112112+ m.dirty = true
113113+114114+ return len(newOps), nil
115115+}
116116+117117+// Validate performs a full chronological validation of all operations
118118+func (m *Mempool) Validate() error {
119119+ m.mu.RLock()
120120+ defer m.mu.RUnlock()
121121+122122+ if len(m.operations) == 0 {
123123+ return nil
124124+ }
125125+126126+ // Check all operations are after minimum timestamp
127127+ for i, op := range m.operations {
128128+ if op.CreatedAt.Before(m.minTimestamp) {
129129+ return fmt.Errorf(
130130+ "operation %d (CID: %s) at %s is before minimum timestamp %s",
131131+ i, op.CID, op.CreatedAt.Format(time.RFC3339Nano), m.minTimestamp.Format(time.RFC3339Nano),
132132+ )
133133+ }
134134+ }
135135+136136+ // Check chronological order
137137+ for i := 1; i < len(m.operations); i++ {
138138+ prev := m.operations[i-1]
139139+ curr := m.operations[i]
140140+141141+ if curr.CreatedAt.Before(prev.CreatedAt) {
142142+ return fmt.Errorf(
143143+ "chronological violation at index %d: %s (%s) is before %s (%s)",
144144+ i, curr.CID, curr.CreatedAt.Format(time.RFC3339Nano),
145145+ prev.CID, prev.CreatedAt.Format(time.RFC3339Nano),
146146+ )
147147+ }
148148+ }
149149+150150+ // Check for duplicate CIDs
151151+ cidSet := make(map[string]int)
152152+ for i, op := range m.operations {
153153+ if prevIdx, exists := cidSet[op.CID]; exists {
154154+ return fmt.Errorf(
155155+ "duplicate CID %s at indices %d and %d",
156156+ op.CID, prevIdx, i,
157157+ )
158158+ }
159159+ cidSet[op.CID] = i
160160+ }
161161+162162+ return nil
163163+}
164164+165165+// Count returns the number of operations in mempool
166166+func (m *Mempool) Count() int {
167167+ m.mu.RLock()
168168+ defer m.mu.RUnlock()
169169+ return len(m.operations)
170170+}
171171+172172+// Take removes and returns up to n operations from the front
173173+func (m *Mempool) Take(n int) ([]plcclient.PLCOperation, error) {
174174+ m.mu.Lock()
175175+ defer m.mu.Unlock()
176176+177177+ // Validate before taking
178178+ if err := m.validateLocked(); err != nil {
179179+ return nil, fmt.Errorf("mempool validation failed: %w", err)
180180+ }
181181+182182+ if n > len(m.operations) {
183183+ n = len(m.operations)
184184+ }
185185+186186+ result := make([]plcclient.PLCOperation, n)
187187+ copy(result, m.operations[:n])
188188+189189+ // Remove taken operations
190190+ m.operations = m.operations[n:]
191191+192192+ return result, nil
193193+}
194194+195195+// validateLocked performs validation with lock already held
196196+func (m *Mempool) validateLocked() error {
197197+ if m.validated {
198198+ return nil
199199+ }
200200+201201+ if len(m.operations) == 0 {
202202+ return nil
203203+ }
204204+205205+ // Check chronological order
206206+ lastTime := m.minTimestamp
207207+ for i, op := range m.operations {
208208+ if op.CreatedAt.Before(lastTime) {
209209+ return fmt.Errorf(
210210+ "chronological violation at index %d: %s is before %s",
211211+ i, op.CreatedAt.Format(time.RFC3339Nano), lastTime.Format(time.RFC3339Nano),
212212+ )
213213+ }
214214+ lastTime = op.CreatedAt
215215+ }
216216+217217+ m.validated = true
218218+ return nil
219219+}
220220+221221+// Peek returns up to n operations without removing them
222222+func (m *Mempool) Peek(n int) []plcclient.PLCOperation {
223223+ m.mu.RLock()
224224+ defer m.mu.RUnlock()
225225+226226+ if n > len(m.operations) {
227227+ n = len(m.operations)
228228+ }
229229+230230+ result := make([]plcclient.PLCOperation, n)
231231+ copy(result, m.operations[:n])
232232+233233+ return result
234234+}
235235+236236+// Clear removes all operations
237237+func (m *Mempool) Clear() {
238238+ m.mu.Lock()
239239+ defer m.mu.Unlock()
240240+ m.operations = make([]plcclient.PLCOperation, 0)
241241+ m.validated = false
242242+}
243243+244244+// Save persists mempool to disk
245245+func (m *Mempool) Save() error {
246246+ m.mu.Lock()
247247+ defer m.mu.Unlock()
248248+249249+ if !m.dirty {
250250+ return nil
251251+ }
252252+253253+ if len(m.operations) == 0 {
254254+ // Remove file if empty
255255+ os.Remove(m.file)
256256+ return nil
257257+ }
258258+259259+ // Validate before saving
260260+ if err := m.validateLocked(); err != nil {
261261+ return fmt.Errorf("mempool validation failed, refusing to save: %w", err)
262262+ }
263263+264264+ // Serialize to JSONL
265265+ var buf bytes.Buffer
266266+ for _, op := range m.operations {
267267+ if len(op.RawJSON) > 0 {
268268+ buf.Write(op.RawJSON)
269269+ } else {
270270+ data, _ := json.Marshal(op)
271271+ buf.Write(data)
272272+ }
273273+ buf.WriteByte('\n')
274274+ }
275275+276276+ // Write atomically
277277+ tempFile := m.file + ".tmp"
278278+ if err := os.WriteFile(tempFile, buf.Bytes(), 0644); err != nil {
279279+ return fmt.Errorf("failed to write mempool: %w", err)
280280+ }
281281+282282+ if err := os.Rename(tempFile, m.file); err != nil {
283283+ os.Remove(tempFile)
284284+ return fmt.Errorf("failed to rename mempool file: %w", err)
285285+ }
286286+287287+ m.dirty = false
288288+ return nil
289289+}
290290+291291+// Load reads mempool from disk and validates it
292292+func (m *Mempool) Load() error {
293293+ data, err := os.ReadFile(m.file)
294294+ if err != nil {
295295+ return err
296296+ }
297297+298298+ m.mu.Lock()
299299+ defer m.mu.Unlock()
300300+301301+ // Parse JSONL
302302+ scanner := bufio.NewScanner(bytes.NewReader(data))
303303+ buf := make([]byte, 0, 64*1024)
304304+ scanner.Buffer(buf, 1024*1024)
305305+306306+ m.operations = make([]plcclient.PLCOperation, 0)
307307+308308+ for scanner.Scan() {
309309+ line := scanner.Bytes()
310310+ if len(line) == 0 {
311311+ continue
312312+ }
313313+314314+ var op plcclient.PLCOperation
315315+ if err := json.Unmarshal(line, &op); err != nil {
316316+ return fmt.Errorf("failed to parse mempool operation: %w", err)
317317+ }
318318+319319+ op.RawJSON = make([]byte, len(line))
320320+ copy(op.RawJSON, line)
321321+322322+ m.operations = append(m.operations, op)
323323+ }
324324+325325+ if err := scanner.Err(); err != nil {
326326+ return fmt.Errorf("scanner error: %w", err)
327327+ }
328328+329329+ // CRITICAL: Validate loaded data
330330+ if err := m.validateLocked(); err != nil {
331331+ return fmt.Errorf("loaded mempool failed validation: %w", err)
332332+ }
333333+334334+ if len(m.operations) > 0 {
335335+ m.logger.Printf("Loaded %d operations from mempool for bundle %06d", len(m.operations), m.targetBundle)
336336+ }
337337+338338+ return nil
339339+}
340340+341341+// GetFirstTime returns the created_at of the first operation
342342+func (m *Mempool) GetFirstTime() string {
343343+ m.mu.RLock()
344344+ defer m.mu.RUnlock()
345345+346346+ if len(m.operations) == 0 {
347347+ return ""
348348+ }
349349+350350+ return m.operations[0].CreatedAt.Format(time.RFC3339Nano)
351351+}
352352+353353+// GetLastTime returns the created_at of the last operation
354354+func (m *Mempool) GetLastTime() string {
355355+ m.mu.RLock()
356356+ defer m.mu.RUnlock()
357357+358358+ if len(m.operations) == 0 {
359359+ return ""
360360+ }
361361+362362+ return m.operations[len(m.operations)-1].CreatedAt.Format(time.RFC3339Nano)
363363+}
364364+365365+// GetTargetBundle returns the bundle number this mempool is for
366366+func (m *Mempool) GetTargetBundle() int {
367367+ return m.targetBundle
368368+}
369369+370370+// GetMinTimestamp returns the minimum timestamp for operations
371371+func (m *Mempool) GetMinTimestamp() time.Time {
372372+ return m.minTimestamp
373373+}
374374+375375+// Stats returns mempool statistics
376376+func (m *Mempool) Stats() map[string]interface{} {
377377+ m.mu.RLock()
378378+ defer m.mu.RUnlock()
379379+380380+ count := len(m.operations)
381381+382382+ stats := map[string]interface{}{
383383+ "count": count,
384384+ "can_create_bundle": count >= types.BUNDLE_SIZE,
385385+ "target_bundle": m.targetBundle,
386386+ "min_timestamp": m.minTimestamp,
387387+ "validated": m.validated,
388388+ }
389389+390390+ if count > 0 {
391391+ stats["first_time"] = m.operations[0].CreatedAt
392392+ stats["last_time"] = m.operations[len(m.operations)-1].CreatedAt
393393+394394+ // Calculate size and unique DIDs
395395+ totalSize := 0
396396+ didSet := make(map[string]bool)
397397+ for _, op := range m.operations {
398398+ totalSize += len(op.RawJSON)
399399+ didSet[op.DID] = true
400400+ }
401401+ stats["size_bytes"] = totalSize
402402+ stats["did_count"] = len(didSet)
403403+ }
404404+405405+ return stats
406406+}
407407+408408+// Delete removes the mempool file
409409+func (m *Mempool) Delete() error {
410410+ if err := os.Remove(m.file); err != nil && !os.IsNotExist(err) {
411411+ return fmt.Errorf("failed to delete mempool file: %w", err)
412412+ }
413413+ return nil
414414+}
415415+416416+// GetFilename returns the mempool filename
417417+func (m *Mempool) GetFilename() string {
418418+ return filepath.Base(m.file)
419419+}
+18
internal/types/types.go
···11+package types
22+33+// Logger is a simple logging interface used throughout plcbundle
44+type Logger interface {
55+ Printf(format string, v ...interface{})
66+ Println(v ...interface{})
77+}
88+99+const (
1010+ // BUNDLE_SIZE is the standard number of operations per bundle
1111+ BUNDLE_SIZE = 10000
1212+1313+ // INDEX_FILE is the default index filename
1414+ INDEX_FILE = "plc_bundles.json"
1515+1616+ // INDEX_VERSION is the current index format version
1717+ INDEX_VERSION = "1.0"
1818+)
-148
plcbundle.go
···11-package plcbundle
22-33-import (
44- "context"
55- "io"
66- "time"
77-88- "tangled.org/atscan.net/plcbundle/bundle"
99- "tangled.org/atscan.net/plcbundle/plcclient"
1010-)
1111-1212-// Re-export commonly used types for convenience
1313-type (
1414- Bundle = bundle.Bundle
1515- BundleMetadata = bundle.BundleMetadata
1616- Index = bundle.Index
1717- Manager = bundle.Manager
1818- Config = bundle.Config
1919- VerificationResult = bundle.VerificationResult
2020- ChainVerificationResult = bundle.ChainVerificationResult
2121- DirectoryScanResult = bundle.DirectoryScanResult
2222- Logger = bundle.Logger
2323-2424- PLCOperation = plcclient.PLCOperation
2525- PLCClient = plcclient.Client
2626- ExportOptions = plcclient.ExportOptions
2727-)
2828-2929-// Re-export constants
3030-const (
3131- BUNDLE_SIZE = bundle.BUNDLE_SIZE
3232- INDEX_FILE = bundle.INDEX_FILE
3333-)
3434-3535-// NewManager creates a new bundle manager (convenience wrapper)
3636-func NewManager(config *Config, plcClient *PLCClient) (*Manager, error) {
3737- return bundle.NewManager(config, plcClient)
3838-}
3939-4040-// NewPLCClient creates a new PLC client (convenience wrapper)
4141-func NewPLCClient(baseURL string, opts ...plcclient.ClientOption) *PLCClient {
4242- return plcclient.NewClient(baseURL, opts...)
4343-}
4444-4545-// DefaultConfig returns default configuration (convenience wrapper)
4646-func DefaultConfig(bundleDir string) *Config {
4747- return bundle.DefaultConfig(bundleDir)
4848-}
4949-5050-// NewIndex creates a new empty index (convenience wrapper)
5151-func NewIndex(origin string) *Index {
5252- return bundle.NewIndex(origin)
5353-}
5454-5555-// LoadIndex loads an index from a file (convenience wrapper)
5656-func LoadIndex(path string) (*Index, error) {
5757- return bundle.LoadIndex(path)
5858-}
5959-6060-// BundleManager provides a high-level API for bundle operations
6161-type BundleManager struct {
6262- mgr *Manager
6363-}
6464-6565-// New creates a new BundleManager with default settings
6666-func New(bundleDir string, plcURL string) (*BundleManager, error) {
6767- config := DefaultConfig(bundleDir)
6868- var plcClient *PLCClient
6969- if plcURL != "" {
7070- plcClient = NewPLCClient(plcURL)
7171- }
7272-7373- mgr, err := NewManager(config, plcClient)
7474- if err != nil {
7575- return nil, err
7676- }
7777-7878- return &BundleManager{mgr: mgr}, nil
7979-}
8080-8181-// Close closes the manager
8282-func (bm *BundleManager) Close() {
8383- bm.mgr.Close()
8484-}
8585-8686-// FetchNext fetches the next bundle from PLC
8787-func (bm *BundleManager) FetchNext(ctx context.Context) (*Bundle, error) {
8888- b, err := bm.mgr.FetchNextBundle(ctx, false)
8989- if err != nil {
9090- return nil, err
9191- }
9292- return b, bm.mgr.SaveBundle(ctx, b, false)
9393-}
9494-9595-// Load loads a bundle by number
9696-func (bm *BundleManager) Load(ctx context.Context, bundleNumber int) (*Bundle, error) {
9797- return bm.mgr.LoadBundle(ctx, bundleNumber)
9898-}
9999-100100-// Verify verifies a bundle
101101-func (bm *BundleManager) Verify(ctx context.Context, bundleNumber int) (*VerificationResult, error) {
102102- return bm.mgr.VerifyBundle(ctx, bundleNumber)
103103-}
104104-105105-// VerifyChain verifies the entire chain
106106-func (bm *BundleManager) VerifyChain(ctx context.Context) (*ChainVerificationResult, error) {
107107- return bm.mgr.VerifyChain(ctx)
108108-}
109109-110110-// GetIndex returns the index
111111-func (bm *BundleManager) GetIndex() *Index {
112112- return bm.mgr.GetIndex()
113113-}
114114-115115-// GetInfo returns manager info
116116-func (bm *BundleManager) GetInfo() map[string]interface{} {
117117- return bm.mgr.GetInfo()
118118-}
119119-120120-// Export exports operations from bundles
121121-func (bm *BundleManager) Export(ctx context.Context, afterTime time.Time, count int) ([]PLCOperation, error) {
122122- return bm.mgr.ExportOperations(ctx, afterTime, count)
123123-}
124124-125125-// Scan scans the directory and rebuilds the index
126126-func (bm *BundleManager) Scan() (*DirectoryScanResult, error) {
127127- return bm.mgr.ScanDirectory()
128128-}
129129-130130-// ScanBundle scans a single bundle file
131131-func (bm *BundleManager) ScanBundle(path string, bundleNumber int) (*BundleMetadata, error) {
132132- return bm.mgr.ScanBundle(path, bundleNumber)
133133-}
134134-135135-// IsBundleIndexed checks if a bundle is in the index
136136-func (bm *BundleManager) IsBundleIndexed(bundleNumber int) bool {
137137- return bm.mgr.IsBundleIndexed(bundleNumber)
138138-}
139139-140140-// StreamRaw streams raw compressed bundle data
141141-func (bm *BundleManager) StreamRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
142142- return bm.mgr.StreamBundleRaw(ctx, bundleNumber)
143143-}
144144-145145-// StreamDecompressed streams decompressed bundle data
146146-func (bm *BundleManager) StreamDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
147147- return bm.mgr.StreamBundleDecompressed(ctx, bundleNumber)
148148-}