tangled
alpha
login
or
join now
stream.place
/
streamplace
74
fork
atom
Live video on the AT Protocol
74
fork
atom
overview
issues
1
pulls
pipelines
rtmp: cleanup from code review
Eli Mallon
3 months ago
66e926b4
e4f29710
-166
2 changed files
expand all
collapse all
unified
split
pkg
api
rtmp_server.go
media
rtmp_ingest.go
-12
pkg/api/rtmp_server.go
···
22
22
// 2. accept a stream from a reader.
23
23
// 3. broadcast the stream to readers.
24
24
25
25
-
// var (
26
26
-
// mutex sync.Mutex
27
27
-
// publisher *gortmplib.ServerConn
28
28
-
// tracks []format.Format
29
29
-
// readers []*gortmplib.Writer
30
30
-
// )
31
31
-
32
25
var RTMPTimeout = 10 * time.Second
33
26
34
27
const RTMPPrefix = "/live/"
···
64
57
a.rtmpSessionsLock.Unlock()
65
58
close(session.EventChan)
66
59
}()
67
67
-
68
68
-
// videoInput := make(chan *media.RTMPH264Data, 1024)
69
69
-
// defer close(videoInput)
70
70
-
// audioInput := make(chan *media.RTMPAACData, 1024)
71
71
-
// defer close(audioInput)
72
60
73
61
r := &gortmplib.Reader{
74
62
Conn: sc,
-154
pkg/media/rtmp_ingest.go
···
93
93
94
94
return err
95
95
}
96
96
-
97
97
-
// // ingest a H264+AAC RTMP stream
98
98
-
// func (mm *MediaManager) RTMPIngest(ctx context.Context, videoInput chan *RTMPH264Data, audioInput chan *RTMPAACData, ms MediaSigner) error {
99
99
-
// ctx, cancel := context.WithCancel(ctx)
100
100
-
// defer cancel()
101
101
-
// pipelineSlice := []string{
102
102
-
// "appsrc name=videosrc ! queue ! h264parse name=parse",
103
103
-
// "appsrc name=audiosrc ! queue ! fdkaacdec ! audioresample ! opusenc name=audioenc",
104
104
-
// }
105
105
-
// pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
106
106
-
// if err != nil {
107
107
-
// return fmt.Errorf("error creating RTMPIngest pipeline: %w", err)
108
108
-
// }
109
109
-
110
110
-
// videosrcEle, err := pipeline.GetElementByName("videosrc")
111
111
-
// if err != nil {
112
112
-
// return err
113
113
-
// }
114
114
-
// first := true
115
115
-
// // defer runtime.KeepAlive(srcele)
116
116
-
// videosrc := app.SrcFromElement(videosrcEle)
117
117
-
// videosrc.SetCaps(gst.NewCapsFromString("video/x-h264,stream-format=avc3"))
118
118
-
// videosrc.SetCallbacks(&app.SourceCallbacks{
119
119
-
// NeedDataFunc: func(self *app.Source, length uint) {
120
120
-
// if ctx.Err() != nil {
121
121
-
// self.EndStream()
122
122
-
// return
123
123
-
// }
124
124
-
125
125
-
// packet := <-videoInput
126
126
-
// if packet == nil {
127
127
-
// log.Debug(ctx, "video input closed, ending stream")
128
128
-
// self.EndStream()
129
129
-
// return
130
130
-
// }
131
131
-
132
132
-
// // allBytes := bytes.Buffer{}
133
133
-
// // for _, au := range packet.AU {
134
134
-
// // allBytes.Write(au)
135
135
-
// // }
136
136
-
137
137
-
// var avc []byte
138
138
-
// if first {
139
139
-
// c := h264conf.Conf{
140
140
-
// SPS: packet.AU[0],
141
141
-
// PPS: packet.AU[1],
142
142
-
// }
143
143
-
// avc, err = c.Marshal()
144
144
-
// if err != nil {
145
145
-
// log.Error(ctx, "failed to marshal H264 config", "error", err)
146
146
-
// self.Error("failed to marshal H264 config", fmt.Errorf("failed to marshal H264 config: %w", err))
147
147
-
// return
148
148
-
// }
149
149
-
// first = false
150
150
-
// } else {
151
151
-
// avc, err = h264.AVCC(packet.AU).Marshal()
152
152
-
// if err != nil {
153
153
-
// log.Error(ctx, "failed to marshal AnnexB", "error", err)
154
154
-
// self.Error("failed to marshal AnnexB", fmt.Errorf("failed to marshal AnnexB: %w", err))
155
155
-
// return
156
156
-
// }
157
157
-
// }
158
158
-
159
159
-
// buf := gst.NewBufferFromBytes(avc)
160
160
-
// buf.SetPresentationTimestamp(gst.ClockTime(uint64(packet.PTS.Nanoseconds())))
161
161
-
// ret := self.PushBuffer(buf)
162
162
-
// if ret != gst.FlowOK {
163
163
-
// log.Error(ctx, "failed to push video buffer", "error", ret.String())
164
164
-
// self.Error("failed to push video buffer", fmt.Errorf("failed to push video buffer: %s", ret.String()))
165
165
-
// return
166
166
-
// }
167
167
-
// },
168
168
-
// })
169
169
-
170
170
-
// audiosrcEle, err := pipeline.GetElementByName("videosrc")
171
171
-
// if err != nil {
172
172
-
// return err
173
173
-
// }
174
174
-
// // defer runtime.KeepAlive(srcele)
175
175
-
// audiosrc := app.SrcFromElement(audiosrcEle)
176
176
-
// audiosrc.SetCallbacks(&app.SourceCallbacks{
177
177
-
// NeedDataFunc: func(self *app.Source, length uint) {
178
178
-
// if ctx.Err() != nil {
179
179
-
// self.EndStream()
180
180
-
// return
181
181
-
// }
182
182
-
// packet := <-audioInput
183
183
-
// if packet == nil {
184
184
-
// log.Debug(ctx, "audio input closed, ending stream")
185
185
-
// self.EndStream()
186
186
-
// return
187
187
-
// }
188
188
-
// buf := gst.NewBufferFromBytes(packet.AU)
189
189
-
// buf.SetPresentationTimestamp(gst.ClockTime(uint64(packet.PTS.Nanoseconds())))
190
190
-
// ret := self.PushBuffer(buf)
191
191
-
// if ret != gst.FlowOK {
192
192
-
// log.Error(ctx, "failed to push audio buffer", "error", ret.String())
193
193
-
// self.Error("failed to push audio buffer", fmt.Errorf("failed to push audio buffer: %s", ret.String()))
194
194
-
// return
195
195
-
// }
196
196
-
// },
197
197
-
// })
198
198
-
199
199
-
// parseEle, err := pipeline.GetElementByName("parse")
200
200
-
// if err != nil {
201
201
-
// return err
202
202
-
// }
203
203
-
204
204
-
// signer, err := mm.SegmentAndSignElem(ctx, ms)
205
205
-
// if err != nil {
206
206
-
// return err
207
207
-
// }
208
208
-
209
209
-
// err = pipeline.Add(signer)
210
210
-
// if err != nil {
211
211
-
// return err
212
212
-
// }
213
213
-
// err = parseEle.Link(signer)
214
214
-
// if err != nil {
215
215
-
// return err
216
216
-
// }
217
217
-
// audioenc, err := pipeline.GetElementByName("audioenc")
218
218
-
// if err != nil {
219
219
-
// return err
220
220
-
// }
221
221
-
// err = audioenc.Link(signer)
222
222
-
// if err != nil {
223
223
-
// return err
224
224
-
// }
225
225
-
226
226
-
// busErr := make(chan error)
227
227
-
// go func() {
228
228
-
// err := HandleBusMessages(ctx, pipeline)
229
229
-
// busErr <- err
230
230
-
// }()
231
231
-
232
232
-
// go mm.HandleKeyRevocation(ctx, ms, pipeline)
233
233
-
234
234
-
// err = pipeline.SetState(gst.StatePlaying)
235
235
-
// if err != nil {
236
236
-
// return err
237
237
-
// }
238
238
-
239
239
-
// defer func() {
240
240
-
// err := pipeline.SetState(gst.StateNull)
241
241
-
// if err != nil {
242
242
-
// log.Error(ctx, "error setting pipeline to null state", "error", err)
243
243
-
// }
244
244
-
// }()
245
245
-
246
246
-
// err = <-busErr
247
247
-
248
248
-
// return err
249
249
-
// }