Live video on the AT Protocol

localdb: move segment and thumbnail to their own database

+695 -663
+5 -57
pkg/api/api.go
··· 35 "stream.place/streamplace/pkg/director" 36 apierrors "stream.place/streamplace/pkg/errors" 37 "stream.place/streamplace/pkg/linking" 38 "stream.place/streamplace/pkg/log" 39 "stream.place/streamplace/pkg/media" 40 "stream.place/streamplace/pkg/mist/mistconfig" ··· 56 CLI *config.CLI 57 Model model.Model 58 StatefulDB *statedb.StatefulDB 59 Updater *Updater 60 Signer *eip712.EIP712Signer 61 Mimes map[string]string ··· 93 mu sync.RWMutex 94 } 95 96 - func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 97 updater, err := PrepareUpdater(cli) 98 if err != nil { 99 return nil, err ··· 117 sessionsLock: sync.RWMutex{}, 118 rtmpSessions: make(map[string]*media.RTMPSession), 119 rtmpSessionsLock: sync.Mutex{}, 120 } 121 a.Mimes, err = updater.GetMimes() 122 if err != nil { ··· 152 Recorder: metrics.NewRecorder(metrics.Config{}), 153 }) 154 var xrpc http.Handler 155 - xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus) 156 if err != nil { 157 return nil, err 158 } ··· 203 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 204 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 205 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 206 - addHandle(apiRouter, "GET", "/api/segment/recent", a.HandleRecentSegments(ctx)) 207 - addHandle(apiRouter, "GET", "/api/segment/recent/:repoDID", a.HandleUserRecentSegments(ctx)) 208 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 209 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 210 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx)) ··· 558 return 559 } 560 w.WriteHeader(201) 561 - } 562 - } 563 - 564 - func (a *StreamplaceAPI) HandleRecentSegments(ctx context.Context) httprouter.Handle { 565 - return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 566 - segs, err := a.Model.MostRecentSegments() 567 - if err != nil { 568 - apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 569 - return 570 - } 571 - bs, err := json.Marshal(segs) 572 - if err != nil { 573 - apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 574 - return 575 - } 576 - w.Header().Add("Content-Type", "application/json") 577 - if _, err := w.Write(bs); err != nil { 578 - log.Error(ctx, "error writing response", "error", err) 579 - } 580 - } 581 - } 582 - 583 - func (a *StreamplaceAPI) HandleUserRecentSegments(ctx context.Context) httprouter.Handle { 584 - return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { 585 - user := params.ByName("repoDID") 586 - if user == "" { 587 - apierrors.WriteHTTPBadRequest(w, "user required", nil) 588 - return 589 - } 590 - user, err := a.NormalizeUser(ctx, user) 591 - if err != nil { 592 - apierrors.WriteHTTPNotFound(w, "user not found", err) 593 - return 594 - } 595 - seg, err := a.Model.LatestSegmentForUser(user) 596 - if err != nil { 597 - apierrors.WriteHTTPInternalServerError(w, "could not get segments", err) 598 - return 599 - } 600 - streamplaceSeg, err := seg.ToStreamplaceSegment() 601 - if err != nil { 602 - apierrors.WriteHTTPInternalServerError(w, "could not convert segment to streamplace segment", err) 603 - return 604 - } 605 - bs, err := json.Marshal(streamplaceSeg) 606 - if err != nil { 607 - apierrors.WriteHTTPInternalServerError(w, "could not marshal segments", err) 608 - return 609 - } 610 - w.Header().Add("Content-Type", "application/json") 611 - if _, err := w.Write(bs); err != nil { 612 - log.Error(ctx, "error writing response", "error", err) 613 - } 614 } 615 } 616
··· 35 "stream.place/streamplace/pkg/director" 36 apierrors "stream.place/streamplace/pkg/errors" 37 "stream.place/streamplace/pkg/linking" 38 + "stream.place/streamplace/pkg/localdb" 39 "stream.place/streamplace/pkg/log" 40 "stream.place/streamplace/pkg/media" 41 "stream.place/streamplace/pkg/mist/mistconfig" ··· 57 CLI *config.CLI 58 Model model.Model 59 StatefulDB *statedb.StatefulDB 60 + LocalDB localdb.LocalDB 61 Updater *Updater 62 Signer *eip712.EIP712Signer 63 Mimes map[string]string ··· 95 mu sync.RWMutex 96 } 97 98 + func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy, ldb localdb.LocalDB) (*StreamplaceAPI, error) { 99 updater, err := PrepareUpdater(cli) 100 if err != nil { 101 return nil, err ··· 119 sessionsLock: sync.RWMutex{}, 120 rtmpSessions: make(map[string]*media.RTMPSession), 121 rtmpSessionsLock: sync.Mutex{}, 122 + LocalDB: ldb, 123 } 124 a.Mimes, err = updater.GetMimes() 125 if err != nil { ··· 155 Recorder: metrics.NewRecorder(metrics.Config{}), 156 }) 157 var xrpc http.Handler 158 + xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus, a.LocalDB) 159 if err != nil { 160 return nil, err 161 } ··· 206 addHandle(apiRouter, "GET", "/api/chat/:repoDID", a.HandleChat(ctx)) 207 addHandle(apiRouter, "GET", "/api/websocket/:repoDID", a.HandleWebsocket(ctx)) 208 addHandle(apiRouter, "GET", "/api/livestream/:repoDID", a.HandleLivestream(ctx)) 209 addHandle(apiRouter, "GET", "/api/bluesky/resolve/:handle", a.HandleBlueskyResolve(ctx)) 210 addHandle(apiRouter, "GET", "/api/view-count/:user", a.HandleViewCount(ctx)) 211 addHandle(apiRouter, "GET", "/api/clip/:user/:file", a.HandleClip(ctx)) ··· 559 return 560 } 561 w.WriteHeader(201) 562 } 563 } 564
+2 -2
pkg/api/api_internal.go
··· 298 errors.WriteHTTPBadRequest(w, "id required", nil) 299 return 300 } 301 - segment, err := a.Model.GetSegment(id) 302 if err != nil { 303 errors.WriteHTTPBadRequest(w, err.Error(), err) 304 return ··· 553 } 554 after := time.Now().Add(-time.Duration(secs) * time.Second) 555 w.Header().Set("Content-Type", "video/mp4") 556 - err = media.ClipUser(ctx, a.Model, a.CLI, user, w, nil, &after) 557 if err != nil { 558 errors.WriteHTTPInternalServerError(w, "unable to clip user", err) 559 return
··· 298 errors.WriteHTTPBadRequest(w, "id required", nil) 299 return 300 } 301 + segment, err := a.LocalDB.GetSegment(id) 302 if err != nil { 303 errors.WriteHTTPBadRequest(w, err.Error(), err) 304 return ··· 553 } 554 after := time.Now().Add(-time.Duration(secs) * time.Second) 555 w.Header().Set("Content-Type", "video/mp4") 556 + err = media.ClipUser(ctx, a.LocalDB, a.CLI, user, w, nil, &after) 557 if err != nil { 558 errors.WriteHTTPInternalServerError(w, "unable to clip user", err) 559 return
+1 -1
pkg/api/playback.go
··· 272 errors.WriteHTTPNotFound(w, "user not found", err) 273 return 274 } 275 - thumb, err := a.Model.LatestThumbnailForUser(user) 276 if err != nil { 277 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err) 278 return
··· 272 errors.WriteHTTPNotFound(w, "user not found", err) 273 return 274 } 275 + thumb, err := a.LocalDB.LatestThumbnailForUser(user) 276 if err != nil { 277 errors.WriteHTTPInternalServerError(w, "could not query thumbnail", err) 278 return
+1 -1
pkg/api/websocket.go
··· 181 }() 182 183 go func() { 184 - seg, err := a.Model.LatestSegmentForUser(repoDID) 185 if err != nil { 186 log.Error(ctx, "could not get replies", "error", err) 187 return
··· 181 }() 182 183 go func() { 184 + seg, err := a.LocalDB.LatestSegmentForUser(repoDID) 185 if err != nil { 186 log.Error(ctx, "could not get replies", "error", err) 187 return
+11 -5
pkg/cmd/streamplace.go
··· 29 "stream.place/streamplace/pkg/director" 30 "stream.place/streamplace/pkg/gstinit" 31 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 32 "stream.place/streamplace/pkg/log" 33 "stream.place/streamplace/pkg/media" 34 "stream.place/streamplace/pkg/notifications" ··· 237 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 238 } 239 240 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 241 if err != nil { 242 return err ··· 291 return fmt.Errorf("failed to migrate: %w", err) 292 } 293 294 - mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync) 295 if err != nil { 296 return err 297 } ··· 380 ClientMetadata: clientMetadata, 381 Public: cli.PublicOAuth, 382 }) 383 - d := director.NewDirector(mm, mod, &cli, b, op, state, replicator) 384 - a, err := api.MakeStreamplaceAPI(&cli, mod, state, noter, mm, ms, b, atsync, d, op) 385 if err != nil { 386 return err 387 } ··· 446 }) 447 448 group.Go(func() error { 449 - return storage.StartSegmentCleaner(ctx, mod, &cli) 450 }) 451 452 group.Go(func() error { 453 - return mod.StartSegmentCleaner(ctx) 454 }) 455 456 group.Go(func() error {
··· 29 "stream.place/streamplace/pkg/director" 30 "stream.place/streamplace/pkg/gstinit" 31 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 32 + "stream.place/streamplace/pkg/localdb" 33 "stream.place/streamplace/pkg/log" 34 "stream.place/streamplace/pkg/media" 35 "stream.place/streamplace/pkg/notifications" ··· 238 return fmt.Errorf("error creating streamplace dir at %s:%w", cli.DataDir, err) 239 } 240 241 + ldb, err := localdb.MakeDB(cli.LocalDBURL) 242 + if err != nil { 243 + return err 244 + } 245 + 246 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 247 if err != nil { 248 return err ··· 297 return fmt.Errorf("failed to migrate: %w", err) 298 } 299 300 + mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync, ldb) 301 if err != nil { 302 return err 303 } ··· 386 ClientMetadata: clientMetadata, 387 Public: cli.PublicOAuth, 388 }) 389 + d := director.NewDirector(mm, mod, &cli, b, op, state, replicator, ldb) 390 + a, err := api.MakeStreamplaceAPI(&cli, mod, state, noter, mm, ms, b, atsync, d, op, ldb) 391 if err != nil { 392 return err 393 } ··· 452 }) 453 454 group.Go(func() error { 455 + return storage.StartSegmentCleaner(ctx, ldb, &cli) 456 }) 457 458 group.Go(func() error { 459 + return ldb.StartSegmentCleaner(ctx) 460 }) 461 462 group.Go(func() error {
+3
pkg/config/config.go
··· 56 Build *BuildFlags 57 DataDir string 58 DBURL string 59 EthAccountAddr string 60 EthKeystorePath string 61 EthPassword string ··· 242 cli.StringSliceFlag(fs, &cli.AdminDIDs, "admin-dids", []string{}, "comma-separated list of DIDs that are authorized to modify branding and other admin operations") 243 cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)") 244 fs.BoolVar(&cli.PlayerTelemetry, "player-telemetry", true, "enable player telemetry") 245 246 fs.Bool("external-signing", true, "DEPRECATED, does nothing.") 247 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
··· 56 Build *BuildFlags 57 DataDir string 58 DBURL string 59 + LocalDBURL string 60 EthAccountAddr string 61 EthKeystorePath string 62 EthPassword string ··· 243 cli.StringSliceFlag(fs, &cli.AdminDIDs, "admin-dids", []string{}, "comma-separated list of DIDs that are authorized to modify branding and other admin operations") 244 cli.StringSliceFlag(fs, &cli.Syndicate, "syndicate", []string{}, "list of DIDs that we should rebroadcast ('*' for everybody)") 245 fs.BoolVar(&cli.PlayerTelemetry, "player-telemetry", true, "enable player telemetry") 246 + fs.StringVar(&cli.LocalDBURL, "local-db-url", "sqlite://$SP_DATA_DIR/localdb.sqlite", "URL of the local database to use for storing local data") 247 + cli.dataDirFlags = append(cli.dataDirFlags, &cli.LocalDBURL) 248 249 fs.Bool("external-signing", true, "DEPRECATED, does nothing.") 250 fs.Bool("insecure", false, "DEPRECATED, does nothing.")
+5 -1
pkg/director/director.go
··· 9 "golang.org/x/sync/errgroup" 10 "stream.place/streamplace/pkg/bus" 11 "stream.place/streamplace/pkg/config" 12 "stream.place/streamplace/pkg/log" 13 "stream.place/streamplace/pkg/media" 14 "stream.place/streamplace/pkg/model" ··· 32 op *oatproxy.OATProxy 33 statefulDB *statedb.StatefulDB 34 replicator replication.Replicator 35 } 36 37 - func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB, replicator replication.Replicator) *Director { 38 return &Director{ 39 mm: mm, 40 mod: mod, ··· 45 op: op, 46 statefulDB: statefulDB, 47 replicator: replicator, 48 } 49 } 50 ··· 79 // Initialize notification channels (buffered size 1 for coalescing) 80 statusUpdateChan: make(chan struct{}, 1), 81 originUpdateChan: make(chan struct{}, 1), 82 } 83 d.streamSessions[not.Segment.RepoDID] = ss 84 g.Go(func() error {
··· 9 "golang.org/x/sync/errgroup" 10 "stream.place/streamplace/pkg/bus" 11 "stream.place/streamplace/pkg/config" 12 + "stream.place/streamplace/pkg/localdb" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/media" 15 "stream.place/streamplace/pkg/model" ··· 33 op *oatproxy.OATProxy 34 statefulDB *statedb.StatefulDB 35 replicator replication.Replicator 36 + localDB localdb.LocalDB 37 } 38 39 + func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB, replicator replication.Replicator, ldb localdb.LocalDB) *Director { 40 return &Director{ 41 mm: mm, 42 mod: mod, ··· 47 op: op, 48 statefulDB: statefulDB, 49 replicator: replicator, 50 + localDB: ldb, 51 } 52 } 53 ··· 82 // Initialize notification channels (buffered size 1 for coalescing) 83 statusUpdateChan: make(chan struct{}, 1), 84 originUpdateChan: make(chan struct{}, 1), 85 + localDB: d.localDB, 86 } 87 d.streamSessions[not.Segment.RepoDID] = ss 88 g.Go(func() error {
+6 -4
pkg/director/stream_session.go
··· 20 "stream.place/streamplace/pkg/bus" 21 "stream.place/streamplace/pkg/config" 22 "stream.place/streamplace/pkg/livepeer" 23 "stream.place/streamplace/pkg/log" 24 "stream.place/streamplace/pkg/media" 25 "stream.place/streamplace/pkg/model" ··· 44 lastStatus time.Time 45 lastStatusCID *string 46 lastOriginTime time.Time 47 48 // Channels for background workers 49 statusUpdateChan chan struct{} // Signal to update status ··· 178 aqt := aqtime.FromTime(notif.Segment.StartTime) 179 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 180 notif.Segment.MediaData.Size = len(notif.Data) 181 - err := ss.mod.CreateSegment(notif.Segment) 182 if err != nil { 183 return fmt.Errorf("could not add segment to database: %w", err) 184 } ··· 292 return nil 293 } 294 defer lock.Unlock() 295 - oldThumb, err := ss.mod.LatestThumbnailForUser(not.Segment.RepoDID) 296 if err != nil { 297 return err 298 } ··· 311 if err != nil { 312 return err 313 } 314 - thumb := &model.Thumbnail{ 315 Format: "jpeg", 316 SegmentID: not.Segment.ID, 317 } 318 - err = ss.mod.CreateThumbnail(thumb) 319 if err != nil { 320 return err 321 }
··· 20 "stream.place/streamplace/pkg/bus" 21 "stream.place/streamplace/pkg/config" 22 "stream.place/streamplace/pkg/livepeer" 23 + "stream.place/streamplace/pkg/localdb" 24 "stream.place/streamplace/pkg/log" 25 "stream.place/streamplace/pkg/media" 26 "stream.place/streamplace/pkg/model" ··· 45 lastStatus time.Time 46 lastStatusCID *string 47 lastOriginTime time.Time 48 + localDB localdb.LocalDB 49 50 // Channels for background workers 51 statusUpdateChan chan struct{} // Signal to update status ··· 180 aqt := aqtime.FromTime(notif.Segment.StartTime) 181 ctx = log.WithLogValues(ctx, "segID", notif.Segment.ID, "repoDID", notif.Segment.RepoDID, "timestamp", aqt.FileSafeString()) 182 notif.Segment.MediaData.Size = len(notif.Data) 183 + err := ss.localDB.CreateSegment(notif.Segment) 184 if err != nil { 185 return fmt.Errorf("could not add segment to database: %w", err) 186 } ··· 294 return nil 295 } 296 defer lock.Unlock() 297 + oldThumb, err := ss.localDB.LatestThumbnailForUser(not.Segment.RepoDID) 298 if err != nil { 299 return err 300 } ··· 313 if err != nil { 314 return err 315 } 316 + thumb := &localdb.Thumbnail{ 317 Format: "jpeg", 318 SegmentID: not.Segment.ID, 319 } 320 + err = ss.localDB.CreateThumbnail(thumb) 321 if err != nil { 322 return err 323 }
+81
pkg/localdb/localdb.go
···
··· 1 + package localdb 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "strings" 7 + "time" 8 + 9 + "gorm.io/driver/sqlite" 10 + "gorm.io/gorm" 11 + "gorm.io/plugin/prometheus" 12 + "stream.place/streamplace/pkg/config" 13 + "stream.place/streamplace/pkg/log" 14 + ) 15 + 16 + type LocalDB interface { 17 + CreateSegment(segment *Segment) error 18 + MostRecentSegments() ([]Segment, error) 19 + LatestSegmentForUser(user string) (*Segment, error) 20 + LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) 21 + FilterLiveRepoDIDs(repoDIDs []string) ([]string, error) 22 + CreateThumbnail(thumb *Thumbnail) error 23 + LatestThumbnailForUser(user string) (*Thumbnail, error) 24 + GetSegment(id string) (*Segment, error) 25 + GetExpiredSegments(ctx context.Context) ([]Segment, error) 26 + DeleteSegment(ctx context.Context, id string) error 27 + StartSegmentCleaner(ctx context.Context) error 28 + SegmentCleaner(ctx context.Context) error 29 + } 30 + 31 + type LocalDatabase struct { 32 + DB *gorm.DB 33 + } 34 + 35 + func MakeDB(dbURL string) (LocalDB, error) { 36 + log.Log(context.Background(), "starting database", "dbURL", dbURL) 37 + if strings.HasPrefix(dbURL, "sqlite://") { 38 + dbURL = dbURL[len("sqlite://"):] 39 + } else if dbURL != ":memory:" { 40 + return nil, fmt.Errorf("unsupported database URL (most start with sqlite://): %s", dbURL) 41 + } 42 + dial := sqlite.Open(dbURL) 43 + 44 + db, err := gorm.Open(dial, &gorm.Config{ 45 + SkipDefaultTransaction: true, 46 + TranslateError: true, 47 + Logger: config.GormLogger, 48 + }) 49 + if err != nil { 50 + return nil, fmt.Errorf("error starting database: %w", err) 51 + } 52 + err = db.Exec("PRAGMA journal_mode=WAL;").Error 53 + if err != nil { 54 + return nil, fmt.Errorf("error setting journal mode: %w", err) 55 + } 56 + 57 + err = db.Use(prometheus.New(prometheus.Config{ 58 + DBName: "localdb", 59 + RefreshInterval: 10, 60 + StartServer: false, 61 + })) 62 + if err != nil { 63 + return nil, fmt.Errorf("error using prometheus plugin: %w", err) 64 + } 65 + 66 + sqlDB, err := db.DB() 67 + if err != nil { 68 + return nil, fmt.Errorf("error getting database: %w", err) 69 + } 70 + sqlDB.SetMaxOpenConns(1) 71 + for _, model := range []any{ 72 + Segment{}, 73 + Thumbnail{}, 74 + } { 75 + err = db.AutoMigrate(model) 76 + if err != nil { 77 + return nil, err 78 + } 79 + } 80 + return &LocalDatabase{DB: db}, nil 81 + }
+410
pkg/localdb/segment.go
···
··· 1 + package localdb 2 + 3 + import ( 4 + "context" 5 + "database/sql/driver" 6 + "encoding/json" 7 + "errors" 8 + "fmt" 9 + "time" 10 + 11 + "gorm.io/gorm" 12 + "stream.place/streamplace/pkg/aqtime" 13 + "stream.place/streamplace/pkg/log" 14 + "stream.place/streamplace/pkg/streamplace" 15 + ) 16 + 17 + type SegmentMediadataVideo struct { 18 + Width int `json:"width"` 19 + Height int `json:"height"` 20 + FPSNum int `json:"fpsNum"` 21 + FPSDen int `json:"fpsDen"` 22 + BFrames bool `json:"bframes"` 23 + } 24 + 25 + type SegmentMediadataAudio struct { 26 + Rate int `json:"rate"` 27 + Channels int `json:"channels"` 28 + } 29 + 30 + type SegmentMediaData struct { 31 + Video []*SegmentMediadataVideo `json:"video"` 32 + Audio []*SegmentMediadataAudio `json:"audio"` 33 + Duration int64 `json:"duration"` 34 + Size int `json:"size"` 35 + } 36 + 37 + // Scan scan value into Jsonb, implements sql.Scanner interface 38 + func (j *SegmentMediaData) Scan(value any) error { 39 + bytes, ok := value.([]byte) 40 + if !ok { 41 + return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value)) 42 + } 43 + 44 + result := SegmentMediaData{} 45 + err := json.Unmarshal(bytes, &result) 46 + *j = SegmentMediaData(result) 47 + return err 48 + } 49 + 50 + // Value return json value, implement driver.Valuer interface 51 + func (j SegmentMediaData) Value() (driver.Value, error) { 52 + return json.Marshal(j) 53 + } 54 + 55 + // ContentRights represents content rights and attribution information 56 + type ContentRights struct { 57 + CopyrightNotice *string `json:"copyrightNotice,omitempty"` 58 + CopyrightYear *int64 `json:"copyrightYear,omitempty"` 59 + Creator *string `json:"creator,omitempty"` 60 + CreditLine *string `json:"creditLine,omitempty"` 61 + License *string `json:"license,omitempty"` 62 + } 63 + 64 + // Scan scan value into ContentRights, implements sql.Scanner interface 65 + func (c *ContentRights) Scan(value any) error { 66 + if value == nil { 67 + *c = ContentRights{} 68 + return nil 69 + } 70 + bytes, ok := value.([]byte) 71 + if !ok { 72 + return errors.New(fmt.Sprint("Failed to unmarshal ContentRights value:", value)) 73 + } 74 + 75 + result := ContentRights{} 76 + err := json.Unmarshal(bytes, &result) 77 + *c = ContentRights(result) 78 + return err 79 + } 80 + 81 + // Value return json value, implement driver.Valuer interface 82 + func (c ContentRights) Value() (driver.Value, error) { 83 + return json.Marshal(c) 84 + } 85 + 86 + // DistributionPolicy represents distribution policy information 87 + type DistributionPolicy struct { 88 + DeleteAfterSeconds *int64 `json:"deleteAfterSeconds,omitempty"` 89 + } 90 + 91 + // Scan scan value into DistributionPolicy, implements sql.Scanner interface 92 + func (d *DistributionPolicy) Scan(value any) error { 93 + if value == nil { 94 + *d = DistributionPolicy{} 95 + return nil 96 + } 97 + bytes, ok := value.([]byte) 98 + if !ok { 99 + return errors.New(fmt.Sprint("Failed to unmarshal DistributionPolicy value:", value)) 100 + } 101 + 102 + result := DistributionPolicy{} 103 + err := json.Unmarshal(bytes, &result) 104 + *d = DistributionPolicy(result) 105 + return err 106 + } 107 + 108 + // Value return json value, implement driver.Valuer interface 109 + func (d DistributionPolicy) Value() (driver.Value, error) { 110 + return json.Marshal(d) 111 + } 112 + 113 + // ContentWarningsSlice is a custom type for storing content warnings as JSON in the database 114 + type ContentWarningsSlice []string 115 + 116 + // Scan scan value into ContentWarningsSlice, implements sql.Scanner interface 117 + func (c *ContentWarningsSlice) Scan(value any) error { 118 + if value == nil { 119 + *c = ContentWarningsSlice{} 120 + return nil 121 + } 122 + bytes, ok := value.([]byte) 123 + if !ok { 124 + return errors.New(fmt.Sprint("Failed to unmarshal ContentWarningsSlice value:", value)) 125 + } 126 + 127 + result := ContentWarningsSlice{} 128 + err := json.Unmarshal(bytes, &result) 129 + *c = ContentWarningsSlice(result) 130 + return err 131 + } 132 + 133 + // Value return json value, implement driver.Valuer interface 134 + func (c ContentWarningsSlice) Value() (driver.Value, error) { 135 + return json.Marshal(c) 136 + } 137 + 138 + type Segment struct { 139 + ID string `json:"id" gorm:"primaryKey"` 140 + SigningKeyDID string `json:"signingKeyDID" gorm:"column:signing_key_did"` 141 + StartTime time.Time `json:"startTime" gorm:"index:latest_segments,priority:2;index:start_time"` 142 + RepoDID string `json:"repoDID" gorm:"index:latest_segments,priority:1;column:repo_did"` 143 + Title string `json:"title"` 144 + Size int `json:"size" gorm:"column:size"` 145 + MediaData *SegmentMediaData `json:"mediaData,omitempty"` 146 + ContentWarnings ContentWarningsSlice `json:"contentWarnings,omitempty"` 147 + ContentRights *ContentRights `json:"contentRights,omitempty"` 148 + DistributionPolicy *DistributionPolicy `json:"distributionPolicy,omitempty"` 149 + DeleteAfter *time.Time `json:"deleteAfter,omitempty" gorm:"column:delete_after;index:delete_after"` 150 + } 151 + 152 + func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) { 153 + aqt := aqtime.FromTime(s.StartTime) 154 + if s.MediaData == nil { 155 + return nil, fmt.Errorf("media data is nil") 156 + } 157 + if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil { 158 + return nil, fmt.Errorf("video data is nil") 159 + } 160 + if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil { 161 + return nil, fmt.Errorf("audio data is nil") 162 + } 163 + duration := s.MediaData.Duration 164 + sizei64 := int64(s.Size) 165 + 166 + // Convert model metadata to streamplace metadata 167 + var contentRights *streamplace.MetadataContentRights 168 + if s.ContentRights != nil { 169 + contentRights = &streamplace.MetadataContentRights{ 170 + CopyrightNotice: s.ContentRights.CopyrightNotice, 171 + CopyrightYear: s.ContentRights.CopyrightYear, 172 + Creator: s.ContentRights.Creator, 173 + CreditLine: s.ContentRights.CreditLine, 174 + License: s.ContentRights.License, 175 + } 176 + } 177 + 178 + var contentWarnings *streamplace.MetadataContentWarnings 179 + if len(s.ContentWarnings) > 0 { 180 + contentWarnings = &streamplace.MetadataContentWarnings{ 181 + Warnings: []string(s.ContentWarnings), 182 + } 183 + } 184 + 185 + var distributionPolicy *streamplace.MetadataDistributionPolicy 186 + if s.DistributionPolicy != nil && s.DistributionPolicy.DeleteAfterSeconds != nil { 187 + distributionPolicy = &streamplace.MetadataDistributionPolicy{ 188 + DeleteAfter: s.DistributionPolicy.DeleteAfterSeconds, 189 + } 190 + } 191 + 192 + return &streamplace.Segment{ 193 + LexiconTypeID: "place.stream.segment", 194 + Creator: s.RepoDID, 195 + Id: s.ID, 196 + SigningKey: s.SigningKeyDID, 197 + StartTime: string(aqt), 198 + Duration: &duration, 199 + Size: &sizei64, 200 + ContentRights: contentRights, 201 + ContentWarnings: contentWarnings, 202 + DistributionPolicy: distributionPolicy, 203 + Video: []*streamplace.Segment_Video{ 204 + { 205 + Codec: "h264", 206 + Width: int64(s.MediaData.Video[0].Width), 207 + Height: int64(s.MediaData.Video[0].Height), 208 + Framerate: &streamplace.Segment_Framerate{ 209 + Num: int64(s.MediaData.Video[0].FPSNum), 210 + Den: int64(s.MediaData.Video[0].FPSDen), 211 + }, 212 + Bframes: &s.MediaData.Video[0].BFrames, 213 + }, 214 + }, 215 + Audio: []*streamplace.Segment_Audio{ 216 + { 217 + Codec: "opus", 218 + Rate: int64(s.MediaData.Audio[0].Rate), 219 + Channels: int64(s.MediaData.Audio[0].Channels), 220 + }, 221 + }, 222 + }, nil 223 + } 224 + 225 + func (m *LocalDatabase) CreateSegment(seg *Segment) error { 226 + err := m.DB.Model(Segment{}).Create(seg).Error 227 + if err != nil { 228 + return err 229 + } 230 + return nil 231 + } 232 + 233 + // should return the most recent segment for each user, ordered by most recent first 234 + // only includes segments from the last 30 seconds 235 + func (m *LocalDatabase) MostRecentSegments() ([]Segment, error) { 236 + var segments []Segment 237 + thirtySecondsAgo := time.Now().Add(-30 * time.Second) 238 + 239 + err := m.DB.Table("segments"). 240 + Select("segments.*"). 241 + Where("start_time > ?", thirtySecondsAgo.UTC()). 242 + Order("start_time DESC"). 243 + Find(&segments).Error 244 + if err != nil { 245 + return nil, err 246 + } 247 + if segments == nil { 248 + return []Segment{}, nil 249 + } 250 + 251 + segmentMap := make(map[string]Segment) 252 + for _, seg := range segments { 253 + prev, ok := segmentMap[seg.RepoDID] 254 + if !ok { 255 + segmentMap[seg.RepoDID] = seg 256 + } else { 257 + if seg.StartTime.After(prev.StartTime) { 258 + segmentMap[seg.RepoDID] = seg 259 + } 260 + } 261 + } 262 + 263 + filteredSegments := []Segment{} 264 + for _, seg := range segmentMap { 265 + filteredSegments = append(filteredSegments, seg) 266 + } 267 + 268 + return filteredSegments, nil 269 + } 270 + 271 + func (m *LocalDatabase) LatestSegmentForUser(user string) (*Segment, error) { 272 + var seg Segment 273 + err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error 274 + if err != nil { 275 + return nil, err 276 + } 277 + return &seg, nil 278 + } 279 + 280 + func (m *LocalDatabase) FilterLiveRepoDIDs(repoDIDs []string) ([]string, error) { 281 + if len(repoDIDs) == 0 { 282 + return []string{}, nil 283 + } 284 + 285 + thirtySecondsAgo := time.Now().Add(-30 * time.Second) 286 + 287 + var liveDIDs []string 288 + 289 + err := m.DB.Table("segments"). 290 + Select("DISTINCT repo_did"). 291 + Where("repo_did IN ? AND start_time > ?", repoDIDs, thirtySecondsAgo.UTC()). 292 + Pluck("repo_did", &liveDIDs).Error 293 + 294 + if err != nil { 295 + return nil, err 296 + } 297 + 298 + return liveDIDs, nil 299 + } 300 + 301 + func (m *LocalDatabase) LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) { 302 + var segs []Segment 303 + if before == nil { 304 + later := time.Now().Add(1000 * time.Hour) 305 + before = &later 306 + } 307 + if after == nil { 308 + earlier := time.Time{} 309 + after = &earlier 310 + } 311 + err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ? AND start_time > ?", user, before.UTC(), after.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error 312 + if err != nil { 313 + return nil, err 314 + } 315 + return segs, nil 316 + } 317 + 318 + func (m *LocalDatabase) GetSegment(id string) (*Segment, error) { 319 + var seg Segment 320 + 321 + err := m.DB.Model(&Segment{}). 322 + Preload("Repo"). 323 + Where("id = ?", id). 324 + First(&seg).Error 325 + 326 + if errors.Is(err, gorm.ErrRecordNotFound) { 327 + return nil, nil 328 + } 329 + if err != nil { 330 + return nil, err 331 + } 332 + 333 + return &seg, nil 334 + } 335 + 336 + func (m *LocalDatabase) GetExpiredSegments(ctx context.Context) ([]Segment, error) { 337 + 338 + var expiredSegments []Segment 339 + now := time.Now() 340 + err := m.DB. 341 + Where("delete_after IS NOT NULL AND delete_after < ?", now.UTC()). 342 + Find(&expiredSegments).Error 343 + if err != nil { 344 + return nil, err 345 + } 346 + 347 + return expiredSegments, nil 348 + } 349 + 350 + func (m *LocalDatabase) DeleteSegment(ctx context.Context, id string) error { 351 + return m.DB.Delete(&Segment{}, "id = ?", id).Error 352 + } 353 + 354 + func (m *LocalDatabase) StartSegmentCleaner(ctx context.Context) error { 355 + err := m.SegmentCleaner(ctx) 356 + if err != nil { 357 + return err 358 + } 359 + ticker := time.NewTicker(1 * time.Minute) 360 + defer ticker.Stop() 361 + 362 + for { 363 + select { 364 + case <-ctx.Done(): 365 + return nil 366 + case <-ticker.C: 367 + err := m.SegmentCleaner(ctx) 368 + if err != nil { 369 + log.Error(ctx, "Failed to clean segments", "error", err) 370 + } 371 + } 372 + } 373 + } 374 + 375 + func (m *LocalDatabase) SegmentCleaner(ctx context.Context) error { 376 + // Calculate the cutoff time (10 minutes ago) 377 + cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time() 378 + 379 + // Find all unique repo_did values 380 + var repoDIDs []string 381 + if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil { 382 + log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err) 383 + return err 384 + } 385 + 386 + // For each user, keep their last 10 segments and delete older ones 387 + for _, repoDID := range repoDIDs { 388 + // Get IDs of the last 10 segments for this user 389 + var keepSegmentIDs []string 390 + if err := m.DB.Model(&Segment{}). 391 + Where("repo_did = ?", repoDID). 392 + Order("start_time DESC"). 393 + Limit(10). 394 + Pluck("id", &keepSegmentIDs).Error; err != nil { 395 + log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err) 396 + return err 397 + } 398 + 399 + // Delete old segments except the ones we want to keep 400 + result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?", 401 + repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{}) 402 + 403 + if result.Error != nil { 404 + log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error) 405 + } else if result.RowsAffected > 0 { 406 + log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected) 407 + } 408 + } 409 + return nil 410 + }
+59
pkg/localdb/segment_test.go
···
··· 1 + package localdb 2 + 3 + import ( 4 + "fmt" 5 + "sync" 6 + "testing" 7 + "time" 8 + 9 + "github.com/stretchr/testify/require" 10 + "stream.place/streamplace/pkg/config" 11 + ) 12 + 13 + func TestSegmentPerf(t *testing.T) { 14 + config.DisableSQLLogging() 15 + // dburl := filepath.Join(t.TempDir(), "test.db") 16 + db, err := MakeDB(":memory:") 17 + require.NoError(t, err) 18 + // Create a ldb instance 19 + ldb := db.(*LocalDatabase) 20 + t.Cleanup(func() { 21 + // os.Remove(dburl) 22 + }) 23 + 24 + defer config.EnableSQLLogging() 25 + // Create 250000 segments with timestamps 1 hour ago, each one second apart 26 + wg := sync.WaitGroup{} 27 + segCount := 250000 28 + wg.Add(segCount) 29 + baseTime := time.Now() 30 + for i := 0; i < segCount; i++ { 31 + segment := &Segment{ 32 + ID: fmt.Sprintf("segment-%d", i), 33 + RepoDID: "did:plc:test123", 34 + StartTime: baseTime.Add(-time.Duration(i) * time.Second).UTC(), 35 + } 36 + go func() { 37 + defer wg.Done() 38 + err = ldb.DB.Create(segment).Error 39 + require.NoError(t, err) 40 + }() 41 + } 42 + wg.Wait() 43 + 44 + startTime := time.Now() 45 + wg = sync.WaitGroup{} 46 + runs := 1000 47 + wg.Add(runs) 48 + for i := 0; i < runs; i++ { 49 + go func() { 50 + defer wg.Done() 51 + _, err := ldb.MostRecentSegments() 52 + require.NoError(t, err) 53 + // require.Len(t, segments, 1) 54 + }() 55 + } 56 + wg.Wait() 57 + fmt.Printf("Time taken: %s\n", time.Since(startTime)) 58 + require.Less(t, time.Since(startTime), 10*time.Second) 59 + }
+60
pkg/localdb/thumbnail.go
···
··· 1 + package localdb 2 + 3 + import ( 4 + "fmt" 5 + 6 + "github.com/google/uuid" 7 + ) 8 + 9 + type Thumbnail struct { 10 + ID string `json:"id" gorm:"primaryKey"` 11 + Format string `json:"format"` 12 + SegmentID string `json:"segmentId" gorm:"index"` 13 + Segment Segment `json:"segment,omitempty" gorm:"foreignKey:SegmentID;references:id"` 14 + } 15 + 16 + func (m *LocalDatabase) CreateThumbnail(thumb *Thumbnail) error { 17 + uu, err := uuid.NewV7() 18 + if err != nil { 19 + return err 20 + } 21 + if thumb.SegmentID == "" { 22 + return fmt.Errorf("segmentID is required") 23 + } 24 + thumb.ID = uu.String() 25 + err = m.DB.Model(Thumbnail{}).Create(thumb).Error 26 + if err != nil { 27 + return err 28 + } 29 + return nil 30 + } 31 + 32 + // return the most recent thumbnail for a user 33 + func (m *LocalDatabase) LatestThumbnailForUser(user string) (*Thumbnail, error) { 34 + var thumbnail Thumbnail 35 + 36 + res := m.DB.Table("thumbnails AS t"). 37 + Select("t.*"). 38 + Joins("JOIN segments AS s ON t.segment_id = s.id"). 39 + Where("s.repo_did = ?", user). 40 + Order("s.start_time DESC"). 41 + Limit(1). 42 + Scan(&thumbnail) 43 + 44 + if res.RowsAffected == 0 { 45 + return nil, nil 46 + } 47 + if res.Error != nil { 48 + return nil, res.Error 49 + } 50 + 51 + var seg Segment 52 + err := m.DB.First(&seg, "id = ?", thumbnail.SegmentID).Error 53 + if err != nil { 54 + return nil, fmt.Errorf("could not find segment for thumbnail SegmentID=%s", thumbnail.SegmentID) 55 + } 56 + 57 + thumbnail.Segment = seg 58 + 59 + return &thumbnail, nil 60 + }
+3 -3
pkg/media/clip_user.go
··· 10 11 "stream.place/streamplace/pkg/aqtime" 12 "stream.place/streamplace/pkg/config" 13 - "stream.place/streamplace/pkg/model" 14 ) 15 16 - func ClipUser(ctx context.Context, mod model.Model, cli *config.CLI, user string, writer io.Writer, before *time.Time, after *time.Time) error { 17 - segments, err := mod.LatestSegmentsForUser(user, -1, before, after) 18 if err != nil { 19 return fmt.Errorf("unable to get segments: %w", err) 20 }
··· 10 11 "stream.place/streamplace/pkg/aqtime" 12 "stream.place/streamplace/pkg/config" 13 + "stream.place/streamplace/pkg/localdb" 14 ) 15 16 + func ClipUser(ctx context.Context, localDB localdb.LocalDB, cli *config.CLI, user string, writer io.Writer, before *time.Time, after *time.Time) error { 17 + segments, err := localDB.LatestSegmentsForUser(user, -1, before, after) 18 if err != nil { 19 return fmt.Errorf("unable to get segments: %w", err) 20 }
+11 -8
pkg/media/media.go
··· 21 c2patypes "stream.place/streamplace/pkg/c2patypes" 22 "stream.place/streamplace/pkg/config" 23 "stream.place/streamplace/pkg/gstinit" 24 "stream.place/streamplace/pkg/model" 25 "stream.place/streamplace/pkg/streamplace" 26 ··· 51 atsync *atproto.ATProtoSynchronizer 52 webrtcAPI *webrtc.API 53 webrtcConfig webrtc.Configuration 54 } 55 56 type NewSegmentNotification struct { 57 - Segment *model.Segment 58 Data []byte 59 Metadata *SegmentMetadata 60 Local bool ··· 65 return SelfTest(ctx) 66 } 67 68 - func MakeMediaManager(ctx context.Context, cli *config.CLI, signer crypto.Signer, mod model.Model, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer) (*MediaManager, error) { 69 gstinit.InitGST() 70 err := SelfTest(ctx) 71 if err != nil { ··· 127 atsync: atsync, 128 webrtcAPI: api, 129 webrtcConfig: config, 130 }, nil 131 } 132 ··· 190 Title string 191 Creator string 192 ContentWarnings []string 193 - ContentRights *model.ContentRights 194 - DistributionPolicy *model.DistributionPolicy 195 MetadataConfiguration *streamplace.MetadataConfiguration 196 Livestream *streamplace.Livestream 197 } ··· 312 } 313 314 // extractContentRights extracts content rights from the C2PA manifest 315 - func extractContentRights(mani *c2patypes.Manifest) *model.ContentRights { 316 ass := findAssertion(mani, StreamplaceMetadata) 317 if ass == nil { 318 return nil ··· 323 return nil 324 } 325 326 - rights := &model.ContentRights{} 327 328 // Extract copyright notice 329 if notice, ok := data["dc:rights"]; ok { ··· 375 } 376 377 // extractDistributionPolicy extracts distribution policy from the C2PA manifest 378 - func extractDistributionPolicy(mani *c2patypes.Manifest, segmentStart aqtime.AQTime) *model.DistributionPolicy { 379 metadataConfig := extractMetadataConfiguration(mani) 380 if metadataConfig == nil { 381 return nil ··· 392 // deleteAfter contains an offset in seconds from creation time 393 deleteAfterSeconds := *metadataConfig.DistributionPolicy.DeleteAfter 394 395 - return &model.DistributionPolicy{ 396 DeleteAfterSeconds: &deleteAfterSeconds, 397 } 398 }
··· 21 c2patypes "stream.place/streamplace/pkg/c2patypes" 22 "stream.place/streamplace/pkg/config" 23 "stream.place/streamplace/pkg/gstinit" 24 + "stream.place/streamplace/pkg/localdb" 25 "stream.place/streamplace/pkg/model" 26 "stream.place/streamplace/pkg/streamplace" 27 ··· 52 atsync *atproto.ATProtoSynchronizer 53 webrtcAPI *webrtc.API 54 webrtcConfig webrtc.Configuration 55 + localDB localdb.LocalDB 56 } 57 58 type NewSegmentNotification struct { 59 + Segment *localdb.Segment 60 Data []byte 61 Metadata *SegmentMetadata 62 Local bool ··· 67 return SelfTest(ctx) 68 } 69 70 + func MakeMediaManager(ctx context.Context, cli *config.CLI, signer crypto.Signer, mod model.Model, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, ldb localdb.LocalDB) (*MediaManager, error) { 71 gstinit.InitGST() 72 err := SelfTest(ctx) 73 if err != nil { ··· 129 atsync: atsync, 130 webrtcAPI: api, 131 webrtcConfig: config, 132 + localDB: ldb, 133 }, nil 134 } 135 ··· 193 Title string 194 Creator string 195 ContentWarnings []string 196 + ContentRights *localdb.ContentRights 197 + DistributionPolicy *localdb.DistributionPolicy 198 MetadataConfiguration *streamplace.MetadataConfiguration 199 Livestream *streamplace.Livestream 200 } ··· 315 } 316 317 // extractContentRights extracts content rights from the C2PA manifest 318 + func extractContentRights(mani *c2patypes.Manifest) *localdb.ContentRights { 319 ass := findAssertion(mani, StreamplaceMetadata) 320 if ass == nil { 321 return nil ··· 326 return nil 327 } 328 329 + rights := &localdb.ContentRights{} 330 331 // Extract copyright notice 332 if notice, ok := data["dc:rights"]; ok { ··· 378 } 379 380 // extractDistributionPolicy extracts distribution policy from the C2PA manifest 381 + func extractDistributionPolicy(mani *c2patypes.Manifest, segmentStart aqtime.AQTime) *localdb.DistributionPolicy { 382 metadataConfig := extractMetadataConfiguration(mani) 383 if metadataConfig == nil { 384 return nil ··· 395 // deleteAfter contains an offset in seconds from creation time 396 deleteAfterSeconds := *metadataConfig.DistributionPolicy.DeleteAfter 397 398 + return &localdb.DistributionPolicy{ 399 DeleteAfterSeconds: &deleteAfterSeconds, 400 } 401 }
+9 -9
pkg/media/media_data_parser.go
··· 13 "github.com/go-gst/go-gst/gst" 14 "github.com/go-gst/go-gst/gst/app" 15 "go.opentelemetry.io/otel" 16 "stream.place/streamplace/pkg/log" 17 - "stream.place/streamplace/pkg/model" 18 ) 19 20 func padProbeEmpty(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn { 21 return gst.PadProbeOK 22 } 23 24 - func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*model.SegmentMediaData, error) { 25 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentMediaData") 26 defer span.End() 27 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData") ··· 40 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 41 } 42 43 - var videoMetadata *model.SegmentMediadataVideo 44 - var audioMetadata *model.SegmentMediadataAudio 45 46 appsrc, err := pipeline.GetElementByName("appsrc") 47 if err != nil { ··· 118 name := structure.Name() 119 120 if name[:5] == "video" { 121 - videoMetadata = &model.SegmentMediadataVideo{} 122 // Get some common video properties 123 widthVal, _ := structure.GetValue("width") 124 heightVal, _ := structure.GetValue("height") ··· 147 } 148 149 if name[:5] == "audio" { 150 - audioMetadata = &model.SegmentMediadataAudio{} 151 // Get some common audio properties 152 rateVal, _ := structure.GetValue("rate") 153 channelsVal, _ := structure.GetValue("channels") ··· 275 276 videoMetadata.BFrames = hasBFrames 277 278 - meta := &model.SegmentMediaData{ 279 - Video: []*model.SegmentMediadataVideo{videoMetadata}, 280 - Audio: []*model.SegmentMediadataAudio{audioMetadata}, 281 } 282 283 ok, dur := pipeline.QueryDuration(gst.FormatTime)
··· 13 "github.com/go-gst/go-gst/gst" 14 "github.com/go-gst/go-gst/gst/app" 15 "go.opentelemetry.io/otel" 16 + "stream.place/streamplace/pkg/localdb" 17 "stream.place/streamplace/pkg/log" 18 ) 19 20 func padProbeEmpty(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn { 21 return gst.PadProbeOK 22 } 23 24 + func ParseSegmentMediaData(ctx context.Context, mp4bs []byte) (*localdb.SegmentMediaData, error) { 25 ctx, span := otel.Tracer("signer").Start(ctx, "ParseSegmentMediaData") 26 defer span.End() 27 ctx = log.WithLogValues(ctx, "GStreamerFunc", "ParseSegmentMediaData") ··· 40 return nil, fmt.Errorf("error creating SegmentMetadata pipeline: %w", err) 41 } 42 43 + var videoMetadata *localdb.SegmentMediadataVideo 44 + var audioMetadata *localdb.SegmentMediadataAudio 45 46 appsrc, err := pipeline.GetElementByName("appsrc") 47 if err != nil { ··· 118 name := structure.Name() 119 120 if name[:5] == "video" { 121 + videoMetadata = &localdb.SegmentMediadataVideo{} 122 // Get some common video properties 123 widthVal, _ := structure.GetValue("width") 124 heightVal, _ := structure.GetValue("height") ··· 147 } 148 149 if name[:5] == "audio" { 150 + audioMetadata = &localdb.SegmentMediadataAudio{} 151 // Get some common audio properties 152 rateVal, _ := structure.GetValue("rate") 153 channelsVal, _ := structure.GetValue("channels") ··· 275 276 videoMetadata.BFrames = hasBFrames 277 278 + meta := &localdb.SegmentMediaData{ 279 + Video: []*localdb.SegmentMediadataVideo{videoMetadata}, 280 + Audio: []*localdb.SegmentMediadataAudio{audioMetadata}, 281 } 282 283 ok, dur := pipeline.QueryDuration(gst.FormatTime)
+4 -1
pkg/media/media_test.go
··· 11 "stream.place/streamplace/pkg/bus" 12 "stream.place/streamplace/pkg/config" 13 ct "stream.place/streamplace/pkg/config/configtesting" 14 "stream.place/streamplace/pkg/model" 15 "stream.place/streamplace/pkg/statedb" 16 ) ··· 23 24 func getStaticTestMediaManager(t *testing.T) (*MediaManager, MediaSigner) { 25 mod, err := model.MakeDB(":memory:") 26 require.NoError(t, err) 27 // signer, err := c2pa.MakeStaticSigner(eip712test.KeyBytes) 28 require.NoError(t, err) ··· 42 StatefulDB: statedb, 43 Bus: bus.NewBus(), 44 } 45 - mm, err := MakeMediaManager(context.Background(), cli, nil, mod, bus.NewBus(), atsync) 46 require.NoError(t, err) 47 // ms, err := MakeMediaSigner(context.Background(), cli, "test-person", signer) 48 // require.NoError(t, err)
··· 11 "stream.place/streamplace/pkg/bus" 12 "stream.place/streamplace/pkg/config" 13 ct "stream.place/streamplace/pkg/config/configtesting" 14 + "stream.place/streamplace/pkg/localdb" 15 "stream.place/streamplace/pkg/model" 16 "stream.place/streamplace/pkg/statedb" 17 ) ··· 24 25 func getStaticTestMediaManager(t *testing.T) (*MediaManager, MediaSigner) { 26 mod, err := model.MakeDB(":memory:") 27 + require.NoError(t, err) 28 + ldb, err := localdb.MakeDB(":memory:") 29 require.NoError(t, err) 30 // signer, err := c2pa.MakeStaticSigner(eip712test.KeyBytes) 31 require.NoError(t, err) ··· 45 StatefulDB: statedb, 46 Bus: bus.NewBus(), 47 } 48 + mm, err := MakeMediaManager(context.Background(), cli, nil, mod, bus.NewBus(), atsync, ldb) 49 require.NoError(t, err) 50 // ms, err := MakeMediaSigner(context.Background(), cli, "test-person", signer) 51 // require.NoError(t, err)
+5 -5
pkg/media/validate.go
··· 18 "stream.place/streamplace/pkg/constants" 19 "stream.place/streamplace/pkg/crypto/signers" 20 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 21 "stream.place/streamplace/pkg/log" 22 - "stream.place/streamplace/pkg/model" 23 ) 24 25 type ManifestAndCert struct { ··· 47 48 label := manifest.Label 49 if label != nil && mm.model != nil { 50 - oldSeg, err := mm.model.GetSegment(*label) 51 if err != nil { 52 return fmt.Errorf("failed to get old segment: %w", err) 53 } ··· 117 expiryTime := meta.StartTime.Time().Add(time.Duration(*meta.DistributionPolicy.DeleteAfterSeconds) * time.Second) 118 deleteAfter = &expiryTime 119 } 120 - seg := &model.Segment{ 121 ID: *label, 122 SigningKeyDID: signingKeyDID, 123 RepoDID: repoDID, ··· 125 Title: meta.Title, 126 Size: len(buf), 127 MediaData: mediaData, 128 - ContentWarnings: model.ContentWarningsSlice(meta.ContentWarnings), 129 ContentRights: meta.ContentRights, 130 DistributionPolicy: meta.DistributionPolicy, 131 DeleteAfter: deleteAfter, ··· 205 type ValidationResult struct { 206 Pub *atcrypto.PublicKeyK256 207 Meta *SegmentMetadata 208 - MediaData *model.SegmentMediaData 209 Manifest *c2patypes.Manifest 210 Cert string 211 }
··· 18 "stream.place/streamplace/pkg/constants" 19 "stream.place/streamplace/pkg/crypto/signers" 20 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 21 + "stream.place/streamplace/pkg/localdb" 22 "stream.place/streamplace/pkg/log" 23 ) 24 25 type ManifestAndCert struct { ··· 47 48 label := manifest.Label 49 if label != nil && mm.model != nil { 50 + oldSeg, err := mm.localDB.GetSegment(*label) 51 if err != nil { 52 return fmt.Errorf("failed to get old segment: %w", err) 53 } ··· 117 expiryTime := meta.StartTime.Time().Add(time.Duration(*meta.DistributionPolicy.DeleteAfterSeconds) * time.Second) 118 deleteAfter = &expiryTime 119 } 120 + seg := &localdb.Segment{ 121 ID: *label, 122 SigningKeyDID: signingKeyDID, 123 RepoDID: repoDID, ··· 125 Title: meta.Title, 126 Size: len(buf), 127 MediaData: mediaData, 128 + ContentWarnings: localdb.ContentWarningsSlice(meta.ContentWarnings), 129 ContentRights: meta.ContentRights, 130 DistributionPolicy: meta.DistributionPolicy, 131 DeleteAfter: deleteAfter, ··· 205 type ValidationResult struct { 206 Pub *atcrypto.PublicKeyK256 207 Meta *SegmentMetadata 208 + MediaData *localdb.SegmentMediaData 209 Manifest *c2patypes.Manifest 210 Cert string 211 }
-15
pkg/model/model.go
··· 28 PlayerReport(playerID string) (map[string]any, error) 29 ClearPlayerEvents() error 30 31 - CreateSegment(segment *Segment) error 32 - MostRecentSegments() ([]Segment, error) 33 - LatestSegmentForUser(user string) (*Segment, error) 34 - LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) 35 - FilterLiveRepoDIDs(repoDIDs []string) ([]string, error) 36 - CreateThumbnail(thumb *Thumbnail) error 37 - LatestThumbnailForUser(user string) (*Thumbnail, error) 38 - GetSegment(id string) (*Segment, error) 39 - GetExpiredSegments(ctx context.Context) ([]Segment, error) 40 - DeleteSegment(ctx context.Context, id string) error 41 - StartSegmentCleaner(ctx context.Context) error 42 - SegmentCleaner(ctx context.Context) error 43 - 44 GetIdentity(id string) (*Identity, error) 45 UpdateIdentity(ident *Identity) error 46 ··· 178 sqlDB.SetMaxOpenConns(1) 179 for _, model := range []any{ 180 PlayerEvent{}, 181 - Segment{}, 182 - Thumbnail{}, 183 Identity{}, 184 Repo{}, 185 SigningKey{},
··· 28 PlayerReport(playerID string) (map[string]any, error) 29 ClearPlayerEvents() error 30 31 GetIdentity(id string) (*Identity, error) 32 UpdateIdentity(ident *Identity) error 33 ··· 165 sqlDB.SetMaxOpenConns(1) 166 for _, model := range []any{ 167 PlayerEvent{}, 168 Identity{}, 169 Repo{}, 170 SigningKey{},
-411
pkg/model/segment.go
··· 1 package model 2 - 3 - import ( 4 - "context" 5 - "database/sql/driver" 6 - "encoding/json" 7 - "errors" 8 - "fmt" 9 - "time" 10 - 11 - "gorm.io/gorm" 12 - "stream.place/streamplace/pkg/aqtime" 13 - "stream.place/streamplace/pkg/log" 14 - "stream.place/streamplace/pkg/streamplace" 15 - ) 16 - 17 - type SegmentMediadataVideo struct { 18 - Width int `json:"width"` 19 - Height int `json:"height"` 20 - FPSNum int `json:"fpsNum"` 21 - FPSDen int `json:"fpsDen"` 22 - BFrames bool `json:"bframes"` 23 - } 24 - 25 - type SegmentMediadataAudio struct { 26 - Rate int `json:"rate"` 27 - Channels int `json:"channels"` 28 - } 29 - 30 - type SegmentMediaData struct { 31 - Video []*SegmentMediadataVideo `json:"video"` 32 - Audio []*SegmentMediadataAudio `json:"audio"` 33 - Duration int64 `json:"duration"` 34 - Size int `json:"size"` 35 - } 36 - 37 - // Scan scan value into Jsonb, implements sql.Scanner interface 38 - func (j *SegmentMediaData) Scan(value any) error { 39 - bytes, ok := value.([]byte) 40 - if !ok { 41 - return errors.New(fmt.Sprint("Failed to unmarshal JSONB value:", value)) 42 - } 43 - 44 - result := SegmentMediaData{} 45 - err := json.Unmarshal(bytes, &result) 46 - *j = SegmentMediaData(result) 47 - return err 48 - } 49 - 50 - // Value return json value, implement driver.Valuer interface 51 - func (j SegmentMediaData) Value() (driver.Value, error) { 52 - return json.Marshal(j) 53 - } 54 - 55 - // ContentRights represents content rights and attribution information 56 - type ContentRights struct { 57 - CopyrightNotice *string `json:"copyrightNotice,omitempty"` 58 - CopyrightYear *int64 `json:"copyrightYear,omitempty"` 59 - Creator *string `json:"creator,omitempty"` 60 - CreditLine *string `json:"creditLine,omitempty"` 61 - License *string `json:"license,omitempty"` 62 - } 63 - 64 - // Scan scan value into ContentRights, implements sql.Scanner interface 65 - func (c *ContentRights) Scan(value any) error { 66 - if value == nil { 67 - *c = ContentRights{} 68 - return nil 69 - } 70 - bytes, ok := value.([]byte) 71 - if !ok { 72 - return errors.New(fmt.Sprint("Failed to unmarshal ContentRights value:", value)) 73 - } 74 - 75 - result := ContentRights{} 76 - err := json.Unmarshal(bytes, &result) 77 - *c = ContentRights(result) 78 - return err 79 - } 80 - 81 - // Value return json value, implement driver.Valuer interface 82 - func (c ContentRights) Value() (driver.Value, error) { 83 - return json.Marshal(c) 84 - } 85 - 86 - // DistributionPolicy represents distribution policy information 87 - type DistributionPolicy struct { 88 - DeleteAfterSeconds *int64 `json:"deleteAfterSeconds,omitempty"` 89 - } 90 - 91 - // Scan scan value into DistributionPolicy, implements sql.Scanner interface 92 - func (d *DistributionPolicy) Scan(value any) error { 93 - if value == nil { 94 - *d = DistributionPolicy{} 95 - return nil 96 - } 97 - bytes, ok := value.([]byte) 98 - if !ok { 99 - return errors.New(fmt.Sprint("Failed to unmarshal DistributionPolicy value:", value)) 100 - } 101 - 102 - result := DistributionPolicy{} 103 - err := json.Unmarshal(bytes, &result) 104 - *d = DistributionPolicy(result) 105 - return err 106 - } 107 - 108 - // Value return json value, implement driver.Valuer interface 109 - func (d DistributionPolicy) Value() (driver.Value, error) { 110 - return json.Marshal(d) 111 - } 112 - 113 - // ContentWarningsSlice is a custom type for storing content warnings as JSON in the database 114 - type ContentWarningsSlice []string 115 - 116 - // Scan scan value into ContentWarningsSlice, implements sql.Scanner interface 117 - func (c *ContentWarningsSlice) Scan(value any) error { 118 - if value == nil { 119 - *c = ContentWarningsSlice{} 120 - return nil 121 - } 122 - bytes, ok := value.([]byte) 123 - if !ok { 124 - return errors.New(fmt.Sprint("Failed to unmarshal ContentWarningsSlice value:", value)) 125 - } 126 - 127 - result := ContentWarningsSlice{} 128 - err := json.Unmarshal(bytes, &result) 129 - *c = ContentWarningsSlice(result) 130 - return err 131 - } 132 - 133 - // Value return json value, implement driver.Valuer interface 134 - func (c ContentWarningsSlice) Value() (driver.Value, error) { 135 - return json.Marshal(c) 136 - } 137 - 138 - type Segment struct { 139 - ID string `json:"id" gorm:"primaryKey"` 140 - SigningKeyDID string `json:"signingKeyDID" gorm:"column:signing_key_did"` 141 - SigningKey *SigningKey `json:"signingKey,omitempty" gorm:"foreignKey:DID;references:SigningKeyDID"` 142 - StartTime time.Time `json:"startTime" gorm:"index:latest_segments,priority:2;index:start_time"` 143 - RepoDID string `json:"repoDID" gorm:"index:latest_segments,priority:1;column:repo_did"` 144 - Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 145 - Title string `json:"title"` 146 - Size int `json:"size" gorm:"column:size"` 147 - MediaData *SegmentMediaData `json:"mediaData,omitempty"` 148 - ContentWarnings ContentWarningsSlice `json:"contentWarnings,omitempty"` 149 - ContentRights *ContentRights `json:"contentRights,omitempty"` 150 - DistributionPolicy *DistributionPolicy `json:"distributionPolicy,omitempty"` 151 - DeleteAfter *time.Time `json:"deleteAfter,omitempty" gorm:"column:delete_after;index:delete_after"` 152 - } 153 - 154 - func (s *Segment) ToStreamplaceSegment() (*streamplace.Segment, error) { 155 - aqt := aqtime.FromTime(s.StartTime) 156 - if s.MediaData == nil { 157 - return nil, fmt.Errorf("media data is nil") 158 - } 159 - if len(s.MediaData.Video) == 0 || s.MediaData.Video[0] == nil { 160 - return nil, fmt.Errorf("video data is nil") 161 - } 162 - if len(s.MediaData.Audio) == 0 || s.MediaData.Audio[0] == nil { 163 - return nil, fmt.Errorf("audio data is nil") 164 - } 165 - duration := s.MediaData.Duration 166 - sizei64 := int64(s.Size) 167 - 168 - // Convert model metadata to streamplace metadata 169 - var contentRights *streamplace.MetadataContentRights 170 - if s.ContentRights != nil { 171 - contentRights = &streamplace.MetadataContentRights{ 172 - CopyrightNotice: s.ContentRights.CopyrightNotice, 173 - CopyrightYear: s.ContentRights.CopyrightYear, 174 - Creator: s.ContentRights.Creator, 175 - CreditLine: s.ContentRights.CreditLine, 176 - License: s.ContentRights.License, 177 - } 178 - } 179 - 180 - var contentWarnings *streamplace.MetadataContentWarnings 181 - if len(s.ContentWarnings) > 0 { 182 - contentWarnings = &streamplace.MetadataContentWarnings{ 183 - Warnings: []string(s.ContentWarnings), 184 - } 185 - } 186 - 187 - var distributionPolicy *streamplace.MetadataDistributionPolicy 188 - if s.DistributionPolicy != nil && s.DistributionPolicy.DeleteAfterSeconds != nil { 189 - distributionPolicy = &streamplace.MetadataDistributionPolicy{ 190 - DeleteAfter: s.DistributionPolicy.DeleteAfterSeconds, 191 - } 192 - } 193 - 194 - return &streamplace.Segment{ 195 - LexiconTypeID: "place.stream.segment", 196 - Creator: s.RepoDID, 197 - Id: s.ID, 198 - SigningKey: s.SigningKeyDID, 199 - StartTime: string(aqt), 200 - Duration: &duration, 201 - Size: &sizei64, 202 - ContentRights: contentRights, 203 - ContentWarnings: contentWarnings, 204 - DistributionPolicy: distributionPolicy, 205 - Video: []*streamplace.Segment_Video{ 206 - { 207 - Codec: "h264", 208 - Width: int64(s.MediaData.Video[0].Width), 209 - Height: int64(s.MediaData.Video[0].Height), 210 - Framerate: &streamplace.Segment_Framerate{ 211 - Num: int64(s.MediaData.Video[0].FPSNum), 212 - Den: int64(s.MediaData.Video[0].FPSDen), 213 - }, 214 - Bframes: &s.MediaData.Video[0].BFrames, 215 - }, 216 - }, 217 - Audio: []*streamplace.Segment_Audio{ 218 - { 219 - Codec: "opus", 220 - Rate: int64(s.MediaData.Audio[0].Rate), 221 - Channels: int64(s.MediaData.Audio[0].Channels), 222 - }, 223 - }, 224 - }, nil 225 - } 226 - 227 - func (m *DBModel) CreateSegment(seg *Segment) error { 228 - err := m.DB.Model(Segment{}).Create(seg).Error 229 - if err != nil { 230 - return err 231 - } 232 - return nil 233 - } 234 - 235 - // should return the most recent segment for each user, ordered by most recent first 236 - // only includes segments from the last 30 seconds 237 - func (m *DBModel) MostRecentSegments() ([]Segment, error) { 238 - var segments []Segment 239 - thirtySecondsAgo := time.Now().Add(-30 * time.Second) 240 - 241 - err := m.DB.Table("segments"). 242 - Select("segments.*"). 243 - Where("start_time > ?", thirtySecondsAgo.UTC()). 244 - Order("start_time DESC"). 245 - Find(&segments).Error 246 - if err != nil { 247 - return nil, err 248 - } 249 - if segments == nil { 250 - return []Segment{}, nil 251 - } 252 - 253 - segmentMap := make(map[string]Segment) 254 - for _, seg := range segments { 255 - prev, ok := segmentMap[seg.RepoDID] 256 - if !ok { 257 - segmentMap[seg.RepoDID] = seg 258 - } else { 259 - if seg.StartTime.After(prev.StartTime) { 260 - segmentMap[seg.RepoDID] = seg 261 - } 262 - } 263 - } 264 - 265 - filteredSegments := []Segment{} 266 - for _, seg := range segmentMap { 267 - filteredSegments = append(filteredSegments, seg) 268 - } 269 - 270 - return filteredSegments, nil 271 - } 272 - 273 - func (m *DBModel) LatestSegmentForUser(user string) (*Segment, error) { 274 - var seg Segment 275 - err := m.DB.Model(Segment{}).Where("repo_did = ?", user).Order("start_time DESC").First(&seg).Error 276 - if err != nil { 277 - return nil, err 278 - } 279 - return &seg, nil 280 - } 281 - 282 - func (m *DBModel) FilterLiveRepoDIDs(repoDIDs []string) ([]string, error) { 283 - if len(repoDIDs) == 0 { 284 - return []string{}, nil 285 - } 286 - 287 - thirtySecondsAgo := time.Now().Add(-30 * time.Second) 288 - 289 - var liveDIDs []string 290 - 291 - err := m.DB.Table("segments"). 292 - Select("DISTINCT repo_did"). 293 - Where("repo_did IN ? AND start_time > ?", repoDIDs, thirtySecondsAgo.UTC()). 294 - Pluck("repo_did", &liveDIDs).Error 295 - 296 - if err != nil { 297 - return nil, err 298 - } 299 - 300 - return liveDIDs, nil 301 - } 302 - 303 - func (m *DBModel) LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) { 304 - var segs []Segment 305 - if before == nil { 306 - later := time.Now().Add(1000 * time.Hour) 307 - before = &later 308 - } 309 - if after == nil { 310 - earlier := time.Time{} 311 - after = &earlier 312 - } 313 - err := m.DB.Model(Segment{}).Where("repo_did = ? AND start_time < ? AND start_time > ?", user, before.UTC(), after.UTC()).Order("start_time DESC").Limit(limit).Find(&segs).Error 314 - if err != nil { 315 - return nil, err 316 - } 317 - return segs, nil 318 - } 319 - 320 - func (m *DBModel) GetSegment(id string) (*Segment, error) { 321 - var seg Segment 322 - 323 - err := m.DB.Model(&Segment{}). 324 - Preload("Repo"). 325 - Where("id = ?", id). 326 - First(&seg).Error 327 - 328 - if errors.Is(err, gorm.ErrRecordNotFound) { 329 - return nil, nil 330 - } 331 - if err != nil { 332 - return nil, err 333 - } 334 - 335 - return &seg, nil 336 - } 337 - 338 - func (m *DBModel) GetExpiredSegments(ctx context.Context) ([]Segment, error) { 339 - 340 - var expiredSegments []Segment 341 - now := time.Now() 342 - err := m.DB. 343 - Where("delete_after IS NOT NULL AND delete_after < ?", now.UTC()). 344 - Find(&expiredSegments).Error 345 - if err != nil { 346 - return nil, err 347 - } 348 - 349 - return expiredSegments, nil 350 - } 351 - 352 - func (m *DBModel) DeleteSegment(ctx context.Context, id string) error { 353 - return m.DB.Delete(&Segment{}, "id = ?", id).Error 354 - } 355 - 356 - func (m *DBModel) StartSegmentCleaner(ctx context.Context) error { 357 - err := m.SegmentCleaner(ctx) 358 - if err != nil { 359 - return err 360 - } 361 - ticker := time.NewTicker(1 * time.Minute) 362 - defer ticker.Stop() 363 - 364 - for { 365 - select { 366 - case <-ctx.Done(): 367 - return nil 368 - case <-ticker.C: 369 - err := m.SegmentCleaner(ctx) 370 - if err != nil { 371 - log.Error(ctx, "Failed to clean segments", "error", err) 372 - } 373 - } 374 - } 375 - } 376 - 377 - func (m *DBModel) SegmentCleaner(ctx context.Context) error { 378 - // Calculate the cutoff time (10 minutes ago) 379 - cutoffTime := aqtime.FromTime(time.Now().Add(-10 * time.Minute)).Time() 380 - 381 - // Find all unique repo_did values 382 - var repoDIDs []string 383 - if err := m.DB.Model(&Segment{}).Distinct("repo_did").Pluck("repo_did", &repoDIDs).Error; err != nil { 384 - log.Error(ctx, "Failed to get unique repo_dids for segment cleaning", "error", err) 385 - return err 386 - } 387 - 388 - // For each user, keep their last 10 segments and delete older ones 389 - for _, repoDID := range repoDIDs { 390 - // Get IDs of the last 10 segments for this user 391 - var keepSegmentIDs []string 392 - if err := m.DB.Model(&Segment{}). 393 - Where("repo_did = ?", repoDID). 394 - Order("start_time DESC"). 395 - Limit(10). 396 - Pluck("id", &keepSegmentIDs).Error; err != nil { 397 - log.Error(ctx, "Failed to get segment IDs to keep", "repo_did", repoDID, "error", err) 398 - return err 399 - } 400 - 401 - // Delete old segments except the ones we want to keep 402 - result := m.DB.Where("repo_did = ? AND start_time < ? AND id NOT IN ?", 403 - repoDID, cutoffTime, keepSegmentIDs).Delete(&Segment{}) 404 - 405 - if result.Error != nil { 406 - log.Error(ctx, "Failed to clean old segments", "repo_did", repoDID, "error", result.Error) 407 - } else if result.RowsAffected > 0 { 408 - log.Log(ctx, "Cleaned old segments", "repo_did", repoDID, "count", result.RowsAffected) 409 - } 410 - } 411 - return nil 412 - }
··· 1 package model
-65
pkg/model/segment_test.go
··· 1 package model 2 - 3 - import ( 4 - "fmt" 5 - "sync" 6 - "testing" 7 - "time" 8 - 9 - "github.com/stretchr/testify/require" 10 - "stream.place/streamplace/pkg/config" 11 - ) 12 - 13 - func TestSegmentPerf(t *testing.T) { 14 - config.DisableSQLLogging() 15 - // dburl := filepath.Join(t.TempDir(), "test.db") 16 - db, err := MakeDB(":memory:") 17 - require.NoError(t, err) 18 - // Create a model instance 19 - model := db.(*DBModel) 20 - t.Cleanup(func() { 21 - // os.Remove(dburl) 22 - }) 23 - 24 - // Create a repo for testing 25 - repo := &Repo{ 26 - DID: "did:plc:test123", 27 - } 28 - err = model.DB.Create(repo).Error 29 - require.NoError(t, err) 30 - 31 - defer config.EnableSQLLogging() 32 - // Create 250000 segments with timestamps 1 hour ago, each one second apart 33 - wg := sync.WaitGroup{} 34 - segCount := 250000 35 - wg.Add(segCount) 36 - baseTime := time.Now() 37 - for i := 0; i < segCount; i++ { 38 - segment := &Segment{ 39 - ID: fmt.Sprintf("segment-%d", i), 40 - RepoDID: repo.DID, 41 - StartTime: baseTime.Add(-time.Duration(i) * time.Second).UTC(), 42 - } 43 - go func() { 44 - defer wg.Done() 45 - err = model.DB.Create(segment).Error 46 - require.NoError(t, err) 47 - }() 48 - } 49 - wg.Wait() 50 - 51 - startTime := time.Now() 52 - wg = sync.WaitGroup{} 53 - runs := 1000 54 - wg.Add(runs) 55 - for i := 0; i < runs; i++ { 56 - go func() { 57 - defer wg.Done() 58 - _, err := model.MostRecentSegments() 59 - require.NoError(t, err) 60 - // require.Len(t, segments, 1) 61 - }() 62 - } 63 - wg.Wait() 64 - fmt.Printf("Time taken: %s\n", time.Since(startTime)) 65 - require.Less(t, time.Since(startTime), 10*time.Second) 66 - }
··· 1 package model
-59
pkg/model/thumbnail.go
··· 1 package model 2 - 3 - import ( 4 - "fmt" 5 - 6 - "github.com/google/uuid" 7 - ) 8 - 9 - type Thumbnail struct { 10 - ID string `json:"id" gorm:"primaryKey"` 11 - Format string `json:"format"` 12 - SegmentID string `json:"segmentId" gorm:"index"` 13 - Segment Segment `json:"segment,omitempty" gorm:"foreignKey:SegmentID;references:id"` 14 - } 15 - 16 - func (m *DBModel) CreateThumbnail(thumb *Thumbnail) error { 17 - uu, err := uuid.NewV7() 18 - if err != nil { 19 - return err 20 - } 21 - if thumb.SegmentID == "" { 22 - return fmt.Errorf("segmentID is required") 23 - } 24 - thumb.ID = uu.String() 25 - err = m.DB.Model(Thumbnail{}).Create(thumb).Error 26 - if err != nil { 27 - return err 28 - } 29 - return nil 30 - } 31 - 32 - // return the most recent thumbnail for a user 33 - func (m *DBModel) LatestThumbnailForUser(user string) (*Thumbnail, error) { 34 - var thumbnail Thumbnail 35 - 36 - res := m.DB.Table("thumbnails AS t"). 37 - Select("t.*"). 38 - Joins("JOIN segments AS s ON t.segment_id = s.id"). 39 - Where("s.repo_did = ?", user). 40 - Order("s.start_time DESC"). 41 - Limit(1). 42 - Scan(&thumbnail) 43 - 44 - if res.RowsAffected == 0 { 45 - return nil, nil 46 - } 47 - if res.Error != nil { 48 - return nil, res.Error 49 - } 50 - 51 - var seg Segment 52 - err := m.DB.First(&seg, "id = ?", thumbnail.SegmentID).Error 53 - if err != nil { 54 - return nil, fmt.Errorf("could not find segment for thumbnail SegmentID=%s", thumbnail.SegmentID) 55 - } 56 - 57 - thumbnail.Segment = seg 58 - 59 - return &thumbnail, nil 60 - }
··· 1 package model
+1 -1
pkg/spxrpc/app_bsky_feed.go
··· 56 outCursor = fmt.Sprintf("%d::%s", ts, last.CID) 57 } 58 } else if name == FeedLiveStreams { 59 - segs, err := s.model.MostRecentSegments() 60 if err != nil { 61 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to get recent segments: %v", err)) 62 }
··· 56 outCursor = fmt.Sprintf("%d::%s", ts, last.CID) 57 } 58 } else if name == FeedLiveStreams { 59 + segs, err := s.localDB.MostRecentSegments() 60 if err != nil { 61 return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to get recent segments: %v", err)) 62 }
+4 -4
pkg/spxrpc/com_atproto_moderation.go
··· 13 "github.com/labstack/echo/v4" 14 "github.com/streamplace/oatproxy/pkg/oatproxy" 15 "stream.place/streamplace/pkg/config" 16 "stream.place/streamplace/pkg/log" 17 "stream.place/streamplace/pkg/media" 18 - "stream.place/streamplace/pkg/model" 19 ) 20 21 func (s *Server) handleComAtprotoModerationCreateReport(ctx context.Context, body *comatprototypes.ModerationCreateReport_Input) (*comatprototypes.ModerationCreateReport_Output, error) { ··· 76 return nil, echo.NewHTTPError(http.StatusBadRequest, "invalid subject") 77 } 78 79 - clipID, err := makeClip(ctx, s.cli, s.model, did) 80 if err != nil { 81 // we still want the report to go through! 82 log.Error(ctx, "failed to make clip for report", "error", err) ··· 99 return &output, nil 100 } 101 102 - func makeClip(ctx context.Context, cli *config.CLI, mod model.Model, did string) (string, error) { 103 after := time.Now().Add(-time.Duration(60) * time.Second) 104 105 uu, err := uuid.NewV7() ··· 113 } 114 defer fd.Close() 115 116 - err = media.ClipUser(ctx, mod, cli, did, fd, nil, &after) 117 if err != nil { 118 return "", echo.NewHTTPError(http.StatusInternalServerError, "failed to clip user") 119 }
··· 13 "github.com/labstack/echo/v4" 14 "github.com/streamplace/oatproxy/pkg/oatproxy" 15 "stream.place/streamplace/pkg/config" 16 + "stream.place/streamplace/pkg/localdb" 17 "stream.place/streamplace/pkg/log" 18 "stream.place/streamplace/pkg/media" 19 ) 20 21 func (s *Server) handleComAtprotoModerationCreateReport(ctx context.Context, body *comatprototypes.ModerationCreateReport_Input) (*comatprototypes.ModerationCreateReport_Output, error) { ··· 76 return nil, echo.NewHTTPError(http.StatusBadRequest, "invalid subject") 77 } 78 79 + clipID, err := makeClip(ctx, s.cli, s.localDB, did) 80 if err != nil { 81 // we still want the report to go through! 82 log.Error(ctx, "failed to make clip for report", "error", err) ··· 99 return &output, nil 100 } 101 102 + func makeClip(ctx context.Context, cli *config.CLI, localDB localdb.LocalDB, did string) (string, error) { 103 after := time.Now().Add(-time.Duration(60) * time.Second) 104 105 uu, err := uuid.NewV7() ··· 113 } 114 defer fd.Close() 115 116 + err = media.ClipUser(ctx, localDB, cli, did, fd, nil, &after) 117 if err != nil { 118 return "", echo.NewHTTPError(http.StatusInternalServerError, "failed to clip user") 119 }
+4 -4
pkg/spxrpc/place_stream_live.go
··· 82 beforeTime = &parsedTime 83 } 84 85 - segments, err := s.model.LatestSegmentsForUser(userDID, limit, beforeTime, nil) 86 if err != nil { 87 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to fetch segments") 88 } ··· 223 } 224 225 // Filter for only live streamers 226 - liveStreamers, err := s.model.FilterLiveRepoDIDs(streamers) 227 if err != nil { 228 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to filter live streamers") 229 } ··· 256 followDIDs[i] = follow.SubjectDID 257 } 258 259 - liveFollows, err := s.model.FilterLiveRepoDIDs(followDIDs) 260 if err != nil { 261 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to filter live follows") 262 } ··· 281 // Final fallback: use host's default recommendations 282 defaultStreamers := s.cli.DefaultRecommendedStreamers 283 if len(defaultStreamers) > 0 { 284 - liveDefaults, err := s.model.FilterLiveRepoDIDs(defaultStreamers) 285 if err != nil { 286 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to filter default streamers") 287 }
··· 82 beforeTime = &parsedTime 83 } 84 85 + segments, err := s.localDB.LatestSegmentsForUser(userDID, limit, beforeTime, nil) 86 if err != nil { 87 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to fetch segments") 88 } ··· 223 } 224 225 // Filter for only live streamers 226 + liveStreamers, err := s.localDB.FilterLiveRepoDIDs(streamers) 227 if err != nil { 228 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to filter live streamers") 229 } ··· 256 followDIDs[i] = follow.SubjectDID 257 } 258 259 + liveFollows, err := s.localDB.FilterLiveRepoDIDs(followDIDs) 260 if err != nil { 261 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to filter live follows") 262 } ··· 281 // Final fallback: use host's default recommendations 282 defaultStreamers := s.cli.DefaultRecommendedStreamers 283 if len(defaultStreamers) > 0 { 284 + liveDefaults, err := s.localDB.FilterLiveRepoDIDs(defaultStreamers) 285 if err != nil { 286 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to filter default streamers") 287 }
+4 -1
pkg/spxrpc/spxrpc.go
··· 18 "stream.place/streamplace/pkg/atproto" 19 "stream.place/streamplace/pkg/bus" 20 "stream.place/streamplace/pkg/config" 21 "stream.place/streamplace/pkg/log" 22 "stream.place/streamplace/pkg/model" 23 "stream.place/streamplace/pkg/statedb" ··· 33 statefulDB *statedb.StatefulDB 34 bus *bus.Bus 35 op *oatproxy.OATProxy 36 } 37 38 - func NewServer(ctx context.Context, cli *config.CLI, model model.Model, statefulDB *statedb.StatefulDB, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer, bus *bus.Bus) (*Server, error) { 39 e := echo.New() 40 s := &Server{ 41 e: e, ··· 47 statefulDB: statefulDB, 48 bus: bus, 49 op: op, 50 } 51 e.Use(s.ErrorHandlingMiddleware()) 52 e.Use(s.ContextPreservingMiddleware())
··· 18 "stream.place/streamplace/pkg/atproto" 19 "stream.place/streamplace/pkg/bus" 20 "stream.place/streamplace/pkg/config" 21 + "stream.place/streamplace/pkg/localdb" 22 "stream.place/streamplace/pkg/log" 23 "stream.place/streamplace/pkg/model" 24 "stream.place/streamplace/pkg/statedb" ··· 34 statefulDB *statedb.StatefulDB 35 bus *bus.Bus 36 op *oatproxy.OATProxy 37 + localDB localdb.LocalDB 38 } 39 40 + func NewServer(ctx context.Context, cli *config.CLI, model model.Model, statefulDB *statedb.StatefulDB, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer, bus *bus.Bus, ldb localdb.LocalDB) (*Server, error) { 41 e := echo.New() 42 s := &Server{ 43 e: e, ··· 49 statefulDB: statefulDB, 50 bus: bus, 51 op: op, 52 + localDB: ldb, 53 } 54 e.Use(s.ErrorHandlingMiddleware()) 55 e.Use(s.ContextPreservingMiddleware())
+6 -6
pkg/storage/storage.go
··· 10 "golang.org/x/sync/errgroup" 11 "stream.place/streamplace/pkg/aqtime" 12 "stream.place/streamplace/pkg/config" 13 "stream.place/streamplace/pkg/log" 14 - "stream.place/streamplace/pkg/model" 15 ) 16 17 const moderationRetention = 120 * time.Second 18 19 - func StartSegmentCleaner(ctx context.Context, mod model.Model, cli *config.CLI) error { 20 ctx = log.WithLogValues(ctx, "func", "StartSegmentCleaner") 21 g, ctx := errgroup.WithContext(ctx) 22 g.Go(func() error { ··· 25 case <-ctx.Done(): 26 return nil 27 case <-time.After(60 * time.Second): 28 - expiredSegments, err := mod.GetExpiredSegments(ctx) 29 if err != nil { 30 return err 31 } 32 log.Log(ctx, "Cleaning expired segments", "count", len(expiredSegments)) 33 for _, seg := range expiredSegments { 34 g.Go(func() error { 35 - err := deleteSegment(ctx, mod, cli, seg) 36 if err != nil { 37 log.Error(ctx, "Failed to delete segment", "error", err) 38 } ··· 47 return g.Wait() 48 } 49 50 - func deleteSegment(ctx context.Context, mod model.Model, cli *config.CLI, seg model.Segment) error { 51 if time.Since(seg.StartTime) < moderationRetention { 52 log.Debug(ctx, "Skipping deletion of segment", "id", seg.ID, "time since start", time.Since(seg.StartTime)) 53 return nil ··· 61 if err != nil && !errors.Is(err, os.ErrNotExist) { 62 return err 63 } 64 - err = mod.DeleteSegment(ctx, seg.ID) 65 if err != nil { 66 return err 67 }
··· 10 "golang.org/x/sync/errgroup" 11 "stream.place/streamplace/pkg/aqtime" 12 "stream.place/streamplace/pkg/config" 13 + "stream.place/streamplace/pkg/localdb" 14 "stream.place/streamplace/pkg/log" 15 ) 16 17 const moderationRetention = 120 * time.Second 18 19 + func StartSegmentCleaner(ctx context.Context, localDB localdb.LocalDB, cli *config.CLI) error { 20 ctx = log.WithLogValues(ctx, "func", "StartSegmentCleaner") 21 g, ctx := errgroup.WithContext(ctx) 22 g.Go(func() error { ··· 25 case <-ctx.Done(): 26 return nil 27 case <-time.After(60 * time.Second): 28 + expiredSegments, err := localDB.GetExpiredSegments(ctx) 29 if err != nil { 30 return err 31 } 32 log.Log(ctx, "Cleaning expired segments", "count", len(expiredSegments)) 33 for _, seg := range expiredSegments { 34 g.Go(func() error { 35 + err := deleteSegment(ctx, localDB, cli, seg) 36 if err != nil { 37 log.Error(ctx, "Failed to delete segment", "error", err) 38 } ··· 47 return g.Wait() 48 } 49 50 + func deleteSegment(ctx context.Context, localDB localdb.LocalDB, cli *config.CLI, seg localdb.Segment) error { 51 if time.Since(seg.StartTime) < moderationRetention { 52 log.Debug(ctx, "Skipping deletion of segment", "id", seg.ID, "time since start", time.Since(seg.StartTime)) 53 return nil ··· 61 if err != nil && !errors.Is(err, os.ErrNotExist) { 62 return err 63 } 64 + err = localDB.DeleteSegment(ctx, seg.ID) 65 if err != nil { 66 return err 67 }