Live video on the AT Protocol

statedb: migrate notification tokens

authored by

Eli Mallon and committed by
Eli Mallon
89e58ff3 4f6cfa19

+50 -47
+1 -1
pkg/api/api.go
··· 476 476 w.WriteHeader(400) 477 477 return 478 478 } 479 - err = a.Model.CreateNotification(n.Token, n.RepoDID) 479 + err = a.StatefulDB.CreateNotification(n.Token, n.RepoDID) 480 480 if err != nil { 481 481 log.Log(ctx, "error creating notification", "error", err) 482 482 w.WriteHeader(400)
+2 -2
pkg/api/api_internal.go
··· 379 379 }) 380 380 381 381 router.GET("/notifications", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { 382 - notifications, err := a.Model.ListNotifications() 382 + notifications, err := a.StatefulDB.ListNotifications() 383 383 if err != nil { 384 384 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 385 385 return ··· 458 458 errors.WriteHTTPBadRequest(w, "invalid request body", err) 459 459 return 460 460 } 461 - notifications, err := a.Model.ListNotifications() 461 + notifications, err := a.StatefulDB.ListNotifications() 462 462 if err != nil { 463 463 errors.WriteHTTPInternalServerError(w, "unable to get notifications", err) 464 464 return
+8 -6
pkg/atproto/firehose.go
··· 25 25 "stream.place/streamplace/pkg/log" 26 26 "stream.place/streamplace/pkg/model" 27 27 notificationpkg "stream.place/streamplace/pkg/notifications" 28 + "stream.place/streamplace/pkg/statedb" 28 29 29 30 "slices" 30 31 ··· 32 33 ) 33 34 34 35 type ATProtoSynchronizer struct { 35 - CLI *config.CLI 36 - Model model.Model 37 - LastSeen time.Time 38 - LastEvent time.Time 39 - Noter notificationpkg.FirebaseNotifier 40 - Bus *bus.Bus 36 + CLI *config.CLI 37 + Model model.Model 38 + StatefulDB *statedb.StatefulDB 39 + LastSeen time.Time 40 + LastEvent time.Time 41 + Noter notificationpkg.FirebaseNotifier 42 + Bus *bus.Bus 41 43 } 42 44 43 45 func (atsync *ATProtoSynchronizer) StartFirehose(ctx context.Context) error {
+1 -1
pkg/atproto/sync.go
··· 344 344 345 345 if !isUpdate && !isFirstSync { 346 346 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", u, "createdAt", rec.CreatedAt, "repo", userDID) 347 - notifications, err := atsync.Model.GetFollowersNotificationTokens(userDID) 347 + notifications, err := atsync.StatefulDB.GetFollowersNotificationTokens(userDID) 348 348 if err != nil { 349 349 return err 350 350 }
+5 -4
pkg/cmd/streamplace.go
··· 341 341 342 342 b := bus.NewBus() 343 343 atsync := &atproto.ATProtoSynchronizer{ 344 - CLI: &cli, 345 - Model: mod, 346 - Noter: noter, 347 - Bus: b, 344 + CLI: &cli, 345 + Model: mod, 346 + StatefulDB: statefulDB, 347 + Noter: noter, 348 + Bus: b, 348 349 } 349 350 mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 350 351 if err != nil {
+4 -3
pkg/media/media_test.go
··· 42 42 AllowedStreams: []string{"did:key:zQ3shhoPCrDZWE8CryCEHYCrb1x8mCkr2byTkF5EGJT7dgazC"}, 43 43 }) 44 44 atsync := &atproto.ATProtoSynchronizer{ 45 - CLI: cli, 46 - Model: mod, 47 - Bus: bus.NewBus(), 45 + CLI: cli, 46 + Model: mod, 47 + StatefulDB: nil, // Test doesn't need StatefulDB for now 48 + Bus: bus.NewBus(), 48 49 } 49 50 mm, err := MakeMediaManager(context.Background(), cli, signer, &boring.BoringReplicator{}, mod, bus.NewBus(), atsync) 50 51 require.NoError(t, err)
-5
pkg/model/model.go
··· 25 25 } 26 26 27 27 type Model interface { 28 - CreateNotification(token, repoDID string) error 29 - ListNotifications() ([]Notification, error) 30 - 31 28 CreatePlayerEvent(event PlayerEventAPI) error 32 29 ListPlayerEvents(playerID string) ([]PlayerEvent, error) 33 30 PlayerReport(playerID string) (map[string]any, error) ··· 62 59 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 63 60 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 64 61 DeleteFollow(ctx context.Context, userDID, rev string) error 65 - GetFollowersNotificationTokens(userDID string) ([]string, error) 66 62 67 63 CreateFeedPost(ctx context.Context, post *FeedPost) error 68 64 ListFeedPosts() ([]FeedPost, error) ··· 152 148 } 153 149 sqlDB.SetMaxOpenConns(1) 154 150 for _, model := range []any{ 155 - Notification{}, 156 151 PlayerEvent{}, 157 152 Segment{}, 158 153 Thumbnail{},
+11 -10
pkg/model/notification.go pkg/statedb/notification.go
··· 1 - package model 1 + package statedb 2 2 3 3 import ( 4 4 "fmt" ··· 15 15 DeletedAt gorm.DeletedAt `gorm:"index"` 16 16 } 17 17 18 - func (m *DBModel) CreateNotification(token string, repoDID string) error { 18 + func (db *StatefulDB) CreateNotification(token string, repoDID string) error { 19 19 not := Notification{ 20 20 Token: token, 21 21 } 22 22 if repoDID != "" { 23 23 not.RepoDID = repoDID 24 24 } 25 - err := m.DB.Save(&not).Error 25 + err := db.DB.Save(&not).Error 26 26 if err != nil { 27 27 return err 28 28 } 29 29 return nil 30 30 } 31 31 32 - func (m *DBModel) ListNotifications() ([]Notification, error) { 32 + func (db *StatefulDB) ListNotifications() ([]Notification, error) { 33 33 nots := []Notification{} 34 - err := m.DB.Find(&nots).Error 34 + err := db.DB.Find(&nots).Error 35 35 if err != nil { 36 36 return nil, fmt.Errorf("error retrieving notifications: %w", err) 37 37 } 38 38 return nots, nil 39 39 } 40 40 41 - func (m *DBModel) ListUserNotifications(userDID string) ([]Notification, error) { 41 + func (db *StatefulDB) ListUserNotifications(userDID string) ([]Notification, error) { 42 42 nots := []Notification{} 43 - err := m.DB.Where("repo_did = ?", userDID).Find(&nots).Error 43 + err := db.DB.Where("repo_did = ?", userDID).Find(&nots).Error 44 44 if err != nil { 45 45 return nil, fmt.Errorf("error retrieving notifications: %w", err) 46 46 } 47 47 return nots, nil 48 48 } 49 49 50 - func (m *DBModel) GetFollowersNotificationTokens(userDID string) ([]string, error) { 50 + // todo fixme we don't have followers in this database 51 + func (db *StatefulDB) GetFollowersNotificationTokens(userDID string) ([]string, error) { 51 52 var tokens []string 52 53 53 - err := m.DB.Model(&Notification{}). 54 + err := db.DB.Model(&Notification{}). 54 55 Distinct("notifications.token"). 55 56 Joins("JOIN follows ON follows.user_did = notifications.repo_did"). 56 57 Where("follows.subject_did = ?", userDID). ··· 62 63 } 63 64 64 65 // also you prolly wanna get one for yourself 65 - nots, err := m.ListUserNotifications(userDID) 66 + nots, err := db.ListUserNotifications(userDID) 66 67 if err != nil { 67 68 return nil, fmt.Errorf("error retrieving user notifications: %w", err) 68 69 }
+17 -15
pkg/resync/resync.go
··· 31 31 } 32 32 33 33 atsync := &atproto.ATProtoSynchronizer{ 34 - CLI: cli, 35 - Model: newMod, 36 - Noter: nil, 37 - Bus: bus.NewBus(), 34 + CLI: cli, 35 + Model: newMod, 36 + StatefulDB: nil, // TODO: Add StatefulDB for resync when migration is ready 37 + Noter: nil, 38 + Bus: bus.NewBus(), 38 39 } 39 40 40 41 doneMap := make(map[string]bool) ··· 93 94 // } 94 95 // log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions)) 95 96 96 - notificationTokens, err := oldMod.ListNotifications() 97 - if err != nil { 98 - return err 99 - } 100 - for _, token := range notificationTokens { 101 - err := newMod.CreateNotification(token.Token, token.RepoDID) 102 - if err != nil { 103 - return fmt.Errorf("failed to create notification: %w", err) 104 - } 105 - } 106 - log.Log(ctx, "migrated notification tokens", "count", len(notificationTokens)) 97 + // TODO: Update notification migration to use new statefulDB 98 + // notificationTokens, err := oldMod.ListNotifications() 99 + // if err != nil { 100 + // return err 101 + // } 102 + // for _, token := range notificationTokens { 103 + // err := newMod.CreateNotification(token.Token, token.RepoDID) 104 + // if err != nil { 105 + // return fmt.Errorf("failed to create notification: %w", err) 106 + // } 107 + // } 108 + // log.Log(ctx, "migrated notification tokens", "count", len(notificationTokens)) 107 109 108 110 log.Log(ctx, "resync complete!", "newDBPath", tempDBPath) 109 111
+1
pkg/statedb/statedb.go
··· 70 70 } 71 71 for _, model := range []any{ 72 72 oatproxy.OAuthSession{}, 73 + Notification{}, 73 74 } { 74 75 err = db.AutoMigrate(model) 75 76 if err != nil {