Live video on the AT Protocol

media: fix gstreamer leaks for everybody forever (#91)

* media: zero-leak test harness

* media: zero-leak writerNewSample test

* media: bus handler test

* media: start of ConcatBin

* media: failing checkpoint before pad-added

* media: pad-added, 3 leaks

* media: pad-added hack, 1 leak

* media: bit of cleanup

* media: add leak demo

* build: bump gstreamer to fix memory leak

* media: zero leaks in thumbnail!

* media: rename concat2 --> concat_demux

* media: add ConcatBin

* concat: checkpoint

* concat: working!

* concat: mostly passing???

* media: move to pushing entire buffer at once

* concat_demux: goofy way to avoid pad leak

* media: re-activate leak detection on concat

* webrtc_playback: use new concat!

* media: fix some tests

* media: perhaps these tests will pass

authored by

Eli Mallon and committed by
GitHub
22048845 b0bc0221

+1472 -629
+1 -1
Makefile
··· 71 71 PKG_CONFIG_PATH=$(SHARED_PKG_CONFIG_PATH) \ 72 72 LD_LIBRARY_PATH=$(SHARED_LD_LIBRARY_PATH) \ 73 73 DYLD_LIBRARY_PATH=$(SHARED_DYLD_LIBRARY_PATH) \ 74 - go test ./... 74 + go test -p 1 -timeout 120s ./... 75 75 76 76 .PHONY: schema 77 77 schema:
+23
hack/parallel.sh
··· 1 + #!/bin/bash 2 + 3 + set -euo pipefail 4 + 5 + SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) 6 + export LD_LIBRARY_PATH="$SCRIPT_DIR/build-darwin-arm64/lib/usr/local/lib/x86_64-linux-gnu" 7 + export DYLD_LIBRARY_PATH="$SCRIPT_DIR/build-darwin-arm64/lib/usr/local/lib" 8 + 9 + rm -rf media.test 10 + STREAMPLACE_TEST_COUNT=1 go test -c -timeout 60s -run '^TestConcatBin$' stream.place/streamplace/pkg/media --count=1 -v 11 + 12 + for i in {1..50}; do 13 + GST_DEBUG='*:5' STREAMPLACE_TEST_COUNT=1 TEST_TAG="test_output_$i.log" ./media.test -test.paniconexit0 -test.timeout=1m0s -test.run=^TestConcatBin$ -test.count=1 -test.v=true >"test_output_$i.log" 2>&1 & 14 + done 15 + 16 + # Wait for all background processes to complete 17 + wait 18 + 19 + for i in {1..50}; do 20 + if cat "test_output_$i.log" | grep "panic:" > /dev/null; then 21 + echo "test_output_$i.log: panic:" 22 + fi 23 + done
+2
meson.build
··· 263 263 timeout: 0, 264 264 args: [ 265 265 'test', 266 + '-p', '1', 267 + '-timeout', '120s', 266 268 '@0@/pkg/...'.format(meson.current_source_dir()), 267 269 '@0@/cmd/...'.format(meson.current_source_dir()), 268 270 ],
-24
pkg/api/api_internal.go
··· 160 160 http.ServeFile(w, r, fullpath) 161 161 }) 162 162 163 - router.GET("/playback/:user/:rendition/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 164 - user := p.ByName("user") 165 - if user == "" { 166 - errors.WriteHTTPBadRequest(w, "user required", nil) 167 - return 168 - } 169 - rendition := p.ByName("rendition") 170 - if rendition == "" { 171 - errors.WriteHTTPBadRequest(w, "rendition required", nil) 172 - return 173 - } 174 - user, err := a.NormalizeUser(ctx, user) 175 - if err != nil { 176 - errors.WriteHTTPBadRequest(w, "invalid user", err) 177 - return 178 - } 179 - w.Header().Set("Content-Type", "video/x-matroska") 180 - w.WriteHeader(200) 181 - err = a.MediaManager.SegmentToMKVPlusOpus(ctx, user, rendition, w) 182 - if err != nil { 183 - log.Log(ctx, "stream.mkv error", "error", err) 184 - } 185 - }) 186 - 187 163 router.GET("/playback/:user/:rendition/stream.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 188 164 user := p.ByName("user") 189 165 if user == "" {
+53
pkg/media/bus_handler.go
··· 31 31 return nil 32 32 case gst.MessageError: // Error messages are always fatal 33 33 err := msg.ParseError() 34 + if err.Error() == fmt.Sprintf("%s: %s", ErrConcatDone.Error(), ErrConcatDone.Error()) { 35 + log.Debug(ctx, "got ErrConcatDone, exiting") 36 + return nil 37 + } 34 38 log.Error(ctx, "gstreamer error", "error", err.Error()) 35 39 if debug := err.DebugString(); debug != "" { 36 40 log.Debug(ctx, "gstreamer debug", "message", debug) 37 41 } 38 42 return fmt.Errorf("gstreamer error: %w", err) 43 + case gst.MessageElement: 44 + // this one is noisy and not useful 39 45 default: 40 46 log.Debug(ctx, msg.String()) 41 47 } 42 48 } 43 49 } 50 + 51 + // func HandleBusMessages(ctx context.Context, pipeline *gst.Pipeline) error { 52 + // return HandleBusMessagesCustom(ctx, pipeline, nil) 53 + // } 54 + 55 + // func HandleBusMessagesCustom(ctx context.Context, pipeline *gst.Pipeline, handler func(msg *gst.Message)) error { 56 + // msgCh := make(chan *gst.Message, 1024) 57 + // bus := pipeline.GetPipelineBus() 58 + // bus.SetSyncHandler(func(msg *gst.Message) gst.BusSyncReply { 59 + // if ctx.Err() != nil { 60 + // log.Error(ctx, "context cancelled, dropping message", "message", msg.String()) 61 + // msg.Unref() 62 + // return gst.BusDrop 63 + // } 64 + // log.Error(ctx, "got message", "message", msg.String()) 65 + // msgCh <- msg 66 + // return gst.BusDrop 67 + // }) 68 + // for { 69 + // if ctx.Err() != nil { 70 + // return ctx.Err() 71 + // } 72 + // select { 73 + // case <-ctx.Done(): 74 + // return nil 75 + // case msg := <-msgCh: 76 + // if handler != nil { 77 + // handler(msg) 78 + // } 79 + // switch msg.Type() { 80 + // case gst.MessageEOS: // When end-of-stream is received flush the pipeline and stop the main loop 81 + // log.Debug(ctx, "got gst.MessageEOS, exiting") 82 + // return nil 83 + // case gst.MessageError: // Error messages are always fatal 84 + // err := msg.ParseError() 85 + // log.Error(ctx, "gstreamer error", "error", err.Error()) 86 + // if debug := err.DebugString(); debug != "" { 87 + // log.Debug(ctx, "gstreamer debug", "message", debug) 88 + // } 89 + // return fmt.Errorf("gstreamer error: %w", err) 90 + // default: 91 + // log.Debug(ctx, msg.String()) 92 + // msg.Unref() 93 + // } 94 + // } 95 + // } 96 + // }
+103
pkg/media/bus_handler_test.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "testing" 7 + 8 + "github.com/go-gst/go-gst/gst" 9 + "github.com/go-gst/go-gst/gst/app" 10 + "github.com/stretchr/testify/require" 11 + "go.uber.org/goleak" 12 + "golang.org/x/sync/errgroup" 13 + "stream.place/streamplace/pkg/log" 14 + ) 15 + 16 + func TestBusHandlerCleanup(t *testing.T) { 17 + ignore := goleak.IgnoreCurrent() 18 + defer goleak.VerifyNone(t, ignore) 19 + before := getLeakCount(t) 20 + defer checkGStreamerLeaks(t, before) 21 + 22 + g, ctx := errgroup.WithContext(context.Background()) 23 + ctx = log.WithDebugValue(ctx, map[string]map[string]int{"func": {"TestBusHandler": 9}}) 24 + for i := 0; i < streamplaceTestCount; i++ { 25 + g.Go(func() error { 26 + err := testBusHandlerCleanupInner(ctx, i) 27 + if err == nil { 28 + return fmt.Errorf("expected error") 29 + } 30 + return nil 31 + }) 32 + } 33 + err := g.Wait() 34 + require.NoError(t, err) 35 + } 36 + 37 + func testBusHandlerCleanupInner(ctx context.Context, i int) error { 38 + ctx = log.WithLogValues(ctx, "func", "TestBusHandler") 39 + ctx, cancel := context.WithCancel(ctx) 40 + 41 + pipeline, err := gst.NewPipeline(fmt.Sprintf("TestBusHandler-%d", i)) 42 + if err != nil { 43 + return err 44 + } 45 + 46 + busDone := make(chan struct{}) 47 + go func() { 48 + _ = HandleBusMessages(ctx, pipeline) 49 + busDone <- struct{}{} 50 + cancel() 51 + }() 52 + 53 + defer func() { 54 + cancel() 55 + <-busDone 56 + err = pipeline.SetState(gst.StateNull) 57 + if err != nil { 58 + panic(fmt.Sprintf("failed to set state to null: %s", err)) 59 + } 60 + }() 61 + 62 + fileSrc, err := gst.NewElementWithProperties("filesrc", map[string]any{ 63 + "location": getFixture("5sec.mp4"), 64 + }) 65 + if err != nil { 66 + return err 67 + } 68 + err = pipeline.Add(fileSrc) 69 + if err != nil { 70 + return err 71 + } 72 + 73 + demux, err := gst.NewElementWithProperties("qtdemux", map[string]any{ 74 + "name": fmt.Sprintf("TestBusHandler-qtdemux-%d", i), 75 + }) 76 + if err != nil { 77 + return err 78 + } 79 + err = pipeline.Add(demux) 80 + if err != nil { 81 + return err 82 + } 83 + 84 + appSink, err := gst.NewElementWithProperties("appsink", map[string]any{ 85 + "name": fmt.Sprintf("TestBusHandler-appsink-%d", i), 86 + "sync": false, 87 + }) 88 + if err != nil { 89 + return err 90 + } 91 + err = pipeline.Add(appSink) 92 + if err != nil { 93 + return err 94 + } 95 + 96 + sink := app.SinkFromElement(appSink) 97 + sink.SetCallbacks(&app.SinkCallbacks{ 98 + NewSampleFunc: WriterNewSample(ctx, nil), 99 + }) 100 + 101 + return fmt.Errorf("test error") 102 + 103 + }
+4 -24
pkg/media/concat.go
··· 52 52 if inputQueuePadAudioSrc == nil { 53 53 return nil, nil, fmt.Errorf("failed to get input queue audio src pad") 54 54 } 55 - 56 - go func() { 57 - <-ctx.Done() 58 - inputQueue.SetState(gst.StateNull) 59 - inputQueue = nil 60 - inputQueuePadVideoSink = nil 61 - inputQueuePadVideoSrc = nil 62 - inputQueuePadAudioSink = nil 63 - inputQueuePadAudioSrc = nil 64 - }() 65 - 66 55 // streamsynchronizer 67 56 streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{}) 68 57 if err != nil { 69 58 return nil, nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err) 70 59 } 71 - go func() { 72 - <-ctx.Done() 73 - streamsynchronizer.SetState(gst.StateNull) 74 - streamsynchronizer = nil 75 - }() 60 + 76 61 err = pipeline.Add(streamsynchronizer) 77 62 if err != nil { 78 63 return nil, nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err) ··· 113 98 if outputQueuePadAudioSink == nil { 114 99 return nil, nil, fmt.Errorf("failed to get output queue audio sink pad") 115 100 } 116 - go func() { 117 - <-ctx.Done() 118 - outputQueue.SetState(gst.StateNull) 119 - outputQueue = nil 120 - outputQueuePadVideoSink = nil 121 - outputQueuePadAudioSink = nil 122 - }() 123 - 124 101 // linking 125 102 126 103 // input queue to streamsynchronizer ··· 381 358 382 359 select { 383 360 case <-ctx.Done(): 361 + return 384 362 case <-segDone: 385 363 } 386 364 ··· 388 366 demux.SetState(gst.StateNull) 389 367 src.SetCallbacks(&app.SourceCallbacks{}) 390 368 appsrc.SetState(gst.StateNull) 369 + pipeline.Remove(demux) 370 + pipeline.Remove(appsrc) 391 371 pr.Close() 392 372 pw.Close() 393 373 }
+206
pkg/media/concat2.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + 8 + "github.com/go-gst/go-gst/gst" 9 + "stream.place/streamplace/pkg/log" 10 + "stream.place/streamplace/pkg/media/segchanman" 11 + ) 12 + 13 + var ErrConcatDone = errors.New("concat done") 14 + 15 + func ConcatBin(ctx context.Context, segCh <-chan *segchanman.Seg) (*gst.Bin, error) { 16 + ctx = log.WithLogValues(ctx, "func", "ConcatBin") 17 + bin := gst.NewBin("concat-bin") 18 + 19 + streamsynchronizer, err := gst.NewElementWithProperties("streamsynchronizer", map[string]any{ 20 + "name": "concat-streamsynchronizer", 21 + }) 22 + if err != nil { 23 + return nil, fmt.Errorf("failed to create streamsynchronizer element: %w", err) 24 + } 25 + 26 + err = bin.Add(streamsynchronizer) 27 + if err != nil { 28 + return nil, fmt.Errorf("failed to add streamsynchronizer to pipeline: %w", err) 29 + } 30 + 31 + syncPadVideoSink := streamsynchronizer.GetRequestPad("sink_%u") 32 + if syncPadVideoSink == nil { 33 + return nil, fmt.Errorf("failed to get sync video sink pad") 34 + } 35 + 36 + syncPadAudioSink := streamsynchronizer.GetRequestPad("sink_%u") 37 + if syncPadAudioSink == nil { 38 + return nil, fmt.Errorf("failed to get sync audio sink pad") 39 + } 40 + 41 + syncPadVideoSrc := streamsynchronizer.GetStaticPad("src_0") 42 + if syncPadVideoSrc == nil { 43 + return nil, fmt.Errorf("failed to get sync video src pad") 44 + } 45 + 46 + syncPadAudioSrc := streamsynchronizer.GetStaticPad("src_1") 47 + if syncPadAudioSrc == nil { 48 + return nil, fmt.Errorf("failed to get sync audio src pad") 49 + } 50 + 51 + mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{ 52 + "name": "concat-multiqueue", 53 + }) 54 + if err != nil { 55 + return nil, fmt.Errorf("failed to create multiqueue element: %w", err) 56 + } 57 + err = bin.Add(mq) 58 + if err != nil { 59 + return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err) 60 + } 61 + 62 + mqVideoSink := mq.GetRequestPad("sink_%u") 63 + if mqVideoSink == nil { 64 + return nil, fmt.Errorf("video sink pad not found") 65 + } 66 + 67 + mqAudioSink := mq.GetRequestPad("sink_%u") 68 + if mqAudioSink == nil { 69 + return nil, fmt.Errorf("audio sink pad not found") 70 + } 71 + 72 + mqVideoSrc := mq.GetStaticPad("src_0") 73 + if mqVideoSrc == nil { 74 + return nil, fmt.Errorf("video source pad not found") 75 + } 76 + 77 + mqAudioSrc := mq.GetStaticPad("src_1") 78 + if mqAudioSrc == nil { 79 + return nil, fmt.Errorf("audio source pad not found") 80 + } 81 + 82 + linked := syncPadVideoSrc.Link(mqVideoSink) 83 + if linked != gst.PadLinkOK { 84 + return nil, fmt.Errorf("failed to link sync video src pad to multiqueue video sink pad: %v", linked) 85 + } 86 + 87 + linked = syncPadAudioSrc.Link(mqAudioSink) 88 + if linked != gst.PadLinkOK { 89 + return nil, fmt.Errorf("failed to link sync audio src pad to multiqueue audio sink pad: %v", linked) 90 + } 91 + 92 + videoGhost := gst.NewGhostPad("video_0", mqVideoSrc) 93 + if videoGhost == nil { 94 + return nil, fmt.Errorf("failed to create video ghost pad") 95 + } 96 + 97 + audioGhost := gst.NewGhostPad("audio_0", mqAudioSrc) 98 + if audioGhost == nil { 99 + return nil, fmt.Errorf("failed to create audio ghost pad") 100 + } 101 + 102 + ok := bin.AddPad(videoGhost.Pad) 103 + if !ok { 104 + return nil, fmt.Errorf("failed to add video ghost pad to bin") 105 + } 106 + 107 + ok = bin.AddPad(audioGhost.Pad) 108 + if !ok { 109 + return nil, fmt.Errorf("failed to add audio ghost pad to bin") 110 + } 111 + 112 + go func() { 113 + for { 114 + select { 115 + case seg := <-segCh: 116 + if seg == nil { 117 + bin.Error(ErrConcatDone.Error(), ErrConcatDone) 118 + return 119 + } 120 + err := addConcatDemuxer(ctx, bin, seg, syncPadVideoSink, syncPadAudioSink) 121 + if err != nil { 122 + panic(fmt.Errorf("failed to add concat demuxer: %w", err)) 123 + } 124 + case <-ctx.Done(): 125 + return 126 + } 127 + } 128 + }() 129 + 130 + return bin, nil 131 + } 132 + 133 + func addConcatDemuxer(ctx context.Context, bin *gst.Bin, seg *segchanman.Seg, syncPadVideoSink *gst.Pad, syncPadAudioSink *gst.Pad) error { 134 + 135 + log.Debug(ctx, "adding concat demuxer", "seg", seg.Filepath) 136 + demuxBin, err := ConcatDemuxBin(ctx, seg) 137 + if err != nil { 138 + return fmt.Errorf("failed to create demux bin: %w", err) 139 + } 140 + 141 + err = bin.Add(demuxBin.Element) 142 + if err != nil { 143 + return fmt.Errorf("failed to add demux bin to bin: %w", err) 144 + } 145 + 146 + demuxBinPadVideoSrc := demuxBin.GetStaticPad("video_0") 147 + if demuxBinPadVideoSrc == nil { 148 + return fmt.Errorf("failed to get demux bin video src pad") 149 + } 150 + 151 + demuxBinPadAudioSrc := demuxBin.GetStaticPad("audio_0") 152 + if demuxBinPadAudioSrc == nil { 153 + return fmt.Errorf("failed to get demux bin audio src pad") 154 + } 155 + 156 + linked := demuxBinPadVideoSrc.Link(syncPadVideoSink) 157 + if linked != gst.PadLinkOK { 158 + return fmt.Errorf("failed to link demux bin video src pad to sync video sink pad: %v", linked) 159 + } 160 + 161 + linked = demuxBinPadAudioSrc.Link(syncPadAudioSink) 162 + if linked != gst.PadLinkOK { 163 + return fmt.Errorf("failed to link demux bin audio src pad to sync audio sink pad: %v", linked) 164 + } 165 + 166 + eosCh := make(chan struct{}) 167 + eos := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 168 + if pad.GetDirection() != gst.PadDirectionSource { 169 + return gst.PadProbeOK 170 + } 171 + if info.GetEvent().Type() != gst.EventTypeEOS { 172 + return gst.PadProbeOK 173 + } 174 + log.Debug(ctx, "demux EOS", "name", pad.GetName(), "direction", pad.GetDirection()) 175 + downstreamPad := pad.GetPeer() 176 + unlinked := pad.Unlink(downstreamPad) 177 + if !unlinked { 178 + log.Error(ctx, "failed to unlink pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", unlinked) 179 + } 180 + go func() { 181 + eosCh <- struct{}{} 182 + }() 183 + return gst.PadProbeRemove 184 + } 185 + demuxBinPadVideoSrc.AddProbe(gst.PadProbeTypeEventBoth, eos) 186 + demuxBinPadAudioSrc.AddProbe(gst.PadProbeTypeEventBoth, eos) 187 + 188 + bin.SetState(gst.StatePlaying) 189 + 190 + <-eosCh 191 + <-eosCh 192 + 193 + err = bin.Remove(demuxBin.Element) 194 + if err != nil { 195 + return fmt.Errorf("failed to remove demux bin from bin: %w", err) 196 + } 197 + 198 + err = demuxBin.SetState(gst.StateNull) 199 + if err != nil { 200 + return fmt.Errorf("failed to set demux bin to null state: %w", err) 201 + } 202 + 203 + log.Debug(ctx, "removed concat demuxer", "seg", seg.Filepath) 204 + 205 + return nil 206 + }
+226
pkg/media/concat2_test.go
··· 1 + package media 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "io" 8 + "os" 9 + "testing" 10 + "time" 11 + 12 + "github.com/go-gst/go-gst/gst" 13 + "github.com/go-gst/go-gst/gst/app" 14 + "github.com/google/uuid" 15 + "github.com/stretchr/testify/require" 16 + "go.uber.org/goleak" 17 + "golang.org/x/sync/errgroup" 18 + "stream.place/streamplace/pkg/gstinit" 19 + "stream.place/streamplace/pkg/log" 20 + "stream.place/streamplace/pkg/media/segchanman" 21 + ) 22 + 23 + func TestConcatBin(t *testing.T) { 24 + gstinit.InitGST() 25 + before := getLeakCount(t) 26 + defer checkGStreamerLeaks(t, before) 27 + ignore := goleak.IgnoreCurrent() 28 + defer goleak.VerifyNone(t, ignore) 29 + 30 + g, _ := errgroup.WithContext(context.Background()) 31 + for i := 0; i < streamplaceTestCount; i++ { 32 + g.Go(func() error { 33 + return innerTestConcatBin(t) 34 + }) 35 + } 36 + err := g.Wait() 37 + require.NoError(t, err) 38 + } 39 + 40 + // This function remains in scope for the duration of a single users' playback 41 + func innerTestConcatBin(t *testing.T) error { 42 + ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "ConcatBin": 9, "SegDemuxBin": 9}}) 43 + tag := os.Getenv("TEST_TAG") 44 + uuid, _ := uuid.NewV7() 45 + uuidStr := uuid.String() 46 + if tag != "" { 47 + ctx = log.WithLogValues(ctx, "tag", tag) 48 + uuidStr = fmt.Sprintf("%s-%s", tag, uuidStr) 49 + } 50 + ctx = log.WithLogValues(ctx, "func", "ConcatBin", "uuid", uuidStr) 51 + ctx, cancel := context.WithCancel(ctx) 52 + // defer cancel() 53 + 54 + pipeline, err := gst.NewPipeline("TestConcatBin") 55 + if err != nil { 56 + return fmt.Errorf("failed to create pipeline: %w", err) 57 + } 58 + 59 + errCh := make(chan error) 60 + go func() { 61 + err := HandleBusMessages(ctx, pipeline) 62 + cancel() 63 + errCh <- err 64 + close(errCh) 65 + }() 66 + 67 + defer func() { 68 + cancel() 69 + err := <-errCh 70 + require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr)) 71 + err = pipeline.BlockSetState(gst.StateNull) 72 + require.NoError(t, err, fmt.Sprintf("uuid: %s", uuidStr)) 73 + }() 74 + 75 + filename := getFixture("sample-segment.mp4") 76 + inputFile, err := os.Open(filename) 77 + if err != nil { 78 + return fmt.Errorf("failed to open fixture file: %w", err) 79 + } 80 + defer inputFile.Close() 81 + 82 + bs, err := io.ReadAll(inputFile) 83 + if err != nil { 84 + return fmt.Errorf("failed to read fixture file: %w", err) 85 + } 86 + 87 + testSegs := []*segchanman.Seg{} 88 + for i := 0; i < 5; i++ { 89 + testSegs = append(testSegs, &segchanman.Seg{ 90 + Data: bs, 91 + Filepath: filename, 92 + }) 93 + } 94 + 95 + segCh := make(chan *segchanman.Seg) 96 + go func() { 97 + for _, seg := range testSegs { 98 + segCh <- seg 99 + } 100 + close(segCh) 101 + }() 102 + 103 + concatBin, err := ConcatBin(ctx, segCh) 104 + if err != nil { 105 + return fmt.Errorf("failed to create concat bin: %w", err) 106 + } 107 + 108 + err = pipeline.Add(concatBin.Element) 109 + if err != nil { 110 + return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 111 + } 112 + 113 + videoPad := concatBin.GetStaticPad("video_0") 114 + if videoPad == nil { 115 + return fmt.Errorf("video pad not found") 116 + } 117 + 118 + audioPad := concatBin.GetStaticPad("audio_0") 119 + if audioPad == nil { 120 + return fmt.Errorf("audio pad not found") 121 + } 122 + 123 + videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 124 + "name": "videoappsink", 125 + "sync": false, 126 + }) 127 + if err != nil { 128 + return fmt.Errorf("failed to create video appsink: %w", err) 129 + } 130 + 131 + err = pipeline.Add(videoAppSink) 132 + if err != nil { 133 + return fmt.Errorf("failed to add video appsink to pipeline: %w", err) 134 + } 135 + 136 + videoAppSinkPadSink := videoAppSink.GetStaticPad("sink") 137 + if videoAppSinkPadSink == nil { 138 + return fmt.Errorf("video appsink pad not found") 139 + } 140 + 141 + audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 142 + "name": "audioappsink", 143 + "sync": false, 144 + }) 145 + if err != nil { 146 + return fmt.Errorf("failed to create audio appsink: %w", err) 147 + } 148 + 149 + err = pipeline.Add(audioAppSink) 150 + if err != nil { 151 + return fmt.Errorf("failed to add audio appsink to pipeline: %w", err) 152 + } 153 + 154 + audioAppSinkPadSink := audioAppSink.GetStaticPad("sink") 155 + if audioAppSinkPadSink == nil { 156 + return fmt.Errorf("audio appsink pad not found") 157 + } 158 + 159 + ok := videoPad.Link(videoAppSinkPadSink) 160 + if ok != gst.PadLinkOK { 161 + return fmt.Errorf("failed to link video pad: %v", ok) 162 + } 163 + 164 + ok = audioPad.Link(audioAppSinkPadSink) 165 + if ok != gst.PadLinkOK { 166 + return fmt.Errorf("failed to link audio pad: %v", ok) 167 + } 168 + 169 + videoBuf := bytes.Buffer{} 170 + audioBuf := bytes.Buffer{} 171 + 172 + videoappsink := app.SinkFromElement(videoAppSink) 173 + videoappsink.SetCallbacks(&app.SinkCallbacks{ 174 + NewSampleFunc: WriterNewSample(ctx, &videoBuf), 175 + }) 176 + 177 + audioappsink := app.SinkFromElement(audioAppSink) 178 + audioappsink.SetCallbacks(&app.SinkCallbacks{ 179 + NewSampleFunc: WriterNewSample(ctx, &audioBuf), 180 + }) 181 + 182 + // Start the pipeline 183 + err = pipeline.SetState(gst.StatePlaying) 184 + if err != nil { 185 + return fmt.Errorf("failed to set pipeline to playing state: %w", err) 186 + } 187 + 188 + // Start a goroutine to print buffer sizes 189 + go func() { 190 + for { 191 + select { 192 + case <-ctx.Done(): 193 + return 194 + case <-time.After(1 * time.Second): 195 + log.Debug(ctx, "buffer sizes", 196 + "videoBuf", videoBuf.Len(), 197 + "audioBuf", audioBuf.Len()) 198 + } 199 + } 200 + }() 201 + 202 + <-ctx.Done() 203 + 204 + time.Sleep(5 * time.Second) 205 + 206 + padIdleCh := make(chan struct{}) 207 + 208 + padIdle := func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 209 + log.Debug(ctx, "pad-idle", "name", pad.GetName(), "direction", pad.GetDirection()) 210 + go func() { 211 + padIdleCh <- struct{}{} 212 + }() 213 + return gst.PadProbeRemove 214 + } 215 + 216 + videoAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle) 217 + audioAppSinkPadSink.AddProbe(gst.PadProbeTypeIdle, padIdle) 218 + 219 + <-padIdleCh 220 + <-padIdleCh 221 + 222 + require.Equal(t, 4936240, videoBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr)) 223 + require.Equal(t, 32200, audioBuf.Len(), fmt.Sprintf("uuid: %s", uuidStr)) 224 + 225 + return <-errCh 226 + }
+149
pkg/media/concat_demux.go
··· 1 + package media 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "strings" 8 + 9 + "github.com/go-gst/go-gst/gst" 10 + "github.com/go-gst/go-gst/gst/app" 11 + "stream.place/streamplace/pkg/log" 12 + "stream.place/streamplace/pkg/media/segchanman" 13 + ) 14 + 15 + // silly technique to avoid leaking pads 16 + func doNothing(self *gst.Element, pad *gst.Pad) {} 17 + 18 + func ConcatDemuxBin(ctx context.Context, seg *segchanman.Seg) (*gst.Bin, error) { 19 + ctx = log.WithLogValues(ctx, "func", "SegDemuxBin") 20 + bin := gst.NewBin("seg-demux-bin") 21 + 22 + appSrc, err := gst.NewElementWithProperties("appsrc", map[string]interface{}{ 23 + "name": "concat-appsrc", 24 + }) 25 + if err != nil { 26 + return nil, fmt.Errorf("failed to create appsrc element: %w", err) 27 + } 28 + err = bin.Add(appSrc) 29 + if err != nil { 30 + return nil, fmt.Errorf("failed to add appsrc to bin: %w", err) 31 + } 32 + 33 + demux, err := gst.NewElementWithProperties("qtdemux", map[string]interface{}{ 34 + "name": "concat-demux", 35 + }) 36 + if err != nil { 37 + return nil, fmt.Errorf("failed to create qtdemux element: %w", err) 38 + } 39 + err = bin.Add(demux) 40 + if err != nil { 41 + return nil, fmt.Errorf("failed to add qtdemux to bin: %w", err) 42 + } 43 + 44 + err = appSrc.Link(demux) 45 + if err != nil { 46 + return nil, fmt.Errorf("failed to link appsrc to qtdemux: %w", err) 47 + } 48 + 49 + tmpl := demux.GetPadTemplates() 50 + if tmpl == nil { 51 + return nil, fmt.Errorf("pad templates not found") 52 + } 53 + 54 + mq, err := gst.NewElementWithProperties("multiqueue", map[string]interface{}{ 55 + "name": "concat-demux-multiqueue", 56 + }) 57 + if err != nil { 58 + return nil, fmt.Errorf("failed to create multiqueue element: %w", err) 59 + } 60 + err = bin.Add(mq) 61 + if err != nil { 62 + return nil, fmt.Errorf("failed to add multiqueue to bin: %w", err) 63 + } 64 + 65 + mqVideoSink := mq.GetRequestPad("sink_%u") 66 + if mqVideoSink == nil { 67 + return nil, fmt.Errorf("video sink pad not found") 68 + } 69 + 70 + mqAudioSink := mq.GetRequestPad("sink_%u") 71 + if mqAudioSink == nil { 72 + return nil, fmt.Errorf("audio sink pad not found") 73 + } 74 + 75 + mqVideoSrc := mq.GetStaticPad("src_0") 76 + if mqVideoSrc == nil { 77 + return nil, fmt.Errorf("video source pad not found") 78 + } 79 + 80 + mqAudioSrc := mq.GetStaticPad("src_1") 81 + if mqAudioSrc == nil { 82 + return nil, fmt.Errorf("audio source pad not found") 83 + } 84 + 85 + videoGhost := gst.NewGhostPad("video_0", mqVideoSrc) 86 + if videoGhost == nil { 87 + return nil, fmt.Errorf("failed to create video ghost pad") 88 + } 89 + 90 + audioGhost := gst.NewGhostPad("audio_0", mqAudioSrc) 91 + if audioGhost == nil { 92 + return nil, fmt.Errorf("failed to create audio ghost pad") 93 + } 94 + 95 + needed := 2 96 + 97 + var padAdded func(self *gst.Element, pad *gst.Pad) 98 + // the defer funcs are needed to avoid leaking pads for some reason 99 + padAdded = func(self *gst.Element, pad *gst.Pad) { 100 + log.Debug(ctx, "demux pad-added", "name", pad.GetName(), "direction", pad.GetDirection()) 101 + var downstreamPad *gst.Pad 102 + if strings.HasPrefix(pad.GetName(), "video_") { 103 + downstreamPad = mqVideoSink 104 + // defer func() { mqVideoSink = nil }() 105 + } else if strings.HasPrefix(pad.GetName(), "audio_") { 106 + downstreamPad = mqAudioSink 107 + // defer func() { mqAudioSink = nil }() 108 + } else { 109 + log.Error(ctx, "unknown pad", "name", pad.GetName(), "direction", pad.GetDirection()) 110 + // cancel() 111 + return 112 + } 113 + ret := pad.Link(downstreamPad) 114 + if ret != gst.PadLinkOK { 115 + log.Error(ctx, "failed to link demux to downstream pad", "name", pad.GetName(), "direction", pad.GetDirection(), "error", ret) 116 + // cancel() 117 + return 118 + } 119 + needed-- 120 + if needed == 0 { 121 + padAdded = doNothing 122 + } 123 + } 124 + outerPadAdded := func(self *gst.Element, pad *gst.Pad) { 125 + padAdded(self, pad) 126 + } 127 + 128 + _, err = demux.Connect("pad-added", outerPadAdded) 129 + if err != nil { 130 + return nil, fmt.Errorf("failed to connect demux pad-added signal: %w", err) 131 + } 132 + 133 + ok := bin.AddPad(videoGhost.Pad) 134 + if !ok { 135 + return nil, fmt.Errorf("failed to add video ghost pad to bin") 136 + } 137 + 138 + ok = bin.AddPad(audioGhost.Pad) 139 + if !ok { 140 + return nil, fmt.Errorf("failed to add audio ghost pad to bin") 141 + } 142 + 143 + src := app.SrcFromElement(appSrc) 144 + src.SetCallbacks(&app.SourceCallbacks{ 145 + NeedDataFunc: ReaderNeedData(ctx, bytes.NewReader(seg.Data)), 146 + }) 147 + 148 + return bin, nil 149 + }
+178
pkg/media/concat_demux_test.go
··· 1 + package media 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "io" 8 + "os" 9 + "testing" 10 + 11 + "github.com/go-gst/go-gst/gst" 12 + "github.com/go-gst/go-gst/gst/app" 13 + "github.com/stretchr/testify/require" 14 + "go.uber.org/goleak" 15 + "golang.org/x/sync/errgroup" 16 + "stream.place/streamplace/pkg/gstinit" 17 + "stream.place/streamplace/pkg/log" 18 + "stream.place/streamplace/pkg/media/segchanman" 19 + ) 20 + 21 + func TestConcatDemuxBin(t *testing.T) { 22 + gstinit.InitGST() 23 + before := getLeakCount(t) 24 + defer checkGStreamerLeaks(t, before) 25 + ignore := goleak.IgnoreCurrent() 26 + defer goleak.VerifyNone(t, ignore) 27 + 28 + g, _ := errgroup.WithContext(context.Background()) 29 + for i := 0; i < streamplaceTestCount; i++ { 30 + g.Go(func() error { 31 + return innerTestConcatDemuxBin(t) 32 + }) 33 + } 34 + err := g.Wait() 35 + require.NoError(t, err) 36 + } 37 + 38 + // This function remains in scope for the duration of a single users' playback 39 + func innerTestConcatDemuxBin(t *testing.T) error { 40 + ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "TestConcat2": 9, "SegDemuxBin": 9}}) 41 + ctx = log.WithLogValues(ctx, "func", "TestConcat2") 42 + ctx, cancel := context.WithCancel(ctx) 43 + // defer cancel() 44 + 45 + pipeline, err := gst.NewPipeline("TestConcat2") 46 + if err != nil { 47 + return fmt.Errorf("failed to create pipeline: %w", err) 48 + } 49 + 50 + errCh := make(chan error) 51 + go func() { 52 + err := HandleBusMessages(ctx, pipeline) 53 + cancel() 54 + errCh <- err 55 + close(errCh) 56 + }() 57 + 58 + defer func() { 59 + cancel() 60 + err := <-errCh 61 + if err != nil { 62 + t.Errorf("bus handler error: %v", err) 63 + } 64 + err = pipeline.BlockSetState(gst.StateNull) 65 + if err != nil { 66 + t.Errorf("failed to set pipeline to null state: %v", err) 67 + } 68 + }() 69 + 70 + filename := getFixture("sample-segment.mp4") 71 + inputFile, err := os.Open(filename) 72 + if err != nil { 73 + return fmt.Errorf("failed to open fixture file: %w", err) 74 + } 75 + defer inputFile.Close() 76 + 77 + bs, err := io.ReadAll(inputFile) 78 + if err != nil { 79 + return fmt.Errorf("failed to read fixture file: %w", err) 80 + } 81 + 82 + testSeg := &segchanman.Seg{ 83 + Data: bs, 84 + Filepath: filename, 85 + } 86 + 87 + concatBin, err := ConcatDemuxBin(ctx, testSeg) 88 + if err != nil { 89 + return fmt.Errorf("failed to create concat bin: %w", err) 90 + } 91 + 92 + err = pipeline.Add(concatBin.Element) 93 + if err != nil { 94 + return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 95 + } 96 + 97 + videoPad := concatBin.GetStaticPad("video_0") 98 + if videoPad == nil { 99 + return fmt.Errorf("video pad not found") 100 + } 101 + 102 + audioPad := concatBin.GetStaticPad("audio_0") 103 + if audioPad == nil { 104 + return fmt.Errorf("audio pad not found") 105 + } 106 + 107 + videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 108 + "name": "videoappsink", 109 + "sync": false, 110 + }) 111 + if err != nil { 112 + return fmt.Errorf("failed to create video appsink: %w", err) 113 + } 114 + 115 + err = pipeline.Add(videoAppSink) 116 + if err != nil { 117 + return fmt.Errorf("failed to add video appsink to pipeline: %w", err) 118 + } 119 + 120 + videoAppSinkPadSink := videoAppSink.GetStaticPad("sink") 121 + if videoAppSinkPadSink == nil { 122 + return fmt.Errorf("video appsink pad not found") 123 + } 124 + 125 + audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 126 + "name": "audioappsink", 127 + "sync": false, 128 + }) 129 + if err != nil { 130 + return fmt.Errorf("failed to create audio appsink: %w", err) 131 + } 132 + 133 + err = pipeline.Add(audioAppSink) 134 + if err != nil { 135 + return fmt.Errorf("failed to add audio appsink to pipeline: %w", err) 136 + } 137 + 138 + audioAppSinkPadSink := audioAppSink.GetStaticPad("sink") 139 + if audioAppSinkPadSink == nil { 140 + return fmt.Errorf("audio appsink pad not found") 141 + } 142 + 143 + ok := videoPad.Link(videoAppSinkPadSink) 144 + if ok != gst.PadLinkOK { 145 + return fmt.Errorf("failed to link video pad: %v", ok) 146 + } 147 + 148 + ok = audioPad.Link(audioAppSinkPadSink) 149 + if ok != gst.PadLinkOK { 150 + return fmt.Errorf("failed to link audio pad: %v", ok) 151 + } 152 + 153 + videoBuf := bytes.Buffer{} 154 + audioBuf := bytes.Buffer{} 155 + 156 + videoappsink := app.SinkFromElement(videoAppSink) 157 + videoappsink.SetCallbacks(&app.SinkCallbacks{ 158 + NewSampleFunc: WriterNewSample(ctx, &videoBuf), 159 + }) 160 + 161 + audioappsink := app.SinkFromElement(audioAppSink) 162 + audioappsink.SetCallbacks(&app.SinkCallbacks{ 163 + NewSampleFunc: WriterNewSample(ctx, &audioBuf), 164 + }) 165 + 166 + // Start the pipeline 167 + err = pipeline.SetState(gst.StatePlaying) 168 + if err != nil { 169 + return fmt.Errorf("failed to set pipeline to playing state: %w", err) 170 + } 171 + 172 + <-ctx.Done() 173 + 174 + require.Equal(t, 987248, videoBuf.Len()) 175 + require.Equal(t, 6440, audioBuf.Len()) 176 + 177 + return <-errCh 178 + }
-215
pkg/media/concat_test.go
··· 1 - package media 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "io" 7 - "math/rand" 8 - "os" 9 - "strconv" 10 - "testing" 11 - 12 - "github.com/go-gst/go-gst/gst" 13 - "github.com/go-gst/go-gst/gst/app" 14 - "github.com/stretchr/testify/require" 15 - "go.uber.org/goleak" 16 - "stream.place/streamplace/pkg/gstinit" 17 - "stream.place/streamplace/pkg/log" 18 - "stream.place/streamplace/pkg/media/segchanman" 19 - ) 20 - 21 - type TestConcatStreamer struct { 22 - fileName string 23 - data []byte 24 - count int 25 - } 26 - 27 - func (t *TestConcatStreamer) SubscribeSegment(ctx context.Context, user string, rendition string) <-chan *segchanman.Seg { 28 - if len(t.data) == 0 { 29 - panic("test file empty") 30 - } 31 - ch := make(chan *segchanman.Seg) 32 - go func() { 33 - if t.count == 5 { 34 - ch <- &segchanman.Seg{ 35 - Data: nil, 36 - Filepath: "", 37 - } 38 - } else { 39 - fmt.Println("writing segment " + strconv.Itoa(t.count) + " with random number " + strconv.Itoa(rand.Intn(100))) 40 - ch <- &segchanman.Seg{ 41 - Data: t.data, 42 - Filepath: t.fileName, 43 - } 44 - t.count += 1 45 - } 46 - }() 47 - return ch 48 - } 49 - 50 - func (t *TestConcatStreamer) UnsubscribeSegment(ctx context.Context, user string, rendition string, ch <-chan *segchanman.Seg) { 51 - } 52 - 53 - func TestConcat(t *testing.T) { 54 - gstinit.InitGST() 55 - // before := getLeakCount(t) 56 - // defer checkGStreamerLeaks(t, before) 57 - ignore := goleak.IgnoreCurrent() 58 - defer goleak.VerifyNone(t, ignore) 59 - 60 - innnerTestConcat(t) 61 - after := getLeakCount(t) 62 - if after != 6 { 63 - fmt.Println("leaks", after) 64 - } 65 - } 66 - 67 - // This function remains in scope for the duration of a single users' playback 68 - func innnerTestConcat(t *testing.T) { 69 - 70 - ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"func": {"ConcatStream": 9, "TestConcat": 9}}) 71 - ctx = log.WithLogValues(ctx, "func", "TestConcat") 72 - ctx, cancel := context.WithCancel(ctx) 73 - // defer cancel() 74 - 75 - pipeline, err := gst.NewPipeline("TestConcat") 76 - require.NoError(t, err) 77 - 78 - go func() { 79 - HandleBusMessages(ctx, pipeline) 80 - cancel() 81 - }() 82 - 83 - filename := getFixture("sample-segment.mp4") 84 - inputFile, err := os.Open(filename) 85 - require.NoError(t, err) 86 - bs, err := io.ReadAll(inputFile) 87 - require.NoError(t, err) 88 - tcs := &TestConcatStreamer{ 89 - fileName: getFixture("sample-segment.mp4"), 90 - data: bs, 91 - } 92 - 93 - outputQueue, done, err := ConcatStream(ctx, pipeline, "fakeuser", "fakerendition", tcs) 94 - require.NoError(t, err) 95 - 96 - go func() { 97 - select { 98 - case <-ctx.Done(): 99 - return 100 - case <-done: 101 - cancel() 102 - } 103 - }() 104 - 105 - videoPad := outputQueue.GetStaticPad("src_0") 106 - require.NotNil(t, videoPad) 107 - 108 - audioPad := outputQueue.GetStaticPad("src_1") 109 - require.NotNil(t, audioPad) 110 - 111 - videoAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 112 - "name": "videoappsink", 113 - "sync": false, 114 - "wait-on-eos": false, 115 - }) 116 - require.NoError(t, err) 117 - err = pipeline.Add(videoAppSink) 118 - require.NoError(t, err) 119 - 120 - videoAppSinkPadSink := videoAppSink.GetStaticPad("sink") 121 - require.NotNil(t, videoAppSinkPadSink) 122 - 123 - audioAppSink, err := gst.NewElementWithProperties("appsink", map[string]interface{}{ 124 - "name": "audioappsink", 125 - "sync": false, 126 - "wait-on-eos": false, 127 - }) 128 - require.NoError(t, err) 129 - err = pipeline.Add(audioAppSink) 130 - require.NoError(t, err) 131 - 132 - audioAppSinkPadSink := audioAppSink.GetStaticPad("sink") 133 - require.NotNil(t, audioAppSinkPadSink) 134 - 135 - ok := videoPad.Link(videoAppSinkPadSink) 136 - require.Equal(t, gst.PadLinkOK, ok) 137 - 138 - ok = audioPad.Link(audioAppSinkPadSink) 139 - require.Equal(t, gst.PadLinkOK, ok) 140 - 141 - videoTotalBytes := 0 142 - audioTotalBytes := 0 143 - 144 - videoappsink := app.SinkFromElement(videoAppSink) 145 - videoappsink.SetCallbacks(&app.SinkCallbacks{ 146 - NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 147 - sample := sink.PullSample() 148 - if sample == nil { 149 - return gst.FlowEOS 150 - } 151 - 152 - buffer := sample.GetBuffer() 153 - if buffer == nil { 154 - return gst.FlowError 155 - } 156 - 157 - samples := buffer.Map(gst.MapRead).Bytes() 158 - defer buffer.Unmap() 159 - 160 - videoTotalBytes += len(samples) 161 - 162 - return gst.FlowOK 163 - }, 164 - EOSFunc: func(sink *app.Sink) { 165 - log.Warn(ctx, "videoappsink EOSFunc") 166 - cancel() 167 - }, 168 - }) 169 - 170 - audioappsink := app.SinkFromElement(audioAppSink) 171 - audioappsink.SetCallbacks(&app.SinkCallbacks{ 172 - NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 173 - sample := sink.PullSample() 174 - if sample == nil { 175 - return gst.FlowEOS 176 - } 177 - 178 - buffer := sample.GetBuffer() 179 - if buffer == nil { 180 - return gst.FlowError 181 - } 182 - 183 - samples := buffer.Map(gst.MapRead).Bytes() 184 - defer buffer.Unmap() 185 - 186 - audioTotalBytes += len(samples) 187 - 188 - return gst.FlowOK 189 - }, 190 - EOSFunc: func(sink *app.Sink) { 191 - log.Warn(ctx, "audioappsink EOSFunc") 192 - cancel() 193 - }, 194 - }) 195 - 196 - // Start the pipeline 197 - 198 - err = pipeline.SetState(gst.StatePlaying) 199 - require.NoError(t, err) 200 - 201 - <-ctx.Done() 202 - 203 - err = pipeline.BlockSetState(gst.StateNull) 204 - require.NoError(t, err) 205 - pipeline.Remove(videoAppSink) 206 - pipeline.Remove(audioAppSink) 207 - videoAppSink.SetState(gst.StateNull) 208 - audioAppSink.SetState(gst.StateNull) 209 - videoappsink.SetCallbacks(&app.SinkCallbacks{}) 210 - audioappsink.SetCallbacks(&app.SinkCallbacks{}) 211 - pipeline.Clear() 212 - 213 - require.Greater(t, videoTotalBytes, 1000000) 214 - require.Greater(t, audioTotalBytes, 40000) 215 - }
-15
pkg/media/ffmpeg_concat.go
··· 16 16 return mm.SegmentToStream(ctx, user, rendition, muxer, w) 17 17 } 18 18 19 - func (mm *MediaManager) SegmentToMKVPlusOpus(ctx context.Context, user string, rendition string, w io.Writer) error { 20 - muxer := ffmpeg.ComponentOptions{ 21 - Name: "matroska", 22 - } 23 - pr, pw := io.Pipe() 24 - g, ctx := errgroup.WithContext(ctx) 25 - g.Go(func() error { 26 - return mm.SegmentToStream(ctx, user, rendition, muxer, pw) 27 - }) 28 - g.Go(func() error { 29 - return AddOpusToMKV(ctx, pr, w) 30 - }) 31 - return g.Wait() 32 - } 33 - 34 19 func (mm *MediaManager) SegmentToMP4(ctx context.Context, user string, rendition string, w io.Writer) error { 35 20 muxer := ffmpeg.ComponentOptions{ 36 21 Name: "mp4",
-109
pkg/media/gstreamer.go
··· 4 4 "bytes" 5 5 "context" 6 6 "encoding/json" 7 - "errors" 8 7 "fmt" 9 8 "io" 10 9 "os" ··· 34 33 runtime.KeepAlive(r.Fd()) 35 34 runtime.KeepAlive(w.Fd()) 36 35 }, nil 37 - } 38 - 39 - func ReaderNeedData(ctx context.Context, input io.Reader) func(self *app.Source, length uint) { 40 - return func(self *app.Source, length uint) { 41 - if ctx.Err() != nil { 42 - self.EndStream() 43 - return 44 - } 45 - bs := make([]byte, length) 46 - read, err := input.Read(bs) 47 - if err != nil && !errors.Is(err, io.EOF) { 48 - log.Error(ctx, "error reading from input", "error", err) 49 - self.Error("error reading from input", err) 50 - return 51 - } 52 - if read > 0 { 53 - toPush := bs 54 - if uint(read) < length { 55 - toPush = bs[:read] 56 - } 57 - buffer := gst.NewBufferWithSize(int64(len(toPush))) 58 - buffer.Map(gst.MapWrite).WriteData(toPush) 59 - defer buffer.Unmap() 60 - self.PushBuffer(buffer) 61 - } 62 - if err != nil && errors.Is(err, io.EOF) { 63 - log.Debug(ctx, "EOF, ending stream", "length", read) 64 - self.EndStream() 65 - return 66 - } 67 - } 68 - } 69 - 70 - func WriterNewSample(ctx context.Context, output io.Writer) func(sink *app.Sink) gst.FlowReturn { 71 - return func(sink *app.Sink) gst.FlowReturn { 72 - sample := sink.PullSample() 73 - if sample == nil { 74 - return gst.FlowOK 75 - } 76 - 77 - // Retrieve the buffer from the sample. 78 - buffer := sample.GetBuffer() 79 - bs := buffer.Map(gst.MapRead).Bytes() 80 - defer buffer.Unmap() 81 - 82 - _, err := output.Write(bs) 83 - 84 - if err != nil { 85 - panic(err) 86 - } 87 - 88 - return gst.FlowOK 89 - } 90 - } 91 - 92 - func AddOpusToMKV(ctx context.Context, input io.Reader, output io.Writer) error { 93 - pipelineSlice := []string{ 94 - "appsrc name=appsrc ! matroskademux name=demux", 95 - "matroskamux name=mux ! appsink name=appsink", 96 - "demux.audio_0 ! queue ! tee name=asplit", 97 - "demux.video_0 ! queue ! mux.video_0", 98 - "asplit. ! queue ! fdkaacdec ! audioresample ! opusenc inband-fec=true perfect-timestamp=true bitrate=128000 ! mux.audio_1", 99 - "asplit. ! queue ! mux.audio_0", 100 - } 101 - 102 - pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 103 - if err != nil { 104 - return err 105 - } 106 - 107 - appsrc, err := pipeline.GetElementByName("appsrc") 108 - if err != nil { 109 - return err 110 - } 111 - 112 - src := app.SrcFromElement(appsrc) 113 - src.SetCallbacks(&app.SourceCallbacks{ 114 - NeedDataFunc: ReaderNeedData(ctx, input), 115 - }) 116 - 117 - ctx, cancel := context.WithCancel(ctx) 118 - defer cancel() 119 - 120 - appsink, err := pipeline.GetElementByName("appsink") 121 - if err != nil { 122 - return err 123 - } 124 - 125 - sink := app.SinkFromElement(appsink) 126 - sink.SetCallbacks(&app.SinkCallbacks{ 127 - NewSampleFunc: WriterNewSample(ctx, output), 128 - EOSFunc: func(sink *app.Sink) { 129 - cancel() 130 - }, 131 - }) 132 - 133 - go func() { 134 - HandleBusMessages(ctx, pipeline) 135 - cancel() 136 - }() 137 - 138 - // Start the pipeline 139 - pipeline.SetState(gst.StatePlaying) 140 - 141 - <-ctx.Done() 142 - 143 - pipeline.BlockSetState(gst.StateNull) 144 - return nil 145 36 } 146 37 147 38 // basic test to make sure gstreamer functionality is working
-37
pkg/media/gstreamer_test.go
··· 1 - package media 2 - 3 - import ( 4 - "context" 5 - "os" 6 - "testing" 7 - 8 - "github.com/go-gst/go-gst/gst" 9 - "github.com/stretchr/testify/require" 10 - _ "stream.place/streamplace/pkg/media/mediatesting" 11 - ) 12 - 13 - func TestNormalizeAudio(t *testing.T) { 14 - gst.Init(nil) 15 - ifile, err := os.Open(getFixture("sample-stream.mkv")) 16 - require.NoError(t, err) 17 - ofile, err := os.CreateTemp("", "*.mkv") 18 - defer os.Remove(ofile.Name()) 19 - require.NoError(t, err) 20 - err = AddOpusToMKV(context.Background(), ifile, ofile) 21 - require.NoError(t, err) 22 - ofile.Close() 23 - info, err := os.Stat(ofile.Name()) 24 - require.NoError(t, err) 25 - require.Greater(t, info.Size(), int64(0)) 26 - } 27 - 28 - // func TestThumbnail(t *testing.T) { 29 - // mm := MediaManager{} 30 - // gst.Init(nil) 31 - // ifile, err := os.Open(getFixture("sample-segment.mp4")) 32 - // require.NoError(t, err) 33 - // buf := &bytes.Buffer{} 34 - // err = mm.Thumbnail(context.Background(), ifile, buf) 35 - // require.NoError(t, err) 36 - // require.Greater(t, len(buf.Bytes()), 0) 37 - // }
+89
pkg/media/io_helpers.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "io" 7 + 8 + "github.com/go-gst/go-gst/gst" 9 + "github.com/go-gst/go-gst/gst/app" 10 + "stream.place/streamplace/pkg/log" 11 + ) 12 + 13 + // ReaderNeedData is a function that reads from an io.Reader and pushes the data to a gstreamer source. 14 + func ReaderNeedData(ctx context.Context, input io.Reader) func(self *app.Source, length uint) { 15 + bsCopy, err := io.ReadAll(input) 16 + if err != nil { 17 + panic(err) 18 + } 19 + return func(self *app.Source, length uint) { 20 + if ctx.Err() != nil { 21 + self.EndStream() 22 + return 23 + } 24 + buffer := gst.NewBufferWithSize(int64(len(bsCopy))) 25 + buffer.Map(gst.MapWrite).WriteData(bsCopy) 26 + defer buffer.Unmap() 27 + ret := self.PushBuffer(buffer) 28 + if ret != gst.FlowOK { 29 + log.Error(ctx, "failed to push buffer", "error", ret.String()) 30 + } else { 31 + log.Debug(ctx, "pushed buffer", "length", len(bsCopy)) 32 + } 33 + } 34 + } 35 + 36 + // Different from ReaderNeedData in that it reads the data in chunks and pushes them to the source. 37 + func ReaderNeedDataIncremental(ctx context.Context, input io.Reader) func(self *app.Source, length uint) { 38 + return func(self *app.Source, length uint) { 39 + if ctx.Err() != nil { 40 + self.EndStream() 41 + return 42 + } 43 + bs := make([]byte, length) 44 + read, err := input.Read(bs) 45 + if err != nil && !errors.Is(err, io.EOF) { 46 + log.Error(ctx, "error reading from input", "error", err) 47 + self.Error("error reading from input", err) 48 + return 49 + } 50 + if read > 0 { 51 + toPush := bs 52 + if uint(read) < length { 53 + toPush = bs[:read] 54 + } 55 + buffer := gst.NewBufferWithSize(int64(len(toPush))) 56 + buffer.Map(gst.MapWrite).WriteData(toPush) 57 + defer buffer.Unmap() 58 + self.PushBuffer(buffer) 59 + } 60 + if err != nil && errors.Is(err, io.EOF) { 61 + log.Debug(ctx, "EOF, ending stream", "length", read) 62 + self.EndStream() 63 + return 64 + } 65 + } 66 + } 67 + 68 + // WriterNewSample is a function that reads from a gstreamer sink and writes the data to an io.Writer. 69 + func WriterNewSample(ctx context.Context, output io.Writer) func(sink *app.Sink) gst.FlowReturn { 70 + return func(sink *app.Sink) gst.FlowReturn { 71 + sample := sink.PullSample() 72 + if sample == nil { 73 + return gst.FlowOK 74 + } 75 + 76 + // Retrieve the buffer from the sample. 77 + buffer := sample.GetBuffer() 78 + bs := buffer.Map(gst.MapRead).Bytes() 79 + defer buffer.Unmap() 80 + 81 + _, err := output.Write(bs) 82 + 83 + if err != nil { 84 + panic(err) 85 + } 86 + 87 + return gst.FlowOK 88 + } 89 + }
+124
pkg/media/io_helpers_test.go
··· 1 + package media 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "io" 8 + "os" 9 + "strconv" 10 + "testing" 11 + 12 + "github.com/go-gst/go-gst/gst" 13 + "github.com/go-gst/go-gst/gst/app" 14 + "github.com/stretchr/testify/require" 15 + "go.uber.org/goleak" 16 + "golang.org/x/sync/errgroup" 17 + "stream.place/streamplace/pkg/log" 18 + ) 19 + 20 + var streamplaceTestCount = 50 21 + 22 + func init() { 23 + testRunsStr := os.Getenv("STREAMPLACE_TEST_COUNT") 24 + if testRunsStr != "" { 25 + var err error 26 + streamplaceTestCount, err = strconv.Atoi(testRunsStr) 27 + if err != nil { 28 + panic(fmt.Sprintf("STREAMPLACE_TEST_COUNT is not a number: %s", testRunsStr)) 29 + } 30 + } 31 + } 32 + 33 + func TestWriterNewSample(t *testing.T) { 34 + ignore := goleak.IgnoreCurrent() 35 + defer goleak.VerifyNone(t, ignore) 36 + before := getLeakCount(t) 37 + defer checkGStreamerLeaks(t, before) 38 + filePath := getFixture("5sec.mp4") 39 + fileInfo, err := os.Stat(filePath) 40 + require.NoError(t, err) 41 + fileSize := fileInfo.Size() 42 + t.Logf("Test file size: %d bytes", fileSize) 43 + g, ctx := errgroup.WithContext(context.Background()) 44 + ctx = log.WithDebugValue(ctx, map[string]map[string]int{"func": {"TestWriterNewSample": 9}}) 45 + for i := 0; i < streamplaceTestCount; i++ { 46 + g.Go(func() error { 47 + bs := bytes.Buffer{} 48 + err := writerNewSampleInner(ctx, i, &bs) 49 + if err != nil { 50 + return err 51 + } 52 + if bs.Len() != int(fileSize) { 53 + return fmt.Errorf("expected %d bytes, got %d", fileSize, bs.Len()) 54 + } 55 + return nil 56 + }) 57 + } 58 + err = g.Wait() 59 + require.NoError(t, err) 60 + } 61 + 62 + func writerNewSampleInner(ctx context.Context, i int, w io.Writer) error { 63 + ctx = log.WithLogValues(ctx, "func", "TestWriterNewSample") 64 + ctx, cancel := context.WithCancel(ctx) 65 + defer cancel() 66 + 67 + pipeline, err := gst.NewPipeline(fmt.Sprintf("TestWriterNewSample-%d", i)) 68 + if err != nil { 69 + return err 70 + } 71 + 72 + fileSrc, err := gst.NewElementWithProperties("filesrc", map[string]any{ 73 + "location": getFixture("5sec.mp4"), 74 + }) 75 + if err != nil { 76 + return err 77 + } 78 + err = pipeline.Add(fileSrc) 79 + if err != nil { 80 + return err 81 + } 82 + 83 + var busErr error 84 + go func() { 85 + busErr = HandleBusMessages(ctx, pipeline) 86 + cancel() 87 + }() 88 + 89 + appSink, err := gst.NewElementWithProperties("appsink", map[string]any{ 90 + "name": fmt.Sprintf("TestWriterNewSample-appsink-%d", i), 91 + "sync": false, 92 + }) 93 + if err != nil { 94 + return err 95 + } 96 + err = pipeline.Add(appSink) 97 + if err != nil { 98 + return err 99 + } 100 + 101 + sink := app.SinkFromElement(appSink) 102 + sink.SetCallbacks(&app.SinkCallbacks{ 103 + NewSampleFunc: WriterNewSample(ctx, w), 104 + }) 105 + 106 + err = fileSrc.Link(appSink) 107 + if err != nil { 108 + return err 109 + } 110 + 111 + err = pipeline.SetState(gst.StatePlaying) 112 + if err != nil { 113 + return err 114 + } 115 + 116 + <-ctx.Done() 117 + 118 + err = pipeline.SetState(gst.StateNull) 119 + if err != nil { 120 + return err 121 + } 122 + 123 + return busErr 124 + }
+63
pkg/media/leak/leak.go
··· 1 + package main 2 + 3 + import ( 4 + "flag" 5 + "fmt" 6 + "log" 7 + "os" 8 + "runtime" 9 + "syscall" 10 + "time" 11 + 12 + "github.com/go-gst/go-glib/glib" 13 + "github.com/go-gst/go-gst/gst" 14 + ) 15 + 16 + // Demonstration of a GStreamer (or go-gst idk) leak 17 + 18 + func main() { 19 + flag.Parse() 20 + if flag.NArg() != 1 { 21 + log.Fatal("expected 1 argument") 22 + } 23 + os.Setenv("GST_DEBUG", "leaks:9,GST_TRACER:9") 24 + os.Setenv("GST_TRACERS", "leaks") 25 + os.Setenv("GST_LEAKS_TRACER_SIG", "1") 26 + err := RunPipeline(flag.Arg(0)) 27 + if err != nil { 28 + log.Fatal(err) 29 + } 30 + runtime.GC() 31 + time.Sleep(1 * time.Second) 32 + process, err := os.FindProcess(os.Getpid()) 33 + if err != nil { 34 + log.Fatal(err) 35 + } 36 + process.Signal(syscall.SIGUSR1) 37 + time.Sleep(1 * time.Second) 38 + } 39 + 40 + func RunPipeline(file string) error { 41 + gst.Init(nil) 42 + pipeline, err := gst.NewPipelineFromString(fmt.Sprintf("filesrc location=%s ! qtdemux ! fakesink", file)) 43 + if err != nil { 44 + return fmt.Errorf("failed to create pipeline: %w", err) 45 + } 46 + 47 + mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) 48 + pipeline.SetState(gst.StatePlaying) 49 + 50 + pipeline.GetBus().AddWatch(func(msg *gst.Message) bool { 51 + if msg.Type() == gst.MessageEOS { 52 + mainLoop.Quit() 53 + return false 54 + } 55 + return true 56 + }) 57 + 58 + mainLoop.Run() 59 + 60 + pipeline.BlockSetState(gst.StateNull) 61 + 62 + return nil 63 + }
+2
pkg/media/leak_test.go
··· 16 16 17 17 "github.com/acarl005/stripansi" 18 18 "github.com/stretchr/testify/require" 19 + "stream.place/streamplace/pkg/gstinit" 19 20 ) 20 21 21 22 const GST_DEBUG_NEEDED = "leaks:9,GST_TRACER:9" ··· 76 77 panic(err) 77 78 } 78 79 }() 80 + gstinit.InitGST() 79 81 os.Exit(m.Run()) 80 82 } 81 83
+1 -1
pkg/media/media_test.go
··· 40 40 } 41 41 cli := ct.CLI(t, &config.CLI{ 42 42 TAURL: "http://timestamp.digicert.com", 43 - AllowedStreams: []string{}, 43 + AllowedStreams: []string{"did:key:zQ3shhoPCrDZWE8CryCEHYCrb1x8mCkr2byTkF5EGJT7dgazC"}, 44 44 }) 45 45 atsync := &atproto.ATProtoSynchronizer{ 46 46 CLI: cli,
+156 -169
pkg/media/progressive.go
··· 1 1 package media 2 2 3 - import ( 4 - "context" 5 - "fmt" 6 - "io" 7 - "strings" 8 - "time" 3 + // func (mm *MediaManager) MP4Playback(ctx context.Context, user string, rendition string, w io.Writer) error { 4 + // uu, err := uuid.NewV7() 5 + // if err != nil { 6 + // return err 7 + // } 8 + // ctx = log.WithLogValues(ctx, "playbackID", uu.String()) 9 + // ctx, cancel := context.WithCancel(ctx) 9 10 10 - "github.com/go-gst/go-gst/gst" 11 - "github.com/go-gst/go-gst/gst/app" 12 - "github.com/google/uuid" 13 - "stream.place/streamplace/pkg/log" 14 - ) 11 + // ctx = log.WithLogValues(ctx, "mediafunc", "MP4Playback") 15 12 16 - func (mm *MediaManager) MP4Playback(ctx context.Context, user string, rendition string, w io.Writer) error { 17 - uu, err := uuid.NewV7() 18 - if err != nil { 19 - return err 20 - } 21 - ctx = log.WithLogValues(ctx, "playbackID", uu.String()) 22 - ctx, cancel := context.WithCancel(ctx) 13 + // pipelineSlice := []string{ 14 + // "mp4mux name=muxer fragment-mode=first-moov-then-finalise fragment-duration=1000 streamable=true ! appsink name=mp4sink", 15 + // "h264parse name=videoparse ! muxer.", 16 + // "opusparse name=audioparse ! muxer.", 17 + // } 23 18 24 - ctx = log.WithLogValues(ctx, "mediafunc", "MP4Playback") 19 + // pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 20 + // if err != nil { 21 + // return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 22 + // } 25 23 26 - pipelineSlice := []string{ 27 - "mp4mux name=muxer fragment-mode=first-moov-then-finalise fragment-duration=1000 streamable=true ! appsink name=mp4sink", 28 - "h264parse name=videoparse ! muxer.", 29 - "opusparse name=audioparse ! muxer.", 30 - } 24 + // go func() { 25 + // HandleBusMessages(ctx, pipeline) 26 + // cancel() 27 + // }() 31 28 32 - pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 33 - if err != nil { 34 - return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 35 - } 29 + // outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm) 30 + // if err != nil { 31 + // return fmt.Errorf("failed to get output queue: %w", err) 32 + // } 33 + // go func() { 34 + // select { 35 + // case <-ctx.Done(): 36 + // return 37 + // case <-done: 38 + // cancel() 39 + // } 40 + // }() 36 41 37 - go func() { 38 - HandleBusMessages(ctx, pipeline) 39 - cancel() 40 - }() 42 + // videoParse, err := pipeline.GetElementByName("videoparse") 43 + // if err != nil { 44 + // return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 45 + // } 46 + // err = outputQueue.Link(videoParse) 47 + // if err != nil { 48 + // return fmt.Errorf("failed to link output queue to video parse: %w", err) 49 + // } 41 50 42 - outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm) 43 - if err != nil { 44 - return fmt.Errorf("failed to get output queue: %w", err) 45 - } 46 - go func() { 47 - select { 48 - case <-ctx.Done(): 49 - return 50 - case <-done: 51 - cancel() 52 - } 53 - }() 51 + // audioParse, err := pipeline.GetElementByName("audioparse") 52 + // if err != nil { 53 + // return fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 54 + // } 55 + // err = outputQueue.Link(audioParse) 56 + // if err != nil { 57 + // return fmt.Errorf("failed to link output queue to audio parse: %w", err) 58 + // } 54 59 55 - videoParse, err := pipeline.GetElementByName("videoparse") 56 - if err != nil { 57 - return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 58 - } 59 - err = outputQueue.Link(videoParse) 60 - if err != nil { 61 - return fmt.Errorf("failed to link output queue to video parse: %w", err) 62 - } 60 + // go func() { 61 + // ticker := time.NewTicker(time.Second * 1) 62 + // for { 63 + // select { 64 + // case <-ctx.Done(): 65 + // return 66 + // case <-ticker.C: 67 + // state := pipeline.GetCurrentState() 68 + // log.Debug(ctx, "pipeline state", "state", state) 69 + // } 70 + // } 71 + // }() 63 72 64 - audioParse, err := pipeline.GetElementByName("audioparse") 65 - if err != nil { 66 - return fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 67 - } 68 - err = outputQueue.Link(audioParse) 69 - if err != nil { 70 - return fmt.Errorf("failed to link output queue to audio parse: %w", err) 71 - } 73 + // mp4sinkele, err := pipeline.GetElementByName("mp4sink") 74 + // if err != nil { 75 + // return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 76 + // } 77 + // mp4sink := app.SinkFromElement(mp4sinkele) 78 + // mp4sink.SetCallbacks(&app.SinkCallbacks{ 79 + // NewSampleFunc: WriterNewSample(ctx, w), 80 + // EOSFunc: func(sink *app.Sink) { 81 + // log.Warn(ctx, "mp4sink EOSFunc") 82 + // cancel() 83 + // }, 84 + // }) 72 85 73 - go func() { 74 - ticker := time.NewTicker(time.Second * 1) 75 - for { 76 - select { 77 - case <-ctx.Done(): 78 - return 79 - case <-ticker.C: 80 - state := pipeline.GetCurrentState() 81 - log.Debug(ctx, "pipeline state", "state", state) 82 - } 83 - } 84 - }() 86 + // pipeline.SetState(gst.StatePlaying) 85 87 86 - mp4sinkele, err := pipeline.GetElementByName("mp4sink") 87 - if err != nil { 88 - return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 89 - } 90 - mp4sink := app.SinkFromElement(mp4sinkele) 91 - mp4sink.SetCallbacks(&app.SinkCallbacks{ 92 - NewSampleFunc: WriterNewSample(ctx, w), 93 - EOSFunc: func(sink *app.Sink) { 94 - log.Warn(ctx, "mp4sink EOSFunc") 95 - cancel() 96 - }, 97 - }) 88 + // <-ctx.Done() 98 89 99 - pipeline.SetState(gst.StatePlaying) 90 + // pipeline.BlockSetState(gst.StateNull) 100 91 101 - <-ctx.Done() 92 + // return nil 93 + // } 102 94 103 - pipeline.BlockSetState(gst.StateNull) 95 + // func (mm *MediaManager) MKVPlayback(ctx context.Context, user string, rendition string, w io.Writer) error { 96 + // uu, err := uuid.NewV7() 97 + // if err != nil { 98 + // return err 99 + // } 100 + // ctx = log.WithLogValues(ctx, "playbackID", uu.String()) 101 + // ctx, cancel := context.WithCancel(ctx) 104 102 105 - return nil 106 - } 103 + // ctx = log.WithLogValues(ctx, "mediafunc", "MKVPlayback") 107 104 108 - func (mm *MediaManager) MKVPlayback(ctx context.Context, user string, rendition string, w io.Writer) error { 109 - uu, err := uuid.NewV7() 110 - if err != nil { 111 - return err 112 - } 113 - ctx = log.WithLogValues(ctx, "playbackID", uu.String()) 114 - ctx, cancel := context.WithCancel(ctx) 105 + // pipelineSlice := []string{ 106 + // "matroskamux name=muxer streamable=true ! appsink name=mkvsink", 107 + // "h264parse name=videoparse ! muxer.", 108 + // "opusparse name=audioparse ! muxer.", 109 + // } 115 110 116 - ctx = log.WithLogValues(ctx, "mediafunc", "MKVPlayback") 111 + // pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 112 + // if err != nil { 113 + // return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 114 + // } 117 115 118 - pipelineSlice := []string{ 119 - "matroskamux name=muxer streamable=true ! appsink name=mkvsink", 120 - "h264parse name=videoparse ! muxer.", 121 - "opusparse name=audioparse ! muxer.", 122 - } 116 + // go func() { 117 + // HandleBusMessages(ctx, pipeline) 118 + // cancel() 119 + // }() 123 120 124 - pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 125 - if err != nil { 126 - return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 127 - } 128 - 129 - go func() { 130 - HandleBusMessages(ctx, pipeline) 131 - cancel() 132 - }() 133 - 134 - outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm) 135 - if err != nil { 136 - return fmt.Errorf("failed to get output queue: %w", err) 137 - } 138 - go func() { 139 - select { 140 - case <-ctx.Done(): 141 - return 142 - case <-done: 143 - cancel() 144 - } 145 - }() 121 + // outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm) 122 + // if err != nil { 123 + // return fmt.Errorf("failed to get output queue: %w", err) 124 + // } 125 + // go func() { 126 + // select { 127 + // case <-ctx.Done(): 128 + // return 129 + // case <-done: 130 + // cancel() 131 + // } 132 + // }() 146 133 147 - videoParse, err := pipeline.GetElementByName("videoparse") 148 - if err != nil { 149 - return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 150 - } 151 - err = outputQueue.Link(videoParse) 152 - if err != nil { 153 - return fmt.Errorf("failed to link output queue to video parse: %w", err) 154 - } 134 + // videoParse, err := pipeline.GetElementByName("videoparse") 135 + // if err != nil { 136 + // return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 137 + // } 138 + // err = outputQueue.Link(videoParse) 139 + // if err != nil { 140 + // return fmt.Errorf("failed to link output queue to video parse: %w", err) 141 + // } 155 142 156 - audioParse, err := pipeline.GetElementByName("audioparse") 157 - if err != nil { 158 - return fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 159 - } 160 - err = outputQueue.Link(audioParse) 161 - if err != nil { 162 - return fmt.Errorf("failed to link output queue to audio parse: %w", err) 163 - } 143 + // audioParse, err := pipeline.GetElementByName("audioparse") 144 + // if err != nil { 145 + // return fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 146 + // } 147 + // err = outputQueue.Link(audioParse) 148 + // if err != nil { 149 + // return fmt.Errorf("failed to link output queue to audio parse: %w", err) 150 + // } 164 151 165 - go func() { 166 - ticker := time.NewTicker(time.Second * 1) 167 - for { 168 - select { 169 - case <-ctx.Done(): 170 - return 171 - case <-ticker.C: 172 - state := pipeline.GetCurrentState() 173 - log.Debug(ctx, "pipeline state", "state", state) 174 - } 175 - } 176 - }() 152 + // go func() { 153 + // ticker := time.NewTicker(time.Second * 1) 154 + // for { 155 + // select { 156 + // case <-ctx.Done(): 157 + // return 158 + // case <-ticker.C: 159 + // state := pipeline.GetCurrentState() 160 + // log.Debug(ctx, "pipeline state", "state", state) 161 + // } 162 + // } 163 + // }() 177 164 178 - mkvsinkele, err := pipeline.GetElementByName("mkvsink") 179 - if err != nil { 180 - return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 181 - } 182 - mkvsink := app.SinkFromElement(mkvsinkele) 183 - mkvsink.SetCallbacks(&app.SinkCallbacks{ 184 - NewSampleFunc: WriterNewSample(ctx, w), 185 - EOSFunc: func(sink *app.Sink) { 186 - log.Warn(ctx, "mp4sink EOSFunc") 187 - cancel() 188 - }, 189 - }) 165 + // mkvsinkele, err := pipeline.GetElementByName("mkvsink") 166 + // if err != nil { 167 + // return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 168 + // } 169 + // mkvsink := app.SinkFromElement(mkvsinkele) 170 + // mkvsink.SetCallbacks(&app.SinkCallbacks{ 171 + // NewSampleFunc: WriterNewSample(ctx, w), 172 + // EOSFunc: func(sink *app.Sink) { 173 + // log.Warn(ctx, "mp4sink EOSFunc") 174 + // cancel() 175 + // }, 176 + // }) 190 177 191 - pipeline.SetState(gst.StatePlaying) 178 + // pipeline.SetState(gst.StatePlaying) 192 179 193 - <-ctx.Done() 180 + // <-ctx.Done() 194 181 195 - pipeline.BlockSetState(gst.StateNull) 182 + // pipeline.BlockSetState(gst.StateNull) 196 183 197 - return nil 198 - } 184 + // return nil 185 + // }
+5 -5
pkg/media/segment_conv.go
··· 81 81 82 82 // Set up source callbacks 83 83 source.SetCallbacks(&app.SourceCallbacks{ 84 - NeedDataFunc: ReaderNeedData(ctx, input), 84 + NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 85 85 EnoughDataFunc: func(self *app.Source) { 86 86 // Nothing to do here 87 87 }, ··· 200 200 201 201 // Set up source callbacks 202 202 source.SetCallbacks(&app.SourceCallbacks{ 203 - NeedDataFunc: ReaderNeedData(ctx, input), 203 + NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 204 204 EnoughDataFunc: func(self *app.Source) { 205 205 // Nothing to do here 206 206 }, ··· 331 331 332 332 // Set up source callbacks 333 333 source.SetCallbacks(&app.SourceCallbacks{ 334 - NeedDataFunc: ReaderNeedData(ctx, input), 334 + NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 335 335 EnoughDataFunc: func(self *app.Source) { 336 336 // Nothing to do here 337 337 }, ··· 473 473 474 474 // Set up source callbacks 475 475 videoSource.SetCallbacks(&app.SourceCallbacks{ 476 - NeedDataFunc: ReaderNeedData(ctx, videoInput), 476 + NeedDataFunc: ReaderNeedDataIncremental(ctx, videoInput), 477 477 EnoughDataFunc: func(self *app.Source) { 478 478 // Nothing to do here 479 479 }, ··· 483 483 }) 484 484 485 485 audioSource.SetCallbacks(&app.SourceCallbacks{ 486 - NeedDataFunc: ReaderNeedData(ctx, audioInput), 486 + NeedDataFunc: ReaderNeedDataIncremental(ctx, audioInput), 487 487 EnoughDataFunc: func(self *app.Source) { 488 488 // Nothing to do here 489 489 },
+11 -6
pkg/media/thumbnail.go
··· 17 17 defer cancel() 18 18 19 19 pipelineSlice := []string{ 20 - "appsrc name=appsrc ! qtdemux name=demux ! decodebin ! videoconvert ! videoscale ! capsfilter name=capsfilter caps=video/x-raw,width=[1,1280],height=[1,720],pixel-aspect-ratio=1/1 ! queue ! pngenc snapshot=true ! appsink name=appsink", 20 + "appsrc name=appsrc ! qtdemux name=demux ! decodebin ! videoconvert ! videoscale ! videorate ! capsfilter name=capsfilter caps=video/x-raw,width=[1,1280],height=[1,720],pixel-aspect-ratio=1/1,framerate=1/999999 ! queue ! pngenc snapshot=true ! appsink name=appsink", 21 21 } 22 22 23 23 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 24 24 if err != nil { 25 25 return fmt.Errorf("error creating Thumbnail pipeline: %w", err) 26 26 } 27 + 28 + defer func() { 29 + cancel() 30 + err = pipeline.BlockSetState(gst.StateNull) 31 + }() 27 32 appsrc, err := pipeline.GetElementByName("appsrc") 28 33 if err != nil { 29 34 return err ··· 39 44 return err 40 45 } 41 46 47 + errCh := make(chan error) 42 48 go func() { 43 - HandleBusMessages(ctx, pipeline) 49 + err := HandleBusMessages(ctx, pipeline) 44 50 cancel() 51 + errCh <- err 52 + close(errCh) 45 53 }() 46 54 47 55 sink := app.SinkFromElement(appsink) 48 56 sink.SetCallbacks(&app.SinkCallbacks{ 49 57 NewSampleFunc: WriterNewSample(ctx, w), 50 - EOSFunc: func(sink *app.Sink) { 51 - cancel() 52 - }, 53 58 }) 54 59 55 60 pipeline.SetState(gst.StatePlaying) ··· 58 63 59 64 pipeline.BlockSetState(gst.StateNull) 60 65 61 - return nil 66 + return <-errCh 62 67 }
+26 -6
pkg/media/thumbnail_test.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "fmt" 7 + "io" 6 8 "os" 7 9 "testing" 8 10 9 11 "github.com/stretchr/testify/require" 10 12 "go.uber.org/goleak" 13 + "golang.org/x/sync/errgroup" 11 14 "stream.place/streamplace/pkg/gstinit" 12 15 "stream.place/streamplace/pkg/log" 13 16 ) ··· 15 18 func TestThumbnail(t *testing.T) { 16 19 gstinit.InitGST() 17 20 before := getLeakCount(t) 18 - defer checkGStreamerLeaks(t, before+1) 21 + defer checkGStreamerLeaks(t, before) 19 22 ignore := goleak.IgnoreCurrent() 20 23 defer goleak.VerifyNone(t, ignore) 21 24 ··· 23 26 inputFile, err := os.Open(getFixture("sample-segment.mp4")) 24 27 require.NoError(t, err) 25 28 defer inputFile.Close() 29 + bs, err := io.ReadAll(inputFile) 30 + require.NoError(t, err) 26 31 27 - thumbnail := bytes.Buffer{} 28 - ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"function": {"Thumbnail": 9}}) 29 - err = Thumbnail(ctx, inputFile, &thumbnail) 32 + ctx := context.Background() 33 + g, ctx := errgroup.WithContext(ctx) 34 + 35 + for i := 0; i < streamplaceTestCount; i++ { 36 + g.Go(func() error { 37 + thumbnail := bytes.Buffer{} 38 + thumbnailCtx := log.WithDebugValue(context.Background(), map[string]map[string]int{"function": {"Thumbnail": 9}}) 39 + err := Thumbnail(thumbnailCtx, bytes.NewReader(bs), &thumbnail) 40 + if err != nil { 41 + return err 42 + } 43 + if thumbnail.Len() == 0 { 44 + return fmt.Errorf("thumbnail buffer is empty") 45 + } 46 + require.Equal(t, thumbnail.Len(), 1418910) 47 + return nil 48 + }) 49 + } 50 + 51 + err = g.Wait() 30 52 require.NoError(t, err) 31 - require.NotNil(t, thumbnail) 32 - require.Greater(t, thumbnail.Len(), 0, "Thumbnail buffer should not be empty") 33 53 }
+48 -15
pkg/media/webrtc_playback.go
··· 12 12 "github.com/pion/webrtc/v4" 13 13 "github.com/pion/webrtc/v4/pkg/media" 14 14 "stream.place/streamplace/pkg/log" 15 + "stream.place/streamplace/pkg/media/segchanman" 15 16 "stream.place/streamplace/pkg/spmetrics" 16 17 ) 17 18 ··· 46 47 cancel() 47 48 }() 48 49 49 - outputQueue, done, err := ConcatStream(ctx, pipeline, user, rendition, mm) 50 - if err != nil { 51 - return nil, fmt.Errorf("failed to get output queue: %w", err) 52 - } 50 + segCh := make(chan *segchanman.Seg, 1024) 53 51 go func() { 54 - select { 55 - case <-ctx.Done(): 56 - return 57 - case <-done: 58 - cancel() 52 + for { 53 + ch := mm.SubscribeSegment(ctx, user, rendition) 54 + select { 55 + case <-ctx.Done(): 56 + log.Debug(ctx, "exiting segment reader") 57 + mm.UnsubscribeSegment(ctx, user, rendition, ch) 58 + return 59 + case file := <-ch: 60 + log.Debug(ctx, "got segment", "file", file.Filepath) 61 + segCh <- file 62 + } 59 63 } 60 64 }() 65 + 66 + concatBin, err := ConcatBin(ctx, segCh) 67 + if err != nil { 68 + return nil, fmt.Errorf("failed to create concat bin: %w", err) 69 + } 70 + 71 + err = pipeline.Add(concatBin.Element) 72 + if err != nil { 73 + return nil, fmt.Errorf("failed to add concat bin to pipeline: %w", err) 74 + } 75 + 76 + videoPad := concatBin.GetStaticPad("video_0") 77 + if videoPad == nil { 78 + return nil, fmt.Errorf("video pad not found") 79 + } 80 + 81 + audioPad := concatBin.GetStaticPad("audio_0") 82 + if audioPad == nil { 83 + return nil, fmt.Errorf("audio pad not found") 84 + } 85 + 61 86 // queuePadVideo := outputQueue.GetRequestPad("src_%u") 62 87 // if queuePadVideo == nil { 63 88 // return nil, fmt.Errorf("failed to get queue video pad") ··· 71 96 if err != nil { 72 97 return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err) 73 98 } 74 - err = outputQueue.Link(videoParse) 75 - if err != nil { 76 - return nil, fmt.Errorf("failed to link output queue to video parse: %w", err) 99 + videoParsePad := videoParse.GetStaticPad("sink") 100 + if videoParsePad == nil { 101 + return nil, fmt.Errorf("video parse pad not found") 102 + } 103 + linked := videoPad.Link(videoParsePad) 104 + if linked != gst.PadLinkOK { 105 + return nil, fmt.Errorf("failed to link video pad to video parse pad: %v", linked) 77 106 } 78 107 79 108 audioParse, err := pipeline.GetElementByName("audioparse") 80 109 if err != nil { 81 110 return nil, fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 82 111 } 83 - err = outputQueue.Link(audioParse) 84 - if err != nil { 85 - return nil, fmt.Errorf("failed to link output queue to audio parse: %w", err) 112 + audioParsePad := audioParse.GetStaticPad("sink") 113 + if audioParsePad == nil { 114 + return nil, fmt.Errorf("audio parse pad not found") 115 + } 116 + linked = audioPad.Link(audioParsePad) 117 + if linked != gst.PadLinkOK { 118 + return nil, fmt.Errorf("failed to link audio pad to audio parse pad: %v", linked) 86 119 } 87 120 88 121 videoappsinkele, err := pipeline.GetElementByName("videoappsink")
+2 -2
subprojects/gstreamer-full.wrap
··· 1 1 [wrap-git] 2 - url = https://gitlab.freedesktop.org/gstreamer/gstreamer.git 3 - revision = d31ce8e5e1aacf8f5e5beabb5c81ce2e4da5c202 2 + url = https://gitlab.freedesktop.org/iameli/gstreamer.git 3 + revision = 978278bd492d95cb576537353f40bbf4c8e6a9db 4 4 depth = 1
test/fixtures/sample-segment.mp4

This is a binary file and will not be displayed.