Live video on the AT Protocol

storage: implement a default segment retention policy

authored by

Eli Mallon and committed by
Natalie B.
a6f7c3be 2dcbbabd

+35 -6
+5 -3
pkg/cmd/streamplace.go
··· 399 399 return storage.StartSegmentCleaner(ctx, ldb, cli) 400 400 }) 401 401 402 - group.Go(func() error { 403 - return ldb.StartSegmentCleaner(ctx) 404 - }) 402 + if cli.LegacySegmentCleaner { 403 + group.Go(func() error { 404 + return ldb.StartSegmentCleaner(ctx) 405 + }) 406 + } 405 407 406 408 group.Go(func() error { 407 409 return replicator.Start(ctx, cli)
+17
pkg/config/config.go
··· 137 137 DisableIrohRelay bool 138 138 DevAccountCreds map[string]string 139 139 StreamSessionTimeout time.Duration 140 + LegacySegmentCleaner bool 141 + SegmentArchiveRetention time.Duration 140 142 Replicators []string 141 143 WebsocketURL string 142 144 BehindHTTPSProxy bool ··· 735 737 Destination: &cli.StreamSessionTimeout, 736 738 Sources: urfavecli.EnvVars("SP_STREAM_SESSION_TIMEOUT"), 737 739 }, 740 + &urfavecli.BoolFlag{ 741 + Name: "legacy-segment-cleaner", 742 + Usage: "re-enable the legacy segment cleaner. shouldn't be needed but can be useful in cases where localdb is too big.", 743 + Value: false, 744 + Destination: &cli.LegacySegmentCleaner, 745 + Sources: urfavecli.EnvVars("SP_LEGACY_SEGMENT_CLEANER"), 746 + }, 747 + &urfavecli.DurationFlag{ 748 + Name: "segment-archive-retention", 749 + Usage: "for users who don't specify a distribution policy, how long to keep segments around?", 750 + Value: 24 * time.Hour, 751 + Destination: &cli.SegmentArchiveRetention, 752 + Sources: urfavecli.EnvVars("SP_SEGMENT_ARCHIVE_RETENTION"), 753 + }, 738 754 &urfavecli.StringFlag{ 739 755 Name: "replicators", 740 756 Usage: "comma-separated list of replication protocols to use (websocket, iroh)", ··· 850 866 Destination: &cli.MistHTTPPort, 851 867 Sources: urfavecli.EnvVars("SP_MIST_HTTP_PORT"), 852 868 }) 869 + 853 870 } 854 871 855 872 LivepeerFlagSet = flag.NewFlagSet("livepeer", flag.ContinueOnError)
+12 -2
pkg/media/validate.go
··· 114 114 } 115 115 var deleteAfter *time.Time 116 116 if meta.DistributionPolicy != nil && meta.DistributionPolicy.DeleteAfterSeconds != nil { 117 - expiryTime := meta.StartTime.Time().Add(time.Duration(*meta.DistributionPolicy.DeleteAfterSeconds) * time.Second) 118 - deleteAfter = &expiryTime 117 + secs := *meta.DistributionPolicy.DeleteAfterSeconds 118 + if secs == -1 { 119 + deleteAfter = nil 120 + } else { 121 + expiryTime := meta.StartTime.Time().Add(time.Duration(secs) * time.Second) 122 + deleteAfter = &expiryTime 123 + } 124 + } else { 125 + if mm.cli.SegmentArchiveRetention.Seconds() != 0 { 126 + tomorrow := time.Now().Add(mm.cli.SegmentArchiveRetention).UTC() 127 + deleteAfter = &tomorrow 128 + } 119 129 } 120 130 seg := &localdb.Segment{ 121 131 ID: *label,
+1 -1
pkg/storage/storage.go
··· 49 49 50 50 func deleteSegment(ctx context.Context, localDB localdb.LocalDB, cli *config.CLI, seg localdb.Segment) error { 51 51 if time.Since(seg.StartTime) < moderationRetention { 52 - log.Debug(ctx, "Skipping deletion of segment", "id", seg.ID, "time since start", time.Since(seg.StartTime)) 52 + log.Debug(ctx, "Skipping deletion of segment for moderation retention", "id", seg.ID, "time since start", time.Since(seg.StartTime)) 53 53 return nil 54 54 } 55 55 aqt := aqtime.FromTime(seg.StartTime)