tangled
alpha
login
or
join now
stream.place
/
streamplace
74
fork
atom
Live video on the AT Protocol
74
fork
atom
overview
issues
1
pulls
pipelines
rtmp: wip wip wip
Eli Mallon
3 months ago
cb7d6262
383d5cb9
+324
-162
5 changed files
expand all
collapse all
unified
split
.vscode
launch.json
pkg
api
rtmp_server.go
cmd
streamplace.go
media
rtmp.go
rtmp_ingest.go
+1
-1
.vscode/launch.json
···
13
"type": "go",
14
"request": "launch",
15
"mode": "exec",
16
-
"program": "${workspaceFolder}/build-darwin-amd64/libstreamplace"
17
}
18
]
19
}
···
13
"type": "go",
14
"request": "launch",
15
"mode": "exec",
16
+
"program": "${workspaceFolder}/build-darwin-arm64/libstreamplace"
17
}
18
]
19
}
+161
pkg/api/rtmp_server.go
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
// Package main contains an example.
2
+
package api
3
+
4
+
import (
5
+
"context"
6
+
"fmt"
7
+
"net"
8
+
"strings"
9
+
"time"
10
+
11
+
"github.com/bluenviron/gortmplib"
12
+
"github.com/bluenviron/gortsplib/v5/pkg/format"
13
+
"golang.org/x/sync/errgroup"
14
+
"stream.place/streamplace/pkg/log"
15
+
"stream.place/streamplace/pkg/media"
16
+
)
17
+
18
+
// This example shows how to:
19
+
// 1. create a RTMP server
20
+
// 2. accept a stream from a reader.
21
+
// 3. broadcast the stream to readers.
22
+
23
+
// var (
24
+
// mutex sync.Mutex
25
+
// publisher *gortmplib.ServerConn
26
+
// tracks []format.Format
27
+
// readers []*gortmplib.Writer
28
+
// )
29
+
30
+
const RTMPPrefix = "/live/"
31
+
32
+
func (a *StreamplaceAPI) HandleRTMPPublisher(ctx context.Context, sc *gortmplib.ServerConn) error {
33
+
sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(10 * time.Second))
34
+
35
+
if !strings.HasPrefix(sc.URL.Path, RTMPPrefix) {
36
+
return fmt.Errorf("RTMP publisher is not allowed to publish to %s (must start with %s)", sc.URL.String(), RTMPPrefix)
37
+
}
38
+
streamKey := strings.TrimPrefix(sc.URL.Path, RTMPPrefix)
39
+
mediaSigner, err := a.MakeMediaSigner(ctx, streamKey)
40
+
if err != nil {
41
+
return fmt.Errorf("failed to make media signer: %w", err)
42
+
}
43
+
44
+
ctx = log.WithLogValues(ctx, "streamer", mediaSigner.Streamer())
45
+
46
+
videoInput := make(chan *media.RTMPH264Data, 1024)
47
+
defer close(videoInput)
48
+
audioInput := make(chan *media.RTMPAACData, 1024)
49
+
defer close(audioInput)
50
+
51
+
r := &gortmplib.Reader{
52
+
Conn: sc,
53
+
}
54
+
err = r.Initialize()
55
+
if err != nil {
56
+
return err
57
+
}
58
+
59
+
for _, track := range r.Tracks() {
60
+
log.Log(ctx, "get track", "track", track)
61
+
62
+
switch track := track.(type) {
63
+
case *format.H264:
64
+
r.OnDataH264(track, func(pts time.Duration, dts time.Duration, au [][]byte) {
65
+
log.Log(ctx, "got H264", "len", len(au), "pts", pts, "dts", dts)
66
+
videoInput <- &media.RTMPH264Data{
67
+
AU: au,
68
+
PTS: pts,
69
+
}
70
+
})
71
+
72
+
case *format.MPEG4Audio:
73
+
r.OnDataMPEG4Audio(track, func(pts time.Duration, au []byte) {
74
+
log.Log(ctx, "got MPEG4Au", "len", len(au), "pts", pts)
75
+
audioInput <- &media.RTMPAACData{
76
+
AU: au,
77
+
PTS: pts,
78
+
}
79
+
})
80
+
81
+
default:
82
+
return fmt.Errorf("unsupported track type: %T", track)
83
+
}
84
+
}
85
+
86
+
g, ctx := errgroup.WithContext(ctx)
87
+
g.Go(func() error {
88
+
for {
89
+
if ctx.Err() != nil {
90
+
return ctx.Err()
91
+
}
92
+
sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(10 * time.Second))
93
+
err = r.Read()
94
+
if err != nil {
95
+
return err
96
+
}
97
+
}
98
+
})
99
+
100
+
g.Go(func() error {
101
+
return a.MediaManager.RTMPIngest(ctx, videoInput, audioInput, mediaSigner)
102
+
})
103
+
104
+
return g.Wait()
105
+
}
106
+
107
+
func (a *StreamplaceAPI) HandleRTMPConnInner(ctx context.Context, conn net.Conn) error {
108
+
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
109
+
110
+
sc := &gortmplib.ServerConn{
111
+
RW: conn,
112
+
}
113
+
err := sc.Initialize()
114
+
if err != nil {
115
+
return err
116
+
}
117
+
118
+
err = sc.Accept()
119
+
if err != nil {
120
+
return err
121
+
}
122
+
123
+
if sc.Publish {
124
+
return a.HandleRTMPPublisher(ctx, sc)
125
+
}
126
+
return fmt.Errorf("RTMP playback is not supported")
127
+
}
128
+
129
+
func (a *StreamplaceAPI) HandleRTMPConn(ctx context.Context, conn net.Conn) {
130
+
defer conn.Close()
131
+
132
+
log.Log(ctx, "connection opened", "remoteAddr", conn.RemoteAddr())
133
+
err := a.HandleRTMPConnInner(ctx, conn)
134
+
log.Log(ctx, "connection closed", "remoteAddr", conn.RemoteAddr(), "error", err)
135
+
}
136
+
137
+
func (a *StreamplaceAPI) StartRTMPServer(ctx context.Context) error {
138
+
ln, err := net.Listen("tcp", ":1935")
139
+
if err != nil {
140
+
return fmt.Errorf("failed to listen: %w", err)
141
+
}
142
+
defer ln.Close()
143
+
144
+
log.Log(ctx, "listening on :1935")
145
+
146
+
// Accept loop in a goroutine so we can select on context.Done
147
+
go func() {
148
+
for {
149
+
conn, err := ln.Accept()
150
+
if err != nil {
151
+
log.Error(ctx, "error accepting RTMP connection", "error", err)
152
+
}
153
+
154
+
go a.HandleRTMPConn(ctx, conn)
155
+
}
156
+
}()
157
+
158
+
<-ctx.Done()
159
+
160
+
return ln.Close()
161
+
}
+1
-1
pkg/cmd/streamplace.go
···
448
})
449
450
group.Go(func() error {
451
-
return media.StartRTMPServer(ctx)
452
})
453
454
group.Go(func() error {
···
448
})
449
450
group.Go(func() error {
451
+
return a.StartRTMPServer(ctx)
452
})
453
454
group.Go(func() error {
-160
pkg/media/rtmp.go
···
1
-
// Package main contains an example.
2
-
package media
3
-
4
-
import (
5
-
"context"
6
-
"fmt"
7
-
"net"
8
-
"time"
9
-
10
-
"github.com/bluenviron/gortmplib"
11
-
"github.com/bluenviron/gortsplib/v5/pkg/format"
12
-
"stream.place/streamplace/pkg/log"
13
-
)
14
-
15
-
// This example shows how to:
16
-
// 1. create a RTMP server
17
-
// 2. accept a stream from a reader.
18
-
// 3. broadcast the stream to readers.
19
-
20
-
// var (
21
-
// mutex sync.Mutex
22
-
// publisher *gortmplib.ServerConn
23
-
// tracks []format.Format
24
-
// readers []*gortmplib.Writer
25
-
// )
26
-
27
-
func handlePublisher(ctx context.Context, sc *gortmplib.ServerConn) error {
28
-
sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(10 * time.Second))
29
-
30
-
r := &gortmplib.Reader{
31
-
Conn: sc,
32
-
}
33
-
err := r.Initialize()
34
-
if err != nil {
35
-
return err
36
-
}
37
-
38
-
log.Log(ctx, "conn %v is publishing:", sc.RW.(net.Conn).RemoteAddr())
39
-
40
-
for _, track := range r.Tracks() {
41
-
log.Log(ctx, "get track", "track", track)
42
-
43
-
switch track := track.(type) {
44
-
case *format.AV1:
45
-
r.OnDataAV1(track, func(pts time.Duration, tu [][]byte) {
46
-
log.Log(ctx, "got AV1", "len", len(tu), "pts", pts)
47
-
})
48
-
49
-
case *format.VP9:
50
-
r.OnDataVP9(track, func(pts time.Duration, frame []byte) {
51
-
52
-
log.Log(ctx, "got VP9", "len", len(frame), "pts", pts)
53
-
})
54
-
55
-
case *format.H265:
56
-
r.OnDataH265(track, func(pts time.Duration, dts time.Duration, au [][]byte) {
57
-
log.Log(ctx, "got H265", "len", len(au), "pts", pts, "dts", dts)
58
-
})
59
-
60
-
case *format.H264:
61
-
r.OnDataH264(track, func(pts time.Duration, dts time.Duration, au [][]byte) {
62
-
log.Log(ctx, "got H264", "len", len(au), "pts", pts, "dts", dts)
63
-
})
64
-
65
-
case *format.Opus:
66
-
r.OnDataOpus(track, func(pts time.Duration, packet []byte) {
67
-
log.Log(ctx, "got Opus", "len", len(packet), "pts", pts)
68
-
})
69
-
70
-
case *format.MPEG4Audio:
71
-
r.OnDataMPEG4Audio(track, func(pts time.Duration, au []byte) {
72
-
log.Log(ctx, "got MPEG4Au", "len", len(au), "pts", pts)
73
-
})
74
-
75
-
case *format.MPEG1Audio:
76
-
r.OnDataMPEG1Audio(track, func(pts time.Duration, frame []byte) {
77
-
log.Log(ctx, "got MPEG1Au", "len", len(frame), "pts", pts)
78
-
})
79
-
80
-
case *format.AC3:
81
-
r.OnDataAC3(track, func(pts time.Duration, frame []byte) {
82
-
log.Log(ctx, "got AC3", "len", len(frame), "pts", pts)
83
-
})
84
-
85
-
case *format.G711:
86
-
r.OnDataG711(track, func(pts time.Duration, samples []byte) {
87
-
log.Log(ctx, "got G711", "len", len(samples), "pts", pts)
88
-
})
89
-
90
-
case *format.LPCM:
91
-
r.OnDataLPCM(track, func(pts time.Duration, samples []byte) {
92
-
log.Log(ctx, "got LPCM", "len", len(samples), "pts", pts)
93
-
})
94
-
}
95
-
}
96
-
97
-
for {
98
-
sc.RW.(net.Conn).SetReadDeadline(time.Now().Add(10 * time.Second))
99
-
err = r.Read()
100
-
if err != nil {
101
-
return err
102
-
}
103
-
}
104
-
}
105
-
106
-
func handleConnInner(ctx context.Context, conn net.Conn) error {
107
-
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
108
-
109
-
sc := &gortmplib.ServerConn{
110
-
RW: conn,
111
-
}
112
-
err := sc.Initialize()
113
-
if err != nil {
114
-
return err
115
-
}
116
-
117
-
err = sc.Accept()
118
-
if err != nil {
119
-
return err
120
-
}
121
-
122
-
if sc.Publish {
123
-
return handlePublisher(ctx, sc)
124
-
}
125
-
return fmt.Errorf("RTMP playback is not supported")
126
-
}
127
-
128
-
func handleConn(ctx context.Context, conn net.Conn) {
129
-
defer conn.Close()
130
-
131
-
log.Log(ctx, "conn %v opened", conn.RemoteAddr())
132
-
err := handleConnInner(ctx, conn)
133
-
log.Log(ctx, "conn %v closed: %v", conn.RemoteAddr(), err)
134
-
}
135
-
136
-
func StartRTMPServer(ctx context.Context) error {
137
-
ln, err := net.Listen("tcp", ":1935")
138
-
if err != nil {
139
-
return fmt.Errorf("failed to listen: %w", err)
140
-
}
141
-
defer ln.Close()
142
-
143
-
log.Log(ctx, "listening on :1935")
144
-
145
-
// Accept loop in a goroutine so we can select on context.Done
146
-
go func() {
147
-
for {
148
-
conn, err := ln.Accept()
149
-
if err != nil {
150
-
log.Error(ctx, "error accepting RTMP connection", "error", err)
151
-
}
152
-
153
-
go handleConn(ctx, conn)
154
-
}
155
-
}()
156
-
157
-
<-ctx.Done()
158
-
159
-
return ln.Close()
160
-
}
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
+161
pkg/media/rtmp_ingest.go
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
package media
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
"strings"
7
+
"time"
8
+
9
+
"github.com/bluenviron/mediacommon/v2/pkg/codecs/h264"
10
+
"github.com/go-gst/go-gst/gst"
11
+
"github.com/go-gst/go-gst/gst/app"
12
+
"stream.place/streamplace/pkg/log"
13
+
)
14
+
15
+
type RTMPH264Data struct {
16
+
AU [][]byte
17
+
PTS time.Duration
18
+
}
19
+
20
+
type RTMPAACData struct {
21
+
AU []byte
22
+
PTS time.Duration
23
+
}
24
+
25
+
// ingest a H264+AAC RTMP stream
26
+
func (mm *MediaManager) RTMPIngest(ctx context.Context, videoInput chan *RTMPH264Data, audioInput chan *RTMPAACData, ms MediaSigner) error {
27
+
ctx, cancel := context.WithCancel(ctx)
28
+
defer cancel()
29
+
pipelineSlice := []string{
30
+
"appsrc name=videosrc ! queue ! h264parse name=parse",
31
+
"appsrc name=audiosrc ! queue ! fdkaacdec ! audioresample ! opusenc name=audioenc",
32
+
}
33
+
pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
34
+
if err != nil {
35
+
return fmt.Errorf("error creating RTMPIngest pipeline: %w", err)
36
+
}
37
+
38
+
videosrcEle, err := pipeline.GetElementByName("videosrc")
39
+
if err != nil {
40
+
return err
41
+
}
42
+
// defer runtime.KeepAlive(srcele)
43
+
videosrc := app.SrcFromElement(videosrcEle)
44
+
videosrc.SetCaps(gst.NewCapsFromString("video/x-h264,stream-format=byte-stream"))
45
+
videosrc.SetCallbacks(&app.SourceCallbacks{
46
+
NeedDataFunc: func(self *app.Source, length uint) {
47
+
if ctx.Err() != nil {
48
+
self.EndStream()
49
+
return
50
+
}
51
+
52
+
packet := <-videoInput
53
+
if packet == nil {
54
+
log.Debug(ctx, "video input closed, ending stream")
55
+
self.EndStream()
56
+
return
57
+
}
58
+
59
+
// allBytes := bytes.Buffer{}
60
+
// for _, au := range packet.AU {
61
+
// allBytes.Write(au)
62
+
// }
63
+
64
+
avcc, err := h264.AnnexB(packet.AU).Marshal()
65
+
if err != nil {
66
+
log.Error(ctx, "failed to marshal AVCC", "error", err)
67
+
self.Error("failed to marshal AVCC", fmt.Errorf("failed to marshal AVCC: %w", err))
68
+
return
69
+
}
70
+
71
+
buf := gst.NewBufferFromBytes(avcc)
72
+
buf.SetPresentationTimestamp(gst.ClockTime(uint64(packet.PTS.Nanoseconds())))
73
+
ret := self.PushBuffer(buf)
74
+
if ret != gst.FlowOK {
75
+
log.Error(ctx, "failed to push video buffer", "error", ret.String())
76
+
self.Error("failed to push video buffer", fmt.Errorf("failed to push video buffer: %s", ret.String()))
77
+
return
78
+
}
79
+
},
80
+
})
81
+
82
+
audiosrcEle, err := pipeline.GetElementByName("videosrc")
83
+
if err != nil {
84
+
return err
85
+
}
86
+
// defer runtime.KeepAlive(srcele)
87
+
audiosrc := app.SrcFromElement(audiosrcEle)
88
+
audiosrc.SetCallbacks(&app.SourceCallbacks{
89
+
NeedDataFunc: func(self *app.Source, length uint) {
90
+
if ctx.Err() != nil {
91
+
self.EndStream()
92
+
return
93
+
}
94
+
packet := <-audioInput
95
+
if packet == nil {
96
+
log.Debug(ctx, "audio input closed, ending stream")
97
+
self.EndStream()
98
+
return
99
+
}
100
+
buf := gst.NewBufferFromBytes(packet.AU)
101
+
buf.SetPresentationTimestamp(gst.ClockTime(uint64(packet.PTS.Nanoseconds())))
102
+
ret := self.PushBuffer(buf)
103
+
if ret != gst.FlowOK {
104
+
log.Error(ctx, "failed to push audio buffer", "error", ret.String())
105
+
self.Error("failed to push audio buffer", fmt.Errorf("failed to push audio buffer: %s", ret.String()))
106
+
return
107
+
}
108
+
},
109
+
})
110
+
111
+
parseEle, err := pipeline.GetElementByName("parse")
112
+
if err != nil {
113
+
return err
114
+
}
115
+
116
+
signer, err := mm.SegmentAndSignElem(ctx, ms)
117
+
if err != nil {
118
+
return err
119
+
}
120
+
121
+
err = pipeline.Add(signer)
122
+
if err != nil {
123
+
return err
124
+
}
125
+
err = parseEle.Link(signer)
126
+
if err != nil {
127
+
return err
128
+
}
129
+
audioenc, err := pipeline.GetElementByName("audioenc")
130
+
if err != nil {
131
+
return err
132
+
}
133
+
err = audioenc.Link(signer)
134
+
if err != nil {
135
+
return err
136
+
}
137
+
138
+
busErr := make(chan error)
139
+
go func() {
140
+
err := HandleBusMessages(ctx, pipeline)
141
+
busErr <- err
142
+
}()
143
+
144
+
go mm.HandleKeyRevocation(ctx, ms, pipeline)
145
+
146
+
err = pipeline.SetState(gst.StatePlaying)
147
+
if err != nil {
148
+
return err
149
+
}
150
+
151
+
defer func() {
152
+
err := pipeline.SetState(gst.StateNull)
153
+
if err != nil {
154
+
log.Error(ctx, "error setting pipeline to null state", "error", err)
155
+
}
156
+
}()
157
+
158
+
err = <-busErr
159
+
160
+
return err
161
+
}