A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

update structure (1)

+644 -150
+9 -8
bundle/bundle_test.go
··· 6 6 "time" 7 7 8 8 "tangled.org/atscan.net/plcbundle/bundle" 9 - "tangled.org/atscan.net/plcbundle/plc" 9 + "tangled.org/atscan.net/plcbundle/internal/storage" 10 + "tangled.org/atscan.net/plcbundle/plcclient" 10 11 ) 11 12 12 13 // TestIndex tests index operations ··· 230 231 } 231 232 232 233 // Try to add operation before last one (should fail) 233 - oldOp := []plc.PLCOperation{ 234 + oldOp := []plcclient.PLCOperation{ 234 235 { 235 236 DID: "did:plc:old", 236 237 CID: "old123", ··· 311 312 tmpDir := t.TempDir() 312 313 logger := &testLogger{t: t} 313 314 314 - ops, err := bundle.NewOperations(logger) 315 + ops, err := storage.NewOperations(logger) 315 316 if err != nil { 316 317 t.Fatalf("NewOperations failed: %v", err) 317 318 } ··· 371 372 }) 372 373 373 374 t.Run("ExtractUniqueDIDs", func(t *testing.T) { 374 - operations := []plc.PLCOperation{ 375 + operations := []plcclient.PLCOperation{ 375 376 {DID: "did:plc:1"}, 376 377 {DID: "did:plc:2"}, 377 378 {DID: "did:plc:1"}, // duplicate ··· 386 387 387 388 t.Run("GetBoundaryCIDs", func(t *testing.T) { 388 389 baseTime := time.Now() 389 - operations := []plc.PLCOperation{ 390 + operations := []plcclient.PLCOperation{ 390 391 {CID: "cid1", CreatedAt: baseTime}, 391 392 {CID: "cid2", CreatedAt: baseTime.Add(time.Second)}, 392 393 {CID: "cid3", CreatedAt: baseTime.Add(2 * time.Second)}, ··· 406 407 407 408 // Helper functions 408 409 409 - func makeTestOperations(count int) []plc.PLCOperation { 410 - ops := make([]plc.PLCOperation, count) 410 + func makeTestOperations(count int) []plcclient.PLCOperation { 411 + ops := make([]plcclient.PLCOperation, count) 411 412 baseTime := time.Now().Add(-time.Hour) 412 413 413 414 for i := 0; i < count; i++ { 414 - ops[i] = plc.PLCOperation{ 415 + ops[i] = plcclient.PLCOperation{ 415 416 DID: "did:plc:test" + string(rune(i)), 416 417 CID: "bafytest" + string(rune(i)), 417 418 CreatedAt: baseTime.Add(time.Duration(i) * time.Second),
+2 -2
bundle/did_index.go
··· 12 12 "time" 13 13 14 14 "github.com/goccy/go-json" 15 - "tangled.org/atscan.net/plcbundle/plc" 15 + "tangled.org/atscan.net/plcbundle/plcclient" 16 16 ) 17 17 18 18 const ( ··· 478 478 479 479 // extractDIDIdentifier extracts the 24-char identifier from full DID 480 480 func extractDIDIdentifier(did string) (string, error) { 481 - if err := plc.ValidateDIDFormat(did); err != nil { 481 + if err := plcclient.ValidateDIDFormat(did); err != nil { 482 482 return "", err 483 483 } 484 484
+18 -18
bundle/did_resolver.go
··· 5 5 "fmt" 6 6 "sort" 7 7 8 - "tangled.org/atscan.net/plcbundle/plc" 8 + "tangled.org/atscan.net/plcbundle/plcclient" 9 9 ) 10 10 11 11 // GetDIDOperationsBundledOnly retrieves operations from bundles only (no mempool) 12 - func (m *Manager) GetDIDOperationsBundledOnly(ctx context.Context, did string, verbose bool) ([]plc.PLCOperation, error) { 13 - if err := plc.ValidateDIDFormat(did); err != nil { 12 + func (m *Manager) GetDIDOperationsBundledOnly(ctx context.Context, did string, verbose bool) ([]plcclient.PLCOperation, error) { 13 + if err := plcclient.ValidateDIDFormat(did); err != nil { 14 14 return nil, err 15 15 } 16 16 ··· 36 36 } 37 37 38 38 // GetDIDOperationsFromMempool retrieves operations for a DID from mempool only 39 - func (m *Manager) GetDIDOperationsFromMempool(did string) ([]plc.PLCOperation, error) { 40 - if err := plc.ValidateDIDFormat(did); err != nil { 39 + func (m *Manager) GetDIDOperationsFromMempool(did string) ([]plcclient.PLCOperation, error) { 40 + if err := plcclient.ValidateDIDFormat(did); err != nil { 41 41 return nil, err 42 42 } 43 43 44 44 if m.mempool == nil { 45 - return []plc.PLCOperation{}, nil 45 + return []plcclient.PLCOperation{}, nil 46 46 } 47 47 48 48 // Get all mempool operations (max 10K) ··· 52 52 } 53 53 54 54 // Pre-allocate with reasonable capacity 55 - matchingOps := make([]plc.PLCOperation, 0, 16) 55 + matchingOps := make([]plcclient.PLCOperation, 0, 16) 56 56 57 57 // Linear scan (fast for 10K operations) 58 58 for _, op := range allMempoolOps { ··· 65 65 } 66 66 67 67 // GetDIDOperations retrieves all operations for a DID (bundles + mempool combined) 68 - func (m *Manager) GetDIDOperations(ctx context.Context, did string, verbose bool) ([]plc.PLCOperation, error) { 69 - if err := plc.ValidateDIDFormat(did); err != nil { 68 + func (m *Manager) GetDIDOperations(ctx context.Context, did string, verbose bool) ([]plcclient.PLCOperation, error) { 69 + if err := plcclient.ValidateDIDFormat(did); err != nil { 70 70 return nil, err 71 71 } 72 72 ··· 97 97 } 98 98 99 99 // getDIDOperationsIndexed uses index for fast lookup (PRIVATE - bundles only) 100 - func (m *Manager) getDIDOperationsIndexed(ctx context.Context, did string, verbose bool) ([]plc.PLCOperation, error) { 100 + func (m *Manager) getDIDOperationsIndexed(ctx context.Context, did string, verbose bool) ([]plcclient.PLCOperation, error) { 101 101 locations, err := m.didIndex.GetDIDLocations(did) 102 102 if err != nil { 103 103 return nil, err 104 104 } 105 105 106 106 if len(locations) == 0 { 107 - return []plc.PLCOperation{}, nil 107 + return []plcclient.PLCOperation{}, nil 108 108 } 109 109 110 110 // Filter nullified at index level (save loading those bundles!) ··· 131 131 } 132 132 133 133 // Load operations 134 - var allOps []plc.PLCOperation 134 + var allOps []plcclient.PLCOperation 135 135 for bundleNum, positions := range bundleMap { 136 136 bundle, err := m.LoadBundle(ctx, int(bundleNum)) 137 137 if err != nil { ··· 158 158 } 159 159 160 160 // getDIDOperationsScan falls back to full scan (slow) (PRIVATE - bundles only) 161 - func (m *Manager) getDIDOperationsScan(ctx context.Context, did string) ([]plc.PLCOperation, error) { 162 - var allOps []plc.PLCOperation 161 + func (m *Manager) getDIDOperationsScan(ctx context.Context, did string) ([]plcclient.PLCOperation, error) { 162 + var allOps []plcclient.PLCOperation 163 163 bundles := m.index.GetBundles() 164 164 165 165 for _, meta := range bundles { ··· 190 190 } 191 191 192 192 // GetLatestDIDOperation returns only the most recent non-nullified operation 193 - func (m *Manager) GetLatestDIDOperation(ctx context.Context, did string) (*plc.PLCOperation, error) { 194 - if err := plc.ValidateDIDFormat(did); err != nil { 193 + func (m *Manager) GetLatestDIDOperation(ctx context.Context, did string) (*plcclient.PLCOperation, error) { 194 + if err := plcclient.ValidateDIDFormat(did); err != nil { 195 195 return nil, err 196 196 } 197 197 ··· 234 234 } 235 235 236 236 // getLatestDIDOperationIndexed uses index to find only the latest operation (PRIVATE) 237 - func (m *Manager) getLatestDIDOperationIndexed(ctx context.Context, did string) (*plc.PLCOperation, error) { 237 + func (m *Manager) getLatestDIDOperationIndexed(ctx context.Context, did string) (*plcclient.PLCOperation, error) { 238 238 // Get all locations from index 239 239 locations, err := m.didIndex.GetDIDLocations(did) 240 240 if err != nil { ··· 330 330 331 331 // GetDIDOperationsWithLocations returns operations along with their bundle/position info 332 332 func (m *Manager) GetDIDOperationsWithLocations(ctx context.Context, did string, verbose bool) ([]PLCOperationWithLocation, error) { 333 - if err := plc.ValidateDIDFormat(did); err != nil { 333 + if err := plcclient.ValidateDIDFormat(did); err != nil { 334 334 return nil, err 335 335 } 336 336
+10 -10
bundle/manager.go
··· 14 14 "sync" 15 15 "time" 16 16 17 - "tangled.org/atscan.net/plcbundle/plc" 17 + "tangled.org/atscan.net/plcbundle/plcclient" 18 18 ) 19 19 20 20 // defaultLogger is a simple logger implementation ··· 34 34 operations *Operations 35 35 index *Index 36 36 indexPath string 37 - plcClient *plc.Client 37 + plcClient *plcclient.Client 38 38 logger Logger 39 39 mempool *Mempool 40 40 didIndex *DIDIndexManager ··· 45 45 } 46 46 47 47 // NewManager creates a new bundle manager 48 - func NewManager(config *Config, plcClient *plc.Client) (*Manager, error) { 48 + func NewManager(config *Config, plcClient *plcclient.Client) (*Manager, error) { 49 49 if config == nil { 50 50 config = DefaultConfig("./plc_bundles") 51 51 } ··· 574 574 fetchNum+1, batchSize, m.mempool.Count()) 575 575 } 576 576 577 - batch, err := m.plcClient.Export(ctx, plc.ExportOptions{ 577 + batch, err := m.plcClient.Export(ctx, plcclient.ExportOptions{ 578 578 Count: batchSize, 579 579 After: currentAfter, 580 580 }) ··· 595 595 } 596 596 597 597 // Deduplicate 598 - uniqueOps := make([]plc.PLCOperation, 0) 598 + uniqueOps := make([]plcclient.PLCOperation, 0) 599 599 for _, op := range batch { 600 600 if !seenCIDs[op.CID] { 601 601 seenCIDs[op.CID] = true ··· 646 646 } 647 647 648 648 // GetMempoolOperations returns all operations currently in mempool 649 - func (m *Manager) GetMempoolOperations() ([]plc.PLCOperation, error) { 649 + func (m *Manager) GetMempoolOperations() ([]plcclient.PLCOperation, error) { 650 650 if m.mempool == nil { 651 651 return nil, fmt.Errorf("mempool not initialized") 652 652 } ··· 654 654 // Use Peek to get operations without removing them 655 655 count := m.mempool.Count() 656 656 if count == 0 { 657 - return []plc.PLCOperation{}, nil 657 + return []plcclient.PLCOperation{}, nil 658 658 } 659 659 660 660 return m.mempool.Peek(count), nil ··· 1078 1078 } 1079 1079 1080 1080 // ExportOperations exports operations from bundles 1081 - func (m *Manager) ExportOperations(ctx context.Context, afterTime time.Time, count int) ([]plc.PLCOperation, error) { 1081 + func (m *Manager) ExportOperations(ctx context.Context, afterTime time.Time, count int) ([]plcclient.PLCOperation, error) { 1082 1082 if count <= 0 { 1083 1083 count = 1000 1084 1084 } 1085 1085 1086 - var result []plc.PLCOperation 1086 + var result []plcclient.PLCOperation 1087 1087 seenCIDs := make(map[string]bool) 1088 1088 1089 1089 bundles := m.index.GetBundles() ··· 1344 1344 1345 1345 // LoadOperation loads a single operation from a bundle efficiently 1346 1346 // This is much faster than LoadBundle() when you only need one operation 1347 - func (m *Manager) LoadOperation(ctx context.Context, bundleNumber int, position int) (*plc.PLCOperation, error) { 1347 + func (m *Manager) LoadOperation(ctx context.Context, bundleNumber int, position int) (*plcclient.PLCOperation, error) { 1348 1348 1349 1349 // Validate bundle exists in index 1350 1350 _, err := m.index.GetBundle(bundleNumber)
+12 -13
bundle/mempool.go
··· 10 10 "time" 11 11 12 12 "github.com/goccy/go-json" 13 - 14 - "tangled.org/atscan.net/plcbundle/plc" 13 + "tangled.org/atscan.net/plcbundle/plcclient" 15 14 ) 16 15 17 16 const MEMPOOL_FILE_PREFIX = "plc_mempool_" ··· 19 18 // Mempool stores operations waiting to be bundled 20 19 // Operations must be strictly chronological 21 20 type Mempool struct { 22 - operations []plc.PLCOperation 21 + operations []plcclient.PLCOperation 23 22 targetBundle int // Which bundle number these operations are for 24 23 minTimestamp time.Time // Operations must be after this time 25 24 file string ··· 37 36 file: filepath.Join(bundleDir, filename), 38 37 targetBundle: targetBundle, 39 38 minTimestamp: minTimestamp, 40 - operations: make([]plc.PLCOperation, 0), 39 + operations: make([]plcclient.PLCOperation, 0), 41 40 logger: logger, 42 41 validated: false, 43 42 } ··· 54 53 } 55 54 56 55 // Add adds operations to the mempool with strict validation 57 - func (m *Mempool) Add(ops []plc.PLCOperation) (int, error) { 56 + func (m *Mempool) Add(ops []plcclient.PLCOperation) (int, error) { 58 57 m.mu.Lock() 59 58 defer m.mu.Unlock() 60 59 ··· 69 68 } 70 69 71 70 // Validate and add operations 72 - var newOps []plc.PLCOperation 71 + var newOps []plcclient.PLCOperation 73 72 var lastTime time.Time 74 73 75 74 // Start from last operation time if we have any ··· 170 169 } 171 170 172 171 // Take removes and returns up to n operations from the front 173 - func (m *Mempool) Take(n int) ([]plc.PLCOperation, error) { 172 + func (m *Mempool) Take(n int) ([]plcclient.PLCOperation, error) { 174 173 m.mu.Lock() 175 174 defer m.mu.Unlock() 176 175 ··· 183 182 n = len(m.operations) 184 183 } 185 184 186 - result := make([]plc.PLCOperation, n) 185 + result := make([]plcclient.PLCOperation, n) 187 186 copy(result, m.operations[:n]) 188 187 189 188 // Remove taken operations ··· 219 218 } 220 219 221 220 // Peek returns up to n operations without removing them 222 - func (m *Mempool) Peek(n int) []plc.PLCOperation { 221 + func (m *Mempool) Peek(n int) []plcclient.PLCOperation { 223 222 m.mu.RLock() 224 223 defer m.mu.RUnlock() 225 224 ··· 227 226 n = len(m.operations) 228 227 } 229 228 230 - result := make([]plc.PLCOperation, n) 229 + result := make([]plcclient.PLCOperation, n) 231 230 copy(result, m.operations[:n]) 232 231 233 232 return result ··· 237 236 func (m *Mempool) Clear() { 238 237 m.mu.Lock() 239 238 defer m.mu.Unlock() 240 - m.operations = make([]plc.PLCOperation, 0) 239 + m.operations = make([]plcclient.PLCOperation, 0) 241 240 m.validated = false 242 241 } 243 242 ··· 303 302 buf := make([]byte, 0, 64*1024) 304 303 scanner.Buffer(buf, 1024*1024) 305 304 306 - m.operations = make([]plc.PLCOperation, 0) 305 + m.operations = make([]plcclient.PLCOperation, 0) 307 306 308 307 for scanner.Scan() { 309 308 line := scanner.Bytes() ··· 311 310 continue 312 311 } 313 312 314 - var op plc.PLCOperation 313 + var op plcclient.PLCOperation 315 314 if err := json.Unmarshal(line, &op); err != nil { 316 315 return fmt.Errorf("failed to parse mempool operation: %w", err) 317 316 }
+15 -15
bundle/operations.go
··· 13 13 14 14 gozstd "github.com/DataDog/zstd" 15 15 "github.com/goccy/go-json" 16 - "tangled.org/atscan.net/plcbundle/plc" 16 + "tangled.org/atscan.net/plcbundle/plcclient" 17 17 ) 18 18 19 19 // Operations handles low-level bundle file operations ··· 35 35 36 36 // SerializeJSONL serializes operations to newline-delimited JSON 37 37 // This is the ONE method everyone should use for serialization 38 - func (op *Operations) SerializeJSONL(operations []plc.PLCOperation) []byte { 38 + func (op *Operations) SerializeJSONL(operations []plcclient.PLCOperation) []byte { 39 39 var buf bytes.Buffer 40 40 41 41 for _, operation := range operations { ··· 55 55 56 56 // ParseJSONL parses newline-delimited JSON into operations 57 57 // This is the ONE method everyone should use for deserialization 58 - func (op *Operations) ParseJSONL(data []byte) ([]plc.PLCOperation, error) { 59 - var operations []plc.PLCOperation 58 + func (op *Operations) ParseJSONL(data []byte) ([]plcclient.PLCOperation, error) { 59 + var operations []plcclient.PLCOperation 60 60 scanner := bufio.NewScanner(bytes.NewReader(data)) 61 61 buf := make([]byte, 0, 64*1024) 62 62 scanner.Buffer(buf, 1024*1024) ··· 67 67 continue 68 68 } 69 69 70 - var operation plc.PLCOperation 70 + var operation plcclient.PLCOperation 71 71 if err := json.UnmarshalNoEscape(line, &operation); err != nil { 72 72 return nil, fmt.Errorf("failed to parse line: %w", err) 73 73 } ··· 85 85 // ======================================== 86 86 87 87 // LoadBundle loads a compressed bundle 88 - func (op *Operations) LoadBundle(path string) ([]plc.PLCOperation, error) { 88 + func (op *Operations) LoadBundle(path string) ([]plcclient.PLCOperation, error) { 89 89 compressed, err := os.ReadFile(path) 90 90 if err != nil { 91 91 return nil, fmt.Errorf("failed to read file: %w", err) ··· 101 101 102 102 // SaveBundle saves operations to disk (compressed) 103 103 // Returns: contentHash, compressedHash, contentSize, compressedSize, error 104 - func (op *Operations) SaveBundle(path string, operations []plc.PLCOperation) (string, string, int64, int64, error) { 104 + func (op *Operations) SaveBundle(path string, operations []plcclient.PLCOperation) (string, string, int64, int64, error) { 105 105 jsonlData := op.SerializeJSONL(operations) 106 106 contentSize := int64(len(jsonlData)) 107 107 contentHash := op.Hash(jsonlData) ··· 132 132 133 133 // LoadOperationAtPosition loads a single operation from a bundle without loading the entire bundle 134 134 // This is much more efficient for single-operation lookups 135 - func (op *Operations) LoadOperationAtPosition(path string, position int) (*plc.PLCOperation, error) { 135 + func (op *Operations) LoadOperationAtPosition(path string, position int) (*plcclient.PLCOperation, error) { 136 136 if position < 0 { 137 137 return nil, fmt.Errorf("invalid position: %d", position) 138 138 } ··· 158 158 if lineNum == position { 159 159 line := scanner.Bytes() 160 160 161 - var operation plc.PLCOperation 161 + var operation plcclient.PLCOperation 162 162 if err := json.UnmarshalNoEscape(line, &operation); err != nil { 163 163 return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err) 164 164 } ··· 303 303 } 304 304 305 305 // ExtractUniqueDIDs extracts unique DIDs from operations 306 - func (op *Operations) ExtractUniqueDIDs(operations []plc.PLCOperation) []string { 306 + func (op *Operations) ExtractUniqueDIDs(operations []plcclient.PLCOperation) []string { 307 307 didSet := make(map[string]bool) 308 308 for _, operation := range operations { 309 309 didSet[operation.DID] = true ··· 318 318 } 319 319 320 320 // GetBoundaryCIDs returns CIDs that share the same timestamp as the last operation 321 - func (op *Operations) GetBoundaryCIDs(operations []plc.PLCOperation) (time.Time, map[string]bool) { 321 + func (op *Operations) GetBoundaryCIDs(operations []plcclient.PLCOperation) (time.Time, map[string]bool) { 322 322 if len(operations) == 0 { 323 323 return time.Time{}, nil 324 324 } ··· 341 341 } 342 342 343 343 // StripBoundaryDuplicates removes operations that are in prevBoundaryCIDs 344 - func (op *Operations) StripBoundaryDuplicates(operations []plc.PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []plc.PLCOperation { 344 + func (op *Operations) StripBoundaryDuplicates(operations []plcclient.PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []plcclient.PLCOperation { 345 345 if len(operations) == 0 || len(prevBoundaryCIDs) == 0 { 346 346 return operations 347 347 } ··· 371 371 } 372 372 373 373 // CreateBundle creates a complete bundle structure from operations 374 - func (op *Operations) CreateBundle(bundleNumber int, operations []plc.PLCOperation, cursor string, parent string) *Bundle { 374 + func (op *Operations) CreateBundle(bundleNumber int, operations []plcclient.PLCOperation, cursor string, parent string) *Bundle { 375 375 if len(operations) != BUNDLE_SIZE { 376 376 op.logger.Printf("Warning: bundle has %d operations, expected %d", len(operations), BUNDLE_SIZE) 377 377 } ··· 407 407 408 408 // CalculateBundleMetadata calculates complete metadata for a bundle 409 409 // This is the ONE method everyone should use for metadata calculation 410 - func (op *Operations) CalculateBundleMetadata(bundleNumber int, path string, operations []plc.PLCOperation, parent string, cursor string) (*BundleMetadata, error) { 410 + func (op *Operations) CalculateBundleMetadata(bundleNumber int, path string, operations []plcclient.PLCOperation, parent string, cursor string) (*BundleMetadata, error) { 411 411 if len(operations) == 0 { 412 412 return nil, fmt.Errorf("bundle is empty") 413 413 } ··· 456 456 457 457 // CalculateBundleMetadataFast calculates metadata quickly without chain hash 458 458 // Used during parallel scanning - chain hash calculated later sequentially 459 - func (op *Operations) CalculateBundleMetadataFast(bundleNumber int, path string, operations []plc.PLCOperation, cursor string) (*BundleMetadata, error) { 459 + func (op *Operations) CalculateBundleMetadataFast(bundleNumber int, path string, operations []plcclient.PLCOperation, cursor string) (*BundleMetadata, error) { 460 460 if len(operations) == 0 { 461 461 return nil, fmt.Errorf("bundle is empty") 462 462 }
+7 -7
bundle/types.go
··· 5 5 "path/filepath" 6 6 "time" 7 7 8 - "tangled.org/atscan.net/plcbundle/plc" 8 + "tangled.org/atscan.net/plcbundle/plcclient" 9 9 ) 10 10 11 11 const ( ··· 15 15 16 16 // Bundle represents a PLC bundle 17 17 type Bundle struct { 18 - BundleNumber int `json:"bundle_number"` 19 - StartTime time.Time `json:"start_time"` 20 - EndTime time.Time `json:"end_time"` 21 - Operations []plc.PLCOperation `json:"-"` 22 - DIDCount int `json:"did_count"` 18 + BundleNumber int `json:"bundle_number"` 19 + StartTime time.Time `json:"start_time"` 20 + EndTime time.Time `json:"end_time"` 21 + Operations []plcclient.PLCOperation `json:"-"` 22 + DIDCount int `json:"did_count"` 23 23 24 24 Hash string `json:"hash"` // Chain hash (primary) 25 25 ContentHash string `json:"content_hash"` // Content hash ··· 216 216 217 217 // PLCOperationWithLocation contains an operation with its bundle/position metadata 218 218 type PLCOperationWithLocation struct { 219 - Operation plc.PLCOperation 219 + Operation plcclient.PLCOperation 220 220 Bundle int 221 221 Position int 222 222 }
+3 -3
cmd/plcbundle/detector.go
··· 16 16 "github.com/goccy/go-json" 17 17 18 18 "tangled.org/atscan.net/plcbundle/detector" 19 - "tangled.org/atscan.net/plcbundle/plc" 19 + "tangled.org/atscan.net/plcbundle/plcclient" 20 20 ) 21 21 22 22 type defaultLogger struct{} ··· 136 136 totalCount++ 137 137 totalBytes += int64(len(line)) 138 138 139 - var op plc.PLCOperation 139 + var op plcclient.PLCOperation 140 140 if err := json.Unmarshal(line, &op); err != nil { 141 141 continue 142 142 } ··· 544 544 } 545 545 546 546 // detectOperation runs all detectors on an operation and returns labels + confidence 547 - func detectOperation(ctx context.Context, detectors []detector.Detector, op plc.PLCOperation, minConfidence float64) ([]string, float64) { 547 + func detectOperation(ctx context.Context, detectors []detector.Detector, op plcclient.PLCOperation, minConfidence float64) ([]string, float64) { 548 548 // Parse Operation ONCE before running detectors 549 549 opData, err := op.GetOperationData() 550 550 if err != nil {
+6 -6
cmd/plcbundle/did_index.go
··· 10 10 11 11 "github.com/goccy/go-json" 12 12 "tangled.org/atscan.net/plcbundle/bundle" 13 - "tangled.org/atscan.net/plcbundle/plc" 13 + "tangled.org/atscan.net/plcbundle/plcclient" 14 14 ) 15 15 16 16 func cmdDIDIndex() { ··· 498 498 499 499 // ✨ STEP 0: Check mempool first (most recent data) 500 500 mempoolStart := time.Now() 501 - var latestOp *plc.PLCOperation 501 + var latestOp *plcclient.PLCOperation 502 502 foundInMempool := false 503 503 504 504 if mgr.GetMempool() != nil { ··· 520 520 fmt.Fprintf(os.Stderr, "Mempool check: %s (✓ found in mempool)\n", mempoolTime) 521 521 522 522 // Build document from mempool operation 523 - ops := []plc.PLCOperation{*latestOp} 524 - doc, err := plc.ResolveDIDDocument(did, ops) 523 + ops := []plcclient.PLCOperation{*latestOp} 524 + doc, err := plcclient.ResolveDIDDocument(did, ops) 525 525 if err != nil { 526 526 fmt.Fprintf(os.Stderr, "Build document failed: %v\n", err) 527 527 os.Exit(1) ··· 592 592 opTime, latestLoc.Bundle, latestLoc.Position) 593 593 594 594 // STEP 3: Build DID document 595 - ops := []plc.PLCOperation{*op} 596 - doc, err := plc.ResolveDIDDocument(did, ops) 595 + ops := []plcclient.PLCOperation{*op} 596 + doc, err := plcclient.ResolveDIDDocument(did, ops) 597 597 if err != nil { 598 598 fmt.Fprintf(os.Stderr, "Build document failed: %v\n", err) 599 599 os.Exit(1)
+5 -5
cmd/plcbundle/main.go
··· 19 19 "github.com/goccy/go-json" 20 20 21 21 "tangled.org/atscan.net/plcbundle/bundle" 22 - "tangled.org/atscan.net/plcbundle/plc" 22 + "tangled.org/atscan.net/plcbundle/plcclient" 23 23 ) 24 24 25 25 // Version information (injected at build time via ldflags or read from build info) ··· 149 149 150 150 config := bundle.DefaultConfig(dir) 151 151 152 - var client *plc.Client 152 + var client *plcclient.Client 153 153 if plcURL != "" { 154 - client = plc.NewClient(plcURL) 154 + client = plcclient.NewClient(plcURL) 155 155 } 156 156 157 157 mgr, err := bundle.NewManager(config, client) ··· 1289 1289 } 1290 1290 } 1291 1291 1292 - var client *plc.Client 1292 + var client *plcclient.Client 1293 1293 if plcURLForManager != "" { 1294 - client = plc.NewClient(plcURLForManager) 1294 + client = plcclient.NewClient(plcURLForManager) 1295 1295 } 1296 1296 1297 1297 fmt.Printf("Starting plcbundle HTTP server...\n")
+7 -7
cmd/plcbundle/server.go
··· 16 16 "github.com/gorilla/websocket" 17 17 18 18 "tangled.org/atscan.net/plcbundle/bundle" 19 - "tangled.org/atscan.net/plcbundle/plc" 19 + "tangled.org/atscan.net/plcbundle/plcclient" 20 20 ) 21 21 22 22 var upgrader = websocket.Upgrader{ ··· 692 692 return 693 693 } 694 694 695 - doc, err := plc.ResolveDIDDocument(did, []plc.PLCOperation{*op}) 695 + doc, err := plcclient.ResolveDIDDocument(did, []plcclient.PLCOperation{*op}) 696 696 if err != nil { 697 697 if strings.Contains(err.Error(), "deactivated") { 698 698 sendJSON(w, 410, map[string]string{"error": "DID has been deactivated"}) ··· 709 709 710 710 func handleDIDDataNative(mgr *bundle.Manager, did string) http.HandlerFunc { 711 711 return func(w http.ResponseWriter, r *http.Request) { 712 - if err := plc.ValidateDIDFormat(did); err != nil { 712 + if err := plcclient.ValidateDIDFormat(did); err != nil { 713 713 sendJSON(w, 400, map[string]string{"error": "Invalid DID format"}) 714 714 return 715 715 } ··· 725 725 return 726 726 } 727 727 728 - state, err := plc.BuildDIDState(did, operations) 728 + state, err := plcclient.BuildDIDState(did, operations) 729 729 if err != nil { 730 730 if strings.Contains(err.Error(), "deactivated") { 731 731 sendJSON(w, 410, map[string]string{"error": "DID has been deactivated"}) ··· 741 741 742 742 func handleDIDAuditLogNative(mgr *bundle.Manager, did string) http.HandlerFunc { 743 743 return func(w http.ResponseWriter, r *http.Request) { 744 - if err := plc.ValidateDIDFormat(did); err != nil { 744 + if err := plcclient.ValidateDIDFormat(did); err != nil { 745 745 sendJSON(w, 400, map[string]string{"error": "Invalid DID format"}) 746 746 return 747 747 } ··· 757 757 return 758 758 } 759 759 760 - auditLog := plc.FormatAuditLog(operations) 760 + auditLog := plcclient.FormatAuditLog(operations) 761 761 sendJSON(w, 200, auditLog) 762 762 } 763 763 } ··· 925 925 return nil 926 926 } 927 927 928 - func sendOperation(conn *websocket.Conn, op plc.PLCOperation) error { 928 + func sendOperation(conn *websocket.Conn, op plcclient.PLCOperation) error { 929 929 var data []byte 930 930 var err error 931 931
+6 -6
detector/builtin.go
··· 6 6 "regexp" 7 7 "strings" 8 8 9 - "tangled.org/atscan.net/plcbundle/plc" 9 + "tangled.org/atscan.net/plcbundle/plcclient" 10 10 ) 11 11 12 12 // NoOpDetector is an empty detector for speed testing ··· 22 22 } 23 23 func (d *NoOpDetector) Version() string { return "1.0.0" } 24 24 25 - func (d *NoOpDetector) Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) { 25 + func (d *NoOpDetector) Detect(ctx context.Context, op plcclient.PLCOperation) (*Match, error) { 26 26 // Instant return - no work done 27 27 return nil, nil 28 28 } ··· 48 48 } 49 49 func (d *InvalidHandleDetector) Version() string { return "1.0.0" } 50 50 51 - func (d *InvalidHandleDetector) Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) { 51 + func (d *InvalidHandleDetector) Detect(ctx context.Context, op plcclient.PLCOperation) (*Match, error) { 52 52 // Parse Operation field on-demand 53 53 operation, err := op.GetOperationMap() 54 54 if err != nil { ··· 212 212 } 213 213 func (d *AlsoKnownAsSpamDetector) Version() string { return "1.0.0" } 214 214 215 - func (d *AlsoKnownAsSpamDetector) Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) { 215 + func (d *AlsoKnownAsSpamDetector) Detect(ctx context.Context, op plcclient.PLCOperation) (*Match, error) { 216 216 // Parse Operation field on-demand 217 217 operation, err := op.GetOperationMap() 218 218 if err != nil { ··· 313 313 } 314 314 func (d *SpamPDSDetector) Version() string { return "1.0.0" } 315 315 316 - func (d *SpamPDSDetector) Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) { 316 + func (d *SpamPDSDetector) Detect(ctx context.Context, op plcclient.PLCOperation) (*Match, error) { 317 317 // Parse Operation field on-demand 318 318 operation, err := op.GetOperationMap() 319 319 if err != nil { ··· 416 416 } 417 417 func (d *ServiceAbuseDetector) Version() string { return "1.0.0" } 418 418 419 - func (d *ServiceAbuseDetector) Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) { 419 + func (d *ServiceAbuseDetector) Detect(ctx context.Context, op plcclient.PLCOperation) (*Match, error) { 420 420 // Parse Operation field on-demand 421 421 operation, err := op.GetOperationMap() 422 422 if err != nil {
+2 -2
detector/detector.go
··· 5 5 "context" 6 6 "time" 7 7 8 - "tangled.org/atscan.net/plcbundle/plc" 8 + "tangled.org/atscan.net/plcbundle/plcclient" 9 9 ) 10 10 11 11 // Detector represents a spam detection algorithm ··· 17 17 Description() string 18 18 19 19 // Detect analyzes an operation and returns a match result 20 - Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) 20 + Detect(ctx context.Context, op plcclient.PLCOperation) (*Match, error) 21 21 22 22 // Version returns the detector version 23 23 Version() string
+3 -3
detector/runner.go
··· 8 8 "time" 9 9 10 10 "tangled.org/atscan.net/plcbundle/bundle" 11 - "tangled.org/atscan.net/plcbundle/plc" 11 + "tangled.org/atscan.net/plcbundle/plcclient" 12 12 ) 13 13 14 14 // Runner executes detectors against operations ··· 82 82 func (r *Runner) runParallel(ctx context.Context, detector Detector, b *bundle.Bundle) []*Result { 83 83 type job struct { 84 84 pos int 85 - op plc.PLCOperation 85 + op plcclient.PLCOperation 86 86 } 87 87 88 88 jobs := make(chan job, len(b.Operations)) ··· 130 130 return results 131 131 } 132 132 133 - func (r *Runner) detectOne(ctx context.Context, detector Detector, bundleNum, pos int, op plc.PLCOperation) *Result { 133 + func (r *Runner) detectOne(ctx context.Context, detector Detector, bundleNum, pos int, op plcclient.PLCOperation) *Result { 134 134 // Create timeout context 135 135 detectCtx, cancel := context.WithTimeout(ctx, r.config.Timeout) 136 136 defer cancel()
+2 -3
detector/script.go
··· 13 13 "time" 14 14 15 15 "github.com/goccy/go-json" 16 - 17 - "tangled.org/atscan.net/plcbundle/plc" 16 + "tangled.org/atscan.net/plcbundle/plcclient" 18 17 ) 19 18 20 19 // ScriptDetector runs a JavaScript detector via Unix socket ··· 140 139 return fmt.Errorf("failed to connect to socket within timeout") 141 140 } 142 141 143 - func (d *ScriptDetector) Detect(ctx context.Context, op plc.PLCOperation) (*Match, error) { 142 + func (d *ScriptDetector) Detect(ctx context.Context, op plcclient.PLCOperation) (*Match, error) { 144 143 if d.conn == nil { 145 144 return nil, fmt.Errorf("not connected to server") 146 145 }
+12 -12
docs/library.md
··· 189 189 BundleNumber int // Sequential number (1, 2, 3...) 190 190 StartTime time.Time // First operation timestamp 191 191 EndTime time.Time // Last operation timestamp 192 - Operations []plc.PLCOperation // The 10,000 operations 192 + Operations []plcclient.PLCOperation // The 10,000 operations 193 193 DIDCount int // Unique DIDs in bundle 194 194 Hash string // Chain hash (includes history) 195 195 ContentHash string // This bundle's content hash ··· 1130 1130 "time" 1131 1131 1132 1132 "tangled.org/atscan.net/plcbundle/bundle" 1133 - "tangled.org/atscan.net/plcbundle/plc" 1133 + "tangled.org/atscan.net/plcbundle/plcclient" 1134 1134 plcbundle "tangled.org/atscan.net/plcbundle" 1135 1135 ) 1136 1136 ··· 1153 1153 } 1154 1154 1155 1155 // Custom PLC client with rate limiting 1156 - plcClient := plc.NewClient("https://plc.directory", 1157 - plc.WithRateLimit(60, time.Minute), // 60 req/min 1158 - plc.WithTimeout(30*time.Second), // 30s timeout 1159 - plc.WithLogger(&MyCustomLogger{}), // Custom logger 1156 + plcClient := plcclient.NewClient("https://plc.directory", 1157 + plcclient.WithRateLimit(60, time.Minute), // 60 req/min 1158 + plcclient.WithTimeout(30*time.Second), // 30s timeout 1159 + plcclient.WithLogger(&MyCustomLogger{}), // Custom logger 1160 1160 ) 1161 1161 1162 1162 // Create manager ··· 1523 1523 1524 1524 ```go 1525 1525 // Production: Be conservative 1526 - plcClient := plc.NewClient("https://plc.directory", 1527 - plc.WithRateLimit(60, time.Minute), // 60 req/min max 1528 - plc.WithTimeout(60*time.Second), 1526 + plcClient := plcclient.NewClient("https://plc.directory", 1527 + plcclient.WithRateLimit(60, time.Minute), // 60 req/min max 1528 + plcclient.WithTimeout(60*time.Second), 1529 1529 ) 1530 1530 1531 1531 // Development: Can be more aggressive (but respectful) 1532 - plcClient := plc.NewClient("https://plc.directory", 1533 - plc.WithRateLimit(90, time.Minute), 1534 - plc.WithTimeout(30*time.Second), 1532 + plcClient := plcclient.NewClient("https://plc.directory", 1533 + plcclient.WithRateLimit(90, time.Minute), 1534 + plcclient.WithTimeout(30*time.Second), 1535 1535 ) 1536 1536 ``` 1537 1537
+1
internal/bundleindex/bundleindex.go
··· 1 + package bundleindex
+1
internal/didindex/didindex.go
··· 1 + package didindex
+1
internal/mempool/mempool.go
··· 1 + package mempool
+1
internal/resolver/resolver.go
··· 1 + package resolver
+491
internal/storage/storage.go
··· 1 + package storage 2 + 3 + import ( 4 + "bufio" 5 + "bytes" 6 + "crypto/sha256" 7 + "encoding/hex" 8 + "fmt" 9 + "io" 10 + "os" 11 + "sync" 12 + "time" 13 + 14 + gozstd "github.com/DataDog/zstd" 15 + "github.com/goccy/go-json" 16 + 17 + "tangled.org/atscan.net/plcbundle/bundle" 18 + "tangled.org/atscan.net/plcbundle/plcclient" 19 + ) 20 + 21 + // Operations handles low-level bundle file operations 22 + type Operations struct { 23 + logger bundle.Logger 24 + } 25 + 26 + func NewOperations(logger bundle.Logger) (*Operations, error) { 27 + return &Operations{logger: logger}, nil 28 + } 29 + 30 + func (op *Operations) Close() { 31 + // Nothing to close 32 + } 33 + 34 + // ======================================== 35 + // CORE SERIALIZATION (JSONL) 36 + // ======================================== 37 + 38 + // SerializeJSONL serializes operations to newline-delimited JSON 39 + // This is the ONE method everyone should use for serialization 40 + func (op *Operations) SerializeJSONL(operations []plcclient.PLCOperation) []byte { 41 + var buf bytes.Buffer 42 + 43 + for _, operation := range operations { 44 + // Use RawJSON if available (preserves exact format) 45 + if len(operation.RawJSON) > 0 { 46 + buf.Write(operation.RawJSON) 47 + } else { 48 + // Fallback to marshaling 49 + data, _ := json.Marshal(operation) 50 + buf.Write(data) 51 + } 52 + buf.WriteByte('\n') 53 + } 54 + 55 + return buf.Bytes() 56 + } 57 + 58 + // ParseJSONL parses newline-delimited JSON into operations 59 + // This is the ONE method everyone should use for deserialization 60 + func (op *Operations) ParseJSONL(data []byte) ([]plcclient.PLCOperation, error) { 61 + var operations []plcclient.PLCOperation 62 + scanner := bufio.NewScanner(bytes.NewReader(data)) 63 + buf := make([]byte, 0, 64*1024) 64 + scanner.Buffer(buf, 1024*1024) 65 + 66 + for scanner.Scan() { 67 + line := scanner.Bytes() 68 + if len(line) == 0 { 69 + continue 70 + } 71 + 72 + var operation plcclient.PLCOperation 73 + if err := json.UnmarshalNoEscape(line, &operation); err != nil { 74 + return nil, fmt.Errorf("failed to parse line: %w", err) 75 + } 76 + 77 + operation.RawJSON = make([]byte, len(line)) 78 + copy(operation.RawJSON, line) 79 + operations = append(operations, operation) 80 + } 81 + 82 + return operations, nil 83 + } 84 + 85 + // ======================================== 86 + // FILE OPERATIONS (uses JSONL + compression) 87 + // ======================================== 88 + 89 + // LoadBundle loads a compressed bundle 90 + func (op *Operations) LoadBundle(path string) ([]plcclient.PLCOperation, error) { 91 + compressed, err := os.ReadFile(path) 92 + if err != nil { 93 + return nil, fmt.Errorf("failed to read file: %w", err) 94 + } 95 + 96 + decompressed, err := gozstd.Decompress(nil, compressed) 97 + if err != nil { 98 + return nil, fmt.Errorf("failed to decompress: %w", err) 99 + } 100 + 101 + return op.ParseJSONL(decompressed) 102 + } 103 + 104 + // SaveBundle saves operations to disk (compressed) 105 + // Returns: contentHash, compressedHash, contentSize, compressedSize, error 106 + func (op *Operations) SaveBundle(path string, operations []plcclient.PLCOperation) (string, string, int64, int64, error) { 107 + jsonlData := op.SerializeJSONL(operations) 108 + contentSize := int64(len(jsonlData)) 109 + contentHash := op.Hash(jsonlData) 110 + 111 + // DataDog zstd.Compress returns ([]byte, error) 112 + compressed, err := gozstd.Compress(nil, jsonlData) 113 + if err != nil { 114 + return "", "", 0, 0, fmt.Errorf("failed to compress: %w", err) 115 + } 116 + 117 + compressedSize := int64(len(compressed)) 118 + compressedHash := op.Hash(compressed) 119 + 120 + if err := os.WriteFile(path, compressed, 0644); err != nil { 121 + return "", "", 0, 0, fmt.Errorf("failed to write file: %w", err) 122 + } 123 + 124 + return contentHash, compressedHash, contentSize, compressedSize, nil 125 + } 126 + 127 + // Pool for scanner buffers (reuse across requests) 128 + var scannerBufPool = sync.Pool{ 129 + New: func() interface{} { 130 + buf := make([]byte, 64*1024) 131 + return &buf 132 + }, 133 + } 134 + 135 + // LoadOperationAtPosition loads a single operation from a bundle without loading the entire bundle 136 + // This is much more efficient for single-operation lookups 137 + func (op *Operations) LoadOperationAtPosition(path string, position int) (*plcclient.PLCOperation, error) { 138 + if position < 0 { 139 + return nil, fmt.Errorf("invalid position: %d", position) 140 + } 141 + 142 + file, err := os.Open(path) 143 + if err != nil { 144 + return nil, fmt.Errorf("failed to open file: %w", err) 145 + } 146 + defer file.Close() 147 + 148 + reader := gozstd.NewReader(file) 149 + defer reader.Close() 150 + 151 + // ✨ Get buffer from pool 152 + bufPtr := scannerBufPool.Get().(*[]byte) 153 + defer scannerBufPool.Put(bufPtr) // Return to pool when done 154 + 155 + scanner := bufio.NewScanner(reader) 156 + scanner.Buffer(*bufPtr, 512*1024) 157 + 158 + lineNum := 0 159 + for scanner.Scan() { 160 + if lineNum == position { 161 + line := scanner.Bytes() 162 + 163 + var operation plcclient.PLCOperation 164 + if err := json.UnmarshalNoEscape(line, &operation); err != nil { 165 + return nil, fmt.Errorf("failed to parse operation at position %d: %w", position, err) 166 + } 167 + 168 + // Copy raw JSON 169 + operation.RawJSON = make([]byte, len(line)) 170 + copy(operation.RawJSON, line) 171 + 172 + return &operation, nil 173 + } 174 + lineNum++ 175 + } 176 + 177 + if err := scanner.Err(); err != nil { 178 + return nil, fmt.Errorf("scanner error: %w", err) 179 + } 180 + 181 + return nil, fmt.Errorf("position %d not found", position) 182 + } 183 + 184 + // ======================================== 185 + // STREAMING 186 + // ======================================== 187 + 188 + // StreamRaw returns a reader for the raw compressed bundle file 189 + func (op *Operations) StreamRaw(path string) (io.ReadCloser, error) { 190 + file, err := os.Open(path) 191 + if err != nil { 192 + return nil, fmt.Errorf("failed to open bundle: %w", err) 193 + } 194 + return file, nil 195 + } 196 + 197 + // StreamDecompressed returns a reader for decompressed bundle data 198 + func (op *Operations) StreamDecompressed(path string) (io.ReadCloser, error) { 199 + file, err := os.Open(path) 200 + if err != nil { 201 + return nil, fmt.Errorf("failed to open bundle: %w", err) 202 + } 203 + 204 + // Create zstd reader using DataDog's package 205 + reader := gozstd.NewReader(file) 206 + 207 + // Return a wrapper that closes both the reader and file 208 + return &decompressedReader{ 209 + reader: reader, 210 + file: file, 211 + }, nil 212 + } 213 + 214 + // decompressedReader wraps a zstd decoder and underlying file 215 + type decompressedReader struct { 216 + reader io.ReadCloser 217 + file *os.File 218 + } 219 + 220 + func (dr *decompressedReader) Read(p []byte) (int, error) { 221 + return dr.reader.Read(p) 222 + } 223 + 224 + func (dr *decompressedReader) Close() error { 225 + dr.reader.Close() 226 + return dr.file.Close() 227 + } 228 + 229 + // ======================================== 230 + // HASHING 231 + // ======================================== 232 + 233 + // Hash computes SHA256 hash of data 234 + func (op *Operations) Hash(data []byte) string { 235 + h := sha256.Sum256(data) 236 + return hex.EncodeToString(h[:]) 237 + } 238 + 239 + // CalculateChainHash calculates the cumulative chain hash 240 + func (op *Operations) CalculateChainHash(parent string, contentHash string) string { 241 + var data string 242 + if parent == "" { 243 + // Genesis bundle (first bundle) 244 + data = "plcbundle:genesis:" + contentHash 245 + } else { 246 + // Subsequent bundles - chain parent hash with current content 247 + data = parent + ":" + contentHash 248 + } 249 + return op.Hash([]byte(data)) 250 + } 251 + 252 + // CalculateFileHashes calculates both content and compressed hashes efficiently 253 + func (op *Operations) CalculateFileHashes(path string) (compressedHash string, compressedSize int64, contentHash string, contentSize int64, err error) { 254 + // Read compressed file 255 + compressedData, err := os.ReadFile(path) 256 + if err != nil { 257 + return "", 0, "", 0, fmt.Errorf("failed to read file: %w", err) 258 + } 259 + 260 + // Calculate compressed hash 261 + compressedHash = op.Hash(compressedData) 262 + compressedSize = int64(len(compressedData)) 263 + 264 + // Decompress with DataDog zstd 265 + decompressed, err := gozstd.Decompress(nil, compressedData) 266 + if err != nil { 267 + return "", 0, "", 0, fmt.Errorf("failed to decompress: %w", err) 268 + } 269 + 270 + // Calculate content hash 271 + contentHash = op.Hash(decompressed) 272 + contentSize = int64(len(decompressed)) 273 + 274 + return compressedHash, compressedSize, contentHash, contentSize, nil 275 + } 276 + 277 + // VerifyHash verifies the hash of a bundle file 278 + func (op *Operations) VerifyHash(path string, expectedHash string) (bool, string, error) { 279 + data, err := os.ReadFile(path) 280 + if err != nil { 281 + return false, "", fmt.Errorf("failed to read file: %w", err) 282 + } 283 + 284 + actualHash := op.Hash(data) 285 + return actualHash == expectedHash, actualHash, nil 286 + } 287 + 288 + // ======================================== 289 + // UTILITY FUNCTIONS 290 + // ======================================== 291 + 292 + // FileExists checks if a file exists 293 + func (op *Operations) FileExists(path string) bool { 294 + _, err := os.Stat(path) 295 + return err == nil 296 + } 297 + 298 + // GetFileSize returns the size of a file 299 + func (op *Operations) GetFileSize(path string) (int64, error) { 300 + info, err := os.Stat(path) 301 + if err != nil { 302 + return 0, err 303 + } 304 + return info.Size(), nil 305 + } 306 + 307 + // ExtractUniqueDIDs extracts unique DIDs from operations 308 + func (op *Operations) ExtractUniqueDIDs(operations []plcclient.PLCOperation) []string { 309 + didSet := make(map[string]bool) 310 + for _, operation := range operations { 311 + didSet[operation.DID] = true 312 + } 313 + 314 + dids := make([]string, 0, len(didSet)) 315 + for did := range didSet { 316 + dids = append(dids, did) 317 + } 318 + 319 + return dids 320 + } 321 + 322 + // GetBoundaryCIDs returns CIDs that share the same timestamp as the last operation 323 + func (op *Operations) GetBoundaryCIDs(operations []plcclient.PLCOperation) (time.Time, map[string]bool) { 324 + if len(operations) == 0 { 325 + return time.Time{}, nil 326 + } 327 + 328 + lastOp := operations[len(operations)-1] 329 + boundaryTime := lastOp.CreatedAt 330 + cidSet := make(map[string]bool) 331 + 332 + // Walk backwards from the end 333 + for i := len(operations) - 1; i >= 0; i-- { 334 + op := operations[i] 335 + if op.CreatedAt.Equal(boundaryTime) { 336 + cidSet[op.CID] = true 337 + } else { 338 + break 339 + } 340 + } 341 + 342 + return boundaryTime, cidSet 343 + } 344 + 345 + // StripBoundaryDuplicates removes operations that are in prevBoundaryCIDs 346 + func (op *Operations) StripBoundaryDuplicates(operations []plcclient.PLCOperation, boundaryTimestamp string, prevBoundaryCIDs map[string]bool) []plcclient.PLCOperation { 347 + if len(operations) == 0 || len(prevBoundaryCIDs) == 0 { 348 + return operations 349 + } 350 + 351 + boundaryTime, err := time.Parse(time.RFC3339Nano, boundaryTimestamp) 352 + if err != nil { 353 + return operations 354 + } 355 + 356 + startIdx := 0 357 + for startIdx < len(operations) { 358 + op := operations[startIdx] 359 + 360 + if op.CreatedAt.After(boundaryTime) { 361 + break 362 + } 363 + 364 + if op.CreatedAt.Equal(boundaryTime) && prevBoundaryCIDs[op.CID] { 365 + startIdx++ 366 + continue 367 + } 368 + 369 + break 370 + } 371 + 372 + return operations[startIdx:] 373 + } 374 + 375 + // CreateBundle creates a complete bundle structure from operations 376 + func (op *Operations) CreateBundle(bundleNumber int, operations []plcclient.PLCOperation, cursor string, parent string) *bundle.Bundle { 377 + if len(operations) != bundle.BUNDLE_SIZE { 378 + op.logger.Printf("Warning: bundle has %d operations, expected %d", len(operations), bundle.BUNDLE_SIZE) 379 + } 380 + 381 + dids := op.ExtractUniqueDIDs(operations) 382 + _, boundaryCIDs := op.GetBoundaryCIDs(operations) 383 + 384 + // Convert boundary CIDs map to slice 385 + cidSlice := make([]string, 0, len(boundaryCIDs)) 386 + for cid := range boundaryCIDs { 387 + cidSlice = append(cidSlice, cid) 388 + } 389 + 390 + bundle := &bundle.Bundle{ 391 + BundleNumber: bundleNumber, 392 + StartTime: operations[0].CreatedAt, 393 + EndTime: operations[len(operations)-1].CreatedAt, 394 + Operations: operations, 395 + DIDCount: len(dids), 396 + Cursor: cursor, 397 + Parent: parent, 398 + BoundaryCIDs: cidSlice, 399 + Compressed: true, 400 + CreatedAt: time.Now().UTC(), 401 + } 402 + 403 + return bundle 404 + } 405 + 406 + // ======================================== 407 + // METADATA CALCULATION 408 + // ======================================== 409 + 410 + // CalculateBundleMetadata calculates complete metadata for a bundle 411 + // This is the ONE method everyone should use for metadata calculation 412 + func (op *Operations) CalculateBundleMetadata(bundleNumber int, path string, operations []plcclient.PLCOperation, parent string, cursor string) (*bundle.BundleMetadata, error) { 413 + if len(operations) == 0 { 414 + return nil, fmt.Errorf("bundle is empty") 415 + } 416 + 417 + // Get file info 418 + info, err := os.Stat(path) 419 + if err != nil { 420 + return nil, fmt.Errorf("failed to stat file: %w", err) 421 + } 422 + 423 + // Extract unique DIDs 424 + dids := op.ExtractUniqueDIDs(operations) 425 + 426 + // Serialize to JSONL and calculate content hash 427 + jsonlData := op.SerializeJSONL(operations) 428 + contentSize := int64(len(jsonlData)) 429 + contentHash := op.Hash(jsonlData) 430 + 431 + // Read compressed file and calculate compressed hash 432 + compressedData, err := os.ReadFile(path) 433 + if err != nil { 434 + return nil, fmt.Errorf("failed to read compressed file: %w", err) 435 + } 436 + compressedHash := op.Hash(compressedData) 437 + compressedSize := info.Size() 438 + 439 + // Calculate chain hash 440 + chainHash := op.CalculateChainHash(parent, contentHash) 441 + 442 + return &bundle.BundleMetadata{ 443 + BundleNumber: bundleNumber, 444 + StartTime: operations[0].CreatedAt, 445 + EndTime: operations[len(operations)-1].CreatedAt, 446 + OperationCount: len(operations), 447 + DIDCount: len(dids), 448 + Hash: chainHash, // Chain hash (primary) 449 + ContentHash: contentHash, // Content hash 450 + Parent: parent, // Parent chain hash 451 + CompressedHash: compressedHash, 452 + CompressedSize: compressedSize, 453 + UncompressedSize: contentSize, 454 + Cursor: cursor, 455 + CreatedAt: time.Now().UTC(), 456 + }, nil 457 + } 458 + 459 + // CalculateBundleMetadataFast calculates metadata quickly without chain hash 460 + // Used during parallel scanning - chain hash calculated later sequentially 461 + func (op *Operations) CalculateBundleMetadataFast(bundleNumber int, path string, operations []plcclient.PLCOperation, cursor string) (*bundle.BundleMetadata, error) { 462 + if len(operations) == 0 { 463 + return nil, fmt.Errorf("bundle is empty") 464 + } 465 + 466 + // Calculate hashes efficiently (read file once) 467 + compressedHash, compressedSize, contentHash, contentSize, err := op.CalculateFileHashes(path) 468 + if err != nil { 469 + return nil, err 470 + } 471 + 472 + // Extract unique DIDs 473 + dids := op.ExtractUniqueDIDs(operations) 474 + 475 + // Note: Hash, Parent, and Cursor are set to empty - will be calculated later sequentially 476 + return &bundle.BundleMetadata{ 477 + BundleNumber: bundleNumber, 478 + StartTime: operations[0].CreatedAt, 479 + EndTime: operations[len(operations)-1].CreatedAt, 480 + OperationCount: len(operations), 481 + DIDCount: len(dids), 482 + Hash: "", // Chain hash - calculated later 483 + ContentHash: contentHash, // Content hash 484 + Parent: "", // Parent - set later 485 + CompressedHash: compressedHash, 486 + CompressedSize: compressedSize, 487 + UncompressedSize: contentSize, 488 + Cursor: cursor, 489 + CreatedAt: time.Now().UTC(), 490 + }, nil 491 + }
+1 -1
plc/client.go plcclient/client.go
··· 1 - package plc 1 + package plcclient 2 2 3 3 import ( 4 4 "bufio"
+20 -20
plc/plc_test.go plcclient/plc_test.go
··· 1 - package plc_test 1 + package plcclient_test 2 2 3 3 import ( 4 4 "context" ··· 8 8 "testing" 9 9 "time" 10 10 11 - "tangled.org/atscan.net/plcbundle/bundle" 12 - "tangled.org/atscan.net/plcbundle/plc" 11 + "tangled.org/atscan.net/plcbundle/internal/storage" 12 + "tangled.org/atscan.net/plcbundle/plcclient" 13 13 ) 14 14 15 15 // TestPLCOperation tests operation parsing and methods ··· 29 29 30 30 for _, tt := range tests { 31 31 t.Run(tt.name, func(t *testing.T) { 32 - op := plc.PLCOperation{Nullified: tt.nullified} 32 + op := plcclient.PLCOperation{Nullified: tt.nullified} 33 33 if got := op.IsNullified(); got != tt.want { 34 34 t.Errorf("IsNullified() = %v, want %v", got, tt.want) 35 35 } ··· 38 38 }) 39 39 40 40 t.Run("GetNullifyingCID", func(t *testing.T) { 41 - op := plc.PLCOperation{Nullified: "bafytest123"} 41 + op := plcclient.PLCOperation{Nullified: "bafytest123"} 42 42 if cid := op.GetNullifyingCID(); cid != "bafytest123" { 43 43 t.Errorf("expected 'bafytest123', got '%s'", cid) 44 44 } 45 45 46 - op2 := plc.PLCOperation{Nullified: true} 46 + op2 := plcclient.PLCOperation{Nullified: true} 47 47 if cid := op2.GetNullifyingCID(); cid != "" { 48 48 t.Errorf("expected empty string, got '%s'", cid) 49 49 } ··· 58 58 "nullified": false 59 59 }` 60 60 61 - var op plc.PLCOperation 61 + var op plcclient.PLCOperation 62 62 if err := json.Unmarshal([]byte(jsonData), &op); err != nil { 63 63 t.Fatalf("failed to parse operation: %v", err) 64 64 } ··· 90 90 // Return mock JSONL data 91 91 w.Header().Set("Content-Type", "application/x-ndjson") 92 92 for i := 0; i < 10; i++ { 93 - op := plc.PLCOperation{ 93 + op := plcclient.PLCOperation{ 94 94 DID: "did:plc:test" + string(rune(i)), 95 95 CID: "bafytest" + string(rune(i)), 96 96 CreatedAt: time.Now(), ··· 102 102 defer server.Close() 103 103 104 104 // Create client 105 - client := plc.NewClient(server.URL) 105 + client := plcclient.NewClient(server.URL) 106 106 defer client.Close() 107 107 108 108 // Test export 109 109 ctx := context.Background() 110 - ops, err := client.Export(ctx, plc.ExportOptions{ 110 + ops, err := client.Export(ctx, plcclient.ExportOptions{ 111 111 Count: 100, 112 112 }) 113 113 if err != nil { ··· 136 136 } 137 137 // Success on second attempt 138 138 w.Header().Set("Content-Type", "application/x-ndjson") 139 - op := plc.PLCOperation{DID: "did:plc:test", CID: "bafytest", CreatedAt: time.Now()} 139 + op := plcclient.PLCOperation{DID: "did:plc:test", CID: "bafytest", CreatedAt: time.Now()} 140 140 json.NewEncoder(w).Encode(op) 141 141 })) 142 142 defer server.Close() 143 143 144 - client := plc.NewClient(server.URL) 144 + client := plcclient.NewClient(server.URL) 145 145 defer client.Close() 146 146 147 147 ctx := context.Background() 148 - ops, err := client.Export(ctx, plc.ExportOptions{Count: 1}) 148 + ops, err := client.Export(ctx, plcclient.ExportOptions{Count: 1}) 149 149 if err != nil { 150 150 t.Fatalf("Export failed after retry: %v", err) 151 151 } ··· 165 165 t.Errorf("unexpected path: %s", r.URL.Path) 166 166 } 167 167 168 - doc := plc.DIDDocument{ 168 + doc := plcclient.DIDDocument{ 169 169 Context: []string{"https://www.w3.org/ns/did/v1"}, 170 170 ID: "did:plc:test123", 171 171 } ··· 173 173 })) 174 174 defer server.Close() 175 175 176 - client := plc.NewClient(server.URL) 176 + client := plcclient.NewClient(server.URL) 177 177 defer client.Close() 178 178 179 179 ctx := context.Background() ··· 192 192 func TestRateLimiter(t *testing.T) { 193 193 t.Run("BasicRateLimit", func(t *testing.T) { 194 194 // 10 requests per second 195 - rl := plc.NewRateLimiter(10, time.Second) 195 + rl := plcclient.NewRateLimiter(10, time.Second) 196 196 defer rl.Stop() 197 197 198 198 ctx := context.Background() ··· 213 213 }) 214 214 215 215 t.Run("ContextCancellation", func(t *testing.T) { 216 - rl := plc.NewRateLimiter(1, time.Minute) // Very slow rate 216 + rl := plcclient.NewRateLimiter(1, time.Minute) // Very slow rate 217 217 defer rl.Stop() 218 218 219 219 // Consume the one available token ··· 233 233 234 234 // Benchmark tests 235 235 func BenchmarkSerializeJSONL(b *testing.B) { 236 - ops := make([]plc.PLCOperation, 10000) 236 + ops := make([]plcclient.PLCOperation, 10000) 237 237 for i := 0; i < 10000; i++ { 238 - ops[i] = plc.PLCOperation{ 238 + ops[i] = plcclient.PLCOperation{ 239 239 DID: "did:plc:test", 240 240 CID: "bafytest", 241 241 CreatedAt: time.Now(), ··· 244 244 } 245 245 246 246 logger := &benchLogger{} 247 - operations, _ := bundle.NewOperations(logger) 247 + operations, _ := storage.NewOperations(logger) 248 248 defer operations.Close() 249 249 250 250 b.ResetTimer()
+1 -1
plc/ratelimiter.go plcclient/ratelimiter.go
··· 1 - package plc 1 + package plcclient 2 2 3 3 import ( 4 4 "context"
+1 -1
plc/resolver.go plcclient/resolver.go
··· 1 - package plc 1 + package plcclient 2 2 3 3 import ( 4 4 "fmt"
+1 -1
plc/types.go plcclient/types.go
··· 1 - package plc 1 + package plcclient 2 2 3 3 import ( 4 4 "time"
+6 -6
plcbundle.go
··· 6 6 "time" 7 7 8 8 "tangled.org/atscan.net/plcbundle/bundle" 9 - "tangled.org/atscan.net/plcbundle/plc" 9 + "tangled.org/atscan.net/plcbundle/plcclient" 10 10 ) 11 11 12 12 // Re-export commonly used types for convenience ··· 21 21 DirectoryScanResult = bundle.DirectoryScanResult 22 22 Logger = bundle.Logger 23 23 24 - PLCOperation = plc.PLCOperation 25 - PLCClient = plc.Client 26 - ExportOptions = plc.ExportOptions 24 + PLCOperation = plcclient.PLCOperation 25 + PLCClient = plcclient.Client 26 + ExportOptions = plcclient.ExportOptions 27 27 ) 28 28 29 29 // Re-export constants ··· 38 38 } 39 39 40 40 // NewPLCClient creates a new PLC client (convenience wrapper) 41 - func NewPLCClient(baseURL string, opts ...plc.ClientOption) *PLCClient { 42 - return plc.NewClient(baseURL, opts...) 41 + func NewPLCClient(baseURL string, opts ...plcclient.ClientOption) *PLCClient { 42 + return plcclient.NewClient(baseURL, opts...) 43 43 } 44 44 45 45 // DefaultConfig returns default configuration (convenience wrapper)