Live video on the AT Protocol

model: implement database migrations (#279)

* model: implement database migrations

* media: shut down streams when keys are revoked

authored by

Eli Mallon and committed by
GitHub
ef53db11 26b23232

+306 -61
+6 -41
pkg/atproto/atproto.go
··· 4 4 "bytes" 5 5 "context" 6 6 "fmt" 7 - "strings" 8 - "sync" 9 7 10 8 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 9 _ "github.com/bluesky-social/indigo/api/bsky" 12 - atcrypto "github.com/bluesky-social/indigo/atproto/crypto" 13 10 "github.com/bluesky-social/indigo/atproto/identity" 14 11 "github.com/bluesky-social/indigo/atproto/syntax" 15 12 "github.com/bluesky-social/indigo/repo" ··· 17 14 "github.com/ipfs/go-cid" 18 15 "go.opentelemetry.io/otel" 19 16 "stream.place/streamplace/pkg/aqhttp" 20 - "stream.place/streamplace/pkg/constants" 21 17 "stream.place/streamplace/pkg/log" 22 18 "stream.place/streamplace/pkg/model" 23 19 ) 24 20 25 21 var SyncGetRepo = comatproto.SyncGetRepo 26 22 27 - // handleLocks provides per-handle synchronization 28 - var handleLocks = struct { 29 - sync.Mutex 30 - locks map[string]*sync.Mutex 31 - }{ 32 - locks: make(map[string]*sync.Mutex), 33 - } 34 - 35 - // getHandleLock returns a mutex for the given handle 36 - func getHandleLock(handle string) *sync.Mutex { 37 - handleLocks.Lock() 38 - defer handleLocks.Unlock() 39 - 40 - if lock, exists := handleLocks.locks[handle]; exists { 41 - return lock 42 - } 43 - 44 - lock := &sync.Mutex{} 45 - handleLocks.locks[handle] = lock 46 - return lock 47 - } 48 - 49 23 func (atsync *ATProtoSynchronizer) SyncBlueskyRepoCached(ctx context.Context, handle string, mod model.Model) (*model.Repo, error) { 50 24 ctx, span := otel.Tracer("signer").Start(ctx, "SyncBlueskyRepoCached") 51 25 defer span.End() ··· 66 40 } 67 41 68 42 func (atsync *ATProtoSynchronizer) SyncBlueskyRepo(ctx context.Context, handle string, mod model.Model) (*model.Repo, error) { 69 - ctx = log.WithLogValues(ctx, "func", "SyncBlueskyRepo", "handle", handle) 70 - // Get handle-specific lock and ensure synchronized access 71 - 72 43 ident, err := ResolveIdent(ctx, handle) 73 44 if err != nil { 74 45 return nil, fmt.Errorf("failed to resolve Bluesky handle %s: %w", handle, err) 75 46 } 76 47 48 + ctx = log.WithLogValues(ctx, "did", ident.DID.String(), "handle", ident.Handle.String()) 49 + 77 50 handleLock := getHandleLock(ident.DID.String()) 78 51 handleLock.Lock() 79 52 defer handleLock.Unlock() ··· 102 75 } 103 76 104 77 log.Log(ctx, "resolved bluesky identity", "did", ident.DID, "handle", ident.Handle, "pds", ident.PDSEndpoint()) 78 + pdsLock := getPDSLock(ident.PDSEndpoint()) 105 79 xrpcc := xrpc.Client{ 106 80 Host: ident.PDSEndpoint(), 107 81 Client: &aqhttp.Client, ··· 109 83 if xrpcc.Host == "" { 110 84 return nil, fmt.Errorf("no PDS endpoint found for Bluesky identity %s", handle) 111 85 } 86 + pdsLock.Lock() 112 87 repoBytes, err := SyncGetRepo(ctx, &xrpcc, ident.DID.String(), rev) 88 + pdsLock.Unlock() 113 89 if err != nil { 114 90 return nil, fmt.Errorf("failed to fetch repo for %s from PDS %s: %w", ident.DID.String(), xrpcc.Host, err) 115 91 } ··· 124 100 // return nil, fmt.Errorf("failed to write encoded repo bytes to file: %w", err) 125 101 // } 126 102 127 - log.Log(ctx, "got diff", "bytes", len(repoBytes)) 103 + log.Debug(ctx, "got diff", "bytes", len(repoBytes)) 128 104 129 105 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repoBytes)) 130 106 if err != nil { ··· 177 153 } 178 154 179 155 return &newRepo, nil 180 - } 181 - 182 - func parseSigningKey(ctx context.Context, key string) error { 183 - if !strings.HasPrefix(key, constants.DID_KEY_PREFIX) { 184 - return fmt.Errorf("invalid key format for DID key: %s", key) 185 - } 186 - _, err := atcrypto.ParsePublicDIDKey(key) 187 - if err != nil { 188 - return fmt.Errorf("failed to parse multibase key %s: %w", key, err) 189 - } 190 - return nil 191 156 } 192 157 193 158 var ResolveIdent = resolveIdent
+20
pkg/atproto/firehose.go
··· 246 246 } 247 247 } 248 248 249 + if collection.String() == constants.PLACE_STREAM_KEY { 250 + log.Warn(ctx, "revoking stream key", "userDID", evt.Repo, "rkey", rkey.String()) 251 + key, err := atsync.Model.GetSigningKeyByRKey(ctx, rkey.String()) 252 + if err != nil { 253 + log.Error(ctx, "failed to get signing key", "err", err) 254 + continue 255 + } 256 + if key == nil { 257 + log.Warn(ctx, "no signing key found for stream key", "userDID", evt.Repo, "rkey", rkey.String()) 258 + continue 259 + } 260 + now := time.Now() 261 + key.RevokedAt = &now 262 + err = atsync.Model.UpdateSigningKey(key) 263 + if err != nil { 264 + log.Error(ctx, "failed to revoke signing key", "err", err) 265 + } 266 + atsync.Bus.Publish(evt.Repo, key) 267 + } 268 + 249 269 default: 250 270 log.Error(ctx, "unexpected record op kind") 251 271 }
+47
pkg/atproto/locks.go
··· 1 + package atproto 2 + 3 + import "sync" 4 + 5 + // handleLocks provides per-handle synchronization 6 + var handleLocks = struct { 7 + sync.Mutex 8 + locks map[string]*sync.Mutex 9 + }{ 10 + locks: make(map[string]*sync.Mutex), 11 + } 12 + 13 + // getHandleLock returns a mutex for the given handle 14 + func getHandleLock(handle string) *sync.Mutex { 15 + handleLocks.Lock() 16 + defer handleLocks.Unlock() 17 + 18 + if lock, exists := handleLocks.locks[handle]; exists { 19 + return lock 20 + } 21 + 22 + lock := &sync.Mutex{} 23 + handleLocks.locks[handle] = lock 24 + return lock 25 + } 26 + 27 + // pdsLocks provides per-pds synchronization 28 + var pdsLocks = struct { 29 + sync.Mutex 30 + locks map[string]*sync.Mutex 31 + }{ 32 + locks: make(map[string]*sync.Mutex), 33 + } 34 + 35 + // getpdsLock returns a mutex for the given pds 36 + func getPDSLock(pds string) *sync.Mutex { 37 + pdsLocks.Lock() 38 + defer pdsLocks.Unlock() 39 + 40 + if lock, exists := pdsLocks.locks[pds]; exists { 41 + return lock 42 + } 43 + 44 + lock := &sync.Mutex{} 45 + pdsLocks.locks[pds] = lock 46 + return lock 47 + }
+10 -7
pkg/atproto/sync.go
··· 87 87 if err != nil { 88 88 return fmt.Errorf("failed to sync bluesky repo: %w", err) 89 89 } 90 - streamerRepo, err := atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 90 + 91 + _, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model) 91 92 if err != nil { 92 - return fmt.Errorf("failed to sync bluesky repo: %w", err) 93 + log.Error(ctx, "failed to sync bluesky repo", "err", err) 93 94 } 95 + 94 96 log.Debug(ctx, "streamplace.ChatMessage detected", "message", rec.Text, "repo", repo.Handle) 95 - block, err := atsync.Model.GetUserBlock(ctx, streamerRepo.DID, userDID) 97 + block, err := atsync.Model.GetUserBlock(ctx, rec.Streamer, userDID) 96 98 if err != nil { 97 99 return fmt.Errorf("failed to get user block: %w", err) 98 100 } 99 101 if block != nil { 100 - log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", streamerRepo.DID) 102 + log.Debug(ctx, "excluding message from blocked user", "userDID", userDID, "subjectDID", rec.Streamer) 101 103 return nil 102 104 } 103 105 mcm := &model.ChatMessage{ ··· 107 109 ChatMessage: recCBOR, 108 110 RepoDID: userDID, 109 111 Repo: repo, 110 - StreamerRepoDID: streamerRepo.DID, 112 + StreamerRepoDID: rec.Streamer, 111 113 IndexedAt: &now, 112 114 } 113 115 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { ··· 129 131 if err != nil { 130 132 log.Error(ctx, "failed to convert chat message to streamplace message view", "err", err) 131 133 } 132 - go atsync.Bus.Publish(streamerRepo.DID, scm) 134 + go atsync.Bus.Publish(rec.Streamer, scm) 133 135 134 136 if !isUpdate && !isFirstSync { 135 137 for _, webhook := range atsync.CLI.DiscordWebhooks { 136 - if webhook.DID == streamerRepo.DID && webhook.Type == "chat" { 138 + if webhook.DID == rec.Streamer && webhook.Type == "chat" { 137 139 go func() { 138 140 err := discord.SendChat(ctx, webhook, r, scm) 139 141 if err != nil { ··· 362 364 } 363 365 key := model.SigningKey{ 364 366 DID: rec.SigningKey, 367 + RKey: rkey.String(), 365 368 CreatedAt: time.Time(), 366 369 RepoDID: userDID, 367 370 }
+4
pkg/cmd/streamplace.go
··· 29 29 "stream.place/streamplace/pkg/notifications" 30 30 "stream.place/streamplace/pkg/replication" 31 31 "stream.place/streamplace/pkg/replication/boring" 32 + "stream.place/streamplace/pkg/resync" 32 33 "stream.place/streamplace/pkg/rtmps" 33 34 v0 "stream.place/streamplace/pkg/schema/v0" 34 35 "stream.place/streamplace/pkg/spmetrics" ··· 206 207 spmetrics.Version.WithLabelValues(build.Version).Inc() 207 208 208 209 aqhttp.UserAgent = fmt.Sprintf("streamplace/%s", build.Version) 210 + if len(os.Args) > 1 && os.Args[1] == "resync" { 211 + return resync.Resync(ctx, &cli) 212 + } 209 213 210 214 err = os.MkdirAll(cli.DataDir, os.ModePerm) 211 215 if err != nil {
+34
pkg/media/key_revocation.go
··· 1 + package media 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + 7 + "github.com/go-gst/go-gst/gst" 8 + "stream.place/streamplace/pkg/model" 9 + ) 10 + 11 + // Handle shutting down a pipeline when a signing key is revoked 12 + func (mm *MediaManager) HandleKeyRevocation(ctx context.Context, ms MediaSigner, pipeline *gst.Pipeline) { 13 + sub := mm.bus.Subscribe(ms.Streamer()) 14 + defer mm.bus.Unsubscribe(ms.Streamer(), sub) 15 + for { 16 + select { 17 + case <-ctx.Done(): 18 + return 19 + case msg := <-sub: 20 + signingKey, ok := msg.(*model.SigningKey) 21 + if !ok { 22 + continue 23 + } 24 + if signingKey.RevokedAt == nil { 25 + continue 26 + } 27 + if signingKey.DID == ms.DID() { 28 + err := fmt.Errorf("signing key revoked, ending stream: %s", signingKey.RKey) 29 + pipeline.Error(err.Error(), err) 30 + return 31 + } 32 + } 33 + } 34 + }
+12
pkg/media/media_signer.go
··· 15 15 "go.opentelemetry.io/otel" 16 16 "stream.place/streamplace/pkg/aqio" 17 17 "stream.place/streamplace/pkg/aqtime" 18 + "stream.place/streamplace/pkg/atproto" 18 19 "stream.place/streamplace/pkg/config" 19 20 "stream.place/streamplace/pkg/crypto/aqpub" 20 21 "stream.place/streamplace/pkg/crypto/signers" ··· 26 27 SignMP4(ctx context.Context, input io.ReadSeeker, start int64) ([]byte, error) 27 28 Pub() aqpub.Pub 28 29 Streamer() string 30 + DID() string 29 31 } 30 32 31 33 type MediaSignerLocal struct { ··· 34 36 AQPub aqpub.Pub 35 37 Cert []byte 36 38 TAURL string 39 + did string 37 40 } 38 41 39 42 func prepareCert(ctx context.Context, cli *config.CLI, signer crypto.Signer) ([]byte, string, error) { ··· 77 80 if err != nil { 78 81 return nil, err 79 82 } 83 + did, err := atproto.ParsePubKey(signer.Public().(*ecdsa.PublicKey)) 84 + if err != nil { 85 + return nil, err 86 + } 80 87 return &MediaSignerLocal{ 81 88 Signer: signer, 82 89 Cert: cert, 83 90 StreamerName: streamer, 84 91 TAURL: cli.TAURL, 85 92 AQPub: pub, 93 + did: did.DIDKey(), 86 94 }, nil 87 95 } 88 96 ··· 173 181 func (ms *MediaSignerLocal) Pub() aqpub.Pub { 174 182 return ms.AQPub 175 183 } 184 + 185 + func (ms *MediaSignerLocal) DID() string { 186 + return ms.did 187 + }
+11
pkg/media/media_signer_ext.go
··· 14 14 "github.com/decred/dcrd/dcrec/secp256k1" 15 15 "github.com/mr-tron/base58" 16 16 "go.opentelemetry.io/otel" 17 + "stream.place/streamplace/pkg/atproto" 17 18 "stream.place/streamplace/pkg/config" 18 19 "stream.place/streamplace/pkg/crypto/aqpub" 19 20 "stream.place/streamplace/pkg/spmetrics" ··· 27 28 streamer string 28 29 keyBs []byte 29 30 taURL string 31 + did string 30 32 } 31 33 32 34 func MakeMediaSignerExt(ctx context.Context, cli *config.CLI, streamer string, keyBs []byte) (MediaSigner, error) { ··· 43 45 if err != nil { 44 46 return nil, err 45 47 } 48 + did, err := atproto.ParsePubKey(signer.Public().(*ecdsa.PublicKey)) 49 + if err != nil { 50 + return nil, err 51 + } 46 52 return &MediaSignerExt{ 47 53 // cli: cli, 48 54 signer: signer, ··· 51 57 pub: pub, 52 58 keyBs: keyBs, 53 59 taURL: cli.TAURL, 60 + did: did.DIDKey(), 54 61 }, nil 55 62 } 56 63 ··· 115 122 func (ms *MediaSignerExt) Streamer() string { 116 123 return ms.streamer 117 124 } 125 + 126 + func (ms *MediaSignerExt) DID() string { 127 + return ms.did 128 + }
+2
pkg/media/mkv_ingest.go
··· 68 68 busErr <- err 69 69 }() 70 70 71 + go mm.HandleKeyRevocation(ctx, ms, pipeline) 72 + 71 73 err = pipeline.SetState(gst.StatePlaying) 72 74 if err != nil { 73 75 return err
+11 -8
pkg/media/webrtc_ingest.go
··· 99 99 } 100 100 audioSrc := app.SrcFromElement(audioSrcElem) 101 101 102 - go func() { 103 - <-ctx.Done() 104 - if cErr := peerConnection.Close(); cErr != nil { 105 - log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 106 - } 107 - }() 108 - 109 102 // Set the remote SessionDescription 110 103 if err = peerConnection.SetRemoteDescription(*offer); err != nil { 111 104 return nil, fmt.Errorf("failed to set remote description: %w", err) ··· 145 138 146 139 go func() { 147 140 if err := HandleBusMessages(ctx, pipeline); err != nil { 148 - log.Log(ctx, "pipeilne error", "error", err) 141 + log.Log(ctx, "pipeline error", "error", err) 149 142 } 150 143 cancel() 144 + }() 145 + 146 + // subscription to bus messages for key revocation 147 + go mm.HandleKeyRevocation(ctx, signer, pipeline) 148 + 149 + go func() { 150 + <-ctx.Done() 151 + if cErr := peerConnection.Close(); cErr != nil { 152 + log.Log(ctx, "cannot close peerConnection: %v\n", cErr) 153 + } 151 154 }() 152 155 153 156 log.Debug(ctx, "starting pipeline")
+2
pkg/model/model.go
··· 49 49 GetRepoByHandle(handle string) (*Repo, error) 50 50 GetRepoByHandleOrDID(arg string) (*Repo, error) 51 51 GetRepoBySigningKey(signingKey string) (*Repo, error) 52 + GetAllRepos() ([]Repo, error) 52 53 UpdateRepo(repo *Repo) error 53 54 54 55 UpdateSigningKey(key *SigningKey) error 55 56 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error) 57 + GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error) 56 58 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error) 57 59 58 60 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
+9
pkg/model/repo.go
··· 30 30 return &repoModel, nil 31 31 } 32 32 33 + func (m *DBModel) GetAllRepos() ([]Repo, error) { 34 + var repos []Repo 35 + res := m.DB.Find(&repos) 36 + if res.Error != nil { 37 + return nil, res.Error 38 + } 39 + return repos, nil 40 + } 41 + 33 42 func (m *DBModel) GetRepoByHandle(handle string) (*Repo, error) { 34 43 var repoModel Repo 35 44 res := m.DB.Where("handle = ?", handle).First(&repoModel)
+1
pkg/model/resync.go
··· 1 + package model
+27 -5
pkg/model/signing_key.go
··· 3 3 import ( 4 4 "context" 5 5 "errors" 6 + "fmt" 6 7 "time" 7 8 8 9 "go.opentelemetry.io/otel" ··· 10 11 ) 11 12 12 13 type SigningKey struct { 13 - DID string `gorm:"primaryKey;column:did" json:"did"` 14 - RepoDID string `gorm:"primaryKey;column:repo_did" json:"repoDID"` 15 - Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:RepoDID;references:DID"` 16 - CreatedAt time.Time `json:"createdAt"` 17 - RevokedAt time.Time `json:"revokedAt"` 14 + DID string `gorm:"primaryKey;column:did" json:"did"` 15 + RepoDID string `gorm:"primaryKey;column:repo_did" json:"repoDID"` 16 + RKey string `gorm:"column:rkey;index" json:"rkey"` 17 + Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:RepoDID;references:DID"` 18 + CreatedAt time.Time `json:"createdAt"` 19 + RevokedAt *time.Time `json:"revokedAt"` 18 20 } 19 21 20 22 func (SigningKey) TableName() string { ··· 32 34 res := m.DB.Model(SigningKey{}).Where("did = ?", did).Where("repo_did = ?", repoDID).First(&key) 33 35 if errors.Is(res.Error, gorm.ErrRecordNotFound) { 34 36 return nil, nil 37 + } 38 + if key.RevokedAt != nil { 39 + return nil, fmt.Errorf("signing key revoked") 40 + } 41 + if res.Error != nil { 42 + return nil, res.Error 43 + } 44 + return &key, nil 45 + } 46 + 47 + func (m *DBModel) GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error) { 48 + _, span := otel.Tracer("signer").Start(ctx, "GetSigningKeyByRKey") 49 + defer span.End() 50 + var key SigningKey 51 + res := m.DB.Model(SigningKey{}).Where("rkey = ?", rkey).First(&key) 52 + if errors.Is(res.Error, gorm.ErrRecordNotFound) { 53 + return nil, nil 54 + } 55 + if key.RevokedAt != nil { 56 + return nil, fmt.Errorf("signing key revoked") 35 57 } 36 58 if res.Error != nil { 37 59 return nil, res.Error
+110
pkg/resync/resync.go
··· 1 + package resync 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "time" 7 + 8 + "golang.org/x/sync/errgroup" 9 + "stream.place/streamplace/pkg/atproto" 10 + "stream.place/streamplace/pkg/bus" 11 + "stream.place/streamplace/pkg/config" 12 + "stream.place/streamplace/pkg/log" 13 + "stream.place/streamplace/pkg/model" 14 + ) 15 + 16 + // resync a fresh database from the PDSses, copying over the few pieces of local state 17 + // that we have 18 + func Resync(ctx context.Context, cli *config.CLI) error { 19 + oldMod, err := model.MakeDB(cli.DBPath) 20 + if err != nil { 21 + return err 22 + } 23 + tempDBPath := cli.DBPath + ".temp." + fmt.Sprintf("%d", time.Now().UnixNano()) 24 + newMod, err := model.MakeDB(tempDBPath) 25 + if err != nil { 26 + return err 27 + } 28 + repos, err := oldMod.GetAllRepos() 29 + if err != nil { 30 + return err 31 + } 32 + 33 + atsync := &atproto.ATProtoSynchronizer{ 34 + CLI: cli, 35 + Model: newMod, 36 + Noter: nil, 37 + Bus: bus.NewBus(), 38 + } 39 + 40 + doneMap := make(map[string]bool) 41 + 42 + g, ctx := errgroup.WithContext(ctx) 43 + 44 + doneChan := make(chan string) 45 + go func() { 46 + for { 47 + select { 48 + case <-ctx.Done(): 49 + return 50 + case did := <-doneChan: 51 + doneMap[did] = true 52 + case <-time.After(10 * time.Second): 53 + for _, repo := range repos { 54 + if !doneMap[repo.DID] { 55 + log.Warn(ctx, "remaining repos to sync", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS) 56 + } 57 + } 58 + } 59 + } 60 + }() 61 + 62 + for _, repo := range repos { 63 + repo := repo // capture range variable 64 + doneMap[repo.DID] = false 65 + g.Go(func() error { 66 + log.Warn(ctx, "syncing repo", "did", repo.DID, "handle", repo.Handle) 67 + ctx := log.WithLogValues(ctx, "resyncDID", repo.DID, "resyncHandle", repo.Handle) 68 + _, err := atsync.SyncBlueskyRepoCached(ctx, repo.Handle, newMod) 69 + if err != nil { 70 + log.Error(ctx, "failed to sync repo", "did", repo.DID, "handle", repo.Handle, "err", err) 71 + return nil 72 + } 73 + log.Log(ctx, "synced repo", "did", repo.DID, "handle", repo.Handle) 74 + doneChan <- repo.DID 75 + return nil 76 + }) 77 + } 78 + 79 + if err := g.Wait(); err != nil { 80 + return err 81 + } 82 + 83 + oauthSessions, err := oldMod.ListOAuthSessions() 84 + if err != nil { 85 + return err 86 + } 87 + for _, session := range oauthSessions { 88 + err := newMod.CreateOAuthSession(session.DownstreamDPoPJKT, &session) 89 + if err != nil { 90 + return fmt.Errorf("failed to create oauth session: %w", err) 91 + } 92 + } 93 + log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions)) 94 + 95 + notificationTokens, err := oldMod.ListNotifications() 96 + if err != nil { 97 + return err 98 + } 99 + for _, token := range notificationTokens { 100 + err := newMod.CreateNotification(token.Token, token.RepoDID) 101 + if err != nil { 102 + return fmt.Errorf("failed to create notification: %w", err) 103 + } 104 + } 105 + log.Log(ctx, "migrated notification tokens", "count", len(notificationTokens)) 106 + 107 + log.Log(ctx, "resync complete!", "newDBPath", tempDBPath) 108 + 109 + return nil 110 + }