Live video on the AT Protocol

Merge pull request #934 from streamplace/natb/audio-only

feat: audio only mode

authored by natalie and committed by GitHub
9487d19f fbd01bd1

+274 -34
+4 -1
js/app/components/mobile/desktop-ui.tsx
··· 70 70 const embedded = usePlayerStore((state) => state.embedded); 71 71 72 72 const fullscreen = usePlayerStore((state) => state.fullscreen); 73 + const selectedRendition = usePlayerStore((state) => state.selectedRendition); 73 74 74 75 const safeAreaInsets = embedded 75 76 ? { ...originalSafeAreaInsets, top: 0 } ··· 92 93 if (fadeTimeout.current) clearTimeout(fadeTimeout.current); 93 94 setIsControlsVisible(true); 94 95 96 + if (selectedRendition === "audio") return; 97 + 95 98 fadeTimeout.current = setTimeout(() => { 96 99 fadeOpacity.value = withTiming(0, { duration: 400 }); 97 100 setIsControlsVisible(false); 98 101 }, FADE_OUT_DELAY); 99 - }, [fadeOpacity]); 102 + }, [fadeOpacity, selectedRendition]); 100 103 101 104 const onPlayerHover = useCallback(() => { 102 105 resetFadeTimer();
+2
js/app/components/mobile/ui.tsx
··· 99 99 const FADE_OUT_DELAY = 4000; 100 100 const fadeOpacity = useSharedValue(1); 101 101 const fadeTimeout = useRef<NodeJS.Timeout | null>(null); 102 + const selectedRendition = usePlayerStore((state) => state.selectedRendition); 102 103 103 104 const resetFadeTimer = () => { 104 105 fadeOpacity.value = withTiming(1, { duration: 200 }); 105 106 if (fadeTimeout.current) clearTimeout(fadeTimeout.current); 107 + if (selectedRendition === "audio") return; 106 108 fadeTimeout.current = setTimeout(() => { 107 109 fadeOpacity.value = withTiming(0, { duration: 400 }); 108 110 }, FADE_OUT_DELAY);
+3
js/components/src/components/mobile-player/fullscreen.native.tsx
··· 15 15 usePlayerStore, 16 16 VideoRetry, 17 17 } from "../.."; 18 + import { AudioOnlyOverlay } from "./ui/audio-only-overlay"; 18 19 import Video from "./video.native"; 19 20 20 21 // Standard 16:9 video aspect ratio ··· 166 167 objectFit={props.objectFit} 167 168 pictureInPictureEnabled={props.pictureInPictureEnabled} 168 169 /> 170 + <AudioOnlyOverlay /> 169 171 <DanmuOverlay 170 172 enabled={danmuEnabled} 171 173 opacity={danmuOpacity} ··· 188 190 pictureInPictureEnabled={props.pictureInPictureEnabled} 189 191 /> 190 192 </VideoRetry> 193 + <AudioOnlyOverlay /> 191 194 <DanmuOverlay 192 195 enabled={danmuEnabled} 193 196 opacity={danmuOpacity}
+2
js/components/src/components/mobile-player/fullscreen.tsx
··· 11 11 usePlayerStore, 12 12 } from "../.."; 13 13 import { View } from "../../components/ui"; 14 + import { AudioOnlyOverlay } from "./ui/audio-only-overlay"; 14 15 import Video from "./video"; 15 16 import VideoRetry from "./video-retry"; 16 17 ··· 105 106 pictureInPictureEnabled={props.pictureInPictureEnabled} 106 107 /> 107 108 </VideoRetry> 109 + <AudioOnlyOverlay /> 108 110 <DanmuOverlay 109 111 enabled={danmuEnabled} 110 112 opacity={danmuOpacity}
+48
js/components/src/components/mobile-player/ui/audio-only-overlay.tsx
··· 1 + import { Volume2 } from "lucide-react-native"; 2 + import { zero } from "../../.."; 3 + import { usePlayerStore } from "../../../player-store"; 4 + import { Text, View } from "../../ui"; 5 + 6 + export function AudioOnlyOverlay() { 7 + const selectedRendition = usePlayerStore((x) => x.selectedRendition); 8 + const setSelectedRendition = usePlayerStore((x) => x.setSelectedRendition); 9 + 10 + if (selectedRendition !== "audio") { 11 + return null; 12 + } 13 + 14 + return ( 15 + <View 16 + style={[ 17 + zero.layout.position.absolute, 18 + zero.position.top[0], 19 + zero.position.left[0], 20 + zero.position.right[0], 21 + zero.position.bottom[0], 22 + zero.layout.flex.center, 23 + ]} 24 + > 25 + <View 26 + style={[ 27 + zero.layout.flex.column, 28 + zero.layout.flex.alignCenter, 29 + zero.gap.all[3], 30 + zero.px[6], 31 + ]} 32 + > 33 + <Volume2 color="#fff" size={48} /> 34 + <Text size="lg" weight="semibold" center> 35 + Audio Only mode 36 + </Text> 37 + <Text 38 + size="sm" 39 + color="muted" 40 + center 41 + onPress={() => setSelectedRendition("source")} 42 + > 43 + Go to Settings &gt; Quality to switch back to video. 44 + </Text> 45 + </View> 46 + </View> 47 + ); 48 + }
+1
js/components/src/components/mobile-player/ui/index.ts
··· 1 + export * from "./audio-only-overlay"; 1 2 export * from "./autoplay-button"; 2 3 export * from "./countdown"; 3 4 export * from "./input";
+4 -2
js/components/src/components/mobile-player/ui/viewer-context-menu.tsx
··· 250 250 <Text muted size={isMobile ? "base" : "sm"}> 251 251 {quality === "source" 252 252 ? `Source${resolutionDisplay ? " " + resolutionDisplay + "\n" : ", "}` 253 - : quality} 253 + : quality === "audio" 254 + ? `Audio Only\n` 255 + : quality} 254 256 {lowLatency ? "Low Latency" : ""} 255 257 </Text> 256 258 </View> ··· 266 268 </DropdownMenuRadioItem> 267 269 {qualities.map((r) => ( 268 270 <DropdownMenuRadioItem key={r.name} value={r.name}> 269 - <Text>{r.name}</Text> 271 + <Text>{r.name === "audio" ? "Audio Only" : r.name}</Text> 270 272 </DropdownMenuRadioItem> 271 273 ))} 272 274 </DropdownMenuRadioGroup>
+15 -2
js/components/src/player-store/player-store.tsx
··· 20 20 id: id || Math.random().toString(36).slice(8), 21 21 selectedRendition: "source", 22 22 setSelectedRendition: (rendition: string) => 23 - set((state) => ({ ...state, selectedRendition: rendition })), 23 + set((state) => { 24 + if (rendition === "audio" && state.controlsTimeout) { 25 + clearTimeout(state.controlsTimeout); 26 + return { 27 + ...state, 28 + selectedRendition: rendition, 29 + showControls: true, 30 + controlsTimeout: undefined, 31 + }; 32 + } 33 + return { ...state, selectedRendition: rendition }; 34 + }), 24 35 protocol: PlayerProtocol.WEBRTC, 25 36 setProtocol: (protocol: PlayerProtocol) => 26 37 set((state) => ({ ...state, protocol: protocol })), ··· 167 178 168 179 setUserInteraction: () => 169 180 set((p) => { 170 - // controls timeout 171 181 if (p.controlsTimeout) { 172 182 clearTimeout(p.controlsTimeout); 183 + } 184 + if (p.selectedRendition === "audio") { 185 + return { showControls: true, controlsTimeout: undefined }; 173 186 } 174 187 let controlsTimeout = setTimeout(() => p.setShowControls(false), 1000); 175 188 return { showControls: true, controlsTimeout };
+11 -7
pkg/api/websocket.go
··· 194 194 return 195 195 } 196 196 initialBurst <- spSeg 197 + outRs := streamplace.Defs_Renditions{ 198 + LexiconTypeID: "place.stream.defs#renditions", 199 + Renditions: []*streamplace.Defs_Rendition{}, 200 + } 197 201 if a.CLI.LivepeerGatewayURL != "" { 198 - renditions, err := renditions.GenerateRenditions(spSeg) 202 + videoRenditions, err := renditions.GenerateRenditions(spSeg) 199 203 if err != nil { 200 204 log.Error(ctx, "could not generate renditions", "error", err) 201 205 return 202 206 } 203 - outRs := streamplace.Defs_Renditions{ 204 - LexiconTypeID: "place.stream.defs#renditions", 205 - } 206 - outRs.Renditions = []*streamplace.Defs_Rendition{} 207 - for _, r := range renditions { 207 + for _, r := range videoRenditions { 208 208 outRs.Renditions = append(outRs.Renditions, &streamplace.Defs_Rendition{ 209 209 LexiconTypeID: "place.stream.defs#rendition", 210 210 Name: r.Name, 211 211 }) 212 212 } 213 - initialBurst <- outRs 214 213 } 214 + outRs.Renditions = append(outRs.Renditions, &streamplace.Defs_Rendition{ 215 + LexiconTypeID: "place.stream.defs#rendition", 216 + Name: renditions.AudioRendition.Name, 217 + }) 218 + initialBurst <- outRs 215 219 }() 216 220 217 221 go func() {
+30
pkg/director/stream_session.go
··· 101 101 Height: spseg.Video[0].Height, 102 102 } 103 103 allRenditions = append([]renditions.Rendition{sourceRendition}, allRenditions...) 104 + allRenditions = append(allRenditions, renditions.AudioRendition) 104 105 ss.hls = media.NewM3U8(allRenditions) 105 106 106 107 close(ss.started) ··· 193 194 194 195 ss.Go(ctx, func() error { 195 196 return ss.statefulDB.UpsertBroadcastOrigin(spseg.Creator, ss.cli.BroadcasterDID(), time.Now()) 197 + }) 198 + 199 + ss.Go(ctx, func() error { 200 + return ss.AddAudioOnlyHLSSegment(ctx, spseg, notif.Data) 196 201 }) 197 202 198 203 if ss.cli.Thumbnail { ··· 777 782 return fmt.Errorf("failed to create new segment: %w", err) 778 783 } 779 784 785 + return nil 786 + } 787 + 788 + func (ss *StreamSession) AddAudioOnlyHLSSegment(ctx context.Context, spseg *streamplace.Segment, data []byte) error { 789 + buf := bytes.Buffer{} 790 + dur, err := media.MP4ToMPEGTSAudioOnly(ctx, bytes.NewReader(data), &buf) 791 + if err != nil { 792 + return fmt.Errorf("failed to convert MP4 to audio-only MPEG-TS: %w", err) 793 + } 794 + aqt, err := aqtime.FromString(spseg.StartTime) 795 + if err != nil { 796 + return fmt.Errorf("failed to parse segment start time: %w", err) 797 + } 798 + log.Debug(ctx, "transmuxed to audio-only mpegts, adding to hls", "size", buf.Len()) 799 + rend, err := ss.hls.GetRendition(renditions.AudioRendition.Name) 800 + if err != nil { 801 + return fmt.Errorf("failed to get audio rendition: %w", err) 802 + } 803 + if err := rend.NewSegment(&media.Segment{ 804 + Buf: &buf, 805 + Duration: time.Duration(dur), 806 + Time: aqt.Time(), 807 + }); err != nil { 808 + return fmt.Errorf("failed to create new audio segment: %w", err) 809 + } 780 810 return nil 781 811 } 782 812
+5 -2
pkg/media/m3u8.go
··· 61 61 } 62 62 63 63 func (r *M3U8Rendition) GetMediaLine(session string) string { 64 - // m.waitForStart() 65 64 lines := []string{} 66 65 lines = append(lines, "#EXTM3U") 67 - lines = append(lines, fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d", r.Rendition.Bitrate, r.Rendition.Width, r.Rendition.Height)) 66 + if r.Rendition.AudioOnly { 67 + lines = append(lines, fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d", r.Rendition.Bitrate)) 68 + } else { 69 + lines = append(lines, fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d", r.Rendition.Bitrate, r.Rendition.Width, r.Rendition.Height)) 70 + } 68 71 lines = append(lines, fmt.Sprintf("%s/%s?session=%s", r.Rendition.Name, IndexM3U8, session)) 69 72 return strings.Join(lines, "\n") 70 73 }
+110
pkg/media/segment_conv.go
··· 150 150 return dur, busErr 151 151 } 152 152 153 + // MP4ToMPEGTSAudioOnly converts an MP4 file to an audio-only MPEG-TS file, discarding video. 154 + func MP4ToMPEGTSAudioOnly(ctx context.Context, input io.Reader, output io.Writer) (int64, error) { 155 + ctx = log.WithLogValues(ctx, "func", "MP4ToMPEGTSAudioOnly") 156 + pipelineStr := strings.Join([]string{ 157 + "appsrc name=appsrc ! qtdemux name=demux", 158 + "mpegtsmux name=mux ! appsink name=appsink sync=false", 159 + "demux.audio_0 ! opusdec name=audioparse ! audioresample ! audiorate ! fdkaacenc name=audioenc ! queue name=audioqueue", 160 + }, " ") 161 + 162 + pipeline, err := gst.NewPipelineFromString(pipelineStr) 163 + if err != nil { 164 + return 0, err 165 + } 166 + 167 + mux, err := pipeline.GetElementByName("mux") 168 + if err != nil { 169 + return 0, err 170 + } 171 + muxAudioSinkPad := mux.GetRequestPad("sink_%d") 172 + if muxAudioSinkPad == nil { 173 + return 0, fmt.Errorf("failed to get audio sink pad") 174 + } 175 + audioQueue, err := pipeline.GetElementByName("audioqueue") 176 + if err != nil { 177 + return 0, err 178 + } 179 + audioQueueSrcPad := audioQueue.GetStaticPad("src") 180 + if audioQueueSrcPad == nil { 181 + return 0, fmt.Errorf("failed to get audio queue source pad") 182 + } 183 + ok := audioQueueSrcPad.Link(muxAudioSinkPad) 184 + if ok != gst.PadLinkOK { 185 + return 0, fmt.Errorf("failed to link audio queue source pad to mux audio sink pad: %v", ok) 186 + } 187 + 188 + appsrc, err := pipeline.GetElementByName("appsrc") 189 + if err != nil { 190 + return 0, err 191 + } 192 + appsink, err := pipeline.GetElementByName("appsink") 193 + if err != nil { 194 + return 0, err 195 + } 196 + 197 + source := app.SrcFromElement(appsrc) 198 + sink := app.SinkFromElement(appsink) 199 + 200 + source.SetCallbacks(&app.SourceCallbacks{ 201 + NeedDataFunc: ReaderNeedDataIncremental(ctx, input), 202 + EnoughDataFunc: func(self *app.Source) { 203 + }, 204 + SeekDataFunc: func(self *app.Source, offset uint64) bool { 205 + return false 206 + }, 207 + }) 208 + 209 + sink.SetCallbacks(&app.SinkCallbacks{ 210 + NewSampleFunc: WriterNewSample(ctx, output), 211 + NewPrerollFunc: func(self *app.Sink) gst.FlowReturn { 212 + return gst.FlowOK 213 + }, 214 + }) 215 + 216 + ctx, cancel := context.WithCancel(ctx) 217 + defer func() { 218 + cancel() 219 + err = pipeline.SetState(gst.StateNull) 220 + if err != nil { 221 + log.Error(ctx, "failed to set pipeline state to null", "error", err) 222 + } 223 + }() 224 + 225 + go func() { 226 + select { 227 + case <-ctx.Done(): 228 + return 229 + case <-time.After(time.Second * 10): 230 + log.Debug(ctx, "pipeline is taking too long to start, cancelling") 231 + err := fmt.Errorf("pipeline is taking too long to start, cancelling") 232 + pipeline.Error(err.Error(), err) 233 + } 234 + }() 235 + 236 + errCh := make(chan error) 237 + go func() { 238 + err := HandleBusMessages(ctx, pipeline) 239 + cancel() 240 + errCh <- err 241 + close(errCh) 242 + }() 243 + 244 + err = pipeline.SetState(gst.StatePlaying) 245 + if err != nil { 246 + return 0, fmt.Errorf("failed to set pipeline state to playing: %w", err) 247 + } 248 + 249 + var durOk bool 250 + var dur int64 251 + busErr := <-errCh 252 + 253 + if busErr == nil { 254 + durOk, dur = pipeline.QueryDuration(gst.FormatTime) 255 + if !durOk { 256 + return 0, fmt.Errorf("failed to query duration") 257 + } 258 + } 259 + 260 + return dur, busErr 261 + } 262 + 153 263 // MPEGTSToMP4 converts an MPEG-TS file with H264 video and Opus audio to an MP4 file. 154 264 // It reads from the provided reader and writes the converted MP4 to the writer. 155 265 func MPEGTSToMP4(ctx context.Context, input io.Reader, output io.Writer) error {
+31 -19
pkg/media/webrtc_playback2.go
··· 11 11 "golang.org/x/sync/errgroup" 12 12 "stream.place/streamplace/pkg/bus" 13 13 "stream.place/streamplace/pkg/log" 14 + "stream.place/streamplace/pkg/renditions" 14 15 ) 15 16 16 17 // This function remains in scope for the duration of a single users' playback ··· 28 29 return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 29 30 } 30 31 31 - videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") 32 - if err != nil { 33 - return nil, fmt.Errorf("failed to create video track: %w", err) 34 - } 35 - videoRTPSender, err := peerConnection.AddTrack(videoTrack) 36 - if err != nil { 37 - return nil, fmt.Errorf("failed to add video track to peer connection: %w", err) 32 + audioOnly := rendition == renditions.AudioRendition.Name 33 + 34 + var videoTrack *webrtc.TrackLocalStaticSample 35 + var videoRTPSender *webrtc.RTPSender 36 + if !audioOnly { 37 + videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") 38 + if err != nil { 39 + return nil, fmt.Errorf("failed to create video track: %w", err) 40 + } 41 + videoRTPSender, err = peerConnection.AddTrack(videoTrack) 42 + if err != nil { 43 + return nil, fmt.Errorf("failed to add video track to peer connection: %w", err) 44 + } 38 45 } 39 46 40 47 audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") ··· 84 91 85 92 packetQueue := make(chan *bus.PacketizedSegment, 1024) 86 93 go func() { 87 - segChan := mm.bus.SubscribeSegmentBuf(ctx, user, rendition, 2) 88 - defer mm.bus.UnsubscribeSegment(ctx, user, rendition, segChan) 94 + busRendition := rendition 95 + if audioOnly { 96 + busRendition = "source" 97 + } 98 + segChan := mm.bus.SubscribeSegmentBuf(ctx, user, busRendition, 2) 99 + defer mm.bus.UnsubscribeSegment(ctx, user, busRendition, segChan) 89 100 for { 90 101 select { 91 102 case <-ctx.Done(): ··· 131 142 } 132 143 g, _ := errgroup.WithContext(ctx) 133 144 134 - if videoDur > 0 { 145 + if !audioOnly && videoDur > 0 { 135 146 g.Go(func() error { 136 147 ticker := time.NewTicker(time.Duration(float64(videoDur) * (1 / scalar))) 137 148 defer ticker.Stop() 138 149 for _, video := range packet.Video { 139 - // log.Log(ctx, "writing video sample", "duration", videoDur) 140 150 err := videoTrack.WriteSample(media.Sample{Data: video, Duration: videoDur}) 141 151 if err != nil { 142 152 return fmt.Errorf("failed to write video sample: %w", err) ··· 151 161 } 152 162 return nil 153 163 }) 154 - } else { 164 + } else if !audioOnly { 155 165 log.Warn(ctx, "no video samples to write") 156 166 } 157 167 if audioDur > 0 { ··· 187 197 mm.IncrementViewerCount(user, "webrtc") 188 198 defer mm.DecrementViewerCount(user, "webrtc") 189 199 190 - go func() { 191 - rtcpBuf := make([]byte, 1500) 192 - for { 193 - if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil { 194 - return 200 + if !audioOnly { 201 + go func() { 202 + rtcpBuf := make([]byte, 1500) 203 + for { 204 + if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil { 205 + return 206 + } 195 207 } 196 - } 197 - }() 208 + }() 209 + } 198 210 199 211 go func() { 200 212 rtcpBuf := make([]byte, 1500)
+7
pkg/renditions/renditions.go
··· 21 21 Profile string 22 22 Name string 23 23 Parent *Rendition 24 + AudioOnly bool 25 + } 26 + 27 + var AudioRendition = Rendition{ 28 + Name: "audio", 29 + Bitrate: 128_000, 30 + AudioOnly: true, 24 31 } 25 32 26 33 type JSONProfile struct {
+1 -1
pkg/spxrpc/place_stream_badge.go
··· 18 18 } 19 19 20 20 // Get valid badges using shared badge logic 21 - badgeList, err := badges.GetValidBadges(ctx, session.DID, streamer, s.cli.MyDID(), s.model) 21 + badgeList, err := badges.GetValidBadges(ctx, session.DID, streamer, s.cli.BroadcasterDID(), s.model) 22 22 if err != nil { 23 23 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to get valid badges") 24 24 }