Live video on the AT Protocol

implement mist playback of signed streams!!!

See merge request aquareum-tv/aquareum!58

Changelog: feature

+1246 -218
+1 -1
.ci/dockerfile-hash.yaml
··· 1 1 variables: 2 - DOCKERFILE_HASH: c73491a7429660713b481343620d1ea6295bdfba 2 + DOCKERFILE_HASH: 63bfdd73722cb4b4b4b9301905dadc3d22fa3668
+12 -3
.gitlab-ci.yml
··· 19 19 20 20 builder-rebuild: 21 21 stage: build 22 + interruptible: true 22 23 image: 23 24 name: curlimages/curl:latest 24 25 entrypoint: [""] ··· 39 40 interruptible: true 40 41 image: "$CI_REGISTRY_IMAGE:builder-$DOCKERFILE_HASH" 41 42 script: 43 + - rm -rf /usr/x86_64-w64-mingw32/include/d3dcompiler.h /usr/share/mingw-w64/include/d3dcompiler.h 42 44 - git fetch --unshallow || echo 'already unshallow' 43 45 - make ci -j$(nproc) 44 46 artifacts: ··· 92 94 93 95 build-docker-manifest: 94 96 stage: build 97 + interruptible: true 95 98 image: 96 99 name: curlimages/curl:latest 97 100 entrypoint: [""] ··· 128 131 129 132 script: 130 133 - git fetch --unshallow || echo 'already unshallow' 131 - - brew install meson ninja go && go version 134 + - brew install ninja go openssl@3 && go version 132 135 - sudo gem install --user-install xcpretty 133 136 - curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs > rustup.sh && bash rustup.sh -y && rm rustup.sh 134 137 - export PATH="$PATH:$HOME/.cargo/bin:$(find $HOME/.gem/ruby -type d -name bin -maxdepth 2)" 135 - - make ci-macos -j16 136 - - make test 138 + - > 139 + brew install python@3.11 140 + && python3.11 -m pip install virtualenv 141 + && python3.11 -m virtualenv ~/venv 142 + && source ~/venv/bin/activate 143 + && pip3 install meson 144 + && make ci-macos -j16 145 + && make test 137 146 138 147 release: 139 148 stage: build
+2 -1
.vscode/settings.json
··· 1 1 { 2 2 "workbench.colorCustomizations": { 3 - "statusBar.background": "#83447f" 3 + "statusBar.background": "#83447f", 4 + "activityBar.background": "#83447f" 4 5 }, 5 6 "files.associations": { 6 7 "mistserver.h": "c",
+63 -8
Makefile
··· 32 32 33 33 .PHONY: node 34 34 node: schema 35 - meson setup build --native=./util/linux-amd64-gnu.ini && meson compile -C build 35 + $(MAKE) meson-setup 36 + meson compile -C build aquareum 36 37 mv ./build/aquareum ./bin/aquareum 37 38 38 39 .PHONY: schema ··· 44 45 test: 45 46 meson test -C build go-tests 46 47 48 + # test to make sure we haven't added any more dynamic dependencies 49 + .PHONY: link-test 50 + link-test: 51 + count=$(shell ldd ./build/aquareum | wc -l) \ 52 + && echo $$count \ 53 + && if [ "$$count" != "6" ]; then echo "ldd reports new libaries linked! want 6 got $$count" \ 54 + && ldd ./bin/aquareum \ 55 + && exit 1; \ 56 + fi 57 + 47 58 .PHONY: all 48 59 all: version install check app test node-all-platforms android 49 60 ··· 58 69 59 70 .PHONY: ci-test 60 71 ci-test: app 61 - meson setup build 72 + meson setup build $(OPTS) 62 73 meson test -C build go-tests 63 74 64 75 .PHONY: android ··· 98 109 mkdir -p .build \ 99 110 && curl -L -o ./.build/bundletool.jar https://github.com/google/bundletool/releases/download/1.17.0/bundletool-all-1.17.0.jar 100 111 112 + OPTS = -D "gst-plugins-base:audioresample=enabled" \ 113 + -D "gst-plugins-base:playback=enabled" \ 114 + -D "gst-plugins-base:opus=enabled" \ 115 + -D "gst-plugins-base:gio-typefinder=enabled" \ 116 + -D "gst-plugins-base:typefind=enabled" \ 117 + -D "gst-plugins-good:matroska=enabled" \ 118 + -D "gst-plugins-bad:fdkaac=enabled" \ 119 + -D "gstreamer-full:gst-full=enabled" \ 120 + -D "gstreamer-full:gst-full-plugins=libgstaudioresample.a;libgstmatroska.a;libgstfdkaac.a;libgstopus.a;libgstplayback.a;libgsttypefindfunctions.a" \ 121 + -D "gstreamer-full:gst-full-libraries=gstreamer-controller-1.0,gstreamer-plugins-base-1.0,gstreamer-pbutils-1.0" \ 122 + -D "gstreamer-full:gst-full-target-type=static_library" \ 123 + -D "gstreamer-full:gst-full-elements=coreelements:fdsrc,fdsink,queue,queue2,typefind,tee" \ 124 + -D "gstreamer-full:bad=enabled" \ 125 + -D "gstreamer-full:ugly=disabled" \ 126 + -D "gstreamer-full:tls=disabled" \ 127 + -D "gstreamer-full:gpl=enabled" \ 128 + -D "gstreamer-full:gst-full-typefind-functions=" 129 + 130 + .PHONY: meson-setup 131 + meson-setup: 132 + meson setup build $(OPTS) 133 + meson configure build $(OPTS) 134 + 101 135 .PHONY: node-all-platforms 102 136 node-all-platforms: app 103 - meson setup build 137 + meson setup build $(OPTS) --buildtype debugoptimized 104 138 meson compile -C build archive 139 + $(MAKE) link-test 140 + $(MAKE) linux-arm64 141 + $(MAKE) windows-amd64 142 + 143 + .PHONY: linux-arm64 144 + linux-arm64: 105 145 rustup target add aarch64-unknown-linux-gnu 106 - meson setup --cross-file util/linux-arm64-gnu.ini build-aarch64 146 + meson setup --cross-file util/linux-arm64-gnu.ini --buildtype debugoptimized build-aarch64 $(OPTS) 107 147 meson compile -C build-aarch64 archive 148 + 149 + .PHONY: windows-amd64 150 + windows-amd64: 108 151 rustup target add x86_64-pc-windows-gnu 109 - meson setup --cross-file util/windows-amd64-gnu.ini build-windows 152 + meson setup --cross-file util/windows-amd64-gnu.ini --buildtype debugoptimized build-windows $(OPTS) 110 153 meson compile -C build-windows archive 2>&1 | grep -v drectve 111 154 112 155 .PHONY: node-all-platforms-macos 113 156 node-all-platforms-macos: app 114 - meson setup build 157 + meson setup --buildtype debugoptimized build $(OPTS) 115 158 meson compile -C build archive 116 159 rustup target add x86_64-apple-darwin 117 - meson setup --cross-file util/darwin-amd64-apple.ini build-amd64 160 + meson setup --buildtype debugoptimized --cross-file util/darwin-amd64-apple.ini build-amd64 $(OPTS) 118 161 meson compile -C build-amd64 archive 119 162 120 163 # link your local version of mist for dev ··· 123 166 rm -rf subprojects/mistserver 124 167 ln -s $$(realpath ../mistserver) ./subprojects/mistserver 125 168 126 - # link your local version of c2pa-gop for dev 169 + # link your local version of c2pa-go for dev 127 170 .PHONY: link-c2pa-go 128 171 link-c2pa-go: 129 172 rm -rf subprojects/c2pa_go 130 173 ln -s $$(realpath ../c2pa-go) ./subprojects/c2pa_go 174 + 175 + # link your local version of gstreamer 176 + .PHONY: link-gstreamer 177 + link-gstreamer: 178 + rm -rf subprojects/gstreamer-full 179 + ln -s $$(realpath ../gstreamer) ./subprojects/gstreamer-full 180 + 181 + # link your local version of ffmpeg for dev 182 + .PHONY: link-ffmpeg 183 + link-ffmpeg: 184 + rm -rf subprojects/FFmpeg 185 + ln -s $$(realpath ../ffmpeg) ./subprojects/FFmpeg 131 186 132 187 .PHONY: docker-build 133 188 docker-build: docker-build-builder docker-build-in-container
+3
cmd/libaquareum/aquareum.go
··· 8 8 "aquareum.tv/aquareum/pkg/log" 9 9 10 10 "aquareum.tv/aquareum/pkg/cmd" 11 + // _ "github.com/go-gst/go-glib/glib" 12 + // _ "github.com/go-gst/go-gst/gst" 11 13 ) 12 14 15 + //#cgo LDFLAGS: -lz 13 16 import "C" 14 17 15 18 //export AquareumMain
+1 -1
docker/build.Dockerfile
··· 12 12 RUN apt update \ 13 13 && apt install -y build-essential curl git openjdk-17-jdk unzip jq g++ python3-pip ninja-build \ 14 14 gcc-aarch64-linux-gnu g++-aarch64-linux-gnu clang lld qemu-user-static pkg-config \ 15 - wine64 nasm gcc-mingw-w64-x86-64 g++-mingw-w64-x86-64 mingw-w64-tools zip \ 15 + wine64 nasm gcc-mingw-w64-x86-64 g++-mingw-w64-x86-64 mingw-w64-tools zip bison flex \ 16 16 && pip install meson tomli \ 17 17 && curl -L --fail https://go.dev/dl/go$GO_VERSION.linux-$TARGETARCH.tar.gz -o go.tar.gz \ 18 18 && tar -C /usr/local -xf go.tar.gz \
+5
go.mod
··· 2 2 3 3 go 1.22.2 4 4 5 + replace github.com/livepeer/lpms => github.com/aquareum-tv/lpms v0.0.0-20240828210246-5ac9b407751e 6 + 5 7 require ( 6 8 firebase.google.com/go/v4 v4.14.1 7 9 git.aquareum.tv/aquareum-tv/c2pa-go v0.0.0-20240827194057-dc4fd1d2c425 ··· 10 12 github.com/dunglas/httpsfv v1.0.2 11 13 github.com/ethereum/go-ethereum v1.14.7 12 14 github.com/go-git/go-git/v5 v5.12.0 15 + github.com/go-gst/go-glib v1.2.1 16 + github.com/go-gst/go-gst v1.2.1 13 17 github.com/golang/glog v1.2.0 14 18 github.com/google/uuid v1.6.0 15 19 github.com/julienschmidt/httprouter v1.3.0 ··· 90 94 github.com/jstemmer/go-junit-report v1.0.0 // indirect 91 95 github.com/kevinburke/ssh_config v1.2.0 // indirect 92 96 github.com/livepeer/m3u8 v0.11.1 // indirect 97 + github.com/mattn/go-pointer v0.0.1 // indirect 93 98 github.com/mitchellh/gox v1.0.1 // indirect 94 99 github.com/mitchellh/iochan v1.0.0 // indirect 95 100 github.com/mmcloughlin/addchain v0.4.0 // indirect
+8 -4
go.sum
··· 19 19 dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= 20 20 firebase.google.com/go/v4 v4.14.1 h1:4qiUETaFRWoFGE1XP5VbcEdtPX93Qs+8B/7KvP2825g= 21 21 firebase.google.com/go/v4 v4.14.1/go.mod h1:fgk2XshgNDEKaioKco+AouiegSI9oTWVqRaBdTTGBoM= 22 - git.aquareum.tv/aquareum-tv/c2pa-go v0.0.0-20240821182412-c7339a12f026 h1:6GprIUXv9uXc16zD0c0IY72o+xroaTZP1isC3Pbhf2c= 23 - git.aquareum.tv/aquareum-tv/c2pa-go v0.0.0-20240821182412-c7339a12f026/go.mod h1:lp4UzqUyNZ4gieOSCdjsGoSUudZsDrBpsBgnRoNLlSw= 24 22 git.aquareum.tv/aquareum-tv/c2pa-go v0.0.0-20240827194057-dc4fd1d2c425 h1:4xgRxVpXdHwBGyf5BAyWfWp0WrwjD1gP0EU5K6r8sw4= 25 23 git.aquareum.tv/aquareum-tv/c2pa-go v0.0.0-20240827194057-dc4fd1d2c425/go.mod h1:lp4UzqUyNZ4gieOSCdjsGoSUudZsDrBpsBgnRoNLlSw= 26 24 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= ··· 43 41 github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= 44 42 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= 45 43 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= 44 + github.com/aquareum-tv/lpms v0.0.0-20240828210246-5ac9b407751e h1:YMbqxUpsM7cjfPdO4y2FVH+VMBMC+ZG7DRbF7J5NMs0= 45 + github.com/aquareum-tv/lpms v0.0.0-20240828210246-5ac9b407751e/go.mod h1:z5ROP1l5OzAKSoqVRLc34MjUdueil6wHSecQYV7llIw= 46 46 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= 47 47 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= 48 48 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= ··· 135 135 github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII= 136 136 github.com/go-git/go-git/v5 v5.12.0 h1:7Md+ndsjrzZxbddRDZjF14qK+NN56sy6wkqaVrjZtys= 137 137 github.com/go-git/go-git/v5 v5.12.0/go.mod h1:FTM9VKtnI2m65hNI/TenDDDnUf2Q9FHnXYjuz9i5OEY= 138 + github.com/go-gst/go-glib v1.2.1 h1:ibAr5N1NmuHmZ5RaCFjFjeUy0Rk3t3LgvGutmwBeR9E= 139 + github.com/go-gst/go-glib v1.2.1/go.mod h1:JybIYeoHNwCkHGaBf1fHNIaM4sQTrJPkPLsi7dmPNOU= 140 + github.com/go-gst/go-gst v1.2.1 h1:FqUFGFllbuC8LkQoqULgAui2ZS0VU1WEBCNekIMcBEE= 141 + github.com/go-gst/go-gst v1.2.1/go.mod h1:OGPRsJdvNYCKjt3e4H4i8J6KVd2Wk5S2lzsEQ8mO1+g= 138 142 github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= 139 143 github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= 140 144 github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= ··· 228 232 github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= 229 233 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= 230 234 github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw= 231 - github.com/livepeer/lpms v0.0.0-20240812093642-b5181eb92cb2 h1:2Cjgt/Bg6bVSPHwUUeSf3n55JuCQGGfD7HXk2Qcg0I8= 232 - github.com/livepeer/lpms v0.0.0-20240812093642-b5181eb92cb2/go.mod h1:z5ROP1l5OzAKSoqVRLc34MjUdueil6wHSecQYV7llIw= 233 235 github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU= 234 236 github.com/livepeer/m3u8 v0.11.1/go.mod h1:IUqAtwWPAG2CblfQa4SVzTQoDcEMPyfNOaBSxqHMS04= 235 237 github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc= 236 238 github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= 237 239 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= 238 240 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= 241 + github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0= 242 + github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc= 239 243 github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= 240 244 github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= 241 245 github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
+1 -1
hack/push.mjs
··· 52 52 console.log(JSON.stringify(notification.notification, null, 2)); 53 53 // console.log("launching in 5 sec") 54 54 // await delay(5000) 55 - const res = await admin.messaging().sendMulticast(notification); 55 + const res = await admin.messaging().sendEachForMulticast(notification); 56 56 console.log(JSON.stringify(res)); 57 57 }; 58 58 export default blast;
+1
js/app/.env.development
··· 1 1 EXPO_PUBLIC_AQUAREUM_URL=http://localhost:8080 2 + EXPO_PUBLIC_WEB_TRY_LOCAL=false 2 3 EXPO_USE_METRO_WORKSPACE_ROOT=1
+1
js/app/.env.production
··· 1 1 EXPO_PUBLIC_AQUAREUM_URL=https://aquareum.tv 2 + EXPO_PUBLIC_WEB_TRY_LOCAL=true 2 3 EXPO_USE_METRO_WORKSPACE_ROOT=0
-36
js/app/app/Provider.tsx
··· 1 - import { useColorScheme } from "hooks/useColorScheme"; 2 - import { TamaguiProvider, type TamaguiProviderProps } from "tamagui"; 3 - import { ToastProvider, ToastViewport } from "@tamagui/toast"; 4 - import { CurrentToast } from "./CurrentToast"; 5 - import { config } from "../tamagui.config"; 6 - import NativeProvider from "components/providers"; 7 - 8 - export function Provider({ 9 - children, 10 - ...rest 11 - }: Omit<TamaguiProviderProps, "config">) { 12 - const colorScheme = useColorScheme(); 13 - 14 - return ( 15 - <TamaguiProvider 16 - config={config} 17 - defaultTheme={colorScheme === "dark" ? "dark" : "light"} 18 - {...rest} 19 - > 20 - <ToastProvider 21 - swipeDirection="vertical" 22 - duration={6000} 23 - native={ 24 - [ 25 - /* uncomment the next line to do native toasts on mobile. NOTE: it'll require you making a dev build and won't work with Expo Go */ 26 - // 'mobile' 27 - ] 28 - } 29 - > 30 - <NativeProvider>{children}</NativeProvider> 31 - <CurrentToast /> 32 - <ToastViewport name="default" top="$8" left={0} right={0} /> 33 - </ToastProvider> 34 - </TamaguiProvider> 35 - ); 36 - }
+10 -8
js/app/app/_layout.tsx
··· 21 21 } from "@react-navigation/native"; 22 22 import { useFonts } from "expo-font"; 23 23 import { SplashScreen, Stack } from "expo-router"; 24 - import { Provider } from "./Provider"; 25 - import "./updates"; 24 + import { Provider } from "components"; 26 25 import { Helmet } from "react-native-helmet-async"; 27 26 import { Settings } from "@tamagui/lucide-icons"; 28 27 ··· 85 84 title: "", 86 85 headerShown: true, 87 86 headerRight: () => { 88 - if (isWeb) { 89 - return <View />; 90 - } 91 87 return ( 92 - <Link href="/options" asChild> 88 + <Link href="/settings" asChild> 93 89 <Button icon={<Settings size="$2" />}></Button> 94 90 </Link> 95 91 ); ··· 104 100 }} 105 101 /> 106 102 <Stack.Screen 107 - name="options" 103 + name="embed/[stream]" 108 104 options={{ 109 - title: "Options", 105 + headerShown: false, 106 + }} 107 + /> 108 + <Stack.Screen 109 + name="settings" 110 + options={{ 111 + title: "Settings", 110 112 presentation: "modal", 111 113 animation: "slide_from_right", 112 114 gestureEnabled: true,
+11
js/app/app/embed/[stream].tsx
··· 1 + import { Player } from "components"; 2 + import { useLocalSearchParams } from "expo-router"; 3 + import { View } from "tamagui"; 4 + 5 + export default function StreamPage() { 6 + const params = useLocalSearchParams(); 7 + if (typeof params.stream !== "string") { 8 + return <View />; 9 + } 10 + return <Player src={params.stream}></Player>; 11 + }
js/app/app/layout.tsx

This is a binary file and will not be displayed.

-13
js/app/app/options.tsx
··· 1 - import UpdatesDemo from "components/updates"; 2 - import { isWeb, Text, View } from "tamagui"; 3 - 4 - export default function OptionsScreen() { 5 - if (isWeb) { 6 - return <View />; 7 - } 8 - return ( 9 - <View flex={1}> 10 - <UpdatesDemo /> 11 - </View> 12 - ); 13 - }
+10
js/app/app/settings.tsx
··· 1 + import { Settings } from "components"; 2 + import { View } from "tamagui"; 3 + 4 + export default function OptionsScreen() { 5 + return ( 6 + <View flex={1}> 7 + <Settings /> 8 + </View> 9 + ); 10 + }
+11
js/app/app/stream/[stream].tsx
··· 1 + import { Player } from "components"; 2 + import { useLocalSearchParams } from "expo-router"; 3 + import { View } from "tamagui"; 4 + 5 + export default function StreamPage() { 6 + const params = useLocalSearchParams(); 7 + if (typeof params.stream !== "string") { 8 + return <View />; 9 + } 10 + return <Player src={params.stream}></Player>; 11 + }
-1
js/app/app/updates.tsx
··· 1 - import * as Updates from "expo-updates";
+3
js/app/components/index.tsx
··· 1 1 export { Countdown } from "./countdown"; 2 + export { Player } from "./player/player"; 3 + export { Settings } from "./settings/settings"; 4 + export { default as Provider } from "./provider/provider";
+10
js/app/components/player/player.native.tsx
··· 1 + import React from "react"; 2 + import { View, Text } from "tamagui"; 3 + 4 + export function Player() { 5 + return ( 6 + <View f={1}> 7 + <Text>WIP</Text> 8 + </View> 9 + ); 10 + }
+126
js/app/components/player/player.tsx
··· 1 + import React, { 2 + ForwardedRef, 3 + forwardRef, 4 + useCallback, 5 + useEffect, 6 + useRef, 7 + useState, 8 + useTransition, 9 + } from "react"; 10 + import { Button, Text, View, XStack } from "tamagui"; 11 + import WHEPClient from "./webrtc"; 12 + import Hls from "hls.js"; 13 + import { Circle, CheckCircle } from "@tamagui/lucide-icons"; 14 + import useAquareumNode from "hooks/useAquareumNode"; 15 + 16 + export function Player(props: { src: string }) { 17 + const [proto, setProto] = useState("hls"); 18 + let p; 19 + if (proto === "webrtc") { 20 + p = <WebRTCPlayer src={props.src} />; 21 + } else if (proto === "hls") { 22 + p = <HLSPlayer src={props.src} />; 23 + } 24 + return ( 25 + <View f={1}> 26 + {p} 27 + <XStack justifyContent="center"> 28 + <PickerButton 29 + name="webrtc" 30 + title="WebRTC" 31 + picked={proto} 32 + setProto={setProto} 33 + /> 34 + <PickerButton 35 + name="hls" 36 + title="HLS" 37 + picked={proto} 38 + setProto={setProto} 39 + /> 40 + </XStack> 41 + </View> 42 + ); 43 + } 44 + 45 + const PickerButton = (props: { 46 + name: string; 47 + picked: string; 48 + title: string; 49 + setProto: (string) => void; 50 + }) => { 51 + const on = props.picked === props.name; 52 + return <></>; 53 + return ( 54 + <Button 55 + disabled={on} 56 + margin="$3" 57 + opacity={on ? 0.5 : 1} 58 + icon={on ? CheckCircle : Circle} 59 + onPress={() => props.setProto(props.name)} 60 + > 61 + {props.title} 62 + </Button> 63 + ); 64 + }; 65 + 66 + export function HLSPlayer(props: { src: string }) { 67 + const videoRef = useRef<HTMLVideoElement | null>(null); 68 + const { url } = useAquareumNode(); 69 + useEffect(() => { 70 + if (!videoRef.current) { 71 + return; 72 + } 73 + const index = `${url}/api/hls/${props.src}/index.m3u8`; 74 + if (Hls.isSupported()) { 75 + var hls = new Hls(); 76 + hls.loadSource(index); 77 + hls.attachMedia(videoRef.current); 78 + hls.on(Hls.Events.MANIFEST_PARSED, () => { 79 + if (!videoRef.current) { 80 + return; 81 + } 82 + videoRef.current.play(); 83 + }); 84 + return () => { 85 + hls.stopLoad(); 86 + }; 87 + } else if (videoRef.current.canPlayType("application/vnd.apple.mpegurl")) { 88 + videoRef.current.src = index; 89 + videoRef.current.addEventListener("canplay", () => { 90 + if (!videoRef.current) { 91 + return; 92 + } 93 + videoRef.current.play(); 94 + }); 95 + } 96 + }, [videoRef.current]); 97 + return <VideoElement ref={videoRef} />; 98 + } 99 + 100 + export function WebRTCPlayer(props: { src: string }) { 101 + const videoRef = useRef<HTMLVideoElement | null>(null); 102 + const { url } = useAquareumNode(); 103 + useEffect(() => { 104 + if (!videoRef.current) { 105 + return; 106 + } 107 + const client = new WHEPClient( 108 + `${url}/api/webrtc/${props.src}`, 109 + videoRef.current, 110 + ); 111 + return () => { 112 + client.close(); 113 + }; 114 + }, [videoRef.current]); 115 + return <VideoElement ref={videoRef} />; 116 + } 117 + 118 + const VideoElement = forwardRef( 119 + (props, ref: ForwardedRef<HTMLVideoElement>) => { 120 + return ( 121 + <View backgroundColor="#111"> 122 + <video autoPlay={true} ref={ref} loop={true} controls={true} /> 123 + </View> 124 + ); 125 + }, 126 + );
+178
js/app/components/player/webrtc.tsx
··· 1 + /** 2 + * Example implementation of a client that uses WHEP to playback video over WebRTC 3 + * 4 + * https://www.ietf.org/id/draft-murillo-whep-00.html 5 + */ 6 + export default class WHEPClient { 7 + endpoint: string; 8 + videoElement: HTMLVideoElement; 9 + peerConnection: RTCPeerConnection; 10 + stream: MediaStream; 11 + constructor(endpoint, videoElement) { 12 + this.endpoint = endpoint; 13 + this.videoElement = videoElement; 14 + this.stream = new MediaStream(); 15 + /** 16 + * Create a new WebRTC connection, using public STUN servers with ICE, 17 + * allowing the client to disover its own IP address. 18 + * https://developer.mozilla.org/en-US/docs/Web/API/WebRTC_API/Protocols#ice 19 + */ 20 + this.peerConnection = new RTCPeerConnection({ 21 + // iceServers: [ 22 + // { 23 + // urls: "stun:stun.cloudflare.com:3478", 24 + // }, 25 + // ], 26 + bundlePolicy: "max-bundle", 27 + }); 28 + /** https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/addTransceiver */ 29 + this.peerConnection.addTransceiver("video", { 30 + direction: "recvonly", 31 + }); 32 + this.peerConnection.addTransceiver("audio", { 33 + direction: "recvonly", 34 + }); 35 + /** 36 + * When new tracks are received in the connection, store local references, 37 + * so that they can be added to a MediaStream, and to the <video> element. 38 + * 39 + * https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/track_event 40 + */ 41 + this.peerConnection.ontrack = (event) => { 42 + const track = event.track; 43 + const currentTracks = this.stream.getTracks(); 44 + const streamAlreadyHasVideoTrack = currentTracks.some( 45 + (track) => track.kind === "video", 46 + ); 47 + const streamAlreadyHasAudioTrack = currentTracks.some( 48 + (track) => track.kind === "audio", 49 + ); 50 + switch (track.kind) { 51 + case "video": 52 + if (streamAlreadyHasVideoTrack) { 53 + break; 54 + } 55 + this.stream.addTrack(track); 56 + break; 57 + case "audio": 58 + if (streamAlreadyHasAudioTrack) { 59 + break; 60 + } 61 + this.stream.addTrack(track); 62 + break; 63 + default: 64 + console.log("got unknown track " + track); 65 + } 66 + }; 67 + this.peerConnection.addEventListener("connectionstatechange", (ev) => { 68 + if (this.peerConnection.connectionState !== "connected") { 69 + return; 70 + } 71 + if (!this.videoElement.srcObject) { 72 + this.videoElement.srcObject = this.stream; 73 + } 74 + }); 75 + this.peerConnection.addEventListener("negotiationneeded", (ev) => { 76 + negotiateConnectionWithClientOffer(this.peerConnection, this.endpoint); 77 + }); 78 + } 79 + 80 + close() { 81 + this.peerConnection.close(); 82 + } 83 + } 84 + 85 + /** 86 + * Performs the actual SDP exchange. 87 + * 88 + * 1. Constructs the client's SDP offer 89 + * 2. Sends the SDP offer to the server, 90 + * 3. Awaits the server's offer. 91 + * 92 + * SDP describes what kind of media we can send and how the server and client communicate. 93 + * 94 + * https://developer.mozilla.org/en-US/docs/Glossary/SDP 95 + * https://www.ietf.org/archive/id/draft-ietf-wish-whip-01.html#name-protocol-operation 96 + */ 97 + export async function negotiateConnectionWithClientOffer( 98 + peerConnection: RTCPeerConnection, 99 + endpoint: string, 100 + ) { 101 + /** https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/createOffer */ 102 + const offer = await peerConnection.createOffer(); 103 + /** https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/setLocalDescription */ 104 + await peerConnection.setLocalDescription(offer); 105 + 106 + /** Wait for ICE gathering to complete */ 107 + let ofr = await waitToCompleteICEGathering(peerConnection); 108 + if (!ofr) { 109 + throw Error("failed to gather ICE candidates for offer"); 110 + } 111 + 112 + /** 113 + * As long as the connection is open, attempt to... 114 + */ 115 + while (peerConnection.connectionState !== "closed") { 116 + try { 117 + /** 118 + * This response contains the server's SDP offer. 119 + * This specifies how the client should communicate, 120 + * and what kind of media client and server have negotiated to exchange. 121 + */ 122 + let response = await postSDPOffer(endpoint, ofr.sdp); 123 + if (response.status === 201) { 124 + let answerSDP = await response.text(); 125 + await peerConnection.setRemoteDescription( 126 + new RTCSessionDescription({ type: "answer", sdp: answerSDP }), 127 + ); 128 + return response.headers.get("Location"); 129 + } else if (response.status === 405) { 130 + console.log( 131 + "Remember to update the URL passed into the WHIP or WHEP client", 132 + ); 133 + } else { 134 + const errorMessage = await response.text(); 135 + console.error(errorMessage); 136 + } 137 + } catch (e) { 138 + console.error(e); 139 + } 140 + 141 + /** Limit reconnection attempts to at-most once every 5 seconds */ 142 + await new Promise((r) => setTimeout(r, 5000)); 143 + } 144 + } 145 + 146 + async function postSDPOffer(endpoint: string, data: string) { 147 + return await fetch(endpoint, { 148 + method: "POST", 149 + mode: "cors", 150 + headers: { 151 + "content-type": "application/sdp", 152 + }, 153 + body: data, 154 + }); 155 + } 156 + 157 + /** 158 + * Receives an RTCPeerConnection and waits until 159 + * the connection is initialized or a timeout passes. 160 + * 161 + * https://www.ietf.org/archive/id/draft-ietf-wish-whip-01.html#section-4.1 162 + * https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/iceGatheringState 163 + * https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/icegatheringstatechange_event 164 + */ 165 + async function waitToCompleteICEGathering(peerConnection: RTCPeerConnection) { 166 + return new Promise<RTCSessionDescription | null>((resolve) => { 167 + /** Wait at most 1 second for ICE gathering. */ 168 + setTimeout(function () { 169 + if (peerConnection.connectionState === "closed") { 170 + return; 171 + } 172 + resolve(peerConnection.localDescription); 173 + }, 1000); 174 + peerConnection.onicegatheringstatechange = (ev) => 175 + peerConnection.iceGatheringState === "complete" && 176 + resolve(peerConnection.localDescription); 177 + }); 178 + }
+6
js/app/components/provider/provider.native.tsx
··· 1 + import React from "react"; 2 + import SharedProvider from "./provider.shared"; 3 + 4 + export default function Provider({ children }: { children: React.ReactNode }) { 5 + return <SharedProvider>{children}</SharedProvider>; 6 + }
+29
js/app/components/provider/provider.shared.tsx
··· 1 + import { ToastProvider, ToastViewport } from "@tamagui/toast"; 2 + import { CurrentToast } from "app/CurrentToast"; 3 + import React from "react"; 4 + import { TamaguiProvider } from "tamagui"; 5 + import config from "tamagui.config"; 6 + import { AquareumProvider } from "hooks/useAquareumNode"; 7 + 8 + export default function Provider({ children }: { children: React.ReactNode }) { 9 + return ( 10 + <AquareumProvider> 11 + <TamaguiProvider config={config} defaultTheme={"dark"}> 12 + <ToastProvider 13 + swipeDirection="vertical" 14 + duration={6000} 15 + native={ 16 + [ 17 + /* uncomment the next line to do native toasts on mobile. NOTE: it'll require you making a dev build and won't work with Expo Go */ 18 + // 'mobile' 19 + ] 20 + } 21 + > 22 + {children} 23 + <CurrentToast /> 24 + <ToastViewport name="default" top="$8" left={0} right={0} /> 25 + </ToastProvider> 26 + </TamaguiProvider> 27 + </AquareumProvider> 28 + ); 29 + }
+46
js/app/components/provider/provider.tsx
··· 1 + // Web-only provider 2 + import "@rainbow-me/rainbowkit/styles.css"; 3 + 4 + import { getDefaultConfig, RainbowKitProvider } from "@rainbow-me/rainbowkit"; 5 + import { WagmiProvider } from "wagmi"; 6 + import { mainnet, polygon, optimism, arbitrum, base } from "wagmi/chains"; 7 + import { QueryClientProvider, QueryClient } from "@tanstack/react-query"; 8 + import { View, Text } from "tamagui"; 9 + import SharedProvider from "./provider.shared"; 10 + 11 + const queryClient = new QueryClient(); 12 + 13 + const config = getDefaultConfig({ 14 + appName: "Aquareum", 15 + appUrl: "https://aquareum.tv", 16 + projectId: "32c8489fbff0b10e2e011b36c36b4466", 17 + chains: [mainnet, polygon, optimism, arbitrum, base], 18 + ssr: true, // If your dApp uses server side rendering (SSR) 19 + }); 20 + 21 + export default function Provider({ children }: { children: React.ReactNode }) { 22 + return ( 23 + <SharedProvider> 24 + <WagmiProvider config={config}> 25 + <QueryClientProvider client={queryClient}> 26 + <RainbowKitProvider coolMode={true}> 27 + {/* RainbowKitProvider hides our children unless we do this...? */} 28 + <View 29 + id="rainbowkit-interior" // Also this......????? 30 + f={1} 31 + style={{ 32 + position: "absolute", 33 + top: 0, 34 + left: 0, 35 + width: "100vw", 36 + height: "100vh", 37 + }} 38 + > 39 + {children} 40 + </View> 41 + </RainbowKitProvider> 42 + </QueryClientProvider> 43 + </WagmiProvider> 44 + </SharedProvider> 45 + ); 46 + }
-5
js/app/components/providers.native.tsx
··· 1 - import React from "react"; 2 - 3 - export default function Provider({ children }: { children: React.ReactNode }) { 4 - return <>{children}</>; 5 - }
-43
js/app/components/providers.tsx
··· 1 - // Web-only provider 2 - import "@rainbow-me/rainbowkit/styles.css"; 3 - 4 - import { getDefaultConfig, RainbowKitProvider } from "@rainbow-me/rainbowkit"; 5 - import { WagmiProvider } from "wagmi"; 6 - import { mainnet, polygon, optimism, arbitrum, base } from "wagmi/chains"; 7 - import { QueryClientProvider, QueryClient } from "@tanstack/react-query"; 8 - import { View, Text } from "tamagui"; 9 - 10 - const queryClient = new QueryClient(); 11 - 12 - const config = getDefaultConfig({ 13 - appName: "Aquareum", 14 - appUrl: "https://aquareum.tv", 15 - projectId: "32c8489fbff0b10e2e011b36c36b4466", 16 - chains: [mainnet, polygon, optimism, arbitrum, base], 17 - ssr: true, // If your dApp uses server side rendering (SSR) 18 - }); 19 - 20 - export default function Provider({ children }: { children: React.ReactNode }) { 21 - return ( 22 - <WagmiProvider config={config}> 23 - <QueryClientProvider client={queryClient}> 24 - <RainbowKitProvider coolMode={true}> 25 - {/* RainbowKitProvider hides our children unless we do this...? */} 26 - <View 27 - id="rainbowkit-interior" // Also this......????? 28 - f={1} 29 - style={{ 30 - position: "absolute", 31 - top: 0, 32 - left: 0, 33 - width: "100vw", 34 - height: "100vh", 35 - }} 36 - > 37 - {children} 38 - </View> 39 - </RainbowKitProvider> 40 - </QueryClientProvider> 41 - </WagmiProvider> 42 - ); 43 - }
+41
js/app/components/settings/settings.tsx
··· 1 + import { Button, Form, H3, Input, View, XStack, YStack } from "tamagui"; 2 + import { Updates } from "./updates"; 3 + import useAquareumNode from "hooks/useAquareumNode"; 4 + import { useState } from "react"; 5 + 6 + export function Settings() { 7 + const { url, setUrl } = useAquareumNode(); 8 + const [newUrl, setNewUrl] = useState(""); 9 + const onSubmit = () => { 10 + setUrl(newUrl); 11 + setNewUrl(""); 12 + }; 13 + return ( 14 + <View f={1} alignItems="stretch" justifyContent="center" fg={1}> 15 + <Updates /> 16 + <Form 17 + fg={1} 18 + flexBasis={0} 19 + alignItems="center" 20 + justifyContent="center" 21 + padding="$4" 22 + onSubmit={onSubmit} 23 + > 24 + <H3 margin="$2">Change Aquareum Node URL</H3> 25 + <XStack alignItems="center" space="$2"> 26 + <Input 27 + value={newUrl} 28 + flex={1} 29 + size="$3" 30 + placeholder={url} 31 + onChangeText={(t) => setNewUrl(t)} 32 + onSubmitEditing={onSubmit} 33 + /> 34 + <Form.Trigger asChild> 35 + <Button size="$3">Go</Button> 36 + </Form.Trigger> 37 + </XStack> 38 + </Form> 39 + </View> 40 + ); 41 + }
+6
js/app/components/settings/updates.tsx
··· 1 + import { View, Text } from "tamagui"; 2 + 3 + // maybe someday some PWA update stuff will live here 4 + export function Updates() { 5 + return <View />; 6 + }
+14 -8
js/app/components/updates.tsx js/app/components/settings/updates.native.tsx
··· 1 1 import { StatusBar } from "expo-status-bar"; 2 - import * as Updates from "expo-updates"; 2 + import * as ExpoUpdates from "expo-updates"; 3 3 import { useEffect, useState } from "react"; 4 4 import { Button, H2, H5, ScrollView, Text, View } from "tamagui"; 5 5 import Constants from "expo-constants"; 6 6 import { ToastViewport, useToastController } from "@tamagui/toast"; 7 - import pkg from "../package.json"; 7 + import pkg from "../../package.json"; 8 8 import { Platform } from "react-native"; 9 9 10 - export default function UpdatesDemo() { 10 + export function Updates() { 11 11 const version = pkg.version; 12 12 const { currentlyRunning, isUpdateAvailable, isUpdatePending } = 13 - Updates.useUpdates(); 13 + ExpoUpdates.useUpdates(); 14 14 15 15 const [checked, setChecked] = useState(false); 16 16 17 17 useEffect(() => { 18 18 if (isUpdatePending) { 19 - Updates.reloadAsync(); 19 + ExpoUpdates.reloadAsync(); 20 20 } 21 21 }, [isUpdatePending]); 22 22 23 23 useEffect(() => { 24 24 if (isUpdateAvailable && checked) { 25 - Updates.fetchUpdateAsync(); 25 + ExpoUpdates.fetchUpdateAsync(); 26 26 } 27 27 }, [isUpdateAvailable, checked]); 28 28 ··· 41 41 const toast = useToastController(); 42 42 43 43 return ( 44 - <View f={1} alignItems="center" justifyContent="center" fg={1}> 44 + <View 45 + f={1} 46 + alignItems="center" 47 + justifyContent="center" 48 + fg={1} 49 + flexBasis={0} 50 + > 45 51 <ToastViewport name="modal" top="$8" left={0} right={0} /> 46 52 <View> 47 53 <H2 textAlign="center">Aquareum v{version}</H2> ··· 52 58 onPress={async () => { 53 59 try { 54 60 setChecked(true); 55 - const res = await Updates.checkForUpdateAsync(); 61 + const res = await ExpoUpdates.checkForUpdateAsync(); 56 62 if (!res.isAvailable) { 57 63 toast.show("No update found", { 58 64 viewportName: "modal",
+34
js/app/hooks/useAquareumNode.tsx
··· 1 + import { createContext, useContext, useState } from "react"; 2 + import { isWeb } from "tamagui"; 3 + 4 + let DEFAULT_URL = process.env.EXPO_PUBLIC_AQUAREUM_URL; 5 + if (isWeb && process.env.EXPO_PUBLIC_WEB_TRY_LOCAL === "true") { 6 + try { 7 + DEFAULT_URL = `${window.location.protocol}//${window.location.host}`; 8 + } catch (err) { 9 + // Oh well, fall back to hardcoded. 10 + } 11 + } 12 + 13 + export const AquareumContext = createContext({ 14 + url: DEFAULT_URL, 15 + setUrl: (_: string) => {}, 16 + }); 17 + 18 + export function AquareumProvider({ 19 + url: providedUrl, 20 + children, 21 + }: { 22 + url?: string; 23 + children: React.ReactNode; 24 + }) { 25 + const [url, setUrl] = useState(providedUrl || DEFAULT_URL); 26 + const val = { url, setUrl }; 27 + return ( 28 + <AquareumContext.Provider value={val}>{children}</AquareumContext.Provider> 29 + ); 30 + } 31 + 32 + export default function useAquareumNode() { 33 + return useContext(AquareumContext); 34 + }
+1
js/app/package.json
··· 46 46 "expo-system-ui": "~3.0.7", 47 47 "expo-updates": "~0.25.20", 48 48 "expo-web-browser": "~13.0.3", 49 + "hls.js": "^1.5.15", 49 50 "react": "18.3.1", 50 51 "react-dom": "18.3.1", 51 52 "react-native": "0.74.3",
+128 -16
meson.build
··· 2 2 'aquareum', 3 3 'c', 4 4 'cpp', 5 - default_options: ['cpp_std=c++98', 'default_library=static'], 5 + default_options: { 6 + 'cpp_std': 'c++11', 7 + 'default_library': 'static', 8 + 'auto_features': 'disabled', 9 + 'force_fallback_for': 'glib-2.0,gobject-2.0,gio-2.0,gio-unix-2.0,fdk-aac,zlib,libffi,pcre2,intl', 10 + 'buildtype': 'debug', 11 + }, 6 12 ) 7 13 8 14 fs = import('fs') ··· 35 41 command: ['cp', '@INPUT@', '@OUTPUT@'], 36 42 ) 37 43 44 + # todo: teach meson to automatically populate this CPATH thing for go 38 45 env = { 39 - 'PKG_CONFIG_PATH': meson.current_build_dir() + '/meson-private', 40 - 'CPATH': meson.current_source_dir() + '/subprojects/ffmpeg' + ':' + meson.current_build_dir() + '/subprojects/ffmpeg', 46 + 'PKG_CONFIG_PATH': meson.current_build_dir() + '/meson-uninstalled', 41 47 } 48 + 42 49 GOOS = meson.get_external_property('GOOS', host_machine.system()) 43 50 GOARCH = meson.get_external_property('GOARCH', host_machine.cpu()) 44 51 if GOARCH == 'x86_64' ··· 79 86 aquareum_deps += mistserver 80 87 endif 81 88 89 + ffmpeg_opts = { 90 + 'aac_adtstoasc_bsf': 'enabled', 91 + 'aac_decoder': 'enabled', 92 + 'aac_latm_parser': 'enabled', 93 + 'aac_parser': 'enabled', 94 + 'aformat_filter': 'enabled', 95 + 'aresample_filter': 'enabled', 96 + 'asetnsamples_filter': 'enabled', 97 + 'auto_features': 'enabled', 98 + 'concat_demuxer': 'enabled', 99 + 'concat_filter': 'enabled', 100 + 'concat_protocol': 'enabled', 101 + 'concatf_protocol': 'enabled', 102 + 'cuda': 'enabled', 103 + 'extract_extradata_bsf': 'enabled', 104 + 'fd_protocol': 'enabled', 105 + 'fifo_muxer': 'enabled', 106 + 'file_protocol': 'enabled', 107 + 'flac_decoder': 'enabled', 108 + 'flac_demuxer': 'enabled', 109 + 'flac_muxer': 'enabled', 110 + 'flac_parser': 'enabled', 111 + 'flv_demuxer': 'enabled', 112 + 'format_filter': 'enabled', 113 + 'fps_filter': 'enabled', 114 + 'gpl': 'enabled', 115 + 'h264_cuvid_decoder': 'enabled', 116 + 'h264_decoder': 'enabled', 117 + 'h264_metadata_bsf': 'enabled', 118 + 'h264_mp4toannexb_bsf': 'enabled', 119 + 'h264_parser': 'enabled', 120 + 'h264_redundant_pps_bsf': 'enabled', 121 + 'hevc_cuvid_decoder': 'enabled', 122 + 'hevc_mp4toannexb_bsf': 'enabled', 123 + 'hevc_muxer': 'enabled', 124 + 'hevc_parser': 'enabled', 125 + 'hls_muxer': 'enabled', 126 + 'http_protocol': 'enabled', 127 + 'hwdownload_filter': 'enabled', 128 + 'hwupload_cuda_filter': 'enabled', 129 + 'image2_demuxer': 'enabled', 130 + 'libxcb': 'enabled', 131 + 'matroska_audio_muxer': 'enabled', 132 + 'matroska_demuxer': 'enabled', 133 + 'matroska_muxer': 'enabled', 134 + 'mov_demuxer': 'enabled', 135 + 'mp3_decoder': 'enabled', 136 + 'mp3_demuxer': 'enabled', 137 + 'mp3_muxer': 'enabled', 138 + 'mp4_muxer': 'enabled', 139 + 'mpegaudio_parser': 'enabled', 140 + 'mpegts_demuxer': 'enabled', 141 + 'mpegts_muxer': 'enabled', 142 + 'network': 'enabled', 143 + 'null_muxer': 'enabled', 144 + 'opus_decoder': 'enabled', 145 + 'opus_parser': 'enabled', 146 + 'pipe_protocol': 'enabled', 147 + 'png_decoder': 'enabled', 148 + 'png_parser': 'enabled', 149 + 'rtmp_protocol': 'enabled', 150 + 'scale_cuda_filter': 'enabled', 151 + 'scale_filter': 'enabled', 152 + 'segment_muxer': 'enabled', 153 + 'stream_segment_muxer': 'enabled', 154 + 'select_filter': 'enabled', 155 + 'signature_cuda_filter': 'enabled', 156 + 'signature_filter': 'enabled', 157 + 'tcp_protocol': 'enabled', 158 + 'vorbis_decoder': 'enabled', 159 + 'vorbis_parser': 'enabled', 160 + 'vp8_cuvid_decoder': 'enabled', 161 + 'vp8_parser': 'enabled', 162 + 'vp9_cuvid_decoder': 'enabled', 163 + 'vp9_parser': 'enabled', 164 + 'wav_demuxer': 'enabled', 165 + 'wav_muxer': 'enabled', 166 + 'webm_muxer': 'enabled', 167 + 'x86asm': 'enabled', 168 + 'mov_muxer': 'enabled', 169 + 'vp9_superframe_bsf': 'enabled', 170 + 'ac3_parser': 'enabled', 171 + 'pgs_frame_merge_bsf': 'enabled', 172 + } 173 + 174 + if host_machine.system() != 'windows' 175 + ffmpeg_opts += {'nanosleep': 'enabled'} 176 + endif 177 + 82 178 ffmpeg_proj = subproject( 83 - 'ffmpeg', 179 + 'FFmpeg', 84 180 # vvc seems to have problems building as a submodule? 85 - default_options: { 86 - 'vvc_decoder': 'disabled', 87 - 'vvc_demuxer': 'disabled', 88 - 'vvc_metadata_bsf': 'disabled', 89 - 'vvc_mp4toannexb_bsf': 'disabled', 90 - 'vvc_muxer': 'disabled', 91 - 'vvc_parser': 'disabled', 92 - 'signature_cuda_filter': 'enabled', 93 - 'signature_filter': 'enabled', 94 - 'gpl': 'enabled', 95 - }, 181 + default_options: ffmpeg_opts, 96 182 ) 97 183 avformat = ffmpeg_proj.get_variable('libavformat_dep') 98 184 avfilter = ffmpeg_proj.get_variable('libavfilter_dep') 185 + aquareum_deps += [avformat, avfilter] 186 + 187 + gstreamer_proj = subproject( 188 + 'gstreamer-full', 189 + default_options: {}, 190 + ) 191 + 192 + gst_full_dep = gstreamer_proj.get_variable('gst_full_dep') 193 + aquareum_deps += [ 194 + gst_full_dep, 195 + dependency('gio-2.0'), 196 + dependency('gstreamer-controller-1.0'), 197 + dependency('gstreamer-pbutils-1.0'), 198 + dependency('gstplayback'), 199 + ] 99 200 100 201 libaquareum = custom_target( 101 202 'libaquareum', ··· 121 222 '-lntdll', 122 223 '-luserenv', 123 224 '-lcrypt32', 225 + '-lbcrypt', 124 226 ], language : [ 'c', 'cpp' ]) 125 227 endif 228 + 229 + pkg = import('pkgconfig') 230 + pkg.generate( 231 + name: 'aquareumdeps', 232 + libraries: [aquareum_deps, c2pa_go_dep], 233 + description: 'all aquareum dependencies bundled for easy inclusion' 234 + ) 235 + 126 236 aquareum = executable( 127 237 'aquareum', 128 238 ['cmd/aquareum/aquareum.c', libaquareum], 129 - dependencies: [avformat, avfilter, aquareum_deps], 239 + dependencies: [ 240 + aquareum_deps, 241 + ], 130 242 link_with: [c2pa_go_dep], 131 243 ) 132 244
+67 -1
pkg/api/api.go
··· 9 9 "log/slog" 10 10 "net" 11 11 "net/http" 12 + "net/url" 12 13 "os" 14 + "strings" 13 15 "time" 14 16 15 17 "github.com/NYTimes/gziphandler" ··· 23 25 apierrors "aquareum.tv/aquareum/pkg/errors" 24 26 "aquareum.tv/aquareum/pkg/log" 25 27 "aquareum.tv/aquareum/pkg/media" 28 + "aquareum.tv/aquareum/pkg/mist/mistconfig" 26 29 "aquareum.tv/aquareum/pkg/model" 27 30 "aquareum.tv/aquareum/pkg/notifications" 28 31 v0 "aquareum.tv/aquareum/pkg/schema/v0" ··· 67 70 if err2 == nil { 68 71 return file, nil 69 72 } 70 - return nil, err1 73 + if !errors.Is(err2, os.ErrNotExist) { 74 + return nil, err2 75 + } 76 + 77 + return fs.FileSystem.Open("index.html") 71 78 } 72 79 73 80 func (a *AquareumAPI) Handler(ctx context.Context) (http.Handler, error) { ··· 83 90 router.HandlerFunc("GET", "/app-updates", a.HandleAppUpdates(ctx)) 84 91 // new ones 85 92 apiRouter.HandlerFunc("GET", "/api/manifest", a.HandleAppUpdates(ctx)) 93 + apiRouter.POST("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 94 + apiRouter.OPTIONS("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 95 + apiRouter.DELETE("/api/webrtc/:stream", a.MistProxyHandler(ctx, "/webrtc/%s")) 96 + apiRouter.GET("/api/hls/:stream/*resource", a.MistProxyHandler(ctx, "/hls/%s")) 86 97 apiRouter.NotFound = a.HandleAPI404(ctx) 87 98 router.Handler("GET", "/api/*resource", apiRouter) 88 99 router.Handler("POST", "/api/*resource", apiRouter) ··· 96 107 handler = sloghttp.New(slog.Default())(handler) 97 108 98 109 return handler, nil 110 + } 111 + 112 + func copyHeader(dst, src http.Header) { 113 + for k, vv := range src { 114 + // we'll handle CORS ourselves, thanks 115 + if strings.HasPrefix(k, "Access-Control") { 116 + continue 117 + } 118 + for _, v := range vv { 119 + dst.Add(k, v) 120 + } 121 + } 122 + } 123 + 124 + func (a *AquareumAPI) MistProxyHandler(ctx context.Context, tmpl string) httprouter.Handle { 125 + return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 126 + if !a.CLI.HasMist() { 127 + apierrors.WriteHTTPNotImplemented(w, "Playback only on the Linux version for now", nil) 128 + return 129 + } 130 + stream := params.ByName("stream") 131 + if stream == "" { 132 + apierrors.WriteHTTPBadRequest(w, "missing stream in request", nil) 133 + return 134 + } 135 + 136 + fullstream := fmt.Sprintf("%s+%s", mistconfig.STREAM_NAME, stream) 137 + prefix := fmt.Sprintf(tmpl, fullstream) 138 + resource := params.ByName("resource") 139 + 140 + // path := strings.TrimPrefix(req.URL.EscapedPath(), "/api") 141 + 142 + client := &http.Client{} 143 + req.URL = &url.URL{ 144 + Scheme: "http", 145 + Host: fmt.Sprintf("127.0.0.1:%d", a.CLI.MistHTTPPort), 146 + Path: fmt.Sprintf("%s%s", prefix, resource), 147 + RawQuery: req.URL.RawQuery, 148 + } 149 + 150 + //http: Request.RequestURI can't be set in client requests. 151 + //http://golang.org/src/pkg/net/http/client.go 152 + req.RequestURI = "" 153 + 154 + resp, err := client.Do(req) 155 + if err != nil { 156 + apierrors.WriteHTTPInternalServerError(w, "error connecting to mist", err) 157 + return 158 + } 159 + defer resp.Body.Close() 160 + 161 + copyHeader(w.Header(), resp.Header) 162 + w.WriteHeader(resp.StatusCode) 163 + io.Copy(w, resp.Body) 164 + } 99 165 } 100 166 101 167 func (a *AquareumAPI) FileHandler(ctx context.Context, fs http.Handler) http.HandlerFunc {
+92 -1
pkg/api/api_internal.go
··· 6 6 "fmt" 7 7 "log/slog" 8 8 "net/http" 9 + "path/filepath" 10 + "regexp" 9 11 "time" 10 12 11 13 "aquareum.tv/aquareum/pkg/errors" ··· 33 35 34 36 // lightweight way to authenticate push requests to ourself 35 37 var secretUUID string 38 + var mkvRE *regexp.Regexp 36 39 37 40 func init() { 38 41 uu, err := uuid.NewV7() ··· 40 43 panic(err) 41 44 } 42 45 secretUUID = uu.String() 46 + 47 + mkvRE = regexp.MustCompile(`^\d+\.mkv$`) 48 + 43 49 } 44 50 45 51 func (a *AquareumAPI) InternalHandler(ctx context.Context) (http.Handler, error) { ··· 59 65 triggerCollection := misttriggers.NewMistCallbackHandlersCollection(a.CLI, broker) 60 66 router.POST("/mist-trigger", triggerCollection.Trigger()) 61 67 68 + router.GET("/playback/:user/concat", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 69 + user := p.ByName("user") 70 + if user == "" { 71 + errors.WriteHTTPBadRequest(w, "user required", nil) 72 + return 73 + } 74 + w.Header().Set("content-type", "text/plain") 75 + fmt.Fprintf(w, "ffconcat version 1.0\n") 76 + // intermittent reports that you need two here to make things work properly? shouldn't matter. 77 + for i := 0; i < 2; i += 1 { 78 + fmt.Fprintf(w, "file '%s/playback/%s/latest.mp4'\n", a.CLI.OwnInternalURL(), user) 79 + } 80 + }) 81 + 82 + router.GET("/playback/:user/latest.mp4", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 83 + user := p.ByName("user") 84 + if user == "" { 85 + errors.WriteHTTPBadRequest(w, "user required", nil) 86 + return 87 + } 88 + log.Log(ctx, "got latest.mp4 request", "user", user) 89 + file := <-a.MediaManager.SubscribeSegment(ctx, user) 90 + w.Header().Set("Location", fmt.Sprintf("%s/playback/%s/segment/%s\n", a.CLI.OwnInternalURL(), user, file)) 91 + w.WriteHeader(301) 92 + }) 93 + 94 + router.GET("/playback/:user/segment/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 95 + user := p.ByName("user") 96 + if user == "" { 97 + errors.WriteHTTPBadRequest(w, "user required", nil) 98 + return 99 + } 100 + file := p.ByName("file") 101 + if file == "" { 102 + errors.WriteHTTPBadRequest(w, "file required", nil) 103 + return 104 + } 105 + fullpath := filepath.Join(a.CLI.DataDir, "segments", user, file) 106 + http.ServeFile(w, r, fullpath) 107 + }) 108 + 109 + router.GET("/playback/:user/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 110 + user := p.ByName("user") 111 + if user == "" { 112 + errors.WriteHTTPBadRequest(w, "user required", nil) 113 + return 114 + } 115 + w.Header().Set("Content-Type", "video/x-matroska") 116 + w.WriteHeader(200) 117 + err := a.MediaManager.StreamToMKV(ctx, user, w) 118 + if err != nil { 119 + log.Log(ctx, "stream.mkv error", "error", err) 120 + } 121 + }) 122 + 123 + router.HEAD("/playback/:user/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 124 + user := p.ByName("user") 125 + if user == "" { 126 + errors.WriteHTTPBadRequest(w, "user required", nil) 127 + return 128 + } 129 + w.Header().Set("Content-Type", "video/x-matroska") 130 + w.Header().Set("Transfer-Encoding", "chunked") 131 + w.WriteHeader(200) 132 + }) 133 + 134 + // handler for post-segmented mkv streams 135 + router.POST("/playback/:user/:uuid/stream.mkv", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 136 + user := p.ByName("user") 137 + if user == "" { 138 + errors.WriteHTTPBadRequest(w, "user required", nil) 139 + return 140 + } 141 + uu := p.ByName("uuid") 142 + if uu == "" { 143 + errors.WriteHTTPBadRequest(w, "uuid required", nil) 144 + return 145 + } 146 + a.MediaManager.HandleMKVStream(ctx, user, uu, r.Body) 147 + }) 148 + 62 149 // internal route called for each pushed segment from ffmpeg 63 150 router.POST("/segment/:uuid/:user/:file", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 64 151 ms := time.Now().UnixMilli() ··· 73 160 errors.WriteHTTPInternalServerError(w, "invalid code path: got empty user?", nil) 74 161 return 75 162 } 163 + f := p.ByName("file") 164 + if !mkvRE.MatchString(f) { 165 + errors.WriteHTTPBadRequest(w, "file was not in number.mp4 format", nil) 166 + return 167 + } 76 168 ctx := log.WithLogValues(ctx, "user", user, "file", p.ByName("file"), "time", fmt.Sprintf("%d", ms)) 77 169 err := a.MediaManager.SignSegment(ctx, r.Body, ms) 78 170 if err != nil { ··· 80 172 errors.WriteHTTPInternalServerError(w, "segment error", err) 81 173 return 82 174 } 83 - log.Log(ctx, "segment success", "url", r.URL.String()) 84 175 }) 85 176 86 177 // route to accept an incoming mkv stream from OBS, segment it, and push the segments back to this HTTP handler
+12 -30
pkg/cmd/aquareum.go
··· 1 1 package cmd 2 2 3 3 import ( 4 - "bufio" 5 4 "context" 6 5 "flag" 7 6 "fmt" 8 - "net/http" 9 7 "os" 10 8 "os/signal" 11 9 "path/filepath" 12 10 "runtime" 13 - "strings" 11 + "runtime/pprof" 14 12 "syscall" 15 13 16 14 "aquareum.tv/aquareum/pkg/crypto/signers/eip712" ··· 22 20 "aquareum.tv/aquareum/pkg/api" 23 21 "aquareum.tv/aquareum/pkg/config" 24 22 "aquareum.tv/aquareum/pkg/model" 25 - "github.com/peterbourgon/ff/v3" 23 + _ "github.com/go-gst/go-glib/glib" 24 + _ "github.com/go-gst/go-gst/gst" 26 25 ) 27 26 28 27 // Additional jobs that can be injected by platforms ··· 30 29 31 30 // parse the CLI and fire up an aquareum node! 32 31 func start(build *config.BuildFlags, platformJobs []jobFunc) error { 33 - if len(os.Args) > 1 && os.Args[1] == "slurp-file" { 34 - fs := flag.NewFlagSet("aquareum-slurp-file", flag.ExitOnError) 35 - inurl := fs.String("url", "", "Base URL to send slurped files to") 36 - fname := fs.String("file", "", "Name of this file we're uploading") 37 - ff.Parse( 38 - fs, os.Args[2:], 39 - ff.WithEnvVarPrefix("AQ"), 40 - ) 41 - *fname = strings.TrimPrefix(*fname, config.AQUAREUM_SCHEME_PREFIX) 42 - 43 - fullURL := fmt.Sprintf("%s/segment/%s", *inurl, *fname) 44 - 45 - reader := bufio.NewReader(os.Stdin) 46 - req, err := http.NewRequest("POST", fullURL, reader) 47 - if err != nil { 48 - panic(err) 32 + if len(os.Args) > 1 && os.Args[1] == "stream" { 33 + if len(os.Args) != 3 { 34 + fmt.Println("usage: aquareum stream [user]") 35 + os.Exit(1) 49 36 } 50 - client := &http.Client{} 51 - resp, err := client.Do(req) 52 - if err != nil { 53 - panic(err) 54 - } 55 - if resp.StatusCode != 200 { 56 - fmt.Printf("http %s\n", resp.Status) 57 - } 58 - os.Exit(0) 37 + return Stream(os.Args[2]) 59 38 } 60 39 61 40 defaultDataDir, err := config.DefaultDataDir() ··· 178 157 179 158 func handleSignals(ctx context.Context) error { 180 159 c := make(chan os.Signal, 1) 181 - signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) 160 + signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT) 182 161 for { 183 162 select { 184 163 case s := <-c: 164 + if s == syscall.SIGABRT { 165 + pprof.Lookup("goroutine").WriteTo(os.Stderr, 2) 166 + } 185 167 log.Log(ctx, "caught signal, attempting clean shutdown", "signal", s) 186 168 return fmt.Errorf("caught signal=%v", s) 187 169 case <-ctx.Done():
+20
pkg/cmd/stream.go
··· 1 + package cmd 2 + 3 + import ( 4 + "fmt" 5 + "io" 6 + "net/http" 7 + "os" 8 + ) 9 + 10 + func Stream(user string) error { 11 + resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:9090/playback/%s/stream.mkv", user)) 12 + if err != nil { 13 + return err 14 + } 15 + if resp.StatusCode != 200 { 16 + return fmt.Errorf("http status %s", resp.Status) 17 + } 18 + io.Copy(os.Stdout, resp.Body) 19 + return nil 20 + }
+5
pkg/config/config.go
··· 11 11 "net" 12 12 "os" 13 13 "path/filepath" 14 + "runtime" 14 15 "strings" 15 16 "time" 16 17 ··· 199 200 return nil 200 201 }) 201 202 } 203 + 204 + func (cli *CLI) HasMist() bool { 205 + return runtime.GOOS == "linux" 206 + }
+131
pkg/media/gstreamer.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "io" 7 + "os" 8 + "runtime" 9 + "strings" 10 + 11 + "aquareum.tv/aquareum/pkg/log" 12 + "github.com/go-gst/go-glib/glib" 13 + "github.com/go-gst/go-gst/gst" 14 + "golang.org/x/sync/errgroup" 15 + ) 16 + 17 + func init() { 18 + gst.Init(nil) 19 + } 20 + 21 + // Pipe with a mechanism to keep the FDs not garbage collected 22 + func SafePipe() (*os.File, *os.File, func(), error) { 23 + r, w, err := os.Pipe() 24 + if err != nil { 25 + return nil, nil, nil, err 26 + } 27 + return r, w, func() { 28 + runtime.KeepAlive(r.Fd()) 29 + runtime.KeepAlive(w.Fd()) 30 + }, nil 31 + } 32 + 33 + func AddOpusToMKV(ctx context.Context, input io.Reader, output io.Writer) error { 34 + ir, iw, idone, err := SafePipe() 35 + if err != nil { 36 + return err 37 + } 38 + defer idone() 39 + or, ow, odone, err := SafePipe() 40 + if err != nil { 41 + return err 42 + } 43 + defer odone() 44 + 45 + mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) 46 + 47 + pipelineSlice := []string{ 48 + fmt.Sprintf("fdsrc name=livestream fd=%d ! matroskademux name=demux", ir.Fd()), 49 + fmt.Sprintf("matroskamux name=mux ! fdsink fd=%d", ow.Fd()), 50 + "demux.audio_0 ! queue ! tee name=asplit", 51 + "demux.video_0 ! queue ! mux.video_0", 52 + "asplit. ! queue ! fdkaacdec ! audioresample ! opusenc inband-fec=true perfect-timestamp=true bitrate=128000 ! mux.audio_1", 53 + "asplit. ! queue ! mux.audio_0", 54 + } 55 + 56 + pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 57 + if err != nil { 58 + return err 59 + } 60 + // demux, err := pipeline.GetElementByName("demux") 61 + // if err != nil { 62 + // return err 63 + // } 64 + // // Get the audiotestsrc's src-pad. 65 + // demuxPad := demux.GetStaticPad("sink") 66 + // if demuxPad == nil { 67 + // return fmt.Errorf("src pad on src element was nil") 68 + // } 69 + 70 + // // Add a probe handler on the audiotestsrc's src-pad. 71 + // // This handler gets called for every buffer that passes the pad we probe. 72 + // demuxPad.AddProbe(gst.PadProbeTypeAllBoth, func(self *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { 73 + // // fmt.Printf("%v\n", info) 74 + // return gst.PadProbeOK 75 + // }) 76 + 77 + ctx, cancel := context.WithCancel(ctx) 78 + defer cancel() 79 + go func() { 80 + <-ctx.Done() 81 + pipeline.BlockSetState(gst.StateNull) 82 + mainLoop.Quit() 83 + }() 84 + 85 + // Add a message handler to the pipeline bus, printing interesting information to the console. 86 + pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { 87 + switch msg.Type() { 88 + 89 + case gst.MessageEOS: // When end-of-stream is received flush the pipeling and stop the main loop 90 + cancel() 91 + case gst.MessageError: // Error messages are always fatal 92 + err := msg.ParseError() 93 + log.Log(ctx, "gstreamer error", "error", err.Error()) 94 + if debug := err.DebugString(); debug != "" { 95 + log.Log(ctx, "gstreamer debug", "message", debug) 96 + } 97 + cancel() 98 + default: 99 + log.Log(ctx, msg.String()) 100 + } 101 + return true 102 + }) 103 + 104 + // Start the pipeline 105 + pipeline.SetState(gst.StatePlaying) 106 + 107 + g, _ := errgroup.WithContext(ctx) 108 + 109 + g.Go(func() error { 110 + _, err := io.Copy(iw, input) 111 + log.Log(ctx, "input copy complete", "error", err) 112 + iw.Close() 113 + return err 114 + }) 115 + 116 + g.Go(func() error { 117 + mainLoop.Run() 118 + log.Log(ctx, "main loop complete") 119 + ow.Close() 120 + return nil 121 + }) 122 + 123 + g.Go(func() error { 124 + runtime.GC() 125 + _, err := io.Copy(output, or) 126 + log.Log(ctx, "output copy complete", "error", err) 127 + return err 128 + }) 129 + 130 + return g.Wait() 131 + }
+24
pkg/media/gstreamer_test.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "os" 6 + "testing" 7 + 8 + _ "aquareum.tv/aquareum/pkg/media/mediatesting" 9 + "github.com/stretchr/testify/require" 10 + ) 11 + 12 + func TestNormalizeAudio(t *testing.T) { 13 + ifile, err := os.Open(getFixture("sample-stream.mkv")) 14 + require.NoError(t, err) 15 + ofile, err := os.CreateTemp("", "*.mkv") 16 + defer os.Remove(ofile.Name()) 17 + require.NoError(t, err) 18 + err = AddOpusToMKV(context.Background(), ifile, ofile) 19 + require.NoError(t, err) 20 + ofile.Close() 21 + info, err := os.Stat(ofile.Name()) 22 + require.NoError(t, err) 23 + require.Greater(t, info.Size(), int64(0)) 24 + }
+111 -17
pkg/media/media.go
··· 9 9 "io" 10 10 "os" 11 11 "path/filepath" 12 + "sync" 12 13 "time" 13 14 14 15 "aquareum.tv/aquareum/pkg/config" 15 16 "aquareum.tv/aquareum/pkg/crypto/signers/eip712" 16 17 "aquareum.tv/aquareum/pkg/log" 18 + "github.com/google/uuid" 17 19 "github.com/livepeer/lpms/ffmpeg" 18 20 "golang.org/x/sync/errgroup" 19 21 ··· 24 26 const SEGMENTS_DIR = "segments" 25 27 26 28 type MediaManager struct { 27 - cli *config.CLI 28 - signer crypto.Signer 29 - cert []byte 30 - user string 29 + cli *config.CLI 30 + signer crypto.Signer 31 + cert []byte 32 + user string 33 + mp4subs map[string][]chan string 34 + mp4subsmut sync.Mutex 35 + mkvsubs map[string]io.Writer 36 + mkvsubsmut sync.Mutex 31 37 } 32 38 33 39 func MakeMediaManager(ctx context.Context, cli *config.CLI, signer *eip712.EIP712Signer) (*MediaManager, error) { ··· 51 57 cli.DataFileRead([]string{CERT_FILE}, &buf) 52 58 cert := buf.Bytes() 53 59 return &MediaManager{ 54 - cli: cli, 55 - signer: signer, 56 - cert: cert, 57 - user: signer.Hex(), 60 + cli: cli, 61 + signer: signer, 62 + cert: cert, 63 + user: signer.Hex(), 64 + mp4subs: map[string][]chan string{}, 65 + mkvsubs: map[string]io.Writer{}, 58 66 }, nil 59 67 } 60 68 ··· 76 84 if err != nil { 77 85 return fmt.Errorf("error signing mp4: %w", err) 78 86 } 87 + go mm.PublishSegment(ctx, mm.user, segmentFile) 79 88 return nil 80 89 } 81 90 91 + // subscribe to the latest segments from a given user for livestreaming purposes 92 + func (mm *MediaManager) SubscribeSegment(ctx context.Context, user string) chan string { 93 + mm.mp4subsmut.Lock() 94 + defer mm.mp4subsmut.Unlock() 95 + _, ok := mm.mp4subs[user] 96 + if !ok { 97 + mm.mp4subs[user] = []chan string{} 98 + } 99 + c := make(chan string) 100 + mm.mp4subs[user] = append(mm.mp4subs[user], c) 101 + return c 102 + } 103 + 104 + // subscribe to the latest segments from a given user for livestreaming purposes 105 + func (mm *MediaManager) PublishSegment(ctx context.Context, user, file string) { 106 + mm.mp4subsmut.Lock() 107 + defer mm.mp4subsmut.Unlock() 108 + for _, sub := range mm.mp4subs[user] { 109 + sub <- file 110 + } 111 + mm.mp4subs[user] = []chan string{} 112 + } 113 + 82 114 func MuxToMP4(ctx context.Context, input io.Reader, output io.Writer) error { 83 115 tc := ffmpeg.NewTranscoder() 84 - ir, iw, err := os.Pipe() 116 + ir, iw, idone, err := SafePipe() 85 117 if err != nil { 86 118 return fmt.Errorf("error opening pipe: %w", err) 87 119 } 120 + defer idone() 88 121 dname, err := os.MkdirTemp("", "aquareum-muxing") 89 122 if err != nil { 90 123 return fmt.Errorf("error making temp directory: %w", err) 91 124 } 92 125 defer func() { 93 - log.Log(ctx, "cleaning up") 126 + // log.Log(ctx, "cleaning up") 94 127 tc.StopTranscoder() 95 128 }() 96 129 oname := filepath.Join(dname, "output.mp4") ··· 115 148 g, _ := errgroup.WithContext(ctx) 116 149 g.Go(func() error { 117 150 _, err := io.Copy(iw, input) 118 - log.Log(ctx, "input copy done", "error", err) 151 + // log.Log(ctx, "input copy done", "error", err) 119 152 iw.Close() 120 153 return err 121 154 }) 122 155 g.Go(func() error { 123 156 _, err = tc.Transcode(in, out) 124 - log.Log(ctx, "transcode done", "error", err) 157 + // log.Log(ctx, "transcode done", "error", err) 125 158 tc.StopTranscoder() 126 159 ir.Close() 127 160 return err ··· 135 168 return err 136 169 } 137 170 defer of.Close() 138 - written, err := io.Copy(output, of) 171 + _, err = io.Copy(output, of) 139 172 if err != nil { 140 173 return err 141 174 } 142 - log.Log(ctx, "transmuxing complete", "out-file", oname, "wrote", written) 175 + of.Close() 176 + status, info, err := ffmpeg.GetCodecInfo(oname) 177 + if err != nil { 178 + return fmt.Errorf("error in GetCodecInfo: %w", err) 179 + } 180 + fmt.Printf("%v %v\n", status, info.DurSecs) 181 + // log.Log(ctx, "transmuxing complete", "out-file", oname, "wrote", written) 143 182 return nil 144 183 } 145 184 146 185 func SegmentToHTTP(ctx context.Context, input io.Reader, prefix string) error { 147 186 tc := ffmpeg.NewTranscoder() 148 187 defer tc.StopTranscoder() 149 - ir, iw, err := os.Pipe() 188 + ir, iw, idone, err := SafePipe() 150 189 if err != nil { 151 190 return fmt.Errorf("error opening pipe: %w", err) 152 191 } 192 + defer idone() 153 193 out := []ffmpeg.TranscodeOptions{ 154 194 { 155 195 Oname: fmt.Sprintf("%s/%%d.mkv", prefix), ··· 173 213 g, _ := errgroup.WithContext(ctx) 174 214 g.Go(func() error { 175 215 _, err := io.Copy(iw, input) 176 - log.Log(ctx, "input copy done", "error", err) 216 + // log.Log(ctx, "input copy done", "error", err) 177 217 iw.Close() 178 218 return err 179 219 }) 180 220 g.Go(func() error { 181 221 _, err = tc.Transcode(in, out) 182 - log.Log(ctx, "transcode done", "error", err) 222 + // log.Log(ctx, "transcode done", "error", err) 183 223 tc.StopTranscoder() 184 224 ir.Close() 185 225 return err 186 226 }) 187 227 return g.Wait() 228 + } 229 + 230 + func (mm *MediaManager) StreamToMKV(ctx context.Context, user string, w io.Writer) error { 231 + tc := ffmpeg.NewTranscoder() 232 + defer tc.StopTranscoder() 233 + uu, err := uuid.NewV7() 234 + if err != nil { 235 + return err 236 + } 237 + mm.mkvsubsmut.Lock() 238 + mm.mkvsubs[uu.String()] = w 239 + mm.mkvsubsmut.Unlock() 240 + iname := fmt.Sprintf("%s/playback/%s/concat", mm.cli.OwnInternalURL(), user) 241 + in := &ffmpeg.TranscodeOptionsIn{ 242 + Fname: iname, 243 + Transmuxing: true, 244 + Profile: ffmpeg.VideoProfile{}, 245 + Loop: -1, 246 + Demuxer: ffmpeg.ComponentOptions{ 247 + Name: "concat", 248 + Opts: map[string]string{ 249 + "safe": "0", 250 + "protocol_whitelist": "file,http,https,tcp,tls", 251 + }, 252 + }, 253 + } 254 + out := []ffmpeg.TranscodeOptions{ 255 + { 256 + Oname: fmt.Sprintf("%s/playback/%s/%s/stream.mkv", mm.cli.OwnInternalURL(), user, uu.String()), 257 + VideoEncoder: ffmpeg.ComponentOptions{ 258 + Name: "copy", 259 + }, 260 + AudioEncoder: ffmpeg.ComponentOptions{ 261 + Name: "copy", 262 + }, 263 + Profile: ffmpeg.VideoProfile{Format: ffmpeg.FormatNone}, 264 + Muxer: ffmpeg.ComponentOptions{ 265 + Name: "matroska", 266 + }, 267 + }, 268 + } 269 + _, err = tc.Transcode(in, out) 270 + return err 271 + } 272 + 273 + func (mm *MediaManager) HandleMKVStream(ctx context.Context, user, uu string, r io.Reader) error { 274 + mm.mkvsubsmut.Lock() 275 + w, ok := mm.mkvsubs[uu] 276 + mm.mkvsubsmut.Unlock() 277 + if !ok { 278 + return fmt.Errorf("uuid not found: %s", uu) 279 + } 280 + err := AddOpusToMKV(ctx, r, w) 281 + return err 188 282 } 189 283 190 284 func (mm *MediaManager) SignMP4(ctx context.Context, input io.ReadSeeker, output io.ReadWriteSeeker, now int64) error {
+2 -4
pkg/media/mediatesting/mediatesting.go
··· 1 1 package mediatesting 2 2 3 - // This exists entirely to give a hardcoded list of LDFLAGS to the testing binary. 4 - // idk. There's probably a better way. 5 - 6 - // #cgo LDFLAGS: -L ../../../build/subprojects/ffmpeg -L ../../../build/subprojects/c2pa_go -lavcodec -lavfilter -lavformat -lavutil -lc2pa -lm -lpostproc -lswresample -lswscale -lz 3 + // #cgo pkg-config: aquareumdeps-uninstalled 4 + // #cgo LDFLAGS: -L ../../../build/subprojects/c2pa_go 7 5 import "C"
+1 -12
pkg/mist/mistconfig/mistconfig.go
··· 32 32 "password": md5.Sum([]byte("aquareum")), 33 33 }, 34 34 }, 35 - "autopushes": [][]any{{ 36 - fmt.Sprintf("%s+", STREAM_NAME), 37 - fmt.Sprintf("%s$wildcard/$currentMediaTime.ts?split=1&video=maxbps&audio=AAC&append=1", config.AQUAREUM_SCHEME_PREFIX), 38 - }}, 39 35 "bandwidth": map[string]any{ 40 36 "exceptions": []string{ 41 37 "::1", ··· 43 39 "10.0.0.0/8", 44 40 "192.168.0.0/16", 45 41 "172.16.0.0/12", 46 - }, 47 - }, 48 - "extwriters": [][]any{ 49 - { 50 - "aquareum", 51 - fmt.Sprintf("%s slurp-file --url=%s --file", exec, cli.OwnInternalURL()), 52 - []string{"aquareum"}, 53 42 }, 54 43 }, 55 44 "config": map[string]any{ ··· 101 90 STREAM_NAME: { 102 91 "name": STREAM_NAME, 103 92 "segmentsize": 1, 104 - "source": "push://", 93 + "source": fmt.Sprintf("mkv-exec:%s stream $wildcard", exec), 105 94 "stop_sessions": false, 106 95 }, 107 96 },
+3 -2
subprojects/.gitignore
··· 1 1 * 2 2 !.gitignore 3 3 !mistserver.wrap 4 - !ffmpeg.wrap 5 - !c2pa_go.wrap 4 + !FFmpeg.wrap 5 + !c2pa_go.wrap 6 + !gstreamer-full.wrap
subprojects/ffmpeg.wrap subprojects/FFmpeg.wrap
+4
subprojects/gstreamer-full.wrap
··· 1 + [wrap-git] 2 + url = https://gitlab.freedesktop.org/gstreamer/gstreamer.git 3 + revision = 8b0ae3ba8781cb89f8c50e131132b6c951f1303a 4 + depth = 1
test/fixtures/sample-stream.mkv

This is a binary file and will not be displayed.

+1
util/darwin-amd64-apple.ini
··· 2 2 c = 'clang' 3 3 cpp = 'clang++' 4 4 objc = 'clang' 5 + objcpp = 'clang' 5 6 ; ar = 'llvm-ar' 6 7 ; strip = 'llvm-strip' 7 8
+3 -2
util/windows-amd64-gnu.ini
··· 6 6 7 7 [host_machine] 8 8 system = 'windows' 9 - cpu_family = 'amd64' 10 - cpu = 'amd64' 9 + cpu_family = 'x86_64' 10 + cpu = 'x86_64' 11 11 endian = 'little' 12 12 13 13 ; Working clang version should we need it: ··· 17 17 ar = 'x86_64-w64-mingw32-ar' 18 18 strip = 'x86_64-w64-mingw32-strip' 19 19 windres = 'x86_64-w64-mingw32-windres' 20 + pkg-config = 'x86_64-w64-mingw32-pkg-config' 20 21 ; c_ld = 'x86_64-w64-mingw32-ld.bfd' 21 22 ; cpp_ld = 'x86_64-w64-mingw32-ld.bfd' 22 23
+8
yarn.lock
··· 7920 7920 expo-system-ui: "npm:~3.0.7" 7921 7921 expo-updates: "npm:~0.25.20" 7922 7922 expo-web-browser: "npm:~13.0.3" 7923 + hls.js: "npm:^1.5.15" 7923 7924 react: "npm:18.3.1" 7924 7925 react-dom: "npm:18.3.1" 7925 7926 react-native: "npm:0.74.3" ··· 12241 12242 version: 1.0.8 12242 12243 resolution: "hey-listen@npm:1.0.8" 12243 12244 checksum: 10/744b5f4c18c7cfb82b22bd22e1d300a9ac4eafe05a22e58fb87e48addfca8be00604d9aa006434ea02f9530990eb4b393ddb28659e2ab7f833ce873e32eb809c 12245 + languageName: node 12246 + linkType: hard 12247 + 12248 + "hls.js@npm:^1.5.15": 12249 + version: 1.5.15 12250 + resolution: "hls.js@npm:1.5.15" 12251 + checksum: 10/58dd5c70e233a3d66ebba9f55bfbe6673ac9d941d391afd896d44e7a141cba931fd25c392133c13f65fbd82e85e1575ada68a2722d1da38b6ad7b9b6a93a6a6b 12244 12252 languageName: node 12245 12253 linkType: hard 12246 12254