Live video on the AT Protocol

queue: working follower notification blasts

+30 -30
+1 -1
pkg/atproto/lexicon_repo_test.go
··· 22 22 cli.DataDir = t.TempDir() 23 23 mod, err := model.MakeDB(":memory:") 24 24 require.NoError(t, err) 25 - state, err := statedb.MakeDB(&cli, nil) 25 + state, err := statedb.MakeDB(&cli, nil, mod) 26 26 require.NoError(t, err) 27 27 28 28 // creating a new repo
+1 -1
pkg/cmd/streamplace.go
··· 321 321 if err != nil { 322 322 return err 323 323 } 324 - state, err := statedb.MakeDB(&cli, noter) 324 + state, err := statedb.MakeDB(&cli, noter, mod) 325 325 if err != nil { 326 326 return err 327 327 }
+5 -20
pkg/statedb/notification.go
··· 44 44 return nots, nil 45 45 } 46 46 47 - // todo fixme we don't have followers in this database 48 - func (state *StatefulDB) GetFollowersNotificationTokens(userDID string) ([]string, error) { 49 - var tokens []string 50 - 47 + func (state *StatefulDB) GetManyNotificationTokens(userDIDs []string) ([]string, error) { 48 + tokens := []string{} 51 49 err := state.DB.Model(&Notification{}). 52 - Distinct("notifications.token"). 53 - Joins("JOIN follows ON follows.user_did = notifications.repo_did"). 54 - Where("follows.subject_did = ?", userDID). 55 - Pluck("notifications.token", &tokens). 50 + Where("repo_did IN (?)", userDIDs). 51 + Pluck("token", &tokens). 56 52 Error 57 - 58 53 if err != nil { 59 - return nil, fmt.Errorf("error retrieving follower notification tokens: %w", err) 60 - } 61 - 62 - // also you prolly wanna get one for yourself 63 - nots, err := state.ListUserNotifications(userDID) 64 - if err != nil { 65 - return nil, fmt.Errorf("error retrieving user notifications: %w", err) 66 - } 67 - for _, not := range nots { 68 - tokens = append(tokens, not.Token) 54 + return nil, fmt.Errorf("error retrieving notification tokens: %w", err) 69 55 } 70 - 71 56 return tokens, nil 72 57 }
+19 -7
pkg/statedb/queue_processor.go
··· 77 77 } 78 78 userDID := lsv.Author.Did 79 79 80 - if state.noter != nil { 81 - log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", rec.Url, "createdAt", rec.CreatedAt, "repo", userDID) 82 - notifications, err := state.GetFollowersNotificationTokens(userDID) 83 - if err != nil { 84 - return err 85 - } 80 + log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", rec.Url, "createdAt", rec.CreatedAt, "repo", userDID) 81 + followers, err := state.model.GetUserFollowers(ctx, userDID) 82 + if err != nil { 83 + return err 84 + } 86 85 86 + followersDIDs := make([]string, 0, len(followers)) 87 + for _, follower := range followers { 88 + followersDIDs = append(followersDIDs, follower.UserDID) 89 + } 90 + 91 + log.Log(ctx, "found followers", "count", len(followersDIDs)) 92 + 93 + notifications, err := state.GetManyNotificationTokens(followersDIDs) 94 + if err != nil { 95 + return err 96 + } 97 + 98 + if state.noter != nil { 87 99 nb := &notificationpkg.NotificationBlast{ 88 100 Title: fmt.Sprintf("🔴 @%s is LIVE!", lsv.Author.Handle), 89 101 Body: rec.Title, ··· 98 110 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb) 99 111 } 100 112 } else { 101 - log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID) 113 + log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications)) 102 114 } 103 115 104 116 for _, webhook := range state.CLI.DiscordWebhooks {
+4 -1
pkg/statedb/statedb.go
··· 16 16 "gorm.io/gorm" 17 17 "stream.place/streamplace/pkg/config" 18 18 "stream.place/streamplace/pkg/log" 19 + "stream.place/streamplace/pkg/model" 19 20 notificationpkg "stream.place/streamplace/pkg/notifications" 20 21 ) 21 22 ··· 32 33 Type DBType 33 34 locks *NamedLocks 34 35 noter notificationpkg.FirebaseNotifier 36 + model model.Model 35 37 // pokeQueue is used to wake up the queue processor when a new task is enqueued 36 38 pokeQueue chan struct{} 37 39 } ··· 48 50 var NoPostgresDatabaseCode = "3D000" 49 51 50 52 // Stateful database for storing private streamplace state 51 - func MakeDB(cli *config.CLI, noter notificationpkg.FirebaseNotifier) (*StatefulDB, error) { 53 + func MakeDB(cli *config.CLI, noter notificationpkg.FirebaseNotifier, model model.Model) (*StatefulDB, error) { 52 54 dbURL := cli.DBURL 53 55 log.Log(context.Background(), "starting stateful database", "dbURL", redactDBURL(dbURL)) 54 56 var dial gorm.Dialector ··· 100 102 CLI: cli, 101 103 Type: dbType, 102 104 locks: NewNamedLocks(), 105 + model: model, 103 106 pokeQueue: make(chan struct{}, 1), 104 107 }, nil 105 108 }