Live video on the AT Protocol

fix some gstreamer memory leaks + add leak tooling

See merge request streamplace/streamplace!103

Changelog: feature

+473 -302
+1
.gitignore
··· 15 15 build-* 16 16 .ci/build.env 17 17 *.log 18 + *.heap
+7
hack/atproto-key.mjs
··· 1 + import { Secp256k1Keypair, bytesToMultibase } from "@atproto/crypto"; 2 + 3 + const keypair = await Secp256k1Keypair.create({ exportable: true }); 4 + const exportedKey = await keypair.export(); 5 + const multibaseKey = bytesToMultibase(exportedKey, "base58btc"); 6 + console.log(keypair.did()); 7 + console.log(multibaseKey);
+3 -1
js/app/hooks/useLiveUser.tsx
··· 9 9 if (!profile) { 10 10 return false; 11 11 } 12 - const isLive = segments.some((segment) => segment.repo.did === profile.did); 12 + const isLive = segments.some( 13 + (segment) => segment.repo && segment.repo.did === profile.did, 14 + ); 13 15 return isLive; 14 16 };
+27 -12
pkg/api/playback.go
··· 12 12 "strings" 13 13 "time" 14 14 15 + atcrypto "github.com/bluesky-social/indigo/atproto/crypto" 15 16 "github.com/decred/dcrd/dcrec/secp256k1" 16 17 "github.com/julienschmidt/httprouter" 17 18 "github.com/mr-tron/base58" ··· 191 192 // it's easy to copy-paste a trailing or leading space, so clear those out 192 193 encoded = strings.TrimSpace(encoded) 193 194 } 195 + 194 196 if len(encoded) < 2 || encoded[0] != 'z' { 195 197 errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a multibase base58btc string)", nil) 196 198 return 197 199 } 198 - data, err := base58.Decode(encoded[1:]) 199 - if err != nil { 200 - errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a multibase base58btc string)", nil) 201 - return 200 + 201 + var addrBytes []byte 202 + var didBytes []byte 203 + priv, err := atcrypto.ParsePrivateMultibase(encoded) 204 + if err == nil { 205 + if err != nil { 206 + errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", nil) 207 + return 208 + } 209 + addrBytes = priv.Bytes() 210 + } else { 211 + decoded, err := base58.Decode(encoded[1:]) 212 + if err != nil { 213 + errors.WriteHTTPUnauthorized(w, "invalid authorization key (not a base58btc string)", nil) 214 + return 215 + } 216 + addrBytes = decoded[:32] 217 + didBytes = decoded[32:] 202 218 } 203 - addrBytes := data[:32] 204 - didBytes := data[32:] 205 219 206 220 key, _ := secp256k1.PrivKeyFromBytes(addrBytes) 207 221 if key == nil { ··· 211 225 var signer crypto.Signer = key.ToECDSA() 212 226 213 227 did := string(didBytes) 214 - 215 - mediaSigner, err := media.MakeMediaSignerExt(ctx, a.CLI, did, addrBytes) 216 - if err != nil { 217 - errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err) 218 - return 219 - } 220 228 221 229 if did != "" { 222 230 repo, err := atproto.SyncBlueskyRepo(ctx, did, a.Model) ··· 241 249 apierrors.WriteHTTPUnauthorized(w, "user is not allowed to stream", err) 242 250 return 243 251 } 252 + } 253 + 254 + mediaSigner, err := media.MakeMediaSignerExt(ctx, a.CLI, did, addrBytes) 255 + // mediaSigner, err := media.MakeMediaSigner(ctx, a.CLI, did, signer) 256 + if err != nil { 257 + errors.WriteHTTPUnauthorized(w, "invalid authorization key (not valid secp256k1)", err) 258 + return 244 259 } 245 260 246 261 body, err := io.ReadAll(r.Body)
+4
pkg/cmd/sign.go
··· 29 29 return err 30 30 } 31 31 32 + if *streamerName == "" { 33 + return fmt.Errorf("streamer name is required") 34 + } 35 + 32 36 secpSigner, _ := secp256k1.PrivKeyFromBytes(keyBs) 33 37 if secpSigner == nil { 34 38 return fmt.Errorf("invalid key")
+5 -5
pkg/cmd/streamplace.go
··· 83 83 return Sign(context.Background()) 84 84 } 85 85 86 + if len(os.Args) > 1 && os.Args[1] == "whip" { 87 + return WHIP() 88 + } 89 + 86 90 if len(os.Args) > 1 && os.Args[1] == "self-test" { 87 91 err := media.RunSelfTest(context.Background()) 88 92 if err != nil { ··· 122 126 fs.StringVar(&cli.AppBundleID, "app-bundle-id", "", "bundle id of an app that we facilitate oauth login for") 123 127 fs.StringVar(&cli.StreamerName, "streamer-name", "", "name of the person streaming from this streamplace node") 124 128 fs.StringVar(&cli.FrontendProxy, "dev-frontend-proxy", "", "(FOR DEVELOPMENT ONLY) proxy frontend requests to this address instead of using the bundled frontend") 129 + fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 125 130 cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node") 126 131 cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to") 127 132 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 128 133 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 129 134 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") 130 - doValidate := fs.Bool("validate", false, "validate media") 131 135 verbosity := fs.String("v", "3", "log verbosity level") 132 136 fs.StringVar(&cli.RelayHost, "relay-host", "wss://bsky.network", "websocket url for relay firehose") 133 137 fs.Bool("insecure", false, "DEPRECATED, does nothing.") ··· 166 170 "runtime.Version", runtime.Version()) 167 171 if *version { 168 172 return nil 169 - } 170 - 171 - if *doValidate { 172 - return media.ValidateMedia(ctx) 173 173 } 174 174 175 175 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version)
+371
pkg/cmd/whip.go
··· 1 + package cmd 2 + 3 + import ( 4 + "context" 5 + "flag" 6 + "fmt" 7 + "io" 8 + "net/http" 9 + "os" 10 + "strings" 11 + "time" 12 + 13 + atcrypto "github.com/bluesky-social/indigo/atproto/crypto" 14 + "github.com/go-gst/go-gst/gst" 15 + "github.com/go-gst/go-gst/gst/app" 16 + "github.com/pion/webrtc/v4" 17 + "github.com/pion/webrtc/v4/pkg/media" 18 + "golang.org/x/sync/errgroup" 19 + "stream.place/streamplace/pkg/log" 20 + ) 21 + 22 + func WHIP() error { 23 + fs := flag.NewFlagSet("whip", flag.ExitOnError) 24 + streamKey := fs.String("stream-key", "", "stream key") 25 + count := fs.Int("count", 1, "number of concurrent streams (for load testing)") 26 + duration := fs.Duration("duration", 0, "duration of the stream") 27 + file := fs.String("file", "", "file to stream (needs to be an MP4 containing H264 video and Opus audio)") 28 + endpoint := fs.String("endpoint", "http://127.0.0.1:38080", "endpoint to send the WHIP request to") 29 + err := fs.Parse(os.Args[2:]) 30 + if *file == "" { 31 + return fmt.Errorf("file is required") 32 + } 33 + if err != nil { 34 + return err 35 + } 36 + 37 + ctx := context.Background() 38 + if *duration > 0 { 39 + var cancel context.CancelFunc 40 + ctx, cancel = context.WithTimeout(ctx, *duration) 41 + defer cancel() 42 + } 43 + 44 + w := &WHIPClient{ 45 + StreamKey: *streamKey, 46 + File: *file, 47 + Endpoint: *endpoint, 48 + Count: *count, 49 + } 50 + 51 + return w.WHIP(ctx) 52 + } 53 + 54 + type WHIPClient struct { 55 + StreamKey string 56 + File string 57 + Endpoint string 58 + Count int 59 + } 60 + 61 + var failureStates = []webrtc.ICEConnectionState{ 62 + webrtc.ICEConnectionStateFailed, 63 + webrtc.ICEConnectionStateDisconnected, 64 + webrtc.ICEConnectionStateClosed, 65 + webrtc.ICEConnectionStateCompleted, 66 + } 67 + 68 + type WHIPConnection struct { 69 + peerConnection *webrtc.PeerConnection 70 + audioTrack *webrtc.TrackLocalStaticSample 71 + videoTrack *webrtc.TrackLocalStaticSample 72 + did string 73 + } 74 + 75 + func (w *WHIPClient) StartWHIPConnection(ctx context.Context) (*WHIPConnection, error) { 76 + 77 + var streamKey string 78 + var did string 79 + if w.StreamKey != "" { 80 + streamKey = w.StreamKey 81 + } else { 82 + priv, err := atcrypto.GeneratePrivateKeyK256() 83 + if err != nil { 84 + return nil, err 85 + } 86 + pub, err := priv.PublicKey() 87 + if err != nil { 88 + return nil, err 89 + } 90 + 91 + did = pub.DIDKey() 92 + ctx = log.WithLogValues(ctx, "did", did) 93 + streamKey = priv.Multibase() 94 + } 95 + 96 + // Prepare the configuration 97 + config := webrtc.Configuration{} 98 + 99 + // Create a new RTCPeerConnection 100 + peerConnection, err := webrtc.NewPeerConnection(config) 101 + if err != nil { 102 + return nil, err 103 + } 104 + 105 + // Create a audio track 106 + audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "audio/opus"}, "audio", "pion1") 107 + if err != nil { 108 + return nil, err 109 + } 110 + _, err = peerConnection.AddTrack(audioTrack) 111 + if err != nil { 112 + return nil, err 113 + } 114 + 115 + // Create a video track 116 + videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: "video/h264"}, "video", "pion2") 117 + if err != nil { 118 + return nil, err 119 + } 120 + _, err = peerConnection.AddTrack(videoTrack) 121 + if err != nil { 122 + return nil, err 123 + } 124 + 125 + // Create an offer 126 + offer, err := peerConnection.CreateOffer(nil) 127 + if err != nil { 128 + return nil, err 129 + } 130 + 131 + // Set the generated offer as our LocalDescription 132 + err = peerConnection.SetLocalDescription(offer) 133 + if err != nil { 134 + return nil, err 135 + } 136 + 137 + // Wait for ICE gathering to complete 138 + // gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 139 + // <-gatherComplete 140 + 141 + // Create HTTP client and prepare the request 142 + client := &http.Client{} 143 + 144 + // Send the WHIP request to the server 145 + req, err := http.NewRequest("POST", w.Endpoint, strings.NewReader(offer.SDP)) 146 + if err != nil { 147 + return nil, err 148 + } 149 + req.Header.Set("Authorization", "Bearer "+streamKey) 150 + req.Header.Set("Content-Type", "application/sdp") 151 + 152 + // Execute the request 153 + resp, err := client.Do(req) 154 + if err != nil { 155 + return nil, err 156 + } 157 + defer resp.Body.Close() 158 + 159 + // Read and process the answer 160 + answerBytes, err := io.ReadAll(resp.Body) 161 + if err != nil { 162 + return nil, err 163 + } 164 + 165 + // Parse the SDP answer 166 + var answer webrtc.SessionDescription 167 + answer.Type = webrtc.SDPTypeAnswer 168 + answer.SDP = string(answerBytes) 169 + 170 + // Apply the answer as remote description 171 + err = peerConnection.SetRemoteDescription(answer) 172 + if err != nil { 173 + return nil, err 174 + } 175 + 176 + gatherComplete := webrtc.GatheringCompletePromise(peerConnection) 177 + <-gatherComplete 178 + 179 + conn := &WHIPConnection{ 180 + peerConnection: peerConnection, 181 + audioTrack: audioTrack, 182 + videoTrack: videoTrack, 183 + did: did, 184 + } 185 + 186 + return conn, nil 187 + } 188 + 189 + func (w *WHIPClient) WHIP(ctx context.Context) error { 190 + ctx, cancel := context.WithCancel(ctx) 191 + defer cancel() 192 + 193 + // Initialize GStreamer 194 + gst.Init(nil) 195 + 196 + pipelineSlice := []string{ 197 + "filesrc name=filesrc ! qtdemux name=demux", 198 + "demux.video_0 ! tee name=video_tee", 199 + "demux.audio_0 ! tee name=audio_tee", 200 + "video_tee. ! queue ! h264parse ! video/x-h264,stream-format=byte-stream ! appsink name=videoappsink", 201 + "audio_tee. ! queue ! opusparse ! rtpopuspay ! appsink name=audioappsink", 202 + // "matroskamux name=mux ! fakesink name=fakesink sync=true", 203 + // "video_tee. ! mux.video_0", 204 + // "audio_tee. ! mux.audio_0", 205 + } 206 + 207 + pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 208 + if err != nil { 209 + return err 210 + } 211 + 212 + fileSrc, err := pipeline.GetElementByName("filesrc") 213 + if err != nil { 214 + return err 215 + } 216 + 217 + fileSrc.Set("location", w.File) 218 + 219 + videoSink, err := pipeline.GetElementByName("videoappsink") 220 + if err != nil { 221 + return err 222 + } 223 + 224 + audioSink, err := pipeline.GetElementByName("audioappsink") 225 + if err != nil { 226 + return err 227 + } 228 + 229 + startTime := time.Now() 230 + sinks := []*app.Sink{ 231 + app.SinkFromElement(videoSink), 232 + app.SinkFromElement(audioSink), 233 + } 234 + // Create accumulators for tracking elapsed duration 235 + accumulators := make([]time.Duration, len(sinks)) 236 + 237 + conns := make([]*WHIPConnection, w.Count) 238 + g := &errgroup.Group{} 239 + for i := 0; i < w.Count; i++ { 240 + g.Go(func() error { 241 + conn, err := w.StartWHIPConnection(ctx) 242 + if err != nil { 243 + return err 244 + } 245 + conns[i] = conn 246 + ctx := log.WithLogValues(ctx, "did", conn.did) 247 + conn.peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { 248 + log.Log(ctx, "connection State has changed", "state", connectionState.String()) 249 + for _, state := range failureStates { 250 + if connectionState == state { 251 + log.Log(ctx, "connection failed, cancelling") 252 + cancel() 253 + } 254 + } 255 + }) 256 + return nil 257 + }) 258 + } 259 + if err := g.Wait(); err != nil { 260 + return err 261 + } 262 + // Start a ticker to print elapsed duration every second 263 + go func() { 264 + ticker := time.NewTicker(time.Second) 265 + defer ticker.Stop() 266 + 267 + for { 268 + <-ticker.C 269 + for i, duration := range accumulators { 270 + trackType := "video" 271 + if i == 1 { 272 + trackType = "audio" 273 + } 274 + target := startTime.Add(time.Duration(accumulators[i])) 275 + diff := time.Since(target) 276 + log.Debug(ctx, "elapsed duration", "track", trackType, "duration", duration, "diff", diff) 277 + } 278 + } 279 + }() 280 + 281 + errCh := make(chan error, 1) 282 + 283 + for i, _ := range sinks { 284 + func(i int) { 285 + sink := sinks[i] 286 + trackType := "video" 287 + if i == 1 { 288 + trackType = "audio" 289 + } 290 + 291 + sink.SetCallbacks(&app.SinkCallbacks{ 292 + NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 293 + 294 + sample := sink.PullSample() 295 + if sample == nil { 296 + return gst.FlowEOS 297 + } 298 + 299 + buffer := sample.GetBuffer() 300 + if buffer == nil { 301 + return gst.FlowError 302 + } 303 + 304 + samples := buffer.Map(gst.MapRead).Bytes() 305 + defer buffer.Unmap() 306 + 307 + durationPtr := buffer.Duration().AsDuration() 308 + var duration time.Duration 309 + if durationPtr == nil { 310 + errCh <- fmt.Errorf("%v duration: nil", trackType) 311 + return gst.FlowError 312 + } else { 313 + // fmt.Printf("%v duration: %v\n", trackType, *durationPtr) 314 + duration = *durationPtr 315 + } 316 + 317 + accumulators[i] += duration 318 + 319 + for _, conn := range conns { 320 + if trackType == "video" { 321 + if err := conn.videoTrack.WriteSample(media.Sample{Data: samples, Duration: duration}); err != nil { 322 + log.Log(ctx, "error writing video sample", "error", err) 323 + errCh <- err 324 + return gst.FlowError 325 + } 326 + } else { 327 + if err := conn.audioTrack.WriteSample(media.Sample{Data: samples, Duration: duration}); err != nil { 328 + log.Log(ctx, "error writing video sample", "error", err) 329 + errCh <- err 330 + return gst.FlowError 331 + } 332 + } 333 + } 334 + 335 + return gst.FlowOK 336 + }, 337 + }) 338 + }(i) 339 + } 340 + 341 + ok := pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { 342 + switch msg.Type() { 343 + case gst.MessageEOS: // When end-of-stream is received flush the pipeling and stop the main loop 344 + log.Log(ctx, "got gst.MessageEOS, exiting") 345 + cancel() 346 + case gst.MessageError: // Error messages are always fatal 347 + err := msg.ParseError() 348 + log.Error(ctx, "gstreamer error", "error", err.Error()) 349 + if debug := err.DebugString(); debug != "" { 350 + log.Log(ctx, "gstreamer debug", "message", debug) 351 + } 352 + cancel() 353 + default: 354 + log.Debug(ctx, msg.String()) 355 + } 356 + return true 357 + }) 358 + if !ok { 359 + return fmt.Errorf("failed to add watch to pipeline bus") 360 + } 361 + 362 + if err = pipeline.SetState(gst.StatePlaying); err != nil { 363 + return err 364 + } 365 + select { 366 + case err := <-errCh: 367 + return err 368 + case <-ctx.Done(): 369 + return ctx.Err() 370 + } 371 + }
+4
pkg/config/config.go
··· 75 75 RelayHost string 76 76 Debug map[string]map[string]int 77 77 AllowedStreams []string 78 + WideOpen bool 78 79 Peers []string 79 80 TestStream bool 80 81 FrontendProxy string ··· 334 335 } 335 336 336 337 func (cli *CLI) StreamIsAllowed(did string) error { 338 + if cli.WideOpen { 339 + return nil 340 + } 337 341 // if the user set no test streams, anyone can stream 338 342 openServer := len(cli.AllowedStreams) == 0 || (cli.TestStream && len(cli.AllowedStreams) == 1) 339 343 // but only valid atproto accounts! did:key is only allowed for our local test stream
+4 -1
pkg/media/concat.go
··· 16 16 17 17 type ConcatStreamer interface { 18 18 SubscribeSegment(ctx context.Context, user string) <-chan string 19 + UnsubscribeSegment(ctx context.Context, user string, ch <-chan string) 19 20 } 20 21 21 22 // This function remains in scope for the duration of a single users' playback ··· 127 128 allFiles := make(chan string, 1024) 128 129 go func() { 129 130 for { 131 + ch := streamer.SubscribeSegment(ctx, user) 130 132 select { 131 133 case <-ctx.Done(): 132 134 log.Warn(ctx, "exiting segment reader") 135 + streamer.UnsubscribeSegment(ctx, user, ch) 133 136 return 134 - case file := <-streamer.SubscribeSegment(ctx, user): 137 + case file := <-ch: 135 138 log.Debug(ctx, "got segment", "file", file) 136 139 allFiles <- file 137 140 if file == "" {
+4
pkg/media/gstreamer.go
··· 59 59 } 60 60 buffer := gst.NewBufferWithSize(int64(len(toPush))) 61 61 buffer.Map(gst.MapWrite).WriteData(toPush) 62 + defer buffer.Unmap() 62 63 self.PushBuffer(buffer) 63 64 } 64 65 } ··· 193 194 NeedDataFunc: func(self *app.Source, _ uint) { 194 195 buffer := gst.NewBufferWithSize(int64(len(bs))) 195 196 buffer.Map(gst.MapWrite).WriteData(bs) 197 + defer buffer.Unmap() 196 198 self.PushBuffer(buffer) 197 199 self.EndStream() 198 200 }, ··· 637 639 } 638 640 buffer := gst.NewBufferWithSize(int64(len(png))) 639 641 buffer.Map(gst.MapWrite).WriteData(png) 642 + defer buffer.Unmap() 640 643 self.PushBuffer(buffer) 641 644 }, 642 645 }) ··· 659 662 } 660 663 buffer := gst.NewBufferWithSize(int64(len(png))) 661 664 buffer.Map(gst.MapWrite).WriteData(png) 665 + defer buffer.Unmap() 662 666 self.PushBuffer(buffer) 663 667 }, 664 668 })
+11
pkg/media/media.go
··· 122 122 return c 123 123 } 124 124 125 + func (mm *MediaManager) UnsubscribeSegment(ctx context.Context, user string, ch <-chan string) { 126 + mm.mp4subsmut.Lock() 127 + defer mm.mp4subsmut.Unlock() 128 + for i, c := range mm.mp4subs[user] { 129 + if c == ch { 130 + mm.mp4subs[user] = append(mm.mp4subs[user][:i], mm.mp4subs[user][i+1:]...) 131 + break 132 + } 133 + } 134 + } 135 + 125 136 // subscribe to the latest segments from a given user for livestreaming purposes 126 137 func (mm *MediaManager) PublishSegment(ctx context.Context, user, file string) { 127 138 mm.mp4subsmut.Lock()
+4 -1
pkg/media/media_signer_ext.go
··· 68 68 "--streamer", ms.streamer, 69 69 "--start-time", fmt.Sprintf("%d", start)) 70 70 71 + // overwrite so that our subprocesses don't do their own leak checking 72 + cmd.Env = append(os.Environ(), "LD_PRELOAD=") 73 + 71 74 // Set up pipes for stdin and stdout 72 75 stdin, err := cmd.StdinPipe() 73 76 if err != nil { ··· 87 90 // Copy input to stdin 88 91 _, err = io.Copy(stdin, input) 89 92 if err != nil { 90 - return nil, fmt.Errorf("failed to write to stdin: %w", err) 93 + return nil, fmt.Errorf("failed to write to stdin: %w stderr=%s", err, stderr.String()) 91 94 } 92 95 stdin.Close() 93 96
-282
pkg/media/media_validator.go
··· 1 - package media 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "strings" 7 - "time" 8 - 9 - "github.com/go-gst/go-gst/gst" 10 - "github.com/go-gst/go-gst/gst/app" 11 - "github.com/pion/webrtc/v4/pkg/media" 12 - "stream.place/streamplace/pkg/log" 13 - ) 14 - 15 - type MediaValidator struct { 16 - idx int 17 - } 18 - 19 - // var files []string = []string{ 20 - // // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-00-411Z.mp4", 21 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-01-212Z.mp4", 22 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-01-830Z.mp4", 23 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-02-492Z.mp4", 24 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-03-163Z.mp4", 25 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-03-430Z.mp4", 26 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-04-209Z.mp4", 27 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-04-604Z.mp4", 28 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-05-308Z.mp4", 29 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-05-970Z.mp4", 30 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-06-406Z.mp4", 31 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-07-271Z.mp4", 32 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-07-868Z.mp4", 33 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-08-572Z.mp4", 34 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-09-286Z.mp4", 35 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-09-404Z.mp4", 36 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-10-289Z.mp4", 37 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-11-431Z.mp4", 38 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-12-390Z.mp4", 39 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-13-585Z.mp4", 40 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-14-588Z.mp4", 41 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-15-409Z.mp4", 42 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-17-372Z.mp4", 43 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-18-407Z.mp4", 44 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-19-025Z.mp4", 45 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-19-591Z.mp4", 46 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-20-369Z.mp4", 47 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-20-967Z.mp4", 48 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-21-393Z.mp4", 49 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-21-970Z.mp4", 50 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-22-812Z.mp4", 51 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-24-391Z.mp4", 52 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-24-988Z.mp4", 53 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-25-606Z.mp4", 54 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-26-310Z.mp4", 55 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-27-333Z.mp4", 56 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-27-452Z.mp4", 57 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-28-305Z.mp4", 58 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-29-052Z.mp4", 59 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-29-607Z.mp4", 60 - // "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/22/18/2025-01-15T22-18-30-407Z.mp4", 61 - // } 62 - 63 - var files []string = []string{ 64 - "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/23/29/2025-01-15T23-29-00-459Z.mp4", // evil 65 - "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/23/29/2025-01-15T23-29-03-424Z.mp4", // good 66 - "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/23/29/2025-01-15T23-29-04-661Z.mp4", // good 67 - "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/23/29/2025-01-15T23-29-07-121Z.mp4", // good 68 - "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/23/29/2025-01-15T23-29-11-285Z.mp4", // good 69 - "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/23/29/2025-01-15T23-29-12-938Z.mp4", // evil 70 - "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/23/29/2025-01-15T23-29-17-343Z.mp4", // evil 71 - "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/23/29/2025-01-15T23-29-19-158Z.mp4", // good 72 - "/home/iameli/.streamplace/segments/0x3371a9b874d9815c8d18e7d4662cda099a4737b2/2025/01/15/23/29/2025-01-15T23-29-22-261Z.mp4", // good 73 - // "/home/iameli/Desktop/out/2025-01-15T23-29-00-459Z.mp4.mkv.mp4", 74 - // "/home/iameli/Desktop/out/2025-01-15T23-29-03-424Z.mp4.mkv.mp4", 75 - // "/home/iameli/Desktop/out/2025-01-15T23-29-04-661Z.mp4.mkv.mp4", 76 - // "/home/iameli/Desktop/out/2025-01-15T23-29-07-121Z.mp4.mkv.mp4", 77 - // "/home/iameli/Desktop/out/2025-01-15T23-29-11-285Z.mp4.mkv.mp4", 78 - // "/home/iameli/Desktop/out/2025-01-15T23-29-12-938Z.mp4.mkv.mp4", 79 - // "/home/iameli/Desktop/out/2025-01-15T23-29-17-343Z.mp4.mkv.mp4", 80 - // "/home/iameli/Desktop/out/2025-01-15T23-29-19-158Z.mp4.mkv.mp4", 81 - // "/home/iameli/Desktop/out/2025-01-15T23-29-22-261Z.mp4.mkv.mp4", 82 - } 83 - 84 - func (mv *MediaValidator) SubscribeSegment(ctx context.Context, user string) <-chan string { 85 - ch := make(chan string, 1024) 86 - go func() { 87 - if mv.idx >= len(files) { 88 - ch <- "" 89 - return 90 - } 91 - ch <- files[mv.idx] 92 - mv.idx += 1 93 - }() 94 - return ch 95 - } 96 - 97 - func ValidateMedia(ctx context.Context) error { 98 - mv := &MediaValidator{} 99 - 100 - ctx, cancel := context.WithCancel(ctx) 101 - 102 - ctx = log.WithLogValues(ctx, "mediafunc", "ValidateMedia") 103 - 104 - log.Debug(ctx, "starting pipeline") 105 - 106 - pipelineSlice := []string{ 107 - "h264timestamper name=videoparse ! h264parse ! capsfilter caps=video/x-h264,stream-format=byte-stream ! appsink name=videoappsink", 108 - "opusparse name=audioparse ! appsink name=audioappsink", 109 - } 110 - 111 - pipeline, err := gst.NewPipelineFromString(strings.Join(pipelineSlice, "\n")) 112 - if err != nil { 113 - return fmt.Errorf("failed to create GStreamer pipeline: %w", err) 114 - } 115 - 116 - ok := pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { 117 - switch msg.Type() { 118 - case gst.MessageEOS: // When end-of-stream is received flush the pipeling and stop the main loop 119 - log.Log(ctx, "got gst.MessageEOS, exiting") 120 - cancel() 121 - case gst.MessageError: // Error messages are always fatal 122 - err := msg.ParseError() 123 - log.Error(ctx, "gstreamer error", "error", err.Error()) 124 - if debug := err.DebugString(); debug != "" { 125 - log.Log(ctx, "gstreamer debug", "message", debug) 126 - } 127 - cancel() 128 - default: 129 - log.Debug(ctx, msg.String()) 130 - } 131 - return true 132 - }) 133 - 134 - if !ok { 135 - return fmt.Errorf("failed to add watch to pipeline bus") 136 - } 137 - 138 - outputQueue, done, err := ConcatStream(ctx, pipeline, "user", mv) 139 - if err != nil { 140 - return fmt.Errorf("failed to get output queue: %w", err) 141 - } 142 - go func() { 143 - select { 144 - case <-ctx.Done(): 145 - return 146 - case <-done: 147 - cancel() 148 - } 149 - }() 150 - // queuePadVideo := outputQueue.GetRequestPad("src_%u") 151 - // if queuePadVideo == nil { 152 - // return fmt.Errorf("failed to get queue video pad") 153 - // } 154 - // queuePadAudio := outputQueue.GetRequestPad("src_%u") 155 - // if queuePadAudio == nil { 156 - // return fmt.Errorf("failed to get queue audio pad") 157 - // } 158 - 159 - videoParse, err := pipeline.GetElementByName("videoparse") 160 - if err != nil { 161 - return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 162 - } 163 - err = outputQueue.Link(videoParse) 164 - if err != nil { 165 - return fmt.Errorf("failed to link output queue to video parse: %w", err) 166 - } 167 - 168 - audioParse, err := pipeline.GetElementByName("audioparse") 169 - if err != nil { 170 - return fmt.Errorf("failed to get audio parse element from pipeline: %w", err) 171 - } 172 - err = outputQueue.Link(audioParse) 173 - if err != nil { 174 - return fmt.Errorf("failed to link output queue to audio parse: %w", err) 175 - } 176 - 177 - go func() { 178 - <-ctx.Done() 179 - pipeline.BlockSetState(gst.StateNull) 180 - }() 181 - 182 - go func() { 183 - ticker := time.NewTicker(time.Second * 1) 184 - for { 185 - select { 186 - case <-ctx.Done(): 187 - return 188 - case <-ticker.C: 189 - state := pipeline.GetCurrentState() 190 - log.Debug(ctx, "pipeline state", "state", state) 191 - } 192 - } 193 - }() 194 - 195 - videoappsinkele, err := pipeline.GetElementByName("videoappsink") 196 - if err != nil { 197 - return fmt.Errorf("failed to get video sink element from pipeline: %w", err) 198 - } 199 - 200 - audioappsinkele, err := pipeline.GetElementByName("audioappsink") 201 - if err != nil { 202 - return fmt.Errorf("failed to get audio sink element from pipeline: %w", err) 203 - } 204 - 205 - videoappsink := app.SinkFromElement(videoappsinkele) 206 - videoappsink.SetCallbacks(&app.SinkCallbacks{ 207 - NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 208 - sample := sink.PullSample() 209 - if sample == nil { 210 - return gst.FlowEOS 211 - } 212 - 213 - buffer := sample.GetBuffer() 214 - if buffer == nil { 215 - return gst.FlowError 216 - } 217 - 218 - samples := buffer.Map(gst.MapRead).Bytes() 219 - defer buffer.Unmap() 220 - clockTime := buffer.Duration() 221 - dur := clockTime.AsDuration() 222 - 223 - mediaSample := media.Sample{Data: samples} 224 - if dur != nil { 225 - mediaSample.Duration = *dur 226 - } else { 227 - log.Log(ctx, "no video duration", "samples", len(samples), "segment duration", sample.GetSegment().GetDuration()) 228 - // cancel() 229 - return gst.FlowOK 230 - } 231 - 232 - return gst.FlowOK 233 - }, 234 - EOSFunc: func(sink *app.Sink) { 235 - log.Warn(ctx, "videoappsink EOSFunc") 236 - cancel() 237 - }, 238 - }) 239 - 240 - audioappsink := app.SinkFromElement(audioappsinkele) 241 - audioappsink.SetCallbacks(&app.SinkCallbacks{ 242 - NewSampleFunc: func(sink *app.Sink) gst.FlowReturn { 243 - sample := sink.PullSample() 244 - if sample == nil { 245 - return gst.FlowEOS 246 - } 247 - 248 - buffer := sample.GetBuffer() 249 - if buffer == nil { 250 - return gst.FlowError 251 - } 252 - 253 - samples := buffer.Map(gst.MapRead).Bytes() 254 - defer buffer.Unmap() 255 - 256 - clockTime := buffer.Duration() 257 - dur := clockTime.AsDuration() 258 - mediaSample := media.Sample{Data: samples} 259 - if dur != nil { 260 - mediaSample.Duration = *dur 261 - } else { 262 - log.Log(ctx, "no audio duration", "samples", len(samples)) 263 - // cancel() 264 - return gst.FlowOK 265 - } 266 - 267 - return gst.FlowOK 268 - }, 269 - EOSFunc: func(sink *app.Sink) { 270 - log.Warn(ctx, "audioappsink EOSFunc") 271 - cancel() 272 - }, 273 - }) 274 - 275 - // Start the pipeline 276 - pipeline.SetState(gst.StatePlaying) 277 - log.Warn(ctx, "playing pipeline") 278 - 279 - <-ctx.Done() 280 - log.Warn(ctx, "!!!!!!!!!!!!!!!!!!!!!!! ctx done") 281 - return nil 282 - }
+28
test/leak-check.sh
··· 1 + #!/bin/bash 2 + 3 + set -euo pipefail 4 + 5 + # Get the current executing directory 6 + DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && cd .. && pwd )" 7 + 8 + TMPDIR="$(mktemp -d)" 9 + cd $TMPDIR 10 + 11 + MALLOC_CONF=prof_leak:true,lg_prof_sample:0,prof_final:true LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2 "$DIR/build-linux-amd64/streamplace" --no-firehose --wide-open & 12 + STREAMPLACE_PID=$! 13 + 14 + sleep 3 15 + 16 + "$DIR/build-linux-amd64/streamplace" whip --count=3 --duration=30s || true 17 + 18 + sleep 5 19 + curl -X POST http://127.0.0.1:39090/gc 20 + sleep 3 21 + 22 + kill -SIGABRT "$STREAMPLACE_PID" 23 + 24 + wait 25 + 26 + outfile=$(realpath "$(ls)") 27 + echo "processing $outfile" 28 + jeprof --text "$DIR/build-linux-amd64/streamplace" "$outfile" | head -20