Live video on the AT Protocol

replication: put irohreplicator behind an interface

+77 -97
+30 -25
pkg/cmd/streamplace.go
··· 14 14 "path/filepath" 15 15 "runtime" 16 16 "runtime/pprof" 17 + "slices" 17 18 "strconv" 18 19 "strings" 19 20 "syscall" ··· 35 36 "stream.place/streamplace/pkg/log" 36 37 "stream.place/streamplace/pkg/media" 37 38 "stream.place/streamplace/pkg/notifications" 39 + "stream.place/streamplace/pkg/replication" 38 40 "stream.place/streamplace/pkg/replication/iroh_replicator" 39 41 "stream.place/streamplace/pkg/rtmps" 40 42 v0 "stream.place/streamplace/pkg/schema/v0" ··· 404 406 } 405 407 } 406 408 407 - exists, err := cli.DataFileExists([]string{"iroh-kv-secret"}) 408 - if err != nil { 409 - return err 410 - } 411 - if !exists { 412 - secret := make([]byte, 32) 413 - _, err := rand.Read(secret) 409 + var replicator replication.Replicator = nil 410 + if slices.Contains(cli.Replicators, config.ReplicatorIroh) { 411 + exists, err := cli.DataFileExists([]string{"iroh-kv-secret"}) 414 412 if err != nil { 415 - return fmt.Errorf("failed to generate random secret: %w", err) 413 + return err 416 414 } 417 - err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true) 415 + if !exists { 416 + secret := make([]byte, 32) 417 + _, err := rand.Read(secret) 418 + if err != nil { 419 + return fmt.Errorf("failed to generate random secret: %w", err) 420 + } 421 + err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true) 422 + if err != nil { 423 + return err 424 + } 425 + } 426 + buf := bytes.Buffer{} 427 + err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf) 418 428 if err != nil { 419 429 return err 420 430 } 421 - } 422 - buf := bytes.Buffer{} 423 - err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf) 424 - if err != nil { 425 - return err 426 - } 427 - secret := buf.Bytes() 428 - var topic []byte 429 - if cli.IrohTopic != "" { 430 - topic, err = hexutil.Decode("0x" + cli.IrohTopic) 431 + secret := buf.Bytes() 432 + var topic []byte 433 + if cli.IrohTopic != "" { 434 + topic, err = hexutil.Decode("0x" + cli.IrohTopic) 435 + if err != nil { 436 + return err 437 + } 438 + } 439 + replicator, err = iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod) 431 440 if err != nil { 432 441 return err 433 442 } 434 - } 435 - swarm, err := iroh_replicator.NewSwarm(ctx, &cli, secret, topic, mm, b, mod) 436 - if err != nil { 437 - return err 438 443 } 439 444 440 445 op := oatproxy.New(&oatproxy.Config{ ··· 449 454 ClientMetadata: clientMetadata, 450 455 Public: cli.PublicOAuth, 451 456 }) 452 - d := director.NewDirector(mm, mod, &cli, b, op, state, swarm) 457 + d := director.NewDirector(mm, mod, &cli, b, op, state, replicator) 453 458 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op) 454 459 if err != nil { 455 460 return err ··· 517 522 }) 518 523 519 524 group.Go(func() error { 520 - return swarm.Start(ctx, cli.Tickets) 525 + return replicator.Start(ctx, &cli) 521 526 }) 522 527 523 528 if cli.LivepeerGateway {
+15 -8
pkg/config/config.go
··· 131 131 DisableIrohRelay bool 132 132 DevAccountCreds map[string]string 133 133 StreamSessionTimeout time.Duration 134 + Replicators []string 134 135 } 135 136 136 137 // ContentFilters represents the content filtering configuration ··· 143 144 Enabled bool `json:"enabled"` 144 145 } `json:"distribution_policy"` 145 146 } 147 + 148 + const ( 149 + ReplicatorHTTP string = "http" 150 + ReplicatorIroh string = "iroh" 151 + ) 146 152 147 153 func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { 148 154 fs := flag.NewFlagSet("streamplace", flag.ExitOnError) ··· 178 184 fs.StringVar(&cli.LivepeerGatewayURL, "livepeer-gateway-url", "", "URL of the Livepeer Gateway to use for transcoding") 179 185 fs.BoolVar(&cli.LivepeerGateway, "livepeer-gateway", false, "enable embedded Livepeer Gateway") 180 186 fs.BoolVar(&cli.WideOpen, "wide-open", false, "allow ALL streams to be uploaded to this node (not recommended for production)") 181 - cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", "", "if set, only allow these addresses or atproto DIDs to upload to this node") 182 - cli.StringSliceFlag(fs, &cli.Peers, "peers", "", "other streamplace nodes to replicate to") 183 - cli.StringSliceFlag(fs, &cli.Redirects, "redirects", "", "http 302s /path/one:/path/two,/path/three:/path/four") 187 + cli.StringSliceFlag(fs, &cli.AllowedStreams, "allowed-streams", []string{}, "if set, only allow these addresses or atproto DIDs to upload to this node") 188 + cli.StringSliceFlag(fs, &cli.Peers, "peers", []string{}, "other streamplace nodes to replicate to") 189 + cli.StringSliceFlag(fs, &cli.Redirects, "redirects", []string{}, "http 302s /path/one:/path/two,/path/three:/path/four") 184 190 cli.DebugFlag(fs, &cli.Debug, "debug", "", "modified log verbosity for specific functions or files in form func=ToHLS:3,file=gstreamer.go:4") 185 191 fs.BoolVar(&cli.TestStream, "test-stream", false, "run a built-in test stream on boot") 186 192 fs.BoolVar(&cli.NoFirehose, "no-firehose", false, "disable the bluesky firehose") ··· 205 211 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 206 212 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 207 213 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 208 - cli.StringSliceFlag(fs, &cli.Labelers, "labelers", "", "did of labelers that this instance should subscribe to") 214 + cli.StringSliceFlag(fs, &cli.Labelers, "labelers", []string{}, "did of labelers that this instance should subscribe to") 209 215 fs.StringVar(&cli.AtprotoDID, "atproto-did", "", "atproto did to respond to on /.well-known/atproto-did (default did:web:PUBLIC_HOST)") 210 216 cli.JSONFlag(fs, &cli.ContentFilters, "content-filters", "{}", "JSON content filtering rules") 211 217 fs.BoolVar(&cli.LivepeerHelp, "livepeer-help", false, "print help for livepeer flags and exit") ··· 213 219 fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging") 214 220 fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting") 215 221 fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug") 216 - cli.StringSliceFlag(fs, &cli.Tickets, "tickets", "[]", "tickets to join the swarm with") 222 + cli.StringSliceFlag(fs, &cli.Tickets, "tickets", []string{}, "tickets to join the swarm with") 217 223 fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)") 218 224 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay") 219 225 cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth") 220 226 fs.DurationVar(&cli.StreamSessionTimeout, "stream-session-timeout", 60*time.Second, "how long to wait before considering a stream inactive on this node?") 227 + cli.StringSliceFlag(fs, &cli.Replicators, "replicators", []string{ReplicatorIroh}, "list of replication protocols to use (http, iroh)") 221 228 222 229 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 223 230 _ = starter.NewLivepeerConfig(lpFlags) ··· 533 540 }) 534 541 } 535 542 536 - func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name, defaultValue, usage string) { 537 - *dest = []string{} 543 + func (cli *CLI) StringSliceFlag(fs *flag.FlagSet, dest *[]string, name string, defaultValue []string, usage string) { 544 + *dest = defaultValue 538 545 usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 539 546 fs.Func(name, usage, func(s string) error { 540 547 if s == "" { 541 548 return nil 542 549 } 543 550 strs := strings.Split(s, ",") 544 - *dest = append(*dest, strs...) 551 + *dest = append([]string{}, strs...) 545 552 return nil 546 553 }) 547 554 }
+5 -5
pkg/director/director.go
··· 12 12 "stream.place/streamplace/pkg/log" 13 13 "stream.place/streamplace/pkg/media" 14 14 "stream.place/streamplace/pkg/model" 15 - "stream.place/streamplace/pkg/replication/iroh_replicator" 15 + "stream.place/streamplace/pkg/replication" 16 16 "stream.place/streamplace/pkg/statedb" 17 17 ) 18 18 ··· 31 31 streamSessionsMu sync.Mutex 32 32 op *oatproxy.OATProxy 33 33 statefulDB *statedb.StatefulDB 34 - swarm *iroh_replicator.IrohSwarm 34 + replicator replication.Replicator 35 35 } 36 36 37 - func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB, swarm *iroh_replicator.IrohSwarm) *Director { 37 + func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB, replicator replication.Replicator) *Director { 38 38 return &Director{ 39 39 mm: mm, 40 40 mod: mod, ··· 44 44 streamSessionsMu: sync.Mutex{}, 45 45 op: op, 46 46 statefulDB: statefulDB, 47 - swarm: swarm, 47 + replicator: replicator, 48 48 } 49 49 } 50 50 ··· 75 75 packets: make([]bus.PacketizedSegment, 0), 76 76 started: make(chan struct{}), 77 77 statefulDB: d.statefulDB, 78 - swarm: d.swarm, 78 + replicator: d.replicator, 79 79 } 80 80 d.streamSessions[not.Segment.RepoDID] = ss 81 81 g.Go(func() error {
+6 -3
pkg/director/stream_session.go
··· 23 23 "stream.place/streamplace/pkg/media" 24 24 "stream.place/streamplace/pkg/model" 25 25 "stream.place/streamplace/pkg/renditions" 26 - "stream.place/streamplace/pkg/replication/iroh_replicator" 26 + "stream.place/streamplace/pkg/replication" 27 27 "stream.place/streamplace/pkg/spmetrics" 28 28 "stream.place/streamplace/pkg/statedb" 29 29 "stream.place/streamplace/pkg/streamplace" ··· 50 50 ctx context.Context 51 51 packets []bus.PacketizedSegment 52 52 statefulDB *statedb.StatefulDB 53 - swarm *iroh_replicator.IrohSwarm 53 + replicator replication.Replicator 54 54 } 55 55 56 56 func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error { ··· 460 460 Server: fmt.Sprintf("did:web:%s", ss.cli.ServerHost), 461 461 Broadcaster: &broadcaster, 462 462 UpdatedAt: time.Now().UTC().Format(util.ISO8601), 463 - IrohTicket: &ss.swarm.NodeTicket, 463 + } 464 + err := ss.replicator.BuildOriginRecord(&origin) 465 + if err != nil { 466 + return fmt.Errorf("could not build origin record: %w", err) 464 467 } 465 468 466 469 client, err := ss.GetClientByDID(ss.repoDID)
-2
pkg/media/media.go
··· 26 26 "stream.place/streamplace/pkg/streamplace" 27 27 28 28 "stream.place/streamplace/pkg/log" 29 - "stream.place/streamplace/pkg/replication" 30 29 31 30 "github.com/piprate/json-gold/ld" 32 31 ··· 42 41 43 42 type MediaManager struct { 44 43 cli *config.CLI 45 - replicator replication.Replicator 46 44 hlsRunning map[string]*M3U8 47 45 hlsRunningMut sync.Mutex 48 46 httpPipes map[string]io.Writer
-47
pkg/replication/boring/boring.go
··· 1 - package boring 2 - 3 - import ( 4 - "bytes" 5 - "context" 6 - "fmt" 7 - "io" 8 - "net/http" 9 - 10 - "stream.place/streamplace/pkg/aqhttp" 11 - "stream.place/streamplace/pkg/log" 12 - ) 13 - 14 - // boring HTTP replication mechanism 15 - type BoringReplicator struct { 16 - Peers []string 17 - } 18 - 19 - func (rep *BoringReplicator) NewSegment(ctx context.Context, bs []byte) { 20 - for _, p := range rep.Peers { 21 - go func(peer string) { 22 - ctx := log.WithLogValues(ctx, "peer", peer) 23 - err := sendSegment(ctx, peer, bs) 24 - if err != nil { 25 - log.Log(ctx, "error replicating segment", "error", err) 26 - } 27 - }(p) 28 - } 29 - } 30 - 31 - func sendSegment(ctx context.Context, peer string, bs []byte) error { 32 - r := bytes.NewReader(bs) 33 - peerURL := fmt.Sprintf("%s/api/segment", peer) 34 - req, err := http.NewRequestWithContext(ctx, "POST", peerURL, r) 35 - if err != nil { 36 - return err 37 - } 38 - res, err := aqhttp.Client.Do(req) 39 - if err != nil { 40 - return err 41 - } 42 - if res.StatusCode != 204 { 43 - body, _ := io.ReadAll(res.Body) 44 - return fmt.Errorf("unexpected http code %d body=%s", res.StatusCode, body) 45 - } 46 - return nil 47 - }
+8 -5
pkg/replication/iroh_replicator/kv.go
··· 121 121 return &swarm, nil 122 122 } 123 123 124 - func (swarm *IrohSwarm) Start(ctx context.Context, tickets []string) error { 125 - if len(tickets) > 0 { 126 - err := swarm.Node.JoinPeers(tickets) 124 + func (swarm *IrohSwarm) BuildOriginRecord(origin *streamplace.BroadcastOrigin) error { 125 + origin.IrohTicket = &swarm.NodeTicket 126 + return nil 127 + } 128 + 129 + func (swarm *IrohSwarm) Start(ctx context.Context, cli *config.CLI) error { 130 + if len(cli.Tickets) > 0 { 131 + err := swarm.Node.JoinPeers(cli.Tickets) 127 132 if err != nil { 128 133 return fmt.Errorf("failed to join peers: %w", err) 129 134 } 130 135 } 131 - 132 136 nodeId, err := swarm.Node.NodeId() 133 137 if err != nil { 134 138 return fmt.Errorf("failed to get node id: %w", err) ··· 189 193 log.Debug(ctx, "SubscribeItemOther", "other", item) 190 194 } 191 195 } 192 - return nil 193 196 } 194 197 195 198 func (swarm *IrohSwarm) handleIrohMessage(ctx context.Context, item iroh_streamplace.SubscribeItemEntry) error {
+13 -2
pkg/replication/replicator.go
··· 1 1 package replication 2 2 3 - import "context" 3 + import ( 4 + "context" 5 + 6 + "stream.place/streamplace/pkg/config" 7 + "stream.place/streamplace/pkg/media" 8 + "stream.place/streamplace/pkg/streamplace" 9 + ) 4 10 5 11 type Replicator interface { 6 - NewSegment(context.Context, []byte) 12 + // start the replicator, ending on context cancellation. if your replicator doesn't need to start anything, you can just block on <-ctx.Done() 13 + Start(context.Context, *config.CLI) error 14 + // hey, we have a new segment! send it to whoever 15 + SendSegment(context.Context, *media.NewSegmentNotification) error 16 + // populate this origin record with whatever fields are pertinent to your replicator 17 + BuildOriginRecord(*streamplace.BroadcastOrigin) error 7 18 }