···126126 * This specifies how the client should communicate,
127127 * and what kind of media client and server have negotiated to exchange.
128128 */
129129- let response = await postSDPOffer(`${endpoint}`, ofr.sdp, bearerToken);
129129+ let response = await postSDPOffer(endpoint, ofr.sdp, bearerToken);
130130 if (response.status === 201) {
131131 let answerSDP = await response.text();
132132 if ((peerConnection.connectionState as string) === "closed") {
···2626 }
2727 switch msg.Type() {
2828 case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop
2929- log.Debug(ctx, "got gst.MessageEOS, exiting")
2929+ log.Log(ctx, "got gst.MessageEOS, exiting")
3030 return
3131 case gst.MessageError: // Error messages are always fatal
3232 err := msg.ParseError()
+27-17
pkg/media/concat.go
···11package media
2233import (
44- "bytes"
54 "context"
65 "errors"
76 "fmt"
87 "io"
88+ "os"
99 "strings"
1010 "sync"
11111212 "github.com/go-gst/go-gst/gst"
1313 "github.com/go-gst/go-gst/gst/app"
1414 "stream.place/streamplace/pkg/log"
1515- "stream.place/streamplace/pkg/media/segchanman"
1615)
17161817type ConcatStreamer interface {
1919- SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *segchanman.Seg
2020- UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *segchanman.Seg)
1818+ SubscribeSegment(ctx context.Context, user string) <-chan string
1919+ UnsubscribeSegment(ctx context.Context, user string, ch <-chan string)
2120}
22212322// This function remains in scope for the duration of a single users' playback
2424-func ConcatStream(ctx context.Context, pipeline *gst.Pipeline, user string, rendition string, streamer ConcatStreamer) (*gst.Element, <-chan struct{}, error) {
2323+func ConcatStream(ctx context.Context, pipeline *gst.Pipeline, user string, streamer ConcatStreamer) (*gst.Element, <-chan struct{}, error) {
2524 ctx = log.WithLogValues(ctx, "func", "ConcatStream")
2625 ctx, cancel := context.WithCancel(ctx)
2726···3534 err = pipeline.Add(inputQueue)
3635 if err != nil {
3736 return nil, nil, fmt.Errorf("failed to add input multiqueue to pipeline: %w", err)
3737+ }
3838+ for _, tmpl := range inputQueue.GetPadTemplates() {
3939+ log.Warn(ctx, "pad template", "name", tmpl.GetName(), "direction", tmpl.Direction())
3840 }
3941 inputQueuePadVideoSink := inputQueue.GetRequestPad("sink_%u")
4042 if inputQueuePadVideoSink == nil {
···123125124126 // this goroutine will read all the files from the segment queue and buffer
125127 // them in a pipe so that we don't miss any in between iterations of the output
126126- allFiles := make(chan []byte, 1024)
128128+ allFiles := make(chan string, 1024)
127129 go func() {
128130 for {
129129- ch := streamer.SubscribeSegment(ctx, user, rendition)
131131+ ch := streamer.SubscribeSegment(ctx, user)
130132 select {
131133 case <-ctx.Done():
132132- log.Debug(ctx, "exiting segment reader")
133133- streamer.UnsubscribeSegment(ctx, user, rendition, ch)
134134+ log.Warn(ctx, "exiting segment reader")
135135+ streamer.UnsubscribeSegment(ctx, user, ch)
134136 return
135137 case file := <-ch:
136136- log.Debug(ctx, "got segment", "file", file.Filepath)
137137- allFiles <- file.Data
138138- if len(file.Data) == 0 {
138138+ log.Debug(ctx, "got segment", "file", file)
139139+ allFiles <- file
140140+ if file == "" {
139141 log.Warn(ctx, "no more segments")
140142 return
141143 }
···154156 pr.Close()
155157 pw.Close()
156158 return
157157- case bs := <-allFiles:
158158- if len(bs) == 0 {
159159+ case fullpath := <-allFiles:
160160+ if fullpath == "" {
159161 log.Warn(ctx, "no more segments")
160162 cancel()
161163 return
162164 }
163163- _, err = io.Copy(pw, bytes.NewReader(bs))
165165+ f, err := os.Open(fullpath)
166166+ log.Debug(ctx, "opening segment file", "file", fullpath)
164167 if err != nil {
165165- log.Error(ctx, "failed to copy segment file", "error", err)
168168+ log.Debug(ctx, "failed to open segment file", "error", err, "file", fullpath)
169169+ cancel()
170170+ return
171171+ }
172172+ defer f.Close()
173173+ _, err = io.Copy(pw, f)
174174+ if err != nil {
175175+ log.Error(ctx, "failed to copy segment file", "error", err, "file", fullpath)
166176 cancel()
167177 return
168178 }
···294304 done()
295305 return
296306 } else {
297297- log.Debug(ctx, "failed to read data, ending stream", "error", err)
307307+ log.Error(ctx, "failed to read data", "error", err)
298308 cancel()
299309 return
300310 }