Live video on the AT Protocol

implement basic stream problem detection

+293 -14
hack/upload-fixture.sh
+19 -1
js/app/src/screens/live-dashboard.tsx
··· 1 - import { LivestreamProvider } from "@streamplace/components"; 2 import { Camera, FerrisWheel, X } from "@tamagui/lucide-icons"; 3 import { Redirect } from "components/aqlink"; 4 import CreateLivestream from "components/create-livestream"; ··· 134 {page === "update" && isLive ? <UpdateLivestream /> : null} 135 {page === "create" ? <CreateLivestream /> : null} 136 </View> 137 {madeChoiceAboutDebugRecording ? null : <DebugRecordingPopup />} 138 </View> 139 </VideoElementProvider> 140 </LivestreamProvider> 141 ); 142 } 143 144 const elems = [ 145 {
··· 1 + import { 2 + LivestreamProvider, 3 + useLivestreamStore, 4 + } from "@streamplace/components"; 5 import { Camera, FerrisWheel, X } from "@tamagui/lucide-icons"; 6 import { Redirect } from "components/aqlink"; 7 import CreateLivestream from "components/create-livestream"; ··· 137 {page === "update" && isLive ? <UpdateLivestream /> : null} 138 {page === "create" ? <CreateLivestream /> : null} 139 </View> 140 + <View> 141 + <Problems /> 142 + </View> 143 {madeChoiceAboutDebugRecording ? null : <DebugRecordingPopup />} 144 </View> 145 </VideoElementProvider> 146 </LivestreamProvider> 147 ); 148 } 149 + 150 + const Problems = () => { 151 + const problems = useLivestreamStore((x) => x.problems); 152 + if (problems.length === 0) { 153 + return null; 154 + } 155 + return ( 156 + <View> 157 + <Text>{JSON.stringify(problems, null, 2)}</Text> 158 + </View> 159 + ); 160 + }; 161 162 const elems = [ 163 {
+9
js/components/src/livestream-store/livestream-state.tsx
··· 15 viewers: number | null; 16 pendingHides: string[]; 17 segment: PlaceStreamSegment.Record | null; 18 renditions: PlaceStreamDefs.Rendition[]; 19 replyToMessage: ChatMessageViewHydrated | null; 20 streamKey: string | null; 21 setStreamKey: (key: string | null) => void; 22 }
··· 15 viewers: number | null; 16 pendingHides: string[]; 17 segment: PlaceStreamSegment.Record | null; 18 + recentSegments: PlaceStreamSegment.Record[]; 19 + problems: LivestreamProblem[]; 20 renditions: PlaceStreamDefs.Rendition[]; 21 replyToMessage: ChatMessageViewHydrated | null; 22 streamKey: string | null; 23 setStreamKey: (key: string | null) => void; 24 } 25 + 26 + export interface LivestreamProblem { 27 + code: string; 28 + message: string; 29 + severity: "error" | "warning" | "info"; 30 + link?: string; 31 + }
+2
js/components/src/livestream-store/livestream-store.tsx
··· 20 streamKey: null, 21 setStreamKey: (sk) => set({ streamKey: sk }), 22 authors: {}, 23 })); 24 }; 25
··· 20 streamKey: null, 21 setStreamKey: (sk) => set({ streamKey: sk }), 22 authors: {}, 23 + recentSegments: [], 24 + problems: [], 25 })); 26 }; 27
+96
js/components/src/livestream-store/problems.tsx
···
··· 1 + import { PlaceStreamSegment } from "streamplace"; 2 + import { LivestreamProblem } from "./livestream-state"; 3 + 4 + const VARIANCE_THRESHOLD = 0.5; 5 + const DURATION_THRESHOLD = 5000000000; // 5s in ns 6 + 7 + const detectVariableSegmentLength = ( 8 + segments: PlaceStreamSegment.Record[], 9 + ): { variable: boolean; duration: boolean } => { 10 + if (segments.length < 3) { 11 + // Need at least 3 segments to detect variability 12 + return { variable: false, duration: false }; 13 + } 14 + 15 + const durations = segments 16 + .map((segment) => segment.duration) 17 + .filter( 18 + (duration): duration is number => duration !== undefined && duration > 0, 19 + ); 20 + 21 + if (durations.length < 3) { 22 + return { variable: false, duration: false }; 23 + } 24 + 25 + // Calculate mean 26 + const mean = 27 + durations.reduce((sum: number, duration: number) => sum + duration, 0) / 28 + durations.length; 29 + 30 + // Calculate standard deviation 31 + const variance = 32 + durations.reduce((sum: number, duration: number) => { 33 + const diff = duration - mean; 34 + return sum + diff * diff; 35 + }, 0) / durations.length; 36 + const stdDev = Math.sqrt(variance); 37 + 38 + // Calculate coefficient of variation (CV) 39 + const cv = stdDev / mean; 40 + 41 + // CV > 0.5 indicates high variability 42 + // This threshold can be adjusted based on testing 43 + return { 44 + variable: cv > VARIANCE_THRESHOLD, 45 + duration: mean > DURATION_THRESHOLD, 46 + }; 47 + }; 48 + 49 + export const findProblems = ( 50 + segments: PlaceStreamSegment.Record[], 51 + ): LivestreamProblem[] => { 52 + const problems: LivestreamProblem[] = []; 53 + let hasBFrames = false; 54 + for (const segment of segments) { 55 + const video = segment.video?.[0]; 56 + if (!video) { 57 + // i mean yes this is a problem but it can't happen yet 58 + continue; 59 + } 60 + if (video.bframes === true) { 61 + hasBFrames = true; 62 + break; 63 + } 64 + } 65 + if (hasBFrames) { 66 + problems.push({ 67 + code: 
"bframes", 68 + message: 69 + "Your stream contains B-Frames, which are not supported in Streamplace. Your stream will stutter.", 70 + severity: "error", 71 + link: "https://stream.place/docs/guides/start-streaming/obs/#obs-configuration", 72 + }); 73 + } 74 + 75 + const { variable, duration } = detectVariableSegmentLength(segments); 76 + if (variable) { 77 + problems.push({ 78 + code: "variable_segment_length", 79 + message: 80 + "Your stream contains variable segment lengths, which may cause playback issues.", 81 + severity: "warning", 82 + link: "https://stream.place/docs/guides/start-streaming/obs/#obs-configuration", 83 + }); 84 + } 85 + if (duration) { 86 + problems.push({ 87 + code: "long_segments", 88 + message: 89 + "Your stream contains long segments (>5s). This will work fine, but increases the delay of the livestream.", 90 + severity: "warning", 91 + link: "https://stream.place/docs/guides/start-streaming/obs/#obs-configuration", 92 + }); 93 + } 94 + 95 + return problems; 96 + };
+10
js/components/src/livestream-store/websocket-consumer.tsx
··· 11 } from "streamplace"; 12 import { reduceChat } from "./chat"; 13 import { LivestreamState } from "./livestream-state"; 14 15 export const handleWebSocketMessages = ( 16 state: LivestreamState, ··· 40 }; 41 state = reduceChat(state, [hydrated], [], []); 42 } else if (PlaceStreamSegment.isRecord(message)) { 43 state = { 44 ...state, 45 segment: message as PlaceStreamSegment.Record, 46 }; 47 } else if (PlaceStreamDefs.isBlockView(message)) { 48 const block = message as PlaceStreamDefs.BlockView;
··· 11 } from "streamplace"; 12 import { reduceChat } from "./chat"; 13 import { LivestreamState } from "./livestream-state"; 14 + import { findProblems } from "./problems"; 15 + 16 + const MAX_RECENT_SEGMENTS = 10; 17 18 export const handleWebSocketMessages = ( 19 state: LivestreamState, ··· 43 }; 44 state = reduceChat(state, [hydrated], [], []); 45 } else if (PlaceStreamSegment.isRecord(message)) { 46 + const newRecentSegments = [...state.recentSegments]; 47 + newRecentSegments.unshift(message); 48 + if (newRecentSegments.length > MAX_RECENT_SEGMENTS) { 49 + newRecentSegments.pop(); 50 + } 51 state = { 52 ...state, 53 segment: message as PlaceStreamSegment.Record, 54 + recentSegments: newRecentSegments, 55 + problems: findProblems(newRecentSegments), 56 }; 57 } else if (PlaceStreamDefs.isBlockView(message)) { 58 const block = message as PlaceStreamDefs.BlockView;
+1 -1
js/docs/src/content/docs/guides/start-streaming/obs.md
··· 26 6. Click "Generate Stream Key" 27 - The stream key will automatically be copied to your clipboard 28 29 - ### 2. Configure OBS Studio 30 31 #### 2a. Initial OBS Configuration 32
··· 26 6. Click "Generate Stream Key" 27 - The stream key will automatically be copied to your clipboard 28 29 + ### 2. Configure OBS Studio <a id="obs-configuration"></a> 30 31 #### 2a. Initial OBS Configuration 32
+4
js/docs/src/content/docs/lex-reference/place-stream-segment.md
··· 61 | `width` | `integer` | ✅ | | | 62 | `height` | `integer` | ✅ | | | 63 | `framerate` | [`#framerate`](#framerate) | ❌ | | | 64 65 --- 66 ··· 180 "framerate": { 181 "type": "ref", 182 "ref": "#framerate" 183 } 184 } 185 },
··· 61 | `width` | `integer` | ✅ | | | 62 | `height` | `integer` | ✅ | | | 63 | `framerate` | [`#framerate`](#framerate) | ❌ | | | 64 + | `bframes` | `boolean` | ❌ | | | 65 66 --- 67 ··· 181 "framerate": { 182 "type": "ref", 183 "ref": "#framerate" 184 + }, 185 + "bframes": { 186 + "type": "boolean" 187 } 188 } 189 },
+2 -1
lexicons/place/stream/segment.json
··· 67 "framerate": { 68 "type": "ref", 69 "ref": "#framerate" 70 - } 71 } 72 }, 73 "framerate": {
··· 67 "framerate": { 68 "type": "ref", 69 "ref": "#framerate" 70 + }, 71 + "bframes": { "type": "boolean" } 72 } 73 }, 74 "framerate": {
+5 -5
pkg/media/audio_smear.go
··· 137 NeedDataFunc: ReaderNeedData(ctx, input), 138 }) 139 140 audioSinkElem, err := pipeline.GetElementByName("audioappsink") 141 if err != nil { 142 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) ··· 144 audioSink := app.SinkFromElement(audioSinkElem) 145 if audioSink == nil { 146 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 147 - } 148 - 149 - seg := SegmentData{ 150 - Audio: []SegmentBuffer{}, 151 - Video: []SegmentBuffer{}, 152 } 153 154 audioSink.SetCallbacks(&app.SinkCallbacks{
··· 137 NeedDataFunc: ReaderNeedData(ctx, input), 138 }) 139 140 + seg := SegmentData{ 141 + Audio: []SegmentBuffer{}, 142 + Video: []SegmentBuffer{}, 143 + } 144 + 145 audioSinkElem, err := pipeline.GetElementByName("audioappsink") 146 if err != nil { 147 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) ··· 149 audioSink := app.SinkFromElement(audioSinkElem) 150 if audioSink == nil { 151 return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 152 } 153 154 audioSink.SetCallbacks(&app.SinkCallbacks{
+56 -1
pkg/media/media_data_parser.go
··· 21 ctx, cancel := context.WithCancel(ctx) 22 defer cancel() 23 pipelineSlice := []string{ 24 - "appsrc name=appsrc ! qtdemux name=demux ! fakesink sync=false", 25 } 26 27 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) ··· 118 return nil, fmt.Errorf("error connecting pad-add: %w", err) 119 } 120 121 go func() { 122 if err := HandleBusMessages(ctx, pipeline); err != nil { 123 log.Log(ctx, "pipeline error", "error", err) ··· 144 if audioMetadata == nil { 145 return nil, fmt.Errorf("no audio metadata") 146 } 147 148 meta := &model.SegmentMediaData{ 149 Video: []*model.SegmentMediadataVideo{videoMetadata},
··· 21 ctx, cancel := context.WithCancel(ctx) 22 defer cancel() 23 pipelineSlice := []string{ 24 + "appsrc name=appsrc ! qtdemux name=demux", 25 + "demux.video_0 ! queue ! h264parse name=videoparse disable-passthrough=true config-interval=-1 ! h264timestamper ! appsink sync=false name=videoappsink", 26 + "demux.audio_0 ! queue ! opusparse name=audioparse ! appsink sync=false name=audioappsink", 27 } 28 29 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) ··· 120 return nil, fmt.Errorf("error connecting pad-add: %w", err) 121 } 122 123 + audioSinkElem, err := pipeline.GetElementByName("audioappsink") 124 + if err != nil { 125 + return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 126 + } 127 + audioSink := app.SinkFromElement(audioSinkElem) 128 + if audioSink == nil { 129 + return nil, fmt.Errorf("failed to get audioappsink element: %w", err) 130 + } 131 + 132 + audioSink.SetCallbacks(&app.SinkCallbacks{ 133 + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 134 + sample := sink.PullSample() 135 + if sample == nil { 136 + return gst.FlowOK 137 + } 138 + 139 + return gst.FlowOK 140 + }, 141 + }) 142 + 143 + videoSinkElem, err := pipeline.GetElementByName("videoappsink") 144 + if err != nil { 145 + return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 146 + } 147 + videoSink := app.SinkFromElement(videoSinkElem) 148 + if videoSink == nil { 149 + return nil, fmt.Errorf("failed to get videoappsink element: %w", err) 150 + } 151 + 152 + hasBFrames := false 153 + videoSink.SetCallbacks(&app.SinkCallbacks{ 154 + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 155 + sample := sink.PullSample() 156 + if sample == nil { 157 + return gst.FlowOK 158 + } 159 + 160 + buf := sample.GetBuffer() 161 + pts := buf.PresentationTimestamp().String() 162 + dts := buf.DecodingTimestamp().String() 163 + 164 + if pts != dts { 165 + hasBFrames = true 166 + } else { 167 + log.Log(ctx, "no bframes", "pts", pts, "dts", dts) 
168 + } 169 + 170 + return gst.FlowOK 171 + }, 172 + }) 173 + 174 go func() { 175 if err := HandleBusMessages(ctx, pipeline); err != nil { 176 log.Log(ctx, "pipeline error", "error", err) ··· 197 if audioMetadata == nil { 198 return nil, fmt.Errorf("no audio metadata") 199 } 200 + 201 + videoMetadata.BFrames = hasBFrames 202 203 meta := &model.SegmentMediaData{ 204 Video: []*model.SegmentMediadataVideo{videoMetadata},
+19
pkg/media/media_data_parser_test.go
··· 8 9 "github.com/stretchr/testify/require" 10 "stream.place/streamplace/pkg/log" 11 ) 12 13 func TestMediaDataParser(t *testing.T) { ··· 23 mediaData, err := ParseSegmentMediaData(ctx, bs) 24 require.NoError(t, err) 25 require.NotNil(t, mediaData) 26 require.Greater(t, mediaData.Duration, int64(0), "Video duration should not be empty") 27 }) 28 }
··· 8 9 "github.com/stretchr/testify/require" 10 "stream.place/streamplace/pkg/log" 11 + "stream.place/streamplace/test/remote" 12 ) 13 14 func TestMediaDataParser(t *testing.T) { ··· 24 mediaData, err := ParseSegmentMediaData(ctx, bs) 25 require.NoError(t, err) 26 require.NotNil(t, mediaData) 27 + require.False(t, mediaData.Video[0].BFrames, "Video should not have BFrames") 28 + require.Greater(t, mediaData.Duration, int64(0), "Video duration should not be empty") 29 + }) 30 + } 31 + 32 + func TestMediaDataParserBFrames(t *testing.T) { 33 + withNoGSTLeaks(t, func() { 34 + inputFile, err := os.Open(remote.RemoteFixture("5ea6c4491bade0cdcad3770aa0b63b2cd7a580e233ee320d5bc2282503b26491/segment-with-bframes.mp4")) 35 + require.NoError(t, err) 36 + defer inputFile.Close() 37 + bs, err := io.ReadAll(inputFile) 38 + require.NoError(t, err) 39 + 40 + ctx := log.WithDebugValue(context.Background(), map[string]map[string]int{"GStreamerFunc": {"ParseSegmentMediaData": 9}}) 41 + mediaData, err := ParseSegmentMediaData(ctx, bs) 42 + require.NoError(t, err) 43 + require.NotNil(t, mediaData) 44 + require.True(t, mediaData.Video[0].BFrames, "Video should have BFrames") 45 require.Greater(t, mediaData.Duration, int64(0), "Video duration should not be empty") 46 }) 47 }
+6 -4
pkg/model/segment.go
··· 15 ) 16 17 type SegmentMediadataVideo struct { 18 - Width int `json:"width"` 19 - Height int `json:"height"` 20 - FPSNum int `json:"fpsNum"` 21 - FPSDen int `json:"fpsDen"` 22 } 23 24 type SegmentMediadataAudio struct { ··· 89 Num: int64(s.MediaData.Video[0].FPSNum), 90 Den: int64(s.MediaData.Video[0].FPSDen), 91 }, 92 }, 93 }, 94 Audio: []*streamplace.Segment_Audio{
··· 15 ) 16 17 type SegmentMediadataVideo struct { 18 + Width int `json:"width"` 19 + Height int `json:"height"` 20 + FPSNum int `json:"fpsNum"` 21 + FPSDen int `json:"fpsDen"` 22 + BFrames bool `json:"bframes"` 23 } 24 25 type SegmentMediadataAudio struct { ··· 90 Num: int64(s.MediaData.Video[0].FPSNum), 91 Den: int64(s.MediaData.Video[0].FPSDen), 92 }, 93 + Bframes: &s.MediaData.Video[0].BFrames, 94 }, 95 }, 96 Audio: []*streamplace.Segment_Audio{
+63 -1
pkg/streamplace/cbor_gen.go
··· 1224 } 1225 1226 cw := cbg.NewCborWriter(w) 1227 - fieldCount := 4 1228 1229 if t.Framerate == nil { 1230 fieldCount-- ··· 1298 } else { 1299 if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Height-1)); err != nil { 1300 return err 1301 } 1302 } 1303 ··· 1425 } 1426 1427 t.Height = int64(extraI) 1428 } 1429 // t.Framerate (streamplace.Segment_Framerate) (struct) 1430 case "framerate":
··· 1224 } 1225 1226 cw := cbg.NewCborWriter(w) 1227 + fieldCount := 5 1228 + 1229 + if t.Bframes == nil { 1230 + fieldCount-- 1231 + } 1232 1233 if t.Framerate == nil { 1234 fieldCount-- ··· 1302 } else { 1303 if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Height-1)); err != nil { 1304 return err 1305 + } 1306 + } 1307 + 1308 + // t.Bframes (bool) (bool) 1309 + if t.Bframes != nil { 1310 + 1311 + if len("bframes") > 1000000 { 1312 + return xerrors.Errorf("Value in field \"bframes\" was too long") 1313 + } 1314 + 1315 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("bframes"))); err != nil { 1316 + return err 1317 + } 1318 + if _, err := cw.WriteString(string("bframes")); err != nil { 1319 + return err 1320 + } 1321 + 1322 + if t.Bframes == nil { 1323 + if _, err := cw.Write(cbg.CborNull); err != nil { 1324 + return err 1325 + } 1326 + } else { 1327 + if err := cbg.WriteBool(w, *t.Bframes); err != nil { 1328 + return err 1329 + } 1330 } 1331 } 1332 ··· 1454 } 1455 1456 t.Height = int64(extraI) 1457 + } 1458 + // t.Bframes (bool) (bool) 1459 + case "bframes": 1460 + 1461 + { 1462 + b, err := cr.ReadByte() 1463 + if err != nil { 1464 + return err 1465 + } 1466 + if b != cbg.CborNull[0] { 1467 + if err := cr.UnreadByte(); err != nil { 1468 + return err 1469 + } 1470 + 1471 + maj, extra, err = cr.ReadHeader() 1472 + if err != nil { 1473 + return err 1474 + } 1475 + if maj != cbg.MajOther { 1476 + return fmt.Errorf("booleans must be major type 7") 1477 + } 1478 + 1479 + var val bool 1480 + switch extra { 1481 + case 20: 1482 + val = false 1483 + case 21: 1484 + val = true 1485 + default: 1486 + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) 1487 + } 1488 + t.Bframes = &val 1489 + } 1490 } 1491 // t.Framerate (streamplace.Segment_Framerate) (struct) 1492 case "framerate":
+1
pkg/streamplace/streamsegment.go
··· 48 49 // Segment_Video is a "video" in the place.stream.segment schema. 50 type Segment_Video struct { 51 Codec string `json:"codec" cborgen:"codec"` 52 Framerate *Segment_Framerate `json:"framerate,omitempty" cborgen:"framerate,omitempty"` 53 Height int64 `json:"height" cborgen:"height"`
··· 48 49 // Segment_Video is a "video" in the place.stream.segment schema. 50 type Segment_Video struct { 51 + Bframes *bool `json:"bframes,omitempty" cborgen:"bframes,omitempty"` 52 Codec string `json:"codec" cborgen:"codec"` 53 Framerate *Segment_Framerate `json:"framerate,omitempty" cborgen:"framerate,omitempty"` 54 Height int64 `json:"height" cborgen:"height"`