Live video on the AT Protocol

rtcrec: implement webrtc session replay (#266)

* Makefile: turn gstreamer registry back on in dev for dynamic linking

* peerproxy: init peerproxy

* peerproxy: wrap more stuff

* rtcrec: rename

* rtcrec: basic serialization and deserialization

* app: CBOR can just be an array

* rtcrec: iterate on the format a bit

* rtcrec: basic peerconnection recording, cli tool

* rtcrec: use SSRC as primary key

* rtcrec: fix gitignore

* rtcrec: rename files

* rtcrec: add a few more fields

* rtcrec: stream playback working hooray!

* rtcrec: add replay endpoint to api_internal

* lexicons: add and index place.stream.server.settings

* rtcrec: rename some classes

* rtcrec: run or don't based on user settings

* app: debug recording popup

* settings: clean up switches + move debugrecording component

* app: working debug recording popup, improve live popup

authored by

Eli Mallon and committed by
GitHub
3f9a02b7 ef53db11

+1877 -173
+4
Makefile
··· 696 696 dockerfile-hash-precommit: 697 697 @bash -c 'printf "variables:\n DOCKERFILE_HASH: `git hash-object docker/build.Dockerfile`" > .ci/dockerfile-hash.yaml' \ 698 698 && git add .ci/dockerfile-hash.yaml 699 + 700 + .PHONY: rtcrec 701 + rtcrec: 702 + go build -o $(BUILDDIR)/rtcrec ./pkg/rtcrec/cmd/...
+2
go.mod
··· 21 21 github.com/decred/dcrd/dcrec/secp256k1 v1.0.4 22 22 github.com/dunglas/httpsfv v1.0.2 23 23 github.com/ethereum/go-ethereum v1.14.7 24 + github.com/fxamacker/cbor/v2 v2.8.0 24 25 github.com/go-git/go-git/v5 v5.12.0 25 26 github.com/go-gst/go-glib v1.4.0 26 27 github.com/go-gst/go-gst v1.4.0 ··· 387 388 github.com/valyala/fasttemplate v1.2.2 // indirect 388 389 github.com/vektah/gqlparser/v2 v2.5.22 // indirect 389 390 github.com/wlynxg/anet v0.0.5 // indirect 391 + github.com/x448/float16 v0.8.4 // indirect 390 392 github.com/xanzy/ssh-agent v0.3.3 // indirect 391 393 github.com/xen0n/gosmopolitan v1.3.0 // indirect 392 394 github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
+4
go.sum
··· 261 261 github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= 262 262 github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= 263 263 github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= 264 + github.com/fxamacker/cbor/v2 v2.8.0 h1:fFtUGXUzXPHTIUdne5+zzMPTfffl3RD5qYnkY40vtxU= 265 + github.com/fxamacker/cbor/v2 v2.8.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= 264 266 github.com/fzipp/gocyclo v0.6.0 h1:lsblElZG7d3ALtGMx9fmxeTKZaLLpU8mET09yN4BBLo= 265 267 github.com/fzipp/gocyclo v0.6.0/go.mod h1:rXPyn8fnlpa0R2csP/31uerbiVBugk5whMdlyaLkLoA= 266 268 github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= ··· 996 998 github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e/go.mod h1:pM99HXyEbSQHcosHc0iW7YFmwnscr+t9Te4ibko05so= 997 999 github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU= 998 1000 github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= 1001 + github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= 1002 + github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= 999 1003 github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= 1000 1004 github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= 1001 1005 github.com/xen0n/gosmopolitan v1.3.0 h1:zAZI1zefvo7gcpbCOrPSHJZJYA9ZgLfJqtKzZ5pHqQM=
+1 -51
js/app/components/livestream/livestream.tsx
··· 14 14 import Loading from "components/loading/loading"; 15 15 import { Player } from "components/player/player"; 16 16 import { PlayerProps } from "components/player/props"; 17 - import Popup from "components/popup"; 18 17 import Timer from "components/timer"; 19 18 import Viewers from "components/viewers"; 20 19 import { useFullscreen } from "contexts/FullscreenContext"; ··· 23 22 setSidebarUnhidden, 24 23 } from "features/base/sidebarSlice"; 25 24 import { getProfile } from "features/bluesky/blueskySlice"; 26 - import { 27 - selectTelemetry, 28 - telemetryOpt, 29 - } from "features/streamplace/streamplaceSlice"; 30 25 import useAvatars from "hooks/useAvatars"; 31 26 import { useKeyboard } from "hooks/useKeyboard"; 32 27 import usePlatform from "hooks/usePlatform"; ··· 38 33 SafeAreaView, 39 34 } from "react-native"; 40 35 import storage from "storage"; 41 - import { useAppDispatch, useAppSelector } from "store/hooks"; 36 + import { useAppDispatch } from "store/hooks"; 42 37 import { 43 38 Button, 44 - H3, 45 39 isWeb, 46 40 ScrollView, 47 41 Text, ··· 62 56 } 63 57 64 58 export function LivestreamInner(props: Partial<PlayerProps>) { 65 - const telemetry = useAppSelector(selectTelemetry); 66 59 const toast = useToastController(); 67 60 const viewers = useViewers(); 68 61 ··· 184 177 <Loading /> 185 178 </View> 186 179 )} 187 - {telemetry === null && ( 188 - <Popup 189 - onClose={() => { 190 - dispatch(telemetryOpt(false)); 191 - }} 192 - containerProps={{ 193 - bottom: "$8", 194 - zIndex: 1000, 195 - }} 196 - bubbleProps={{ 197 - backgroundColor: "$accentBackground", 198 - gap: "$3", 199 - maxWidth: 400, 200 - }} 201 - > 202 - <H3 textAlign="center">Player Telemetry</H3> 203 - <Text> 204 - Streamplace is beta software and it helps us out to have the 205 - player report back on how playback is working. Would you like to 206 - opt in to optional player telemetry? 207 - </Text> 208 - <View flexDirection="row" gap="$2" f={1}> 209 - <Button 210 - f={3} 211 - backgroundColor="$accentColor" 212 - onPress={() => { 213 - dispatch(telemetryOpt(true)); 214 - }} 215 - > 216 - Opt in 217 - </Button> 218 - <Button 219 - f={3} 220 - onPress={() => { 221 - dispatch(telemetryOpt(false)); 222 - }} 223 - > 224 - Opt out 225 - </Button> 226 - </View> 227 - </Popup> 228 - )} 229 180 <View 230 181 f={1} 231 182 opacity={videoWidth === 0 ? 0 : 1} ··· 250 201 }} 251 202 > 252 203 <Player 253 - telemetry={telemetry === true} 254 204 src={src} 255 205 fullscreen={fullscreen} 256 206 setFullscreen={setFullscreen}
+2
js/app/components/popup.tsx
··· 21 21 f={1} 22 22 alignItems="center" 23 23 width="100%" 24 + pointerEvents="none" 24 25 {...viewProps} 25 26 > 26 27 <View ··· 28 29 alignItems="stretch" 29 30 padding="$4" 30 31 borderRadius="$4" 32 + pointerEvents="auto" 31 33 onPress={() => { 32 34 if (onPress) { 33 35 onPress();
+83 -60
js/app/components/settings/settings.tsx
··· 3 3 import AQLink from "components/aqlink"; 4 4 import Container from "components/container"; 5 5 import { 6 - DEFAULT_URL, 7 - selectTelemetry, 8 - setURL, 9 - telemetryOpt, 10 - } from "features/streamplace/streamplaceSlice"; 6 + createServerSettingsRecord, 7 + getServerSettingsFromPDS, 8 + selectIsReady, 9 + selectServerSettings, 10 + } from "features/bluesky/blueskySlice"; 11 + import { DEFAULT_URL, setURL } from "features/streamplace/streamplaceSlice"; 11 12 import useStreamplaceNode from "hooks/useStreamplaceNode"; 12 13 import { useEffect, useState } from "react"; 14 + import { Switch } from "react-native"; 13 15 import { useAppDispatch, useAppSelector } from "store/hooks"; 14 - import { Button, H3, H5, Input, Switch, Text, View, XStack } from "tamagui"; 16 + import { Button, H3, H5, Input, Text, View, XStack } from "tamagui"; 15 17 import { Updates } from "./updates"; 16 18 17 19 export function Settings() { ··· 47 49 } 48 50 }; 49 51 50 - const telemetry = useAppSelector(selectTelemetry); 51 - 52 - const handleTelemetryToggle = (checked: boolean) => { 53 - dispatch(telemetryOpt(checked)); 54 - }; 55 - 56 52 return ( 57 53 <Container alignItems="center" justifyContent="center"> 58 54 <View ··· 88 84 </Text> 89 85 </View> 90 86 <Switch 91 - size="small" 92 - checked={overrideEnabled} 93 - onCheckedChange={handleToggleOverride} 94 - > 95 - <Switch.Thumb animation="bouncy" /> 96 - </Switch> 87 + value={overrideEnabled} 88 + onValueChange={handleToggleOverride} 89 + /> 97 90 </View> 98 91 </XStack> 99 92 ··· 126 119 </XStack> 127 120 </View> 128 121 129 - <View alignItems="center" justifyContent="center" gap="$4"> 130 - <XStack 131 - alignItems="center" 132 - justifyContent="space-between" 133 - width="100%" 134 - > 135 - <View flex={1} pr="$3"> 136 - <H3 fontSize="$7">Player Telemetry</H3> 137 - <Text fontSize="$5" color="$gray10"> 138 - Optional 139 - </Text> 140 - </View> 141 - <Switch 142 - size="$3" 143 - checked={telemetry === true} 144 - onCheckedChange={handleTelemetryToggle} 145 - theme="purple" 146 - > 147 - <Switch.Thumb animation="bouncy" /> 148 - </Switch> 149 - </XStack> 150 - </View> 151 - 152 122 {loggedIn && ( 153 - <AQLink 154 - to={{ 155 - screen: "KeyManagement", 156 - }} 157 - > 158 - <View 159 - flexDirection="row" 160 - gap="$2" 161 - alignItems="center" 162 - justifyContent="center" 163 - borderWidth={1} 164 - borderColor="$color.gray3Dark" 165 - padding="$2" 166 - borderRadius="$4" 167 - backgroundColor="$color.gray1Dark" 123 + <> 124 + <DebugRecording /> 125 + <AQLink 126 + to={{ 127 + screen: "KeyManagement", 128 + }} 168 129 > 169 - <H5>Manage Keys</H5> 170 - <ArrowRight size="$1" /> 171 - </View> 172 - </AQLink> 130 + <View 131 + flexDirection="row" 132 + gap="$2" 133 + alignItems="center" 134 + justifyContent="center" 135 + borderWidth={1} 136 + borderColor="$color.gray3Dark" 137 + padding="$2" 138 + borderRadius="$4" 139 + backgroundColor="$color.gray1Dark" 140 + > 141 + <H5>Manage Keys</H5> 142 + <ArrowRight size="$1" /> 143 + </View> 144 + </AQLink> 145 + </> 173 146 )} 174 147 </View> 175 148 </Container> 176 149 ); 177 150 } 151 + 152 + const DebugRecording = () => { 153 + const dispatch = useAppDispatch(); 154 + const isReady = useAppSelector(selectIsReady); 155 + const serverSettings = useAppSelector(selectServerSettings) || {}; 156 + const { url } = useStreamplaceNode(); 157 + 158 + useEffect(() => { 159 + if (isReady) { 160 + dispatch(getServerSettingsFromPDS()); 161 + } 162 + }, [isReady]); 163 + 164 + const u = new URL(url); 165 + return ( 166 + <View alignItems="center" justifyContent="center" gap="$4"> 167 + <XStack alignItems="center" justifyContent="space-between" width="100%"> 168 + <View flex={1} pr="$3"> 169 + <H3 fontSize="$8"> 170 + Allow {u.host} to record your livestream for debugging and improving 171 + the service 172 + </H3> 173 + <Text fontSize="$5" color="$gray10"> 174 + Optional 175 + </Text> 176 + </View> 177 + <Switch 178 + value={serverSettings?.debugRecording === true} 179 + onValueChange={(value) => { 180 + if (value === true) { 181 + dispatch( 182 + createServerSettingsRecord({ 183 + ...serverSettings, 184 + debugRecording: true, 185 + }), 186 + ); 187 + } else { 188 + dispatch( 189 + createServerSettingsRecord({ 190 + ...serverSettings, 191 + debugRecording: false, 192 + }), 193 + ); 194 + } 195 + }} 196 + ></Switch> 197 + </XStack> 198 + </View> 199 + ); 200 + };
+121
js/app/features/bluesky/blueskySlice.tsx
··· 22 22 PlaceStreamChatProfile, 23 23 PlaceStreamKey, 24 24 PlaceStreamLivestream, 25 + PlaceStreamServerSettings, 25 26 StreamplaceAgent, 26 27 } from "streamplace"; 27 28 import { isWeb } from "tamagui"; ··· 62 63 records: null, 63 64 }, 64 65 newLivestream: null, 66 + serverSettings: null, 65 67 }; 66 68 67 69 const uploadThumbnail = async ( ··· 1282 1284 }, 1283 1285 }, 1284 1286 ), 1287 + 1288 + getServerSettingsFromPDS: create.asyncThunk( 1289 + async (_, thunkAPI) => { 1290 + const { bluesky, streamplace } = thunkAPI.getState() as { 1291 + bluesky: BlueskyState; 1292 + streamplace: StreamplaceState; 1293 + }; 1294 + const did = bluesky.oauthSession?.did; 1295 + if (!did) { 1296 + throw new Error("No DID"); 1297 + } 1298 + const profile = bluesky.profiles[did]; 1299 + if (!profile) { 1300 + throw new Error("No profile"); 1301 + } 1302 + if (!bluesky.pdsAgent) { 1303 + throw new Error("No agent"); 1304 + } 1305 + const u = new URL(streamplace.url); 1306 + const res = await bluesky.pdsAgent.com.atproto.repo.getRecord({ 1307 + repo: did, 1308 + collection: "place.stream.server.settings", 1309 + rkey: u.host, 1310 + }); 1311 + if (!res.success) { 1312 + throw new Error("Failed to get chat profile record"); 1313 + } 1314 + 1315 + if (PlaceStreamServerSettings.isRecord(res.data.value)) { 1316 + return res.data.value; 1317 + } else { 1318 + console.log("not a record", res.data.value); 1319 + } 1320 + return null; 1321 + }, 1322 + { 1323 + pending: (state) => { 1324 + return { 1325 + ...state, 1326 + }; 1327 + }, 1328 + fulfilled: (state, action) => { 1329 + if (!action.payload) { 1330 + return state; 1331 + } 1332 + return { 1333 + ...state, 1334 + serverSettings: action.payload, 1335 + }; 1336 + }, 1337 + rejected: (state, action) => { 1338 + console.error("getServerSettingsFromPDS rejected", action.error); 1339 + return { 1340 + ...state, 1341 + }; 1342 + }, 1343 + }, 1344 + ), 1345 + 1346 + createServerSettingsRecord: create.asyncThunk( 1347 + async ({ debugRecording }: { debugRecording: boolean }, thunkAPI) => { 1348 + const { bluesky, streamplace } = thunkAPI.getState() as { 1349 + bluesky: BlueskyState; 1350 + streamplace: StreamplaceState; 1351 + }; 1352 + if (!bluesky.pdsAgent) { 1353 + throw new Error("No agent"); 1354 + } 1355 + const did = bluesky.oauthSession?.did; 1356 + if (!did) { 1357 + throw new Error("No DID"); 1358 + } 1359 + const profile = bluesky.profiles[did]; 1360 + if (!profile) { 1361 + throw new Error("No profile"); 1362 + } 1363 + if (!did) { 1364 + throw new Error("No DID"); 1365 + } 1366 + const u = new URL(streamplace.url); 1367 + const serverSettings: PlaceStreamServerSettings.Record = { 1368 + debugRecording: debugRecording, 1369 + }; 1370 + 1371 + const res = await bluesky.pdsAgent.com.atproto.repo.putRecord({ 1372 + repo: did, 1373 + collection: "place.stream.server.settings", 1374 + record: serverSettings, 1375 + rkey: u.host, 1376 + }); 1377 + if (!res.success) { 1378 + throw new Error("Failed to create server settings record"); 1379 + } 1380 + return serverSettings; 1381 + }, 1382 + { 1383 + pending: (state) => { 1384 + return { 1385 + ...state, 1386 + }; 1387 + }, 1388 + fulfilled: (state, action) => { 1389 + return { 1390 + ...state, 1391 + serverSettings: action.payload, 1392 + }; 1393 + }, 1394 + rejected: (state, action) => { 1395 + console.error("createServerSettingsRecord rejected", action.error); 1396 + return { 1397 + ...state, 1398 + }; 1399 + }, 1400 + }, 1401 + ), 1285 1402 }), 1286 1403 1287 1404 // You can define your selectors here. These selectors receive the slice ··· 1293 1410 selectProfiles: (bluesky) => bluesky.profiles, 1294 1411 selectStoredKey: (bluesky) => bluesky.storedKey, 1295 1412 selectKeyRecords: (bluesky) => bluesky.streamKeysResponse, 1413 + selectServerSettings: (bluesky) => bluesky.serverSettings, 1296 1414 selectUserProfile: (bluesky) => { 1297 1415 const did = bluesky.oauthSession?.did; 1298 1416 if (!did) return null; ··· 1342 1460 createBlockRecord, 1343 1461 followUser, 1344 1462 unfollowUser, 1463 + getServerSettingsFromPDS, 1464 + createServerSettingsRecord, 1345 1465 } = blueskySlice.actions; 1346 1466 1347 1467 // Selectors returned by `slice.selectors` take the root state as their first argument. ··· 1357 1477 selectNewLivestream, 1358 1478 selectChatProfile, 1359 1479 selectCachedProfiles, 1480 + selectServerSettings, 1360 1481 } = blueskySlice.selectors;
+2
js/app/features/bluesky/blueskyTypes.tsx
··· 5 5 import { 6 6 PlaceStreamChatProfile, 7 7 PlaceStreamLivestream, 8 + PlaceStreamServerSettings, 8 9 StreamplaceAgent, 9 10 } from "streamplace"; 10 11 import { StreamplaceOAuthClient } from "./oauthClient"; ··· 48 49 error: null | string; 49 50 profile: null | PlaceStreamChatProfile.Record; 50 51 }; 52 + serverSettings: null | PlaceStreamServerSettings.Record; 51 53 }
+7 -34
js/app/features/streamplace/streamplaceSlice.tsx
··· 48 48 firstRequest: boolean; 49 49 }; 50 50 mySegments: PlaceStreamSegment.SegmentView[]; 51 - telemetry: boolean | null; 52 51 userMuted: boolean | null; 53 52 chatWarned: boolean; 54 53 } ··· 64 63 firstRequest: true, 65 64 }, 66 65 mySegments: [], 67 - telemetry: null, 68 66 userMuted: null, 69 67 chatWarned: false, 70 68 }; 71 69 72 70 const USER_MUTED_KEY = "streamplaceUserMuted"; 73 - const TELEMETRY_KEY = "streamplaceTelemetry"; 74 71 const URL_KEY = "streamplaceUrl"; 75 72 const CHAT_WARNING_KEY = "streamplaceChatWarning2"; 76 73 ··· 80 77 reducers: (create) => ({ 81 78 initialize: create.asyncThunk( 82 79 async (_, { getState }) => { 83 - let [url, telemetryStr, userMutedStr, chatWarningStr] = 84 - await Promise.all([ 85 - Storage.getItem(URL_KEY), 86 - Storage.getItem(TELEMETRY_KEY), 87 - Storage.getItem(USER_MUTED_KEY), 88 - Storage.getItem(CHAT_WARNING_KEY), 89 - ]); 80 + let [url, userMutedStr, chatWarningStr] = await Promise.all([ 81 + Storage.getItem(URL_KEY), 82 + Storage.getItem(USER_MUTED_KEY), 83 + Storage.getItem(CHAT_WARNING_KEY), 84 + ]); 90 85 if (!url) { 91 86 url = DEFAULT_URL; 92 - } 93 - let telemetry: boolean | null = null; 94 - if (typeof telemetryStr === "string") { 95 - telemetry = JSON.parse(telemetryStr); 96 - } else { 97 - telemetry = null; 98 87 } 99 88 let userMuted: boolean | null = null; 100 89 console.log("userMutedStr", userMutedStr); ··· 107 96 if (typeof chatWarningStr === "string") { 108 97 chatWarned = chatWarningStr === "true"; 109 98 } 110 - return { url, telemetry, userMuted, chatWarned }; 99 + return { url, userMuted, chatWarned }; 111 100 }, 112 101 { 113 102 pending: (state) => { 114 103 // state.status = "loading"; 115 104 }, 116 105 fulfilled: (state, action) => { 117 - const { url, telemetry, userMuted, chatWarned } = action.payload; 106 + const { url, userMuted, chatWarned } = action.payload; 118 107 return { 119 108 ...state, 120 109 url, 121 - telemetry, 122 110 userMuted, 123 111 initialized: true, 124 112 chatWarned, ··· 138 126 return { 139 127 ...state, 140 128 url: action.payload, 141 - }; 142 - }), 143 - 144 - telemetryOpt: create.reducer((state, action: { payload: boolean }) => { 145 - Storage.setItem(TELEMETRY_KEY, JSON.stringify(action.payload)).catch( 146 - (err) => { 147 - console.error("telemetryOpt error", err); 148 - }, 149 - ); 150 - return { 151 - ...state, 152 - telemetry: action.payload, 153 129 }; 154 130 }), 155 131 ··· 248 224 selectUrl: (streamplace) => streamplace.url, 249 225 selectRecentSegments: (streamplace) => streamplace.recentSegments, 250 226 selectMySegments: (streamplace) => streamplace.mySegments, 251 - selectTelemetry: (streamplace) => streamplace.telemetry, 252 227 selectUserMuted: (streamplace) => streamplace.userMuted, 253 228 selectChatWarned: (streamplace) => streamplace.chatWarned, 254 229 }, ··· 260 235 setURL, 261 236 initialize, 262 237 pollMySegments, 263 - telemetryOpt, 264 238 userMute, 265 239 chatWarn, 266 240 } = streamplaceSlice.actions; 267 241 export const { 268 242 selectStreamplace, 269 243 selectMySegments, 270 - selectTelemetry, 271 244 selectUserMuted, 272 245 selectChatWarned, 273 246 selectUrl,
+27 -3
js/app/src/router.tsx
··· 12 12 NavigatorScreenParams, 13 13 useLinkTo, 14 14 useNavigation, 15 + useRoute, 15 16 } from "@react-navigation/native"; 16 17 import { createNativeStackNavigator } from "@react-navigation/native-stack"; 17 18 import { ··· 49 50 import { useLiveUser } from "hooks/useLiveUser"; 50 51 import usePlatform from "hooks/usePlatform"; 51 52 import { useSidebarControl } from "hooks/useSidebarControl"; 52 - import { ReactElement, useEffect, useState } from "react"; 53 + import { Fragment, ReactElement, useEffect, useState } from "react"; 53 54 import { 54 55 ImageBackground, 55 56 ImageSourcePropType, ··· 332 333 const userIsLive = useLiveUser(); 333 334 const toast = useToastController(); 334 335 336 + const [isLiveDashboard, setIsLiveDashboard] = useState(true); 335 337 useEffect(() => { 336 - if (userIsLive && !poppedUp) { 338 + if (!isLiveDashboard && userIsLive && !poppedUp) { 337 339 setPoppedUp(true); 338 340 setLivePopup(true); 339 341 } ··· 358 360 : undefined, 359 361 }, 360 362 // rest 361 - headerLeft: () => <NavigationButton />, 363 + headerLeft: () => ( 364 + <> 365 + {/* this is a hack to give the popup the navigator context */} 366 + <PopupChecker setIsLiveDashboard={setIsLiveDashboard} /> 367 + <NavigationButton /> 368 + </> 369 + ), 362 370 headerRight: () => <AvatarButton />, 363 371 drawerActiveTintColor: theme.accentColor.val, 364 372 unmountOnBlur: true, ··· 547 555 </> 548 556 ); 549 557 } 558 + 559 + export const PopupChecker = ({ 560 + setIsLiveDashboard, 561 + }: { 562 + setIsLiveDashboard: (isLiveDashboard: boolean) => void; 563 + }) => { 564 + const route = useRoute(); 565 + useEffect(() => { 566 + if (route.name === "LiveDashboard") { 567 + setIsLiveDashboard(true); 568 + } else { 569 + setIsLiveDashboard(false); 570 + } 571 + }, [route.name]); 572 + return <Fragment />; 573 + }; 550 574 551 575 const MainTab = () => { 552 576 const theme = useTheme();
+66 -6
js/app/src/screens/live-dashboard.tsx
··· 7 7 import Waiting from "components/live-dashboard/waiting"; 8 8 import Loading from "components/loading/loading"; 9 9 import { Player } from "components/player/player"; 10 + import Popup from "components/popup"; 10 11 import ButtonSelector from "components/ui/button-selector"; 11 12 import { VideoElementProvider } from "contexts/VideoElementContext"; 12 13 import { 14 + createServerSettingsRecord, 15 + getServerSettingsFromPDS, 13 16 selectIsReady, 17 + selectServerSettings, 14 18 selectUserProfile, 15 19 } from "features/bluesky/blueskySlice"; 16 - import { selectTelemetry } from "features/streamplace/streamplaceSlice"; 17 20 import { useLiveUser } from "hooks/useLiveUser"; 18 - import React, { useCallback, useState } from "react"; 19 - import { useAppSelector } from "store/hooks"; 20 - import { Button, H6, isWeb, Text, View } from "tamagui"; 21 + import React, { useCallback, useEffect, useState } from "react"; 22 + import { useAppDispatch, useAppSelector } from "store/hooks"; 23 + import { Button, H3, H6, isWeb, Text, View } from "tamagui"; 21 24 22 25 enum StreamSource { 23 26 Start, ··· 29 32 const isReady = useAppSelector(selectIsReady); 30 33 const userProfile = useAppSelector(selectUserProfile); 31 34 const [streamSource, setStreamSource] = useState(StreamSource.Start); 35 + const serverSettings = useAppSelector(selectServerSettings); 32 36 const isLive = useLiveUser(); 33 - const telemetry = useAppSelector(selectTelemetry); 34 37 const [videoElement, setVideoElement] = useState<HTMLVideoElement | null>( 35 38 null, 36 39 ); 40 + const [gotSettings, setGotSettings] = useState(false); 41 + const dispatch = useAppDispatch(); 42 + useEffect(() => { 43 + if (isReady) { 44 + (async () => { 45 + await dispatch(getServerSettingsFromPDS()); 46 + setGotSettings(true); 47 + })(); 48 + } 49 + }, [isReady]); 50 + 51 + let madeChoiceAboutDebugRecording = true; 52 + if (gotSettings && serverSettings?.debugRecording === undefined) { 53 + madeChoiceAboutDebugRecording = false; 54 + } 37 55 38 56 const [page, setPage] = useState<"update" | "create">("create"); 39 57 ··· 57 75 if (isLive && streamSource !== StreamSource.Camera) { 58 76 topPane = ( 59 77 <Player 60 - telemetry={telemetry === true} 61 78 src={userProfile.did} 62 79 name={userProfile.handle} 63 80 videoRef={videoRef} ··· 93 110 </Button> 94 111 ); 95 112 } 113 + 96 114 return ( 97 115 <LivestreamProvider src={userProfile.did}> 98 116 <VideoElementProvider videoElement={videoElement}> ··· 116 134 {page === "update" && isLive ? <UpdateLivestream /> : null} 117 135 {page === "create" ? <CreateLivestream /> : null} 118 136 </View> 137 + {madeChoiceAboutDebugRecording ? null : <DebugRecordingPopup />} 119 138 </View> 120 139 </VideoElementProvider> 121 140 </LivestreamProvider> ··· 134 153 to: StreamSource.StreamKey, 135 154 }, 136 155 ]; 156 + 157 + export function DebugRecordingPopup() { 158 + const dispatch = useAppDispatch(); 159 + const serverSettings = useAppSelector(selectServerSettings) || {}; 160 + const opt = (choice) => () => 161 + dispatch( 162 + createServerSettingsRecord({ 163 + ...serverSettings, 164 + debugRecording: choice, 165 + }), 166 + ); 167 + return ( 168 + <Popup 169 + onClose={opt(false)} 170 + containerProps={{ 171 + bottom: "$8", 172 + zIndex: 1000, 173 + }} 174 + bubbleProps={{ 175 + backgroundColor: "$accentBackground", 176 + gap: "$3", 177 + maxWidth: 400, 178 + }} 179 + > 180 + <H3 textAlign="center">Debug Recording</H3> 181 + <Text> 182 + Streamplace is beta software and it helps us to archive livestreams so 183 + we can later use them for debugging. Would you like to opt in to debug 184 + recording? 185 + </Text> 186 + <View flexDirection="row" gap="$2" f={1}> 187 + <Button f={3} backgroundColor="$accentColor" onPress={opt(true)}> 188 + Allow 189 + </Button> 190 + <Button f={3} onPress={opt(false)}> 191 + Don't Allow 192 + </Button> 193 + </View> 194 + </Popup> 195 + ); 196 + } 137 197 138 198 export function StreamSourcePicker({ 139 199 onPick,
+52
js/docs/src/content/docs/lex-reference/server/place-stream-server-settings.md
··· 1 + --- 2 + title: place.stream.server.settings 3 + description: Reference for the place.stream.server.settings lexicon 4 + --- 5 + 6 + **Lexicon Version:** 1 7 + 8 + ## Definitions 9 + 10 + <a name="main"></a> 11 + 12 + ### `main` 13 + 14 + **Type:** `record` 15 + 16 + Record containing user settings for a particular Streamplace node 17 + 18 + **Record Key:** `any` 19 + 20 + **Record Properties:** 21 + 22 + | Name | Type | Req'd | Description | Constraints | 23 + | ---------------- | --------- | ----- | ----------------------------------------------------------------------- | ----------- | 24 + | `debugRecording` | `boolean` | ❌ | Whether this node may archive your livestream for improving the service | | 25 + 26 + --- 27 + 28 + ## Lexicon Source 29 + 30 + ```json 31 + { 32 + "lexicon": 1, 33 + "id": "place.stream.server.settings", 34 + "defs": { 35 + "main": { 36 + "type": "record", 37 + "description": "Record containing user settings for a particular Streamplace node", 38 + "key": "any", 39 + "record": { 40 + "type": "object", 41 + "required": [], 42 + "properties": { 43 + "debugRecording": { 44 + "type": "boolean", 45 + "description": "Whether this node may archive your livestream for improving the service" 46 + } 47 + } 48 + } 49 + } 50 + } 51 + } 52 + ```
+21
lexicons/place/stream/server/settings.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "place.stream.server.settings", 4 + "defs": { 5 + "main": { 6 + "type": "record", 7 + "description": "Record containing user settings for a particular Streamplace node", 8 + "key": "any", 9 + "record": { 10 + "type": "object", 11 + "required": [], 12 + "properties": { 13 + "debugRecording": { 14 + "type": "boolean", 15 + "description": "Whether this node may archive your livestream for improving the service" 16 + } 17 + } 18 + } 19 + } 20 + } 21 + }
+30
pkg/api/api_internal.go
··· 20 20 "time" 21 21 22 22 "github.com/julienschmidt/httprouter" 23 + "github.com/pion/webrtc/v4" 23 24 "github.com/prometheus/client_golang/prometheus/promhttp" 24 25 sloghttp "github.com/samber/slog-http" 25 26 "golang.org/x/sync/errgroup" ··· 31 32 "stream.place/streamplace/pkg/model" 32 33 notificationpkg "stream.place/streamplace/pkg/notifications" 33 34 "stream.place/streamplace/pkg/renditions" 35 + "stream.place/streamplace/pkg/rtcrec" 34 36 v0 "stream.place/streamplace/pkg/schema/v0" 35 37 ) 36 38 ··· 599 601 return 600 602 } 601 603 if _, err := w.Write(bs); err != nil { 604 + log.Error(ctx, "error writing response", "error", err) 605 + } 606 + }) 607 + 608 + router.POST("/replay/:streamKey", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 609 + key := p.ByName("streamKey") 610 + if key == "" { 611 + errors.WriteHTTPBadRequest(w, "streamKey required", nil) 612 + return 613 + } 614 + mediaSigner, err := a.MakeMediaSigner(ctx, key) 615 + if err != nil { 616 + errors.WriteHTTPUnauthorized(w, "invalid authorization key", err) 617 + return 618 + } 619 + pc, err := rtcrec.NewReplayPeerConnection(ctx, r.Body) 620 + if err != nil { 621 + errors.WriteHTTPInternalServerError(w, "unable to create replay peer connection", err) 622 + return 623 + } 624 + answer, err := a.MediaManager.WebRTCIngest(ctx, &webrtc.SessionDescription{SDP: "placeholder"}, mediaSigner, pc) 625 + if err != nil { 626 + errors.WriteHTTPInternalServerError(w, "unable to ingest web rtc", err) 627 + return 628 + } 629 + w.WriteHeader(200) 630 + if _, err := w.Write([]byte(answer.SDP)); err != nil { 631 + errors.WriteHTTPInternalServerError(w, "unable to write response", err) 602 632 log.Error(ctx, "error writing response", "error", err) 603 633 } 604 634 })
+6 -1
pkg/api/playback.go
··· 219 219 return 220 220 } 221 221 offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)} 222 - answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner) 222 + pc, err := a.MediaManager.NewPeerConnection(ctx, mediaSigner.Streamer()) 223 + if err != nil { 224 + errors.WriteHTTPInternalServerError(w, "unable to create peer connection", err) 225 + return 226 + } 227 + answer, err := a.MediaManager.WebRTCIngest(ctx, &offer, mediaSigner, pc) 223 228 if err != nil { 224 229 errors.WriteHTTPInternalServerError(w, "error playing back", err) 225 230 return
+1
pkg/atproto/firehose.go
··· 148 148 constants.APP_BSKY_GRAPH_FOLLOW, 149 149 constants.APP_BSKY_FEED_POST, 150 150 constants.APP_BSKY_GRAPH_BLOCK, 151 + constants.PLACE_STREAM_SERVER_SETTINGS, 151 152 } 152 153 153 154 func (atsync *ATProtoSynchronizer) handleCommitEventOps(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) {
+15
pkg/atproto/sync.go
··· 163 163 log.Error(ctx, "failed to create chat profile", "err", err) 164 164 } 165 165 166 + case *streamplace.ServerSettings: 167 + _, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model) 168 + if err != nil { 169 + return fmt.Errorf("failed to sync bluesky repo: %w", err) 170 + } 171 + settings := &model.ServerSettings{ 172 + Server: rkey.String(), 173 + RepoDID: userDID, 174 + Record: recCBOR, 175 + } 176 + err = atsync.Model.UpdateServerSettings(ctx, settings) 177 + if err != nil { 178 + log.Error(ctx, "failed to create server settings", "err", err) 179 + } 180 + 166 181 case *bsky.FeedPost: 167 182 // jsonData, err := json.Marshal(d) 168 183 // if err != nil {
+9 -8
pkg/constants/constants.go
··· 1 1 package constants 2 2 3 - var PLACE_STREAM_KEY = "place.stream.key" //nolint:all 4 - var PLACE_STREAM_LIVESTREAM = "place.stream.livestream" //nolint:all 5 - var PLACE_STREAM_CHAT_MESSAGE = "place.stream.chat.message" //nolint:all 6 - var PLACE_STREAM_CHAT_PROFILE = "place.stream.chat.profile" //nolint:all 7 - var STREAMPLACE_SIGNING_KEY = "signingKey" //nolint:all 8 - var APP_BSKY_GRAPH_FOLLOW = "app.bsky.graph.follow" //nolint:all 9 - var APP_BSKY_FEED_POST = "app.bsky.feed.post" //nolint:all 10 - var APP_BSKY_GRAPH_BLOCK = "app.bsky.graph.block" //nolint:all 3 + var PLACE_STREAM_KEY = "place.stream.key" //nolint:all 4 + var PLACE_STREAM_LIVESTREAM = "place.stream.livestream" //nolint:all 5 + var PLACE_STREAM_CHAT_MESSAGE = "place.stream.chat.message" //nolint:all 6 + var PLACE_STREAM_CHAT_PROFILE = "place.stream.chat.profile" //nolint:all 7 + var PLACE_STREAM_SERVER_SETTINGS = "place.stream.server.settings" //nolint:all 8 + var STREAMPLACE_SIGNING_KEY = "signingKey" //nolint:all 9 + var APP_BSKY_GRAPH_FOLLOW = "app.bsky.graph.follow" //nolint:all 10 + var APP_BSKY_FEED_POST = "app.bsky.feed.post" //nolint:all 11 + var APP_BSKY_GRAPH_BLOCK = "app.bsky.graph.block" //nolint:all 11 12 12 13 const DID_KEY_PREFIX = "did:key" //nolint:all 13 14 const ADDRESS_KEY_PREFIX = "0x" //nolint:all
+1
pkg/gen/gen.go
··· 23 23 streamplace.ChatProfile{}, 24 24 streamplace.ChatProfile_Color{}, 25 25 streamplace.ChatMessage_ReplyRef{}, 26 + streamplace.ServerSettings{}, 26 27 ); err != nil { 27 28 panic(err) 28 29 }
+33
pkg/media/peer_connection.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + 6 + "stream.place/streamplace/pkg/log" 7 + "stream.place/streamplace/pkg/rtcrec" 8 + ) 9 + 10 + func (mm *MediaManager) NewPeerConnection(ctx context.Context, user string) (rtcrec.PeerConnection, error) { 11 + shouldRecord := false 12 + settings, err := mm.model.GetServerSettings(ctx, mm.cli.PublicHost, user) 13 + if err != nil { 14 + return nil, err 15 + } 16 + if settings != nil { 17 + spsettings, err := settings.ToStreamplaceServerSettings() 18 + if err != nil { 19 + return nil, err 20 + } 21 + if spsettings.DebugRecording != nil { 22 + shouldRecord = *spsettings.DebugRecording 23 + } 24 + } 25 + if !shouldRecord { 26 + log.Warn(ctx, "no server settings found, will not record") 27 + } 28 + pionpc, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig) 29 + if err != nil { 30 + return nil, err 31 + } 32 + return rtcrec.NewRecordingPeerConnection(ctx, *mm.cli, user, pionpc, shouldRecord) 33 + }
+5 -10
pkg/media/webrtc_ingest.go
··· 12 12 "github.com/pion/rtcp" 13 13 "github.com/pion/webrtc/v4" 14 14 "stream.place/streamplace/pkg/log" 15 + "stream.place/streamplace/pkg/rtcrec" 15 16 ) 16 17 17 18 // This function remains in scope for the duration of a single users' playback 18 - func (mm *MediaManager) WebRTCIngest(ctx context.Context, offer *webrtc.SessionDescription, signer MediaSigner) (*webrtc.SessionDescription, error) { 19 + func (mm *MediaManager) WebRTCIngest(ctx context.Context, offer *webrtc.SessionDescription, signer MediaSigner, peerConnection rtcrec.PeerConnection) (*webrtc.SessionDescription, error) { 19 20 uu, err := uuid.NewV7() 20 21 if err != nil { 21 22 return nil, err 22 23 } 23 24 24 - ctx = log.WithLogValues(ctx, "webrtcID", uu.String(), "mediafunc", "WebRTCIngest") 25 - 26 - // Create a new RTCPeerConnection 27 - peerConnection, err := mm.webrtcAPI.NewPeerConnection(mm.webrtcConfig) 28 - if err != nil { 29 - return nil, fmt.Errorf("failed to create WebRTC peer connection: %w", err) 30 - } 25 + ctx = log.WithLogValues(ctx, "webrtcID", uu.String(), "mediafunc", "WebRTCIngest", "streamer", signer.Streamer()) 31 26 32 27 // Allow us to receive 1 audio track, and 1 video track 33 28 if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil { ··· 116 111 } 117 112 118 113 // Create channel that is blocked until ICE Gathering is complete 119 - gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 114 + gatherComplete := rtcrec.GatheringCompletePromise(peerConnection) 120 115 121 116 go func() { 122 117 ticker := time.NewTicker(time.Second * 1) ··· 186 181 audioFirst := false 187 182 188 183 log.Warn(ctx, "setting OnTrack") 189 - peerConnection.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { 184 + peerConnection.OnTrack(func(track rtcrec.TrackRemote, _ rtcrec.RTPReceiver) { 190 185 log.Warn(ctx, "OnTrack") 191 186 if track.Kind() == webrtc.RTPCodecTypeVideo { 192 187 // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
+5
pkg/model/model.go
··· 92 92 UpdateOAuthSession(id string, session *oatproxy.OAuthSession) error 93 93 ListOAuthSessions() ([]oatproxy.OAuthSession, error) 94 94 GetSessionByDID(did string) (*oatproxy.OAuthSession, error) 95 + 96 + UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 97 + GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 98 + DeleteServerSettings(ctx context.Context, server string, repoDID string) error 95 99 } 96 100 97 101 func MakeDB(dbURL string) (Model, error) { ··· 151 155 ChatMessage{}, 152 156 ChatProfile{}, 153 157 oatproxy.OAuthSession{}, 158 + ServerSettings{}, 154 159 } { 155 160 err = db.AutoMigrate(model) 156 161 if err != nil {
+70
pkg/model/server_settings.go
··· 1 + package model 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "time" 8 + 9 + lexutil "github.com/bluesky-social/indigo/lex/util" 10 + "gorm.io/gorm" 11 + "stream.place/streamplace/pkg/streamplace" 12 + ) 13 + 14 + // ServerSettings represents a user's settings for a particular Streamplace node 15 + type ServerSettings struct { 16 + Server string `gorm:"primaryKey;column:server"` 17 + RepoDID string `gorm:"primaryKey;column:repo_did"` 18 + Record *[]byte `gorm:"column:record"` 19 + Created time.Time `gorm:"column:created;not null"` 20 + Updated time.Time `gorm:"column:updated;not null"` 21 + } 22 + 23 + // TableName specifies the table name for the ServerSettings model 24 + func (ServerSettings) TableName() string { 25 + return "server_settings" 26 + } 27 + 28 + // ToStreamplaceServerSettings converts the model to a streamplace ServerSettings 29 + func (m *ServerSettings) ToStreamplaceServerSettings() (*streamplace.ServerSettings, error) { 30 + if m.Record == nil { 31 + return nil, fmt.Errorf("no record data") 32 + } 33 + rec, err := lexutil.CborDecodeValue(*m.Record) 34 + if err != nil { 35 + return nil, fmt.Errorf("error decoding server settings: %w", err) 36 + } 37 + ss, ok := rec.(*streamplace.ServerSettings) 38 + if !ok { 39 + return nil, fmt.Errorf("invalid server settings") 40 + } 41 + return ss, nil 42 + } 43 + 44 + // UpdateServerSettings creates or updates a server settings record 45 + func (m *DBModel) UpdateServerSettings(ctx context.Context, settings *ServerSettings) error { 46 + now := time.Now() 47 + if settings.Created.IsZero() { 48 + settings.Created = now 49 + } 50 + settings.Updated = now 51 + return m.DB.Save(settings).Error 52 + } 53 + 54 + // GetServerSettings retrieves server settings for a given server and repoDID 55 + func (m *DBModel) GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) { 56 + var settings ServerSettings 57 + err := m.DB.Where("server = ? AND repo_did = ?", server, repoDID).First(&settings).Error 58 + if errors.Is(err, gorm.ErrRecordNotFound) { 59 + return nil, nil 60 + } 61 + if err != nil { 62 + return nil, err 63 + } 64 + return &settings, nil 65 + } 66 + 67 + // DeleteServerSettings deletes server settings for a given server and repoDID 68 + func (m *DBModel) DeleteServerSettings(ctx context.Context, server string, repoDID string) error { 69 + return m.DB.Where("server = ? AND repo_did = ?", server, repoDID).Delete(&ServerSettings{}).Error 70 + }
+221
pkg/rtcrec/cbor.go
··· 1 + package rtcrec 2 + 3 + import ( 4 + "fmt" 5 + "io" 6 + "sort" 7 + "time" 8 + 9 + "github.com/fxamacker/cbor/v2" 10 + "github.com/pion/webrtc/v4" 11 + ) 12 + 13 + type WebRTCEventDecoder struct { 14 + dec *cbor.Decoder 15 + } 16 + 17 + func Opts() (cbor.EncMode, error) { 18 + opts := cbor.CoreDetEncOptions() 19 + opts.Time = cbor.TimeRFC3339Nano 20 + em, err := opts.EncMode() 21 + if err != nil { 22 + return nil, fmt.Errorf("failed to create encoder mode: %w", err) 23 + } 24 + return em, nil 25 + } 26 + 27 + func MakeWebRTCDecoder(r io.Reader) (*WebRTCEventDecoder, error) { 28 + dec := cbor.NewDecoder(r) 29 + return &WebRTCEventDecoder{dec: dec}, nil 30 + } 31 + 32 + func (d *WebRTCEventDecoder) Next() (*WebRTCEvent, error) { 33 + var ev WebRTCEvent 34 + err := d.dec.Decode(&ev) 35 + if err != nil { 36 + return nil, err 37 + } 38 + return &ev, err 39 + } 40 + 41 + type WebRTCEventGroup struct { 42 + Events map[string][]*WebRTCEvent 43 + Tracks map[webrtc.SSRC]map[string][]*WebRTCEvent 44 + FirstTime time.Time 45 + } 46 + 47 + const ( 48 + EventTypeOffer = "Offer" 49 + EventTypeCreateAnswer = "CreateAnswer" 50 + EventTypeSetRemoteDescription = "SetRemoteDescription" 51 + EventTypeSetLocalDescription = "SetLocalDescription" 52 + EventTypeLocalDescription = "LocalDescription" 53 + EventTypeICEConnectionState = "ICEConnectionStateChange" 54 + EventTypeConnectionState = "ConnectionStateChange" 55 + EventTypeTrack = "Track" 56 + EventTypeAddTransceiverFromKind = "AddTransceiverFromKind" 57 + EventTypeICEGatheringState = "ICEGatheringState" 58 + EventTypeDataChannel = "DataChannel" 59 + EventTypeNegotiationNeeded = "NegotiationNeeded" 60 + EventTypeTrackRead = "TrackRead" 61 + EventTypeTrackCodec = "TrackCodec" 62 + EventTypeTrackKind = "TrackKind" 63 + EventTypeTrackPayloadType = "TrackPayloadType" 64 + EventTypeTrackSSRC = "TrackSSRC" 65 + EventTypeUnknown = "Unknown" 66 + ) 67 + 68 + // ReadAllEvents reads all WebRTC events from a CBOR reader and organizes them by type. 69 + // Returns a map where keys are event types and values are slices of events of that type. 70 + func ReadAllEvents(r io.Reader) (*WebRTCEventGroup, error) { 71 + dec, err := MakeWebRTCDecoder(r) 72 + if err != nil { 73 + return nil, err 74 + } 75 + 76 + eventList := []*WebRTCEvent{} 77 + events := make(map[string][]*WebRTCEvent) 78 + tracks := make(map[webrtc.SSRC]map[string][]*WebRTCEvent) 79 + var firstTime time.Time 80 + for { 81 + ev, err := dec.Next() 82 + if err == io.EOF { 83 + break 84 + } 85 + if err != nil { 86 + return nil, err 87 + } 88 + eventList = append(eventList, ev) 89 + } 90 + 91 + sort.Slice(eventList, func(i, j int) bool { 92 + return eventList[i].Time.Before(eventList[j].Time) 93 + }) 94 + 95 + for _, ev := range eventList { 96 + if firstTime.IsZero() { 97 + firstTime = ev.Time 98 + } 99 + 100 + // Determine the event type based on which field is non-nil 101 + var eventType string 102 + var trackSSRC *webrtc.SSRC 103 + switch { 104 + case ev.Offer != nil: 105 + eventType = EventTypeOffer 106 + case ev.CreateAnswer != nil: 107 + eventType = EventTypeCreateAnswer 108 + case ev.SetRemoteDescription != nil: 109 + eventType = EventTypeSetRemoteDescription 110 + case ev.SetLocalDescription != nil: 111 + eventType = EventTypeSetLocalDescription 112 + case ev.LocalDescription != nil: 113 + eventType = EventTypeLocalDescription 114 + case ev.ICEConnectionStateChange != nil: 115 + eventType = EventTypeICEConnectionState 116 + case ev.ConnectionStateChange != nil: 117 + eventType = EventTypeConnectionState 118 + case ev.Track != nil: 119 + eventType = EventTypeTrack 120 + case ev.AddTransceiverFromKind != nil: 121 + eventType = EventTypeAddTransceiverFromKind 122 + case ev.ICEGatheringState != nil: 123 + eventType = EventTypeICEGatheringState 124 + case ev.DataChannel != nil: 125 + eventType = EventTypeDataChannel 126 + case ev.NegotiationNeeded != nil: 127 + eventType = EventTypeNegotiationNeeded 128 + 129 + case ev.TrackRead != nil: 130 + trackSSRC = &ev.TrackRead.SSRC 131 + eventType = EventTypeTrackRead 132 + case ev.TrackCodec != nil: 133 + trackSSRC = &ev.TrackCodec.SSRC 134 + eventType = EventTypeTrackCodec 135 + case ev.TrackKind != nil: 136 + trackSSRC = &ev.TrackKind.SSRC 137 + eventType = EventTypeTrackKind 138 + case ev.TrackPayloadType != nil: 139 + trackSSRC = &ev.TrackPayloadType.SSRC 140 + eventType = EventTypeTrackPayloadType 141 + case ev.TrackSSRC != nil: 142 + trackSSRC = &ev.TrackSSRC.SSRC 143 + eventType = EventTypeTrackSSRC 144 + default: 145 + eventType = EventTypeUnknown 146 + panic(fmt.Sprintf("unknown event type: %+v", ev)) 147 + } 148 + 149 + if trackSSRC != nil { 150 + if tracks[*trackSSRC] == nil { 151 + tracks[*trackSSRC] = make(map[string][]*WebRTCEvent) 152 + } 153 + if tracks[*trackSSRC][eventType] == nil { 154 + tracks[*trackSSRC][eventType] = []*WebRTCEvent{} 155 + } 156 + tracks[*trackSSRC][eventType] = append(tracks[*trackSSRC][eventType], ev) 157 + } 158 + 159 + if events[eventType] == nil { 160 + events[eventType] = []*WebRTCEvent{} 161 + } 162 + 163 + events[eventType] = append(events[eventType], ev) 164 + } 165 + 166 + return &WebRTCEventGroup{ 167 + Events: events, 168 + Tracks: tracks, 169 + FirstTime: firstTime, 170 + }, nil 171 + } 172 + 173 + func (g *WebRTCEventGroup) Peek(eventType string) *WebRTCEvent { 174 + if g.Events[eventType] == nil { 175 + panic(fmt.Sprintf("no events of type %s", eventType)) 176 + } 177 + if len(g.Events[eventType]) == 0 { 178 + return nil 179 + } 180 + return g.Events[eventType][0] 181 + } 182 + 183 + func (g *WebRTCEventGroup) PeekTrack(ssrc webrtc.SSRC, eventType string) *WebRTCEvent { 184 + if g.Tracks[ssrc] == nil { 185 + panic(fmt.Sprintf("no tracks for ssrc %d", ssrc)) 186 + } 187 + if g.Tracks[ssrc][eventType] == nil { 188 + panic(fmt.Sprintf("no events of type %s for ssrc %d", eventType, ssrc)) 189 + } 190 + if len(g.Tracks[ssrc][eventType]) == 0 { 191 + return nil 192 + } 193 + return g.Tracks[ssrc][eventType][0] 194 + } 195 + 196 + func (g *WebRTCEventGroup) Next(eventType string) *WebRTCEvent { 197 + if g.Events[eventType] == nil { 198 + panic(fmt.Sprintf("no events of type %s", eventType)) 199 + } 200 + if len(g.Events[eventType]) == 0 { 201 + return nil 202 + } 203 + ev := g.Events[eventType][0] 204 + g.Events[eventType] = g.Events[eventType][1:] 205 + return ev 206 + } 207 + 208 + func (g *WebRTCEventGroup) NextTrack(ssrc webrtc.SSRC, eventType string) *WebRTCEvent { 209 + if g.Tracks[ssrc] == nil { 210 + panic(fmt.Sprintf("no tracks for ssrc %d", ssrc)) 211 + } 212 + if g.Tracks[ssrc][eventType] == nil { 213 + panic(fmt.Sprintf("no events of type %s for ssrc %d", eventType, ssrc)) 214 + } 215 + if len(g.Tracks[ssrc][eventType]) == 0 { 216 + return nil 217 + } 218 + ev := g.Tracks[ssrc][eventType][0] 219 + g.Tracks[ssrc][eventType] = g.Tracks[ssrc][eventType][1:] 220 + return ev 221 + }
+77
pkg/rtcrec/cmd/rtcrec.go
··· 1 + package main 2 + 3 + import ( 4 + "encoding/json" 5 + "errors" 6 + "flag" 7 + "fmt" 8 + "io" 9 + "log" 10 + "os" 11 + 12 + "stream.place/streamplace/pkg/rtcrec" 13 + ) 14 + 15 + func main() { 16 + err := Start() 17 + if err != nil { 18 + log.Fatal(err) 19 + } 20 + } 21 + 22 + func Start() error { 23 + var path string 24 + flag.StringVar(&path, "path", "", "path to the file to decode") 25 + flag.Parse() 26 + if path == "" { 27 + return fmt.Errorf("path is required") 28 + } 29 + return DecodeFile(path) 30 + } 31 + 32 + func DecodeFile(path string) error { 33 + f, err := os.Open(path) 34 + if err != nil { 35 + return err 36 + } 37 + defer f.Close() 38 + dec, err := rtcrec.MakeWebRTCDecoder(f) 39 + if err != nil { 40 + return err 41 + } 42 + for { 43 + ev, err := dec.Next() 44 + if errors.Is(err, io.EOF) { 45 + return nil 46 + } 47 + if err != nil { 48 + return err 49 + } 50 + if ev.TrackRead != nil { 51 + // spitting out the data as base64 is pointless, replace with a label 52 + n := len(ev.TrackRead.Data) 53 + byteString := fmt.Sprintf("%d bytes", n) 54 + bs, err := json.Marshal(ev) 55 + if err != nil { 56 + return err 57 + } 58 + var m map[string]any 59 + err = json.Unmarshal(bs, &m) 60 + if err != nil { 61 + return err 62 + } 63 + m["trackRead"].(map[string]any)["data"] = byteString 64 + bs, err = json.Marshal(m) 65 + if err != nil { 66 + return err 67 + } 68 + fmt.Println(string(bs)) 69 + } else { 70 + bs, err := json.Marshal(ev) 71 + if err != nil { 72 + return err 73 + } 74 + fmt.Println(string(bs)) 75 + } 76 + } 77 + }
+236
pkg/rtcrec/recording_peerconnection.go
··· 1 + package rtcrec 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "os" 7 + "time" 8 + 9 + "github.com/pion/rtcp" 10 + "github.com/pion/webrtc/v4" 11 + "stream.place/streamplace/pkg/aqtime" 12 + "stream.place/streamplace/pkg/config" 13 + "stream.place/streamplace/pkg/log" 14 + ) 15 + 16 + type RecordingPeerConnection struct { 17 + enabled bool 18 + pionpc *webrtc.PeerConnection 19 + file *os.File 20 + stream *RecorderStream 21 + } 22 + 23 + func NewRecordingPeerConnection(ctx context.Context, cli config.CLI, user string, pionpc *webrtc.PeerConnection, enabled bool) (PeerConnection, error) { 24 + if !enabled { 25 + return &RecordingPeerConnection{ 26 + pionpc: pionpc, 27 + enabled: enabled, 28 + }, nil 29 + } 30 + aqt := aqtime.FromTime(time.Now()) 31 + f, err := cli.DataFileCreate([]string{user, "rtcrec", fmt.Sprintf("%s.cbor", aqt.FileSafeString())}, true) 32 + if err != nil { 33 + return nil, fmt.Errorf("failed to create data file: %w", err) 34 + } 35 + log.Log(ctx, "logging webrtc session to file", "file", f.Name()) 36 + stream, err := NewRecorderStream(f) 37 + if err != nil { 38 + return nil, fmt.Errorf("failed to create recorder stream: %w", err) 39 + } 40 + return &RecordingPeerConnection{ 41 + pionpc: pionpc, 42 + file: f, 43 + stream: stream, 44 + enabled: enabled, 45 + }, nil 46 + } 47 + 48 + func (pc *RecordingPeerConnection) Do(f func()) { 49 + if pc.enabled { 50 + go f() 51 + } 52 + } 53 + 54 + func (pc *RecordingPeerConnection) Close() error { 55 + return pc.pionpc.Close() 56 + } 57 + 58 + func (pc *RecordingPeerConnection) CreateAnswer(options *webrtc.AnswerOptions) (webrtc.SessionDescription, error) { 59 + now := time.Now() 60 + ret, err := pc.pionpc.CreateAnswer(options) 61 + if err != nil { 62 + return ret, err 63 + } 64 + pc.Do(func() { 65 + pc.stream.Event(WebRTCEvent{ 66 + CreateAnswer: &CreateAnswer{ 67 + SDPAnswer: ret.SDP, 68 + }, 69 + Time: now, 70 + }) 71 + }) 72 + return ret, nil 73 + } 74 + 75 + func (pc *RecordingPeerConnection) SetLocalDescription(desc webrtc.SessionDescription) error { 76 + now := time.Now() 77 + pc.Do(func() { 78 + pc.stream.Event(WebRTCEvent{ 79 + SetRemoteDescription: &SetRemoteDescription{ 80 + SDPRemoteDescription: desc.SDP, 81 + }, 82 + Time: now, 83 + }) 84 + }) 85 + return pc.pionpc.SetLocalDescription(desc) 86 + } 87 + 88 + func (pc *RecordingPeerConnection) SetRemoteDescription(desc webrtc.SessionDescription) error { 89 + now := time.Now() 90 + pc.Do(func() { 91 + pc.stream.Event(WebRTCEvent{ 92 + SetRemoteDescription: &SetRemoteDescription{ 93 + SDPRemoteDescription: desc.SDP, 94 + }, 95 + Time: now, 96 + }) 97 + }) 98 + return pc.pionpc.SetRemoteDescription(desc) 99 + } 100 + 101 + func (pc *RecordingPeerConnection) LocalDescription() *webrtc.SessionDescription { 102 + now := time.Now() 103 + desc := pc.pionpc.LocalDescription() 104 + pc.Do(func() { 105 + pc.stream.Event(WebRTCEvent{ 106 + LocalDescription: &LocalDescription{ 107 + SDPLocalDescription: pc.pionpc.LocalDescription().SDP, 108 + }, 109 + Time: now, 110 + }) 111 + }) 112 + return desc 113 + } 114 + 115 + // func (pc *RecorderPeerConnection) RemoteDescription() *webrtc.SessionDescription { 116 + // return pc.pionpc.RemoteDescription() 117 + // } 118 + 119 + func (pc *RecordingPeerConnection) OnICEConnectionStateChange(f func(webrtc.ICEConnectionState)) { 120 + pc.pionpc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { 121 + now := time.Now() 122 + pc.Do(func() { 123 + pc.stream.Event(WebRTCEvent{ 124 + ICEConnectionStateChange: &ICEConnectionStateChange{ 125 + ICEConnectionState: state, 126 + }, 127 + Time: now, 128 + }) 129 + }) 130 + f(state) 131 + }) 132 + } 133 + 134 + func (pc *RecordingPeerConnection) OnConnectionStateChange(f func(webrtc.PeerConnectionState)) { 135 + pc.pionpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { 136 + now := time.Now() 137 + pc.Do(func() { 138 + pc.stream.Event(WebRTCEvent{ 139 + ConnectionStateChange: &ConnectionStateChange{ 140 + ConnectionState: state, 141 + }, 142 + Time: now, 143 + }) 144 + }) 145 + f(state) 146 + }) 147 + } 148 + 149 + func (pc *RecordingPeerConnection) OnTrack(f func(TrackRemote, RTPReceiver)) { 150 + pc.pionpc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { 151 + now := time.Now() 152 + wrappedTrack := &RecordingTrackRemote{track: track, stream: pc.stream, pc: pc} 153 + id := track.ID() 154 + kind := track.Kind() 155 + ssrc := track.SSRC() 156 + payloadType := track.PayloadType() 157 + streamID := track.StreamID() 158 + msid := track.Msid() 159 + rid := track.RID() 160 + pc.Do(func() { 161 + pc.stream.Event(WebRTCEvent{ 162 + Track: &Track{ 163 + ID: id, 164 + Kind: kind, 165 + SSRC: ssrc, 166 + PayloadType: payloadType, 167 + StreamID: streamID, 168 + Msid: msid, 169 + RID: rid, 170 + }, 171 + Time: now, 172 + }) 173 + }) 174 + f(wrappedTrack, receiver) 175 + }) 176 + } 177 + 178 + func (pc *RecordingPeerConnection) WriteRTCP(pkts []rtcp.Packet) error { 179 + return pc.pionpc.WriteRTCP(pkts) 180 + } 181 + 182 + func (pc *RecordingPeerConnection) AddTransceiverFromKind(kind webrtc.RTPCodecType, init ...webrtc.RTPTransceiverInit) (RTPTransceiver, error) { 183 + now := time.Now() 184 + ret, err := pc.pionpc.AddTransceiverFromKind(kind, init...) 185 + pc.Do(func() { 186 + pc.stream.Event(WebRTCEvent{ 187 + AddTransceiverFromKind: &AddTransceiverFromKind{ 188 + Kind: kind, 189 + }, 190 + Time: now, 191 + }) 192 + }) 193 + return ret, err 194 + } 195 + 196 + func (pc *RecordingPeerConnection) ICEGatheringState() webrtc.ICEGatheringState { 197 + now := time.Now() 198 + state := pc.pionpc.ICEGatheringState() 199 + pc.Do(func() { 200 + pc.stream.Event(WebRTCEvent{ 201 + ICEGatheringState: &ICEGatheringState{ 202 + State: state, 203 + }, 204 + Time: now, 205 + }) 206 + }) 207 + return state 208 + } 209 + 210 + func (pc *RecordingPeerConnection) OnDataChannel(f func(*webrtc.DataChannel)) { 211 + pc.pionpc.OnDataChannel(func(dc *webrtc.DataChannel) { 212 + now := time.Now() 213 + pc.Do(func() { 214 + pc.stream.Event(WebRTCEvent{ 215 + DataChannel: &DataChannel{ 216 + Label: dc.Label(), 217 + }, 218 + Time: now, 219 + }) 220 + }) 221 + f(dc) 222 + }) 223 + } 224 + 225 + func (pc *RecordingPeerConnection) OnNegotiationNeeded(f func()) { 226 + pc.pionpc.OnNegotiationNeeded(func() { 227 + now := time.Now() 228 + pc.Do(func() { 229 + pc.stream.Event(WebRTCEvent{ 230 + NegotiationNeeded: &NegotiationNeeded{}, 231 + Time: now, 232 + }) 233 + }) 234 + f() 235 + }) 236 + }
+105
pkg/rtcrec/recording_track.go
··· 1 + package rtcrec 2 + 3 + import ( 4 + "time" 5 + 6 + "github.com/pion/interceptor" 7 + "github.com/pion/webrtc/v4" 8 + ) 9 + 10 + type RecordingTrackRemote struct { 11 + track *webrtc.TrackRemote 12 + stream *RecorderStream 13 + pc *RecordingPeerConnection 14 + } 15 + 16 + func (t *RecordingTrackRemote) do(f func()) { 17 + go f() 18 + } 19 + 20 + func (t *RecordingTrackRemote) Read(p []byte) (n int, attrs interceptor.Attributes, err error) { 21 + n, attrs, err = t.track.Read(p) 22 + now := time.Now() 23 + b2 := make([]byte, n) 24 + copy(b2, p) 25 + t.pc.Do(func() { 26 + errString := "" 27 + if err != nil { 28 + errString = err.Error() 29 + } 30 + t.stream.Event(WebRTCEvent{ 31 + TrackRead: &TrackRead{ 32 + Data: b2, 33 + SSRC: t.track.SSRC(), 34 + Count: n, 35 + // Attrs: attrs, 36 + Err: errString, 37 + }, 38 + Time: now, 39 + }) 40 + }) 41 + return n, attrs, err 42 + } 43 + 44 + func (t *RecordingTrackRemote) Codec() webrtc.RTPCodecParameters { 45 + now := time.Now() 46 + codec := t.track.Codec() 47 + t.pc.Do(func() { 48 + t.stream.Event(WebRTCEvent{ 49 + TrackCodec: &TrackCodec{ 50 + SSRC: t.track.SSRC(), 51 + Codec: codec, 52 + }, 53 + Time: now, 54 + }) 55 + }) 56 + return codec 57 + } 58 + 59 + func (t *RecordingTrackRemote) ID() string { 60 + return t.track.ID() 61 + } 62 + 63 + func (t *RecordingTrackRemote) Kind() webrtc.RTPCodecType { 64 + now := time.Now() 65 + kind := t.track.Kind() 66 + t.pc.Do(func() { 67 + t.stream.Event(WebRTCEvent{ 68 + TrackKind: &TrackKind{ 69 + SSRC: t.track.SSRC(), 70 + Kind: kind, 71 + }, 72 + Time: now, 73 + }) 74 + }) 75 + return kind 76 + } 77 + 78 + func (t *RecordingTrackRemote) PayloadType() webrtc.PayloadType { 79 + now := time.Now() 80 + payloadType := t.track.PayloadType() 81 + t.pc.Do(func() { 82 + t.stream.Event(WebRTCEvent{ 83 + TrackPayloadType: &TrackPayloadType{ 84 + SSRC: t.track.SSRC(), 85 + PayloadType: payloadType, 86 + }, 87 + Time: now, 88 + }) 89 + }) 90 + return payloadType 91 + } 92 + 93 + func (t *RecordingTrackRemote) SSRC() webrtc.SSRC { 94 + now := time.Now() 95 + ssrc := t.track.SSRC() 96 + t.pc.Do(func() { 97 + t.stream.Event(WebRTCEvent{ 98 + Time: now, 99 + TrackSSRC: &TrackSSRC{ 100 + SSRC: ssrc, 101 + }, 102 + }) 103 + }) 104 + return ssrc 105 + }
+148
pkg/rtcrec/replay_peerconnection.go
··· 1 + package rtcrec 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "io" 7 + "time" 8 + 9 + "github.com/pion/rtcp" 10 + "github.com/pion/webrtc/v4" 11 + "stream.place/streamplace/pkg/log" 12 + ) 13 + 14 + type ReplayPeerConnection struct { 15 + startTime time.Time 16 + group *WebRTCEventGroup 17 + ctx context.Context 18 + } 19 + 20 + func NewReplayPeerConnection(ctx context.Context, r io.Reader) (PeerConnection, error) { 21 + group, err := ReadAllEvents(r) 22 + if err != nil { 23 + return nil, fmt.Errorf("failed to create web rtc decoder: %w", err) 24 + } 25 + 26 + return &ReplayPeerConnection{ 27 + startTime: time.Now(), 28 + group: group, 29 + ctx: context.Background(), 30 + }, nil 31 + } 32 + 33 + func (pc *ReplayPeerConnection) wait(label string, t time.Time) <-chan time.Time { 34 + now := time.Now() 35 + historicalDiff := t.Sub(pc.group.FirstTime) 36 + currentDiff := time.Since(pc.startTime) 37 + diff := historicalDiff - currentDiff 38 + log.Debug(pc.ctx, "waiting for event", "event", label, "diff", diff, "t", t, "first", pc.group.FirstTime, "startTime", pc.startTime, "now", now) 39 + return time.After(diff) 40 + } 41 + 42 + func (pc *ReplayPeerConnection) Close() error { 43 + // todo: implment stopping here 44 + return nil 45 + } 46 + 47 + func (pc *ReplayPeerConnection) CreateAnswer(options *webrtc.AnswerOptions) (webrtc.SessionDescription, error) { 48 + ev := pc.group.Peek(EventTypeCreateAnswer) 49 + if ev == nil { 50 + return webrtc.SessionDescription{}, fmt.Errorf("no create answer event found") 51 + } 52 + return webrtc.SessionDescription{SDP: ev.CreateAnswer.SDPAnswer}, nil 53 + } 54 + 55 + func (pc *ReplayPeerConnection) SetLocalDescription(desc webrtc.SessionDescription) error { 56 + return nil 57 + } 58 + 59 + func (pc *ReplayPeerConnection) SetRemoteDescription(desc webrtc.SessionDescription) error { 60 + return nil 61 + } 62 + 63 + func (pc *ReplayPeerConnection) LocalDescription() *webrtc.SessionDescription { 64 + ev := pc.group.Peek(EventTypeLocalDescription) 65 + if ev == nil { 66 + return nil 67 + } 68 + return &webrtc.SessionDescription{SDP: ev.LocalDescription.SDPLocalDescription} 69 + } 70 + 71 + // func (pc *ReplayPeerConnection) RemoteDescription() *webrtc.SessionDescription { 72 + // return pc.pionpc.RemoteDescription() 73 + // } 74 + 75 + func (pc *ReplayPeerConnection) OnICEConnectionStateChange(f func(webrtc.ICEConnectionState)) { 76 + go func() { 77 + for { 78 + ev := pc.group.Next(EventTypeICEConnectionState) 79 + if ev == nil { 80 + return 81 + } 82 + select { 83 + case <-pc.wait("OnICEConnectionStateChange", ev.Time): 84 + f(ev.ICEConnectionStateChange.ICEConnectionState) 85 + case <-pc.ctx.Done(): 86 + return 87 + } 88 + } 89 + }() 90 + } 91 + 92 + func (pc *ReplayPeerConnection) OnConnectionStateChange(f func(webrtc.PeerConnectionState)) { 93 + go func() { 94 + for { 95 + ev := pc.group.Next(EventTypeConnectionState) 96 + if ev == nil { 97 + return 98 + } 99 + select { 100 + case <-pc.wait("OnConnectionStateChange", ev.Time): 101 + f(ev.ConnectionStateChange.ConnectionState) 102 + case <-pc.ctx.Done(): 103 + return 104 + } 105 + } 106 + }() 107 + } 108 + 109 + func (pc *ReplayPeerConnection) OnTrack(f func(TrackRemote, RTPReceiver)) { 110 + go func() { 111 + for { 112 + ev := pc.group.Next(EventTypeTrack) 113 + if ev == nil { 114 + return 115 + } 116 + select { 117 + case <-pc.wait("OnTrack", ev.Time): 118 + track := &ReplayTrackRemote{ 119 + ssrc: ev.Track.SSRC, 120 + trackEvent: ev, 121 + events: pc.group.Tracks[ev.Track.SSRC], 122 + pc: pc, 123 + } 124 + go func() { 125 + f(track, nil) 126 + }() 127 + case <-pc.ctx.Done(): 128 + return 129 + } 130 + } 131 + }() 132 + } 133 + 134 + func (pc *ReplayPeerConnection) WriteRTCP(pkts []rtcp.Packet) error { 135 + return nil 136 + } 137 + 138 + func (pc *ReplayPeerConnection) AddTransceiverFromKind(kind webrtc.RTPCodecType, init ...webrtc.RTPTransceiverInit) (RTPTransceiver, error) { 139 + return nil, nil 140 + } 141 + 142 + func (pc *ReplayPeerConnection) ICEGatheringState() webrtc.ICEGatheringState { 143 + ev := pc.group.Peek(EventTypeICEGatheringState) 144 + if ev == nil { 145 + return webrtc.ICEGatheringStateNew 146 + } 147 + return ev.ICEGatheringState.State 148 + }
+64
pkg/rtcrec/replay_track.go
··· 1 + package rtcrec 2 + 3 + import ( 4 + "errors" 5 + "fmt" 6 + 7 + "github.com/pion/interceptor" 8 + "github.com/pion/webrtc/v4" 9 + ) 10 + 11 + type ReplayTrackRemote struct { 12 + ssrc webrtc.SSRC 13 + trackEvent *WebRTCEvent 14 + events map[string][]*WebRTCEvent 15 + pc *ReplayPeerConnection 16 + } 17 + 18 + func (t *ReplayTrackRemote) Read(p []byte) (n int, attrs interceptor.Attributes, err error) { 19 + ev := t.pc.group.NextTrack(t.ssrc, EventTypeTrackRead) 20 + if ev == nil { 21 + return 0, nil, nil 22 + } 23 + select { 24 + case <-t.pc.wait(fmt.Sprintf("TrackRead %s", t.trackEvent.Track.ID), ev.Time): 25 + var err error 26 + if ev.TrackRead.Err != "" { 27 + err = errors.New(ev.TrackRead.Err) 28 + } 29 + if ev.TrackRead.Data != nil { 30 + copied := copy(p, ev.TrackRead.Data) 31 + if copied != len(ev.TrackRead.Data) { 32 + panic("copied != len(ev.TrackRead.Data)") 33 + } 34 + } 35 + // log.Log(t.pc.ctx, "TrackRead", "trackId", t.trackEvent.Track.ID, "count", ev.TrackRead.Count, "err", err) 36 + return ev.TrackRead.Count, nil, err 37 + case <-t.pc.ctx.Done(): 38 + return 0, nil, t.pc.ctx.Err() 39 + } 40 + } 41 + 42 + func (t *ReplayTrackRemote) Codec() webrtc.RTPCodecParameters { 43 + ev := t.pc.group.PeekTrack(t.ssrc, EventTypeTrackCodec) 44 + if ev == nil { 45 + panic("no codec found") 46 + } 47 + return ev.TrackCodec.Codec 48 + } 49 + 50 + func (t *ReplayTrackRemote) ID() string { 51 + return t.trackEvent.Track.ID 52 + } 53 + 54 + func (t *ReplayTrackRemote) Kind() webrtc.RTPCodecType { 55 + return t.trackEvent.Track.Kind 56 + } 57 + 58 + func (t *ReplayTrackRemote) PayloadType() webrtc.PayloadType { 59 + return t.trackEvent.Track.PayloadType 60 + } 61 + 62 + func (t *ReplayTrackRemote) SSRC() webrtc.SSRC { 63 + return t.ssrc 64 + }
+56
pkg/rtcrec/rtcrec.go
··· 1 + package rtcrec 2 + 3 + import ( 4 + "time" 5 + 6 + "github.com/pion/interceptor" 7 + "github.com/pion/rtcp" 8 + "github.com/pion/webrtc/v4" 9 + ) 10 + 11 + type PeerConnection interface { 12 + AddTransceiverFromKind(kind webrtc.RTPCodecType, init ...webrtc.RTPTransceiverInit) (RTPTransceiver, error) 13 + Close() error 14 + SetRemoteDescription(description webrtc.SessionDescription) error 15 + CreateAnswer(options *webrtc.AnswerOptions) (webrtc.SessionDescription, error) 16 + SetLocalDescription(description webrtc.SessionDescription) error 17 + OnICEConnectionStateChange(func(webrtc.ICEConnectionState)) 18 + OnConnectionStateChange(func(webrtc.PeerConnectionState)) 19 + OnTrack(func(TrackRemote, RTPReceiver)) 20 + // OnDataChannel(func(*webrtc.DataChannel)) 21 + // OnNegotiationNeeded(func()) 22 + WriteRTCP(pkts []rtcp.Packet) error 23 + ICEGatheringState() webrtc.ICEGatheringState 24 + LocalDescription() *webrtc.SessionDescription 25 + } 26 + 27 + type RTPTransceiver interface { 28 + } 29 + 30 + type TrackRemote interface { 31 + Read(p []byte) (n int, attrs interceptor.Attributes, err error) 32 + Kind() webrtc.RTPCodecType 33 + PayloadType() webrtc.PayloadType 34 + Codec() webrtc.RTPCodecParameters 35 + SSRC() webrtc.SSRC 36 + } 37 + 38 + type RTPReceiver interface { 39 + } 40 + 41 + func GatheringCompletePromise(pc PeerConnection) <-chan struct{} { 42 + recorder, ok := pc.(*RecordingPeerConnection) 43 + if ok { 44 + return webrtc.GatheringCompletePromise(recorder.pionpc) 45 + } 46 + _, ok = pc.(*ReplayPeerConnection) 47 + if ok { 48 + ch := make(chan struct{}) 49 + go func() { 50 + <-time.After(100 * time.Millisecond) 51 + ch <- struct{}{} 52 + }() 53 + return ch 54 + } 55 + panic("unknown peer connection type") 56 + }
+152
pkg/rtcrec/webrtc_recording.go
··· 1 + package rtcrec 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "io" 7 + "time" 8 + 9 + "github.com/fxamacker/cbor/v2" 10 + "github.com/pion/webrtc/v4" 11 + "stream.place/streamplace/pkg/log" 12 + ) 13 + 14 + type WebRTCRecording struct { 15 + Events []WebRTCEvent `json:"events,omitempty"` 16 + } 17 + 18 + type WebRTCEvent struct { 19 + Offer *Offer `json:"offer,omitempty"` 20 + CreateAnswer *CreateAnswer `json:"answer,omitempty"` 21 + SetRemoteDescription *SetRemoteDescription `json:"setRemoteDescription,omitempty"` 22 + SetLocalDescription *SetLocalDescription `json:"setLocalDescription,omitempty"` 23 + LocalDescription *LocalDescription `json:"localDescription,omitempty"` 24 + ICEConnectionStateChange *ICEConnectionStateChange `json:"iceConnectionStateChange,omitempty"` 25 + ConnectionStateChange *ConnectionStateChange `json:"connectionStateChange,omitempty"` 26 + Track *Track `json:"track,omitempty"` 27 + TrackRead *TrackRead `json:"trackRead,omitempty"` 28 + TrackCodec *TrackCodec `json:"trackCodec,omitempty"` 29 + TrackKind *TrackKind `json:"trackKind,omitempty"` 30 + TrackPayloadType *TrackPayloadType `json:"trackPayloadType,omitempty"` 31 + TrackSSRC *TrackSSRC `json:"trackSSRC,omitempty"` 32 + AddTransceiverFromKind *AddTransceiverFromKind `json:"addTransceiverFromKind,omitempty"` 33 + ICEGatheringState *ICEGatheringState `json:"iceGatheringState,omitempty"` 34 + DataChannel *DataChannel `json:"dataChannel,omitempty"` 35 + NegotiationNeeded *NegotiationNeeded `json:"negotiationNeeded,omitempty"` 36 + Time time.Time `json:"time,omitempty"` 37 + } 38 + 39 + func (e *WebRTCEvent) Detail() WebRTCEventDetail { 40 + if e.Offer != nil { 41 + return e.Offer 42 + } 43 + if e.CreateAnswer != nil { 44 + return e.CreateAnswer 45 + } 46 + return nil 47 + } 48 + 49 + type WebRTCEventDetail interface{} 50 + 51 + type Offer struct { 52 + SDPOffer string `json:"sdpOffer,omitempty"` 53 + } 54 + 55 + type CreateAnswer struct { 56 + SDPAnswer string `json:"sdpAnswer,omitempty"` 57 + } 58 + 59 + type SetRemoteDescription struct { 60 + SDPRemoteDescription string `json:"sdpRemoteDescription,omitempty"` 61 + } 62 + 63 + type SetLocalDescription struct { 64 + SDPLocalDescription string `json:"sdpRemoteDescription,omitempty"` 65 + } 66 + 67 + type LocalDescription struct { 68 + SDPLocalDescription string `json:"sdpLocalDescription,omitempty"` 69 + } 70 + 71 + type ICEConnectionStateChange struct { 72 + ICEConnectionState webrtc.ICEConnectionState `json:"iceConnectionState,omitempty"` 73 + } 74 + 75 + type ConnectionStateChange struct { 76 + ConnectionState webrtc.PeerConnectionState `json:"connectionState,omitempty"` 77 + } 78 + 79 + type Track struct { 80 + ID string `json:"id,omitempty"` 81 + Kind webrtc.RTPCodecType `json:"kind,omitempty"` 82 + SSRC webrtc.SSRC `json:"ssrc,omitempty"` 83 + PayloadType webrtc.PayloadType `json:"payloadType,omitempty"` 84 + StreamID string `json:"streamId,omitempty"` 85 + Msid string `json:"msid,omitempty"` 86 + RID string `json:"rid,omitempty"` 87 + } 88 + 89 + type TrackRead struct { 90 + SSRC webrtc.SSRC `json:"ssrc,omitempty"` 91 + Data []byte `json:"data,omitempty"` 92 + Count int `json:"count,omitempty"` 93 + Err string `json:"err,omitempty"` 94 + } 95 + 96 + type TrackCodec struct { 97 + SSRC webrtc.SSRC `json:"ssrc,omitempty"` 98 + Codec webrtc.RTPCodecParameters `json:"codec,omitempty"` 99 + } 100 + 101 + type TrackKind struct { 102 + SSRC webrtc.SSRC `json:"ssrc,omitempty"` 103 + Kind webrtc.RTPCodecType `json:"kind,omitempty"` 104 + } 105 + 106 + type TrackPayloadType struct { 107 + SSRC webrtc.SSRC `json:"ssrc,omitempty"` 108 + PayloadType webrtc.PayloadType `json:"payloadType,omitempty"` 109 + } 110 + 111 + type TrackSSRC struct { 112 + SSRC webrtc.SSRC `json:"ssrc,omitempty"` 113 + } 114 + 115 + type RecorderStream struct { 116 + encoder *cbor.Encoder 117 + } 118 + 119 + func NewRecorderStream(w io.Writer) (*RecorderStream, error) { 120 + opts := cbor.CoreDetEncOptions() 121 + opts.Time = cbor.TimeRFC3339Nano 122 + em, err := opts.EncMode() 123 + if err != nil { 124 + return nil, fmt.Errorf("failed to create encoder mode: %w", err) 125 + } 126 + encoder := em.NewEncoder(w) 127 + 128 + return &RecorderStream{ 129 + encoder: encoder, 130 + }, nil 131 + } 132 + 133 + func (s *RecorderStream) Event(event WebRTCEvent) { 134 + err := s.encoder.Encode(event) 135 + if err != nil { 136 + log.Log(context.Background(), "error encoding event", "error", err) 137 + } 138 + } 139 + 140 + type AddTransceiverFromKind struct { 141 + Kind webrtc.RTPCodecType `json:"kind,omitempty"` 142 + } 143 + 144 + type ICEGatheringState struct { 145 + State webrtc.ICEGatheringState `json:"state,omitempty"` 146 + } 147 + 148 + type DataChannel struct { 149 + Label string `json:"label,omitempty"` 150 + } 151 + 152 + type NegotiationNeeded struct{}
+72
pkg/rtcrec/webrtc_recording_test.go
··· 1 + package rtcrec 2 + 3 + import ( 4 + "io" 5 + "os" 6 + "testing" 7 + "time" 8 + 9 + "github.com/fxamacker/cbor/v2" 10 + "github.com/stretchr/testify/require" 11 + ) 12 + 13 + func TestWebRTCRecording(t *testing.T) { 14 + // Create a temporary file for testing 15 + tmpfile, err := os.CreateTemp("", "webrtc-recording-test-*") 16 + require.NoError(t, err) 17 + 18 + // Create recorder stream writing to temp file 19 + recorder, err := NewRecorderStream(tmpfile) 20 + require.NoError(t, err) 21 + 22 + // Test recording an offer event 23 + offerEvent := WebRTCEvent{ 24 + Offer: &Offer{ 25 + SDPOffer: "test-offer", 26 + }, 27 + Time: time.Now().UTC(), 28 + } 29 + recorder.Event(offerEvent) 30 + 31 + // Test recording an answer event 32 + answerEvent := WebRTCEvent{ 33 + CreateAnswer: &CreateAnswer{ 34 + SDPAnswer: "test-answer", 35 + }, 36 + Time: time.Now().UTC(), 37 + } 38 + recorder.Event(answerEvent) 39 + 40 + // err = recorder.Close() 41 + // require.NoError(t, err) 42 + err = tmpfile.Close() 43 + require.NoError(t, err) 44 + 45 + tmpfile, err = os.Open(tmpfile.Name()) 46 + require.NoError(t, err) 47 + defer tmpfile.Close() 48 + 49 + dec := cbor.NewDecoder(tmpfile) 50 + 51 + evs := []WebRTCEvent{} 52 + err = nil 53 + for err == nil { 54 + var ev WebRTCEvent 55 + err = dec.Decode(&ev) 56 + if err == nil { 57 + evs = append(evs, ev) 58 + } 59 + } 60 + 61 + require.ErrorIs(t, err, io.EOF) 62 + 63 + off, ok := evs[0].Detail().(*Offer) 64 + require.True(t, ok) 65 + ans, ok := evs[1].Detail().(*CreateAnswer) 66 + require.True(t, ok) 67 + 68 + require.Equal(t, 2, len(evs)) 69 + require.Equal(t, off.SDPOffer, offerEvent.Offer.SDPOffer) 70 + require.Equal(t, ans.SDPAnswer, answerEvent.CreateAnswer.SDPAnswer) 71 + 72 + }
+1
pkg/rtcrec/wrapped_peerconnection.go
··· 1 + package rtcrec
+159
pkg/streamplace/cbor_gen.go
··· 2610 2610 2611 2611 return nil 2612 2612 } 2613 + func (t *ServerSettings) MarshalCBOR(w io.Writer) error { 2614 + if t == nil { 2615 + _, err := w.Write(cbg.CborNull) 2616 + return err 2617 + } 2618 + 2619 + cw := cbg.NewCborWriter(w) 2620 + fieldCount := 2 2621 + 2622 + if t.DebugRecording == nil { 2623 + fieldCount-- 2624 + } 2625 + 2626 + if _, err := cw.Write(cbg.CborEncodeMajorType(cbg.MajMap, uint64(fieldCount))); err != nil { 2627 + return err 2628 + } 2629 + 2630 + // t.LexiconTypeID (string) (string) 2631 + if len("$type") > 1000000 { 2632 + return xerrors.Errorf("Value in field \"$type\" was too long") 2633 + } 2634 + 2635 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("$type"))); err != nil { 2636 + return err 2637 + } 2638 + if _, err := cw.WriteString(string("$type")); err != nil { 2639 + return err 2640 + } 2641 + 2642 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("place.stream.server.settings"))); err != nil { 2643 + return err 2644 + } 2645 + if _, err := cw.WriteString(string("place.stream.server.settings")); err != nil { 2646 + return err 2647 + } 2648 + 2649 + // t.DebugRecording (bool) (bool) 2650 + if t.DebugRecording != nil { 2651 + 2652 + if len("debugRecording") > 1000000 { 2653 + return xerrors.Errorf("Value in field \"debugRecording\" was too long") 2654 + } 2655 + 2656 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("debugRecording"))); err != nil { 2657 + return err 2658 + } 2659 + if _, err := cw.WriteString(string("debugRecording")); err != nil { 2660 + return err 2661 + } 2662 + 2663 + if t.DebugRecording == nil { 2664 + if _, err := cw.Write(cbg.CborNull); err != nil { 2665 + return err 2666 + } 2667 + } else { 2668 + if err := cbg.WriteBool(w, *t.DebugRecording); err != nil { 2669 + return err 2670 + } 2671 + } 2672 + } 2673 + return nil 2674 + } 2675 + 2676 + func (t *ServerSettings) UnmarshalCBOR(r io.Reader) (err error) { 2677 + *t = ServerSettings{} 2678 + 2679 + cr := cbg.NewCborReader(r) 2680 + 2681 + maj, extra, err := cr.ReadHeader() 2682 + if err != nil { 2683 + return err 2684 + } 2685 + defer func() { 2686 + if err == io.EOF { 2687 + err = io.ErrUnexpectedEOF 2688 + } 2689 + }() 2690 + 2691 + if maj != cbg.MajMap { 2692 + return fmt.Errorf("cbor input should be of type map") 2693 + } 2694 + 2695 + if extra > cbg.MaxLength { 2696 + return fmt.Errorf("ServerSettings: map struct too large (%d)", extra) 2697 + } 2698 + 2699 + n := extra 2700 + 2701 + nameBuf := make([]byte, 14) 2702 + for i := uint64(0); i < n; i++ { 2703 + nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) 2704 + if err != nil { 2705 + return err 2706 + } 2707 + 2708 + if !ok { 2709 + // Field doesn't exist on this type, so ignore it 2710 + if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { 2711 + return err 2712 + } 2713 + continue 2714 + } 2715 + 2716 + switch string(nameBuf[:nameLen]) { 2717 + // t.LexiconTypeID (string) (string) 2718 + case "$type": 2719 + 2720 + { 2721 + sval, err := cbg.ReadStringWithMax(cr, 1000000) 2722 + if err != nil { 2723 + return err 2724 + } 2725 + 2726 + t.LexiconTypeID = string(sval) 2727 + } 2728 + // t.DebugRecording (bool) (bool) 2729 + case "debugRecording": 2730 + 2731 + { 2732 + b, err := cr.ReadByte() 2733 + if err != nil { 2734 + return err 2735 + } 2736 + if b != cbg.CborNull[0] { 2737 + if err := cr.UnreadByte(); err != nil { 2738 + return err 2739 + } 2740 + 2741 + maj, extra, err = cr.ReadHeader() 2742 + if err != nil { 2743 + return err 2744 + } 2745 + if maj != cbg.MajOther { 2746 + return fmt.Errorf("booleans must be major type 7") 2747 + } 2748 + 2749 + var val bool 2750 + switch extra { 2751 + case 20: 2752 + val = false 2753 + case 21: 2754 + val = true 2755 + default: 2756 + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) 2757 + } 2758 + t.DebugRecording = &val 2759 + } 2760 + } 2761 + 2762 + default: 2763 + // Field doesn't exist on this type, so ignore it 2764 + if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { 2765 + return err 2766 + } 2767 + } 2768 + } 2769 + 2770 + return nil 2771 + }
+19
pkg/streamplace/serversettings.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + package streamplace 4 + 5 + // schema: place.stream.server.settings 6 + 7 + import ( 8 + "github.com/bluesky-social/indigo/lex/util" 9 + ) 10 + 11 + func init() { 12 + util.RegisterType("place.stream.server.settings", &ServerSettings{}) 13 + } // 14 + // RECORDTYPE: ServerSettings 15 + type ServerSettings struct { 16 + LexiconTypeID string `json:"$type,const=place.stream.server.settings" cborgen:"$type,const=place.stream.server.settings"` 17 + // debugRecording: Whether this node may archive your livestream for improving the service 18 + DebugRecording *bool `json:"debugRecording,omitempty" cborgen:"debugRecording,omitempty"` 19 + }