Live video on the AT Protocol

moderation: we can ban people now yay

+187 -33
+8
pkg/api/stream_key.go
··· 77 } 78 79 ctx = log.WithLogValues(ctx, "did", did) 80 81 var mediaSigner media.MediaSigner 82 if a.CLI.ExternalSigning {
··· 77 } 78 79 ctx = log.WithLogValues(ctx, "did", did) 80 + labels, err := a.Model.GetActiveLabels(did) 81 + if err != nil { 82 + return nil, fmt.Errorf("failed to get active labels: %w", err) 83 + } 84 + if atproto.IsBanned(labels...) { 85 + log.Error(ctx, "user is banned", "did", did) 86 + return nil, fmt.Errorf("user is banned") 87 + } 88 89 var mediaSigner media.MediaSigner 90 if a.CLI.ExternalSigning {
+56 -11
pkg/atproto/labeler_firehose.go
··· 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/atproto/label" 13 "github.com/bluesky-social/indigo/events" 14 "github.com/bluesky-social/indigo/events/schedulers/parallel" 15 "github.com/gorilla/websocket" 16 "golang.org/x/sync/errgroup" 17 "stream.place/streamplace/pkg/aqhttp" ··· 108 109 rsc := &events.RepoStreamCallbacks{ 110 LabelLabels: func(evt *comatproto.LabelSubscribeLabels_Labels) error { 111 - log.Log(ctx, "labeler labels", "labels", evt.Labels, "seq", evt.Seq) 112 err = atsync.Model.UpdateLabelerCursor(did, evt.Seq) 113 if err != nil { 114 log.Error(ctx, "failed to update labeler cursor", "err", err) ··· 131 log.Error(ctx, "failed to marshal label", "err", err) 132 continue 133 } 134 err = atsync.Model.CreateLabel(&model.Label{ 135 - Cid: l.CID, 136 - Cts: l.CreatedAt, 137 - Exp: l.ExpiresAt, 138 - Neg: l.Negated, 139 - Sig: l.Sig, 140 - Src: l.SourceDID, 141 - Uri: l.URI, 142 - Val: l.Val, 143 - Ver: &l.Version, 144 - Record: bs.Bytes(), 145 }) 146 if err != nil { 147 log.Error(ctx, "failed to create label", "err", err) 148 continue 149 } 150 } 151 return nil 152 },
··· 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/atproto/label" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 + "github.com/bluesky-social/indigo/util" 17 "github.com/gorilla/websocket" 18 "golang.org/x/sync/errgroup" 19 "stream.place/streamplace/pkg/aqhttp" ··· 110 111 rsc := &events.RepoStreamCallbacks{ 112 LabelLabels: func(evt *comatproto.LabelSubscribeLabels_Labels) error { 113 err = atsync.Model.UpdateLabelerCursor(did, evt.Seq) 114 if err != nil { 115 log.Error(ctx, "failed to update labeler cursor", "err", err) ··· 132 log.Error(ctx, "failed to marshal label", "err", err) 133 continue 134 } 135 + cts, err := time.Parse(util.ISO8601, l.CreatedAt) 136 + if err != nil { 137 + log.Error(ctx, "failed to parse label created time", "err", err) 138 + continue 139 + } 140 + var exp time.Time 141 + if l.ExpiresAt != nil { 142 + e, err := time.Parse(util.ISO8601, *l.ExpiresAt) 143 + if err != nil { 144 + log.Error(ctx, "failed to parse label expiration time", "err", err) 145 + continue 146 + } 147 + exp = e.UTC() 148 + } else { 149 + exp = time.Time{} 150 + } 151 + neg := false 152 + if l.Negated != nil { 153 + neg = *l.Negated 154 + } 155 + 156 + var targetDID string 157 + 158 + // the URI can either be a true URI or a DID, so 159 + aturi, err1 := syntax.ParseATURI(l.URI) 160 + if err1 != nil { 161 + did, err2 := syntax.ParseDID(l.URI) 162 + if err2 != nil { 163 + log.Error(ctx, "failed to parse label URI as either ATURI or DID", "err1", err1, "err2", err2) 164 + continue 165 + } 166 + targetDID = did.String() 167 + } else { 168 + did, err := aturi.Authority().AsDID() 169 + if err != nil { 170 + log.Error(ctx, "failed to parse label URI as ATURI", "err", err) 171 + continue 172 + } 173 + targetDID = did.String() 174 + } 175 + 176 + log.Log(ctx, "labeler label", "targetDID", targetDID, "uri", l.URI, "cid", l.CID, "createdAt", cts, "expiresAt", exp, "negated", neg, "sourceDID", l.SourceDID, "val", l.Val, "version", l.Version) 177 err = atsync.Model.CreateLabel(&model.Label{ 178 + Cid: l.CID, 179 + Cts: cts.UTC(), 180 + Exp: exp, 181 + Neg: neg, 182 + Sig: l.Sig, 183 + Src: l.SourceDID, 184 + Uri: l.URI, 185 + Val: l.Val, 186 + Ver: &l.Version, 187 + Record: bs.Bytes(), 188 + RepoDID: targetDID, 189 }) 190 if err != nil { 191 log.Error(ctx, "failed to create label", "err", err) 192 continue 193 } 194 + atsync.Bus.Publish(targetDID, labelLex) 195 } 196 return nil 197 },
+52
pkg/atproto/labels.go
···
··· 1 + package atproto 2 + 3 + import ( 4 + "strings" 5 + 6 + comatproto "github.com/bluesky-social/indigo/api/atproto" 7 + ) 8 + 9 + const ( 10 + // from com.atproto.label.defs 11 + LabelHide = "!hide" 12 + LabelNoPromote = "!no-promote" 13 + LabelWarn = "!warn" 14 + LabelNoUnauthenticated = "!no-unauthenticated" 15 + LabelDMCAViolation = "dmca-violation" 16 + LabelDoxxing = "doxxing" 17 + LabelPorn = "porn" 18 + LabelSexual = "sexual" 19 + LabelNudity = "nudity" 20 + LabelNSFL = "nsfl" 21 + LabelGore = "gore" 22 + 23 + // referenced in https://atproto.com/specs/label 24 + LabelTakedown = "!takedown" 25 + LabelSuspend = "!suspend" 26 + ) 27 + 28 + var bannedLabels = map[string]bool{ 29 + LabelDMCAViolation: true, 30 + LabelDoxxing: true, 31 + LabelPorn: true, 32 + LabelSexual: true, 33 + LabelNudity: true, 34 + LabelNSFL: true, 35 + LabelGore: true, 36 + LabelTakedown: true, 37 + LabelSuspend: true, 38 + } 39 + 40 + // Given a users' labels, determine if they are banned 41 + func IsBanned(labels ...*comatproto.LabelDefs_Label) bool { 42 + for _, l := range labels { 43 + if !strings.HasPrefix(l.Uri, "did:") { 44 + // this is a label on a record, not a user 45 + continue 46 + } 47 + if bannedLabels[l.Val] { 48 + return true 49 + } 50 + } 51 + return false 52 + }
+5 -1
pkg/bus/bus.go
··· 62 func (b *Bus) Publish(user string, msg Message) { 63 b.mu.Lock() 64 defer b.mu.Unlock() 65 - for _, sub := range b.clients[user] { 66 go func(sub Subscription) { 67 sub <- msg 68 }(sub)
··· 62 func (b *Bus) Publish(user string, msg Message) { 63 b.mu.Lock() 64 defer b.mu.Unlock() 65 + subs, ok := b.clients[user] 66 + if !ok { 67 + return 68 + } 69 + for _, sub := range subs { 70 go func(sub Subscription) { 71 sub <- msg 72 }(sub)
+5 -5
pkg/cmd/streamplace.go
··· 365 group.Go(func() error { 366 return atsync.StartFirehose(ctx) 367 }) 368 - for _, labeler := range cli.Labelers { 369 - group.Go(func() error { 370 - return atsync.StartLabelerFirehose(ctx, labeler) 371 - }) 372 - } 373 } 374 375 group.Go(func() error {
··· 365 group.Go(func() error { 366 return atsync.StartFirehose(ctx) 367 }) 368 + } 369 + for _, labeler := range cli.Labelers { 370 + group.Go(func() error { 371 + return atsync.StartLabelerFirehose(ctx, labeler) 372 + }) 373 } 374 375 group.Go(func() error {
+20 -11
pkg/media/key_revocation.go
··· 4 "context" 5 "fmt" 6 7 "github.com/go-gst/go-gst/gst" 8 "stream.place/streamplace/pkg/model" 9 ) 10 11 - // Handle shutting down a pipeline when a signing key is revoked 12 func (mm *MediaManager) HandleKeyRevocation(ctx context.Context, ms MediaSigner, pipeline *gst.Pipeline) { 13 sub := mm.bus.Subscribe(ms.Streamer()) 14 defer mm.bus.Unsubscribe(ms.Streamer(), sub) ··· 17 case <-ctx.Done(): 18 return 19 case msg := <-sub: 20 - signingKey, ok := msg.(*model.SigningKey) 21 - if !ok { 22 continue 23 - } 24 - if signingKey.RevokedAt == nil { 25 - continue 26 - } 27 - if signingKey.DID == ms.DID() { 28 - err := fmt.Errorf("signing key revoked, ending stream: %s", signingKey.RKey) 29 - pipeline.Error(err.Error(), err) 30 - return 31 } 32 } 33 }
··· 4 "context" 5 "fmt" 6 7 + comatproto "github.com/bluesky-social/indigo/api/atproto" 8 "github.com/go-gst/go-gst/gst" 9 + "stream.place/streamplace/pkg/atproto" 10 "stream.place/streamplace/pkg/model" 11 ) 12 13 + // Handle shutting down a pipeline when a signing key is revoked or a user gets banned 14 func (mm *MediaManager) HandleKeyRevocation(ctx context.Context, ms MediaSigner, pipeline *gst.Pipeline) { 15 sub := mm.bus.Subscribe(ms.Streamer()) 16 defer mm.bus.Unsubscribe(ms.Streamer(), sub) ··· 19 case <-ctx.Done(): 20 return 21 case msg := <-sub: 22 + switch v := msg.(type) { 23 + case *model.SigningKey: 24 + if v.RevokedAt == nil { 25 + continue 26 + } 27 + if v.DID == ms.DID() { 28 + err := fmt.Errorf("signing key revoked, ending stream: %s", v.RKey) 29 + pipeline.Error(err.Error(), err) 30 + return 31 + } 32 + case *comatproto.LabelDefs_Label: 33 + if atproto.IsBanned(v) { 34 + err := fmt.Errorf("user banned, ending stream: %s", v.Uri) 35 + pipeline.Error(err.Error(), err) 36 + return 37 + } 38 + default: 39 continue 40 } 41 } 42 }
+40 -5
pkg/model/label.go
··· 1 package model 2 3 - import "gorm.io/gorm/clause" 4 5 type Label struct { 6 // cid: Optionally, CID specifying the specific version of 'uri' resource this label applies to. 7 Cid *string `json:"cid,omitempty" cborgen:"cid,omitempty" gorm:"column:cid"` 8 // cts: Timestamp when this label was created. 9 - Cts string `json:"cts" cborgen:"cts" gorm:"column:cts"` 10 // exp: Timestamp at which this label expires (no longer applies). 11 - Exp *string `json:"exp,omitempty" cborgen:"exp,omitempty" gorm:"column:exp"` 12 // neg: If true, this is a negation label, overwriting a previous label. 13 - Neg *bool `json:"neg,omitempty" cborgen:"neg,omitempty" gorm:"column:neg"` 14 // sig: Signature of dag-cbor encoded label. 15 Sig []byte `json:"sig,omitempty" cborgen:"sig,omitempty" gorm:"column:sig"` 16 // src: DID of the actor who created this label. ··· 22 // ver: The AT Protocol version of the label object. 23 Ver *int64 `json:"ver,omitempty" cborgen:"ver,omitempty" gorm:"column:ver"` 24 25 - Record []byte `json:"record,omitempty" cborgen:"record,omitempty" gorm:"column:record"` 26 } 27 28 func (m *DBModel) CreateLabel(label *Label) error { ··· 35 UpdateAll: true, 36 }).Create(label).Error 37 }
··· 1 package model 2 3 + import ( 4 + "bytes" 5 + "time" 6 + 7 + comatproto "github.com/bluesky-social/indigo/api/atproto" 8 + "gorm.io/gorm/clause" 9 + ) 10 11 type Label struct { 12 // cid: Optionally, CID specifying the specific version of 'uri' resource this label applies to. 13 Cid *string `json:"cid,omitempty" cborgen:"cid,omitempty" gorm:"column:cid"` 14 // cts: Timestamp when this label was created. 15 + Cts time.Time `json:"cts" cborgen:"cts" gorm:"column:cts"` 16 // exp: Timestamp at which this label expires (no longer applies). 17 + Exp time.Time `json:"exp,omitempty" cborgen:"exp,omitempty" gorm:"column:exp"` 18 // neg: If true, this is a negation label, overwriting a previous label. 19 + Neg bool `json:"neg,omitempty" cborgen:"neg,omitempty" gorm:"column:neg"` 20 // sig: Signature of dag-cbor encoded label. 21 Sig []byte `json:"sig,omitempty" cborgen:"sig,omitempty" gorm:"column:sig"` 22 // src: DID of the actor who created this label. ··· 28 // ver: The AT Protocol version of the label object. 29 Ver *int64 `json:"ver,omitempty" cborgen:"ver,omitempty" gorm:"column:ver"` 30 31 + Record []byte `json:"record,omitempty" cborgen:"record,omitempty" gorm:"column:record"` 32 + RepoDID string `json:"repoDID,omitempty" cborgen:"repoDID,omitempty" gorm:"column:repo_did"` 33 } 34 35 func (m *DBModel) CreateLabel(label *Label) error { ··· 42 UpdateAll: true, 43 }).Create(label).Error 44 } 45 + 46 + func (m *DBModel) GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error) { 47 + now := time.Now().UTC() 48 + var labels []Label 49 + err := m.DB.Where("uri = ? AND (exp IS NULL OR exp < ?) AND neg = ?", uri, now, false).Find(&labels).Error 50 + if err != nil { 51 + return nil, err 52 + } 53 + lexs := make([]*comatproto.LabelDefs_Label, len(labels)) 54 + for i, l := range labels { 55 + lex, err := l.ToLexicon() 56 + if err != nil { 57 + return nil, err 58 + } 59 + lexs[i] = lex 60 + } 61 + return lexs, nil 62 + } 63 + 64 + func (l Label) ToLexicon() (*comatproto.LabelDefs_Label, error) { 65 + r := bytes.NewReader(l.Record) 66 + var lex comatproto.LabelDefs_Label 67 + err := lex.UnmarshalCBOR(r) 68 + if err != nil { 69 + return nil, err 70 + } 71 + return &lex, nil 72 + }
+1
pkg/model/model.go
··· 113 UpdateLabelerCursor(did string, cursor int64) error 114 115 CreateLabel(label *Label) error 116 } 117 118 func MakeDB(dbURL string) (Model, error) {
··· 113 UpdateLabelerCursor(did string, cursor int64) error 114 115 CreateLabel(label *Label) error 116 + GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error) 117 } 118 119 func MakeDB(dbURL string) (Model, error) {