Live video on the AT Protocol

segmenter: add doH264Parse option

+86 -65
+2
hack/compare-hash.sh
··· 56 56 ffprobe -loglevel fatal -show_frames "$TWO" > "2.frames" 57 57 (diff --color=always "1.frames" "2.frames" || true) | head -n 5 58 58 59 + set +e 59 60 echo -e "\033[0m" 60 61 video_frames_one="$(cat 1.frames | grep media_type=video | wc -l | xargs)" 61 62 video_frames_two="$(cat 2.frames | grep media_type=video | wc -l | xargs)" 62 63 if [[ "$video_frames_one" -ne "$video_frames_two" ]]; then 63 64 echo "Video frame count mismatch: $video_frames_one -> $video_frames_two" 64 65 fi 66 + set -e 65 67 66 68 audio_frames_one="$(cat 1.frames | grep media_type=audio | wc -l | xargs)" 67 69 audio_frames_two="$(cat 2.frames | grep media_type=audio | wc -l | xargs)"
-1
pkg/config/config.go
··· 666 666 return 667 667 } 668 668 defer fd.Close() 669 - log.Log(ctx, "writing debug file", "path", outFile) 670 669 _, err = io.Copy(fd, r) 671 670 if err != nil { 672 671 log.Error(ctx, "failed to copy debug file", "error", err)
+3 -3
pkg/media/audio_smear.go
··· 92 92 93 93 pipelineSlice := []string{ 94 94 "appsrc name=mp4src ! qtdemux name=demux", 95 - "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! appsink sync=false name=videoappsink", 95 + "demux.video_0 ! queue ! appsink sync=false name=videoappsink", 96 96 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink", 97 97 } 98 98 ··· 233 233 } 234 234 235 235 if len(seg.Video) == 0 { 236 - return nil, fmt.Errorf("no video segments") 236 + return nil, fmt.Errorf("no video segments when rewriting audio") 237 237 } 238 238 if len(seg.Audio) == 0 { 239 - return nil, fmt.Errorf("no audio segments") 239 + return nil, fmt.Errorf("no audio segments when rewriting audio") 240 240 } 241 241 242 242 return &seg, nil
+1 -1
pkg/media/clip_user.go
··· 39 39 defer fd.Close() 40 40 segmentFiles = append(segmentFiles, fd) 41 41 } 42 - err = CombineSegmentsUnsigned(ctx, segmentFiles, writer) 42 + err = CombineSegmentsUnsigned(ctx, segmentFiles, writer, false) 43 43 if err != nil { 44 44 return fmt.Errorf("unable to clip segments: %w", err) 45 45 }
+4 -4
pkg/media/concat2.go
··· 12 12 13 13 var ErrConcatDone = errors.New("concat done") 14 14 15 - func ConcatBin(ctx context.Context, segCh <-chan *bus.Seg) (*gst.Bin, error) { 15 + func ConcatBin(ctx context.Context, segCh <-chan *bus.Seg, doH264Parse bool) (*gst.Bin, error) { 16 16 ctx = log.WithLogValues(ctx, "func", "ConcatBin") 17 17 bin := gst.NewBin("concat-bin") 18 18 ··· 141 141 142 142 return 143 143 } 144 - err := addConcatDemuxer(ctx, bin, seg, syncPadVideoSink, syncPadAudioSink) 144 + err := addConcatDemuxer(ctx, bin, seg, syncPadVideoSink, syncPadAudioSink, doH264Parse) 145 145 if err != nil { 146 146 log.Error(ctx, "failed to add concat demuxer", "error", err) 147 147 bin.Error(err.Error(), err) ··· 156 156 return bin, nil 157 157 } 158 158 159 - func addConcatDemuxer(ctx context.Context, bin *gst.Bin, seg *bus.Seg, syncPadVideoSink *gst.Pad, syncPadAudioSink *gst.Pad) error { 159 + func addConcatDemuxer(ctx context.Context, bin *gst.Bin, seg *bus.Seg, syncPadVideoSink *gst.Pad, syncPadAudioSink *gst.Pad, doH264Parse bool) error { 160 160 var cancel context.CancelFunc 161 161 ctx, cancel = context.WithCancel(ctx) 162 162 defer cancel() 163 163 ctx = log.WithLogValues(ctx, "func", "ConcatBin") 164 164 165 165 log.Debug(ctx, "adding concat demuxer", "seg", seg.Filepath) 166 - demuxBin, err := ConcatDemuxBin(ctx, seg) 166 + demuxBin, err := ConcatDemuxBin(ctx, seg, doH264Parse) 167 167 if err != nil { 168 168 return fmt.Errorf("failed to create demux bin: %w", err) 169 169 }
+1 -1
pkg/media/concat2_test.go
··· 95 95 close(segCh) 96 96 }() 97 97 98 - concatBin, err := ConcatBin(ctx, segCh) 98 + concatBin, err := ConcatBin(ctx, segCh, true) 99 99 if err != nil { 100 100 return fmt.Errorf("failed to create concat bin: %w", err) 101 101 }
+38 -31
pkg/media/concat_demux.go
··· 18 18 // Function for demuxing a single segment. Needs to be handled very carefully. 19 19 // In particular: users of this MUST cancel the passed context when they're 20 20 // done with the bin. 21 - func ConcatDemuxBin(ctx context.Context, seg *bus.Seg) (*gst.Bin, error) { 21 + func ConcatDemuxBin(ctx context.Context, seg *bus.Seg, doH264Parse bool) (*gst.Bin, error) { 22 22 ctx = log.WithLogValues(ctx, "func", "ConcatDemuxBin") 23 23 bin := gst.NewBin("seg-demux-bin") 24 24 ··· 80 80 // return nil, fmt.Errorf("failed to set max-size-buffers: %w", err) 81 81 // } 82 82 83 - h264parse, err := gst.NewElementWithProperties("h264parse", map[string]interface{}{ 84 - "name": "concat-demux-h264parse", 85 - "config-interval": -1, 86 - "disable-passthrough": true, 87 - }) 88 - if err != nil { 89 - return nil, fmt.Errorf("failed to create h264parse element: %w", err) 90 - } 91 - err = bin.Add(h264parse) 92 - if err != nil { 93 - return nil, fmt.Errorf("failed to add h264parse to bin: %w", err) 94 - } 95 - h264parseSinkPad := h264parse.GetStaticPad("sink") 96 - if h264parseSinkPad == nil { 97 - return nil, fmt.Errorf("failed to get h264parse sink pad") 98 - } 99 - h264parseSrcPad := h264parse.GetStaticPad("src") 100 - if h264parseSrcPad == nil { 101 - return nil, fmt.Errorf("failed to get h264parse source pad") 102 - } 103 - 104 83 opusparse, err := gst.NewElementWithProperties("opusparse", map[string]interface{}{ 105 84 "name": "concat-demux-opusparse", 106 85 "disable-passthrough": true, ··· 141 120 return nil, fmt.Errorf("audio source pad not found") 142 121 } 143 122 144 - linked := mqVideoSrc.Link(h264parseSinkPad) 123 + linked := mqAudioSrc.Link(opusparseSinkPad) 145 124 if linked != gst.PadLinkOK { 146 - return nil, fmt.Errorf("failed to link h264parse sink pad to mq video sink pad") 125 + return nil, fmt.Errorf("failed to link opusparse sink pad to mq audio sink pad") 147 126 } 148 127 149 - linked = mqAudioSrc.Link(opusparseSinkPad) 150 - if linked != gst.PadLinkOK { 151 - return nil, fmt.Errorf("failed to link opusparse sink pad to mq audio sink pad") 152 - } 128 + var videoGhost *gst.GhostPad 129 + if doH264Parse { 130 + h264parse, err := gst.NewElementWithProperties("h264parse", map[string]interface{}{ 131 + "name": "concat-demux-h264parse", 132 + "config-interval": -1, 133 + "disable-passthrough": true, 134 + }) 135 + if err != nil { 136 + return nil, fmt.Errorf("failed to create h264parse element: %w", err) 137 + } 138 + err = bin.Add(h264parse) 139 + if err != nil { 140 + return nil, fmt.Errorf("failed to add h264parse to bin: %w", err) 141 + } 142 + h264parseSinkPad := h264parse.GetStaticPad("sink") 143 + if h264parseSinkPad == nil { 144 + return nil, fmt.Errorf("failed to get h264parse sink pad") 145 + } 146 + h264parseSrcPad := h264parse.GetStaticPad("src") 147 + if h264parseSrcPad == nil { 148 + return nil, fmt.Errorf("failed to get h264parse source pad") 149 + } 150 + linked := mqVideoSrc.Link(h264parseSinkPad) 151 + if linked != gst.PadLinkOK { 152 + return nil, fmt.Errorf("failed to link h264parse sink pad to mq video sink pad") 153 + } 153 154 154 - videoGhost := gst.NewGhostPad("video_0", h264parseSrcPad) 155 - if videoGhost == nil { 156 - return nil, fmt.Errorf("failed to create video ghost pad") 155 + videoGhost = gst.NewGhostPad("video_0", h264parseSrcPad) 156 + if videoGhost == nil { 157 + return nil, fmt.Errorf("failed to create video ghost pad") 158 + } 159 + } else { 160 + videoGhost = gst.NewGhostPad("video_0", mqVideoSrc) 161 + if videoGhost == nil { 162 + return nil, fmt.Errorf("failed to create video ghost pad") 163 + } 157 164 } 158 165 159 166 audioGhost := gst.NewGhostPad("audio_0", opusparseSrcPad)
+1 -1
pkg/media/concat_demux_test.go
··· 64 64 Filepath: filename, 65 65 } 66 66 67 - concatBin, err := ConcatDemuxBin(ctx, testSeg) 67 + concatBin, err := ConcatDemuxBin(ctx, testSeg, true) 68 68 if err != nil { 69 69 return fmt.Errorf("failed to create concat bin: %w", err) 70 70 }
+3 -1
pkg/media/media_data_parser.go
··· 29 29 defer cancel() 30 30 pipelineSlice := []string{ 31 31 "appsrc name=appsrc ! qtdemux name=demux", 32 - "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! tee name=videotee", 32 + "demux.video_0 ! queue ! tee name=videotee", 33 33 "videotee. ! queue ! h2642json ! appsink sync=false name=jsonappsink", 34 34 "videotee. ! queue ! appsink sync=false name=videoappsink", 35 35 "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink", ··· 303 303 dur := buf.Duration().AsDuration() 304 304 if dur != nil && *dur > 0 { 305 305 *foundThisTrack = true 306 + } else { 307 + log.Warn(ctx, "no duration found for track", "track", sink.GetName()) 306 308 } 307 309 return gst.FlowOK 308 310 }
+3 -2
pkg/media/media_data_parser_test.go
··· 13 13 14 14 func TestMediaDataParser(t *testing.T) { 15 15 segmentsWithoutBFrames := []string{ 16 - remote.RemoteFixture("d63d26050db9a60c0944b4c2e2b1d052c4350a2a8a877324c7b0b7e7a0c1ae27/bframe-false-positive.mp4"), 17 - getFixture("sample-segment.mp4"), 16 + // remote.RemoteFixture("d63d26050db9a60c0944b4c2e2b1d052c4350a2a8a877324c7b0b7e7a0c1ae27/bframe-false-positive.mp4"), 17 + // getFixture("sample-segment.mp4"), 18 + "/Users/iameli/testvids/no-video-or-whatever/2025-11-18T01-10-56-292Z-signed-segment.mp4", 18 19 } 19 20 withNoGSTLeaks(t, func() { 20 21 for _, segment := range segmentsWithoutBFrames {
+1 -1
pkg/media/packetize.go
··· 28 28 return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) //nolint:all 29 29 } 30 30 31 - demuxBin, err := ConcatDemuxBin(ctx, seg) 31 + demuxBin, err := ConcatDemuxBin(ctx, seg, true) 32 32 if err != nil { 33 33 return nil, fmt.Errorf("failed to create concat bin: %w", err) 34 34 }
+1 -2
pkg/media/rtcrec_test.go
··· 25 25 name: "IntermittentTracks", 26 26 fatalErrors: false, 27 27 fixture: getFixture("intermittent-tracks.cbor"), 28 - expectedSegments: 1, 28 + expectedSegments: 10, 29 29 }, 30 30 { 31 31 name: "SegmentConvergenceIssues", ··· 55 55 err = cli.Parse(fs, []string{ 56 56 "--data-dir", dir, 57 57 "-wide-open=true", 58 - // "--segment-debug-dir", "/Users/iameli/testvids/stuck-converge", 59 58 }) 60 59 require.NoError(t, err) 61 60 mm, err := MakeMediaManager(context.Background(), cli, nil, nil, nil, nil)
+3 -3
pkg/media/segment_combine.go
··· 17 17 // CombineSegments combines a list of segments into a single segment that maintains all of the manifests 18 18 func CombineSegments(ctx context.Context, inputFds []io.ReadSeeker, ms MediaSigner, output io.ReadWriteSeeker) error { 19 19 rws := aqio.NewReadWriteSeeker([]byte{}) 20 - err := CombineSegmentsUnsigned(ctx, inputFds, rws) 20 + err := CombineSegmentsUnsigned(ctx, inputFds, rws, true) 21 21 if err != nil { 22 22 return err 23 23 } ··· 39 39 return nil 40 40 } 41 41 42 - func CombineSegmentsUnsigned(ctx context.Context, sources []io.ReadSeeker, w io.Writer) error { 42 + func CombineSegmentsUnsigned(ctx context.Context, sources []io.ReadSeeker, w io.Writer, doH264Parse bool) error { 43 43 ctx = log.WithLogValues(ctx, "mediafunc", "CombineSegmentsUnsigned") 44 44 ctx, cancel := context.WithCancel(ctx) 45 45 defer cancel() ··· 72 72 close(segCh) 73 73 }() 74 74 75 - concatBin, err := ConcatBin(ctx, segCh) 75 + concatBin, err := ConcatBin(ctx, segCh, doH264Parse) 76 76 if err != nil { 77 77 return fmt.Errorf("failed to create concat bin: %w", err) 78 78 }
+1 -1
pkg/media/segment_combine_test.go
··· 44 44 inputFds[i] = fd 45 45 } 46 46 buf := aqio.NewReadWriteSeeker([]byte{}) 47 - err := CombineSegmentsUnsigned(ctx, inputFds, buf) 47 + err := CombineSegmentsUnsigned(ctx, inputFds, buf, true) 48 48 require.NoError(t, err) 49 49 slice, err := buf.Bytes() 50 50 require.NoError(t, err)
+2 -2
pkg/media/segment_converge.go
··· 19 19 20 20 // run this segment through the segmenter/splitter until it comes out the 21 21 // same, meaning we can cleanly get it in and out of a concatenated mp4 file 22 - func ConvergeSegment(ctx context.Context, cli *config.CLI, bs []byte, now int64, streamer string) ([]byte, error) { 22 + func ConvergeSegment(ctx context.Context, cli *config.CLI, bs []byte, now int64, streamer string, doH264Parse bool) ([]byte, error) { 23 23 cli.DumpDebugSegment(ctx, fmt.Sprintf("converge-segment-%s.mp4", streamer), bytes.NewReader(bs)) 24 24 25 25 log.Debug(ctx, "parsing segment media data", "size", len(bs)) ··· 60 60 log.Log(ctx, "wrote debug file", "path", outFile) 61 61 } 62 62 buf := bytes.Buffer{} 63 - err := CombineSegmentsUnsigned(ctx, []io.ReadSeeker{bytes.NewReader(currentBs)}, &buf) 63 + err := CombineSegmentsUnsigned(ctx, []io.ReadSeeker{bytes.NewReader(currentBs)}, &buf, doH264Parse) 64 64 if err != nil { 65 65 return nil, fmt.Errorf("failed to attempt segment convergence: %w", err) 66 66 }
+7 -2
pkg/media/segment_converge_test.go
··· 31 31 { 32 32 Name: "Stuck", 33 33 File: remote.RemoteFixture("77e32825eaa9dfb8f6c7bbe3cb21213ffa01c1dc0d041f8e3e9cc4d107c95f16/2025-11-17T01-08-56-070Z-converge-segment-did-key-zQ3shX7nQpEqXEp3XFSPkS7mtUjQ3S1MNvxrEP2HeiwyPqmoz.mp4"), 34 - Success: false, 34 + Success: true, 35 + }, 36 + { 37 + Name: "CouldNotMultiplex", 38 + File: "/Users/iameli/testvids/thursday/2025-11-20T18-02-37-957Z-attempt-000.mp4", 39 + Success: true, 35 40 }, 36 41 // { 37 42 // Name: "CrashedPipeline", ··· 49 54 t.Logf("test case: %s", tc.File) 50 55 bs, err := os.ReadFile(tc.File) 51 56 require.NoError(t, err) 52 - bs, err = ConvergeSegment(ctx, &config.CLI{}, bs, 0, "test-streamer") 57 + bs, err = ConvergeSegment(ctx, &config.CLI{}, bs, 0, "test-streamer", true) 53 58 if tc.Success { 54 59 require.NoError(t, err) 55 60 require.Greater(t, len(bs), 0)
+5 -1
pkg/media/segment_roundtrip_test.go
··· 86 86 require.NoError(t, err) 87 87 88 88 signedSplitSegDir := makeTestSubdir(t, tempDir, "signed-split-segments") 89 - err = SplitSegments(context.Background(), &config.CLI{}, rws, func(fname string) ReadWriteSeekCloser { 89 + cli := &config.CLI{} 90 + fs := cli.NewFlagSet("rtcrec-test") 91 + err = cli.Parse(fs, []string{}) 92 + require.NoError(t, err) 93 + err = SplitSegments(context.Background(), cli, rws, func(fname string) ReadWriteSeekCloser { 90 94 fd, err := os.Create(filepath.Join(signedSplitSegDir, fname)) 91 95 require.NoError(t, err) 92 96 return fd
+3 -1
pkg/media/segment_split.go
··· 1 1 package media 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "encoding/json" 6 7 "errors" ··· 164 165 if err != nil { 165 166 return fmt.Errorf("failed to seek to start: %w", err) 166 167 } 167 - err = SegmentUnsigned(ctx, cli, streamer, input, unsignedCh) 168 + err = SegmentUnsigned(ctx, cli, streamer, input, true, unsignedCh) 168 169 if err != nil { 169 170 return fmt.Errorf("failed to segment file: %w", err) 170 171 } ··· 194 195 _, validationError := ValidateMP4Media(ctx, bs) 195 196 if validationError != nil { 196 197 validationErrors = append(validationErrors, validationError) 198 + cli.DumpDebugSegment(ctx, fmt.Sprintf("%s-invalid.mp4", fname), bytes.NewReader(bs)) 197 199 } 198 200 log.Log(ctx, "validated segment file", "path", fname) 199 201 err = rwsc.Close()
+6 -6
pkg/media/segmenter.go
··· 23 23 var FatalSegmentationErrors = false 24 24 25 25 // element that takes the input stream, muxes to mp4, and signs the result 26 - func SegmentElem(ctx context.Context, cli *config.CLI, streamer string, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) { 26 + func SegmentElem(ctx context.Context, cli *config.CLI, streamer string, doH264Parse bool, cb func(ctx context.Context, buf []byte, now int64) error) (*gst.Element, error) { 27 27 // elem, err := gst.NewElement("splitmuxsink name=splitter async-finalize=true sink-factory=appsink muxer-factory=matroskamux max-size-bytes=1") 28 28 elem, err := gst.NewElementWithProperties("splitmuxsink", map[string]any{ 29 29 "name": "signer", ··· 127 127 <-previousSegCh 128 128 } 129 129 err := func() error { 130 - bs, err := ConvergeSegment(ctx, cli, bs, now, streamer) 130 + bs, err := ConvergeSegment(ctx, cli, bs, now, streamer, doH264Parse) 131 131 if err != nil { 132 132 return fmt.Errorf("error converging segment: %w", err) 133 133 } ··· 157 157 } 158 158 159 159 func (mm *MediaManager) SegmentAndSignElem(ctx context.Context, ms MediaSigner) (*gst.Element, error) { 160 - return SegmentElem(ctx, mm.cli, ms.Streamer(), func(ctx context.Context, bs []byte, now int64) error { 160 + return SegmentElem(ctx, mm.cli, ms.Streamer(), false, func(ctx context.Context, bs []byte, now int64) error { 161 161 if mm.cli.SmearAudio { 162 162 smearedBuf := &bytes.Buffer{} 163 163 err := RewriteAudioTimestamps(ctx, mm.cli, bytes.NewReader(bs), smearedBuf, true) ··· 187 187 return fmt.Errorf("failed to read file: %w", err) 188 188 } 189 189 defer fd.Close() 190 - return SegmentUnsigned(ctx, cli, streamer, fd, ch) 190 + return SegmentUnsigned(ctx, cli, streamer, fd, false, ch) 191 191 } 192 192 193 - func SegmentUnsigned(ctx context.Context, cli *config.CLI, streamer string, input io.Reader, ch chan *SplitSegment) error { 193 + func SegmentUnsigned(ctx context.Context, cli *config.CLI, streamer string, input io.Reader, doH264Parse bool, ch chan *SplitSegment) error { 194 194 ctx, cancel := context.WithCancel(ctx) 195 195 defer cancel() 196 196 pipelineSlice := []string{ ··· 216 216 return err 217 217 } 218 218 219 - segmenter, err := SegmentElem(ctx, cli, streamer, func(ctx context.Context, buf []byte, now int64) error { 219 + segmenter, err := SegmentElem(ctx, cli, streamer, doH264Parse, func(ctx context.Context, buf []byte, now int64) error { 220 220 ch <- &SplitSegment{ 221 221 Filename: fmt.Sprintf("%d.mp4", now), 222 222 Data: buf,
+1 -1
pkg/media/webrtc_playback.go
··· 73 73 } 74 74 }() 75 75 76 - concatBin, err := ConcatBin(ctx, segCh) 76 + concatBin, err := ConcatBin(ctx, segCh, true) 77 77 if err != nil { 78 78 return nil, fmt.Errorf("failed to create concat bin: %w", err) 79 79 }