Live video on the AT Protocol
at e2f4cab80c5c652ecea6ffa489ebaa71b42d86ff 282 lines 9.1 kB view raw
1package statedb 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/xrpc" 14 "gorm.io/gorm" 15 "stream.place/streamplace/pkg/integrations/webhook" 16 "stream.place/streamplace/pkg/log" 17 notificationpkg "stream.place/streamplace/pkg/notifications" 18 "stream.place/streamplace/pkg/streamplace" 19 20 comatproto "github.com/bluesky-social/indigo/api/atproto" 21) 22 23var TaskNotification = "notification" 24var TaskChat = "chat" 25var TaskFinalizeLivestream = "finalize_livestream" 26 27type NotificationTask struct { 28 Livestream *streamplace.Livestream_LivestreamView 29 FeedPost *bsky.FeedDefs_PostView 30 ChatProfile *streamplace.ChatProfile 31 PDSURL string 32} 33 34type ChatTask struct { 35 MessageView *streamplace.ChatDefs_MessageView 36} 37 38type FinalizeLivestreamTask struct { 39 LivestreamURI string `json:"livestreamURI"` 40} 41 42func (state *StatefulDB) ProcessQueue(ctx context.Context) error { 43 for { 44 task, err := state.DequeueTask(ctx, "queue_processor") 45 if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { 46 return err 47 } 48 if task != nil { 49 err := state.processTask(ctx, task) 50 if err != nil { 51 log.Error(ctx, "failed to process task", "err", err) 52 } 53 } else { 54 select { 55 case <-ctx.Done(): 56 return ctx.Err() 57 case <-time.After(1 * time.Second): 58 continue 59 case <-state.pokeQueue: 60 continue 61 } 62 } 63 64 } 65} 66 67func (state *StatefulDB) processTask(ctx context.Context, task *AppTask) error { 68 switch task.Type { 69 case TaskNotification: 70 return state.processNotificationTask(ctx, task) 71 case TaskChat: 72 return state.processChatMessageTask(ctx, task) 73 case TaskFinalizeLivestream: 74 return state.processFinalizeLivestreamTask(ctx, task) 75 default: 76 return fmt.Errorf("unknown task type: %s", task.Type) 77 } 78} 79 80func (state *StatefulDB) processFinalizeLivestreamTask(ctx context.Context, task *AppTask) error { 81 ctx = log.WithLogValues(ctx, "func", "processFinalizeLivestreamTask") 82 log.Debug(ctx, "processing finalize livestream task") 83 log.Warn(ctx, "processing finalize livestream task") 84 var finalizeLivestreamTask FinalizeLivestreamTask 85 if err := json.Unmarshal(task.Payload, &finalizeLivestreamTask); err != nil { 86 return err 87 } 88 livestream, err := state.model.GetLivestream(finalizeLivestreamTask.LivestreamURI) 89 if err != nil { 90 return fmt.Errorf("failed to get latest livestream for userDID: %w", err) 91 } 92 if livestream == nil { 93 return fmt.Errorf("no livestream found for URI: %s", finalizeLivestreamTask.LivestreamURI) 94 } 95 lastLivestreamView, err := livestream.ToLivestreamView() 96 if err != nil { 97 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err) 98 } 99 rec, ok := lastLivestreamView.Record.Val.(*streamplace.Livestream) 100 if !ok { 101 return fmt.Errorf("livestream is not a streamplace livestream") 102 } 103 if rec.LastSeenAt == nil { 104 return fmt.Errorf("livestream has no last seen at") 105 } 106 lastSeenTime, err := time.Parse(time.RFC3339, *rec.LastSeenAt) 107 if err != nil { 108 return fmt.Errorf("could not parse last seen at: %w", err) 109 } 110 if rec.IdleTimeoutSeconds == nil || *rec.IdleTimeoutSeconds == 0 { 111 log.Debug(ctx, "livestream has no idle timeout, skipping finalization", "uri", livestream.URI) 112 return nil 113 } 114 if time.Since(lastSeenTime) < (time.Duration(*rec.IdleTimeoutSeconds) * time.Second) { 115 log.Debug(ctx, "livestream is active, skipping finalization", "lastSeenAt", lastSeenTime) 116 return nil 117 } 118 session, err := state.GetSessionByDID(livestream.RepoDID) 119 if err != nil { 120 return fmt.Errorf("failed to get session: %w", err) 121 } 122 session, err = state.OATProxy.RefreshIfNeeded(session) 123 if err != nil { 124 return fmt.Errorf("failed to refresh session: %w", err) 125 } 126 client, err := state.OATProxy.GetXrpcClient(session) 127 if err != nil { 128 return fmt.Errorf("failed to get xrpc client: %w", err) 129 } 130 if rec.EndedAt != nil { 131 log.Debug(ctx, "livestream has already ended, skipping", "uri", livestream.URI, "endedAt", *rec.EndedAt) 132 return nil 133 } 134 135 uri, err := syntax.ParseATURI(livestream.URI) 136 if err != nil { 137 return fmt.Errorf("failed to parse ATURI: %w", err) 138 } 139 140 rec.EndedAt = rec.LastSeenAt 141 142 inp := comatproto.RepoPutRecord_Input{ 143 Collection: "place.stream.livestream", 144 Record: &lexutil.LexiconTypeDecoder{Val: rec}, 145 Rkey: uri.RecordKey().String(), 146 Repo: livestream.RepoDID, 147 SwapRecord: &livestream.CID, 148 } 149 out := comatproto.RepoPutRecord_Output{} 150 151 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 152 if err != nil { 153 return fmt.Errorf("failed to update livestream record: %w", err) 154 } 155 156 log.Log(ctx, "livestream finalized", "uri", livestream.URI, "endedAt", *rec.EndedAt) 157 158 return nil 159} 160 161func (state *StatefulDB) processNotificationTask(ctx context.Context, task *AppTask) error { 162 var notificationTask NotificationTask 163 if err := json.Unmarshal(task.Payload, &notificationTask); err != nil { 164 return err 165 } 166 lsv := notificationTask.Livestream 167 rec, ok := lsv.Record.Val.(*streamplace.Livestream) 168 if !ok { 169 return fmt.Errorf("invalid livestream record") 170 } 171 userDID := lsv.Author.Did 172 173 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", rec.Url, "createdAt", rec.CreatedAt, "repo", userDID) 174 followers, err := state.model.GetUserFollowers(ctx, userDID) 175 if err != nil { 176 return err 177 } 178 179 followersDIDs := make([]string, 0, len(followers)) 180 for _, follower := range followers { 181 followersDIDs = append(followersDIDs, follower.UserDID) 182 } 183 184 log.Log(ctx, "found followers", "count", len(followersDIDs)) 185 186 notifications, err := state.GetManyNotificationTokens(followersDIDs) 187 if err != nil { 188 return err 189 } 190 191 if state.noter != nil { 192 nb := &notificationpkg.NotificationBlast{ 193 Title: fmt.Sprintf("🔴 @%s is LIVE!", lsv.Author.Handle), 194 Body: rec.Title, 195 Data: map[string]string{ 196 "path": fmt.Sprintf("/%s", lsv.Author.Handle), 197 }, 198 } 199 err = state.noter.Blast(ctx, notifications, nb) 200 if err != nil { 201 log.Error(ctx, "failed to blast notifications", "err", err) 202 } else { 203 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb) 204 } 205 } else { 206 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications)) 207 } 208 209 // Send to webhooks using webhook manager 210 webhooks, err := state.GetActiveWebhooksForUser(userDID, "livestream") 211 if err != nil { 212 log.Error(ctx, "failed to get livestream webhooks", "err", err) 213 } else { 214 for _, w := range webhooks { 215 lexiconWebhook, err := w.ToLexicon() 216 if err != nil { 217 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID) 218 continue 219 } 220 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) { 221 err := webhook.SendLivestreamWebhook(ctx, lexiconWebhook, notificationTask.PDSURL, lsv, notificationTask.FeedPost, notificationTask.ChatProfile) 222 if err != nil { 223 log.Error(ctx, "failed to send livestream to webhook", "err", err, "webhook_id", wid) 224 err := state.IncrementWebhookError(wid) 225 if err != nil { 226 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid) 227 } 228 } else { 229 log.Log(ctx, "sent livestream to webhook", "webhook_id", wid) 230 err := state.ResetWebhookError(wid) 231 if err != nil { 232 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid) 233 } 234 } 235 }(lexiconWebhook, w.ID) 236 } 237 } 238 return nil 239} 240 241func (state *StatefulDB) processChatMessageTask(ctx context.Context, task *AppTask) error { 242 var chatTask ChatTask 243 if err := json.Unmarshal(task.Payload, &chatTask); err != nil { 244 return err 245 } 246 scm := chatTask.MessageView 247 rec, ok := scm.Record.Val.(*streamplace.ChatMessage) 248 if !ok { 249 return fmt.Errorf("invalid chat message record") 250 } 251 252 // Send to webhooks using webhook manager 253 webhooks, err := state.GetActiveWebhooksForUser(rec.Streamer, "chat") 254 if err != nil { 255 log.Error(ctx, "failed to get chat webhooks", "err", err) 256 } else { 257 for _, w := range webhooks { 258 lexiconWebhook, err := w.ToLexicon() 259 if err != nil { 260 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID) 261 continue 262 } 263 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) { 264 err := webhook.SendChatWebhook(ctx, lexiconWebhook, scm.Author.Did, scm) 265 if err != nil { 266 log.Error(ctx, "failed to send chat to webhook", "err", err, "webhook_id", wid) 267 err = state.IncrementWebhookError(wid) 268 if err != nil { 269 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid) 270 } 271 } else { 272 log.Log(ctx, "sent chat to webhook", "webhook_id", wid) 273 err = state.ResetWebhookError(wid) 274 if err != nil { 275 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid) 276 } 277 } 278 }(lexiconWebhook, w.ID) 279 } 280 } 281 return nil 282}