tangled
alpha
login
or
join now
stream.place
/
streamplace
77
fork
atom
Live video on the AT Protocol
77
fork
atom
overview
issues
1
pulls
pipelines
move to your favourite cli library
Natalie B.
1 month ago
7b48c1eb
97ba8748
+946
-389
6 changed files
expand all
collapse all
unified
split
go.mod
go.sum
pkg
cmd
combine.go
streamplace.go
config
config.go
media
segment_roundtrip_test.go
+1
go.mod
···
475
github.com/ultraware/funlen v0.2.0 // indirect
476
github.com/ultraware/whitespace v0.2.0 // indirect
477
github.com/urfave/cli/v2 v2.27.7 // indirect
0
478
github.com/uudashr/gocognit v1.2.0 // indirect
479
github.com/uudashr/iface v1.3.1 // indirect
480
github.com/valyala/bytebufferpool v1.0.0 // indirect
···
475
github.com/ultraware/funlen v0.2.0 // indirect
476
github.com/ultraware/whitespace v0.2.0 // indirect
477
github.com/urfave/cli/v2 v2.27.7 // indirect
478
+
github.com/urfave/cli/v3 v3.6.2 // indirect
479
github.com/uudashr/gocognit v1.2.0 // indirect
480
github.com/uudashr/iface v1.3.1 // indirect
481
github.com/valyala/bytebufferpool v1.0.0 // indirect
+2
go.sum
···
1389
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
1390
github.com/urfave/cli/v2 v2.27.7 h1:bH59vdhbjLv3LAvIu6gd0usJHgoTTPhCFib8qqOwXYU=
1391
github.com/urfave/cli/v2 v2.27.7/go.mod h1:CyNAG/xg+iAOg0N4MPGZqVmv2rCoP267496AOXUZjA4=
0
0
1392
github.com/uudashr/gocognit v1.2.0 h1:3BU9aMr1xbhPlvJLSydKwdLN3tEUUrzPSSM8S4hDYRA=
1393
github.com/uudashr/gocognit v1.2.0/go.mod h1:k/DdKPI6XBZO1q7HgoV2juESI2/Ofj9AcHPZhBBdrTU=
1394
github.com/uudashr/iface v1.3.1 h1:bA51vmVx1UIhiIsQFSNq6GZ6VPTk3WNMZgRiCe9R29U=
···
1389
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
1390
github.com/urfave/cli/v2 v2.27.7 h1:bH59vdhbjLv3LAvIu6gd0usJHgoTTPhCFib8qqOwXYU=
1391
github.com/urfave/cli/v2 v2.27.7/go.mod h1:CyNAG/xg+iAOg0N4MPGZqVmv2rCoP267496AOXUZjA4=
1392
+
github.com/urfave/cli/v3 v3.6.2 h1:lQuqiPrZ1cIz8hz+HcrG0TNZFxU70dPZ3Yl+pSrH9A8=
1393
+
github.com/urfave/cli/v3 v3.6.2/go.mod h1:ysVLtOEmg2tOy6PknnYVhDoouyC/6N42TMeoMzskhso=
1394
github.com/uudashr/gocognit v1.2.0 h1:3BU9aMr1xbhPlvJLSydKwdLN3tEUUrzPSSM8S4hDYRA=
1395
github.com/uudashr/gocognit v1.2.0/go.mod h1:k/DdKPI6XBZO1q7HgoV2juESI2/Ofj9AcHPZhBBdrTU=
1396
github.com/uudashr/iface v1.3.1 h1:bA51vmVx1UIhiIsQFSNq6GZ6VPTk3WNMZgRiCe9R29U=
+19
-10
pkg/cmd/combine.go
···
17
func Combine(ctx context.Context, build *config.BuildFlags, allArgs []string) error {
18
gstinit.InitGST()
19
cli := &config.CLI{Build: build}
20
-
fs := cli.NewFlagSet("streamplace combine")
21
-
debugDir := fs.String("debug-dir", "", "directory to write debug files to")
22
23
-
err := cli.Parse(fs, allArgs)
24
-
if err != nil {
25
-
return err
0
0
0
0
0
0
0
26
}
27
-
if *debugDir != "" {
28
-
err := os.MkdirAll(*debugDir, 0755)
0
29
if err != nil {
30
return fmt.Errorf("failed to create debug directory: %w", err)
31
}
32
}
33
-
log.Debug(context.Background(), "combine command: starting", "args", fs.Args())
34
ctx = log.WithDebugValue(ctx, cli.Debug)
35
cryptoSigner, err := createSigner(ctx, cli)
36
if err != nil {
···
40
if err != nil {
41
return err
42
}
43
-
args := fs.Args()
0
0
0
44
outFile := args[0]
45
inputs := args[1:]
46
log.Log(ctx, "combining segments", "outFile", outFile, "inputs", inputs)
···
62
if err != nil {
63
return err
64
}
65
-
err = CheckCombined(ctx, cli, outFd, *debugDir)
66
if err != nil {
67
return err
68
}
···
17
func Combine(ctx context.Context, build *config.BuildFlags, allArgs []string) error {
18
gstinit.InitGST()
19
cli := &config.CLI{Build: build}
0
0
20
21
+
var debugDir string
22
+
// Simple flag parsing for debug-dir
23
+
args := allArgs
24
+
for i, arg := range allArgs {
25
+
if arg == "--debug-dir" && i+1 < len(allArgs) {
26
+
debugDir = allArgs[i+1]
27
+
// Remove the flag from args
28
+
args = append(allArgs[:i], allArgs[i+2:]...)
29
+
break
30
+
}
31
}
32
+
33
+
if debugDir != "" {
34
+
err := os.MkdirAll(debugDir, 0755)
35
if err != nil {
36
return fmt.Errorf("failed to create debug directory: %w", err)
37
}
38
}
39
+
log.Debug(context.Background(), "combine command: starting", "args", args)
40
ctx = log.WithDebugValue(ctx, cli.Debug)
41
cryptoSigner, err := createSigner(ctx, cli)
42
if err != nil {
···
46
if err != nil {
47
return err
48
}
49
+
50
+
if len(args) < 2 {
51
+
return fmt.Errorf("usage: streamplace combine [--debug-dir dir] <output> <input1> [input2...]")
52
+
}
53
outFile := args[0]
54
inputs := args[1:]
55
log.Log(ctx, "combining segments", "outFile", outFile, "inputs", inputs)
···
71
if err != nil {
72
return err
73
}
74
+
err = CheckCombined(ctx, cli, outFd, debugDir)
75
if err != nil {
76
return err
77
}
+215
-141
pkg/cmd/streamplace.go
···
21
"github.com/bluesky-social/indigo/carstore"
22
"github.com/ethereum/go-ethereum/common/hexutil"
23
"github.com/livepeer/go-livepeer/cmd/livepeer/starter"
24
-
"github.com/peterbourgon/ff/v3"
25
"github.com/streamplace/oatproxy/pkg/oatproxy"
0
26
"stream.place/streamplace/pkg/aqhttp"
27
"stream.place/streamplace/pkg/atproto"
28
"stream.place/streamplace/pkg/bus"
···
54
// parse the CLI and fire up an streamplace node!
55
func start(build *config.BuildFlags, platformJobs []jobFunc) error {
56
iroh_streamplace.InitLogging()
57
-
selfTest := len(os.Args) > 1 && os.Args[1] == "self-test"
58
-
err := media.RunSelfTest(context.Background())
59
-
if err != nil {
60
-
if selfTest {
61
-
fmt.Println(err.Error())
62
-
os.Exit(1)
63
-
} else {
64
-
retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
65
-
if retryCount >= 3 {
66
-
log.Error(context.Background(), "gstreamer self-test failed 3 times, giving up", "error", err)
67
-
return err
68
-
}
69
-
log.Log(context.Background(), "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
70
-
os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
71
-
err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
72
-
if err != nil {
73
-
log.Error(context.Background(), "error in gstreamer self-test, could not restart", "error", err)
74
-
return err
75
-
}
76
-
panic("invalid code path: exec succeeded but we're still here???")
77
-
}
78
-
}
79
-
if selfTest {
80
-
runtime.GC()
81
-
if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
82
-
log.Error(context.Background(), "error creating pprof", "error", err)
83
-
}
84
-
fmt.Println("self-test successful!")
85
-
os.Exit(0)
86
-
}
87
88
-
if len(os.Args) > 1 && os.Args[1] == "stream" {
89
-
if len(os.Args) != 3 {
90
-
fmt.Println("usage: streamplace stream [user]")
91
-
os.Exit(1)
92
-
}
93
-
return Stream(os.Args[2])
94
-
}
95
-
96
-
if len(os.Args) > 1 && os.Args[1] == "live" {
97
-
cli := config.CLI{Build: build}
98
-
fs := cli.NewFlagSet("streamplace live")
99
-
100
-
err := cli.Parse(fs, os.Args[2:])
101
-
if err != nil {
102
-
return err
103
-
}
104
-
105
-
args := fs.Args()
106
-
if len(args) != 1 {
107
-
fmt.Println("usage: streamplace live [flags] [stream-key]")
108
-
os.Exit(1)
109
-
}
110
-
111
-
return Live(args[0], cli.HTTPInternalAddr)
112
-
}
113
-
114
-
if len(os.Args) > 1 && os.Args[1] == "sign" {
115
-
return Sign(context.Background())
116
-
}
117
-
118
-
if len(os.Args) > 1 && os.Args[1] == "whep" {
119
-
return WHEP(os.Args[2:])
120
-
}
121
-
if len(os.Args) > 1 && os.Args[1] == "whip" {
122
-
return WHIP(os.Args[2:])
123
}
124
-
125
-
if len(os.Args) > 1 && os.Args[1] == "combine" {
126
-
return Combine(context.Background(), build, os.Args[2:])
127
-
}
128
-
129
-
if len(os.Args) > 1 && os.Args[1] == "split" {
130
-
cli := config.CLI{Build: build}
131
-
fs := cli.NewFlagSet("streamplace split")
132
-
133
-
err := cli.Parse(fs, os.Args[2:])
134
if err != nil {
135
-
return err
136
-
}
137
-
ctx := context.Background()
138
-
ctx = log.WithDebugValue(ctx, cli.Debug)
139
-
if len(fs.Args()) != 2 {
140
-
fmt.Println("usage: streamplace split [flags] [input file] [output directory]")
141
-
os.Exit(1)
0
0
0
0
0
0
0
0
0
0
0
142
}
143
-
gstinit.InitGST()
144
-
return Split(ctx, fs.Args()[0], fs.Args()[1])
145
}
146
-
147
-
if len(os.Args) > 1 && os.Args[1] == "self-test" {
148
-
err := media.RunSelfTest(context.Background())
149
-
if err != nil {
150
-
fmt.Println(err.Error())
151
-
os.Exit(1)
152
-
}
153
-
fmt.Println("self-test successful!")
154
-
os.Exit(0)
155
}
156
157
-
if len(os.Args) > 1 && os.Args[1] == "livepeer" {
158
-
lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
159
-
_ = starter.NewLivepeerConfig(lpfs)
160
-
err = ff.Parse(lpfs, os.Args[2:],
161
-
ff.WithConfigFileFlag("config"),
162
-
ff.WithEnvVarPrefix("LP"),
163
-
)
164
-
if err != nil {
165
-
return err
166
-
}
167
-
err = GoLivepeer(context.Background(), lpfs)
168
-
if err != nil {
169
-
log.Error(context.Background(), "error in livepeer", "error", err)
170
-
os.Exit(1)
171
-
}
172
-
os.Exit(0)
173
-
}
174
0
175
_ = flag.Set("logtostderr", "true")
176
vFlag := flag.Lookup("v")
177
-
cli := config.CLI{Build: build}
178
-
fs := cli.NewFlagSet("streamplace")
179
-
verbosity := fs.String("v", "3", "log verbosity level")
180
-
version := fs.Bool("version", false, "print version and exit")
181
182
-
err = cli.Parse(
183
-
fs, os.Args[1:],
184
-
)
185
if err != nil {
186
return err
187
}
···
190
if err != nil {
191
return err
192
}
193
-
_ = vFlag.Value.Set(*verbosity)
0
194
log.SetColorLogger(cli.Color)
195
-
ctx := context.Background()
196
ctx = log.WithDebugValue(ctx, cli.Debug)
197
198
log.Log(ctx,
···
203
"runtime.GOOS", runtime.GOOS,
204
"runtime.GOARCH", runtime.GOARCH,
205
"runtime.Version", runtime.Version())
206
-
if *version {
207
-
return nil
208
-
}
209
-
signer, err := createSigner(ctx, &cli)
210
if err != nil {
211
return err
212
}
213
214
if len(os.Args) > 1 && os.Args[1] == "migrate" {
215
-
return statedb.Migrate(&cli)
216
}
217
218
spmetrics.Version.WithLabelValues(build.Version).Inc()
···
262
if err != nil {
263
return err
264
}
265
-
state, err := statedb.MakeDB(ctx, &cli, noter, mod)
266
if err != nil {
267
return err
268
}
269
-
handle, err := atproto.MakeLexiconRepo(ctx, &cli, mod, state)
270
if err != nil {
271
return err
272
}
···
286
287
b := bus.NewBus()
288
atsync := &atproto.ATProtoSynchronizer{
289
-
CLI: &cli,
290
Model: mod,
291
StatefulDB: state,
292
Noter: noter,
···
297
return fmt.Errorf("failed to migrate: %w", err)
298
}
299
300
-
mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync, ldb)
301
if err != nil {
302
return err
303
}
304
305
-
ms, err := media.MakeMediaSigner(ctx, &cli, cli.StreamerName, signer, mod)
306
if err != nil {
307
return err
308
}
···
365
return err
366
}
367
}
368
-
replicator, err = iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod)
369
if err != nil {
370
return err
371
}
···
387
Public: cli.PublicOAuth,
388
HTTPClient: &aqhttp.Client,
389
})
390
-
d := director.NewDirector(mm, mod, &cli, b, op, state, replicator, ldb)
391
-
a, err := api.MakeStreamplaceAPI(&cli, mod, state, noter, mm, ms, b, atsync, d, op, ldb)
392
if err != nil {
393
return err
394
}
···
418
})
419
if cli.RTMPServerAddon != "" {
420
group.Go(func() error {
421
-
return rtmps.ServeRTMPSAddon(ctx, &cli)
422
})
423
}
424
group.Go(func() error {
425
-
return a.ServeRTMPS(ctx, &cli)
426
})
427
} else {
428
group.Go(func() error {
···
453
})
454
455
group.Go(func() error {
456
-
return storage.StartSegmentCleaner(ctx, ldb, &cli)
457
})
458
459
group.Go(func() error {
···
461
})
462
463
group.Go(func() error {
464
-
return replicator.Start(ctx, &cli)
465
})
466
467
if cli.LivepeerGateway {
···
475
return err
476
}
477
group.Go(func() error {
478
-
err := GoLivepeer(ctx, fs)
0
0
0
0
0
0
0
479
if err != nil {
480
return err
481
}
···
497
return err
498
}
499
did := atkey.DIDKey()
500
-
testMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did, signer, mod)
501
if err != nil {
502
return err
503
}
···
524
return err
525
}
526
did2 := atkey2.DIDKey()
527
-
intermittentMediaSigner, err := media.MakeMediaSigner(ctx, &cli, did2, signer, mod)
528
if err != nil {
529
return err
530
}
···
561
562
for _, job := range platformJobs {
563
group.Go(func() error {
564
-
return job(ctx, &cli)
565
})
566
}
567
···
599
}
600
}
601
}
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
21
"github.com/bluesky-social/indigo/carstore"
22
"github.com/ethereum/go-ethereum/common/hexutil"
23
"github.com/livepeer/go-livepeer/cmd/livepeer/starter"
0
24
"github.com/streamplace/oatproxy/pkg/oatproxy"
25
+
urfavecli "github.com/urfave/cli/v3"
26
"stream.place/streamplace/pkg/aqhttp"
27
"stream.place/streamplace/pkg/atproto"
28
"stream.place/streamplace/pkg/bus"
···
54
// parse the CLI and fire up an streamplace node!
55
func start(build *config.BuildFlags, platformJobs []jobFunc) error {
56
iroh_streamplace.InitLogging()
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
57
58
+
cli := config.CLI{Build: build}
59
+
app := cli.NewCommand("streamplace")
60
+
app.Usage = "decentralized live streaming platform"
61
+
app.Version = build.Version
62
+
app.Commands = []*urfavecli.Command{
63
+
makeSelfTestCommand(build),
64
+
makeStreamCommand(build),
65
+
makeLiveCommand(build),
66
+
makeSignCommand(build),
67
+
makeWhepCommand(build),
68
+
makeWhipCommand(build),
69
+
makeCombineCommand(build),
70
+
makeSplitCommand(build),
71
+
makeLivepeerCommand(build),
72
+
makeMigrateCommand(build),
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
73
}
74
+
// Add the verbosity flag
75
+
app.Flags = append(app.Flags, &urfavecli.StringFlag{
76
+
Name: "v",
77
+
Usage: "log verbosity level",
78
+
Value: "3",
79
+
})
80
+
app.Before = func(ctx context.Context, cmd *urfavecli.Command) (context.Context, error) {
81
+
// Run self-test before starting
82
+
selfTest := cmd.Name == "self-test"
83
+
err := media.RunSelfTest(ctx)
84
if err != nil {
85
+
if selfTest {
86
+
fmt.Println(err.Error())
87
+
os.Exit(1)
88
+
} else {
89
+
retryCount, _ := strconv.Atoi(os.Getenv("STREAMPLACE_SELFTEST_RETRY"))
90
+
if retryCount >= 3 {
91
+
log.Error(ctx, "gstreamer self-test failed 3 times, giving up", "error", err)
92
+
return ctx, err
93
+
}
94
+
log.Log(ctx, "error in gstreamer self-test, attempting recovery", "error", err, "retry", retryCount+1)
95
+
os.Setenv("STREAMPLACE_SELFTEST_RETRY", strconv.Itoa(retryCount+1))
96
+
err := syscall.Exec(os.Args[0], os.Args[1:], os.Environ())
97
+
if err != nil {
98
+
log.Error(ctx, "error in gstreamer self-test, could not restart", "error", err)
99
+
return ctx, err
100
+
}
101
+
panic("invalid code path: exec succeeded but we're still here???")
102
+
}
103
}
104
+
return ctx, nil
0
105
}
106
+
app.Action = func(ctx context.Context, cmd *urfavecli.Command) error {
107
+
return runMain(ctx, build, platformJobs, cmd, &cli)
0
0
0
0
0
0
0
108
}
109
110
+
return app.Run(context.Background(), os.Args)
111
+
}
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
112
113
+
func runMain(ctx context.Context, build *config.BuildFlags, platformJobs []jobFunc, cmd *urfavecli.Command, cli *config.CLI) error {
114
_ = flag.Set("logtostderr", "true")
115
vFlag := flag.Lookup("v")
0
0
0
0
116
117
+
err := cli.Validate(cmd)
0
0
118
if err != nil {
119
return err
120
}
···
123
if err != nil {
124
return err
125
}
126
+
verbosity := cmd.String("v")
127
+
_ = vFlag.Value.Set(verbosity)
128
log.SetColorLogger(cli.Color)
0
129
ctx = log.WithDebugValue(ctx, cli.Debug)
130
131
log.Log(ctx,
···
136
"runtime.GOOS", runtime.GOOS,
137
"runtime.GOARCH", runtime.GOARCH,
138
"runtime.Version", runtime.Version())
139
+
140
+
signer, err := createSigner(ctx, cli)
0
0
141
if err != nil {
142
return err
143
}
144
145
if len(os.Args) > 1 && os.Args[1] == "migrate" {
146
+
return statedb.Migrate(cli)
147
}
148
149
spmetrics.Version.WithLabelValues(build.Version).Inc()
···
193
if err != nil {
194
return err
195
}
196
+
state, err := statedb.MakeDB(ctx, cli, noter, mod)
197
if err != nil {
198
return err
199
}
200
+
handle, err := atproto.MakeLexiconRepo(ctx, cli, mod, state)
201
if err != nil {
202
return err
203
}
···
217
218
b := bus.NewBus()
219
atsync := &atproto.ATProtoSynchronizer{
220
+
CLI: cli,
221
Model: mod,
222
StatefulDB: state,
223
Noter: noter,
···
228
return fmt.Errorf("failed to migrate: %w", err)
229
}
230
231
+
mm, err := media.MakeMediaManager(ctx, cli, signer, mod, b, atsync, ldb)
232
if err != nil {
233
return err
234
}
235
236
+
ms, err := media.MakeMediaSigner(ctx, cli, cli.StreamerName, signer, mod)
237
if err != nil {
238
return err
239
}
···
296
return err
297
}
298
}
299
+
replicator, err = iroh_replicator.NewSwarm(ctx, cli, secret, topic, mm, b, mod)
300
if err != nil {
301
return err
302
}
···
318
Public: cli.PublicOAuth,
319
HTTPClient: &aqhttp.Client,
320
})
321
+
d := director.NewDirector(mm, mod, cli, b, op, state, replicator, ldb)
322
+
a, err := api.MakeStreamplaceAPI(cli, mod, state, noter, mm, ms, b, atsync, d, op, ldb)
323
if err != nil {
324
return err
325
}
···
349
})
350
if cli.RTMPServerAddon != "" {
351
group.Go(func() error {
352
+
return rtmps.ServeRTMPSAddon(ctx, cli)
353
})
354
}
355
group.Go(func() error {
356
+
return a.ServeRTMPS(ctx, cli)
357
})
358
} else {
359
group.Go(func() error {
···
384
})
385
386
group.Go(func() error {
387
+
return storage.StartSegmentCleaner(ctx, ldb, cli)
388
})
389
390
group.Go(func() error {
···
392
})
393
394
group.Go(func() error {
395
+
return replicator.Start(ctx, cli)
396
})
397
398
if cli.LivepeerGateway {
···
406
return err
407
}
408
group.Go(func() error {
409
+
lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
410
+
_ = starter.NewLivepeerConfig(lpfs)
411
+
// Parse livepeer flags from mainCmd
412
+
err := lpfs.Parse([]string{})
413
+
if err != nil {
414
+
return err
415
+
}
416
+
err = GoLivepeer(ctx, lpfs)
417
if err != nil {
418
return err
419
}
···
435
return err
436
}
437
did := atkey.DIDKey()
438
+
testMediaSigner, err := media.MakeMediaSigner(ctx, cli, did, signer, mod)
439
if err != nil {
440
return err
441
}
···
462
return err
463
}
464
did2 := atkey2.DIDKey()
465
+
intermittentMediaSigner, err := media.MakeMediaSigner(ctx, cli, did2, signer, mod)
466
if err != nil {
467
return err
468
}
···
499
500
for _, job := range platformJobs {
501
group.Go(func() error {
502
+
return job(ctx, cli)
503
})
504
}
505
···
537
}
538
}
539
}
540
+
541
+
func makeSelfTestCommand(build *config.BuildFlags) *urfavecli.Command {
542
+
return &urfavecli.Command{
543
+
Name: "self-test",
544
+
Usage: "run gstreamer self-test",
545
+
Action: func(ctx context.Context, cmd *urfavecli.Command) error {
546
+
err := media.RunSelfTest(ctx)
547
+
if err != nil {
548
+
fmt.Println(err.Error())
549
+
os.Exit(1)
550
+
}
551
+
runtime.GC()
552
+
if err := pprof.Lookup("goroutine").WriteTo(os.Stderr, 2); err != nil {
553
+
log.Error(ctx, "error creating pprof", "error", err)
554
+
}
555
+
fmt.Println("self-test successful!")
556
+
return nil
557
+
},
558
+
}
559
+
}
560
+
561
+
func makeStreamCommand(build *config.BuildFlags) *urfavecli.Command {
562
+
return &urfavecli.Command{
563
+
Name: "stream",
564
+
Usage: "stream command",
565
+
ArgsUsage: "[user]",
566
+
Action: func(ctx context.Context, cmd *urfavecli.Command) error {
567
+
args := cmd.Args()
568
+
if args.Len() != 1 {
569
+
return fmt.Errorf("usage: streamplace stream [user]")
570
+
}
571
+
return Stream(args.First())
572
+
},
573
+
}
574
+
}
575
+
576
+
func makeLiveCommand(build *config.BuildFlags) *urfavecli.Command {
577
+
cli := config.CLI{Build: build}
578
+
liveCmd := cli.NewCommand("live")
579
+
liveCmd.Usage = "start live stream"
580
+
liveCmd.ArgsUsage = "[stream-key]"
581
+
liveCmd.Action = func(ctx context.Context, cmd *urfavecli.Command) error {
582
+
args := cmd.Args()
583
+
if args.Len() != 1 {
584
+
return fmt.Errorf("usage: streamplace live [flags] [stream-key]")
585
+
}
586
+
return Live(args.First(), cli.HTTPInternalAddr)
587
+
}
588
+
return liveCmd
589
+
}
590
+
591
+
func makeSignCommand(build *config.BuildFlags) *urfavecli.Command {
592
+
return &urfavecli.Command{
593
+
Name: "sign",
594
+
Usage: "sign command",
595
+
Action: func(ctx context.Context, cmd *urfavecli.Command) error {
596
+
return Sign(ctx)
597
+
},
598
+
}
599
+
}
600
+
601
+
func makeWhepCommand(build *config.BuildFlags) *urfavecli.Command {
602
+
return &urfavecli.Command{
603
+
Name: "whep",
604
+
Usage: "WHEP client",
605
+
Action: func(ctx context.Context, cmd *urfavecli.Command) error {
606
+
return WHEP(cmd.Args().Slice())
607
+
},
608
+
}
609
+
}
610
+
611
+
func makeWhipCommand(build *config.BuildFlags) *urfavecli.Command {
612
+
return &urfavecli.Command{
613
+
Name: "whip",
614
+
Usage: "WHIP client",
615
+
Action: func(ctx context.Context, cmd *urfavecli.Command) error {
616
+
return WHIP(cmd.Args().Slice())
617
+
},
618
+
}
619
+
}
620
+
621
+
func makeCombineCommand(build *config.BuildFlags) *urfavecli.Command {
622
+
return &urfavecli.Command{
623
+
Name: "combine",
624
+
Usage: "combine segments",
625
+
Action: func(ctx context.Context, cmd *urfavecli.Command) error {
626
+
return Combine(ctx, build, cmd.Args().Slice())
627
+
},
628
+
}
629
+
}
630
+
631
+
func makeSplitCommand(build *config.BuildFlags) *urfavecli.Command {
632
+
cli := config.CLI{Build: build}
633
+
splitCmd := cli.NewCommand("split")
634
+
splitCmd.Usage = "split video file"
635
+
splitCmd.ArgsUsage = "[input file] [output directory]"
636
+
splitCmd.Action = func(ctx context.Context, cmd *urfavecli.Command) error {
637
+
args := cmd.Args()
638
+
if args.Len() != 2 {
639
+
return fmt.Errorf("usage: streamplace split [flags] [input file] [output directory]")
640
+
}
641
+
ctx = log.WithDebugValue(ctx, cli.Debug)
642
+
gstinit.InitGST()
643
+
return Split(ctx, args.Get(0), args.Get(1))
644
+
}
645
+
return splitCmd
646
+
}
647
+
648
+
func makeLivepeerCommand(build *config.BuildFlags) *urfavecli.Command {
649
+
return &urfavecli.Command{
650
+
Name: "livepeer",
651
+
Usage: "run livepeer gateway",
652
+
SkipFlagParsing: true,
653
+
Action: func(ctx context.Context, cmd *urfavecli.Command) error {
654
+
args := cmd.Args().Slice()
655
+
lpfs := flag.NewFlagSet("livepeer", flag.ExitOnError)
656
+
_ = starter.NewLivepeerConfig(lpfs)
657
+
err := lpfs.Parse(args)
658
+
if err != nil {
659
+
return err
660
+
}
661
+
return GoLivepeer(ctx, lpfs)
662
+
},
663
+
}
664
+
}
665
+
666
+
func makeMigrateCommand(build *config.BuildFlags) *urfavecli.Command {
667
+
cli := config.CLI{Build: build}
668
+
return &urfavecli.Command{
669
+
Name: "migrate",
670
+
Usage: "run database migrations",
671
+
Action: func(ctx context.Context, cmd *urfavecli.Command) error {
672
+
return statedb.Migrate(&cli)
673
+
},
674
+
}
675
+
}
+704
-235
pkg/config/config.go
···
7
"encoding/json"
8
"encoding/pem"
9
"errors"
10
-
"flag"
11
"fmt"
12
"io"
13
"net"
14
"os"
15
"path/filepath"
16
"runtime"
0
17
"strconv"
18
"strings"
19
"time"
···
21
"math/rand/v2"
22
23
"github.com/lestrrat-go/jwx/v2/jwk"
24
-
"github.com/livepeer/go-livepeer/cmd/livepeer/starter"
25
"github.com/lmittmann/tint"
26
slogGorm "github.com/orandin/slog-gorm"
27
-
"github.com/peterbourgon/ff/v3"
28
"stream.place/streamplace/pkg/aqtime"
29
"stream.place/streamplace/pkg/constants"
30
"stream.place/streamplace/pkg/crypto/aqpub"
···
160
ReplicatorIroh string = "iroh"
161
)
162
163
-
func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
164
-
fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
165
-
fs.StringVar(&cli.DataDir, "data-dir", DefaultDataDir(), "directory for keeping all streamplace data")
166
-
fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
167
-
fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
168
-
fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
169
-
fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
170
-
cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
171
-
cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
172
-
fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
173
-
fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state")
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
174
cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL)
175
-
fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
176
-
fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "Base64-encoded JSON string of a firebase service account key")
177
-
fs.StringVar(&cli.FirebaseServiceAccountFile, "firebase-service-account-file", "", "Path to a JSON file containing a firebase service account key")
178
-
fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
179
-
cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
180
-
fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
181
-
fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
182
-
fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
183
-
fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
184
-
fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
185
-
fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
186
-
fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
187
-
fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
188
-
fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
189
-
fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
190
-
fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
191
-
fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
192
-
fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
193
-
fs.BoolVar(&cli.PublicOAuth, "dev-public-oauth", false, "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development")
194
-
fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
195
-
fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway")
196
-
fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
197
-
cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node")
198
-
cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to")
199
-
cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four")
200
-
cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
201
-
fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
202
-
fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
203
-
fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
204
-
fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
205
-
fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
206
-
fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
207
-
fs.StringVar(&cli.BroadcasterHost, "broadcaster-host", "", "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)")
208
-
fs.StringVar(&cli.XXDeprecatedPublicHost, "public-host", "", "deprecated, use broadcaster-host or server-host instead as appropriate")
209
-
fs.StringVar(&cli.ServerHost, "server-host", "", "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters")
210
-
fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
211
-
fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
212
-
213
-
fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
214
-
fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
215
-
fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
216
-
fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
217
-
fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
218
-
fs.StringVar(&cli.RTMPSAddonAddr, "rtmps-addon-addr", ":1936", "address to listen for RTMPS on the addon server")
219
-
fs.StringVar(&cli.RTMPSAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections (when --secure=true)")
220
-
fs.StringVar(&cli.RTMPAddr, "rtmp-addr", ":1935", "address to listen for RTMP connections (when --secure=false)")
221
-
cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
222
-
fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback")
223
-
fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking")
224
-
fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking")
225
-
cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to")
226
-
fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)")
227
-
cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules")
228
-
cli.StringSliceFlag(fs, &cli.DefaultRecommendedStreamers, "default-recommended-streamers", []string{}, "comma-separated list of streamer DIDs to recommend by default when no other recommendations are available")
229
-
fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit")
230
-
fs.StringVar(&cli.PLCURL, "plc-url", "https://plc.directory", "url of the plc directory")
231
-
fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging")
232
-
fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting")
233
-
fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug")
234
-
fs.StringVar(&cli.SegmentDebugDir, "segment-debug-dir", "", "directory to log segment validation to")
235
-
cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with")
236
-
fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)")
237
-
fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay")
238
-
cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth")
239
-
fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?")
240
-
cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorWebsocket}, "list of replication protocols to use (http, iroh)")
241
-
fs.StringVar(&cli.WebsocketURL, "websocket-url", "", "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)")
242
-
fs.BoolVar(&cli.BehindHTTPSProxy, "behind-https-proxy", false, "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS")
243
-
cli.StringSliceFlag(fs, &cli.AdminDIDs, "admin-dids", []string{}, "comma-separated list of DIDs that are authorized to modify branding and other admin operations")
244
-
cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)")
245
-
fs.BoolVar(&cli.PlayerTelemetry, "player-telemetry", true, "enable player telemetry")
246
-
fs.StringVar(&cli.LocalDBURL, "local-db-url", "sqlite://$SP_DATA_DIR/localdb.sqlite", "URL of the local database to use for storing local data")
247
cli.dataDirFlags = append(cli.dataDirFlags, &cli.LocalDBURL)
248
-
249
-
fs.Bool("external-signing", true, "DEPRECATED, does nothing.")
250
-
fs.Bool("insecure", false, "DEPRECATED, does nothing.")
251
-
252
-
lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
253
-
_ = starter.NewLivepeerConfig(lpFlags)
254
-
lpFlags.VisitAll(func(f *flag.Flag) {
255
-
adapted := LivepeerFlags.CamelToSnake[f.Name]
256
-
fs.Var(f.Value, fmt.Sprintf("livepeer.%s", adapted), f.Usage)
257
-
})
258
259
if runtime.GOOS == "linux" {
260
-
fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
261
-
fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
262
-
fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
263
-
fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
264
}
265
-
return fs
0
266
}
267
268
var StreamplaceSchemePrefix = "streamplace://"
···
350
)
351
}
352
353
-
func (cli *CLI) Parse(fs *flag.FlagSet, args []string) error {
354
-
err := ff.Parse(
355
-
fs, args,
356
-
ff.WithEnvVarPrefix("SP"),
357
-
)
358
-
if err != nil {
359
-
return err
360
-
}
361
if cli.DataDir == "" {
362
return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
363
}
···
366
}
367
if cli.LivepeerGateway {
368
log.MonkeypatchStderr()
369
-
gatewayPath := cli.DataFilePath([]string{"livepeer", "gateway"})
370
-
err = fs.Set("livepeer.rtmp-addr", "127.0.0.1:0")
371
-
if err != nil {
372
-
return err
373
-
}
374
-
err = fs.Set("livepeer.data-dir", gatewayPath)
375
-
if err != nil {
376
-
return err
377
-
}
378
-
err = fs.Set("livepeer.gateway", "true")
379
-
if err != nil {
380
-
return err
381
-
}
382
-
httpAddrFlag := fs.Lookup("livepeer.http-addr")
383
-
if httpAddrFlag == nil {
384
-
return fmt.Errorf("livepeer.http-addr not found")
385
-
}
386
-
httpAddr := httpAddrFlag.Value.String()
387
-
if httpAddr == "" {
388
-
httpAddr = "127.0.0.1:8935"
389
-
err = fs.Set("livepeer.http-addr", httpAddr)
390
-
if err != nil {
391
-
return err
392
-
}
393
-
}
394
-
cli.LivepeerGatewayURL = fmt.Sprintf("http://%s", httpAddr)
395
}
396
for _, dest := range cli.dataDirFlags {
397
*dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
···
420
return err
421
}
422
cli.FirebaseServiceAccount = string(bs)
0
0
0
0
423
}
424
return nil
425
}
···
529
return nil
530
}
531
532
-
func (cli *CLI) DataDirFlag(fs *flag.FlagSet, dest *string, name, defaultValue, usage string) {
533
-
cli.dataDirFlags = append(cli.dataDirFlags, dest)
534
-
*dest = filepath.Join(SPDataDir, defaultValue)
535
-
usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
536
-
fs.Func(name, usage, func(s string) error {
537
-
*dest = s
538
-
return nil
539
-
})
540
-
}
541
-
542
func (cli *CLI) HasMist() bool {
543
return runtime.GOOS == "linux"
544
}
545
546
// type for comma-separated ethereum addresses
547
-
func (cli *CLI) AddressSliceFlag(fs *flag.FlagSet, dest *[]aqpub.Pub, name, defaultValue, usage string) {
548
*dest = []aqpub.Pub{}
549
-
usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
550
-
fs.Func(name, usage, func(s string) error {
551
-
if s == "" {
552
-
return nil
553
-
}
554
-
strs := strings.Split(s, ",")
555
-
for _, str := range strs {
556
-
pub, err := aqpub.FromHexString(str)
557
-
if err != nil {
558
-
return err
559
-
}
560
-
*dest = append(*dest, pub)
561
-
}
562
-
return nil
563
-
})
564
-
}
565
-
566
-
func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) {
567
-
*dest = defaultValue
568
-
usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
569
-
fs.Func(name, usage, func(s string) error {
570
-
if s == "" {
571
-
return nil
572
-
}
573
-
strs := strings.Split(s, ",")
574
-
*dest = append([]string{}, strs...)
575
-
return nil
576
-
})
577
-
}
578
-
579
-
func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) {
580
-
*dest = map[string]string{}
581
-
usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest)
582
-
fs.Func(name, usage, func(s string) error {
583
-
if s == "" {
584
-
return nil
585
-
}
586
-
pairs := strings.Split(s, ",")
587
-
for _, pair := range pairs {
588
-
parts := strings.Split(pair, "=")
589
-
if len(parts) != 2 {
590
-
return fmt.Errorf("invalid kv flag: %s", pair)
591
-
}
592
-
(*dest)[parts[0]] = parts[1]
593
-
}
594
-
return nil
595
-
})
596
-
}
597
-
598
-
func (cli *CLI) JSONFlag(fs *flag.FlagSet, dest any, name, defaultValue, usage string) {
599
usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
600
-
fs.Func(name, usage, func(s string) error {
601
-
if s == "" {
602
-
return nil
603
-
}
604
-
return json.Unmarshal([]byte(s), dest)
605
-
})
606
-
}
607
608
-
// debug flag for turning func=ToHLS:3,file=gstreamer.go:4 into {"func": {"ToHLS": 3}, "file": {"gstreamer.go": 4}}
609
-
func (cli *CLI) DebugFlag(fs *flag.FlagSet, dest *map[string]map[string]int, name, defaultValue, usage string) {
610
-
*dest = map[string]map[string]int{}
611
-
fs.Func(name, usage, func(s string) error {
612
-
if s == "" {
613
-
return nil
614
-
}
615
-
pairs := strings.Split(s, ",")
616
-
for _, pair := range pairs {
617
-
scoreSplit := strings.Split(pair, ":")
618
-
if len(scoreSplit) != 2 {
619
-
return fmt.Errorf("invalid debug flag: %s", pair)
620
-
}
621
-
score, err := strconv.Atoi(scoreSplit[1])
622
-
if err != nil {
623
-
return fmt.Errorf("invalid debug flag: %s", pair)
624
-
}
625
-
selectorSplit := strings.Split(scoreSplit[0], "=")
626
-
if len(selectorSplit) != 2 {
627
-
return fmt.Errorf("invalid debug flag: %s", pair)
628
}
629
-
_, ok := (*dest)[selectorSplit[0]]
630
-
if !ok {
631
-
(*dest)[selectorSplit[0]] = map[string]int{}
0
0
0
0
632
}
633
-
(*dest)[selectorSplit[0]][selectorSplit[1]] = score
634
-
}
635
-
636
-
return nil
637
-
})
638
}
639
640
func (cli *CLI) StreamIsAllowed(did string) error {
···
648
if openServer && !isDIDKey {
649
return nil
650
}
651
-
for _, a := range cli.AllowedStreams {
652
-
if a == did {
653
-
return nil
654
-
}
655
}
656
return fmt.Errorf("user is not allowed to stream")
657
}
···
7
"encoding/json"
8
"encoding/pem"
9
"errors"
0
10
"fmt"
11
"io"
12
"net"
13
"os"
14
"path/filepath"
15
"runtime"
16
+
"slices"
17
"strconv"
18
"strings"
19
"time"
···
21
"math/rand/v2"
22
23
"github.com/lestrrat-go/jwx/v2/jwk"
0
24
"github.com/lmittmann/tint"
25
slogGorm "github.com/orandin/slog-gorm"
26
+
urfavecli "github.com/urfave/cli/v3"
27
"stream.place/streamplace/pkg/aqtime"
28
"stream.place/streamplace/pkg/constants"
29
"stream.place/streamplace/pkg/crypto/aqpub"
···
159
ReplicatorIroh string = "iroh"
160
)
161
162
+
func (cli *CLI) NewCommand(name string) *urfavecli.Command {
163
+
cmd := &urfavecli.Command{
164
+
Name: name,
165
+
Usage: "streamplace server",
166
+
Flags: []urfavecli.Flag{
167
+
&urfavecli.StringFlag{
168
+
Name: "data-dir",
169
+
Usage: "directory for keeping all streamplace data",
170
+
Value: DefaultDataDir(),
171
+
Destination: &cli.DataDir,
172
+
Sources: urfavecli.EnvVars("SP_DATA_DIR"),
173
+
},
174
+
&urfavecli.StringFlag{
175
+
Name: "http-addr",
176
+
Usage: "Public HTTP address",
177
+
Value: ":38080",
178
+
Destination: &cli.HTTPAddr,
179
+
Sources: urfavecli.EnvVars("SP_HTTP_ADDR"),
180
+
},
181
+
&urfavecli.StringFlag{
182
+
Name: "http-internal-addr",
183
+
Usage: "Private, admin-only HTTP address",
184
+
Value: "127.0.0.1:39090",
185
+
Destination: &cli.HTTPInternalAddr,
186
+
Sources: urfavecli.EnvVars("SP_HTTP_INTERNAL_ADDR"),
187
+
},
188
+
&urfavecli.StringFlag{
189
+
Name: "https-addr",
190
+
Usage: "Public HTTPS address",
191
+
Value: ":38443",
192
+
Destination: &cli.HTTPSAddr,
193
+
Sources: urfavecli.EnvVars("SP_HTTPS_ADDR"),
194
+
},
195
+
&urfavecli.BoolFlag{
196
+
Name: "secure",
197
+
Usage: "Run with HTTPS. Required for WebRTC output",
198
+
Value: false,
199
+
Destination: &cli.Secure,
200
+
Sources: urfavecli.EnvVars("SP_SECURE"),
201
+
},
202
+
&urfavecli.StringFlag{
203
+
Name: "tls-cert",
204
+
Usage: fmt.Sprintf(`Path to TLS certificate (default: "%s")`, filepath.Join(SPDataDir, "tls", "tls.crt")),
205
+
Destination: &cli.TLSCertPath,
206
+
Value: filepath.Join(SPDataDir, "tls", "tls.crt"),
207
+
Sources: urfavecli.EnvVars("SP_TLS_CERT"),
208
+
},
209
+
&urfavecli.StringFlag{
210
+
Name: "tls-key",
211
+
Usage: fmt.Sprintf(`Path to TLS key (default: "%s")`, filepath.Join(SPDataDir, "tls", "tls.key")),
212
+
Destination: &cli.TLSKeyPath,
213
+
Value: filepath.Join(SPDataDir, "tls", "tls.key"),
214
+
Sources: urfavecli.EnvVars("SP_TLS_KEY"),
215
+
},
216
+
&urfavecli.StringFlag{
217
+
Name: "signing-key",
218
+
Usage: "Path to signing key for pushing OTA updates to the app",
219
+
Destination: &cli.SigningKeyPath,
220
+
Sources: urfavecli.EnvVars("SP_SIGNING_KEY"),
221
+
},
222
+
&urfavecli.StringFlag{
223
+
Name: "db-url",
224
+
Usage: "URL of the database to use for storing private streamplace state",
225
+
Value: "sqlite://$SP_DATA_DIR/state.sqlite",
226
+
Destination: &cli.DBURL,
227
+
Sources: urfavecli.EnvVars("SP_DB_URL"),
228
+
},
229
+
&urfavecli.StringFlag{
230
+
Name: "admin-account",
231
+
Usage: "ethereum account that administrates this streamplace node",
232
+
Destination: &cli.AdminAccount,
233
+
Sources: urfavecli.EnvVars("SP_ADMIN_ACCOUNT"),
234
+
},
235
+
&urfavecli.StringFlag{
236
+
Name: "firebase-service-account",
237
+
Usage: "Base64-encoded JSON string of a firebase service account key",
238
+
Destination: &cli.FirebaseServiceAccount,
239
+
Sources: urfavecli.EnvVars("SP_FIREBASE_SERVICE_ACCOUNT"),
240
+
},
241
+
&urfavecli.StringFlag{
242
+
Name: "firebase-service-account-file",
243
+
Usage: "Path to a JSON file containing a firebase service account key",
244
+
Destination: &cli.FirebaseServiceAccountFile,
245
+
Sources: urfavecli.EnvVars("SP_FIREBASE_SERVICE_ACCOUNT_FILE"),
246
+
},
247
+
&urfavecli.StringFlag{
248
+
Name: "gitlab-url",
249
+
Usage: "gitlab url for generating download links",
250
+
Value: "https://git.stream.place/api/v4/projects/1",
251
+
Destination: &cli.GitLabURL,
252
+
Sources: urfavecli.EnvVars("SP_GITLAB_URL"),
253
+
},
254
+
&urfavecli.StringFlag{
255
+
Name: "eth-keystore-path",
256
+
Usage: fmt.Sprintf(`path to ethereum keystore (default: "%s")`, filepath.Join(SPDataDir, "keystore")),
257
+
Destination: &cli.EthKeystorePath,
258
+
Value: filepath.Join(SPDataDir, "keystore"),
259
+
Sources: urfavecli.EnvVars("SP_ETH_KEYSTORE_PATH"),
260
+
},
261
+
&urfavecli.StringFlag{
262
+
Name: "eth-account-addr",
263
+
Usage: "ethereum account address to use (if keystore contains more than one)",
264
+
Destination: &cli.EthAccountAddr,
265
+
Sources: urfavecli.EnvVars("SP_ETH_ACCOUNT_ADDR"),
266
+
},
267
+
&urfavecli.StringFlag{
268
+
Name: "eth-password",
269
+
Usage: "password for encrypting keystore",
270
+
Destination: &cli.EthPassword,
271
+
Sources: urfavecli.EnvVars("SP_ETH_PASSWORD"),
272
+
},
273
+
&urfavecli.StringFlag{
274
+
Name: "ta-url",
275
+
Usage: "timestamp authority server for signing",
276
+
Value: "http://timestamp.digicert.com",
277
+
Destination: &cli.TAURL,
278
+
Sources: urfavecli.EnvVars("SP_TA_URL"),
279
+
},
280
+
&urfavecli.StringFlag{
281
+
Name: "pkcs11-module-path",
282
+
Usage: "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so",
283
+
Destination: &cli.PKCS11ModulePath,
284
+
Sources: urfavecli.EnvVars("SP_PKCS11_MODULE_PATH"),
285
+
},
286
+
&urfavecli.StringFlag{
287
+
Name: "pkcs11-pin",
288
+
Usage: "PIN for logging into PKCS11 token. if not provided, will be prompted interactively",
289
+
Destination: &cli.PKCS11Pin,
290
+
Sources: urfavecli.EnvVars("SP_PKCS11_PIN"),
291
+
},
292
+
&urfavecli.StringFlag{
293
+
Name: "pkcs11-token-slot",
294
+
Usage: "slot number of PKCS11 token (only use one of slot, label, or serial)",
295
+
Destination: &cli.PKCS11TokenSlot,
296
+
Sources: urfavecli.EnvVars("SP_PKCS11_TOKEN_SLOT"),
297
+
},
298
+
&urfavecli.StringFlag{
299
+
Name: "pkcs11-token-label",
300
+
Usage: "label of PKCS11 token (only use one of slot, label, or serial)",
301
+
Destination: &cli.PKCS11TokenLabel,
302
+
Sources: urfavecli.EnvVars("SP_PKCS11_TOKEN_LABEL"),
303
+
},
304
+
&urfavecli.StringFlag{
305
+
Name: "pkcs11-token-serial",
306
+
Usage: "serial number of PKCS11 token (only use one of slot, label, or serial)",
307
+
Destination: &cli.PKCS11TokenSerial,
308
+
Sources: urfavecli.EnvVars("SP_PKCS11_TOKEN_SERIAL"),
309
+
},
310
+
&urfavecli.StringFlag{
311
+
Name: "pkcs11-keypair-label",
312
+
Usage: "label of signing keypair on PKCS11 token",
313
+
Destination: &cli.PKCS11KeypairLabel,
314
+
Sources: urfavecli.EnvVars("SP_PKCS11_KEYPAIR_LABEL"),
315
+
},
316
+
&urfavecli.StringFlag{
317
+
Name: "pkcs11-keypair-id",
318
+
Usage: "id of signing keypair on PKCS11 token",
319
+
Destination: &cli.PKCS11KeypairID,
320
+
Sources: urfavecli.EnvVars("SP_PKCS11_KEYPAIR_ID"),
321
+
},
322
+
&urfavecli.StringFlag{
323
+
Name: "app-bundle-id",
324
+
Usage: "bundle id of an app that we facilitate oauth login for",
325
+
Destination: &cli.AppBundleID,
326
+
Sources: urfavecli.EnvVars("SP_APP_BUNDLE_ID"),
327
+
},
328
+
&urfavecli.StringFlag{
329
+
Name: "streamer-name",
330
+
Usage: "name of the person streaming from this streamplace node",
331
+
Destination: &cli.StreamerName,
332
+
Sources: urfavecli.EnvVars("SP_STREAMER_NAME"),
333
+
},
334
+
&urfavecli.StringFlag{
335
+
Name: "dev-frontend-proxy",
336
+
Usage: "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend",
337
+
Destination: &cli.FrontendProxy,
338
+
Sources: urfavecli.EnvVars("SP_DEV_FRONTEND_PROXY"),
339
+
},
340
+
&urfavecli.BoolFlag{
341
+
Name: "dev-public-oauth",
342
+
Usage: "(FOR DEVELOPMENT ONLY) enable public oauth login for http://127.0.0.1 development",
343
+
Value: false,
344
+
Destination: &cli.PublicOAuth,
345
+
Sources: urfavecli.EnvVars("SP_DEV_PUBLIC_OAUTH"),
346
+
},
347
+
&urfavecli.StringFlag{
348
+
Name: "livepeer-gateway-url",
349
+
Usage: "URL of the Livepeer Gateway to use for transcoding",
350
+
Destination: &cli.LivepeerGatewayURL,
351
+
Sources: urfavecli.EnvVars("SP_LIVEPEER_GATEWAY_URL"),
352
+
},
353
+
&urfavecli.BoolFlag{
354
+
Name: "livepeer-gateway",
355
+
Usage: "enable embedded Livepeer Gateway",
356
+
Value: false,
357
+
Destination: &cli.LivepeerGateway,
358
+
Sources: urfavecli.EnvVars("SP_LIVEPEER_GATEWAY"),
359
+
},
360
+
&urfavecli.BoolFlag{
361
+
Name: "wide-open",
362
+
Usage: "allow ALL streams to be uploaded to this node (not recommended for production)",
363
+
Value: false,
364
+
Destination: &cli.WideOpen,
365
+
Sources: urfavecli.EnvVars("SP_WIDE_OPEN"),
366
+
},
367
+
&urfavecli.StringFlag{
368
+
Name: "allowed-streams",
369
+
Usage: `if set, only allow these addresses or atproto DIDs to upload to this node (default: "")`,
370
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
371
+
if s == "" {
372
+
return nil
373
+
}
374
+
cli.AllowedStreams = strings.Split(s, ",")
375
+
return nil
376
+
},
377
+
Sources: urfavecli.EnvVars("SP_ALLOWED_STREAMS"),
378
+
},
379
+
&urfavecli.StringFlag{
380
+
Name: "peers",
381
+
Usage: `other streamplace nodes to replicate to (default: "")`,
382
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
383
+
if s == "" {
384
+
return nil
385
+
}
386
+
cli.Peers = strings.Split(s, ",")
387
+
return nil
388
+
},
389
+
Sources: urfavecli.EnvVars("SP_PEERS"),
390
+
},
391
+
&urfavecli.StringFlag{
392
+
Name: "redirects",
393
+
Usage: `http 302s /path/one:/path/two,/path/three:/path/four (default: "")`,
394
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
395
+
if s == "" {
396
+
return nil
397
+
}
398
+
cli.Redirects = strings.Split(s, ",")
399
+
return nil
400
+
},
401
+
Sources: urfavecli.EnvVars("SP_REDIRECTS"),
402
+
},
403
+
&urfavecli.StringFlag{
404
+
Name: "debug",
405
+
Usage: "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4",
406
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
407
+
if s == "" {
408
+
return nil
409
+
}
410
+
cli.Debug = map[string]map[string]int{}
411
+
pairs := strings.SplitSeq(s, ",")
412
+
for pair := range pairs {
413
+
scoreSplit := strings.Split(pair, ":")
414
+
if len(scoreSplit) != 2 {
415
+
return fmt.Errorf("invalid debug flag: %s", pair)
416
+
}
417
+
score, err := strconv.Atoi(scoreSplit[1])
418
+
if err != nil {
419
+
return fmt.Errorf("invalid debug flag: %s", pair)
420
+
}
421
+
selectorSplit := strings.Split(scoreSplit[0], "=")
422
+
if len(selectorSplit) != 2 {
423
+
return fmt.Errorf("invalid debug flag: %s", pair)
424
+
}
425
+
_, ok := cli.Debug[selectorSplit[0]]
426
+
if !ok {
427
+
cli.Debug[selectorSplit[0]] = map[string]int{}
428
+
}
429
+
cli.Debug[selectorSplit[0]][selectorSplit[1]] = score
430
+
}
431
+
return nil
432
+
},
433
+
Sources: urfavecli.EnvVars("SP_DEBUG"),
434
+
},
435
+
&urfavecli.BoolFlag{
436
+
Name: "test-stream",
437
+
Usage: "run a built-in test stream on boot",
438
+
Value: false,
439
+
Destination: &cli.TestStream,
440
+
Sources: urfavecli.EnvVars("SP_TEST_STREAM"),
441
+
},
442
+
&urfavecli.BoolFlag{
443
+
Name: "no-firehose",
444
+
Usage: "disable the bluesky firehose",
445
+
Value: false,
446
+
Destination: &cli.NoFirehose,
447
+
Sources: urfavecli.EnvVars("SP_NO_FIREHOSE"),
448
+
},
449
+
&urfavecli.BoolFlag{
450
+
Name: "print-chat",
451
+
Usage: "print chat messages to stdout",
452
+
Value: false,
453
+
Destination: &cli.PrintChat,
454
+
Sources: urfavecli.EnvVars("SP_PRINT_CHAT"),
455
+
},
456
+
&urfavecli.StringFlag{
457
+
Name: "whip-test",
458
+
Usage: "run a WHIP self-test with the given parameters",
459
+
Destination: &cli.WHIPTest,
460
+
Sources: urfavecli.EnvVars("SP_WHIP_TEST"),
461
+
},
462
+
&urfavecli.StringFlag{
463
+
Name: "relay-host",
464
+
Usage: "websocket url for relay firehose",
465
+
Value: "wss://bsky.network",
466
+
Destination: &cli.RelayHost,
467
+
Sources: urfavecli.EnvVars("SP_RELAY_HOST"),
468
+
},
469
+
&urfavecli.StringFlag{
470
+
Name: "color",
471
+
Usage: "'true' to enable colorized logging, 'false' to disable",
472
+
Destination: &cli.Color,
473
+
Sources: urfavecli.EnvVars("SP_COLOR"),
474
+
},
475
+
&urfavecli.StringFlag{
476
+
Name: "broadcaster-host",
477
+
Usage: "public host for the broadcaster group that this node is a part of (excluding https:// e.g. stream.place)",
478
+
Destination: &cli.BroadcasterHost,
479
+
Sources: urfavecli.EnvVars("SP_BROADCASTER_HOST"),
480
+
},
481
+
&urfavecli.StringFlag{
482
+
Name: "public-host",
483
+
Usage: "deprecated, use broadcaster-host or server-host instead as appropriate",
484
+
Destination: &cli.XXDeprecatedPublicHost,
485
+
Sources: urfavecli.EnvVars("SP_PUBLIC_HOST"),
486
+
},
487
+
&urfavecli.StringFlag{
488
+
Name: "server-host",
489
+
Usage: "public host for this particular physical streamplace node. defaults to broadcaster-host and only must be set for multi-node broadcasters",
490
+
Destination: &cli.ServerHost,
491
+
Sources: urfavecli.EnvVars("SP_SERVER_HOST"),
492
+
},
493
+
&urfavecli.BoolFlag{
494
+
Name: "thumbnail",
495
+
Usage: "enable thumbnail generation",
496
+
Value: true,
497
+
Destination: &cli.Thumbnail,
498
+
Sources: urfavecli.EnvVars("SP_THUMBNAIL"),
499
+
},
500
+
&urfavecli.BoolFlag{
501
+
Name: "smear-audio",
502
+
Usage: "enable audio smearing to create 'perfect' segment timestamps",
503
+
Value: false,
504
+
Destination: &cli.SmearAudio,
505
+
Sources: urfavecli.EnvVars("SP_SMEAR_AUDIO"),
506
+
},
507
+
&urfavecli.StringFlag{
508
+
Name: "tracing-endpoint",
509
+
Usage: "gRPC endpoint to send traces to",
510
+
Destination: &cli.TracingEndpoint,
511
+
Sources: urfavecli.EnvVars("SP_TRACING_ENDPOINT"),
512
+
},
513
+
&urfavecli.IntFlag{
514
+
Name: "rate-limit-per-second",
515
+
Usage: "rate limit for requests per second per ip",
516
+
Value: 0,
517
+
Destination: &cli.RateLimitPerSecond,
518
+
Sources: urfavecli.EnvVars("SP_RATE_LIMIT_PER_SECOND"),
519
+
},
520
+
&urfavecli.IntFlag{
521
+
Name: "rate-limit-burst",
522
+
Usage: "rate limit burst for requests per ip",
523
+
Value: 0,
524
+
Destination: &cli.RateLimitBurst,
525
+
Sources: urfavecli.EnvVars("SP_RATE_LIMIT_BURST"),
526
+
},
527
+
&urfavecli.IntFlag{
528
+
Name: "rate-limit-websocket",
529
+
Usage: "number of concurrent websocket connections allowed per ip",
530
+
Value: 10,
531
+
Destination: &cli.RateLimitWebsocket,
532
+
Sources: urfavecli.EnvVars("SP_RATE_LIMIT_WEBSOCKET"),
533
+
},
534
+
&urfavecli.StringFlag{
535
+
Name: "rtmp-server-addon",
536
+
Usage: "address of external RTMP server to forward streams to",
537
+
Destination: &cli.RTMPServerAddon,
538
+
Sources: urfavecli.EnvVars("SP_RTMP_SERVER_ADDON"),
539
+
},
540
+
&urfavecli.StringFlag{
541
+
Name: "rtmps-addon-addr",
542
+
Usage: "address to listen for RTMPS on the addon server",
543
+
Value: ":1936",
544
+
Destination: &cli.RTMPSAddonAddr,
545
+
Sources: urfavecli.EnvVars("SP_RTMPS_ADDON_ADDR"),
546
+
},
547
+
&urfavecli.StringFlag{
548
+
Name: "rtmps-addr",
549
+
Usage: "address to listen for RTMPS connections (when --secure=true)",
550
+
Value: ":1935",
551
+
Destination: &cli.RTMPSAddr,
552
+
Sources: urfavecli.EnvVars("SP_RTMPS_ADDR"),
553
+
},
554
+
&urfavecli.StringFlag{
555
+
Name: "rtmp-addr",
556
+
Usage: "address to listen for RTMP connections (when --secure=false)",
557
+
Value: ":1935",
558
+
Destination: &cli.RTMPAddr,
559
+
Sources: urfavecli.EnvVars("SP_RTMP_ADDR"),
560
+
},
561
+
&urfavecli.StringFlag{
562
+
Name: "discord-webhooks",
563
+
Usage: `JSON array of Discord webhooks to send notifications to (default: "[]")`,
564
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
565
+
if s == "" {
566
+
return nil
567
+
}
568
+
return json.Unmarshal([]byte(s), &cli.DiscordWebhooks)
569
+
},
570
+
Sources: urfavecli.EnvVars("SP_DISCORD_WEBHOOKS"),
571
+
},
572
+
&urfavecli.BoolFlag{
573
+
Name: "new-webrtc-playback",
574
+
Usage: "enable new webrtc playback",
575
+
Value: true,
576
+
Destination: &cli.NewWebRTCPlayback,
577
+
Sources: urfavecli.EnvVars("SP_NEW_WEBRTC_PLAYBACK"),
578
+
},
579
+
&urfavecli.StringFlag{
580
+
Name: "apple-team-id",
581
+
Usage: "apple team id for deep linking",
582
+
Destination: &cli.AppleTeamID,
583
+
Sources: urfavecli.EnvVars("SP_APPLE_TEAM_ID"),
584
+
},
585
+
&urfavecli.StringFlag{
586
+
Name: "android-cert-fingerprint",
587
+
Usage: "android cert fingerprint for deep linking",
588
+
Destination: &cli.AndroidCertFingerprint,
589
+
Sources: urfavecli.EnvVars("SP_ANDROID_CERT_FINGERPRINT"),
590
+
},
591
+
&urfavecli.StringFlag{
592
+
Name: "labelers",
593
+
Usage: `did of labelers that this instance should subscribe to (default: "")`,
594
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
595
+
if s == "" {
596
+
return nil
597
+
}
598
+
cli.Labelers = strings.Split(s, ",")
599
+
return nil
600
+
},
601
+
Sources: urfavecli.EnvVars("SP_LABELERS"),
602
+
},
603
+
&urfavecli.StringFlag{
604
+
Name: "atproto-did",
605
+
Usage: "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)",
606
+
Destination: &cli.AtprotoDID,
607
+
Sources: urfavecli.EnvVars("SP_ATPROTO_DID"),
608
+
},
609
+
&urfavecli.StringFlag{
610
+
Name: "content-filters",
611
+
Usage: `JSON content filtering rules (default: "{}")`,
612
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
613
+
if s == "" {
614
+
return nil
615
+
}
616
+
return json.Unmarshal([]byte(s), &cli.ContentFilters)
617
+
},
618
+
Sources: urfavecli.EnvVars("SP_CONTENT_FILTERS"),
619
+
},
620
+
&urfavecli.StringFlag{
621
+
Name: "default-recommended-streamers",
622
+
Usage: `comma-separated list of streamer DIDs to recommend by default when no other recommendations are available (default: "")`,
623
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
624
+
if s == "" {
625
+
return nil
626
+
}
627
+
cli.DefaultRecommendedStreamers = strings.Split(s, ",")
628
+
return nil
629
+
},
630
+
Sources: urfavecli.EnvVars("SP_DEFAULT_RECOMMENDED_STREAMERS"),
631
+
},
632
+
&urfavecli.BoolFlag{
633
+
Name: "livepeer-help",
634
+
Usage: "print help for livepeer flags and exit",
635
+
Value: false,
636
+
Destination: &cli.LivepeerHelp,
637
+
Sources: urfavecli.EnvVars("SP_LIVEPEER_HELP"),
638
+
},
639
+
&urfavecli.StringFlag{
640
+
Name: "plc-url",
641
+
Usage: "url of the plc directory",
642
+
Value: "https://plc.directory",
643
+
Destination: &cli.PLCURL,
644
+
Sources: urfavecli.EnvVars("SP_PLC_URL"),
645
+
},
646
+
&urfavecli.BoolFlag{
647
+
Name: "sql-logging",
648
+
Usage: "enable sql logging",
649
+
Value: false,
650
+
Destination: &cli.SQLLogging,
651
+
Sources: urfavecli.EnvVars("SP_SQL_LOGGING"),
652
+
},
653
+
&urfavecli.StringFlag{
654
+
Name: "sentry-dsn",
655
+
Usage: "sentry dsn for error reporting",
656
+
Destination: &cli.SentryDSN,
657
+
Sources: urfavecli.EnvVars("SP_SENTRY_DSN"),
658
+
},
659
+
&urfavecli.BoolFlag{
660
+
Name: "livepeer-debug",
661
+
Usage: "log livepeer segments to $SP_DATA_DIR/livepeer-debug",
662
+
Value: false,
663
+
Destination: &cli.LivepeerDebug,
664
+
Sources: urfavecli.EnvVars("SP_LIVEPEER_DEBUG"),
665
+
},
666
+
&urfavecli.StringFlag{
667
+
Name: "segment-debug-dir",
668
+
Usage: "directory to log segment validation to",
669
+
Destination: &cli.SegmentDebugDir,
670
+
Sources: urfavecli.EnvVars("SP_SEGMENT_DEBUG_DIR"),
671
+
},
672
+
&urfavecli.StringFlag{
673
+
Name: "tickets",
674
+
Usage: `tickets to join the swarm with (default: "")`,
675
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
676
+
if s == "" {
677
+
return nil
678
+
}
679
+
cli.Tickets = strings.Split(s, ",")
680
+
return nil
681
+
},
682
+
Sources: urfavecli.EnvVars("SP_TICKETS"),
683
+
},
684
+
&urfavecli.StringFlag{
685
+
Name: "iroh-topic",
686
+
Usage: "topic to use for the iroh swarm (must be 32 bytes in hex)",
687
+
Destination: &cli.IrohTopic,
688
+
Sources: urfavecli.EnvVars("SP_IROH_TOPIC"),
689
+
},
690
+
&urfavecli.BoolFlag{
691
+
Name: "disable-iroh-relay",
692
+
Usage: "disable the iroh relay",
693
+
Value: false,
694
+
Destination: &cli.DisableIrohRelay,
695
+
Sources: urfavecli.EnvVars("SP_DISABLE_IROH_RELAY"),
696
+
},
697
+
&urfavecli.StringFlag{
698
+
Name: "dev-account-creds",
699
+
Usage: `(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth (default: "")`,
700
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
701
+
if s == "" {
702
+
return nil
703
+
}
704
+
cli.DevAccountCreds = map[string]string{}
705
+
pairs := strings.Split(s, ",")
706
+
for _, pair := range pairs {
707
+
parts := strings.Split(pair, "=")
708
+
if len(parts) != 2 {
709
+
return fmt.Errorf("invalid kv flag: %s", pair)
710
+
}
711
+
cli.DevAccountCreds[parts[0]] = parts[1]
712
+
}
713
+
return nil
714
+
},
715
+
Sources: urfavecli.EnvVars("SP_DEV_ACCOUNT_CREDS"),
716
+
},
717
+
&urfavecli.DurationFlag{
718
+
Name: "stream-session-timeout",
719
+
Usage: "how long to wait before considering a stream inactive on this node?",
720
+
Value: 60 * time.Second,
721
+
Destination: &cli.StreamSessionTimeout,
722
+
Sources: urfavecli.EnvVars("SP_STREAM_SESSION_TIMEOUT"),
723
+
},
724
+
&urfavecli.StringFlag{
725
+
Name: "replicators",
726
+
Usage: "comma-separated list of replication protocols to use (websocket, iroh)",
727
+
Value: ReplicatorWebsocket,
728
+
Sources: urfavecli.EnvVars("SP_REPLICATORS"),
729
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
730
+
if s != "" {
731
+
cli.Replicators = strings.Split(s, ",")
732
+
}
733
+
return nil
734
+
},
735
+
},
736
+
&urfavecli.StringFlag{
737
+
Name: "websocket-url",
738
+
Usage: "override the websocket (ws:// or wss://) url to use for replication (normally not necessary, used for testing)",
739
+
Destination: &cli.WebsocketURL,
740
+
Sources: urfavecli.EnvVars("SP_WEBSOCKET_URL"),
741
+
},
742
+
&urfavecli.BoolFlag{
743
+
Name: "behind-https-proxy",
744
+
Usage: "set to true if this node is behind an https proxy and we should report https URLs even though the node isn't serving HTTPS",
745
+
Value: false,
746
+
Destination: &cli.BehindHTTPSProxy,
747
+
Sources: urfavecli.EnvVars("SP_BEHIND_HTTPS_PROXY"),
748
+
},
749
+
&urfavecli.StringFlag{
750
+
Name: "admin-dids",
751
+
Usage: `comma-separated list of DIDs that are authorized to modify branding and other admin operations (default: "")`,
752
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
753
+
if s == "" {
754
+
return nil
755
+
}
756
+
cli.AdminDIDs = strings.Split(s, ",")
757
+
return nil
758
+
},
759
+
Sources: urfavecli.EnvVars("SP_ADMIN_DIDS"),
760
+
},
761
+
&urfavecli.StringFlag{
762
+
Name: "syndicate",
763
+
Usage: `list of DIDs that we should rebroadcast ('*' for everybody) (default: "")`,
764
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
765
+
if s == "" {
766
+
return nil
767
+
}
768
+
cli.Syndicate = strings.Split(s, ",")
769
+
return nil
770
+
},
771
+
Sources: urfavecli.EnvVars("SP_SYNDICATE"),
772
+
},
773
+
&urfavecli.BoolFlag{
774
+
Name: "player-telemetry",
775
+
Usage: "enable player telemetry",
776
+
Value: true,
777
+
Destination: &cli.PlayerTelemetry,
778
+
Sources: urfavecli.EnvVars("SP_PLAYER_TELEMETRY"),
779
+
},
780
+
&urfavecli.StringFlag{
781
+
Name: "local-db-url",
782
+
Usage: "URL of the local database to use for storing local data",
783
+
Value: "sqlite://$SP_DATA_DIR/localdb.sqlite",
784
+
Destination: &cli.LocalDBURL,
785
+
Sources: urfavecli.EnvVars("SP_LOCAL_DB_URL"),
786
+
},
787
+
&urfavecli.BoolFlag{
788
+
Name: "external-signing",
789
+
Usage: "DEPRECATED, does nothing.",
790
+
Value: true,
791
+
},
792
+
&urfavecli.BoolFlag{
793
+
Name: "insecure",
794
+
Usage: "DEPRECATED, does nothing.",
795
+
Value: false,
796
+
},
797
+
},
798
+
Before: func(ctx context.Context, cmd *urfavecli.Command) (context.Context, error) {
799
+
return ctx, cli.Validate(cmd)
800
+
},
801
+
}
802
+
803
+
// Add data dir flags
804
cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL)
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
805
cli.dataDirFlags = append(cli.dataDirFlags, &cli.LocalDBURL)
806
+
cli.dataDirFlags = append(cli.dataDirFlags, &cli.TLSCertPath)
807
+
cli.dataDirFlags = append(cli.dataDirFlags, &cli.TLSKeyPath)
808
+
cli.dataDirFlags = append(cli.dataDirFlags, &cli.EthKeystorePath)
0
0
0
0
0
0
0
809
810
if runtime.GOOS == "linux" {
811
+
cmd.Flags = append(cmd.Flags, &urfavecli.BoolFlag{
812
+
Name: "no-mist",
813
+
Usage: "Disable MistServer",
814
+
Value: true,
815
+
Destination: &cli.NoMist,
816
+
Sources: urfavecli.EnvVars("SP_NO_MIST"),
817
+
})
818
+
cmd.Flags = append(cmd.Flags, &urfavecli.IntFlag{
819
+
Name: "mist-admin-port",
820
+
Usage: "MistServer admin port (internal use only)",
821
+
Value: 14242,
822
+
Destination: &cli.MistAdminPort,
823
+
Sources: urfavecli.EnvVars("SP_MIST_ADMIN_PORT"),
824
+
})
825
+
cmd.Flags = append(cmd.Flags, &urfavecli.IntFlag{
826
+
Name: "mist-rtmp-port",
827
+
Usage: "MistServer RTMP port (internal use only)",
828
+
Value: 11935,
829
+
Destination: &cli.MistRTMPPort,
830
+
Sources: urfavecli.EnvVars("SP_MIST_RTMP_PORT"),
831
+
})
832
+
cmd.Flags = append(cmd.Flags, &urfavecli.IntFlag{
833
+
Name: "mist-http-port",
834
+
Usage: "MistServer HTTP port (internal use only)",
835
+
Value: 18080,
836
+
Destination: &cli.MistHTTPPort,
837
+
Sources: urfavecli.EnvVars("SP_MIST_HTTP_PORT"),
838
+
})
839
}
840
+
841
+
return cmd
842
}
843
844
var StreamplaceSchemePrefix = "streamplace://"
···
926
)
927
}
928
929
+
func (cli *CLI) Validate(cmd *urfavecli.Command) error {
0
0
0
0
0
0
0
930
if cli.DataDir == "" {
931
return fmt.Errorf("could not determine default data dir (no $HOME) and none provided, please set --data-dir")
932
}
···
935
}
936
if cli.LivepeerGateway {
937
log.MonkeypatchStderr()
938
+
// Livepeer gateway configuration will be handled in the caller
939
+
cli.LivepeerGatewayURL = "http://127.0.0.1:8935"
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
940
}
941
for _, dest := range cli.dataDirFlags {
942
*dest = strings.Replace(*dest, SPDataDir, cli.DataDir, 1)
···
965
return err
966
}
967
cli.FirebaseServiceAccount = string(bs)
968
+
}
969
+
// Set default replicator if none specified
970
+
if len(cli.Replicators) == 0 {
971
+
cli.Replicators = []string{ReplicatorWebsocket}
972
}
973
return nil
974
}
···
1078
return nil
1079
}
1080
0
0
0
0
0
0
0
0
0
0
1081
func (cli *CLI) HasMist() bool {
1082
return runtime.GOOS == "linux"
1083
}
1084
1085
// type for comma-separated ethereum addresses
1086
+
func (cli *CLI) AddressSliceFlag(name, defaultValue, usage string, dest *[]aqpub.Pub) urfavecli.Flag {
1087
*dest = []aqpub.Pub{}
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
1088
usage = fmt.Sprintf(`%s (default: "%s")`, usage, defaultValue)
0
0
0
0
0
0
0
1089
1090
+
return &urfavecli.StringFlag{
1091
+
Name: name,
1092
+
Usage: usage,
1093
+
Action: func(ctx context.Context, cmd *urfavecli.Command, s string) error {
1094
+
if s == "" {
1095
+
return nil
0
0
0
0
0
0
0
0
0
0
0
0
0
0
1096
}
1097
+
strs := strings.Split(s, ",")
1098
+
for _, str := range strs {
1099
+
pub, err := aqpub.FromHexString(str)
1100
+
if err != nil {
1101
+
return err
1102
+
}
1103
+
*dest = append(*dest, pub)
1104
}
1105
+
return nil
1106
+
},
1107
+
Sources: urfavecli.EnvVars(fmt.Sprintf("SP_%s", strings.ToUpper(strings.ReplaceAll(name, "-", "_")))),
1108
+
}
0
1109
}
1110
1111
func (cli *CLI) StreamIsAllowed(did string) error {
···
1119
if openServer && !isDIDKey {
1120
return nil
1121
}
1122
+
if slices.Contains(cli.AllowedStreams, did) {
1123
+
return nil
0
0
1124
}
1125
return fmt.Errorf("user is not allowed to stream")
1126
}
+5
-3
pkg/media/segment_roundtrip_test.go
···
87
require.NoError(t, err)
88
89
signedSplitSegDir := makeTestSubdir(t, tempDir, "signed-split-segments")
90
-
cli := &config.CLI{}
91
-
fs := cli.NewFlagSet("rtcrec-test")
92
-
err = cli.Parse(fs, []string{})
0
0
93
require.NoError(t, err)
94
err = SplitSegments(context.Background(), cli, rws, func(fname string) ReadWriteSeekCloser {
95
fd, err := os.Create(filepath.Join(signedSplitSegDir, fname))
···
87
require.NoError(t, err)
88
89
signedSplitSegDir := makeTestSubdir(t, tempDir, "signed-split-segments")
90
+
cli := &config.CLI{
91
+
DataDir: tempDir, // Set data dir for test
92
+
}
93
+
cmd := cli.NewCommand("rtcrec-test")
94
+
err = cli.Validate(cmd)
95
require.NoError(t, err)
96
err = SplitSegments(context.Background(), cli, rws, func(fname string) ReadWriteSeekCloser {
97
fd, err := os.Create(filepath.Join(signedSplitSegDir, fname))