Live video on the AT Protocol

moderation: implement labeler subscriptions

+153
+146
pkg/atproto/labeler_firehose.go
··· 1 + package atproto 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + "net/url" 8 + "time" 9 + 10 + comatproto "github.com/bluesky-social/indigo/api/atproto" 11 + "github.com/bluesky-social/indigo/atproto/label" 12 + "github.com/bluesky-social/indigo/events" 13 + "github.com/bluesky-social/indigo/events/schedulers/parallel" 14 + "github.com/gorilla/websocket" 15 + "golang.org/x/sync/errgroup" 16 + "stream.place/streamplace/pkg/aqhttp" 17 + "stream.place/streamplace/pkg/log" 18 + ) 19 + 20 + func (atsync *ATProtoSynchronizer) StartLabelerFirehose(ctx context.Context, did string) error { 21 + retryCount := 0 22 + retryWindow := time.Now() 23 + 24 + for { 25 + if ctx.Err() != nil { 26 + return nil 27 + } 28 + err := atsync.StartLabelerFirehoseRetry(ctx, did) 29 + if err != nil { 30 + log.Error(ctx, "firehose error", "err", err) 31 + 32 + // Check if we're within the 1-minute window 33 + now := time.Now() 34 + if now.Sub(retryWindow) > time.Minute { 35 + // Reset the counter if more than a minute has passed 36 + retryCount = 1 37 + retryWindow = now 38 + } else { 39 + // Increment retry count if within the window 40 + retryCount++ 41 + if retryCount >= 3 { 42 + log.Error(ctx, "firehose failed 3 times within a minute, crashing", "err", err) 43 + return fmt.Errorf("firehose failed 3 times within a minute: %w", err) 44 + } 45 + } 46 + } 47 + } 48 + } 49 + 50 + func (atsync *ATProtoSynchronizer) StartLabelerFirehoseRetry(ctx context.Context, did string) error { 51 + ctx = log.WithLogValues(ctx, "func", "StartLabelerFirehose") 52 + 53 + ident, err := ResolveIdent(ctx, did) 54 + if err != nil { 55 + return fmt.Errorf("failed to resolve DID %s: %w", did, err) 56 + } 57 + 58 + ctx = log.WithLogValues(ctx, "labelerDID", ident.DID.String(), "labelerHandle", ident.Handle.String()) 59 + 60 + pub, err := ident.GetPublicKey("atproto_label") 61 + if err != nil { 62 + return fmt.Errorf("failed to get public key for labeler %s: %w", did, err) 63 + } 64 + 65 + labeler, ok := ident.Services["atproto_labeler"] 66 + if !ok { 67 + return fmt.Errorf("labeler %s does not have a atproto_labeler service", did) 68 + } 69 + 70 + ctx = log.WithLogValues(ctx, "func", "StartFirehose") 71 + ctx, cancel := context.WithCancel(ctx) 72 + defer cancel() 73 + dialer := websocket.DefaultDialer 74 + u, err := url.Parse(labeler.URL) 75 + if err != nil { 76 + return fmt.Errorf("invalid labeler URI: %w", err) 77 + } 78 + u.Path = "xrpc/com.atproto.label.subscribeLabels" 79 + if u.Scheme == "http" { 80 + u.Scheme = "ws" 81 + } else if u.Scheme == "https" { 82 + u.Scheme = "wss" 83 + } else { 84 + return fmt.Errorf("invalid labeler URI scheme: %s", labeler.URL) 85 + } 86 + query := u.Query() 87 + query.Set("cursor", "0") 88 + u.RawQuery = query.Encode() 89 + 90 + con, _, err := dialer.Dial(u.String(), http.Header{ 91 + "User-Agent": []string{aqhttp.UserAgent}, 92 + }) 93 + if err != nil { 94 + return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 95 + } 96 + 97 + rsc := &events.RepoStreamCallbacks{ 98 + LabelLabels: func(evt *comatproto.LabelSubscribeLabels_Labels) error { 99 + log.Log(ctx, "labeler labels", "labels", evt.Labels, "seq", evt.Seq) 100 + for _, labelLex := range evt.Labels { 101 + l := label.FromLexicon(labelLex) 102 + err = l.VerifySignature(pub) 103 + if err != nil { 104 + log.Error(ctx, "failed to verify label signature", "err", err) 105 + continue 106 + } 107 + err = l.VerifySyntax() 108 + if err != nil { 109 + log.Error(ctx, "failed to verify label syntax", "err", err) 110 + continue 111 + } 112 + log.Log(ctx, "labeler label", "cid", l.CID, "createdAt", l.CreatedAt, "expiresAt", l.ExpiresAt, "negated", l.Negated, "sourceDID", l.SourceDID, "uri", l.URI, "val", l.Val, "version", l.Version) 113 + } 114 + return nil 115 + }, 116 + LabelInfo: func(evt *comatproto.LabelSubscribeLabels_Info) error { 117 + log.Log(ctx, "labeler info", "name", evt.Name, "message", evt.Message) 118 + return nil 119 + }, 120 + Error: func(evt *events.ErrorFrame) error { 121 + log.Error(ctx, "firehose error", "err", evt.Error, "message", evt.Message) 122 + cancel() 123 + return fmt.Errorf("firehose error: %s", evt.Error) 124 + }, 125 + } 126 + 127 + scheduler := parallel.NewScheduler( 128 + 10, 129 + 100, 130 + did, 131 + rsc.EventHandler, 132 + ) 133 + 134 + log.Log(ctx, "starting labeler firehose consumer", "labelerDID", did) 135 + 136 + g, ctx := errgroup.WithContext(ctx) 137 + 138 + g.Go(func() error { 139 + return events.HandleRepoStream(ctx, con, scheduler, nil) 140 + }) 141 + 142 + <-ctx.Done() 143 + 144 + return nil 145 + 146 + }
+5
pkg/cmd/streamplace.go
··· 365 365 group.Go(func() error { 366 366 return atsync.StartFirehose(ctx) 367 367 }) 368 + for _, labeler := range cli.Labelers { 369 + group.Go(func() error { 370 + return atsync.StartLabelerFirehose(ctx, labeler) 371 + }) 372 + } 368 373 } 369 374 370 375 group.Go(func() error {
+2
pkg/config/config.go
··· 107 107 NewWebRTCPlayback bool 108 108 AppleTeamID string 109 109 AndroidCertFingerprint string 110 + Labelers []string 110 111 } 111 112 112 113 func (cli *CLI) NewFlagSet(name string) *flag.FlagSet { ··· 164 165 fs.BoolVar(&cli.NewWebRTCPlayback, "new-webrtc-playback", true, "enable new webrtc playback") 165 166 fs.StringVar(&cli.AppleTeamID, "apple-team-id", "", "apple team id for deep linking") 166 167 fs.StringVar(&cli.AndroidCertFingerprint, "android-cert-fingerprint", "", "android cert fingerprint for deep linking") 168 + cli.StringSliceFlag(fs, &cli.Labelers, "labelers", "", "did of labelers that this instance should subscribe to") 167 169 168 170 if runtime.GOOS == "linux" { 169 171 fs.BoolVar(&cli.NoMist, "no-mist", true, "Disable MistServer")