Live video on the AT Protocol

media: add rtmp push

+172
+2
Makefile
··· 176 176 -D "gst-plugins-bad:mpegtsdemux=enabled" \ 177 177 -D "gst-plugins-bad:codectimestamper=enabled" \ 178 178 -D "gst-plugins-bad:opus=enabled" \ 179 + -D "gst-plugins-bad:rtmp2=enabled" \ 180 + -D "gst-plugins-good:flv=enabled" \ 179 181 -D "gst-plugins-ugly:x264=enabled" \ 180 182 -D "gst-plugins-ugly:gpl=enabled" \ 181 183 -D "x264:asm=enabled" \
+18
pkg/director/stream_session.go
··· 101 101 102 102 close(ss.started) 103 103 104 + ss.Go(ctx, func() error { 105 + for { 106 + err := ss.mm.RTMPPush(ctx, spseg.Creator, "source", "rtmp://localhost:21935/live/live") 107 + if err != nil { 108 + log.Error(ctx, "failed to push to RTMP server", "error", err) 109 + } 110 + if ctx.Err() != nil { 111 + return nil 112 + } 113 + select { 114 + case <-ctx.Done(): 115 + return nil 116 + case <-time.After(time.Second * 5): 117 + continue 118 + } 119 + } 120 + }) 121 + 104 122 for { 105 123 select { 106 124 case <-ss.segmentChan:
+152
pkg/media/rtmp_push.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "strings" 7 + 8 + "github.com/go-gst/go-gst/gst" 9 + "github.com/google/uuid" 10 + "stream.place/streamplace/pkg/bus" 11 + "stream.place/streamplace/pkg/log" 12 + ) 13 + 14 + // This function remains in scope for the duration of a single users' playback 15 + func (mm *MediaManager) RTMPPush(ctx context.Context, user string, rendition string, url string) error { 16 + uu, err := uuid.NewV7() 17 + if err != nil { 18 + return err 19 + } 20 + ctx, cancel := context.WithCancel(ctx) 21 + defer cancel() 22 + ctx = log.WithLogValues(ctx, "webrtcID", uu.String()) 23 + ctx = log.WithLogValues(ctx, "mediafunc", "RTMPPush") 24 + 25 + pipelineSlice := []string{ 26 + "flvmux name=muxer ! rtmp2sink name=rtmp2sink", 27 + "h264parse name=videoparse ! muxer.video", 28 + "opusparse name=audioparse ! opusdec ! fdkaacenc ! muxer.audio", 29 + } 30 + 31 + pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 32 + if err != nil { 33 + return fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all 34 + } 35 + 36 + rtmp2sink, err := pipeline.GetElementByName("rtmp2sink") 37 + if err != nil { 38 + return fmt.Errorf("failed to get rtmp2sink element from pipeline: %w", err) 39 + } 40 + err = rtmp2sink.SetProperty("location", url) 41 + if err != nil { 42 + return fmt.Errorf("failed to set rtmp2sink location: %w", err) 43 + } 44 + 45 + segBuffer := make(chan *bus.Seg, 1024) 46 + go func() { 47 + segChan := mm.bus.SubscribeSegment(ctx, user, rendition) 48 + defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan) 49 + for { 50 + select { 51 + case <-ctx.Done(): 52 + log.Debug(ctx, "exiting segment reader") 53 + return 54 + case file := <-segChan.C: 55 + log.Debug(ctx, "got segment", "file", file.Filepath) 56 + segBuffer <- file 57 + } 58 + } 59 + }() 60 + 61 + segCh := make(chan *bus.Seg) 62 + go func() { 63 + for { 64 + select { 65 + case <-ctx.Done(): 66 + log.Debug(ctx, "exiting segment reader") 67 + return 68 + case seg := <-segBuffer: 69 + select { 70 + case <-ctx.Done(): 71 + return 72 + case segCh <- seg: 73 + } 74 + } 75 + } 76 + }() 77 + 78 + concatBin, err := ConcatBin(ctx, segCh) 79 + if err != nil { 80 + return fmt.Errorf("failed to create concat bin: %w", err) 81 + } 82 + 83 + err = pipeline.Add(concatBin.Element) 84 + if err != nil { 85 + return fmt.Errorf("failed to add concat bin to pipeline: %w", err) 86 + } 87 + 88 + videoPad := concatBin.GetStaticPad("video_0") 89 + if videoPad == nil { 90 + return fmt.Errorf("video pad not found") 91 + } 92 + 93 + audioPad := concatBin.GetStaticPad("audio_0") 94 + if audioPad == nil { 95 + return fmt.Errorf("audio pad not found") 96 + } 97 + 98 + // queuePadVideo := outputQueue.GetRequestPad("src_%u") 99 + // if queuePadVideo == nil { 100 + // return fmt.Errorf("failed to get queue video pad") 101 + // } 102 + // queuePadAudio := outputQueue.GetRequestPad("src_%u") 103 + // if queuePadAudio == nil { 104 + // return fmt.Errorf("failed to get queue audio pad") 105 + // } 106 + 107 + videoParse, err := pipeline.GetElementByName("videoparse") 108 + if err != nil { 109 + return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 110 + } 111 + videoParsePad := videoParse.GetStaticPad("sink") 112 + if videoParsePad == nil { 113 + return fmt.Errorf("video parse pad not found") 114 + } 115 + linked := videoPad.Link(videoParsePad) 116 + if linked != gst.PadLinkOK { 117 + return fmt.Errorf("failed to link video pad to video parse pad: %v", linked) 118 + } 119 + 120 + audioParse, err := pipeline.GetElementByName("audioparse") 121 + if err != nil { 122 + return fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 123 + } 124 + audioParsePad := audioParse.GetStaticPad("sink") 125 + if audioParsePad == nil { 126 + return fmt.Errorf("audio parse pad not found") 127 + } 128 + linked = audioPad.Link(audioParsePad) 129 + if linked != gst.PadLinkOK { 130 + return fmt.Errorf("failed to link audio pad to audio parse pad: %v", linked) 131 + } 132 + 133 + errCh := make(chan error) 134 + go func() { 135 + err := HandleBusMessages(ctx, pipeline) 136 + errCh <- err 137 + }() 138 + 139 + err = pipeline.SetState(gst.StatePlaying) 140 + if err != nil { 141 + return fmt.Errorf("failed to set pipeline state to playing: %w", err) 142 + } 143 + 144 + defer func() { 145 + err = pipeline.SetState(gst.StateNull) 146 + if err != nil { 147 + log.Error(ctx, "failed to set pipeline state to null", "error", err) 148 + } 149 + }() 150 + 151 + return <-errCh 152 + }