···991010// Bus is a simple pub/sub system for backing websocket connections
1111type Bus struct {
1212- mu sync.Mutex
1313- clients map[string][]Subscription
1212+ mu sync.Mutex
1313+ clients map[string][]Subscription
1414+ segChans map[string][]*SegChan
1515+ segChansMutex sync.Mutex
1616+ segBuf map[string][]*Seg
1717+ segBufMutex sync.RWMutex
1418}
15191620func NewBus() *Bus {
1721 return &Bus{
1818- clients: make(map[string][]Subscription),
2222+ clients: make(map[string][]Subscription),
2323+ segChans: make(map[string][]*SegChan),
2424+ segBuf: make(map[string][]*Seg),
1925 }
2026}
21272228func (b *Bus) Subscribe(user string) <-chan Message {
2929+ if b == nil {
3030+ return make(<-chan Message)
3131+ }
2332 b.mu.Lock()
2433 defer b.mu.Unlock()
2534 ch := make(chan Message, 100)
···2837}
29383039func (b *Bus) Unsubscribe(user string, ch <-chan Message) {
4040+ if b == nil {
4141+ return
4242+ }
3143 b.mu.Lock()
3244 defer b.mu.Unlock()
3345
+129
pkg/bus/segchanman.go
···11+package bus
22+33+import (
44+ "context"
55+ "fmt"
66+ "time"
77+88+ "go.opentelemetry.io/otel"
99+ "stream.place/streamplace/pkg/log"
1010+ "stream.place/streamplace/pkg/spmetrics"
1111+)
1212+1313+// it's a segment channel manager, you see
1414+1515+type Seg struct {
1616+ Filepath string
1717+ Data []byte
1818+ PacketizedData *PacketizedSegment
1919+}
2020+2121+type PacketizedSegment struct {
2222+ Video [][]byte
2323+ Audio [][]byte
2424+ Duration time.Duration
2525+}
2626+2727+var chanSize = 1024
2828+2929+type SegChan struct {
3030+ C chan *Seg
3131+ Context context.Context
3232+}
3333+3434+var bufSize = 10
3535+3636+func segChanKey(user string, rendition string) string {
3737+ return fmt.Sprintf("%s::%s", user, rendition)
3838+}
3939+4040+// get a channel to subscribe to new segments for a given user and rendition
4141+func (b *Bus) SubscribeSegment(ctx context.Context, user string, rendition string) *SegChan {
4242+ return b.SubscribeSegmentBuf(ctx, user, rendition, 0)
4343+}
4444+4545+// get a channel to subscribe to new segments for a given user and rendition,
4646+// starting with bufSize cached segments that we already have
4747+func (b *Bus) SubscribeSegmentBuf(ctx context.Context, user string, rendition string, bufSize int) *SegChan {
4848+ key := segChanKey(user, rendition)
4949+ b.segChansMutex.Lock()
5050+ defer b.segChansMutex.Unlock()
5151+ chs, ok := b.segChans[key]
5252+ if !ok {
5353+ chs = []*SegChan{}
5454+ b.segChans[key] = chs
5555+ }
5656+ ch := make(chan *Seg)
5757+ b.segBufMutex.RLock()
5858+ defer b.segBufMutex.RUnlock()
5959+ curBuf, ok := b.segBuf[key]
6060+ myCh := make(chan *Seg, chanSize)
6161+ if ok {
6262+ if bufSize > len(curBuf) {
6363+ bufSize = len(curBuf)
6464+ }
6565+ for i := 0; i < bufSize; i += 1 {
6666+ myCh <- curBuf[len(curBuf)-bufSize+i]
6767+ }
6868+ }
6969+ segChan := &SegChan{C: ch, Context: ctx}
7070+ chs = append(chs, segChan)
7171+ b.segChans[key] = chs
7272+ spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs)))
7373+ return segChan
7474+}
7575+7676+// unsubscribe from a channel for a given user and rendition
7777+func (b *Bus) UnsubscribeSegment(ctx context.Context, user string, rendition string, ch *SegChan) {
7878+ key := segChanKey(user, rendition)
7979+ b.segChansMutex.Lock()
8080+ defer b.segChansMutex.Unlock()
8181+ chs, ok := b.segChans[key]
8282+ if !ok {
8383+ return
8484+ }
8585+ for i, c := range chs {
8686+ if c == ch {
8787+ chs = append(chs[:i], chs[i+1:]...)
8888+ break
8989+ }
9090+ }
9191+ spmetrics.SegmentSubscriptionsOpen.WithLabelValues(user, rendition).Set(float64(len(chs)))
9292+ b.segChans[key] = chs
9393+}
9494+9595+func (b *Bus) PublishSegment(ctx context.Context, user string, rendition string, seg *Seg) {
9696+ ctx, span := otel.Tracer("signer").Start(ctx, "PublishSegment")
9797+ defer span.End()
9898+ key := segChanKey(user, rendition)
9999+ b.segChansMutex.Lock()
100100+ defer b.segChansMutex.Unlock()
101101+ b.segBufMutex.Lock()
102102+ defer b.segBufMutex.Unlock()
103103+ curBuf, ok := b.segBuf[key]
104104+ if !ok {
105105+ curBuf = []*Seg{}
106106+ b.segBuf[key] = curBuf
107107+ }
108108+ curBuf = append(curBuf, seg)
109109+ if len(curBuf) > bufSize {
110110+ curBuf = curBuf[1:]
111111+ }
112112+ b.segBuf[key] = curBuf
113113+ chs, ok := b.segChans[key]
114114+ if !ok {
115115+ return
116116+ }
117117+ for _, ch := range chs {
118118+ go func(segChan *SegChan) {
119119+ select {
120120+ case segChan.C <- seg:
121121+ case <-segChan.Context.Done():
122122+ return
123123+ case <-time.After(1 * time.Minute):
124124+ log.Warn(ctx, "failed to send segment to channel, timing out", "user", user, "rendition", rendition)
125125+ }
126126+127127+ }(ch)
128128+ }
129129+}
+1-58
pkg/cmd/streamplace.go
···116116 }
117117 _ = flag.Set("logtostderr", "true")
118118 vFlag := flag.Lookup("v")
119119- fs := flag.NewFlagSet("streamplace", flag.ExitOnError)
120119 cli := config.CLI{Build: build}
121121- fs.StringVar(&cli.DataDir, "data-dir", config.DefaultDataDir(), "directory for keeping all streamplace data")
122122- fs.StringVar(&cli.HTTPAddr, "http-addr", ":38080", "Public HTTP address")
123123- fs.StringVar(&cli.HTTPInternalAddr, "http-internal-addr", "127.0.0.1:39090", "Private, admin-only HTTP address")
124124- fs.StringVar(&cli.HTTPSAddr, "https-addr", ":38443", "Public HTTPS address")
125125- fs.BoolVar(&cli.Secure, "secure", false, "Run with HTTPS. Required for WebRTC output")
126126- cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate")
127127- cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key")
128128- fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app")
129129- cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file")
130130- fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node")
131131- fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key")
132132- fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
133133- cli.DataDirFlag(fs, &cli.EthKeystorePath, "eth-keystore-path", "keystore", "path to ethereum keystore")
134134- fs.StringVar(&cli.EthAccountAddr, "eth-account-addr", "", "ethereum account address to use (if keystore contains more than one)")
135135- fs.StringVar(&cli.EthPassword, "eth-password", "", "password for encrypting keystore")
136136- fs.StringVar(&cli.TAURL, "ta-url", "http://timestamp.digicert.com", "timestamp authority server for signing")
137137- fs.StringVar(&cli.PKCS11ModulePath, "pkcs11-module-path", "", "path to a PKCS11 module for HSM signing, for example /usr/lib/x86_64-linux-gnu/opensc-pkcs11.so")
138138- fs.StringVar(&cli.PKCS11Pin, "pkcs11-pin", "", "PIN for logging into PKCS11 token. if not provided, will be prompted interactively")
139139- fs.StringVar(&cli.PKCS11TokenSlot, "pkcs11-token-slot", "", "slot number of PKCS11 token (only use one of slot, label, or serial)")
140140- fs.StringVar(&cli.PKCS11TokenLabel, "pkcs11-token-label", "", "label of PKCS11 token (only use one of slot, label, or serial)")
141141- fs.StringVar(&cli.PKCS11TokenSerial, "pkcs11-token-serial", "", "serial number of PKCS11 token (only use one of slot, label, or serial)")
142142- fs.StringVar(&cli.PKCS11KeypairLabel, "pkcs11-keypair-label", "", "label of signing keypair on PKCS11 token")
143143- fs.StringVar(&cli.PKCS11KeypairID, "pkcs11-keypair-id", "", "id of signing keypair on PKCS11 token")
144144- fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for")
145145- fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node")
146146- fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend")
147147- fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding")
148148- fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)")
149149- cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node")
150150- cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to")
151151- cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four")
152152- cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4")
153153- fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot")
154154- fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose")
155155- fs.BoolVar(&cli.PrintChat, "print-chat", false, "print chat messages to stdout")
156156- fs.StringVar(&cli.WHIPTest, "whip-test", "", "run a WHIP self-test with the given parameters")
120120+ fs := cli.NewFlagSet("streamplace")
157121 verbosity := fs.String("v", "3", "log verbosity level")
158158- fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose")
159159- fs.Bool("insecure", false, "DEPRECATED, does nothing.")
160160- fs.StringVar(&cli.Color, "color", "", "'true' to enable colorized logging, 'false' to disable")
161161- fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)")
162162- fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation")
163163- fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps")
164164- fs.BoolVar(&cli.ExternalSigning, "external-signing", false, "enable external signing via exec (prevents potential memory leak)")
165165- fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to")
166166- fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip")
167167- fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
168168- fs.IntVar(&cli.RateLimitWebsocket, "rate-limit-websocket", 10, "number of concurrent websocket connections allowed per ip")
169169- fs.StringVar(&cli.RTMPServerAddon, "rtmp-server-addon", "", "address of external RTMP server to forward streams to")
170170- fs.StringVar(&cli.RtmpsAddr, "rtmps-addr", ":1935", "address to listen for RTMPS connections")
171171- cli.JSONFlag(fs, &cli.DiscordWebhooks, "discord-webhooks", "[]", "JSON array of Discord webhooks to send notifications to")
172122 version := fs.Bool("version", false, "print version and exit")
173173-174174- if runtime.GOOS == "linux" {
175175- fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")
176176- fs.IntVar(&cli.MistAdminPort, "mist-admin-port", 14242, "MistServer admin port (internal use only)")
177177- fs.IntVar(&cli.MistRTMPPort, "mist-rtmp-port", 11935, "MistServer RTMP port (internal use only)")
178178- fs.IntVar(&cli.MistHTTPPort, "mist-http-port", 18080, "MistServer HTTP port (internal use only)")
179179- }
180123181124 err = cli.Parse(
182125 fs, os.Args[1:],