tangled
alpha
login
or
join now
stream.place
/
streamplace
75
fork
atom
Live video on the AT Protocol
75
fork
atom
overview
issues
1
pulls
pipelines
spxrpc: add service auth capability
Eli Mallon
2 weeks ago
0ae6327f
4cf9880d
+215
-4
6 changed files
expand all
collapse all
unified
split
pkg
cmd
streamplace.go
config
config.go
spxrpc
place_stream_playback.go
service_auth.go
spxrpc.go
statedb
service_auth.go
+6
pkg/cmd/streamplace.go
···
215
}
216
cli.AccessJWK = accessJWK
217
0
0
0
0
0
0
218
b := bus.NewBus()
219
atsync := &atproto.ATProtoSynchronizer{
220
CLI: cli,
···
215
}
216
cli.AccessJWK = accessJWK
217
218
+
serviceAuthKey, err := state.EnsureServiceAuthKey(ctx)
219
+
if err != nil {
220
+
return err
221
+
}
222
+
cli.ServiceAuthKey = serviceAuthKey
223
+
224
b := bus.NewBus()
225
atsync := &atproto.ATProtoSynchronizer{
226
CLI: cli,
+1
pkg/config/config.go
···
116
RateLimitWebsocket int
117
JWK jwk.Key
118
AccessJWK jwk.Key
0
119
dataDirFlags []*string
120
DiscordWebhooks []*discordtypes.Webhook
121
NewWebRTCPlayback bool
···
116
RateLimitWebsocket int
117
JWK jwk.Key
118
AccessJWK jwk.Key
119
+
ServiceAuthKey jwk.Key
120
dataDirFlags []*string
121
DiscordWebhooks []*discordtypes.Webhook
122
NewWebRTCPlayback bool
+55
-4
pkg/spxrpc/place_stream_playback.go
···
9
"github.com/labstack/echo/v4"
10
"github.com/pion/webrtc/v4"
11
"github.com/streamplace/oatproxy/pkg/oatproxy"
0
0
12
)
13
14
func (s *Server) handlePlaceStreamPlaybackWhep(ctx context.Context, rendition string, streamer string, r io.Reader, _contentType string) (io.Reader, error) {
···
20
return nil, echo.NewHTTPError(http.StatusBadRequest, "rendition is required")
21
}
22
viewer := ""
23
-
session, _ := oatproxy.GetOAuthSession(ctx)
24
-
if session != nil {
25
-
viewer = session.DID
26
-
}
27
repo, err := s.ATSync.SyncBlueskyRepoCached(ctx, streamer)
28
if err != nil {
29
return nil, err
30
}
0
0
0
0
0
0
0
0
0
0
0
0
31
body, err := io.ReadAll(r)
32
if err != nil {
33
return nil, echo.NewHTTPError(http.StatusBadRequest, "error reading body", err)
34
}
35
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
36
answer, err := s.mm.WebRTCPlayback2(ctx, repo.DID, rendition, &offer, viewer)
37
if err != nil {
38
return nil, echo.NewHTTPError(http.StatusInternalServerError, "error playing back", err)
···
9
"github.com/labstack/echo/v4"
10
"github.com/pion/webrtc/v4"
11
"github.com/streamplace/oatproxy/pkg/oatproxy"
12
+
"stream.place/streamplace/pkg/aqhttp"
13
+
"stream.place/streamplace/pkg/log"
14
)
15
16
func (s *Server) handlePlaceStreamPlaybackWhep(ctx context.Context, rendition string, streamer string, r io.Reader, _contentType string) (io.Reader, error) {
···
22
return nil, echo.NewHTTPError(http.StatusBadRequest, "rendition is required")
23
}
24
viewer := ""
0
0
0
0
25
repo, err := s.ATSync.SyncBlueskyRepoCached(ctx, streamer)
26
if err != nil {
27
return nil, err
28
}
29
+
streamer = repo.DID
30
+
session, _ := oatproxy.GetOAuthSession(ctx)
31
+
if session != nil {
32
+
viewer = session.DID
33
+
} else {
34
+
svc := GetServiceAuth(ctx)
35
+
if svc != nil {
36
+
log.Warn(ctx, "service auth present", "service_did", svc.DID)
37
+
// this is a signed request from a peer node, allow them to see unpublished streams
38
+
viewer = streamer
39
+
}
40
+
}
41
body, err := io.ReadAll(r)
42
if err != nil {
43
return nil, echo.NewHTTPError(http.StatusBadRequest, "error reading body", err)
44
}
45
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
46
+
if streamer == viewer {
47
+
// this user gets sent right to the origin in case we're unpublished
48
+
origin, err := s.statefulDB.GetLatestBroadcastOriginForStreamer(streamer)
49
+
if err != nil {
50
+
return nil, echo.NewHTTPError(http.StatusInternalServerError, "error getting broadcast origin", err)
51
+
}
52
+
log.Warn(ctx, "origin", "origin", origin)
53
+
myDID := s.cli.ServerDID()
54
+
if origin != nil && origin.ServerDID != myDID {
55
+
log.Warn(ctx, "proxying to origin", "origin", origin.ServerDID, "myDID", myDID)
56
+
token, err := CreateServiceToken(s.cli.ServiceAuthKey, s.cli.ServerDID())
57
+
if err != nil {
58
+
return nil, echo.NewHTTPError(http.StatusInternalServerError, "error creating service token", err)
59
+
}
60
+
parsedOrigin := origin.ServerDID
61
+
if len(parsedOrigin) > len("did:web:") && parsedOrigin[:len("did:web:")] == "did:web:" {
62
+
parsedOrigin = parsedOrigin[len("did:web:"):]
63
+
}
64
+
url := "https://" + parsedOrigin + "/xrpc/place.stream.playback.whep?rendition=" + rendition + "&streamer=" + streamer
65
+
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader([]byte(offer.SDP)))
66
+
if err != nil {
67
+
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to construct origin request", err)
68
+
}
69
+
req.Header.Set("Content-Type", _contentType)
70
+
req.Header.Set("X-Streamplace-Service-Auth", token)
71
+
resp, err := aqhttp.Client.Do(req)
72
+
if err != nil {
73
+
return nil, echo.NewHTTPError(http.StatusBadGateway, "error proxying to origin", err)
74
+
}
75
+
defer resp.Body.Close()
76
+
if resp.StatusCode != http.StatusOK {
77
+
body, _ := io.ReadAll(resp.Body)
78
+
return nil, echo.NewHTTPError(resp.StatusCode, "origin error: "+string(body))
79
+
}
80
+
data, err := io.ReadAll(resp.Body)
81
+
if err != nil {
82
+
return nil, echo.NewHTTPError(http.StatusInternalServerError, "error reading origin response", err)
83
+
}
84
+
return bytes.NewReader(data), nil
85
+
}
86
+
}
87
answer, err := s.mm.WebRTCPlayback2(ctx, repo.DID, rendition, &offer, viewer)
88
if err != nil {
89
return nil, echo.NewHTTPError(http.StatusInternalServerError, "error playing back", err)
+100
pkg/spxrpc/service_auth.go
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
package spxrpc
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
"time"
7
+
8
+
"github.com/labstack/echo/v4"
9
+
"github.com/lestrrat-go/jwx/v2/jwa"
10
+
"github.com/lestrrat-go/jwx/v2/jwk"
11
+
"github.com/lestrrat-go/jwx/v2/jwt"
12
+
"stream.place/streamplace/pkg/log"
13
+
)
14
+
15
+
const serviceAuthHeader = "X-Streamplace-Service-Auth"
16
+
const serviceTokenLifetime = 5 * time.Minute
17
+
18
+
// ServiceIdentity represents an authenticated peer node in the same station.
19
+
type ServiceIdentity struct {
20
+
DID string
21
+
}
22
+
23
+
type serviceAuthContextKeyType struct{}
24
+
25
+
var serviceAuthContextKey = serviceAuthContextKeyType{}
26
+
27
+
// GetServiceAuth returns the authenticated service identity from the context,
28
+
// or nil if the request did not come from an authenticated peer node.
29
+
func GetServiceAuth(ctx context.Context) *ServiceIdentity {
30
+
v := ctx.Value(serviceAuthContextKey)
31
+
if v == nil {
32
+
return nil
33
+
}
34
+
identity, ok := v.(*ServiceIdentity)
35
+
if !ok {
36
+
return nil
37
+
}
38
+
return identity
39
+
}
40
+
41
+
// ServiceAuthMiddleware checks for a service-to-service JWT in the
42
+
// X-Streamplace-Service-Auth header. If present and valid, it populates
43
+
// the context with the caller's ServiceIdentity. If absent or invalid,
44
+
// the request passes through unchanged for the normal OAuth flow.
45
+
func (s *Server) ServiceAuthMiddleware() echo.MiddlewareFunc {
46
+
return func(next echo.HandlerFunc) echo.HandlerFunc {
47
+
return func(c echo.Context) error {
48
+
tokenStr := c.Request().Header.Get(serviceAuthHeader)
49
+
if tokenStr == "" {
50
+
return next(c)
51
+
}
52
+
53
+
ctx := c.Request().Context()
54
+
key := s.cli.ServiceAuthKey
55
+
if key == nil {
56
+
log.Warn(ctx, "service auth token present but no service auth key configured")
57
+
return next(c)
58
+
}
59
+
60
+
token, err := jwt.Parse([]byte(tokenStr), jwt.WithKey(jwa.HS256, key), jwt.WithValidate(true))
61
+
if err != nil {
62
+
log.Warn(ctx, "invalid service auth token", "error", err)
63
+
return next(c)
64
+
}
65
+
66
+
issuer := token.Issuer()
67
+
if issuer == "" {
68
+
log.Warn(ctx, "service auth token missing issuer claim")
69
+
return next(c)
70
+
}
71
+
72
+
identity := &ServiceIdentity{DID: issuer}
73
+
ctx = context.WithValue(ctx, serviceAuthContextKey, identity)
74
+
c.SetRequest(c.Request().WithContext(ctx))
75
+
log.Warn(ctx, "authenticated service request", "service_did", issuer)
76
+
return next(c)
77
+
}
78
+
}
79
+
}
80
+
81
+
// CreateServiceToken generates a signed JWT for authenticating this node
82
+
// to another node in the same station.
83
+
func CreateServiceToken(key jwk.Key, serverDID string) (string, error) {
84
+
now := time.Now()
85
+
token, err := jwt.NewBuilder().
86
+
Issuer(serverDID).
87
+
IssuedAt(now).
88
+
Expiration(now.Add(serviceTokenLifetime)).
89
+
Build()
90
+
if err != nil {
91
+
return "", fmt.Errorf("failed to build service token: %w", err)
92
+
}
93
+
94
+
signed, err := jwt.Sign(token, jwt.WithKey(jwa.HS256, key))
95
+
if err != nil {
96
+
return "", fmt.Errorf("failed to sign service token: %w", err)
97
+
}
98
+
99
+
return string(signed), nil
100
+
}
+1
pkg/spxrpc/spxrpc.go
···
57
e.Use(s.ErrorHandlingMiddleware())
58
e.Use(s.ContextPreservingMiddleware())
59
e.Use(echomiddleware.Handler("", mdlw))
0
60
e.Use(op.OAuthMiddleware)
61
err := s.RegisterHandlersPlaceStream(e)
62
if err != nil {
···
57
e.Use(s.ErrorHandlingMiddleware())
58
e.Use(s.ContextPreservingMiddleware())
59
e.Use(echomiddleware.Handler("", mdlw))
60
+
e.Use(s.ServiceAuthMiddleware())
61
e.Use(op.OAuthMiddleware)
62
err := s.RegisterHandlersPlaceStream(e)
63
if err != nil {
+52
pkg/statedb/service_auth.go
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
package statedb
2
+
3
+
import (
4
+
"context"
5
+
"crypto/rand"
6
+
"encoding/json"
7
+
"fmt"
8
+
9
+
"github.com/lestrrat-go/jwx/v2/jwk"
10
+
"stream.place/streamplace/pkg/log"
11
+
)
12
+
13
+
// EnsureServiceAuthKey ensures a shared symmetric key exists in the config table
14
+
// for intra-service JWT authentication. All nodes sharing the same database will
15
+
// use the same key, enabling mutual authentication within a station.
16
+
func (state *StatefulDB) EnsureServiceAuthKey(ctx context.Context) (jwk.Key, error) {
17
+
conf, err := state.GetConfig("service-auth-key")
18
+
if err != nil {
19
+
return nil, fmt.Errorf("failed to get service auth key: %w", err)
20
+
}
21
+
22
+
if conf != nil {
23
+
key, err := jwk.ParseKey(conf.Value)
24
+
if err != nil {
25
+
return nil, fmt.Errorf("failed to parse service auth key: %w", err)
26
+
}
27
+
return key, nil
28
+
}
29
+
30
+
log.Warn(ctx, "no service auth key found, generating new one")
31
+
32
+
secret := make([]byte, 32)
33
+
if _, err := rand.Read(secret); err != nil {
34
+
return nil, fmt.Errorf("failed to generate random bytes: %w", err)
35
+
}
36
+
37
+
key, err := jwk.FromRaw(secret)
38
+
if err != nil {
39
+
return nil, fmt.Errorf("failed to create symmetric key: %w", err)
40
+
}
41
+
42
+
b, err := json.Marshal(key)
43
+
if err != nil {
44
+
return nil, fmt.Errorf("failed to marshal service auth key: %w", err)
45
+
}
46
+
47
+
if err := state.PutConfig("service-auth-key", b); err != nil {
48
+
return nil, fmt.Errorf("failed to save service auth key: %w", err)
49
+
}
50
+
51
+
return key, nil
52
+
}