Live video on the AT Protocol
1package statedb
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "time"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 lexutil "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/xrpc"
14 "gorm.io/gorm"
15 "stream.place/streamplace/pkg/integrations/webhook"
16 "stream.place/streamplace/pkg/log"
17 notificationpkg "stream.place/streamplace/pkg/notifications"
18 "stream.place/streamplace/pkg/streamplace"
19
20 comatproto "github.com/bluesky-social/indigo/api/atproto"
21)
22
// Task type identifiers. processTask compares AppTask.Type against these
// values to pick the handler for a dequeued task.
var (
	// TaskNotification marks tasks handled by processNotificationTask.
	TaskNotification = "notification"
	// TaskChat marks tasks handled by processChatMessageTask.
	TaskChat = "chat"
	// TaskFinalizeLivestream marks tasks handled by processFinalizeLivestreamTask.
	TaskFinalizeLivestream = "finalize_livestream"
)
26
// NotificationTask is the JSON payload that processNotificationTask decodes
// from a TaskNotification task. The fields carry no json tags, so they are
// encoded under their Go field names.
type NotificationTask struct {
	// Livestream is the view of the stream that went live. Its Record must
	// hold a *streamplace.Livestream and its Author supplies the DID and
	// handle used for the follower lookup and the notification text.
	Livestream *streamplace.Livestream_LivestreamView
	// FeedPost is forwarded unchanged to webhook.SendLivestreamWebhook.
	FeedPost *bsky.FeedDefs_PostView
	// ChatProfile is forwarded unchanged to webhook.SendLivestreamWebhook.
	ChatProfile *streamplace.ChatProfile
	// PDSURL is forwarded unchanged to webhook.SendLivestreamWebhook.
	PDSURL string
}
33
// ChatTask is the JSON payload that processChatMessageTask decodes from a
// TaskChat task.
type ChatTask struct {
	// MessageView is the chat message to deliver to webhooks. Its Record must
	// hold a *streamplace.ChatMessage; that record's Streamer field selects
	// which user's webhooks receive the message.
	MessageView *streamplace.ChatDefs_MessageView
}

// FinalizeLivestreamTask is the JSON payload that
// processFinalizeLivestreamTask decodes from a TaskFinalizeLivestream task.
type FinalizeLivestreamTask struct {
	// LivestreamURI identifies the livestream to finalize; it is passed to
	// the model's GetLivestream lookup.
	LivestreamURI string `json:"livestreamURI"`
}

42func (state *StatefulDB) ProcessQueue(ctx context.Context) error {
43 for {
44 task, err := state.DequeueTask(ctx, "queue_processor")
45 if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
46 return err
47 }
48 if task != nil {
49 err := state.processTask(ctx, task)
50 if err != nil {
51 log.Error(ctx, "failed to process task", "err", err)
52 }
53 } else {
54 select {
55 case <-ctx.Done():
56 return ctx.Err()
57 case <-time.After(1 * time.Second):
58 continue
59 case <-state.pokeQueue:
60 continue
61 }
62 }
63
64 }
65}
66
67func (state *StatefulDB) processTask(ctx context.Context, task *AppTask) error {
68 switch task.Type {
69 case TaskNotification:
70 return state.processNotificationTask(ctx, task)
71 case TaskChat:
72 return state.processChatMessageTask(ctx, task)
73 case TaskFinalizeLivestream:
74 return state.processFinalizeLivestreamTask(ctx, task)
75 default:
76 return fmt.Errorf("unknown task type: %s", task.Type)
77 }
78}
79
80func (state *StatefulDB) processFinalizeLivestreamTask(ctx context.Context, task *AppTask) error {
81 ctx = log.WithLogValues(ctx, "func", "processFinalizeLivestreamTask")
82 log.Debug(ctx, "processing finalize livestream task")
83 log.Warn(ctx, "processing finalize livestream task")
84 var finalizeLivestreamTask FinalizeLivestreamTask
85 if err := json.Unmarshal(task.Payload, &finalizeLivestreamTask); err != nil {
86 return err
87 }
88 livestream, err := state.model.GetLivestream(finalizeLivestreamTask.LivestreamURI)
89 if err != nil {
90 return fmt.Errorf("failed to get latest livestream for userDID: %w", err)
91 }
92 if livestream == nil {
93 return fmt.Errorf("no livestream found for URI: %s", finalizeLivestreamTask.LivestreamURI)
94 }
95 lastLivestreamView, err := livestream.ToLivestreamView()
96 if err != nil {
97 return fmt.Errorf("failed to convert livestream to streamplace livestream: %w", err)
98 }
99 rec, ok := lastLivestreamView.Record.Val.(*streamplace.Livestream)
100 if !ok {
101 return fmt.Errorf("livestream is not a streamplace livestream")
102 }
103 if rec.LastSeenAt == nil {
104 return fmt.Errorf("livestream has no last seen at")
105 }
106 lastSeenTime, err := time.Parse(time.RFC3339, *rec.LastSeenAt)
107 if err != nil {
108 return fmt.Errorf("could not parse last seen at: %w", err)
109 }
110 if rec.IdleTimeoutSeconds == nil || *rec.IdleTimeoutSeconds == 0 {
111 log.Debug(ctx, "livestream has no idle timeout, skipping finalization", "uri", livestream.URI)
112 return nil
113 }
114 if time.Since(lastSeenTime) < (time.Duration(*rec.IdleTimeoutSeconds) * time.Second) {
115 log.Debug(ctx, "livestream is active, skipping finalization", "lastSeenAt", lastSeenTime)
116 return nil
117 }
118 session, err := state.GetSessionByDID(livestream.RepoDID)
119 if err != nil {
120 return fmt.Errorf("failed to get session: %w", err)
121 }
122 session, err = state.OATProxy.RefreshIfNeeded(session)
123 if err != nil {
124 return fmt.Errorf("failed to refresh session: %w", err)
125 }
126 client, err := state.OATProxy.GetXrpcClient(session)
127 if err != nil {
128 return fmt.Errorf("failed to get xrpc client: %w", err)
129 }
130 if rec.EndedAt != nil {
131 log.Debug(ctx, "livestream has already ended, skipping", "uri", livestream.URI, "endedAt", *rec.EndedAt)
132 return nil
133 }
134
135 uri, err := syntax.ParseATURI(livestream.URI)
136 if err != nil {
137 return fmt.Errorf("failed to parse ATURI: %w", err)
138 }
139
140 rec.EndedAt = rec.LastSeenAt
141
142 inp := comatproto.RepoPutRecord_Input{
143 Collection: "place.stream.livestream",
144 Record: &lexutil.LexiconTypeDecoder{Val: rec},
145 Rkey: uri.RecordKey().String(),
146 Repo: livestream.RepoDID,
147 SwapRecord: &livestream.CID,
148 }
149 out := comatproto.RepoPutRecord_Output{}
150
151 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out)
152 if err != nil {
153 return fmt.Errorf("failed to update livestream record: %w", err)
154 }
155
156 log.Log(ctx, "livestream finalized", "uri", livestream.URI, "endedAt", *rec.EndedAt)
157
158 return nil
159}
160
161func (state *StatefulDB) processNotificationTask(ctx context.Context, task *AppTask) error {
162 var notificationTask NotificationTask
163 if err := json.Unmarshal(task.Payload, ¬ificationTask); err != nil {
164 return err
165 }
166 lsv := notificationTask.Livestream
167 rec, ok := lsv.Record.Val.(*streamplace.Livestream)
168 if !ok {
169 return fmt.Errorf("invalid livestream record")
170 }
171 userDID := lsv.Author.Did
172
173 log.Warn(ctx, "Livestream detected! Blasting followers!", "title", rec.Title, "url", rec.Url, "createdAt", rec.CreatedAt, "repo", userDID)
174 followers, err := state.model.GetUserFollowers(ctx, userDID)
175 if err != nil {
176 return err
177 }
178
179 followersDIDs := make([]string, 0, len(followers))
180 for _, follower := range followers {
181 followersDIDs = append(followersDIDs, follower.UserDID)
182 }
183
184 log.Log(ctx, "found followers", "count", len(followersDIDs))
185
186 notifications, err := state.GetManyNotificationTokens(followersDIDs)
187 if err != nil {
188 return err
189 }
190
191 if state.noter != nil {
192 nb := ¬ificationpkg.NotificationBlast{
193 Title: fmt.Sprintf("🔴 @%s is LIVE!", lsv.Author.Handle),
194 Body: rec.Title,
195 Data: map[string]string{
196 "path": fmt.Sprintf("/%s", lsv.Author.Handle),
197 },
198 }
199 err = state.noter.Blast(ctx, notifications, nb)
200 if err != nil {
201 log.Error(ctx, "failed to blast notifications", "err", err)
202 } else {
203 log.Log(ctx, "sent notifications", "user", userDID, "count", len(notifications), "content", nb)
204 }
205 } else {
206 log.Log(ctx, "no notifier configured, skipping notifications", "user", userDID, "count", len(notifications))
207 }
208
209 // Send to webhooks using webhook manager
210 webhooks, err := state.GetActiveWebhooksForUser(userDID, "livestream")
211 if err != nil {
212 log.Error(ctx, "failed to get livestream webhooks", "err", err)
213 } else {
214 for _, w := range webhooks {
215 lexiconWebhook, err := w.ToLexicon()
216 if err != nil {
217 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID)
218 continue
219 }
220 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) {
221 err := webhook.SendLivestreamWebhook(ctx, lexiconWebhook, notificationTask.PDSURL, lsv, notificationTask.FeedPost, notificationTask.ChatProfile)
222 if err != nil {
223 log.Error(ctx, "failed to send livestream to webhook", "err", err, "webhook_id", wid)
224 err := state.IncrementWebhookError(wid)
225 if err != nil {
226 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid)
227 }
228 } else {
229 log.Log(ctx, "sent livestream to webhook", "webhook_id", wid)
230 err := state.ResetWebhookError(wid)
231 if err != nil {
232 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid)
233 }
234 }
235 }(lexiconWebhook, w.ID)
236 }
237 }
238 return nil
239}
240
241func (state *StatefulDB) processChatMessageTask(ctx context.Context, task *AppTask) error {
242 var chatTask ChatTask
243 if err := json.Unmarshal(task.Payload, &chatTask); err != nil {
244 return err
245 }
246 scm := chatTask.MessageView
247 rec, ok := scm.Record.Val.(*streamplace.ChatMessage)
248 if !ok {
249 return fmt.Errorf("invalid chat message record")
250 }
251
252 // Send to webhooks using webhook manager
253 webhooks, err := state.GetActiveWebhooksForUser(rec.Streamer, "chat")
254 if err != nil {
255 log.Error(ctx, "failed to get chat webhooks", "err", err)
256 } else {
257 for _, w := range webhooks {
258 lexiconWebhook, err := w.ToLexicon()
259 if err != nil {
260 log.Error(ctx, "failed to convert webhook to lexicon", "err", err, "webhook_id", w.ID)
261 continue
262 }
263 go func(lexiconWebhook *streamplace.ServerDefs_Webhook, wid string) {
264 err := webhook.SendChatWebhook(ctx, lexiconWebhook, scm.Author.Did, scm)
265 if err != nil {
266 log.Error(ctx, "failed to send chat to webhook", "err", err, "webhook_id", wid)
267 err = state.IncrementWebhookError(wid)
268 if err != nil {
269 log.Error(ctx, "failed to increment webhook error count", "err", err, "webhook_id", wid)
270 }
271 } else {
272 log.Log(ctx, "sent chat to webhook", "webhook_id", wid)
273 err = state.ResetWebhookError(wid)
274 if err != nil {
275 log.Error(ctx, "failed to reset webhook error count", "err", err, "webhook_id", wid)
276 }
277 }
278 }(lexiconWebhook, w.ID)
279 }
280 }
281 return nil
282}