Live video on the AT Protocol

statedb: init

authored by

Eli Mallon and committed by
Eli Mallon
4f6cfa19 781146f4

+229 -83
+4 -1
pkg/api/api.go
··· 41 41 "stream.place/streamplace/pkg/notifications" 42 42 "stream.place/streamplace/pkg/spmetrics" 43 43 "stream.place/streamplace/pkg/spxrpc" 44 + "stream.place/streamplace/pkg/statedb" 44 45 "stream.place/streamplace/pkg/streamplace" 45 46 46 47 metrics "github.com/slok/go-http-metrics/metrics/prometheus" ··· 53 54 type StreamplaceAPI struct { 54 55 CLI *config.CLI 55 56 Model model.Model 57 + StatefulDB *statedb.StatefulDB 56 58 Updater *Updater 57 59 Signer *eip712.EIP712Signer 58 60 Mimes map[string]string ··· 80 82 mu sync.RWMutex 81 83 } 82 84 83 - func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 85 + func MakeStreamplaceAPI(cli *config.CLI, mod model.Model, statefulDB *statedb.StatefulDB, signer *eip712.EIP712Signer, noter notifications.FirebaseNotifier, mm *media.MediaManager, ms media.MediaSigner, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer, d *director.Director, op *oatproxy.OATProxy) (*StreamplaceAPI, error) { 84 86 updater, err := PrepareUpdater(cli) 85 87 if err != nil { 86 88 return nil, err 87 89 } 88 90 a := &StreamplaceAPI{CLI: cli, 89 91 Model: mod, 92 + StatefulDB: statefulDB, 90 93 Updater: updater, 91 94 Signer: signer, 92 95 FirebaseNotifier: noter,
+1 -1
pkg/api/api_internal.go
··· 437 437 }) 438 438 439 439 router.GET("/oauth-sessions", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 440 - sessions, err := a.Model.ListOAuthSessions() 440 + sessions, err := a.StatefulDB.ListOAuthSessions() 441 441 if err != nil { 442 442 errors.WriteHTTPInternalServerError(w, "unable to get oauth sessions", err) 443 443 return
+12 -6
pkg/cmd/streamplace.go
··· 35 35 "stream.place/streamplace/pkg/rtmps" 36 36 v0 "stream.place/streamplace/pkg/schema/v0" 37 37 "stream.place/streamplace/pkg/spmetrics" 38 + "stream.place/streamplace/pkg/statedb" 38 39 39 40 "github.com/ThalesGroup/crypto11" 40 41 _ "github.com/go-gst/go-glib/glib" ··· 302 303 signer = hwsigner 303 304 } 304 305 var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 305 - mod, err := model.MakeDB(cli.DBPath) 306 + 307 + mod, err := model.MakeDB(cli.IndexDBPath) 308 + if err != nil { 309 + return err 310 + } 311 + statefulDB, err := statedb.MakeDB(&cli) 306 312 if err != nil { 307 313 return err 308 314 } ··· 361 367 362 368 op := oatproxy.New(&oatproxy.Config{ 363 369 Host: cli.PublicHost, 364 - CreateOAuthSession: mod.CreateOAuthSession, 365 - UpdateOAuthSession: mod.UpdateOAuthSession, 366 - GetOAuthSession: mod.LoadOAuthSession, 370 + CreateOAuthSession: statefulDB.CreateOAuthSession, 371 + UpdateOAuthSession: statefulDB.UpdateOAuthSession, 372 + GetOAuthSession: statefulDB.LoadOAuthSession, 367 373 Scope: "atproto transition:generic", 368 374 UpstreamJWK: cli.JWK, 369 375 DownstreamJWK: cli.AccessJWK, 370 376 ClientMetadata: clientMetadata, 371 377 }) 372 - d := director.NewDirector(mm, mod, &cli, b, op) 373 - a, err := api.MakeStreamplaceAPI(&cli, mod, eip712signer, noter, mm, ms, b, atsync, d, op) 378 + d := director.NewDirector(mm, mod, &cli, b, op, statefulDB) 379 + a, err := api.MakeStreamplaceAPI(&cli, mod, statefulDB, eip712signer, noter, mm, ms, b, atsync, d, op) 374 380 if err != nil { 375 381 return err 376 382 }
+5 -2
pkg/config/config.go
··· 51 51 AdminAccount string 52 52 Build *BuildFlags 53 53 DataDir string 54 - DBPath string 54 + DBURL string 55 + IndexDBPath string 55 56 EthAccountAddr string 56 57 EthKeystorePath string 57 58 EthPassword string ··· 124 125 cli.DataDirFlag(fs, &cli.TLSCertPath, "tls-cert", filepath.Join("tls", "tls.crt"), "Path to TLS certificate") 125 126 cli.DataDirFlag(fs, &cli.TLSKeyPath, "tls-key", filepath.Join("tls", "tls.key"), "Path to TLS key") 126 127 fs.StringVar(&cli.SigningKeyPath, "signing-key", "", "Path to signing key for pushing OTA updates to the app") 127 - cli.DataDirFlag(fs, &cli.DBPath, "db-path", "db.sqlite", "path to sqlite database file") 128 + fs.StringVar(&cli.DBURL, "db-url", "sqlite://$SP_DATA_DIR/state.sqlite", "URL of the database to use for storing private streamplace state") 129 + cli.dataDirFlags = append(cli.dataDirFlags, &cli.DBURL) 130 + cli.DataDirFlag(fs, &cli.IndexDBPath, "index-db-path", "db.sqlite", "path to sqlite database file for maintaining atproto index") 128 131 fs.StringVar(&cli.AdminAccount, "admin-account", "", "ethereum account that administrates this streamplace node") 129 132 fs.StringVar(&cli.FirebaseServiceAccount, "firebase-service-account", "", "JSON string of a firebase service account key") 130 133 fs.StringVar(&cli.GitLabURL, "gitlab-url", "https://git.stream.place/api/v4/projects/1", "gitlab url for generating download links")
+5 -1
pkg/director/director.go
··· 12 12 "stream.place/streamplace/pkg/log" 13 13 "stream.place/streamplace/pkg/media" 14 14 "stream.place/streamplace/pkg/model" 15 + "stream.place/streamplace/pkg/statedb" 15 16 ) 16 17 17 18 // director is responsible for managing the lifecycle of a stream, making business ··· 28 29 streamSessions map[string]*StreamSession 29 30 streamSessionsMu sync.Mutex 30 31 op *oatproxy.OATProxy 32 + statefulDB *statedb.StatefulDB 31 33 } 32 34 33 - func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy) *Director { 35 + func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB) *Director { 34 36 return &Director{ 35 37 mm: mm, 36 38 mod: mod, ··· 39 41 streamSessions: make(map[string]*StreamSession), 40 42 streamSessionsMu: sync.Mutex{}, 41 43 op: op, 44 + statefulDB: statefulDB, 42 45 } 43 46 } 44 47 ··· 68 71 op: d.op, 69 72 packets: make([]bus.PacketizedSegment, 0), 70 73 started: make(chan struct{}), 74 + statefulDB: d.statefulDB, 71 75 } 72 76 d.streamSessions[not.Segment.RepoDID] = ss 73 77 g.Go(func() error {
+3 -1
pkg/director/stream_session.go
··· 22 22 "stream.place/streamplace/pkg/model" 23 23 "stream.place/streamplace/pkg/renditions" 24 24 "stream.place/streamplace/pkg/spmetrics" 25 + "stream.place/streamplace/pkg/statedb" 25 26 "stream.place/streamplace/pkg/streamplace" 26 27 "stream.place/streamplace/pkg/thumbnail" 27 28 ) ··· 42 43 started chan struct{} 43 44 ctx context.Context 44 45 packets []bus.PacketizedSegment 46 + statefulDB *statedb.StatefulDB 45 47 } 46 48 47 49 func (ss *StreamSession) Start(ctx context.Context, not *media.NewSegmentNotification) error { ··· 252 254 return nil 253 255 } 254 256 255 - session, err := ss.mod.GetSessionByDID(repoDID) 257 + session, err := ss.statefulDB.GetSessionByDID(repoDID) 256 258 if err != nil { 257 259 return fmt.Errorf("could not get OAuth session for repoDID: %w", err) 258 260 }
-8
pkg/model/model.go
··· 12 12 "github.com/bluesky-social/indigo/api/bsky" 13 13 "github.com/lmittmann/tint" 14 14 slogGorm "github.com/orandin/slog-gorm" 15 - "github.com/streamplace/oatproxy/pkg/oatproxy" 16 15 "gorm.io/driver/sqlite" 17 16 "gorm.io/gorm" 18 17 "stream.place/streamplace/pkg/config" ··· 93 92 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 94 93 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 95 94 96 - CreateOAuthSession(id string, session *oatproxy.OAuthSession) error 97 - LoadOAuthSession(id string) (*oatproxy.OAuthSession, error) 98 - UpdateOAuthSession(id string, session *oatproxy.OAuthSession) error 99 - ListOAuthSessions() ([]oatproxy.OAuthSession, error) 100 - GetSessionByDID(did string) (*oatproxy.OAuthSession, error) 101 - 102 95 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 103 96 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 104 97 DeleteServerSettings(ctx context.Context, server string, repoDID string) error ··· 173 166 ChatMessage{}, 174 167 ChatProfile{}, 175 168 Gate{}, 176 - oatproxy.OAuthSession{}, 177 169 ServerSettings{}, 178 170 XrpcStreamEvent{}, 179 171 Labeler{},
-50
pkg/model/oauth_session.go
··· 1 - package model 2 - 3 - import ( 4 - "errors" 5 - 6 - "github.com/streamplace/oatproxy/pkg/oatproxy" 7 - "gorm.io/gorm" 8 - ) 9 - 10 - func (m *DBModel) CreateOAuthSession(id string, session *oatproxy.OAuthSession) error { 11 - return m.DB.Create(session).Error 12 - } 13 - 14 - func (m *DBModel) LoadOAuthSession(id string) (*oatproxy.OAuthSession, error) { 15 - var session oatproxy.OAuthSession 16 - if err := m.DB.Where("downstream_dpop_jkt = ?", id).First(&session).Error; err != nil { 17 - if errors.Is(err, gorm.ErrRecordNotFound) { 18 - return nil, nil 19 - } 20 - return nil, err 21 - } 22 - return &session, nil 23 - } 24 - 25 - func (m *DBModel) UpdateOAuthSession(id string, session *oatproxy.OAuthSession) error { 26 - res := m.DB.Model(&oatproxy.OAuthSession{}).Where("downstream_dpop_jkt = ?", id).Updates(session) 27 - if res.Error != nil { 28 - return res.Error 29 - } 30 - if res.RowsAffected == 0 { 31 - return errors.New("no rows affected") 32 - } 33 - return nil 34 - } 35 - 36 - func (m *DBModel) ListOAuthSessions() ([]oatproxy.OAuthSession, error) { 37 - var sessions []oatproxy.OAuthSession 38 - if err := m.DB.Find(&sessions).Error; err != nil { 39 - return nil, err 40 - } 41 - return sessions, nil 42 - } 43 - 44 - func (m *DBModel) GetSessionByDID(did string) (*oatproxy.OAuthSession, error) { 45 - var session oatproxy.OAuthSession 46 - if err := m.DB.Where("repo_did = ? AND revoked_at IS NULL", did).Order("updated_at DESC").First(&session).Error; err != nil { 47 - return nil, err 48 - } 49 - return &session, nil 50 - }
+14 -13
pkg/resync/resync.go
··· 16 16 // resync a fresh database from the PDSses, copying over the few pieces of local state 17 17 // that we have 18 18 func Resync(ctx context.Context, cli *config.CLI) error { 19 - oldMod, err := model.MakeDB(cli.DBPath) 19 + oldMod, err := model.MakeDB(cli.IndexDBPath) 20 20 if err != nil { 21 21 return err 22 22 } 23 - tempDBPath := cli.DBPath + ".temp." + fmt.Sprintf("%d", time.Now().UnixNano()) 23 + tempDBPath := cli.IndexDBPath + ".temp." + fmt.Sprintf("%d", time.Now().UnixNano()) 24 24 newMod, err := model.MakeDB(tempDBPath) 25 25 if err != nil { 26 26 return err ··· 80 80 return err 81 81 } 82 82 83 - oauthSessions, err := oldMod.ListOAuthSessions() 84 - if err != nil { 85 - return err 86 - } 87 - for _, session := range oauthSessions { 88 - err := newMod.CreateOAuthSession(session.DownstreamDPoPJKT, &session) 89 - if err != nil { 90 - return fmt.Errorf("failed to create oauth session: %w", err) 91 - } 92 - } 93 - log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions)) 83 + // TODO: Update OAuth session migration to use new statefulDB 84 + // oauthSessions, err := oldMod.ListOAuthSessions() 85 + // if err != nil { 86 + // return err 87 + // } 88 + // for _, session := range oauthSessions { 89 + // err := newMod.CreateOAuthSession(session.DownstreamDPoPJKT, &session) 90 + // if err != nil { 91 + // return fmt.Errorf("failed to create oauth session: %w", err) 92 + // } 93 + // } 94 + // log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions)) 94 95 95 96 notificationTokens, err := oldMod.ListNotifications() 96 97 if err != nil {
+50
pkg/statedb/oauth_session.go
··· 1 + package statedb 2 + 3 + import ( 4 + "errors" 5 + 6 + "github.com/streamplace/oatproxy/pkg/oatproxy" 7 + "gorm.io/gorm" 8 + ) 9 + 10 + func (db *StatefulDB) CreateOAuthSession(id string, session *oatproxy.OAuthSession) error { 11 + return db.DB.Create(session).Error 12 + } 13 + 14 + func (db *StatefulDB) LoadOAuthSession(id string) (*oatproxy.OAuthSession, error) { 15 + var session oatproxy.OAuthSession 16 + if err := db.DB.Where("downstream_dpop_jkt = ?", id).First(&session).Error; err != nil { 17 + if errors.Is(err, gorm.ErrRecordNotFound) { 18 + return nil, nil 19 + } 20 + return nil, err 21 + } 22 + return &session, nil 23 + } 24 + 25 + func (db *StatefulDB) UpdateOAuthSession(id string, session *oatproxy.OAuthSession) error { 26 + res := db.DB.Model(&oatproxy.OAuthSession{}).Where("downstream_dpop_jkt = ?", id).Updates(session) 27 + if res.Error != nil { 28 + return res.Error 29 + } 30 + if res.RowsAffected == 0 { 31 + return errors.New("no rows affected") 32 + } 33 + return nil 34 + } 35 + 36 + func (db *StatefulDB) ListOAuthSessions() ([]oatproxy.OAuthSession, error) { 37 + var sessions []oatproxy.OAuthSession 38 + if err := db.DB.Find(&sessions).Error; err != nil { 39 + return nil, err 40 + } 41 + return sessions, nil 42 + } 43 + 44 + func (db *StatefulDB) GetSessionByDID(did string) (*oatproxy.OAuthSession, error) { 45 + var session oatproxy.OAuthSession 46 + if err := db.DB.Where("repo_did = ? AND revoked_at IS NULL", did).Order("updated_at DESC").First(&session).Error; err != nil { 47 + return nil, err 48 + } 49 + return &session, nil 50 + }
+135
pkg/statedb/statedb.go
··· 1 + package statedb 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/url" 7 + "os" 8 + "strings" 9 + "time" 10 + 11 + "github.com/lmittmann/tint" 12 + slogGorm "github.com/orandin/slog-gorm" 13 + "github.com/streamplace/oatproxy/pkg/oatproxy" 14 + "gorm.io/driver/postgres" 15 + "gorm.io/driver/sqlite" 16 + "gorm.io/gorm" 17 + "stream.place/streamplace/pkg/config" 18 + "stream.place/streamplace/pkg/log" 19 + ) 20 + 21 + type StatefulDB struct { 22 + DB *gorm.DB 23 + CLI *config.CLI 24 + } 25 + 26 + var NoPostgresDatabaseCode = "3D000" 27 + 28 + // Stateful database for storing private streamplace state 29 + func MakeDB(cli *config.CLI) (*StatefulDB, error) { 30 + dbURL := cli.DBURL 31 + log.Log(context.Background(), "starting stateful database", "dbURL", redactDBURL(dbURL)) 32 + var dial gorm.Dialector 33 + isSQLite := false 34 + isPostgres := false 35 + if dbURL == ":memory:" { 36 + dial = sqlite.Open(":memory:") 37 + isSQLite = true 38 + } else if strings.HasPrefix(dbURL, "sqlite://") { 39 + dial = sqlite.Open(dbURL[len("sqlite://"):]) 40 + isSQLite = true 41 + } else if strings.HasPrefix(dbURL, "postgres://") { 42 + dial = postgres.Open(dbURL) 43 + isPostgres = true 44 + } else { 45 + return nil, fmt.Errorf("unsupported database URL (most start with sqlite:// or postgres://): %s", redactDBURL(dbURL)) 46 + } 47 + 48 + db, err := openDB(dial) 49 + 50 + if err != nil { 51 + if isPostgres && strings.Contains(err.Error(), NoPostgresDatabaseCode) { 52 + db, err = makePostgresDB(dbURL) 53 + if err != nil { 54 + return nil, fmt.Errorf("error creating streamplace database: %w", err) 55 + } 56 + } else { 57 + return nil, fmt.Errorf("error starting database: %w", err) 58 + } 59 + } 60 + if isSQLite { 61 + err = db.Exec("PRAGMA journal_mode=WAL;").Error 62 + if err != nil { 63 + return nil, fmt.Errorf("error setting journal mode: %w", err) 64 + } 65 + sqlDB, err := db.DB() 66 + if err != nil { 67 + return nil, fmt.Errorf("error getting database: %w", err) 68 + } 69 + sqlDB.SetMaxOpenConns(1) 70 + } 71 + for _, model := range []any{ 72 + oatproxy.OAuthSession{}, 73 + } { 74 + err = db.AutoMigrate(model) 75 + if err != nil { 76 + return nil, err 77 + } 78 + } 79 + return &StatefulDB{DB: db}, nil 80 + } 81 + 82 + func openDB(dial gorm.Dialector) (*gorm.DB, error) { 83 + gormLogger := slogGorm.New( 84 + slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 85 + TimeFormat: time.RFC3339, 86 + })), 87 + // slogGorm.WithTraceAll(), 88 + ) 89 + 90 + return gorm.Open(dial, &gorm.Config{ 91 + SkipDefaultTransaction: true, 92 + TranslateError: true, 93 + Logger: gormLogger, 94 + }) 95 + } 96 + 97 + // helper function for creating the requested postgres database 98 + func makePostgresDB(dbURL string) (*gorm.DB, error) { 99 + u, err := url.Parse(dbURL) 100 + if err != nil { 101 + return nil, err 102 + } 103 + dbName := strings.TrimPrefix(u.Path, "/") 104 + u.Path = "/postgres" 105 + 106 + rootDial := postgres.Open(u.String()) 107 + 108 + db, err := openDB(rootDial) 109 + if err != nil { 110 + return nil, err 111 + } 112 + 113 + // postgres doesn't support prepared statements for CREATE DATABASE. don't SQL inject yourself. 114 + err = db.Exec(fmt.Sprintf("CREATE DATABASE %s;", dbName)).Error 115 + if err != nil { 116 + return nil, err 117 + } 118 + 119 + log.Warn(context.Background(), "created postgres database", "dbName", dbName) 120 + 121 + realDial := postgres.Open(dbURL) 122 + 123 + return openDB(realDial) 124 + } 125 + 126 + func redactDBURL(dbURL string) string { 127 + u, err := url.Parse(dbURL) 128 + if err != nil { 129 + return "db url is malformed" 130 + } 131 + if u.User != nil { 132 + u.User = url.UserPassword(u.User.Username(), "redacted") 133 + } 134 + return u.String() 135 + }