Live video on the AT Protocol

add WebRTC ingest

See merge request aquareum-tv/aquareum!80

Changelog: feature

+1031 -274
+4 -1
Makefile
··· 150 150 -D "gst-plugins-base:audioconvert=enabled" \ 151 151 -D "gst-plugins-good:matroska=enabled" \ 152 152 -D "gst-plugins-good:multifile=enabled" \ 153 + -D "gst-plugins-good:rtp=enabled" \ 153 154 -D "gst-plugins-bad:fdkaac=enabled" \ 154 155 -D "gst-plugins-good:audioparsers=enabled" \ 155 156 -D "gst-plugins-good:isomp4=enabled" \ ··· 159 160 -D "gst-plugins-good:audioparsers=enabled" \ 160 161 -D "gst-plugins-bad:videoparsers=enabled" \ 161 162 -D "gst-plugins-bad:mpegtsmux=enabled" \ 163 + -D "gst-plugins-bad:codectimestamper=enabled" \ 164 + -D "gst-plugins-bad:opus=enabled" \ 162 165 -D "gst-plugins-ugly:x264=enabled" \ 163 166 -D "gst-plugins-ugly:gpl=enabled" \ 164 167 -D "x264:asm=enabled" \ 165 168 -D "gstreamer-full:gst-full=enabled" \ 166 - -D "gstreamer-full:gst-full-plugins=libgstaudioresample.a;libgstlibav.a;libgstmatroska.a;libgstmultifile.a;libgstjpeg.a;libgstaudiotestsrc.a;libgstaudioconvert.a;libgstaudioparsers.a;libgstfdkaac.a;libgstisomp4.a;libgstapp.a;libgstvideoconvertscale.a;libgstvideobox.a;libgstvideorate.a;libgstpng.a;libgstcompositor.a;libgstx264.a;libgstopus.a;libgstvideotestsrc.a;libgstvideoparsersbad.a;libgstaudioparsers.a;libgstmpegtsmux.a;libgstplayback.a;libgsttypefindfunctions.a" \ 169 + -D "gstreamer-full:gst-full-plugins=libgstopusparse.a;libgstcodectimestamper.a;libgstrtp.a;libgstaudioresample.a;libgstlibav.a;libgstmatroska.a;libgstmultifile.a;libgstjpeg.a;libgstaudiotestsrc.a;libgstaudioconvert.a;libgstaudioparsers.a;libgstfdkaac.a;libgstisomp4.a;libgstapp.a;libgstvideoconvertscale.a;libgstvideobox.a;libgstvideorate.a;libgstpng.a;libgstcompositor.a;libgstx264.a;libgstopus.a;libgstvideotestsrc.a;libgstvideoparsersbad.a;libgstaudioparsers.a;libgstmpegtsmux.a;libgstplayback.a;libgsttypefindfunctions.a" \ 167 170 -D "gstreamer-full:gst-full-libraries=gstreamer-controller-1.0,gstreamer-plugins-base-1.0,gstreamer-pbutils-1.0" \ 168 171 -D "gstreamer-full:gst-full-target-type=static_library" \ 169 172 -D "gstreamer-full:gst-full-elements=coreelements:concat,filesrc,queue,queue2,multiqueue,typefind,tee,capsfilter,fakesink" \
+61 -5
js/app/components/player/controls.tsx
··· 17 17 import { useEffect, useRef, useState } from "react"; 18 18 import { Animated, Pressable } from "react-native"; 19 19 import { 20 + Button, 20 21 Adapt, 22 + H3, 21 23 ListItem, 22 24 Popover, 23 25 Separator, ··· 34 36 PROTOCOL_PROGRESSIVE_WEBM, 35 37 PROTOCOL_WEBRTC, 36 38 } from "./props"; 39 + import { selectPlayer, usePlayerActions } from "features/player/playerSlice"; 40 + import { useAppDispatch, useAppSelector } from "store/hooks"; 41 + import Loading from "components/loading/loading"; 37 42 38 43 const Bar = (props) => ( 39 44 <XStack ··· 41 46 backgroundColor="rgba(0,0,0,0.8)" 42 47 justifyContent="space-between" 43 48 flex-direction="row" 49 + opacity={props.opacity} 50 + animation="quick" 51 + animateOnly={["opacity"]} 44 52 > 45 53 {props.children} 46 54 </XStack> ··· 81 89 zIndex={999} 82 90 flexDirection="column" 83 91 justifyContent="space-between" 84 - animation="quick" 85 - animateOnly={["opacity"]} 86 - opacity={props.showControls ? 1 : 0} 87 92 onPointerMove={props.userInteraction} 88 93 onTouchStart={props.userInteraction} 89 94 onPress={onPress} ··· 101 106 justifyContent: "space-between", 102 107 }} 103 108 > */} 104 - <Bar> 109 + <Bar opacity={props.showControls ? 1 : 0}> 105 110 <Part> 106 111 <View justifyContent="center" paddingLeft="$5"> 107 112 <Text>{props.name}</Text> ··· 109 114 </Part> 110 115 <Part>{/* <Text>Top Right</Text> */}</Part> 111 116 </Bar> 112 - <Bar> 117 + {props.ingest && <LiveBubble />} 118 + <Bar opacity={props.showControls ? 1 : 0}> 113 119 <Part> 114 120 <Pressable 115 121 style={{ ··· 204 210 </Popover.Content> 205 211 </Popover> 206 212 ); 213 + } 214 + 215 + function LiveBubble() { 216 + const player = useAppSelector(selectPlayer); 217 + const dispatch = useAppDispatch(); 218 + const { startIngest } = usePlayerActions(); 219 + return ( 220 + <View 221 + position="absolute" 222 + bottom={100} 223 + alignItems="center" 224 + justifyContent="center" 225 + width="100%" 226 + > 227 + <Button 228 + backgroundColor="rgba(0,0,0,0.9)" 229 + borderRadius={9999999999} 230 + padding="$2" 231 + paddingLeft="$3" 232 + paddingRight="$3" 233 + onPress={() => { 234 + dispatch(startIngest(!player.ingestStarting)); 235 + }} 236 + > 237 + <LiveBubbleText /> 238 + </Button> 239 + </View> 240 + ); 241 + } 242 + 243 + function LiveBubbleText() { 244 + const player = useAppSelector(selectPlayer); 245 + if (!player.ingestStarting) { 246 + return <H3>START STREAMING</H3>; 247 + } 248 + if (player.ingestConnectionState === "connected") { 249 + return ( 250 + <> 251 + <H3>LIVE</H3> 252 + <View 253 + backgroundColor="red" 254 + width={15} 255 + height={15} 256 + borderRadius={9999999999} 257 + marginLeft="$2" 258 + ></View> 259 + </> 260 + ); 261 + } 262 + return <Loading />; 207 263 } 208 264 209 265 function GearMenu(props: PlayerProps) {
+27 -1
js/app/components/player/player.tsx
··· 11 11 PlayerStatusTracker, 12 12 PROTOCOL_WEBRTC, 13 13 } from "./props"; 14 + import { newPlayer, PlayerContext } from "features/player/playerSlice"; 15 + import { useAppDispatch } from "store/hooks"; 14 16 15 17 const HIDE_CONTROLS_AFTER = 2000; 16 18 19 + // basically PlayerProvider that sets up our magic context, 20 + // PlayerInner starts doing player stuff 17 21 export function Player(props: Partial<PlayerProps>) { 22 + const dispatch = useAppDispatch(); 23 + const [playerId, setPlayerId] = useState<string | null>(null); 24 + useEffect(() => { 25 + const newPlayerAction = newPlayer(); 26 + if (props.playerId) { 27 + newPlayerAction.payload.playerId = props.playerId; 28 + } 29 + setPlayerId(newPlayerAction.payload.playerId); 30 + dispatch(newPlayerAction); 31 + }, []); 32 + if (!playerId) { 33 + return <></>; 34 + } 35 + return ( 36 + <PlayerContext.Provider value={{ playerId }}> 37 + <PlayerInner {...props} /> 38 + </PlayerContext.Provider> 39 + ); 40 + } 41 + 42 + export function PlayerInner(props: Partial<PlayerProps>) { 18 43 if (typeof props.src !== "string") { 19 44 return ( 20 45 <View> ··· 83 108 const [fullscreen, setFullscreen] = useState(false); 84 109 const childProps: PlayerProps = { 85 110 playerId: playerId, 86 - name: props.name || props.src, 111 + ingest: props.ingest, 112 + name: props.ingest ? "Go Live" : props.name || props.src, 87 113 telemetry: props.telemetry ?? false, 88 114 src: props.src, 89 115 muted: muted,
+1
js/app/components/player/props.tsx
··· 22 22 setStatus: (status: PlayerStatus) => void; 23 23 playTime: number; 24 24 setPlayTime: (playTime: number) => void; 25 + ingest?: boolean; 25 26 }; 26 27 27 28 export type PlayerEvent = {
+37
js/app/components/player/use-webrtc.tsx
··· 1 1 import { useEffect, useState } from "react"; 2 2 import { RTCPeerConnection, RTCSessionDescription } from "./webrtc-primitives"; 3 + import { usePlayerActions } from "features/player/playerSlice"; 4 + import { useAppDispatch } from "store/hooks"; 3 5 4 6 export default function useWebRTC(endpoint: string) { 5 7 const [mediaStream, setMediaStream] = useState<MediaStream | null>(null); ··· 137 139 }); 138 140 }); 139 141 } 142 + 143 + export function useWebRTCIngest( 144 + endpoint: string, 145 + ): [MediaStream | null, (MediaStream) => void] { 146 + const [mediaStream, setMediaStream] = useState<MediaStream | null>(null); 147 + const { ingestConnectionState } = usePlayerActions(); 148 + const dispatch = useAppDispatch(); 149 + useEffect(() => { 150 + if (!mediaStream) { 151 + return; 152 + } 153 + console.log("creating peer connection"); 154 + const peerConnection = new RTCPeerConnection({ 155 + bundlePolicy: "max-bundle", 156 + }); 157 + for (const track of mediaStream.getTracks()) { 158 + peerConnection.addTrack(track, mediaStream); 159 + } 160 + peerConnection.addEventListener("connectionstatechange", (ev) => { 161 + dispatch(ingestConnectionState(peerConnection.connectionState)); 162 + console.log("connection state change", peerConnection.connectionState); 163 + if (peerConnection.connectionState !== "connected") { 164 + return; 165 + } 166 + }); 167 + peerConnection.addEventListener("negotiationneeded", (ev) => { 168 + negotiateConnectionWithClientOffer(peerConnection, endpoint); 169 + }); 170 + 171 + return () => { 172 + peerConnection.close(); 173 + }; 174 + }, [endpoint, mediaStream]); 175 + return [mediaStream, setMediaStream]; 176 + }
+69 -3
js/app/components/player/video.tsx
··· 17 17 PROTOCOL_WEBRTC, 18 18 } from "./props"; 19 19 import { srcToUrl } from "./shared"; 20 - import useWebRTC from "./use-webrtc"; 20 + import useWebRTC, { useWebRTCIngest } from "./use-webrtc"; 21 + import useAquareumNode from "hooks/useAquareumNode"; 22 + import { selectPlayer } from "features/player/playerSlice"; 23 + import { useAppSelector } from "store/hooks"; 21 24 22 25 type VideoProps = PlayerProps & { url: string }; 23 26 ··· 33 36 props.videoRef.current.play(); 34 37 } 35 38 }, [props.playTime]); 39 + if (props.ingest) { 40 + return <WebcamIngestPlayer url={url} {...props} />; 41 + } 42 + console.log("protocol", protocol); 36 43 if (protocol === PROTOCOL_PROGRESSIVE_MP4) { 37 44 return <ProgressiveMP4Player url={url} {...props} />; 38 45 } else if (protocol === PROTOCOL_PROGRESSIVE_WEBM) { ··· 67 74 }; 68 75 69 76 useEffect(() => { 70 - console.log("video mounted"); 71 77 return () => { 72 78 props.setStatus(PlayerStatus.START); 73 79 }; ··· 85 91 playsInline={true} 86 92 ref={ref} 87 93 controls={false} 88 - src={props.url} 94 + src={props.ingest ? undefined : props.url} 89 95 muted={props.muted} 90 96 crossOrigin="anonymous" 91 97 onMouseMove={props.userInteraction} ··· 118 124 backgroundColor: "transparent", 119 125 width: "100%", 120 126 height: "100%", 127 + // transform: props.ingest ? "scaleX(-1)" : undefined, 121 128 }} 122 129 /> 123 130 </View> ··· 201 208 202 209 return <VideoElement {...props} ref={handleRef} />; 203 210 } 211 + 212 + export function WebcamIngestPlayer( 213 + props: VideoProps & { videoRef: RefObject<HTMLVideoElement> }, 214 + ) { 215 + const player = useAppSelector(selectPlayer); 216 + const [videoElement, setVideoElement] = useState<HTMLVideoElement | null>( 217 + null, 218 + ); 219 + const handleRef = useCallback((node: HTMLVideoElement | null) => { 220 + if (node) { 221 + setVideoElement(node); 222 + } 223 + }, []); 224 + 225 + const { url } = useAquareumNode(); 226 + const [localMediaStream, setLocalMediaStream] = useState<MediaStream | null>( 227 + null, 228 + ); 229 + const [remoteMediaStream, setRemoteMediaStream] = useWebRTCIngest( 230 + `${url}/api/ingest/webrtc`, 231 + ); 232 + 233 + useEffect(() => { 234 + navigator.mediaDevices 235 + .getUserMedia({ 236 + audio: true, 237 + video: { 238 + width: { min: 200, ideal: 1920, max: 3840 }, 239 + height: { min: 200, ideal: 1080, max: 2160 }, 240 + }, 241 + }) 242 + .then((stream) => { 243 + setLocalMediaStream(stream); 244 + }); 245 + }, []); 246 + 247 + useEffect(() => { 248 + if (!player.ingestStarting) { 249 + setRemoteMediaStream(null); 250 + return; 251 + } 252 + if (!localMediaStream) { 253 + return; 254 + } 255 + setRemoteMediaStream(localMediaStream); 256 + }, [localMediaStream, player.ingestStarting]); 257 + 258 + useEffect(() => { 259 + if (!videoElement) { 260 + return; 261 + } 262 + if (!localMediaStream) { 263 + return; 264 + } 265 + videoElement.srcObject = localMediaStream; 266 + }, [videoElement, localMediaStream]); 267 + 268 + return <VideoElement {...props} ref={handleRef} />; 269 + }
+121
js/app/features/player/playerSlice.tsx
··· 1 + import { createAction } from "@reduxjs/toolkit"; 2 + import { createAppSlice } from "../../hooks/createSlice"; 3 + import { uuidv7 } from "hooks/uuid"; 4 + import { createContext, useContext } from "react"; 5 + 6 + export interface PlayerContextType { 7 + playerId: string | null; 8 + } 9 + 10 + export const PlayerContext = createContext<PlayerContextType>({ 11 + playerId: null, 12 + }); 13 + 14 + export interface PlayerState { 15 + ingestStarted: number | null; 16 + ingestStarting: boolean; 17 + ingestConnectionState: RTCPeerConnectionState | null; 18 + } 19 + 20 + export interface PlayersState { 21 + [key: string]: PlayerState; 22 + } 23 + 24 + const initialState: PlayersState = {}; 25 + 26 + export const newPlayer = createAction("player/newPlayer", function prepare() { 27 + return { 28 + payload: { playerId: uuidv7() }, 29 + }; 30 + }); 31 + 32 + const usePlayerId = () => { 33 + const { playerId } = useContext(PlayerContext); 34 + if (!playerId) { 35 + throw new Error("Player context not found"); 36 + } 37 + return playerId; 38 + }; 39 + 40 + export const playerSlice = createAppSlice({ 41 + name: "player", 42 + initialState, 43 + 44 + extraReducers: (builder) => { 45 + builder.addCase(newPlayer, (state, action) => { 46 + state[action.payload.playerId] = { 47 + ingestStarted: null, 48 + ingestStarting: false, 49 + ingestConnectionState: null, 50 + }; 51 + }); 52 + }, 53 + 54 + reducers: (create) => { 55 + return { 56 + startIngest: create.reducer( 57 + ( 58 + state, 59 + action: { 60 + payload: { playerId: string; startIngest: boolean }; 61 + type: string; 62 + }, 63 + ) => { 64 + return { 65 + ...state, 66 + [action.payload.playerId]: { 67 + ...state[action.payload.playerId], 68 + ingestStarting: action.payload.startIngest, 69 + }, 70 + }; 71 + }, 72 + ), 73 + 74 + ingestConnectionState: create.reducer( 75 + ( 76 + state, 77 + action: { 78 + payload: { 79 + playerId: string; 80 + ingestConnectionState: RTCPeerConnectionState; 81 + }; 82 + type: string; 83 + }, 84 + ) => { 85 + return { 86 + ...state, 87 + [action.payload.playerId]: { 88 + ...state[action.payload.playerId], 89 + ingestConnectionState: action.payload.ingestConnectionState, 90 + }, 91 + }; 92 + }, 93 + ), 94 + }; 95 + }, 96 + 97 + selectors: { 98 + selectPlayer: (state) => { 99 + const playerId = usePlayerId(); 100 + return state[playerId]; 101 + }, 102 + }, 103 + }); 104 + 105 + export const usePlayerActions = () => { 106 + const playerId = usePlayerId(); 107 + return { 108 + startIngest: (startIngest: boolean) => 109 + playerSlice.actions.startIngest({ playerId, startIngest }), 110 + ingestConnectionState: (ingestConnectionState: RTCPeerConnectionState) => { 111 + console.log("ingestConnectionState", ingestConnectionState); 112 + return playerSlice.actions.ingestConnectionState({ 113 + playerId, 114 + ingestConnectionState, 115 + }); 116 + }, 117 + }; 118 + }; 119 + 120 + // Action creators are generated for each case reducer function. 121 + export const { selectPlayer } = playerSlice.selectors;
+21 -3
js/app/src/router.tsx
··· 30 30 StatusBar, 31 31 } from "react-native"; 32 32 import { useAppSelector } from "store/hooks"; 33 - import { useTheme, View } from "tamagui"; 33 + import { Text, useTheme, View } from "tamagui"; 34 34 import AppReturnScreen from "./screens/app-return"; 35 35 import GoLiveScreen from "./screens/golive"; 36 + import LiveScreen from "./screens/live"; 36 37 import MultiScreen from "./screens/multi"; 37 38 import StreamScreen from "./screens/stream"; 38 39 import SupportScreen from "./screens/support"; 39 - 40 + import WebcamScreen from "./screens/webcam"; 40 41 function HomeScreen() { 41 42 return ( 42 43 <View f={1}> ··· 44 45 </View> 45 46 ); 46 47 } 47 - 48 48 const Stack = createNativeStackNavigator(); 49 49 50 50 const linking: LinkingOptions<ReactNavigation.RootParamList> = { ··· 63 63 Support: "support", 64 64 Settings: "settings", 65 65 GoLive: "golive", 66 + Live: "live", 67 + Webcam: "live/webcam", 66 68 Login: "login", 67 69 AppReturn: "app-return/:scheme", 68 70 }, ··· 211 213 }} 212 214 /> 213 215 <Drawer.Screen 216 + name="Live" 217 + component={LiveScreen} 218 + options={{ 219 + drawerLabel: () => <Text>Go Live</Text>, 220 + drawerIcon: () => <Video />, 221 + }} 222 + /> 223 + <Drawer.Screen 214 224 name="AppReturn" 215 225 component={AppReturnScreen} 226 + options={{ 227 + drawerLabel: () => null, 228 + drawerItemStyle: { display: "none" }, 229 + }} 230 + /> 231 + <Drawer.Screen 232 + name="Webcam" 233 + component={WebcamScreen} 216 234 options={{ 217 235 drawerLabel: () => null, 218 236 drawerItemStyle: { display: "none" },
+42
js/app/src/screens/live.tsx
··· 1 + import { Camera, FerrisWheel } from "@tamagui/lucide-icons"; 2 + import AQLink from "components/aqlink"; 3 + import React from "react"; 4 + import { Button, H6, Text, View } from "tamagui"; 5 + const elems = [ 6 + { 7 + title: "Stream your camera!", 8 + Icon: Camera, 9 + to: "Webcam", 10 + }, 11 + { 12 + title: "Stream from OBS!", 13 + Icon: FerrisWheel, 14 + to: "Webcam", 15 + }, 16 + ]; 17 + 18 + export default function StreamScreen({ route }) { 19 + return ( 20 + <View f={1} jc="space-around" ai="center" padding="$3" flexDirection="row"> 21 + <View f={1} maxWidth={250}> 22 + {elems.map(({ Icon, title, to }, i) => ( 23 + <React.Fragment key={i}> 24 + <AQLink to={{ screen: to }} style={{ display: "flex" }}> 25 + <Button f={1} padding="$6" backgroundColor="$accentColor"> 26 + <View f={1} flexDirection="row" ai="center" jc="space-between"> 27 + <Icon padding="$5" size={48} marginLeft={-20} /> 28 + <Text>{title}</Text> 29 + </View> 30 + </Button> 31 + </AQLink> 32 + {i < elems.length - 1 && ( 33 + <View jc="center" ai="center"> 34 + <H6 padding="$5">OR</H6> 35 + </View> 36 + )} 37 + </React.Fragment> 38 + ))} 39 + </View> 40 + </View> 41 + ); 42 + }
+5
js/app/src/screens/webcam.tsx
··· 1 + import { Player } from "components/player/player"; 2 + 3 + export default function StreamScreen({ route }) { 4 + return <Player ingest={true} src="live" />; 5 + }
+7 -1
js/app/store/store.tsx
··· 4 4 import { aquareumSlice } from "features/aquareum/aquareumSlice"; 5 5 import { blueskySlice } from "features/bluesky/blueskySlice"; 6 6 import { platformSlice } from "features/platform/platformSlice"; 7 + import { playerSlice } from "features/player/playerSlice"; 7 8 8 - const rootReducer = combineSlices(blueskySlice, aquareumSlice, platformSlice); 9 + const rootReducer = combineSlices( 10 + blueskySlice, 11 + aquareumSlice, 12 + platformSlice, 13 + playerSlice, 14 + ); 9 15 10 16 export type RootState = ReturnType<typeof rootReducer>; 11 17
+1
pkg/api/api.go
··· 111 111 apiRouter.GET("/api/playback/:user/hls/:file", a.HandleHLSPlayback(ctx)) 112 112 apiRouter.GET("/api/playback/:user/stream.jpg", a.HandleThumbnailPlayback(ctx)) 113 113 apiRouter.POST("/api/playback/:user/webrtc", a.HandleWebRTCPlayback(ctx)) 114 + apiRouter.POST("/api/ingest/webrtc", a.HandleWebRTCIngest(ctx)) 114 115 apiRouter.POST("/api/player-event", a.HandlePlayerEvent(ctx)) 115 116 apiRouter.GET("/api/segment/recent", a.HandleRecentSegments(ctx)) 116 117 apiRouter.GET("/api/identity", a.HandleIdentityGET(ctx))
+43 -4
pkg/api/playback.go
··· 12 12 "time" 13 13 14 14 "aquareum.tv/aquareum/pkg/aqtime" 15 - "aquareum.tv/aquareum/pkg/aqwebrtc" 16 15 "aquareum.tv/aquareum/pkg/atproto" 17 16 "aquareum.tv/aquareum/pkg/errors" 18 17 "aquareum.tv/aquareum/pkg/log" 18 + "aquareum.tv/aquareum/pkg/media" 19 19 "github.com/julienschmidt/httprouter" 20 20 "github.com/pion/webrtc/v4" 21 21 "golang.org/x/sync/errgroup" ··· 108 108 return 109 109 } 110 110 } 111 - w.Header().Set("Content-Type", "video/mp4") 111 + w.Header().Set("Content-Type", "video/webm") 112 112 w.WriteHeader(200) 113 113 g, ctx := errgroup.WithContext(ctx) 114 114 pr, pw := io.Pipe() 115 115 bufw := bufio.NewWriter(pw) 116 116 g.Go(func() error { 117 - return a.MediaManager.SegmentToMKVPlusOpus(ctx, user, bufw) 117 + return a.MediaManager.SegmentToMKV(ctx, user, bufw) 118 118 }) 119 119 g.Go(func() error { 120 120 time.Sleep(time.Duration(delayMS) * time.Millisecond) ··· 144 144 } 145 145 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 146 146 pr, pw := io.Pipe() 147 - answer, err := aqwebrtc.WebRTCPlayback(ctx, pr, &offer) 147 + answer, err := media.WebRTCPlayback(ctx, pr, &offer) 148 148 if err != nil { 149 149 errors.WriteHTTPInternalServerError(w, "error playing back", err) 150 150 return ··· 157 157 }() 158 158 w.WriteHeader(201) 159 159 w.Header().Add("Location", r.URL.Path) 160 + w.Write([]byte(answer.SDP)) 161 + } 162 + } 163 + 164 + func (a *AquareumAPI) HandleWebRTCIngest(ctx context.Context) httprouter.Handle { 165 + return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 166 + // user := p.ByName("user") 167 + // if user == "" { 168 + // errors.WriteHTTPBadRequest(w, "user required", nil) 169 + // return 170 + // } 171 + // _, err := a.NormalizeUser(ctx, user) 172 + // if err != nil { 173 + // errors.WriteHTTPBadRequest(w, "invalid user", err) 174 + // return 175 + // } 176 + body, err := io.ReadAll(r.Body) 177 + if err != nil { 178 + errors.WriteHTTPBadRequest(w, "error reading body", err) 179 + return 180 + } 181 + offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 182 + answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, a.MediaSigner) 183 + if err != nil { 184 + errors.WriteHTTPInternalServerError(w, "error playing back", err) 185 + return 186 + } 187 + host := r.Host 188 + if host == "" { 189 + host = r.URL.Host 190 + } 191 + scheme := "http" 192 + if r.TLS != nil { 193 + scheme = "https" 194 + } 195 + location := fmt.Sprintf("%s://%s/api/live/webrtc", scheme, host) 196 + log.Log(ctx, "location", "location", location) 197 + w.Header().Set("Location", location) 198 + w.WriteHeader(201) 160 199 w.Write([]byte(answer.SDP)) 161 200 } 162 201 }
-254
pkg/aqwebrtc/aqwebrtc.go
··· 1 - package aqwebrtc 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "io" 7 - "strings" 8 - 9 - "aquareum.tv/aquareum/pkg/log" 10 - aqmedia "aquareum.tv/aquareum/pkg/media" 11 - "github.com/go-gst/go-gst/gst" 12 - "github.com/go-gst/go-gst/gst/app" 13 - "github.com/google/uuid" 14 - "github.com/pion/webrtc/v4" 15 - "github.com/pion/webrtc/v4/pkg/media" 16 - ) 17 - 18 - // This function remains in scope for the duration of a single users' playback 19 - func WebRTCPlayback(ctx context.Context, input io.Reader, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { 20 - uu, err := uuid.NewV7() 21 - if err != nil { 22 - return nil, err 23 - } 24 - ctx = log.WithLogValues(ctx, "webrtcID", uu.String()) 25 - ctx, cancel := context.WithCancel(ctx) 26 - 27 - ctx = log.WithLogValues(ctx, "GStreamerFunc", "ToWHEP") 28 - 29 - pipelineSlice := []string{ 30 - "appsrc name=appsrc ! matroskademux name=demux", 31 - "multiqueue name=queue", 32 - "demux.video_0 ! queue.sink_0", 33 - "demux.audio_0 ! queue.sink_1", 34 - "multiqueue name=outqueue", 35 - "queue.src_0 ! h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink", 36 - "queue.src_1 ! fdkaacdec ! audioresample ! opusenc inband-fec=true perfect-timestamp=true bitrate=128000 ! appsink name=audioappsink", 37 - } 38 - 39 - pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 40 - if err != nil { 41 - return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 42 - } 43 - 44 - appsrc, err := pipeline.GetElementByName("appsrc") 45 - if err != nil { 46 - return nil, fmt.Errorf("failed to get appsrc element from pipeline: %w", err) 47 - } 48 - 49 - src := app.SrcFromElement(appsrc) 50 - src.SetCallbacks(&app.SourceCallbacks{ 51 - NeedDataFunc: aqmedia.ReaderNeedData(ctx, input), 52 - }) 53 - 54 - go func() { 55 - <-ctx.Done() 56 - pipeline.BlockSetState(gst.StateNull) 57 - }() 58 - 59 - videoappsinkele, err := pipeline.GetElementByName("videoappsink") 60 - if err != nil { 61 - return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err) 62 - } 63 - 64 - audioappsinkele, err := pipeline.GetElementByName("audioappsink") 65 - if err != nil { 66 - return nil, fmt.Errorf("failed to get audio sink element from pipeline: %w", err) 67 - } 68 - 69 - // Create a new RTCPeerConnection 70 - peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{ 71 - ICEServers: []webrtc.ICEServer{ 72 - { 73 - // URLs: []string{"stun:stun.l.google.com:19302"}, 74 - }, 75 - }, 76 - }) 77 - if err != nil { 78 - return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 79 - } 80 - go func() { 81 - <-ctx.Done() 82 - if cErr := peerConnection.Close(); cErr != nil { 83 - log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 84 - } 85 - }() 86 - 87 - videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") 88 - if err != nil { 89 - return nil, fmt.Errorf("failed to create video track: %w", err) 90 - } 91 - videoRTPSender, err := peerConnection.AddTrack(videoTrack) 92 - if err != nil { 93 - return nil, fmt.Errorf("failed to add video track to peer connection: %w", err) 94 - } 95 - 96 - audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") 97 - if err != nil { 98 - return nil, fmt.Errorf("failed to create audio track: %w", err) 99 - } 100 - audioRTPSender, err := peerConnection.AddTrack(audioTrack) 101 - if err != nil { 102 - return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err) 103 - } 104 - 105 - // Set the remote SessionDescription 106 - if err = peerConnection.SetRemoteDescription(*offer); err != nil { 107 - return nil, fmt.Errorf("failed to set remote description: %w", err) 108 - } 109 - 110 - // Create answer 111 - answer, err := peerConnection.CreateAnswer(nil) 112 - if err != nil { 113 - return nil, fmt.Errorf("failed to create answer: %w", err) 114 - } 115 - 116 - // Sets the LocalDescription, and starts our UDP listeners 117 - if err = peerConnection.SetLocalDescription(answer); err != nil { 118 - return nil, fmt.Errorf("failed to set local description: %w", err) 119 - } 120 - 121 - // Create channel that is blocked until ICE Gathering is complete 122 - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 123 - 124 - // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 125 - 126 - go func() { 127 - pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { 128 - switch msg.Type() { 129 - 130 - case gst.MessageEOS: // When end-of-stream is received flush the pipeling and stop the main loop 131 - log.Log(ctx, "got gst.MessageEOS, exiting") 132 - cancel() 133 - case gst.MessageError: // Error messages are always fatal 134 - err := msg.ParseError() 135 - log.Error(ctx, "gstreamer error", "error", err.Error()) 136 - if debug := err.DebugString(); debug != "" { 137 - log.Log(ctx, "gstreamer debug", "message", debug) 138 - } 139 - cancel() 140 - default: 141 - log.Debug(ctx, msg.String()) 142 - } 143 - return true 144 - }) 145 - 146 - videoappsink := app.SinkFromElement(videoappsinkele) 147 - videoappsink.SetCallbacks(&app.SinkCallbacks{ 148 - NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 149 - sample := sink.PullSample() 150 - if sample == nil { 151 - return gst.FlowEOS 152 - } 153 - 154 - buffer := sample.GetBuffer() 155 - if buffer == nil { 156 - return gst.FlowError 157 - } 158 - 159 - samples := buffer.Map(gst.MapRead).Bytes() 160 - defer buffer.Unmap() 161 - 162 - if err := videoTrack.WriteSample(media.Sample{Data: samples, Duration: *buffer.Duration().AsDuration()}); err != nil { 163 - log.Log(ctx, "failed to write video sample", "error", err) 164 - cancel() 165 - } 166 - 167 - return gst.FlowOK 168 - }, 169 - EOSFunc: func(sink *app.Sink) { 170 - cancel() 171 - }, 172 - }) 173 - 174 - audioappsink := app.SinkFromElement(audioappsinkele) 175 - audioappsink.SetCallbacks(&app.SinkCallbacks{ 176 - NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 177 - sample := sink.PullSample() 178 - if sample == nil { 179 - return gst.FlowEOS 180 - } 181 - 182 - buffer := sample.GetBuffer() 183 - if buffer == nil { 184 - return gst.FlowError 185 - } 186 - 187 - samples := buffer.Map(gst.MapRead).Bytes() 188 - defer buffer.Unmap() 189 - 190 - if err := audioTrack.WriteSample(media.Sample{Data: samples, Duration: *buffer.Duration().AsDuration()}); err != nil { 191 - log.Log(ctx, "failed to write audio sample", "error", err) 192 - cancel() 193 - } 194 - 195 - return gst.FlowOK 196 - }, 197 - EOSFunc: func(sink *app.Sink) { 198 - cancel() 199 - }, 200 - }) 201 - 202 - // Start the pipeline 203 - pipeline.SetState(gst.StatePlaying) 204 - 205 - go func() { 206 - rtcpBuf := make([]byte, 1500) 207 - for { 208 - if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil { 209 - return 210 - } 211 - } 212 - }() 213 - 214 - go func() { 215 - rtcpBuf := make([]byte, 1500) 216 - for { 217 - if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil { 218 - return 219 - } 220 - } 221 - }() 222 - 223 - // Set the handler for ICE connection state 224 - // This will notify you when the peer has connected/disconnected 225 - peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 226 - log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 227 - if connectionState == webrtc.ICEConnectionStateConnected { 228 - // iceConnectedCtxCancel() 229 - } 230 - }) 231 - 232 - // Set the handler for Peer connection state 233 - // This will notify you when the peer has connected/disconnected 234 - peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 235 - log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 236 - 237 - if s == webrtc.PeerConnectionStateFailed { 238 - // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 239 - // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 240 - // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 241 - log.Log(ctx, "Peer Connection has gone to failed exiting") 242 - cancel() 243 - } 244 - }) 245 - 246 - <-ctx.Done() 247 - }() 248 - select { 249 - case <-gatherComplete: 250 - return peerConnection.LocalDescription(), nil 251 - case <-ctx.Done(): 252 - return nil, ctx.Err() 253 - } 254 - }
+2 -2
pkg/media/gstreamer.go
··· 292 292 pipelineSlice := []string{ 293 293 "appsrc name=appsrc ! matroskademux name=demux", 294 294 "demux.video_0 ! queue ! h264parse name=videoparse", 295 - "demux.audio_0 ! queue ! aacparse name=audioparse", 295 + "demux.audio_0 ! queue ! opusparse name=audioparse", 296 296 } 297 297 298 298 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) ··· 507 507 fmt.Sprintf("videobox border-alpha=0 top=-%d left=-%d name=box ! comp.", (TESTSRC_HEIGHT/2)-(QR_SIZE/2), (TESTSRC_WIDTH/2)-(QR_SIZE/2)), 508 508 "appsrc name=pngsrc ! pngdec ! videoconvert ! videorate ! video/x-raw,format=AYUV,framerate=1/1 ! box.", 509 509 "appsrc name=timetext ! pngdec ! videoconvert ! videorate ! video/x-raw,format=AYUV,framerate=1/1 ! comp.", 510 - "audiotestsrc ! audioconvert ! fdkaacenc ! queue ! aacparse name=audioparse", 510 + "audiotestsrc ! audioconvert ! opusenc inband-fec=true perfect-timestamp=true bitrate=128000 ! queue ! opusparse name=audioparse", 511 511 } 512 512 513 513 pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n"))
+590
pkg/media/webrtc.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "io" 7 + "strings" 8 + "time" 9 + 10 + "aquareum.tv/aquareum/pkg/log" 11 + "github.com/go-gst/go-gst/gst" 12 + "github.com/go-gst/go-gst/gst/app" 13 + "github.com/google/uuid" 14 + "github.com/pion/interceptor" 15 + "github.com/pion/interceptor/pkg/intervalpli" 16 + "github.com/pion/rtcp" 17 + "github.com/pion/webrtc/v4" 18 + "github.com/pion/webrtc/v4/pkg/media" 19 + ) 20 + 21 + // This function remains in scope for the duration of a single users' playback 22 + func WebRTCPlayback(ctx context.Context, input io.Reader, offer *webrtc.SessionDescription) (*webrtc.SessionDescription, error) { 23 + uu, err := uuid.NewV7() 24 + if err != nil { 25 + return nil, err 26 + } 27 + ctx = log.WithLogValues(ctx, "webrtcID", uu.String()) 28 + ctx, cancel := context.WithCancel(ctx) 29 + 30 + ctx = log.WithLogValues(ctx, "mediafunc", "WebRTCPlayback") 31 + 32 + pipelineSlice := []string{ 33 + "appsrc name=appsrc ! matroskademux name=demux", 34 + "multiqueue name=queue", 35 + "demux.video_0 ! queue.sink_0", 36 + "demux.audio_0 ! queue.sink_1", 37 + "multiqueue name=outqueue", 38 + "queue.src_0 ! h264parse name=videoparse ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink", 39 + "queue.src_1 ! opusparse ! appsink name=audioappsink", 40 + } 41 + 42 + pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 43 + if err != nil { 44 + return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 45 + } 46 + 47 + pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { 48 + switch msg.Type() { 49 + 50 + case gst.MessageEOS: // When end-of-stream is received flush the pipeling and stop the main loop 51 + log.Log(ctx, "got gst.MessageEOS, exiting") 52 + cancel() 53 + case gst.MessageError: // Error messages are always fatal 54 + err := msg.ParseError() 55 + log.Error(ctx, "gstreamer error", "error", err.Error()) 56 + if debug := err.DebugString(); debug != "" { 57 + log.Log(ctx, "gstreamer debug", "message", debug) 58 + } 59 + cancel() 60 + default: 61 + log.Debug(ctx, msg.String()) 62 + } 63 + return true 64 + }) 65 + 66 + appsrc, err := pipeline.GetElementByName("appsrc") 67 + if err != nil { 68 + return nil, fmt.Errorf("failed to get appsrc element from pipeline: %w", err) 69 + } 70 + 71 + src := app.SrcFromElement(appsrc) 72 + src.SetCallbacks(&app.SourceCallbacks{ 73 + NeedDataFunc: ReaderNeedData(ctx, input), 74 + }) 75 + 76 + go func() { 77 + <-ctx.Done() 78 + pipeline.BlockSetState(gst.StateNull) 79 + }() 80 + 81 + videoappsinkele, err := pipeline.GetElementByName("videoappsink") 82 + if err != nil { 83 + return nil, fmt.Errorf("failed to get video sink element from pipeline: %w", err) 84 + } 85 + 86 + audioappsinkele, err := pipeline.GetElementByName("audioappsink") 87 + if err != nil { 88 + return nil, fmt.Errorf("failed to get audio sink element from pipeline: %w", err) 89 + } 90 + 91 + // Create a new RTCPeerConnection 92 + peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{ 93 + ICEServers: []webrtc.ICEServer{ 94 + { 95 + // URLs: []string{"stun:stun.l.google.com:19302"}, 96 + }, 97 + }, 98 + }) 99 + if err != nil { 100 + return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 101 + } 102 + go func() { 103 + <-ctx.Done() 104 + if cErr := peerConnection.Close(); cErr != nil { 105 + log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 106 + } 107 + }() 108 + 109 + videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion") 110 + if err != nil { 111 + return nil, fmt.Errorf("failed to create video track: %w", err) 112 + } 113 + videoRTPSender, err := peerConnection.AddTrack(videoTrack) 114 + if err != nil { 115 + return nil, fmt.Errorf("failed to add video track to peer connection: %w", err) 116 + } 117 + 118 + audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion") 119 + if err != nil { 120 + return nil, fmt.Errorf("failed to create audio track: %w", err) 121 + } 122 + audioRTPSender, err := peerConnection.AddTrack(audioTrack) 123 + if err != nil { 124 + return nil, fmt.Errorf("failed to add audio track to peer connection: %w", err) 125 + } 126 + 127 + // Set the remote SessionDescription 128 + if err = peerConnection.SetRemoteDescription(*offer); err != nil { 129 + return nil, fmt.Errorf("failed to set remote description: %w", err) 130 + } 131 + 132 + // Create answer 133 + answer, err := peerConnection.CreateAnswer(nil) 134 + if err != nil { 135 + return nil, fmt.Errorf("failed to create answer: %w", err) 136 + } 137 + 138 + // Sets the LocalDescription, and starts our UDP listeners 139 + if err = peerConnection.SetLocalDescription(answer); err != nil { 140 + return nil, fmt.Errorf("failed to set local description: %w", err) 141 + } 142 + 143 + // Create channel that is blocked until ICE Gathering is complete 144 + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 145 + 146 + // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 147 + 148 + go func() { 149 + 150 + videoappsink := app.SinkFromElement(videoappsinkele) 151 + videoappsink.SetCallbacks(&app.SinkCallbacks{ 152 + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 153 + sample := sink.PullSample() 154 + if sample == nil { 155 + return gst.FlowEOS 156 + } 157 + 158 + buffer := sample.GetBuffer() 159 + if buffer == nil { 160 + return gst.FlowError 161 + } 162 + 163 + samples := buffer.Map(gst.MapRead).Bytes() 164 + defer buffer.Unmap() 165 + 166 + if err := videoTrack.WriteSample(media.Sample{Data: samples, Duration: *buffer.Duration().AsDuration()}); err != nil { 167 + log.Log(ctx, "failed to write video sample", "error", err) 168 + cancel() 169 + } 170 + 171 + return gst.FlowOK 172 + }, 173 + EOSFunc: func(sink *app.Sink) { 174 + cancel() 175 + }, 176 + }) 177 + 178 + audioappsink := app.SinkFromElement(audioappsinkele) 179 + audioappsink.SetCallbacks(&app.SinkCallbacks{ 180 + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 181 + sample := sink.PullSample() 182 + if sample == nil { 183 + return gst.FlowEOS 184 + } 185 + 186 + buffer := sample.GetBuffer() 187 + if buffer == nil { 188 + return gst.FlowError 189 + } 190 + 191 + samples := buffer.Map(gst.MapRead).Bytes() 192 + defer buffer.Unmap() 193 + 194 + clockTime := buffer.Duration() 195 + dur := clockTime.AsDuration() 196 + mediaSample := media.Sample{Data: samples} 197 + if dur != nil { 198 + mediaSample.Duration = *dur 199 + } else { 200 + log.Log(ctx, "no duration", "samples", len(samples)) 201 + // cancel() 202 + return gst.FlowOK 203 + } 204 + if err := audioTrack.WriteSample(mediaSample); err != nil { 205 + log.Log(ctx, "failed to write audio sample", "error", err) 206 + return gst.FlowOK 207 + } 208 + 209 + return gst.FlowOK 210 + }, 211 + EOSFunc: func(sink *app.Sink) { 212 + cancel() 213 + }, 214 + }) 215 + 216 + // Start the pipeline 217 + pipeline.SetState(gst.StatePlaying) 218 + 219 + go func() { 220 + rtcpBuf := make([]byte, 1500) 221 + for { 222 + if _, _, rtcpErr := videoRTPSender.Read(rtcpBuf); rtcpErr != nil { 223 + return 224 + } 225 + } 226 + }() 227 + 228 + go func() { 229 + rtcpBuf := make([]byte, 1500) 230 + for { 231 + if _, _, rtcpErr := audioRTPSender.Read(rtcpBuf); rtcpErr != nil { 232 + return 233 + } 234 + } 235 + }() 236 + 237 + // Set the handler for ICE connection state 238 + // This will notify you when the peer has connected/disconnected 239 + peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 240 + log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 241 + if connectionState == webrtc.ICEConnectionStateConnected { 242 + // iceConnectedCtxCancel() 243 + } 244 + }) 245 + 246 + // Set the handler for Peer connection state 247 + // This will notify you when the peer has connected/disconnected 248 + peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 249 + log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 250 + 251 + if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed { 252 + // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 253 + // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 254 + // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 255 + log.Log(ctx, "Peer Connection has gone to failed, exiting") 256 + cancel() 257 + } 258 + }) 259 + 260 + <-ctx.Done() 261 + }() 262 + select { 263 + case <-gatherComplete: 264 + return peerConnection.LocalDescription(), nil 265 + case <-ctx.Done(): 266 + return nil, ctx.Err() 267 + } 268 + } 269 + 270 + // This function remains in scope for the duration of a single users' playback 271 + func (mm *MediaManager) WebRTCIngest(ctx context.Context, offer *webrtc.SessionDescription, signer *MediaSigner) (*webrtc.SessionDescription, error) { 272 + uu, err := uuid.NewV7() 273 + if err != nil { 274 + return nil, err 275 + } 276 + 277 + ctx, cancel := context.WithCancel(ctx) 278 + ctx = log.WithLogValues(ctx, "webrtcID", uu.String(), "mediafunc", "WebRTCIngest") 279 + 280 + m := &webrtc.MediaEngine{} 281 + 282 + // Setup the codecs you want to use. 283 + // We'll use a VP8 and Opus but you can also define your own 284 + if err := m.RegisterCodec(webrtc.RTPCodecParameters{ 285 + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, 286 + PayloadType: 102, 287 + }, webrtc.RTPCodecTypeVideo); err != nil { 288 + return nil, err 289 + } 290 + if err := m.RegisterCodec(webrtc.RTPCodecParameters{ 291 + RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil}, 292 + PayloadType: 111, 293 + }, webrtc.RTPCodecTypeAudio); err != nil { 294 + return nil, err 295 + } 296 + 297 + // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. 298 + // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` 299 + // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry 300 + // for each PeerConnection. 301 + i := &interceptor.Registry{} 302 + 303 + // Register a intervalpli factory 304 + // This interceptor sends a PLI every 3 seconds. A PLI causes a video keyframe to be generated by the sender. 305 + // This makes our video seekable and more error resilent, but at a cost of lower picture quality and higher bitrates 306 + // A real world application should process incoming RTCP packets from viewers and forward them to senders 307 + intervalPliFactory, err := intervalpli.NewReceiverInterceptor() 308 + if err != nil { 309 + return nil, fmt.Errorf("failed to create intervalpli factory: %w", err) 310 + } 311 + i.Add(intervalPliFactory) 312 + 313 + // Use the default set of Interceptors 314 + if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil { 315 + return nil, fmt.Errorf("failed to register default interceptors: %w", err) 316 + } 317 + 318 + // Create the API object with the MediaEngine 319 + api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i)) 320 + 321 + // Prepare the configuration 322 + config := webrtc.Configuration{ 323 + ICEServers: []webrtc.ICEServer{ 324 + { 325 + URLs: []string{"stun:stun.l.google.com:19302"}, 326 + }, 327 + }, 328 + } 329 + 330 + // Create a new RTCPeerConnection 331 + peerConnection, err := api.NewPeerConnection(config) 332 + if err != nil { 333 + return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 334 + } 335 + 336 + // Allow us to receive 1 audio track, and 1 video track 337 + if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { 338 + return nil, fmt.Errorf("failed to add audio transceiver: %w", err) 339 + } else if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil { 340 + return nil, fmt.Errorf("failed to add video transceiver: %w", err) 341 + } 342 + 343 + pipelineSlice := []string{ 344 + "multiqueue name=queue", 345 + "appsrc format=time is-live=true do-timestamp=true name=videosrc ! capsfilter caps=application/x-rtp ! rtph264depay ! capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=nal ! h264parse ! h264timestamper ! queue.sink_0", 346 + "appsrc format=time is-live=true do-timestamp=true name=audiosrc ! capsfilter caps=application/x-rtp,media=audio,encoding-name=OPUS,payload=111 ! rtpopusdepay ! queue.sink_1", 347 + } 348 + 349 + pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 350 + if err != nil { 351 + return nil, fmt.Errorf("failed to create GStreamer pipeline: %w", err) 352 + } 353 + 354 + pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { 355 + switch msg.Type() { 356 + 357 + case gst.MessageEOS: // When end-of-stream is received flush the pipeling and stop the main loop 358 + log.Log(ctx, "got gst.MessageEOS, exiting") 359 + cancel() 360 + case gst.MessageError: // Error messages are always fatal 361 + err := msg.ParseError() 362 + log.Error(ctx, "gstreamer error", "error", err.Error()) 363 + if debug := err.DebugString(); debug != "" { 364 + log.Log(ctx, "gstreamer debug", "message", debug) 365 + } 366 + cancel() 367 + default: 368 + log.Log(ctx, msg.String()) 369 + } 370 + return true 371 + }) 372 + 373 + queue, err := pipeline.GetElementByName("queue") 374 + if err != nil { 375 + return nil, fmt.Errorf("failed to get queue element from pipeline: %w", err) 376 + } 377 + 378 + signerElem, err := mm.SegmentAndSignElem(ctx, signer) 379 + if err != nil { 380 + return nil, fmt.Errorf("failed create signer element: %w", err) 381 + } 382 + err = pipeline.Add(signerElem) 383 + if err != nil { 384 + return nil, fmt.Errorf("failed to add signer element to pipeline: %w", err) 385 + } 386 + 387 + // err = queue.Link(signerElem) 388 + // if err != nil { 389 + // return nil, fmt.Errorf("failed to link queue to signer element: %w", err) 390 + // } 391 + videoSrcPads, err := queue.GetSrcPads() 392 + if err != nil { 393 + return nil, fmt.Errorf("failed to get videoSrcPads from queue: %w", err) 394 + } 395 + if len(videoSrcPads) != 2 { 396 + return nil, fmt.Errorf("failed to get videoSrcPads from queue") 397 + } 398 + videoSrcPad := videoSrcPads[0] 399 + audioSrcPad := videoSrcPads[1] 400 + 401 + signerElemPads, err := signerElem.GetPads() 402 + if err != nil { 403 + return nil, fmt.Errorf("failed to get signerElemPads from signer element: %w", err) 404 + } 405 + if len(signerElemPads) != 2 { 406 + return nil, fmt.Errorf("failed to get signerElemPads from signer element") 407 + } 408 + signerElemVideoPad := signerElemPads[0] 409 + signerElemAudioPad := signerElemPads[1] 410 + videoSrcPad.Link(signerElemVideoPad) 411 + audioSrcPad.Link(signerElemAudioPad) 412 + 413 + videoSrcElem, err := pipeline.GetElementByName("videosrc") 414 + if err != nil { 415 + return nil, fmt.Errorf("failed to get videoSrcElem element from pipeline: %w", err) 416 + } 417 + videoSrc := app.SrcFromElement(videoSrcElem) 418 + 419 + audioSrcElem, err := pipeline.GetElementByName("audiosrc") 420 + if err != nil { 421 + return nil, fmt.Errorf("failed to get audioSrcElem element from pipeline: %w", err) 422 + } 423 + audioSrc := app.SrcFromElement(audioSrcElem) 424 + 425 + go func() { 426 + <-ctx.Done() 427 + pipeline.BlockSetState(gst.StateNull) 428 + }() 429 + 430 + go func() { 431 + <-ctx.Done() 432 + if cErr := peerConnection.Close(); cErr != nil { 433 + log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 434 + } 435 + }() 436 + 437 + // Set the remote SessionDescription 438 + if err = peerConnection.SetRemoteDescription(*offer); err != nil { 439 + return nil, fmt.Errorf("failed to set remote description: %w", err) 440 + } 441 + 442 + // Create answer 443 + answer, err := peerConnection.CreateAnswer(nil) 444 + if err != nil { 445 + return nil, fmt.Errorf("failed to create answer: %w", err) 446 + } 447 + 448 + // Sets the LocalDescription, and starts our UDP listeners 449 + if err = peerConnection.SetLocalDescription(answer); err != nil { 450 + return nil, fmt.Errorf("failed to set local description: %w", err) 451 + } 452 + 453 + // Create channel that is blocked until ICE Gathering is complete 454 + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 455 + 456 + go func() { 457 + ticker := time.NewTicker(time.Second * 1) 458 + for { 459 + select { 460 + case <-ctx.Done(): 461 + return 462 + case <-ticker.C: 463 + state := pipeline.GetCurrentState() 464 + log.Log(ctx, "pipeline state", "state", state) 465 + } 466 + } 467 + }() 468 + // Setup complete! Now we boot up streaming in the background while returning the SDP offer to the user. 469 + 470 + go func() { 471 + log.Debug(ctx, "starting pipeline") 472 + 473 + // Start the pipeline 474 + err = pipeline.SetState(gst.StatePlaying) 475 + if err != nil { 476 + log.Log(ctx, "failed to set pipeline state", "error", err) 477 + cancel() 478 + } 479 + 480 + // Set the handler for ICE connection state 481 + // This will notify you when the peer has connected/disconnected 482 + peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 483 + log.Log(ctx, "Connection State has changed", "state", connectionState.String()) 484 + if connectionState == webrtc.ICEConnectionStateConnected { 485 + // iceConnectedCtxCancel() 486 + } 487 + }) 488 + 489 + // Set the handler for Peer connection state 490 + // This will notify you when the peer has connected/disconnected 491 + peerConnection.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { 492 + log.Log(ctx, "Peer Connection State has changed", "state", s.String()) 493 + 494 + if s == webrtc.PeerConnectionStateFailed { 495 + // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 496 + // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 497 + // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 498 + log.Log(ctx, "Peer Connection has gone to failed exiting") 499 + cancel() 500 + } 501 + }) 502 + 503 + peerConnection.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { 504 + if track.Kind() == webrtc.RTPCodecTypeVideo { 505 + // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval 506 + go func() { 507 + ticker := time.NewTicker(time.Second * 5) 508 + for { 509 + select { 510 + case <-ctx.Done(): 511 + return 512 + case <-ticker.C: 513 + rtcpSendErr := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}) 514 + if rtcpSendErr != nil { 515 + log.Log(ctx, "failed to send rtcp packet", "error", rtcpSendErr) 516 + cancel() 517 + return 518 + } 519 + } 520 + } 521 + }() 522 + 523 + codecName := strings.Split(track.Codec().RTPCodecCapability.MimeType, "/")[1] 524 + log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName) 525 + 526 + // appSrc := pipelineForCodec(track, codecName) 527 + buf := make([]byte, 1400) 528 + for { 529 + i, _, readErr := track.Read(buf) 530 + if readErr != nil { 531 + log.Log(ctx, "failed to read track", "error", readErr) 532 + cancel() 533 + return 534 + } 535 + // log.Log(ctx, "read video track", "bytes", i) 536 + 537 + ret := videoSrc.PushBuffer(gst.NewBufferFromBytes(buf[:i])) 538 + if ret != gst.FlowOK { 539 + log.Log(ctx, "failed to push buffer", "error", ret) 540 + cancel() 541 + return 542 + } 543 + // state := pipeline.GetCurrentState() 544 + // if state != gst.StatePlaying { 545 + // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state) 546 + // cancel() 547 + // return 548 + // } 549 + } 550 + } 551 + if track.Kind() == webrtc.RTPCodecTypeAudio { 552 + 553 + codecName := strings.Split(track.Codec().RTPCodecCapability.MimeType, "/")[1] 554 + log.Log(ctx, "Track has started", "payloadType", track.PayloadType(), "codecName", codecName) 555 + 556 + buf := make([]byte, 1400) 557 + for { 558 + i, _, readErr := track.Read(buf) 559 + if readErr != nil { 560 + log.Log(ctx, "failed to read track", "error", readErr) 561 + cancel() 562 + return 563 + } 564 + // log.Log(ctx, "read audio track", "bytes", i) 565 + 566 + ret := audioSrc.PushBuffer(gst.NewBufferFromBytes(buf[:i])) 567 + if ret != gst.FlowOK { 568 + log.Log(ctx, "failed to push buffer", "error", ret) 569 + cancel() 570 + return 571 + } 572 + // state := pipeline.GetCurrentState() 573 + // if state != gst.StatePlaying { 574 + // log.Warn(ctx, "pipeline state is not playing, consider running with GST_DEBUG=*:5 to find out why", "state", state) 575 + // cancel() 576 + // return 577 + // } 578 + } 579 + } 580 + }) 581 + 582 + <-ctx.Done() 583 + }() 584 + select { 585 + case <-gatherComplete: 586 + return peerConnection.LocalDescription(), nil 587 + case <-ctx.Done(): 588 + return nil, ctx.Err() 589 + } 590 + }