tangled
alpha
login
or
join now
stream.place
/
streamplace
74
fork
atom
Live video on the AT Protocol
74
fork
atom
overview
issues
1
pulls
pipelines
playback: initial demo of authenticated playback
Eli Mallon
2 weeks ago
7b0ad91e
eeace624
+385
-52
20 changed files
expand all
collapse all
unified
split
Makefile
go.mod
go.sum
js
components
src
components
mobile-player
use-webrtc.tsx
docs
src
content
docs
lex-reference
openapi.json
playback
place-stream-playback-whep.md
lexicons
place
stream
playback
whep.json
pkg
api
api.go
playback.go
atproto
atproto.go
migrate.go
sync.go
bus
segchanman.go
director
stream_session.go
media
media.go
validate.go
spxrpc
place_stream_playback.go
spxrpc.go
stubs.go
streamplace
playbackwhep.go
+2
-2
Makefile
···
410
410
411
411
.PHONY: lexgen-types
412
412
lexgen-types:
413
413
-
go run github.com/bluesky-social/indigo/cmd/lexgen \
413
413
+
go tool github.com/bluesky-social/indigo/cmd/lexgen \
414
414
-outdir ./pkg/spxrpc \
415
415
--build-file util/lexgen-types.json \
416
416
--external-lexicons subprojects/atproto/lexicons \
···
420
420
.PHONY: lexgen-server
421
421
lexgen-server:
422
422
mkdir -p ./pkg/spxrpc \
423
423
-
&& go run github.com/bluesky-social/indigo/cmd/lexgen \
423
423
+
&& go tool github.com/bluesky-social/indigo/cmd/lexgen \
424
424
--gen-server \
425
425
--types-import place.stream:stream.place/streamplace/pkg/streamplace \
426
426
--types-import app.bsky:github.com/bluesky-social/indigo/api/bsky \
+1
-1
go.mod
···
8
8
9
9
replace github.com/AxisCommunications/go-dpop => github.com/streamplace/go-dpop v0.0.0-20250510031900-c897158a8ad4
10
10
11
11
-
//replace github.com/livepeer/go-livepeer => ../go-livepeer
11
11
+
replace github.com/bluesky-social/indigo => github.com/streamplace/indigo v0.0.0-20260218231908-939cdaf0c507
12
12
13
13
tool github.com/bluesky-social/indigo/cmd/lexgen
14
14
+2
-2
go.sum
···
219
219
github.com/bluenviron/gortsplib/v5 v5.2.1/go.mod h1:sK4+00XQaSpU2iPIKjmhj6Yye+sVbNWEU2IJWYEZI9U=
220
220
github.com/bluenviron/mediacommon/v2 v2.5.2 h1:eq7LHJFksDAVtVdTrwOUl7dO7LE8eKwLgYKYi5MmYaY=
221
221
github.com/bluenviron/mediacommon/v2 v2.5.2/go.mod h1:5V15TiOfeaNVmZPVuOqAwqQSWyvMV86/dijDKu5q9Zs=
222
222
-
github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635 h1:kNeRrgGJH2g5OvjLqtaQ744YXqduliZYpFkJ/ld47c0=
223
223
-
github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635/go.mod h1:Pm2I1+iDXn/hLbF7XCg/DsZi6uDCiOo7hZGWprSM7k0=
224
222
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
225
223
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
226
224
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
···
1315
1313
github.com/streamplace/atproto-oauth-golang v0.0.0-20250619231223-a9c04fb888ac/go.mod h1:9LlKkqciiO5lRfbX0n4Wn5KNY9nvFb4R3by8FdW2TWc=
1316
1314
github.com/streamplace/go-dpop v0.0.0-20250510031900-c897158a8ad4 h1:L1fS4HJSaAyNnkwfuZubgfeZy8rkWmA0cMtH5Z0HqNc=
1317
1315
github.com/streamplace/go-dpop v0.0.0-20250510031900-c897158a8ad4/go.mod h1:bGUXY9Wd4mnd+XUrOYZr358J2f6z9QO/dLhL1SsiD+0=
1316
1316
+
github.com/streamplace/indigo v0.0.0-20260218231908-939cdaf0c507 h1:e8M3qPLr37NxEjlr18TaAwGP+OVyherVjgUG5VVmgWI=
1317
1317
+
github.com/streamplace/indigo v0.0.0-20260218231908-939cdaf0c507/go.mod h1:Pm2I1+iDXn/hLbF7XCg/DsZi6uDCiOo7hZGWprSM7k0=
1318
1318
github.com/streamplace/oatproxy v0.0.0-20260130124113-420429019d3b h1:BB/R1egvkEqZhGeKL3tqAlTn0mkoOaaMY6r6s18XJYA=
1319
1319
github.com/streamplace/oatproxy v0.0.0-20260130124113-420429019d3b/go.mod h1:pXi24hA7xBHj8eEywX6wGqJOR9FaEYlGwQ/72rN6okw=
1320
1320
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+42
-21
js/components/src/components/mobile-player/use-webrtc.tsx
···
1
1
import { useEffect, useRef, useState } from "react";
2
2
import * as sdpTransform from "sdp-transform";
3
3
-
import { PlayerStatus, usePlayerStore, useStreamKey } from "../..";
3
3
+
import { StreamplaceAgent } from "streamplace";
4
4
+
import { PlayerStatus, usePDSAgent, usePlayerStore, useStreamKey } from "../..";
4
5
import { RTCPeerConnection, RTCSessionDescription } from "./webrtc-primitives";
5
6
6
7
export default function useWebRTC(
···
9
10
const [mediaStream, setMediaStream] = useState<MediaStream | null>(null);
10
11
const [stuck, setStuck] = useState<boolean>(false);
11
12
const setStatus = usePlayerStore((x) => x.setStatus);
13
13
+
let agent = usePDSAgent();
12
14
13
15
const lastChange = useRef<number>(0);
14
16
15
17
useEffect(() => {
18
18
+
if (!agent) {
19
19
+
return;
20
20
+
}
16
21
const peerConnection = new RTCPeerConnection({
17
22
bundlePolicy: "max-bundle",
18
23
});
···
44
49
}
45
50
});
46
51
peerConnection.addEventListener("negotiationneeded", () => {
47
47
-
negotiateConnectionWithClientOffer(peerConnection, endpoint);
52
52
+
negotiateConnectionWithClientOffer(
53
53
+
peerConnection,
54
54
+
endpoint,
55
55
+
undefined,
56
56
+
agent,
57
57
+
);
48
58
});
49
59
50
60
let lastFramesReceived = 0;
···
82
92
clearInterval(handle);
83
93
peerConnection.close();
84
94
};
85
85
-
}, [endpoint]);
95
95
+
}, [endpoint, agent]);
86
96
return [mediaStream, stuck];
87
97
}
88
98
···
102
112
peerConnection: RTCPeerConnection,
103
113
endpoint: string,
104
114
bearerToken?: string,
115
115
+
agent?: StreamplaceAgent,
105
116
) {
106
117
/** https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/createOffer */
107
118
const offer = await peerConnection.createOffer({
···
134
145
* This specifies how the client should communicate,
135
146
* and what kind of media client and server have negotiated to exchange.
136
147
*/
137
137
-
let response = await postSDPOffer(`${endpoint}`, ofr.sdp, bearerToken);
138
138
-
if (response.status === 201) {
139
139
-
let answerSDP = await response.text();
148
148
+
let response = await postSDPOffer(
149
149
+
`${endpoint}`,
150
150
+
ofr.sdp,
151
151
+
bearerToken,
152
152
+
agent,
153
153
+
);
154
154
+
let text = new TextDecoder().decode(response.data);
155
155
+
if (response.success) {
140
156
if ((peerConnection.connectionState as string) === "closed") {
141
157
return;
142
158
}
143
159
await peerConnection.setRemoteDescription(
144
144
-
new RTCSessionDescription({ type: "answer", sdp: answerSDP }),
160
160
+
new RTCSessionDescription({ type: "answer", sdp: text }),
145
161
);
146
146
-
return response.headers.get("Location");
147
147
-
} else if (response.status === 405) {
148
148
-
console.log(
149
149
-
"Remember to update the URL passed into the WHIP or WHEP client",
150
150
-
);
162
162
+
return "https://stream.place/example";
151
163
} else {
152
152
-
const errorMessage = await response.text();
153
153
-
console.error(errorMessage);
164
164
+
console.error(text);
154
165
}
155
166
} catch (e) {
156
167
console.error(`posting sdp offer failed: ${e}`);
···
165
176
endpoint: string,
166
177
data: string,
167
178
bearerToken?: string,
179
179
+
agent?: StreamplaceAgent,
168
180
) {
169
169
-
return await fetch(endpoint, {
170
170
-
method: "POST",
171
171
-
mode: "cors",
172
172
-
headers: {
173
173
-
"content-type": "application/sdp",
174
174
-
...(bearerToken ? { Authorization: `Bearer ${bearerToken}` } : {}),
181
181
+
if (!agent) {
182
182
+
throw new Error("No agent found");
183
183
+
}
184
184
+
return await agent.place.stream.playback.whep(data, {
185
185
+
qp: {
186
186
+
rendition: "source",
187
187
+
streamer: agent.did!,
175
188
},
176
176
-
body: data,
177
189
});
190
190
+
// return await fetch(endpoint, {
191
191
+
// method: "POST",
192
192
+
// mode: "cors",
193
193
+
// headers: {
194
194
+
// "content-type": "application/sdp",
195
195
+
// ...(bearerToken ? { Authorization: `Bearer ${bearerToken}` } : {}),
196
196
+
// },
197
197
+
// body: data,
198
198
+
// });
178
199
}
179
200
180
201
/**
+71
js/docs/src/content/docs/lex-reference/openapi.json
···
517
517
}
518
518
}
519
519
},
520
520
+
"/xrpc/place.stream.playback.whep": {
521
521
+
"post": {
522
522
+
"summary": "Play a stream over WebRTC using WHEP.",
523
523
+
"operationId": "place.stream.playback.whep",
524
524
+
"tags": ["place.stream.playback"],
525
525
+
"responses": {
526
526
+
"200": {
527
527
+
"description": "Success",
528
528
+
"content": {
529
529
+
"*/*": {
530
530
+
"schema": {}
531
531
+
}
532
532
+
}
533
533
+
},
534
534
+
"400": {
535
535
+
"description": "Bad Request",
536
536
+
"content": {
537
537
+
"application/json": {
538
538
+
"schema": {
539
539
+
"type": "object",
540
540
+
"required": ["error", "message"],
541
541
+
"properties": {
542
542
+
"error": {
543
543
+
"type": "string",
544
544
+
"oneOf": [
545
545
+
{
546
546
+
"const": "Unauthorized"
547
547
+
}
548
548
+
]
549
549
+
},
550
550
+
"message": {
551
551
+
"type": "string"
552
552
+
}
553
553
+
}
554
554
+
}
555
555
+
}
556
556
+
}
557
557
+
}
558
558
+
},
559
559
+
"parameters": [
560
560
+
{
561
561
+
"name": "streamer",
562
562
+
"in": "query",
563
563
+
"required": true,
564
564
+
"description": "The DID of the streamer to play.",
565
565
+
"schema": {
566
566
+
"type": "string",
567
567
+
"description": "The DID of the streamer to play."
568
568
+
}
569
569
+
},
570
570
+
{
571
571
+
"name": "rendition",
572
572
+
"in": "query",
573
573
+
"required": true,
574
574
+
"description": "The rendition of the stream to play.",
575
575
+
"schema": {
576
576
+
"type": "string",
577
577
+
"description": "The rendition of the stream to play."
578
578
+
}
579
579
+
}
580
580
+
],
581
581
+
"requestBody": {
582
582
+
"required": true,
583
583
+
"content": {
584
584
+
"*/*": {
585
585
+
"schema": {}
586
586
+
}
587
587
+
}
588
588
+
}
589
589
+
}
590
590
+
},
520
591
"/xrpc/place.stream.multistream.createTarget": {
521
592
"post": {
522
593
"summary": "Create a new target for rebroadcasting a Streamplace stream.",
+82
js/docs/src/content/docs/lex-reference/playback/place-stream-playback-whep.md
···
1
1
+
---
2
2
+
title: place.stream.playback.whep
3
3
+
description: Reference for the place.stream.playback.whep lexicon
4
4
+
---
5
5
+
6
6
+
**Lexicon Version:** 1
7
7
+
8
8
+
## Definitions
9
9
+
10
10
+
<a name="main"></a>
11
11
+
12
12
+
### `main`
13
13
+
14
14
+
**Type:** `procedure`
15
15
+
16
16
+
Play a stream over WebRTC using WHEP.
17
17
+
18
18
+
**Parameters:**
19
19
+
20
20
+
| Name | Type | Req'd | Description | Constraints |
21
21
+
| ----------- | -------- | ----- | ------------------------------------ | ----------- |
22
22
+
| `streamer` | `string` | ✅ | The DID of the streamer to play. | |
23
23
+
| `rendition` | `string` | ✅ | The rendition of the stream to play. | |
24
24
+
25
25
+
**Input:**
26
26
+
27
27
+
- **Encoding:** `*/*`
28
28
+
- **Schema:**
29
29
+
30
30
+
_Schema not defined._
31
31
+
**Output:**
32
32
+
33
33
+
- **Encoding:** `*/*`
34
34
+
- **Schema:**
35
35
+
36
36
+
_Schema not defined._
37
37
+
**Possible Errors:**
38
38
+
39
39
+
- `Unauthorized`: This user may not play this stream.
40
40
+
41
41
+
---
42
42
+
43
43
+
## Lexicon Source
44
44
+
45
45
+
```json
46
46
+
{
47
47
+
"lexicon": 1,
48
48
+
"id": "place.stream.playback.whep",
49
49
+
"defs": {
50
50
+
"main": {
51
51
+
"type": "procedure",
52
52
+
"description": "Play a stream over WebRTC using WHEP.",
53
53
+
"parameters": {
54
54
+
"type": "params",
55
55
+
"required": ["streamer", "rendition"],
56
56
+
"properties": {
57
57
+
"streamer": {
58
58
+
"type": "string",
59
59
+
"description": "The DID of the streamer to play."
60
60
+
},
61
61
+
"rendition": {
62
62
+
"type": "string",
63
63
+
"description": "The rendition of the stream to play."
64
64
+
}
65
65
+
}
66
66
+
},
67
67
+
"input": {
68
68
+
"encoding": "*/*"
69
69
+
},
70
70
+
"output": {
71
71
+
"encoding": "*/*"
72
72
+
},
73
73
+
"errors": [
74
74
+
{
75
75
+
"name": "Unauthorized",
76
76
+
"description": "This user may not play this stream."
77
77
+
}
78
78
+
]
79
79
+
}
80
80
+
}
81
81
+
}
82
82
+
```
+36
lexicons/place/stream/playback/whep.json
···
1
1
+
{
2
2
+
"lexicon": 1,
3
3
+
"id": "place.stream.playback.whep",
4
4
+
"defs": {
5
5
+
"main": {
6
6
+
"type": "procedure",
7
7
+
"description": "Play a stream over WebRTC using WHEP.",
8
8
+
"parameters": {
9
9
+
"type": "params",
10
10
+
"required": ["streamer", "rendition"],
11
11
+
"properties": {
12
12
+
"streamer": {
13
13
+
"type": "string",
14
14
+
"description": "The DID of the streamer to play."
15
15
+
},
16
16
+
"rendition": {
17
17
+
"type": "string",
18
18
+
"description": "The rendition of the stream to play."
19
19
+
}
20
20
+
}
21
21
+
},
22
22
+
"input": {
23
23
+
"encoding": "*/*"
24
24
+
},
25
25
+
"output": {
26
26
+
"encoding": "*/*"
27
27
+
},
28
28
+
"errors": [
29
29
+
{
30
30
+
"name": "Unauthorized",
31
31
+
"description": "This user may not play this stream."
32
32
+
}
33
33
+
]
34
34
+
}
35
35
+
}
36
36
+
}
+1
-1
pkg/api/api.go
···
155
155
Recorder: metrics.NewRecorder(metrics.Config{}),
156
156
})
157
157
var xrpc http.Handler
158
158
-
xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus, a.LocalDB)
158
158
+
xrpc, err := spxrpc.NewServer(ctx, a.CLI, a.Model, a.StatefulDB, a.op, mdlw, a.ATSync, a.Bus, a.LocalDB, a.MediaManager)
159
159
if err != nil {
160
160
return nil, err
161
161
}
+1
-1
pkg/api/playback.go
···
28
28
return user, nil
29
29
}
30
30
// only other allowed case is a bluesky handle
31
31
-
repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user, a.Model)
31
31
+
repo, err := a.ATSync.SyncBlueskyRepoCached(ctx, user)
32
32
if err != nil {
33
33
return "", err
34
34
}
+3
-3
pkg/atproto/atproto.go
···
22
22
23
23
var SyncGetRepo = comatproto.SyncGetRepo
24
24
25
25
-
func (atsync *ATProtoSynchronizer) SyncBlueskyRepoCached(ctx context.Context, handle string, mod model.Model) (*model.Repo, error) {
25
25
+
func (atsync *ATProtoSynchronizer) SyncBlueskyRepoCached(ctx context.Context, handle string) (*model.Repo, error) {
26
26
ctx, span := otel.Tracer("signer").Start(ctx, "SyncBlueskyRepoCached")
27
27
defer span.End()
28
28
-
repo, err := mod.GetRepoByHandleOrDID(handle)
28
28
+
repo, err := atsync.Model.GetRepoByHandleOrDID(handle)
29
29
if err != nil {
30
30
return nil, fmt.Errorf("failed to get repo for %s: %w", handle, err)
31
31
}
···
33
33
return repo, nil
34
34
}
35
35
36
36
-
return atsync.SyncBlueskyRepo(ctx, handle, mod)
36
36
+
return atsync.SyncBlueskyRepo(ctx, handle, atsync.Model)
37
37
}
38
38
39
39
type mstNode struct {
+1
-1
pkg/atproto/migrate.go
···
60
60
currentDID := did
61
61
g.Go(func() error {
62
62
log.Debug(ctx, "syncing repo", "did", currentDID, "progress", currentIndex+1, "total", len(allDIDs))
63
63
-
_, err := atsync.SyncBlueskyRepoCached(ctx, currentDID, atsync.Model)
63
63
+
_, err := atsync.SyncBlueskyRepoCached(ctx, currentDID)
64
64
if err != nil {
65
65
log.Error(ctx, "failed to sync repo", "did", currentDID, "err", err)
66
66
syncErrorMu.Lock()
+11
-11
pkg/atproto/sync.go
···
97
97
}
98
98
99
99
case *streamplace.ChatMessage:
100
100
-
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
100
100
+
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID)
101
101
if err != nil {
102
102
return fmt.Errorf("failed to sync bluesky repo: %w", err)
103
103
}
104
104
105
105
go func() {
106
106
-
_, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model)
106
106
+
_, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer)
107
107
if err != nil {
108
108
log.Error(ctx, "failed to sync bluesky repo", "err", err)
109
109
}
···
178
178
}
179
179
180
180
case *streamplace.ChatGate:
181
181
-
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
181
181
+
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID)
182
182
if err != nil {
183
183
return fmt.Errorf("failed to sync bluesky repo: %w", err)
184
184
}
···
210
210
go atsync.Bus.Publish(userDID, streamplaceGate)
211
211
212
212
case *streamplace.ChatProfile:
213
213
-
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
213
213
+
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID)
214
214
if err != nil {
215
215
return fmt.Errorf("failed to sync bluesky repo: %w", err)
216
216
}
···
225
225
}
226
226
227
227
case *streamplace.ServerSettings:
228
228
-
_, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
228
228
+
_, err := atsync.SyncBlueskyRepoCached(ctx, userDID)
229
229
if err != nil {
230
230
return fmt.Errorf("failed to sync bluesky repo: %w", err)
231
231
}
···
253
253
}
254
254
255
255
if livestream, ok := d["place.stream.livestream"]; ok {
256
256
-
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
256
256
+
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID)
257
257
if err != nil {
258
258
return fmt.Errorf("failed to sync bluesky repo: %w", err)
259
259
}
···
292
292
// log.Warn(ctx, "chat message detected", "uri", livestream.URI)
293
293
// if this post is a reply to someone's livestream post
294
294
// log.Warn(ctx, "chat message detected", "message", rec.Text)
295
295
-
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
295
295
+
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID)
296
296
if err != nil {
297
297
return fmt.Errorf("failed to sync bluesky repo: %w", err)
298
298
}
···
483
483
}
484
484
485
485
case *streamplace.BroadcastOrigin:
486
486
-
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
486
486
+
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID)
487
487
if err != nil {
488
488
return fmt.Errorf("failed to sync broadcast origin creator bluesky repo: %w", err)
489
489
}
490
490
-
_, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer, atsync.Model)
490
490
+
_, err = atsync.SyncBlueskyRepoCached(ctx, rec.Streamer)
491
491
if err != nil {
492
492
return fmt.Errorf("failed to sync broadcast origin streamer bluesky repo: %w", err)
493
493
}
···
508
508
go atsync.Bus.Publish("", view)
509
509
510
510
case *streamplace.MetadataConfiguration:
511
511
-
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
511
511
+
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID)
512
512
if err != nil {
513
513
return fmt.Errorf("failed to sync bluesky repo: %w", err)
514
514
}
···
524
524
}
525
525
526
526
case *streamplace.ModerationPermission:
527
527
-
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID, atsync.Model)
527
527
+
repo, err := atsync.SyncBlueskyRepoCached(ctx, userDID)
528
528
if err != nil {
529
529
return fmt.Errorf("failed to sync bluesky repo: %w", err)
530
530
}
+1
pkg/bus/segchanman.go
···
16
16
Filepath string
17
17
Data []byte
18
18
PacketizedData *PacketizedSegment
19
19
+
Published bool
19
20
}
20
21
21
22
type PacketizedSegment struct {
+13
-5
pkg/director/stream_session.go
···
208
208
ss.bus.Publish(spseg.Creator, spseg)
209
209
ss.Go(ctx, func() error {
210
210
return ss.AddPlaybackSegment(ctx, spseg, "source", &bus.Seg{
211
211
-
Filepath: notif.Segment.ID,
212
212
-
Data: notif.Data,
211
211
+
Filepath: notif.Segment.ID,
212
212
+
Data: notif.Data,
213
213
+
Published: notif.Metadata.Published,
213
214
})
214
215
})
216
216
+
217
217
+
// everything else is for published segments
218
218
+
if !notif.Metadata.Published {
219
219
+
return nil
220
220
+
}
215
221
216
222
if ss.cli.Thumbnail {
217
223
ss.Go(ctx, func() error {
···
721
727
}
722
728
723
729
func (ss *StreamSession) AddPlaybackSegment(ctx context.Context, spseg *streamplace.Segment, rendition string, seg *bus.Seg) error {
724
724
-
ss.Go(ctx, func() error {
725
725
-
return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
726
726
-
})
730
730
+
if seg.Published {
731
731
+
ss.Go(ctx, func() error {
732
732
+
return ss.AddToHLS(ctx, spseg, rendition, seg.Data)
733
733
+
})
734
734
+
}
727
735
ss.Go(ctx, func() error {
728
736
return ss.AddToWebRTC(ctx, spseg, rendition, seg)
729
737
})
+31
-2
pkg/media/media.go
···
197
197
DistributionPolicy *localdb.DistributionPolicy
198
198
MetadataConfiguration *streamplace.MetadataConfiguration
199
199
Livestream *streamplace.Livestream
200
200
+
Published bool
200
201
}
201
202
202
203
var ErrMissingMetadata = errors.New("missing segment metadata")
203
204
var ErrInvalidMetadata = errors.New("invalid segment metadata")
205
205
+
var C2PAActionsV2Label = "c2pa.actions.v2"
206
206
+
var C2PAPublishedAction = "c2pa.published"
204
207
205
208
func ParseSegmentAssertions(ctx context.Context, mani *c2patypes.Manifest) (*SegmentMetadata, error) {
206
209
_, span := otel.Tracer("signer").Start(ctx, "ParseSegmentAssertions")
207
210
defer span.End()
208
211
var ass *c2patypes.ManifestAssertion
212
212
+
isPublished := false
209
213
for _, a := range mani.Assertions {
210
214
if a.Label == StreamplaceMetadata {
211
215
ass = &a
212
212
-
break
216
216
+
continue
213
217
}
214
218
if a.Label == "place.stream.metadata" {
215
219
// backwards compatibility for old manifests
216
220
ass = &a
217
217
-
break
221
221
+
continue
222
222
+
}
223
223
+
if a.Label == C2PAActionsV2Label {
224
224
+
data, ok := a.Data.(map[string]any)
225
225
+
if !ok {
226
226
+
return nil, ErrInvalidMetadata
227
227
+
}
228
228
+
actions, ok := data["actions"].([]any)
229
229
+
if !ok {
230
230
+
return nil, ErrInvalidMetadata
231
231
+
}
232
232
+
for _, action := range actions {
233
233
+
actionMap, ok := action.(map[string]any)
234
234
+
if !ok {
235
235
+
return nil, ErrInvalidMetadata
236
236
+
}
237
237
+
actionType, ok := actionMap["action"].(string)
238
238
+
if !ok {
239
239
+
return nil, ErrInvalidMetadata
240
240
+
}
241
241
+
if actionType == C2PAPublishedAction {
242
242
+
isPublished = true
243
243
+
break
244
244
+
}
245
245
+
}
218
246
}
219
247
}
220
248
if ass == nil {
···
268
296
DistributionPolicy: distributionPolicy,
269
297
MetadataConfiguration: metadataConfiguration,
270
298
Livestream: livestream,
299
299
+
Published: isPublished,
271
300
}
272
301
return &out, nil
273
302
}
+1
-1
pkg/media/validate.go
···
75
75
signingKeyDID = meta.Creator
76
76
repoDID = meta.Creator
77
77
} else {
78
78
-
repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator, mm.model)
78
78
+
repo, err := mm.atsync.SyncBlueskyRepoCached(ctx, meta.Creator)
79
79
if err != nil {
80
80
return err
81
81
}
+34
pkg/spxrpc/place_stream_playback.go
···
1
1
+
package spxrpc
2
2
+
3
3
+
import (
4
4
+
"bytes"
5
5
+
"context"
6
6
+
"io"
7
7
+
"net/http"
8
8
+
9
9
+
"github.com/labstack/echo/v4"
10
10
+
"github.com/pion/webrtc/v4"
11
11
+
)
12
12
+
13
13
+
func (s *Server) handlePlaceStreamPlaybackWhep(ctx context.Context, rendition string, streamer string, r io.Reader, contentType string) (io.Reader, error) {
14
14
+
if streamer == "" {
15
15
+
return nil, echo.NewHTTPError(http.StatusBadRequest, "streamer is required")
16
16
+
}
17
17
+
if rendition == "" {
18
18
+
return nil, echo.NewHTTPError(http.StatusBadRequest, "rendition is required")
19
19
+
}
20
20
+
repo, err := s.ATSync.SyncBlueskyRepoCached(ctx, streamer)
21
21
+
if err != nil {
22
22
+
return nil, err
23
23
+
}
24
24
+
body, err := io.ReadAll(r)
25
25
+
if err != nil {
26
26
+
return nil, echo.NewHTTPError(http.StatusBadRequest, "error reading body", err)
27
27
+
}
28
28
+
offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(body)}
29
29
+
answer, err := s.mm.WebRTCPlayback2(ctx, repo.DID, rendition, &offer)
30
30
+
if err != nil {
31
31
+
return nil, echo.NewHTTPError(http.StatusInternalServerError, "error playing back", err)
32
32
+
}
33
33
+
return bytes.NewReader([]byte(answer.SDP)), nil
34
34
+
}
+4
-1
pkg/spxrpc/spxrpc.go
···
20
20
"stream.place/streamplace/pkg/config"
21
21
"stream.place/streamplace/pkg/localdb"
22
22
"stream.place/streamplace/pkg/log"
23
23
+
"stream.place/streamplace/pkg/media"
23
24
"stream.place/streamplace/pkg/model"
24
25
"stream.place/streamplace/pkg/statedb"
25
26
)
···
35
36
bus *bus.Bus
36
37
op *oatproxy.OATProxy
37
38
localDB localdb.LocalDB
39
39
+
mm *media.MediaManager
38
40
}
39
41
40
40
-
func NewServer(ctx context.Context, cli *config.CLI, model model.Model, statefulDB *statedb.StatefulDB, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer, bus *bus.Bus, ldb localdb.LocalDB) (*Server, error) {
42
42
+
func NewServer(ctx context.Context, cli *config.CLI, model model.Model, statefulDB *statedb.StatefulDB, op *oatproxy.OATProxy, mdlw middleware.Middleware, atsync *atproto.ATProtoSynchronizer, bus *bus.Bus, ldb localdb.LocalDB, mm *media.MediaManager) (*Server, error) {
41
43
e := echo.New()
42
44
s := &Server{
43
45
e: e,
···
50
52
bus: bus,
51
53
op: op,
52
54
localDB: ldb,
55
55
+
mm: mm,
53
56
}
54
57
e.Use(s.ErrorHandlingMiddleware())
55
58
e.Use(s.ContextPreservingMiddleware())
+18
pkg/spxrpc/stubs.go
···
299
299
e.POST("/xrpc/place.stream.multistream.deleteTarget", s.HandlePlaceStreamMultistreamDeleteTarget)
300
300
e.GET("/xrpc/place.stream.multistream.listTargets", s.HandlePlaceStreamMultistreamListTargets)
301
301
e.POST("/xrpc/place.stream.multistream.putTarget", s.HandlePlaceStreamMultistreamPutTarget)
302
302
+
e.POST("/xrpc/place.stream.playback.whep", s.HandlePlaceStreamPlaybackWhep)
302
303
e.POST("/xrpc/place.stream.server.createWebhook", s.HandlePlaceStreamServerCreateWebhook)
303
304
e.POST("/xrpc/place.stream.server.deleteWebhook", s.HandlePlaceStreamServerDeleteWebhook)
304
305
e.GET("/xrpc/place.stream.server.getServerTime", s.HandlePlaceStreamServerGetServerTime)
···
690
691
return handleErr
691
692
}
692
693
return c.JSON(200, out)
694
694
+
}
695
695
+
696
696
+
func (s *Server) HandlePlaceStreamPlaybackWhep(c echo.Context) error {
697
697
+
ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandlePlaceStreamPlaybackWhep")
698
698
+
defer span.End()
699
699
+
rendition := c.QueryParam("rendition")
700
700
+
streamer := c.QueryParam("streamer")
701
701
+
body := c.Request().Body
702
702
+
contentType := c.Request().Header.Get("Content-Type")
703
703
+
var out io.Reader
704
704
+
var handleErr error
705
705
+
// func (s *Server) handlePlaceStreamPlaybackWhep(ctx context.Context,rendition string,streamer string,r io.Reader,contentType string) (io.Reader, error)
706
706
+
out, handleErr = s.handlePlaceStreamPlaybackWhep(ctx, rendition, streamer, body, contentType)
707
707
+
if handleErr != nil {
708
708
+
return handleErr
709
709
+
}
710
710
+
return c.Stream(200, "application/octet-stream", out)
693
711
}
694
712
695
713
func (s *Server) HandlePlaceStreamServerCreateWebhook(c echo.Context) error {
+30
pkg/streamplace/playbackwhep.go
···
1
1
+
// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
2
2
+
3
3
+
// Lexicon schema: place.stream.playback.whep
4
4
+
5
5
+
package streamplace
6
6
+
7
7
+
import (
8
8
+
"bytes"
9
9
+
"context"
10
10
+
"io"
11
11
+
12
12
+
lexutil "github.com/bluesky-social/indigo/lex/util"
13
13
+
)
14
14
+
15
15
+
// PlaybackWhep calls the XRPC method "place.stream.playback.whep".
16
16
+
//
17
17
+
// rendition: The rendition of the stream to play.
18
18
+
// streamer: The DID of the streamer to play.
19
19
+
func PlaybackWhep(ctx context.Context, c lexutil.LexClient, input io.Reader, rendition string, streamer string) ([]byte, error) {
20
20
+
buf := new(bytes.Buffer)
21
21
+
22
22
+
params := map[string]interface{}{}
23
23
+
params["rendition"] = rendition
24
24
+
params["streamer"] = streamer
25
25
+
if err := c.LexDo(ctx, lexutil.Procedure, "*/*", "place.stream.playback.whep", params, input, buf); err != nil {
26
26
+
return nil, err
27
27
+
}
28
28
+
29
29
+
return buf.Bytes(), nil
30
30
+
}