[DEPRECATED] Go implementation of plcbundle

cmd stream

+343 -252
-148
cmd/plcbundle/commands/backfill.go
··· 1 - package commands 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "os" 7 - 8 - flag "github.com/spf13/pflag" 9 - ) 10 - 11 - // BackfillCommand handles the backfill subcommand 12 - func BackfillCommand(args []string) error { 13 - fs := flag.NewFlagSet("backfill", flag.ExitOnError) 14 - plcURL := fs.String("plc", "https://plc.directory", "PLC directory URL") 15 - startFrom := fs.Int("start", 1, "bundle number to start from") 16 - endAt := fs.Int("end", 0, "bundle number to end at (0 = until caught up)") 17 - verbose := fs.Bool("verbose", false, "verbose sync logging") 18 - 19 - if err := fs.Parse(args); err != nil { 20 - return err 21 - } 22 - 23 - mgr, dir, err := getManager(*plcURL) 24 - if err != nil { 25 - return err 26 - } 27 - defer mgr.Close() 28 - 29 - fmt.Fprintf(os.Stderr, "Starting backfill from: %s\n", dir) 30 - fmt.Fprintf(os.Stderr, "Starting from bundle: %06d\n", *startFrom) 31 - if *endAt > 0 { 32 - fmt.Fprintf(os.Stderr, "Ending at bundle: %06d\n", *endAt) 33 - } else { 34 - fmt.Fprintf(os.Stderr, "Ending: when caught up\n") 35 - } 36 - fmt.Fprintf(os.Stderr, "\n") 37 - 38 - ctx := context.Background() 39 - 40 - currentBundle := *startFrom 41 - processedCount := 0 42 - fetchedCount := 0 43 - loadedCount := 0 44 - operationCount := 0 45 - 46 - for { 47 - if *endAt > 0 && currentBundle > *endAt { 48 - break 49 - } 50 - 51 - fmt.Fprintf(os.Stderr, "Processing bundle %06d... ", currentBundle) 52 - 53 - // Try to load from disk first 54 - bundle, err := mgr.LoadBundle(ctx, currentBundle) 55 - 56 - if err != nil { 57 - // Bundle doesn't exist, fetch it 58 - fmt.Fprintf(os.Stderr, "fetching... ") 59 - 60 - bundle, err = mgr.FetchNextBundle(ctx, !*verbose) 61 - if err != nil { 62 - if isEndOfDataError(err) { 63 - fmt.Fprintf(os.Stderr, "\n✓ Caught up! No more complete bundles available.\n") 64 - break 65 - } 66 - fmt.Fprintf(os.Stderr, "ERROR: %v\n", err) 67 - break 68 - } 69 - 70 - if _, err := mgr.SaveBundle(ctx, bundle, !*verbose); err != nil { 71 - return fmt.Errorf("error saving: %w", err) 72 - } 73 - 74 - fetchedCount++ 75 - fmt.Fprintf(os.Stderr, "saved... ") 76 - } else { 77 - loadedCount++ 78 - } 79 - 80 - // Output operations to stdout (JSONL) 81 - for _, op := range bundle.Operations { 82 - if len(op.RawJSON) > 0 { 83 - fmt.Println(string(op.RawJSON)) 84 - } 85 - } 86 - 87 - operationCount += len(bundle.Operations) 88 - processedCount++ 89 - 90 - fmt.Fprintf(os.Stderr, "✓ (%d ops, %d DIDs)\n", len(bundle.Operations), bundle.DIDCount) 91 - 92 - currentBundle++ 93 - 94 - // Progress summary every 100 bundles 95 - if processedCount%100 == 0 { 96 - fmt.Fprintf(os.Stderr, "\n--- Progress: %d bundles processed (%d fetched, %d loaded) ---\n", 97 - processedCount, fetchedCount, loadedCount) 98 - fmt.Fprintf(os.Stderr, " Total operations: %d\n\n", operationCount) 99 - } 100 - } 101 - 102 - // Final summary 103 - fmt.Fprintf(os.Stderr, "\n✓ Backfill complete\n") 104 - fmt.Fprintf(os.Stderr, " Bundles processed: %d\n", processedCount) 105 - fmt.Fprintf(os.Stderr, " Newly fetched: %d\n", fetchedCount) 106 - fmt.Fprintf(os.Stderr, " Loaded from disk: %d\n", loadedCount) 107 - fmt.Fprintf(os.Stderr, " Total operations: %d\n", operationCount) 108 - fmt.Fprintf(os.Stderr, " Range: %06d - %06d\n", *startFrom, currentBundle-1) 109 - 110 - return nil 111 - } 112 - 113 - // isEndOfDataError checks if error indicates end of available data 114 - func isEndOfDataError(err error) bool { 115 - if err == nil { 116 - return false 117 - } 118 - 119 - errMsg := err.Error() 120 - return containsAny(errMsg, 121 - "insufficient operations", 122 - "no more operations available", 123 - "reached latest data") 124 - } 125 - 126 - // Helper functions 127 - 128 - func containsAny(s string, substrs ...string) bool { 129 - for _, substr := range substrs { 130 - if contains(s, substr) { 131 - return true 132 - } 133 - } 134 - return false 135 - } 136 - 137 - func contains(s, substr string) bool { 138 - return len(s) >= len(substr) && indexOf(s, substr) >= 0 139 - } 140 - 141 - func indexOf(s, substr string) int { 142 - for i := 0; i <= len(s)-len(substr); i++ { 143 - if s[i:i+len(substr)] == substr { 144 - return i 145 - } 146 - } 147 - return -1 148 - }
+2
cmd/plcbundle/commands/common.go
··· 40 40 LoadOperation(ctx context.Context, bundleNum, position int) (*plcclient.PLCOperation, error) 41 41 CloneFromRemote(ctx context.Context, opts internalsync.CloneOptions) (*internalsync.CloneResult, error) 42 42 ResolveDID(ctx context.Context, did string) (*bundle.ResolveDIDResult, error) 43 + RunSyncOnce(ctx context.Context, config *internalsync.SyncLoopConfig, verbose bool) (int, error) 44 + RunSyncLoop(ctx context.Context, config *internalsync.SyncLoopConfig) error 43 45 } 44 46 45 47 // PLCOperationWithLocation wraps operation with location info
+253
cmd/plcbundle/commands/stream.go
··· 1 + // repo/cmd/plcbundle/commands/stream.go 2 + package commands 3 + 4 + import ( 5 + "context" 6 + "fmt" 7 + "os" 8 + "time" 9 + 10 + "github.com/goccy/go-json" 11 + "github.com/spf13/cobra" 12 + internalsync "tangled.org/atscan.net/plcbundle/internal/sync" 13 + ) 14 + 15 + func NewStreamCommand() *cobra.Command { 16 + var ( 17 + all bool 18 + rangeStr string 19 + sync bool 20 + plcURL string 21 + ) 22 + 23 + cmd := &cobra.Command{ 24 + Use: "stream [flags]", 25 + Short: "Stream operations to stdout (JSONL)", 26 + Long: `Stream operations to stdout in JSONL format 27 + 28 + Outputs PLC operations as newline-delimited JSON to stdout. 29 + Progress and status messages go to stderr. 30 + 31 + By default, streams only existing bundles. Use --sync to also fetch 32 + new bundles from PLC directory until caught up.`, 33 + 34 + Example: ` # Stream all existing bundles 35 + plcbundle stream --all 36 + 37 + # Stream and sync new bundles 38 + plcbundle stream --all --sync 39 + 40 + # Stream specific range (existing only) 41 + plcbundle stream --range 1-100 42 + 43 + # Stream from specific PLC directory 44 + plcbundle stream --all --sync --plc https://plc.directory 45 + 46 + # Pipe to file 47 + plcbundle stream --all > operations.jsonl 48 + 49 + # Process with jq 50 + plcbundle stream --all | jq -r .did | sort | uniq -c 51 + 52 + # Sync and filter with detector 53 + plcbundle stream --all --sync | plcbundle detector filter spam`, 54 + 55 + RunE: func(cmd *cobra.Command, args []string) error { 56 + verbose, _ := cmd.Root().PersistentFlags().GetBool("verbose") 57 + quiet, _ := cmd.Root().PersistentFlags().GetBool("quiet") 58 + 59 + // Setup PLC client only if syncing 60 + var effectivePLCURL string 61 + if sync { 62 + effectivePLCURL = plcURL 63 + } 64 + 65 + mgr, dir, err := getManagerFromCommand(cmd, effectivePLCURL) 66 + if err != nil { 67 + return err 68 + } 69 + defer mgr.Close() 70 + 71 + if !quiet { 72 + fmt.Fprintf(os.Stderr, "Streaming from: %s\n", dir) 73 + } 74 + 75 + // Determine bundle range 76 + var start, end int 77 + 78 + if all { 79 + index := mgr.GetIndex() 80 + bundles := index.GetBundles() 81 + 82 + if len(bundles) == 0 { 83 + if sync { 84 + // No bundles but sync enabled - start from 1 85 + start = 1 86 + end = 0 // Will be updated after sync 87 + } else { 88 + if !quiet { 89 + fmt.Fprintf(os.Stderr, "No bundles available (use --sync to fetch)\n") 90 + } 91 + return nil 92 + } 93 + } else { 94 + start = bundles[0].BundleNumber 95 + end = bundles[len(bundles)-1].BundleNumber 96 + } 97 + 98 + } else if rangeStr != "" { 99 + var err error 100 + start, end, err = parseBundleRange(rangeStr) 101 + if err != nil { 102 + return err 103 + } 104 + 105 + } else { 106 + return fmt.Errorf("either --all or --range required") 107 + } 108 + 109 + if !quiet { 110 + if sync { 111 + fmt.Fprintf(os.Stderr, "Mode: stream existing + sync new bundles\n") 112 + } else { 113 + fmt.Fprintf(os.Stderr, "Mode: stream existing only\n") 114 + } 115 + fmt.Fprintf(os.Stderr, "\n") 116 + } 117 + 118 + return streamBundles(cmd.Context(), mgr, start, end, sync, verbose, quiet) 119 + }, 120 + } 121 + 122 + cmd.Flags().BoolVar(&all, "all", false, "Stream all bundles") 123 + cmd.Flags().StringVarP(&rangeStr, "range", "r", "", "Stream bundle range (e.g., '1-100')") 124 + cmd.Flags().BoolVar(&sync, "sync", false, "Also fetch new bundles from PLC (until caught up)") 125 + cmd.Flags().StringVar(&plcURL, "plc", "https://plc.directory", "PLC directory URL (for --sync)") 126 + 127 + return cmd 128 + } 129 + 130 + func streamBundles(ctx context.Context, mgr BundleManager, start, end int, doSync bool, verbose bool, quiet bool) error { 131 + operationCount := 0 132 + 133 + // Phase 1: Stream existing bundles 134 + existingCount := 0 135 + if end > 0 { 136 + existingCount = streamExistingBundles(ctx, mgr, start, end, &operationCount, verbose, quiet) 137 + } 138 + 139 + // Phase 2: Sync and stream new bundles (if enabled) 140 + fetchedCount := 0 141 + if doSync { 142 + if !quiet { 143 + fmt.Fprintf(os.Stderr, "\nSyncing new bundles from PLC...\n") 144 + } 145 + 146 + logger := &streamLogger{quiet: quiet} 147 + config := &internalsync.SyncLoopConfig{ 148 + MaxBundles: 0, 149 + Verbose: verbose, 150 + Logger: logger, 151 + OnBundleSynced: func(bundleNum, synced, mempoolCount int, duration, indexTime time.Duration) { 152 + // Stream the bundle we just created 153 + if bundle, err := mgr.LoadBundle(ctx, bundleNum); err == nil { 154 + for _, op := range bundle.Operations { 155 + if len(op.RawJSON) > 0 { 156 + fmt.Println(string(op.RawJSON)) 157 + } else { 158 + data, _ := json.Marshal(op) 159 + fmt.Println(string(data)) 160 + } 161 + } 162 + operationCount += len(bundle.Operations) 163 + } 164 + }, 165 + } 166 + 167 + var err error 168 + fetchedCount, err = mgr.RunSyncOnce(ctx, config, verbose) 169 + if err != nil { 170 + return err 171 + } 172 + } 173 + 174 + // Summary 175 + if !quiet { 176 + fmt.Fprintf(os.Stderr, "\n✓ Stream complete\n") 177 + 178 + if doSync && fetchedCount > 0 { 179 + fmt.Fprintf(os.Stderr, " Bundles: %d (%d existing + %d synced)\n", 180 + existingCount+fetchedCount, existingCount, fetchedCount) 181 + } else if doSync { 182 + fmt.Fprintf(os.Stderr, " Bundles: %d (already up to date)\n", existingCount) 183 + } else { 184 + fmt.Fprintf(os.Stderr, " Bundles: %d\n", existingCount) 185 + } 186 + 187 + fmt.Fprintf(os.Stderr, " Operations: %d\n", operationCount) 188 + } 189 + 190 + return nil 191 + } 192 + 193 + func streamExistingBundles(ctx context.Context, mgr BundleManager, start, end int, operationCount *int, verbose bool, quiet bool) int { 194 + processedCount := 0 195 + 196 + for bundleNum := start; bundleNum <= end; bundleNum++ { 197 + select { 198 + case <-ctx.Done(): 199 + return processedCount 200 + default: 201 + } 202 + 203 + bundle, err := mgr.LoadBundle(ctx, bundleNum) 204 + if err != nil { 205 + if verbose { 206 + fmt.Fprintf(os.Stderr, "Bundle %06d: not found (skipped)\n", bundleNum) 207 + } 208 + continue 209 + } 210 + 211 + // Output operations to stdout (JSONL) 212 + for _, op := range bundle.Operations { 213 + if len(op.RawJSON) > 0 { 214 + fmt.Println(string(op.RawJSON)) 215 + } else { 216 + data, _ := json.Marshal(op) 217 + fmt.Println(string(data)) 218 + } 219 + } 220 + 221 + *operationCount += len(bundle.Operations) 222 + processedCount++ 223 + 224 + if verbose { 225 + fmt.Fprintf(os.Stderr, "Bundle %06d: ✓ (%d ops)\n", bundleNum, len(bundle.Operations)) 226 + } else if !quiet && processedCount%100 == 0 { 227 + fmt.Fprintf(os.Stderr, "Streamed: %d bundles, %d ops\r", processedCount, *operationCount) 228 + } 229 + } 230 + 231 + if !quiet && !verbose && processedCount > 0 { 232 + fmt.Fprintf(os.Stderr, "Existing: %d bundles, %d ops\n", processedCount, *operationCount) 233 + } 234 + 235 + return processedCount 236 + } 237 + 238 + type streamLogger struct { 239 + quiet bool 240 + verbose bool 241 + } 242 + 243 + func (l *streamLogger) Printf(format string, v ...interface{}) { 244 + if !l.quiet { 245 + fmt.Fprintf(os.Stderr, format+"\n", v...) 246 + } 247 + } 248 + 249 + func (l *streamLogger) Println(v ...interface{}) { 250 + if !l.quiet { 251 + fmt.Fprintln(os.Stderr, v...) 252 + } 253 + }
+2 -2
cmd/plcbundle/main.go
··· 47 47 cmd.AddCommand(commands.NewSyncCommand()) 48 48 cmd.AddCommand(commands.NewCloneCommand()) 49 49 /*cmd.AddCommand(commands.NewPullCommand()) 50 - cmd.AddCommand(commands.NewExportCommand()) 50 + cmd.AddCommand(commands.NewExportCommand())*/ 51 51 cmd.AddCommand(commands.NewStreamCommand()) 52 - cmd.AddCommand(commands.NewGetCommand()) 52 + /*cmd.AddCommand(commands.NewGetCommand()) 53 53 cmd.AddCommand(commands.NewRollbackCommand())*/ 54 54 55 55 // Status & info (root level)
+86 -102
internal/sync/syncer.go
··· 3 3 4 4 import ( 5 5 "context" 6 - "fmt" 7 6 "time" 8 7 9 8 "tangled.org/atscan.net/plcbundle/internal/types" ··· 36 35 SaveMempool() error 37 36 } 38 37 38 + // SyncOnce performs a single sync cycle - fetches until caught up 39 + func SyncOnce(ctx context.Context, mgr SyncManager, config *SyncLoopConfig, verbose bool) (int, error) { 40 + cycleStart := time.Now() 41 + startMempool := mgr.GetMempoolCount() 42 + 43 + fetchedCount := 0 44 + var totalIndexTime time.Duration 45 + 46 + // Keep fetching until caught up (detect by checking if state changes) 47 + for { 48 + // Track state before fetch 49 + bundleBefore := mgr.GetLastBundleNumber() 50 + mempoolBefore := mgr.GetMempoolCount() 51 + 52 + // Attempt to fetch and save next bundle 53 + bundleNum, indexTime, err := mgr.FetchAndSaveNextBundle(ctx, !verbose) 54 + 55 + // Check if we made any progress 56 + bundleAfter := mgr.GetLastBundleNumber() 57 + mempoolAfter := mgr.GetMempoolCount() 58 + 59 + madeProgress := bundleAfter > bundleBefore || mempoolAfter > mempoolBefore 60 + 61 + if err != nil { 62 + // If no progress and got error → caught up 63 + if !madeProgress { 64 + break 65 + } 66 + 67 + // We added to mempool but couldn't complete bundle yet 68 + // This is fine, just stop here 69 + break 70 + } 71 + 72 + // Success 73 + fetchedCount++ 74 + totalIndexTime += indexTime 75 + 76 + // Callback if provided 77 + if config.OnBundleSynced != nil { 78 + config.OnBundleSynced(bundleNum, fetchedCount, mempoolAfter, time.Since(cycleStart), totalIndexTime) 79 + } 80 + 81 + // Small delay between bundles 82 + time.Sleep(500 * time.Millisecond) 83 + 84 + // Check if we're still making progress 85 + if !madeProgress { 86 + break 87 + } 88 + } 89 + 90 + // Summary output 91 + if config.Logger != nil { 92 + mempoolAfter := mgr.GetMempoolCount() 93 + addedOps := mempoolAfter - startMempool 94 + duration := time.Since(cycleStart) 95 + currentBundle := mgr.GetLastBundleNumber() 96 + 97 + if fetchedCount > 0 { 98 + if totalIndexTime > 10*time.Millisecond { 99 + config.Logger.Printf("[Sync] ✓ Bundle %06d | Synced: %d | Mempool: %d (+%d) | %s (index: %s)", 100 + currentBundle, fetchedCount, mempoolAfter, addedOps, 101 + duration.Round(time.Millisecond), totalIndexTime.Round(time.Millisecond)) 102 + } else { 103 + config.Logger.Printf("[Sync] ✓ Bundle %06d | Synced: %d | Mempool: %d (+%d) | %s", 104 + currentBundle, fetchedCount, mempoolAfter, addedOps, duration.Round(time.Millisecond)) 105 + } 106 + } else if addedOps > 0 { 107 + // No bundles but added to mempool 108 + config.Logger.Printf("[Sync] ✓ Bundle %06d | Mempool: %d (+%d) | %s", 109 + currentBundle, mempoolAfter, addedOps, duration.Round(time.Millisecond)) 110 + } else { 111 + // Already up to date 112 + config.Logger.Printf("[Sync] ✓ Bundle %06d | Up to date | %s", 113 + currentBundle, duration.Round(time.Millisecond)) 114 + } 115 + } 116 + 117 + return fetchedCount, nil 118 + } 119 + 39 120 // RunSyncLoop performs continuous syncing 40 121 func RunSyncLoop(ctx context.Context, mgr SyncManager, config *SyncLoopConfig) error { 41 122 if config == nil { ··· 48 129 49 130 bundlesSynced := 0 50 131 51 - // Initial sync - always show detailed progress 52 - if config.Logger != nil { 132 + // Initial sync 133 + if config.Logger != nil && config.MaxBundles != 1 { 53 134 config.Logger.Printf("[Sync] Initial sync starting...") 54 135 } 55 136 56 - synced, err := SyncOnce(ctx, mgr, config, true) // Force verbose for initial 137 + synced, err := SyncOnce(ctx, mgr, config, config.Verbose) 57 138 if err != nil { 58 139 return err 59 140 } ··· 83 164 return ctx.Err() 84 165 85 166 case <-ticker.C: 167 + // Each tick, do one sync cycle (which fetches until caught up) 86 168 synced, err := SyncOnce(ctx, mgr, config, config.Verbose) 87 169 if err != nil { 88 170 if config.Logger != nil { ··· 103 185 } 104 186 } 105 187 } 106 - 107 - // SyncOnce performs a single sync cycle (exported for single-shot syncing) 108 - func SyncOnce(ctx context.Context, mgr SyncManager, config *SyncLoopConfig, verbose bool) (int, error) { 109 - cycleStart := time.Now() 110 - 111 - startBundle := mgr.GetLastBundleNumber() + 1 112 - mempoolBefore := mgr.GetMempoolCount() 113 - fetchedCount := 0 114 - var totalIndexTime time.Duration 115 - 116 - // Keep fetching until caught up 117 - for { 118 - // quiet = !verbose 119 - bundleNum, indexTime, err := mgr.FetchAndSaveNextBundle(ctx, !verbose) 120 - if err != nil { 121 - if isEndOfDataError(err) { 122 - break 123 - } 124 - return fetchedCount, fmt.Errorf("fetch failed: %w", err) 125 - } 126 - 127 - fetchedCount++ 128 - totalIndexTime += indexTime 129 - 130 - // Callback if provided 131 - if config.OnBundleSynced != nil { 132 - mempoolAfter := mgr.GetMempoolCount() 133 - config.OnBundleSynced(bundleNum, fetchedCount, mempoolAfter, time.Since(cycleStart), totalIndexTime) 134 - } 135 - 136 - time.Sleep(500 * time.Millisecond) 137 - } 138 - 139 - // Summary output 140 - if config.Logger != nil { 141 - mempoolAfter := mgr.GetMempoolCount() 142 - addedOps := mempoolAfter - mempoolBefore 143 - duration := time.Since(cycleStart) 144 - 145 - currentBundle := startBundle + fetchedCount - 1 146 - if fetchedCount == 0 { 147 - currentBundle = startBundle - 1 148 - } 149 - 150 - if fetchedCount > 0 { 151 - if totalIndexTime > 10*time.Millisecond { 152 - config.Logger.Printf("[Sync] ✓ Bundle %06d | Synced: %d | Mempool: %d (+%d) | %s (index: %s)", 153 - currentBundle, fetchedCount, mempoolAfter, addedOps, 154 - duration.Round(time.Millisecond), totalIndexTime.Round(time.Millisecond)) 155 - } else { 156 - config.Logger.Printf("[Sync] ✓ Bundle %06d | Synced: %d | Mempool: %d (+%d) | %s", 157 - currentBundle, fetchedCount, mempoolAfter, addedOps, duration.Round(time.Millisecond)) 158 - } 159 - } else { 160 - config.Logger.Printf("[Sync] ✓ Bundle %06d | Up to date | Mempool: %d (+%d) | %s", 161 - currentBundle, mempoolAfter, addedOps, duration.Round(time.Millisecond)) 162 - } 163 - } 164 - 165 - return fetchedCount, nil 166 - } 167 - 168 - // isEndOfDataError checks if error indicates end of available data 169 - func isEndOfDataError(err error) bool { 170 - if err == nil { 171 - return false 172 - } 173 - 174 - errMsg := err.Error() 175 - return containsAny(errMsg, 176 - "insufficient operations", 177 - "no more operations available", 178 - "reached latest data", 179 - "caught up to latest") 180 - } 181 - 182 - // Helper functions 183 - func containsAny(s string, substrs ...string) bool { 184 - for _, substr := range substrs { 185 - if contains(s, substr) { 186 - return true 187 - } 188 - } 189 - return false 190 - } 191 - 192 - func contains(s, substr string) bool { 193 - return len(s) >= len(substr) && indexOf(s, substr) >= 0 194 - } 195 - 196 - func indexOf(s, substr string) int { 197 - for i := 0; i <= len(s)-len(substr); i++ { 198 - if s[i:i+len(substr)] == substr { 199 - return i 200 - } 201 - } 202 - return -1 203 - }