tangled
alpha
login
or
join now
atscan.net
/
plcbundle-go
1
fork
atom
[DEPRECATED] Go implementation of plcbundle
1
fork
atom
overview
issues
pulls
pipelines
cmd export
tree.fail
4 months ago
cb210912
f5f4282b
+297
-317
4 changed files
expand all
collapse all
unified
split
cmd
plcbundle
commands
export.go
stream.go
main.go
server
helpers_test.go
+295
-66
cmd/plcbundle/commands/export.go
···
6
6
"os"
7
7
"time"
8
8
9
9
-
flag "github.com/spf13/pflag"
10
10
-
11
9
"github.com/goccy/go-json"
10
10
+
"github.com/spf13/cobra"
11
11
+
internalsync "tangled.org/atscan.net/plcbundle/internal/sync"
12
12
)
13
13
14
14
-
// ExportCommand handles the export subcommand
15
15
-
func ExportCommand(args []string) error {
16
16
-
fs := flag.NewFlagSet("export", flag.ExitOnError)
17
17
-
bundles := fs.String("bundles", "", "bundle number or range (e.g., '42' or '1-100')")
18
18
-
all := fs.Bool("all", false, "export all bundles")
19
19
-
count := fs.Int("count", 0, "limit number of operations (0 = all)")
20
20
-
after := fs.String("after", "", "timestamp to start after (RFC3339)")
14
14
+
func NewExportCommand() *cobra.Command {
15
15
+
var (
16
16
+
all bool
17
17
+
rangeStr string
18
18
+
sync bool
19
19
+
plcURL string
20
20
+
count int
21
21
+
after string
22
22
+
)
21
23
22
22
-
if err := fs.Parse(args); err != nil {
23
23
-
return err
24
24
-
}
24
24
+
cmd := &cobra.Command{
25
25
+
Use: "export [flags]",
26
26
+
Aliases: []string{"stream", "backfill"},
27
27
+
Short: "Export operations to stdout (JSONL)",
28
28
+
Long: `Export operations to stdout in JSONL format
25
29
26
26
-
if !*all && *bundles == "" {
27
27
-
fmt.Fprint(os.Stderr, `usage: plcbundle export --bundles <number|range> [options]
28
28
-
or: plcbundle export --all [options]
30
30
+
Outputs PLC operations as newline-delimited JSON to stdout.
31
31
+
Progress and status messages go to stderr.
32
32
+
33
33
+
By default, exports only existing bundles. Use --sync to also fetch
34
34
+
new bundles from PLC directory until caught up.
35
35
+
36
36
+
Supports filtering by count and timestamp for selective exports.`,
29
37
30
30
-
Examples:
31
31
-
plcbundle export --bundles 42
32
32
-
plcbundle export --bundles 1-100
38
38
+
Example: ` # Export all existing bundles
33
39
plcbundle export --all
40
40
+
41
41
+
# Export and sync new bundles
42
42
+
plcbundle export --all --sync
43
43
+
44
44
+
# Export specific range (existing only)
45
45
+
plcbundle export --range 1-100
46
46
+
47
47
+
# Export with limit
34
48
plcbundle export --all --count 50000
35
35
-
plcbundle export --bundles 42 | jq .
49
49
+
50
50
+
# Export after timestamp
51
51
+
plcbundle export --all --after 2024-01-01T00:00:00Z
52
52
+
53
53
+
# Combine filters
54
54
+
plcbundle export --range 1-100 --count 10000 --after 2024-01-01T00:00:00Z
55
55
+
56
56
+
# Export from specific PLC directory
57
57
+
plcbundle export --all --sync --plc https://plc.directory
58
58
+
59
59
+
# Pipe to file
60
60
+
plcbundle export --all > operations.jsonl
61
61
+
62
62
+
# Process with jq
63
63
+
plcbundle export --all | jq -r .did | sort | uniq -c
64
64
+
65
65
+
# Sync and filter with detector
66
66
+
plcbundle export --all --sync | plcbundle detector filter spam
67
67
+
68
68
+
# Using aliases (backwards compatible)
69
69
+
plcbundle stream --all --sync
70
70
+
plcbundle backfill --all`,
71
71
+
72
72
+
RunE: func(cmd *cobra.Command, args []string) error {
73
73
+
verbose, _ := cmd.Root().PersistentFlags().GetBool("verbose")
74
74
+
quiet, _ := cmd.Root().PersistentFlags().GetBool("quiet")
75
75
+
76
76
+
mgr, dir, err := getManager(&ManagerOptions{Cmd: cmd, PLCURL: plcURL})
77
77
+
if err != nil {
78
78
+
return err
79
79
+
}
80
80
+
defer mgr.Close()
81
81
+
82
82
+
if !quiet {
83
83
+
fmt.Fprintf(os.Stderr, "Exporting from: %s\n", dir)
84
84
+
}
85
85
+
86
86
+
// Parse after timestamp if provided
87
87
+
var afterTime time.Time
88
88
+
if after != "" {
89
89
+
afterTime, err = time.Parse(time.RFC3339, after)
90
90
+
if err != nil {
91
91
+
return fmt.Errorf("invalid --after timestamp (use RFC3339 format): %w", err)
92
92
+
}
93
93
+
}
94
94
+
95
95
+
// Determine bundle range
96
96
+
var start, end int
97
97
+
98
98
+
if all {
99
99
+
index := mgr.GetIndex()
100
100
+
bundles := index.GetBundles()
101
101
+
102
102
+
if len(bundles) == 0 {
103
103
+
if sync {
104
104
+
// No bundles but sync enabled - start from 1
105
105
+
start = 1
106
106
+
end = 0 // Will be updated after sync
107
107
+
} else {
108
108
+
if !quiet {
109
109
+
fmt.Fprintf(os.Stderr, "No bundles available (use --sync to fetch)\n")
110
110
+
}
111
111
+
return nil
112
112
+
}
113
113
+
} else {
114
114
+
start = bundles[0].BundleNumber
115
115
+
end = bundles[len(bundles)-1].BundleNumber
116
116
+
}
117
117
+
118
118
+
} else if rangeStr != "" {
119
119
+
var err error
120
120
+
start, end, err = parseBundleRange(rangeStr)
121
121
+
if err != nil {
122
122
+
return err
123
123
+
}
124
124
+
125
125
+
} else {
126
126
+
return fmt.Errorf("either --all or --range required")
127
127
+
}
128
128
+
129
129
+
if !quiet {
130
130
+
if sync {
131
131
+
fmt.Fprintf(os.Stderr, "Mode: export existing + sync new bundles\n")
132
132
+
} else {
133
133
+
fmt.Fprintf(os.Stderr, "Mode: export existing only\n")
134
134
+
}
135
135
+
136
136
+
if count > 0 {
137
137
+
fmt.Fprintf(os.Stderr, "Limit: %d operations\n", count)
138
138
+
}
139
139
+
if after != "" {
140
140
+
fmt.Fprintf(os.Stderr, "After: %s\n", after)
141
141
+
}
142
142
+
fmt.Fprintf(os.Stderr, "\n")
143
143
+
}
144
144
+
145
145
+
return exportBundles(cmd.Context(), mgr, exportOptions{
146
146
+
start: start,
147
147
+
end: end,
148
148
+
sync: sync,
149
149
+
count: count,
150
150
+
afterTime: afterTime,
151
151
+
verbose: verbose,
152
152
+
quiet: quiet,
153
153
+
})
154
154
+
},
155
155
+
}
156
156
+
157
157
+
cmd.Flags().BoolVar(&all, "all", false, "Export all bundles")
158
158
+
cmd.Flags().StringVarP(&rangeStr, "range", "r", "", "Export bundle range (e.g., '1-100')")
159
159
+
cmd.Flags().BoolVar(&sync, "sync", false, "Also fetch new bundles from PLC (until caught up)")
160
160
+
cmd.Flags().StringVar(&plcURL, "plc", "https://plc.directory", "PLC directory URL (for --sync)")
161
161
+
cmd.Flags().IntVarP(&count, "count", "n", 0, "Limit number of operations (0 = unlimited)")
162
162
+
cmd.Flags().StringVar(&after, "after", "", "Only export operations after this timestamp (RFC3339)")
163
163
+
164
164
+
return cmd
165
165
+
}
166
166
+
167
167
+
type exportOptions struct {
168
168
+
start int
169
169
+
end int
170
170
+
sync bool
171
171
+
count int
172
172
+
afterTime time.Time
173
173
+
verbose bool
174
174
+
quiet bool
175
175
+
}
176
176
+
177
177
+
func exportBundles(ctx context.Context, mgr BundleManager, opts exportOptions) error {
178
178
+
operationCount := 0
179
179
+
exported := 0
36
180
37
37
-
`)
38
38
-
return fmt.Errorf("missing required flag: --bundles or --all")
181
181
+
// Phase 1: Export existing bundles
182
182
+
existingCount := 0
183
183
+
if opts.end > 0 {
184
184
+
existingCount, exported = exportExistingBundles(
185
185
+
ctx, mgr, opts.start, opts.end,
186
186
+
&operationCount, opts.count, opts.afterTime,
187
187
+
opts.verbose, opts.quiet,
188
188
+
)
39
189
}
40
190
41
41
-
mgr, _, err := getManager(&ManagerOptions{Cmd: nil})
42
42
-
if err != nil {
43
43
-
return err
191
191
+
// Check if we hit the count limit
192
192
+
if opts.count > 0 && exported >= opts.count {
193
193
+
if !opts.quiet {
194
194
+
fmt.Fprintf(os.Stderr, "\n✓ Export complete (limit reached)\n")
195
195
+
fmt.Fprintf(os.Stderr, " Bundles: %d\n", existingCount)
196
196
+
fmt.Fprintf(os.Stderr, " Operations: %d\n", exported)
197
197
+
}
198
198
+
return nil
44
199
}
45
45
-
defer mgr.Close()
46
200
47
47
-
// Determine bundle range
48
48
-
var start, end int
49
49
-
if *all {
50
50
-
index := mgr.GetIndex()
51
51
-
bundleList := index.GetBundles()
52
52
-
if len(bundleList) == 0 {
53
53
-
return fmt.Errorf("no bundles available")
201
201
+
// Phase 2: Sync and export new bundles (if enabled and not at limit)
202
202
+
fetchedCount := 0
203
203
+
if opts.sync && (opts.count == 0 || exported < opts.count) {
204
204
+
if !opts.quiet {
205
205
+
fmt.Fprintf(os.Stderr, "\nSyncing new bundles from PLC...\n")
54
206
}
55
55
-
start = bundleList[0].BundleNumber
56
56
-
end = bundleList[len(bundleList)-1].BundleNumber
57
207
58
58
-
fmt.Fprintf(os.Stderr, "Exporting all bundles (%d-%d)\n", start, end)
59
59
-
} else {
208
208
+
logger := &exportLogger{quiet: opts.quiet}
209
209
+
config := &internalsync.SyncLoopConfig{
210
210
+
MaxBundles: 0,
211
211
+
Verbose: opts.verbose,
212
212
+
Logger: logger,
213
213
+
OnBundleSynced: func(bundleNum, synced, mempoolCount int, duration, indexTime time.Duration) {
214
214
+
// Stream the bundle we just created
215
215
+
if bundle, err := mgr.LoadBundle(ctx, bundleNum); err == nil {
216
216
+
for _, op := range bundle.Operations {
217
217
+
// Apply filters
218
218
+
if !opts.afterTime.IsZero() && op.CreatedAt.Before(opts.afterTime) {
219
219
+
continue
220
220
+
}
221
221
+
222
222
+
if opts.count > 0 && exported >= opts.count {
223
223
+
return // Stop when limit reached
224
224
+
}
225
225
+
226
226
+
// Output operation
227
227
+
if len(op.RawJSON) > 0 {
228
228
+
fmt.Println(string(op.RawJSON))
229
229
+
} else {
230
230
+
data, _ := json.Marshal(op)
231
231
+
fmt.Println(string(data))
232
232
+
}
233
233
+
exported++
234
234
+
operationCount++
235
235
+
}
236
236
+
}
237
237
+
},
238
238
+
}
239
239
+
60
240
var err error
61
61
-
start, end, err = parseBundleRange(*bundles)
241
241
+
fetchedCount, err = mgr.RunSyncOnce(ctx, config, opts.verbose)
62
242
if err != nil {
63
243
return err
64
244
}
65
65
-
fmt.Fprintf(os.Stderr, "Exporting bundles %d-%d\n", start, end)
66
245
}
67
246
68
68
-
if *count > 0 {
69
69
-
fmt.Fprintf(os.Stderr, "Limit: %d operations\n", *count)
70
70
-
}
71
71
-
if *after != "" {
72
72
-
fmt.Fprintf(os.Stderr, "After: %s\n", *after)
73
73
-
}
74
74
-
fmt.Fprintf(os.Stderr, "\n")
247
247
+
// Summary
248
248
+
if !opts.quiet {
249
249
+
fmt.Fprintf(os.Stderr, "\n✓ Export complete\n")
75
250
76
76
-
// Parse after time
77
77
-
var afterTime time.Time
78
78
-
if *after != "" {
79
79
-
afterTime, err = time.Parse(time.RFC3339, *after)
80
80
-
if err != nil {
81
81
-
return fmt.Errorf("invalid after time: %w", err)
251
251
+
if opts.sync && fetchedCount > 0 {
252
252
+
fmt.Fprintf(os.Stderr, " Bundles: %d (%d existing + %d synced)\n",
253
253
+
existingCount+fetchedCount, existingCount, fetchedCount)
254
254
+
} else if opts.sync {
255
255
+
fmt.Fprintf(os.Stderr, " Bundles: %d (already up to date)\n", existingCount)
256
256
+
} else {
257
257
+
fmt.Fprintf(os.Stderr, " Bundles: %d\n", existingCount)
258
258
+
}
259
259
+
260
260
+
fmt.Fprintf(os.Stderr, " Operations: %d", exported)
261
261
+
if opts.count > 0 {
262
262
+
fmt.Fprintf(os.Stderr, " (limit: %d)", opts.count)
82
263
}
264
264
+
fmt.Fprintf(os.Stderr, "\n")
83
265
}
84
266
85
85
-
ctx := context.Background()
86
86
-
exported := 0
267
267
+
return nil
268
268
+
}
269
269
+
270
270
+
func exportExistingBundles(
271
271
+
ctx context.Context,
272
272
+
mgr BundleManager,
273
273
+
start, end int,
274
274
+
operationCount *int,
275
275
+
limit int,
276
276
+
afterTime time.Time,
277
277
+
verbose bool,
278
278
+
quiet bool,
279
279
+
) (bundleCount int, exported int) {
280
280
+
281
281
+
processedCount := 0
282
282
+
exportedOps := 0
87
283
88
88
-
// Export operations
89
284
for bundleNum := start; bundleNum <= end; bundleNum++ {
90
90
-
if *count > 0 && exported >= *count {
91
91
-
break
285
285
+
select {
286
286
+
case <-ctx.Done():
287
287
+
return processedCount, exportedOps
288
288
+
default:
92
289
}
93
93
-
94
94
-
fmt.Fprintf(os.Stderr, "Processing bundle %d...\r", bundleNum)
95
290
96
291
bundle, err := mgr.LoadBundle(ctx, bundleNum)
97
292
if err != nil {
98
98
-
fmt.Fprintf(os.Stderr, "\nWarning: failed to load bundle %d: %v\n", bundleNum, err)
293
293
+
if verbose {
294
294
+
fmt.Fprintf(os.Stderr, "Bundle %06d: not found (skipped)\n", bundleNum)
295
295
+
}
99
296
continue
100
297
}
101
298
299
299
+
// Export operations with filters
102
300
for _, op := range bundle.Operations {
301
301
+
// Filter by timestamp
103
302
if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) {
104
303
continue
105
304
}
106
305
107
107
-
if *count > 0 && exported >= *count {
108
108
-
break
306
306
+
// Check count limit
307
307
+
if limit > 0 && exportedOps >= limit {
308
308
+
if verbose {
309
309
+
fmt.Fprintf(os.Stderr, "Bundle %06d: limit reached, stopping\n", bundleNum)
310
310
+
}
311
311
+
return processedCount, exportedOps
109
312
}
110
313
111
111
-
// Output as JSONL
314
314
+
// Output operation to stdout (JSONL)
112
315
if len(op.RawJSON) > 0 {
113
316
fmt.Println(string(op.RawJSON))
114
317
} else {
115
318
data, _ := json.Marshal(op)
116
319
fmt.Println(string(data))
117
320
}
321
321
+
exportedOps++
322
322
+
}
118
323
119
119
-
exported++
324
324
+
*operationCount += len(bundle.Operations)
325
325
+
processedCount++
326
326
+
327
327
+
if verbose {
328
328
+
fmt.Fprintf(os.Stderr, "Bundle %06d: ✓ (%d ops, %d exported)\n",
329
329
+
bundleNum, len(bundle.Operations), exportedOps)
330
330
+
} else if !quiet && processedCount%100 == 0 {
331
331
+
fmt.Fprintf(os.Stderr, "Exported: %d bundles, %d ops\r", processedCount, exportedOps)
120
332
}
121
333
}
122
334
123
123
-
fmt.Fprintf(os.Stderr, "\n\n✓ Export complete\n")
124
124
-
fmt.Fprintf(os.Stderr, " Exported: %d operations\n", exported)
335
335
+
if !quiet && !verbose && processedCount > 0 {
336
336
+
fmt.Fprintf(os.Stderr, "Existing: %d bundles, %d ops\n", processedCount, exportedOps)
337
337
+
}
338
338
+
339
339
+
return processedCount, exportedOps
340
340
+
}
125
341
126
126
-
return nil
342
342
+
type exportLogger struct {
343
343
+
quiet bool
344
344
+
}
345
345
+
346
346
+
func (l *exportLogger) Printf(format string, v ...interface{}) {
347
347
+
if !l.quiet {
348
348
+
fmt.Fprintf(os.Stderr, format+"\n", v...)
349
349
+
}
350
350
+
}
351
351
+
352
352
+
func (l *exportLogger) Println(v ...interface{}) {
353
353
+
if !l.quiet {
354
354
+
fmt.Fprintln(os.Stderr, v...)
355
355
+
}
127
356
}
-246
cmd/plcbundle/commands/stream.go
···
1
1
-
package commands
2
2
-
3
3
-
import (
4
4
-
"context"
5
5
-
"fmt"
6
6
-
"os"
7
7
-
"time"
8
8
-
9
9
-
"github.com/goccy/go-json"
10
10
-
"github.com/spf13/cobra"
11
11
-
internalsync "tangled.org/atscan.net/plcbundle/internal/sync"
12
12
-
)
13
13
-
14
14
-
func NewStreamCommand() *cobra.Command {
15
15
-
var (
16
16
-
all bool
17
17
-
rangeStr string
18
18
-
sync bool
19
19
-
plcURL string
20
20
-
)
21
21
-
22
22
-
cmd := &cobra.Command{
23
23
-
Use: "stream [flags]",
24
24
-
Aliases: []string{"backfill"},
25
25
-
Short: "Stream operations to stdout (JSONL)",
26
26
-
Long: `Stream operations to stdout in JSONL format
27
27
-
28
28
-
Outputs PLC operations as newline-delimited JSON to stdout.
29
29
-
Progress and status messages go to stderr.
30
30
-
31
31
-
By default, streams only existing bundles. Use --sync to also fetch
32
32
-
new bundles from PLC directory until caught up.`,
33
33
-
34
34
-
Example: ` # Stream all existing bundles
35
35
-
plcbundle stream --all
36
36
-
37
37
-
# Stream and sync new bundles
38
38
-
plcbundle stream --all --sync
39
39
-
40
40
-
# Stream specific range (existing only)
41
41
-
plcbundle stream --range 1-100
42
42
-
43
43
-
# Stream from specific PLC directory
44
44
-
plcbundle stream --all --sync --plc https://plc.directory
45
45
-
46
46
-
# Pipe to file
47
47
-
plcbundle stream --all > operations.jsonl
48
48
-
49
49
-
# Process with jq
50
50
-
plcbundle stream --all | jq -r .did | sort | uniq -c
51
51
-
52
52
-
# Sync and filter with detector
53
53
-
plcbundle stream --all --sync | plcbundle detector filter spam`,
54
54
-
55
55
-
RunE: func(cmd *cobra.Command, args []string) error {
56
56
-
verbose, _ := cmd.Root().PersistentFlags().GetBool("verbose")
57
57
-
quiet, _ := cmd.Root().PersistentFlags().GetBool("quiet")
58
58
-
59
59
-
mgr, dir, err := getManager(&ManagerOptions{Cmd: cmd, PLCURL: plcURL})
60
60
-
if err != nil {
61
61
-
return err
62
62
-
}
63
63
-
defer mgr.Close()
64
64
-
65
65
-
if !quiet {
66
66
-
fmt.Fprintf(os.Stderr, "Streaming from: %s\n", dir)
67
67
-
}
68
68
-
69
69
-
// Determine bundle range
70
70
-
var start, end int
71
71
-
72
72
-
if all {
73
73
-
index := mgr.GetIndex()
74
74
-
bundles := index.GetBundles()
75
75
-
76
76
-
if len(bundles) == 0 {
77
77
-
if sync {
78
78
-
// No bundles but sync enabled - start from 1
79
79
-
start = 1
80
80
-
end = 0 // Will be updated after sync
81
81
-
} else {
82
82
-
if !quiet {
83
83
-
fmt.Fprintf(os.Stderr, "No bundles available (use --sync to fetch)\n")
84
84
-
}
85
85
-
return nil
86
86
-
}
87
87
-
} else {
88
88
-
start = bundles[0].BundleNumber
89
89
-
end = bundles[len(bundles)-1].BundleNumber
90
90
-
}
91
91
-
92
92
-
} else if rangeStr != "" {
93
93
-
var err error
94
94
-
start, end, err = parseBundleRange(rangeStr)
95
95
-
if err != nil {
96
96
-
return err
97
97
-
}
98
98
-
99
99
-
} else {
100
100
-
return fmt.Errorf("either --all or --range required")
101
101
-
}
102
102
-
103
103
-
if !quiet {
104
104
-
if sync {
105
105
-
fmt.Fprintf(os.Stderr, "Mode: stream existing + sync new bundles\n")
106
106
-
} else {
107
107
-
fmt.Fprintf(os.Stderr, "Mode: stream existing only\n")
108
108
-
}
109
109
-
fmt.Fprintf(os.Stderr, "\n")
110
110
-
}
111
111
-
112
112
-
return streamBundles(cmd.Context(), mgr, start, end, sync, verbose, quiet)
113
113
-
},
114
114
-
}
115
115
-
116
116
-
cmd.Flags().BoolVar(&all, "all", false, "Stream all bundles")
117
117
-
cmd.Flags().StringVarP(&rangeStr, "range", "r", "", "Stream bundle range (e.g., '1-100')")
118
118
-
cmd.Flags().BoolVar(&sync, "sync", false, "Also fetch new bundles from PLC (until caught up)")
119
119
-
cmd.Flags().StringVar(&plcURL, "plc", "https://plc.directory", "PLC directory URL (for --sync)")
120
120
-
121
121
-
return cmd
122
122
-
}
123
123
-
124
124
-
func streamBundles(ctx context.Context, mgr BundleManager, start, end int, doSync bool, verbose bool, quiet bool) error {
125
125
-
operationCount := 0
126
126
-
127
127
-
// Phase 1: Stream existing bundles
128
128
-
existingCount := 0
129
129
-
if end > 0 {
130
130
-
existingCount = streamExistingBundles(ctx, mgr, start, end, &operationCount, verbose, quiet)
131
131
-
}
132
132
-
133
133
-
// Phase 2: Sync and stream new bundles (if enabled)
134
134
-
fetchedCount := 0
135
135
-
if doSync {
136
136
-
if !quiet {
137
137
-
fmt.Fprintf(os.Stderr, "\nSyncing new bundles from PLC...\n")
138
138
-
}
139
139
-
140
140
-
logger := &streamLogger{quiet: quiet}
141
141
-
config := &internalsync.SyncLoopConfig{
142
142
-
MaxBundles: 0,
143
143
-
Verbose: verbose,
144
144
-
Logger: logger,
145
145
-
OnBundleSynced: func(bundleNum, synced, mempoolCount int, duration, indexTime time.Duration) {
146
146
-
// Stream the bundle we just created
147
147
-
if bundle, err := mgr.LoadBundle(ctx, bundleNum); err == nil {
148
148
-
for _, op := range bundle.Operations {
149
149
-
if len(op.RawJSON) > 0 {
150
150
-
fmt.Println(string(op.RawJSON))
151
151
-
} else {
152
152
-
data, _ := json.Marshal(op)
153
153
-
fmt.Println(string(data))
154
154
-
}
155
155
-
}
156
156
-
operationCount += len(bundle.Operations)
157
157
-
}
158
158
-
},
159
159
-
}
160
160
-
161
161
-
var err error
162
162
-
fetchedCount, err = mgr.RunSyncOnce(ctx, config, verbose)
163
163
-
if err != nil {
164
164
-
return err
165
165
-
}
166
166
-
}
167
167
-
168
168
-
// Summary
169
169
-
if !quiet {
170
170
-
fmt.Fprintf(os.Stderr, "\n✓ Stream complete\n")
171
171
-
172
172
-
if doSync && fetchedCount > 0 {
173
173
-
fmt.Fprintf(os.Stderr, " Bundles: %d (%d existing + %d synced)\n",
174
174
-
existingCount+fetchedCount, existingCount, fetchedCount)
175
175
-
} else if doSync {
176
176
-
fmt.Fprintf(os.Stderr, " Bundles: %d (already up to date)\n", existingCount)
177
177
-
} else {
178
178
-
fmt.Fprintf(os.Stderr, " Bundles: %d\n", existingCount)
179
179
-
}
180
180
-
181
181
-
fmt.Fprintf(os.Stderr, " Operations: %d\n", operationCount)
182
182
-
}
183
183
-
184
184
-
return nil
185
185
-
}
186
186
-
187
187
-
func streamExistingBundles(ctx context.Context, mgr BundleManager, start, end int, operationCount *int, verbose bool, quiet bool) int {
188
188
-
processedCount := 0
189
189
-
190
190
-
for bundleNum := start; bundleNum <= end; bundleNum++ {
191
191
-
select {
192
192
-
case <-ctx.Done():
193
193
-
return processedCount
194
194
-
default:
195
195
-
}
196
196
-
197
197
-
bundle, err := mgr.LoadBundle(ctx, bundleNum)
198
198
-
if err != nil {
199
199
-
if verbose {
200
200
-
fmt.Fprintf(os.Stderr, "Bundle %06d: not found (skipped)\n", bundleNum)
201
201
-
}
202
202
-
continue
203
203
-
}
204
204
-
205
205
-
// Output operations to stdout (JSONL)
206
206
-
for _, op := range bundle.Operations {
207
207
-
if len(op.RawJSON) > 0 {
208
208
-
fmt.Println(string(op.RawJSON))
209
209
-
} else {
210
210
-
data, _ := json.Marshal(op)
211
211
-
fmt.Println(string(data))
212
212
-
}
213
213
-
}
214
214
-
215
215
-
*operationCount += len(bundle.Operations)
216
216
-
processedCount++
217
217
-
218
218
-
if verbose {
219
219
-
fmt.Fprintf(os.Stderr, "Bundle %06d: ✓ (%d ops)\n", bundleNum, len(bundle.Operations))
220
220
-
} else if !quiet && processedCount%100 == 0 {
221
221
-
fmt.Fprintf(os.Stderr, "Streamed: %d bundles, %d ops\r", processedCount, *operationCount)
222
222
-
}
223
223
-
}
224
224
-
225
225
-
if !quiet && !verbose && processedCount > 0 {
226
226
-
fmt.Fprintf(os.Stderr, "Existing: %d bundles, %d ops\n", processedCount, *operationCount)
227
227
-
}
228
228
-
229
229
-
return processedCount
230
230
-
}
231
231
-
232
232
-
type streamLogger struct {
233
233
-
quiet bool
234
234
-
}
235
235
-
236
236
-
func (l *streamLogger) Printf(format string, v ...interface{}) {
237
237
-
if !l.quiet {
238
238
-
fmt.Fprintf(os.Stderr, format+"\n", v...)
239
239
-
}
240
240
-
}
241
241
-
242
242
-
func (l *streamLogger) Println(v ...interface{}) {
243
243
-
if !l.quiet {
244
244
-
fmt.Fprintln(os.Stderr, v...)
245
245
-
}
246
246
-
}
+2
-4
cmd/plcbundle/main.go
···
41
41
cmd.PersistentFlags().StringP("dir", "C", "", "Repository directory (default: current directory)")
42
42
cmd.PersistentFlags().BoolP("verbose", "v", false, "Show detailed output and progress")
43
43
cmd.PersistentFlags().BoolP("quiet", "q", false, "Suppress non-error output")
44
44
-
//cmd.PersistentFlags().Bool("json", false, "Output as JSON (where applicable)")
45
44
46
45
// Bundle operations (root level - most common)
47
46
cmd.AddCommand(commands.NewSyncCommand())
48
47
cmd.AddCommand(commands.NewCloneCommand())
49
49
-
//cmd.AddCommand(commands.NewExportCommand())
50
50
-
cmd.AddCommand(commands.NewStreamCommand())
48
48
+
cmd.AddCommand(commands.NewExportCommand())
51
49
cmd.AddCommand(commands.NewRollbackCommand())
52
50
53
51
// Status & info (root level)
···
150
148
server Start HTTP server
151
149
152
150
Command Groups:
153
153
-
Bundle: clone, sync, pull, export, stream, get, rollback
151
151
+
Bundle: clone, sync, pull, export, get, rollback
154
152
Status: status, log, gaps, verify, diff, stats, inspect
155
153
DID: did <lookup|resolve|history|batch|search|stats>
156
154
Index: index <build|repair|stats|verify>
-1
server/helpers_test.go
···
1
1
-
// repo/server/helpers_test.go
2
1
package server_test
3
2
4
3
import (