tangled
alpha
login
or
join now
stream.place
/
streamplace
75
fork
atom
Live video on the AT Protocol
75
fork
atom
overview
issues
1
pulls
pipelines
determinism: who needs average bitrate anyway
Eli Mallon
3 months ago
d9481d8b
e75fc56e
+98
-58
7 changed files
expand all
collapse all
unified
split
go.mod
go.sum
pkg
cmd
combine.go
split.go
media
segment_converge.go
segment_split.go
segmenter.go
+2
-1
go.mod
···
13
13
require (
14
14
firebase.google.com/go/v4 v4.14.1
15
15
github.com/99designs/gqlgen v0.17.64
16
16
+
github.com/Eyevinn/mp4ff v0.50.0
16
17
github.com/NYTimes/gziphandler v1.1.1
17
18
github.com/ThalesGroup/crypto11 v0.0.0-00010101000000-000000000000
18
19
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
···
65
66
go.opentelemetry.io/otel v1.36.0
66
67
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.35.0
67
68
go.opentelemetry.io/otel/sdk v1.36.0
68
68
-
go.opentelemetry.io/otel/trace v1.36.0
69
69
go.uber.org/goleak v1.3.0
70
70
golang.org/x/image v0.30.0
71
71
golang.org/x/net v0.43.0
···
499
499
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
500
500
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect
501
501
go.opentelemetry.io/otel/metric v1.36.0 // indirect
502
502
+
go.opentelemetry.io/otel/trace v1.36.0 // indirect
502
503
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
503
504
go.uber.org/atomic v1.11.0 // indirect
504
505
go.uber.org/automaxprocs v1.6.0 // indirect
+4
-2
go.sum
···
104
104
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
105
105
github.com/Djarvur/go-err113 v0.0.0-20210108212216-aea10b59be24 h1:sHglBQTwgx+rWPdisA5ynNEsoARbiCBOyGcJM4/OzsM=
106
106
github.com/Djarvur/go-err113 v0.0.0-20210108212216-aea10b59be24/go.mod h1:4UJr5HIiMZrwgkSPdsjy2uOQExX/WEILpIrO9UPGuXs=
107
107
+
github.com/Eyevinn/mp4ff v0.50.0 h1:vFlsvpQh5Jfz++cuaeTI90vbID5dAabebvvN/l9lom0=
108
108
+
github.com/Eyevinn/mp4ff v0.50.0/go.mod h1:hJNUUqOBryLAzUW9wpCJyw2HaI+TCd2rUPhafoS5lgg=
107
109
github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.1 h1:Sz1JIXEcSfhz7fUi7xHnhpIE0thVASYjvosApmHuD2k=
108
110
github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.1/go.mod h1:n/LSCXNuIYqVfBlVXyHfMQkZDdp1/mmxfSjADd3z1Zg=
109
111
github.com/Kagami/go-avif v0.1.0 h1:8GHAGLxCdFfhpd4Zg8j1EqO7rtcQNenxIDerC/uu68w=
···
463
465
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
464
466
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
465
467
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
466
466
-
github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM=
467
467
-
github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
468
468
+
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
469
469
+
github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
468
470
github.com/go-text/typesetting v0.3.0 h1:OWCgYpp8njoxSRpwrdd1bQOxdjOXDj9Rqart9ML4iF4=
469
471
github.com/go-text/typesetting v0.3.0/go.mod h1:qjZLkhRgOEYMhU9eHBr3AR4sfnGJvOXNLt8yRAySFuY=
470
472
github.com/go-text/typesetting-utils v0.0.0-20241103174707-87a29e9e6066 h1:qCuYC+94v2xrb1PoS4NIDe7DGYtLnU2wWiQe9a1B1c0=
+3
-3
pkg/cmd/combine.go
···
62
62
if err != nil {
63
63
return err
64
64
}
65
65
-
err = CheckCombined(ctx, outFd, *debugDir)
65
65
+
err = CheckCombined(ctx, cli, outFd, *debugDir)
66
66
if err != nil {
67
67
return err
68
68
}
69
69
return nil
70
70
}
71
71
72
72
-
func CheckCombined(ctx context.Context, inFD io.ReadWriteSeeker, debugDir string) error {
72
72
+
func CheckCombined(ctx context.Context, cli *config.CLI, inFD io.ReadWriteSeeker, debugDir string) error {
73
73
_, err := inFD.Seek(0, io.SeekStart)
74
74
if err != nil {
75
75
return err
76
76
}
77
77
-
err = media.SplitSegments(ctx, inFD, func(fname string) media.ReadWriteSeekCloser {
77
77
+
err = media.SplitSegments(ctx, cli, inFD, func(fname string) media.ReadWriteSeekCloser {
78
78
if debugDir == "" {
79
79
return aqio.NewReadWriteSeeker([]byte{})
80
80
}
+2
-1
pkg/cmd/split.go
···
7
7
"os"
8
8
"path/filepath"
9
9
10
10
+
"stream.place/streamplace/pkg/config"
10
11
"stream.place/streamplace/pkg/log"
11
12
"stream.place/streamplace/pkg/media"
12
13
)
···
24
25
25
26
names := []string{}
26
27
27
27
-
err = media.SplitSegments(ctx, inFD, func(fname string) media.ReadWriteSeekCloser {
28
28
+
err = media.SplitSegments(ctx, &config.CLI{}, inFD, func(fname string) media.ReadWriteSeekCloser {
28
29
fullPath := filepath.Join(outDir, fname)
29
30
names = append(names, fullPath)
30
31
log.Log(ctx, "creating segment file", "path", fullPath)
+68
pkg/media/segment_converge.go
···
1
1
+
package media
2
2
+
3
3
+
import (
4
4
+
"bytes"
5
5
+
"context"
6
6
+
"fmt"
7
7
+
"io"
8
8
+
"os"
9
9
+
"path/filepath"
10
10
+
"slices"
11
11
+
12
12
+
"github.com/Eyevinn/mp4ff/mp4"
13
13
+
"stream.place/streamplace/pkg/aqtime"
14
14
+
"stream.place/streamplace/pkg/config"
15
15
+
"stream.place/streamplace/pkg/log"
16
16
+
)
17
17
+
18
18
+
func ConvergeSegment(ctx context.Context, cli *config.CLI, bs []byte, now int64, streamer string) ([]byte, error) {
19
19
+
previousBs := []byte{}
20
20
+
currentBs := bs
21
21
+
i := 0
22
22
+
for i = 0; i <= MaxSegmentTries; i++ {
23
23
+
if slices.Compare(previousBs, currentBs) == 0 {
24
24
+
break
25
25
+
}
26
26
+
if cli.SegmentDebugDir != "" {
27
27
+
mydir := filepath.Join(cli.SegmentDebugDir, streamer)
28
28
+
err := os.MkdirAll(mydir, 0755)
29
29
+
if err != nil {
30
30
+
return nil, fmt.Errorf("failed to create debug directory: %w", err)
31
31
+
}
32
32
+
aqt := aqtime.FromMillis(now)
33
33
+
outFile := filepath.Join(cli.SegmentDebugDir, fmt.Sprintf("%s-attempt-%03d.mp4", aqt.FileSafeString(), i))
34
34
+
err = os.WriteFile(outFile, currentBs, 0644)
35
35
+
if err != nil {
36
36
+
return nil, fmt.Errorf("failed to write debug file: %w", err)
37
37
+
}
38
38
+
log.Log(ctx, "wrote debug file", "path", outFile)
39
39
+
}
40
40
+
buf := bytes.Buffer{}
41
41
+
err := CombineSegmentsUnsigned(ctx, []io.ReadSeeker{bytes.NewReader(currentBs)}, &buf)
42
42
+
if err != nil {
43
43
+
return nil, fmt.Errorf("failed to attempt segment convergence: %w", err)
44
44
+
}
45
45
+
previousBs = currentBs
46
46
+
currentBs = buf.Bytes()
47
47
+
mp4file, err := mp4.DecodeFile(bytes.NewReader(currentBs))
48
48
+
if err != nil {
49
49
+
return nil, fmt.Errorf("failed to decode segment: %w", err)
50
50
+
}
51
51
+
btrt := mp4file.Moov.Trak.Mdia.Minf.Stbl.Stsd.AvcX.Btrt
52
52
+
btrt.AvgBitrate = 0
53
53
+
btrt.MaxBitrate = 0
54
54
+
// log.Log(ctx, "btrt", "average bitrate", btrt.AvgBitrate, "max bitrate", btrt.MaxBitrate)
55
55
+
encodedBuf := bytes.Buffer{}
56
56
+
err = mp4file.Encode(&encodedBuf)
57
57
+
if err != nil {
58
58
+
return nil, fmt.Errorf("failed to encode segment: %w", err)
59
59
+
}
60
60
+
currentBs = encodedBuf.Bytes()
61
61
+
}
62
62
+
if slices.Compare(previousBs, currentBs) != 0 {
63
63
+
return nil, fmt.Errorf("failed to converge segment after %d tries", MaxSegmentTries)
64
64
+
}
65
65
+
bs = currentBs
66
66
+
log.Log(ctx, "converged segments", "tries", i, "size", len(bs))
67
67
+
return currentBs, nil
68
68
+
}
+4
-2
pkg/media/segment_split.go
···
11
11
"golang.org/x/sync/errgroup"
12
12
"stream.place/streamplace/pkg/aqio"
13
13
c2patypes "stream.place/streamplace/pkg/c2patypes"
14
14
+
"stream.place/streamplace/pkg/config"
14
15
"stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
15
16
"stream.place/streamplace/pkg/log"
16
17
)
···
95
96
}
96
97
97
98
// split a signed concatenated mp4 into its constituent signed segments
98
98
-
func SplitSegments(ctx context.Context, input io.ReadSeeker, cb func(fname string) ReadWriteSeekCloser) error {
99
99
+
func SplitSegments(ctx context.Context, cli *config.CLI, input io.ReadSeeker, cb func(fname string) ReadWriteSeekCloser) error {
99
100
manifestsStr, err := iroh_streamplace.GetManifests(c2patypes.NewReader(input))
100
101
if err != nil {
101
102
return fmt.Errorf("failed to get manifests: %w", err)
···
142
143
}
143
144
g, ctx := errgroup.WithContext(ctx)
144
145
unsignedCh := make(chan *SplitSegment)
146
146
+
streamer := manifestList[0].SegmentMetadata.Creator
145
147
146
148
// note: we're passing the input to two places here and need to make sure
147
149
// they're not running into problems with concurrent seeking. so we use
···
162
164
if err != nil {
163
165
return fmt.Errorf("failed to seek to start: %w", err)
164
166
}
165
165
-
err = SegmentUnsigned(ctx, input, unsignedCh)
167
167
+
err = SegmentUnsigned(ctx, cli, streamer, input, unsignedCh)
166
168
if err != nil {
167
169
return fmt.Errorf("failed to segment file: %w", err)
168
170
}
+15
-49
pkg/media/segmenter.go
···
6
6
"fmt"
7
7
"io"
8
8
"os"
9
9
-
"path/filepath"
10
10
-
"slices"
11
9
"strings"
12
10
"time"
13
11
14
12
"github.com/go-gst/go-gst/gst"
15
13
"github.com/go-gst/go-gst/gst/app"
16
16
-
"stream.place/streamplace/pkg/aqtime"
14
14
+
"stream.place/streamplace/pkg/config"
17
15
"stream.place/streamplace/pkg/log"
18
16
)
19
17
20
18
// element that takes the input stream, muxes to mp4, and signs the result
21
21
-
func SegmentElem(ctx context.Context, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) {
19
19
+
func SegmentElem(ctx context.Context, cli *config.CLI, streamer string, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) {
22
20
// elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1")
23
21
elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{
24
22
"name": "signer",
···
121
119
if previousSegCh != nil {
122
120
<-previousSegCh
123
121
}
124
124
-
err := cb(ctx, bs, now)
122
122
+
bs, err := ConvergeSegment(ctx, cli, bs, now, streamer)
123
123
+
if err != nil {
124
124
+
log.Error(ctx, "error converging segment", "error", err)
125
125
+
elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "Error converging segment", err.Error())
126
126
+
return
127
127
+
}
128
128
+
err = cb(ctx, bs, now)
125
129
if err != nil {
126
130
log.Error(ctx, "error signing segment", "error", err)
127
131
elem.ErrorMessage(gst.DomainCore, gst.CoreErrorFailed, "Error signing segment", err.Error())
···
142
146
var MaxSegmentTries = 10
143
147
144
148
func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) {
145
145
-
return SegmentElem(ctx, func(ctx context.Context, bs []byte, now int64) error {
146
146
-
signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now)
147
147
-
if err != nil {
148
148
-
return fmt.Errorf("error calling SignMP4: %w", err)
149
149
-
}
150
150
-
previousBs := []byte{}
151
151
-
currentBs := signedBs
152
152
-
i := 0
153
153
-
for i = 0; i <= MaxSegmentTries; i++ {
154
154
-
if slices.Compare(previousBs, currentBs) == 0 {
155
155
-
break
156
156
-
}
157
157
-
if mm.cli.SegmentDebugDir != "" {
158
158
-
mydir := filepath.Join(mm.cli.SegmentDebugDir, ms.Streamer())
159
159
-
err := os.MkdirAll(mydir, 0755)
160
160
-
if err != nil {
161
161
-
return fmt.Errorf("failed to create debug directory: %w", err)
162
162
-
}
163
163
-
aqt := aqtime.FromMillis(now)
164
164
-
outFile := filepath.Join(mm.cli.SegmentDebugDir, fmt.Sprintf("%s-attempt-%03d.mp4", aqt.FileSafeString(), i))
165
165
-
err = os.WriteFile(outFile, currentBs, 0644)
166
166
-
if err != nil {
167
167
-
return fmt.Errorf("failed to write debug file: %w", err)
168
168
-
}
169
169
-
log.Log(ctx, "wrote debug file", "path", outFile)
170
170
-
}
171
171
-
buf := bytes.Buffer{}
172
172
-
err := CombineSegmentsUnsigned(ctx, []io.ReadSeeker{bytes.NewReader(currentBs)}, &buf)
173
173
-
if err != nil {
174
174
-
return fmt.Errorf("failed to attempt segment convergence: %w", err)
175
175
-
}
176
176
-
previousBs = currentBs
177
177
-
currentBs = buf.Bytes()
178
178
-
}
179
179
-
if slices.Compare(previousBs, currentBs) != 0 {
180
180
-
return fmt.Errorf("failed to converge segment after %d tries", MaxSegmentTries)
181
181
-
}
182
182
-
bs = currentBs
183
183
-
log.Log(ctx, "converged segments", "tries", i, "size", len(bs))
149
149
+
return SegmentElem(ctx, mm.cli, ms.Streamer(), func(ctx context.Context, bs []byte, now int64) error {
184
150
if mm.cli.SmearAudio {
185
151
smearedBuf := &bytes.Buffer{}
186
152
err := SmearAudioTimestamps(ctx, bytes.NewReader(bs), smearedBuf)
···
189
155
}
190
156
bs = smearedBuf.Bytes()
191
157
}
192
192
-
signedBs, err = ms.SignMP4(ctx, bytes.NewReader(bs), now)
158
158
+
signedBs, err := ms.SignMP4(ctx, bytes.NewReader(bs), now)
193
159
if err != nil {
194
160
return fmt.Errorf("error calling SignMP4: %w", err)
195
161
}
···
202
168
})
203
169
}
204
170
205
205
-
func SegmentFileUnsigned(ctx context.Context, input string, ch chan *SplitSegment) error {
171
171
+
func SegmentFileUnsigned(ctx context.Context, cli *config.CLI, streamer string, input string, ch chan *SplitSegment) error {
206
172
fd, err := os.OpenFile(input, os.O_RDONLY, 0644)
207
173
log.Log(ctx, "reading file", "file", input)
208
174
if err != nil {
209
175
return fmt.Errorf("failed to read file: %w", err)
210
176
}
211
177
defer fd.Close()
212
212
-
return SegmentUnsigned(ctx, fd, ch)
178
178
+
return SegmentUnsigned(ctx, cli, streamer, fd, ch)
213
179
}
214
180
215
215
-
func SegmentUnsigned(ctx context.Context, input io.Reader, ch chan *SplitSegment) error {
181
181
+
func SegmentUnsigned(ctx context.Context, cli *config.CLI, streamer string, input io.Reader, ch chan *SplitSegment) error {
216
182
ctx, cancel := context.WithCancel(ctx)
217
183
defer cancel()
218
184
pipelineSlice := []string{
···
238
204
return err
239
205
}
240
206
241
241
-
segmenter, err := SegmentElem(ctx, func(ctx context.Context, buf []byte, now int64) error {
207
207
+
segmenter, err := SegmentElem(ctx, cli, streamer, func(ctx context.Context, buf []byte, now int64) error {
242
208
ch <- &SplitSegment{
243
209
Filename: fmt.Sprintf("%d.mp4", now),
244
210
Data: buf,