Live video on the AT Protocol

redcircle: it's in the queue now

+65 -59
+6 -27
pkg/atproto/redcircle.go pkg/redcircle/redcircle.go
··· 1 - package atproto 1 + package redcircle 2 2 3 3 import ( 4 4 "bytes" ··· 14 14 15 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 16 "github.com/bluesky-social/indigo/atproto/atdata" 17 + "github.com/streamplace/oatproxy/pkg/oatproxy" 17 18 18 19 "github.com/bluesky-social/indigo/xrpc" 19 20 "github.com/tdewolff/canvas" ··· 21 22 "stream.place/streamplace/pkg/aqhttp" 22 23 "stream.place/streamplace/pkg/constants" 23 24 "stream.place/streamplace/pkg/log" 25 + "stream.place/streamplace/pkg/model" 24 26 ) 25 27 26 28 const WIDTH = 1000.0 ··· 47 49 return nil, fmt.Errorf("failed to decode image: %w", err) 48 50 } 49 51 50 - // profileWidth := profileImg.Bounds().Max.X 51 - 52 52 scaleFactor := (float64(profileImg.Bounds().Max.X) / float64(WIDTH)) * BorderScaleFactor 53 - // scaleFactor := 1.0 54 - 55 - fmt.Println("scaleFactor", scaleFactor) 56 - 57 - // 58 53 59 54 // Draw the decoded image onto the canvas 60 55 canvasCtx := canvas.NewContext(c) ··· 97 92 return imageData, nil 98 93 } 99 94 100 - func (atsync *ATProtoSynchronizer) UpdateProfilePicture(ctx context.Context, repoDID string) error { 101 - session, err := atsync.StatefulDB.GetSessionByDID(repoDID) 102 - if err != nil { 103 - return fmt.Errorf("failed to get session: %w", err) 104 - } 105 - if session == nil { 106 - return fmt.Errorf("no session found for repoDID: %s", repoDID) 107 - } 108 - session, err = atsync.OATProxy.RefreshIfNeeded(session) 109 - if err != nil { 110 - return fmt.Errorf("failed to refresh session: %w", err) 111 - } 112 - client, err := atsync.OATProxy.GetXrpcClient(session) 113 - if err != nil { 114 - return fmt.Errorf("failed to get xrpc client: %w", err) 115 - } 116 - 117 - oldProfile, err := atsync.Model.GetBskyProfile(ctx, repoDID, false) 95 + func UpdateProfilePicture(ctx context.Context, repoDID string, client *oatproxy.XrpcClient, mod model.Model) error { 96 + oldProfile, err := mod.GetBskyProfile(ctx, repoDID, false) 118 97 if err != nil { 119 98 return fmt.Errorf("failed to get old profile: %w", err) 120 99 } ··· 125 104 return fmt.Errorf("no avatar found for old profile") 126 105 } 127 106 128 - repo, err := atsync.Model.GetRepo(repoDID) 107 + repo, err := mod.GetRepo(repoDID) 129 108 if err != nil { 130 109 return fmt.Errorf("failed to get repo: %w", err) 131 110 }
pkg/atproto/redcircle.png pkg/redcircle/redcircle.png
+1 -1
pkg/atproto/redcircle_test.go pkg/redcircle/redcircle_test.go
··· 1 - package atproto 1 + package redcircle 2 2 3 3 import ( 4 4 "context"
pkg/atproto/robot.jpg pkg/redcircle/robot.jpg
pkg/atproto/scumbag.png pkg/redcircle/scumbag.png
+5 -30
pkg/atproto/sync.go
··· 366 366 } 367 367 go atsync.Bus.Publish(userDID, lsv) 368 368 369 - var postView *bsky.FeedDefs_PostView 370 - if lsHydrated.Post != nil { 371 - postView, err = lsHydrated.Post.ToBskyPostView() 372 - if err != nil { 373 - return fmt.Errorf("failed to convert livestream post to bsky post view: %w", err) 374 - } 375 - } 376 - 377 369 if !isUpdate && !isFirstSync { 378 - task := &statedb.NotificationTask{ 379 - Livestream: lsv, 380 - FeedPost: postView, 381 - PDSURL: r.PDS, 382 - } 383 - 384 - cp, err := atsync.Model.GetChatProfile(ctx, userDID) 385 - if err != nil { 386 - return fmt.Errorf("failed to get chat profile: %w", err) 387 - } 388 - if cp != nil { 389 - spcp, err := cp.ToStreamplaceChatProfile() 390 - if err != nil { 391 - return fmt.Errorf("failed to convert chat profile to streamplace chat profile: %w", err) 370 + if rec.IntegrationSettings != nil && rec.IntegrationSettings.UpdateBskyProfile != nil && *rec.IntegrationSettings.UpdateBskyProfile { 371 + task := &statedb.AddRedCircleTask{ 372 + UserDID: userDID, 392 373 } 393 - task.ChatProfile = spcp 394 - } 395 - 396 - if rec.IntegrationSettings != nil && rec.IntegrationSettings.UpdateBskyProfile != nil && *rec.IntegrationSettings.UpdateBskyProfile { 397 - // TODO DONTMERGE: move this to a background job 398 - log.Warn(ctx, "updating bluesky profile picture", "userDID", userDID) 399 - err = atsync.UpdateProfilePicture(ctx, userDID) 374 + _, err = atsync.StatefulDB.EnqueueTask(ctx, statedb.TaskAddRedCircle, task, statedb.WithTaskKey(fmt.Sprintf("add-red-circle::%s", aturi.String()))) 400 375 if err != nil { 401 - return fmt.Errorf("failed to update profile picture: %w", err) 376 + return fmt.Errorf("failed to enqueue add red circle task: %w", err) 402 377 } 403 378 } 404 379 }
pkg/atproto/test.jpg pkg/redcircle/test.jpg
+1
pkg/cmd/streamplace.go
··· 278 278 ClientMetadata: clientMetadata, 279 279 Public: cli.PublicOAuth, 280 280 }) 281 + state.OATProxy = op 281 282 282 283 err = atsync.Migrate(ctx) 283 284 if err != nil {
pkg/redcircle/test_scumbag.jpg

This is a binary file and will not be displayed.

+51
pkg/statedb/queue_processor.go
··· 12 12 "stream.place/streamplace/pkg/integrations/webhook" 13 13 "stream.place/streamplace/pkg/log" 14 14 notificationpkg "stream.place/streamplace/pkg/notifications" 15 + "stream.place/streamplace/pkg/redcircle" 15 16 "stream.place/streamplace/pkg/streamplace" 16 17 ) 17 18 18 19 var TaskNotification = "notification" 19 20 var TaskChat = "chat" 21 + var TaskAddRedCircle = "add_red_circle" 22 + var TaskRemoveRedCircle = "remove_red_circle" 20 23 21 24 type NotificationTask struct { 22 25 Livestream *streamplace.Livestream_LivestreamView ··· 27 30 28 31 type ChatTask struct { 29 32 MessageView *streamplace.ChatDefs_MessageView 33 + } 34 + 35 + type AddRedCircleTask struct { 36 + UserDID string 37 + } 38 + 39 + type RemoveRedCircleTask struct { 40 + UserDID string 30 41 } 31 42 32 43 func (state *StatefulDB) ProcessQueue(ctx context.Context) error { ··· 60 71 return state.processNotificationTask(ctx, task) 61 72 case TaskChat: 62 73 return state.processChatMessageTask(ctx, task) 74 + case TaskAddRedCircle: 75 + return state.processAddRedCircleTask(ctx, task) 76 + case TaskRemoveRedCircle: 77 + return state.processRemoveRedCircleTask(ctx, task) 63 78 default: 64 79 return fmt.Errorf("unknown task type: %s", task.Type) 65 80 } 81 + } 82 + 83 + func (state *StatefulDB) processAddRedCircleTask(ctx context.Context, task *AppTask) error { 84 + var addRedCircleTask AddRedCircleTask 85 + if err := json.Unmarshal(task.Payload, &addRedCircleTask); err != nil { 86 + return err 87 + } 88 + repoDID := addRedCircleTask.UserDID 89 + session, err := state.GetSessionByDID(repoDID) 90 + if err != nil { 91 + return fmt.Errorf("failed to get session: %w", err) 92 + } 93 + if session == nil { 94 + return fmt.Errorf("no session found for repoDID: %s", repoDID) 95 + } 96 + session, err = state.OATProxy.RefreshIfNeeded(session) 97 + if err != nil { 98 + return fmt.Errorf("failed to refresh session: %w", err) 99 + } 100 + client, err := state.OATProxy.GetXrpcClient(session) 101 + if err != nil { 102 + return fmt.Errorf("failed to get xrpc client: %w", err) 103 + } 104 + err = redcircle.UpdateProfilePicture(ctx, repoDID, client, state.model) 105 + if err != nil { 106 + return fmt.Errorf("failed to update profile picture: %w", err) 107 + } 108 + return nil 109 + } 110 + 111 + func (state *StatefulDB) processRemoveRedCircleTask(ctx context.Context, task *AppTask) error { 112 + var removeRedCircleTask RemoveRedCircleTask 113 + if err := json.Unmarshal(task.Payload, &removeRedCircleTask); err != nil { 114 + return err 115 + } 116 + return nil 66 117 } 67 118 68 119 func (state *StatefulDB) processNotificationTask(ctx context.Context, task *AppTask) error {
+1 -1
pkg/statedb/statedb.go
··· 37 37 // pgLockConn is used to hold a connection to the database for locking 38 38 pgLockConn *gorm.DB 39 39 pgLockConnMu sync.Mutex 40 - op *oatproxy.OATProxy 40 + OATProxy *oatproxy.OATProxy 41 41 } 42 42 43 43 // list tables here so we can migrate them