tangled
alpha
login
or
join now
stream.place
/
streamplace
77
fork
atom
Live video on the AT Protocol
77
fork
atom
overview
issues
1
pulls
pipelines
storage: implement a default segment retention policy
Eli Mallon
1 week ago
58f4d085
821874e7
+35
-6
4 changed files
expand all
collapse all
unified
split
pkg
cmd
streamplace.go
config
config.go
media
validate.go
storage
storage.go
+5
-3
pkg/cmd/streamplace.go
···
399
399
return storage.StartSegmentCleaner(ctx, ldb, cli)
400
400
})
401
401
402
402
-
group.Go(func() error {
403
403
-
return ldb.StartSegmentCleaner(ctx)
404
404
-
})
402
402
+
if cli.LegacySegmentCleaner {
403
403
+
group.Go(func() error {
404
404
+
return ldb.StartSegmentCleaner(ctx)
405
405
+
})
406
406
+
}
405
407
406
408
group.Go(func() error {
407
409
return replicator.Start(ctx, cli)
+17
pkg/config/config.go
···
137
137
DisableIrohRelay bool
138
138
DevAccountCreds map[string]string
139
139
StreamSessionTimeout time.Duration
140
140
+
LegacySegmentCleaner bool
141
141
+
SegmentArchiveRetention time.Duration
140
142
Replicators []string
141
143
WebsocketURL string
142
144
BehindHTTPSProxy bool
···
735
737
Destination: &cli.StreamSessionTimeout,
736
738
Sources: urfavecli.EnvVars("SP_STREAM_SESSION_TIMEOUT"),
737
739
},
740
740
+
&urfavecli.BoolFlag{
741
741
+
Name: "legacy-segment-cleaner",
742
742
+
Usage: "re-enable the legacy segment cleaner. shouldn't be needed but can be useful in cases where localdb is too big.",
743
743
+
Value: false,
744
744
+
Destination: &cli.LegacySegmentCleaner,
745
745
+
Sources: urfavecli.EnvVars("SP_LEGACY_SEGMENT_CLEANER"),
746
746
+
},
747
747
+
&urfavecli.DurationFlag{
748
748
+
Name: "segment-archive-retention",
749
749
+
Usage: "for users who don't specify a distribution policy, how long to keep segments around?",
750
750
+
Value: 24 * time.Hour,
751
751
+
Destination: &cli.SegmentArchiveRetention,
752
752
+
Sources: urfavecli.EnvVars("SP_SEGMENT_ARCHIVE_RETENTION"),
753
753
+
},
738
754
&urfavecli.StringFlag{
739
755
Name: "replicators",
740
756
Usage: "comma-separated list of replication protocols to use (websocket, iroh)",
···
850
866
Destination: &cli.MistHTTPPort,
851
867
Sources: urfavecli.EnvVars("SP_MIST_HTTP_PORT"),
852
868
})
869
869
+
853
870
}
854
871
855
872
LivepeerFlagSet = flag.NewFlagSet("livepeer", flag.ContinueOnError)
+12
-2
pkg/media/validate.go
···
114
114
}
115
115
var deleteAfter *time.Time
116
116
if meta.DistributionPolicy != nil && meta.DistributionPolicy.DeleteAfterSeconds != nil {
117
117
-
expiryTime := meta.StartTime.Time().Add(time.Duration(*meta.DistributionPolicy.DeleteAfterSeconds) * time.Second)
118
118
-
deleteAfter = &expiryTime
117
117
+
secs := *meta.DistributionPolicy.DeleteAfterSeconds
118
118
+
if secs == -1 {
119
119
+
deleteAfter = nil
120
120
+
} else {
121
121
+
expiryTime := meta.StartTime.Time().Add(time.Duration(secs) * time.Second)
122
122
+
deleteAfter = &expiryTime
123
123
+
}
124
124
+
} else {
125
125
+
if mm.cli.SegmentArchiveRetention.Seconds() != 0 {
126
126
+
tomorrow := time.Now().Add(mm.cli.SegmentArchiveRetention).UTC()
127
127
+
deleteAfter = &tomorrow
128
128
+
}
119
129
}
120
130
seg := &localdb.Segment{
121
131
ID: *label,
+1
-1
pkg/storage/storage.go
···
49
49
50
50
func deleteSegment(ctx context.Context, localDB localdb.LocalDB, cli *config.CLI, seg localdb.Segment) error {
51
51
if time.Since(seg.StartTime) < moderationRetention {
52
52
-
log.Debug(ctx, "Skipping deletion of segment", "id", seg.ID, "time since start", time.Since(seg.StartTime))
52
52
+
log.Debug(ctx, "Skipping deletion of segment for moderation retention", "id", seg.ID, "time since start", time.Since(seg.StartTime))
53
53
return nil
54
54
}
55
55
aqt := aqtime.FromTime(seg.StartTime)