···126 * This specifies how the client should communicate,
127 * and what kind of media client and server have negotiated to exchange.
128 */
129- let response = await postSDPOffer(endpoint, ofr.sdp, bearerToken);
130 if (response.status === 201) {
131 let answerSDP = await response.text();
132 if ((peerConnection.connectionState as string) === "closed") {
···126 * This specifies how the client should communicate,
127 * and what kind of media client and server have negotiated to exchange.
128 */
129+ let response = await postSDPOffer(`${endpoint}`, ofr.sdp, bearerToken);
130 if (response.status === 201) {
131 let answerSDP = await response.text();
132 if ((peerConnection.connectionState as string) === "closed") {
···26 }
27 switch msg.Type() {
28 case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop
29- log.Log(ctx, "got gst.MessageEOS, exiting")
30 return
31 case gst.MessageError: // Error messages are always fatal
32 err := msg.ParseError()
···26 }
27 switch msg.Type() {
28 case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop
29+ log.Debug(ctx, "got gst.MessageEOS, exiting")
30 return
31 case gst.MessageError: // Error messages are always fatal
32 err := msg.ParseError()
+17-27
pkg/media/concat.go
···1package media
23import (
04 "context"
5 "errors"
6 "fmt"
7 "io"
8- "os"
9 "strings"
10 "sync"
1112 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "stream.place/streamplace/pkg/log"
015)
1617type ConcatStreamer interface {
18- SubscribeSegment(ctx context.Context, user string) <-chan string
19- UnsubscribeSegment(ctx context.Context, user string, ch <-chan string)
20}
2122// This function remains in scope for the duration of a single users' playback
23-func ConcatStream(ctx context.Context, pipeline *gst.Pipeline, user string, streamer ConcatStreamer) (*gst.Element, <-chan struct{}, error) {
24 ctx = log.WithLogValues(ctx, "func", "ConcatStream")
25 ctx, cancel := context.WithCancel(ctx)
26···34 err = pipeline.Add(inputQueue)
35 if err != nil {
36 return nil, nil, fmt.Errorf("failed to add input multiqueue to pipeline: %w", err)
37- }
38- for _, tmpl := range inputQueue.GetPadTemplates() {
39- log.Warn(ctx, "pad template", "name", tmpl.GetName(), "direction", tmpl.Direction())
40 }
41 inputQueuePadVideoSink := inputQueue.GetRequestPad("sink_%u")
42 if inputQueuePadVideoSink == nil {
···125126 // this goroutine will read all the files from the segment queue and buffer
127 // them in a pipe so that we don't miss any in between iterations of the output
128- allFiles := make(chan string, 1024)
129 go func() {
130 for {
131- ch := streamer.SubscribeSegment(ctx, user)
132 select {
133 case <-ctx.Done():
134- log.Warn(ctx, "exiting segment reader")
135- streamer.UnsubscribeSegment(ctx, user, ch)
136 return
137 case file := <-ch:
138- log.Debug(ctx, "got segment", "file", file)
139- allFiles <- file
140- if file == "" {
141 log.Warn(ctx, "no more segments")
142 return
143 }
···156 pr.Close()
157 pw.Close()
158 return
159- case fullpath := <-allFiles:
160- if fullpath == "" {
161 log.Warn(ctx, "no more segments")
162 cancel()
163 return
164 }
165- f, err := os.Open(fullpath)
166- log.Debug(ctx, "opening segment file", "file", fullpath)
167- if err != nil {
168- log.Debug(ctx, "failed to open segment file", "error", err, "file", fullpath)
169- cancel()
170- return
171- }
172- defer f.Close()
173- _, err = io.Copy(pw, f)
174 if err != nil {
175- log.Error(ctx, "failed to copy segment file", "error", err, "file", fullpath)
176 cancel()
177 return
178 }
···304 done()
305 return
306 } else {
307- log.Error(ctx, "failed to read data", "error", err)
308 cancel()
309 return
310 }
···1package media
23import (
4+ "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "io"
09 "strings"
10 "sync"
1112 "github.com/go-gst/go-gst/gst"
13 "github.com/go-gst/go-gst/gst/app"
14 "stream.place/streamplace/pkg/log"
15+ "stream.place/streamplace/pkg/media/segchanman"
16)
1718type ConcatStreamer interface {
19+ SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *segchanman.Seg
20+ UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *segchanman.Seg)
21}
2223// This function remains in scope for the duration of a single users' playback
24+func ConcatStream(ctx context.Context, pipeline *gst.Pipeline, user string, rendition string, streamer ConcatStreamer) (*gst.Element, <-chan struct{}, error) {
25 ctx = log.WithLogValues(ctx, "func", "ConcatStream")
26 ctx, cancel := context.WithCancel(ctx)
27···35 err = pipeline.Add(inputQueue)
36 if err != nil {
37 return nil, nil, fmt.Errorf("failed to add input multiqueue to pipeline: %w", err)
00038 }
39 inputQueuePadVideoSink := inputQueue.GetRequestPad("sink_%u")
40 if inputQueuePadVideoSink == nil {
···123124 // this goroutine will read all the files from the segment queue and buffer
125 // them in a pipe so that we don't miss any in between iterations of the output
126+ allFiles := make(chan []byte, 1024)
127 go func() {
128 for {
129+ ch := streamer.SubscribeSegment(ctx, user, rendition)
130 select {
131 case <-ctx.Done():
132+ log.Debug(ctx, "exiting segment reader")
133+ streamer.UnsubscribeSegment(ctx, user, rendition, ch)
134 return
135 case file := <-ch:
136+ log.Debug(ctx, "got segment", "file", file.Filepath)
137+ allFiles <- file.Data
138+ if len(file.Data) == 0 {
139 log.Warn(ctx, "no more segments")
140 return
141 }
···154 pr.Close()
155 pw.Close()
156 return
157+ case bs := <-allFiles:
158+ if len(bs) == 0 {
159 log.Warn(ctx, "no more segments")
160 cancel()
161 return
162 }
163+ _, err = io.Copy(pw, bytes.NewReader(bs))
00000000164 if err != nil {
165+ log.Error(ctx, "failed to copy segment file", "error", err)
166 cancel()
167 return
168 }
···294 done()
295 return
296 } else {
297+ log.Debug(ctx, "failed to read data, ending stream", "error", err)
298 cancel()
299 return
300 }
···16 LexiconTypeID string `json:"$type,const=place.stream.segment" cborgen:"$type,const=place.stream.segment"`
17 Audio []*Segment_Audio `json:"audio,omitempty" cborgen:"audio,omitempty"`
18 Creator string `json:"creator" cborgen:"creator"`
0019 // id: Unique identifier for the segment
20 Id string `json:"id" cborgen:"id"`
21 // signingKey: The DID of the signing key used for this segment
···32 Rate int64 `json:"rate" cborgen:"rate"`
33}
3400000035// Segment_Video is a "video" in the place.stream.segment schema.
36type Segment_Video struct {
37- Codec string `json:"codec" cborgen:"codec"`
38- Height int64 `json:"height" cborgen:"height"`
39- Width int64 `json:"width" cborgen:"width"`
040}
···16 LexiconTypeID string `json:"$type,const=place.stream.segment" cborgen:"$type,const=place.stream.segment"`
17 Audio []*Segment_Audio `json:"audio,omitempty" cborgen:"audio,omitempty"`
18 Creator string `json:"creator" cborgen:"creator"`
19+ // duration: The duration of the segment in nanoseconds
20+ Duration *int64 `json:"duration,omitempty" cborgen:"duration,omitempty"`
21 // id: Unique identifier for the segment
22 Id string `json:"id" cborgen:"id"`
23 // signingKey: The DID of the signing key used for this segment
···34 Rate int64 `json:"rate" cborgen:"rate"`
35}
3637+// Segment_Framerate is a "framerate" in the place.stream.segment schema.
38+type Segment_Framerate struct {
39+ Den int64 `json:"den" cborgen:"den"`
40+ Num int64 `json:"num" cborgen:"num"`
41+}
42+43// Segment_Video is a "video" in the place.stream.segment schema.
44type Segment_Video struct {
45+ Codec string `json:"codec" cborgen:"codec"`
46+ Framerate *Segment_Framerate `json:"framerate,omitempty" cborgen:"framerate,omitempty"`
47+ Height int64 `json:"height" cborgen:"height"`
48+ Width int64 `json:"width" cborgen:"width"`
49}