tangled
alpha
login
or
join now
stream.place
/
streamplace
77
fork
atom
Live video on the AT Protocol
77
fork
atom
overview
issues
1
pulls
pipelines
multistreaming: implement multistream status
Eli Mallon
5 months ago
6e8473f9
e2ea36c8
+235
-33
9 changed files
expand all
collapse all
unified
split
js
docs
src
content
docs
lex-reference
multistream
place-stream-multistream-defs.md
openapi.json
lexicons
place
stream
multistream
defs.json
pkg
director
stream_session.go
media
rtmp_push.go
statedb
multistream_event.go
multistream_target.go
statedb.go
streamplace
multistreamdefs.go
+43
-5
js/docs/src/content/docs/lex-reference/multistream/place-stream-multistream-defs.md
···
15
15
16
16
**Properties:**
17
17
18
18
-
| Name | Type | Req'd | Description | Constraints |
19
19
-
| -------- | --------- | ----- | ----------- | ---------------- |
20
20
-
| `uri` | `string` | ✅ | | Format: `at-uri` |
21
21
-
| `cid` | `string` | ✅ | | Format: `cid` |
22
22
-
| `record` | `unknown` | ✅ | | |
18
18
+
| Name | Type | Req'd | Description | Constraints |
19
19
+
| ------------- | ------------------------------------------------------------------------------------------- | ----- | ----------- | ---------------- |
20
20
+
| `uri` | `string` | ✅ | | Format: `at-uri` |
21
21
+
| `cid` | `string` | ✅ | | Format: `cid` |
22
22
+
| `record` | `unknown` | ✅ | | |
23
23
+
| `latestEvent` | [`place.stream.multistream.defs#event`](/lex-reference/place-stream-multistream-defs#event) | ❌ | | |
24
24
+
25
25
+
---
26
26
+
27
27
+
<a name="event"></a>
28
28
+
29
29
+
### `event`
30
30
+
31
31
+
**Type:** `object`
32
32
+
33
33
+
**Properties:**
34
34
+
35
35
+
| Name | Type | Req'd | Description | Constraints |
36
36
+
| ----------- | -------- | ----- | ----------- | ---------------------------------------------- |
37
37
+
| `message` | `string` | ✅ | | |
38
38
+
| `status` | `string` | ✅ | | Enum: `inactive`, `pending`, `active`, `error` |
39
39
+
| `createdAt` | `string` | ✅ | | Format: `datetime` |
23
40
24
41
---
25
42
···
44
61
},
45
62
"record": {
46
63
"type": "unknown"
64
64
+
},
65
65
+
"latestEvent": {
66
66
+
"type": "ref",
67
67
+
"ref": "place.stream.multistream.defs#event"
68
68
+
}
69
69
+
}
70
70
+
},
71
71
+
"event": {
72
72
+
"type": "object",
73
73
+
"required": ["message", "status", "createdAt"],
74
74
+
"properties": {
75
75
+
"message": {
76
76
+
"type": "string"
77
77
+
},
78
78
+
"status": {
79
79
+
"type": "string",
80
80
+
"enum": ["inactive", "pending", "active", "error"]
81
81
+
},
82
82
+
"createdAt": {
83
83
+
"type": "string",
84
84
+
"format": "datetime"
47
85
}
48
86
}
49
87
}
+21
-1
js/docs/src/content/docs/lex-reference/openapi.json
···
1889
1889
"type": "string",
1890
1890
"format": "cid"
1891
1891
},
1892
1892
-
"record": {}
1892
1892
+
"record": {},
1893
1893
+
"latestEvent": {
1894
1894
+
"$ref": "#/components/schemas/place.stream.multistream.defs_event"
1895
1895
+
}
1893
1896
},
1894
1897
"required": ["uri", "cid", "record"]
1898
1898
+
},
1899
1899
+
"place.stream.multistream.defs_event": {
1900
1900
+
"type": "object",
1901
1901
+
"properties": {
1902
1902
+
"message": {
1903
1903
+
"type": "string"
1904
1904
+
},
1905
1905
+
"status": {
1906
1906
+
"type": "string",
1907
1907
+
"enum": ["inactive", "pending", "active", "error"]
1908
1908
+
},
1909
1909
+
"createdAt": {
1910
1910
+
"type": "string",
1911
1911
+
"format": "date-time"
1912
1912
+
}
1913
1913
+
},
1914
1914
+
"required": ["message", "status", "createdAt"]
1895
1915
},
1896
1916
"place.stream.livestream_livestreamView": {
1897
1917
"type": "object",
+17
-1
lexicons/place/stream/multistream/defs.json
···
8
8
"properties": {
9
9
"uri": { "type": "string", "format": "at-uri" },
10
10
"cid": { "type": "string", "format": "cid" },
11
11
-
"record": { "type": "unknown" }
11
11
+
"record": { "type": "unknown" },
12
12
+
"latestEvent": {
13
13
+
"type": "ref",
14
14
+
"ref": "place.stream.multistream.defs#event"
15
15
+
}
16
16
+
}
17
17
+
},
18
18
+
"event": {
19
19
+
"type": "object",
20
20
+
"required": ["message", "status", "createdAt"],
21
21
+
"properties": {
22
22
+
"message": { "type": "string" },
23
23
+
"status": {
24
24
+
"type": "string",
25
25
+
"enum": ["inactive", "pending", "active", "error"]
26
26
+
},
27
27
+
"createdAt": { "type": "string", "format": "datetime" }
12
28
}
13
29
}
14
30
}
+17
-13
pkg/director/stream_session.go
···
127
127
uri string
128
128
}
129
129
130
130
-
func (rm *runningMultistream) Cancel() {
131
131
-
rm.cancel()
132
132
-
}
133
133
-
134
130
// we're making an attempt here not to log (sensitive) stream keys, so we're
135
131
// referencing by atproto URI
136
132
func (ss *StreamSession) HandleMultistreamTargets(ctx context.Context) error {
···
145
141
return fmt.Errorf("failed to list multistream targets: %w", err)
146
142
}
147
143
currentRunning := map[string]bool{}
148
148
-
for _, target := range targets {
149
149
-
rec, ok := target.Record.Val.(*streamplace.MultistreamTarget)
144
144
+
for _, targetView := range targets {
145
145
+
rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget)
150
146
if !ok {
151
151
-
log.Error(ctx, "failed to convert multistream target to streamplace multistream target", "uri", target.Uri)
147
147
+
log.Error(ctx, "failed to convert multistream target to streamplace multistream target", "uri", targetView.Uri)
152
148
continue
153
149
}
154
154
-
key := fmt.Sprintf("%s:%s", target.Uri, rec.Url)
150
150
+
key := fmt.Sprintf("%s:%s", targetView.Uri, rec.Url)
155
151
if running[key] == nil {
156
152
childCtx, childCancel := context.WithCancel(ctx)
157
153
ss.Go(ctx, func() error {
158
158
-
log.Log(ctx, "starting multistream target", "uri", target.Uri)
159
159
-
return ss.StartMultistreamTarget(childCtx, target.Record.Val.(*streamplace.MultistreamTarget))
154
154
+
log.Log(ctx, "starting multistream target", "uri", targetView.Uri)
155
155
+
err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, "starting multistream target", "pending")
156
156
+
if err != nil {
157
157
+
log.Error(ctx, "failed to create multistream event", "error", err)
158
158
+
}
159
159
+
return ss.StartMultistreamTarget(childCtx, targetView)
160
160
})
161
161
running[key] = &runningMultistream{
162
162
cancel: childCancel,
···
168
168
for key := range running {
169
169
if !currentRunning[key] {
170
170
log.Log(ctx, "stopping multistream target", "uri", running[key].uri)
171
171
-
running[key].Cancel()
171
171
+
running[key].cancel()
172
172
delete(running, key)
173
173
}
174
174
}
···
181
181
}
182
182
}
183
183
184
184
-
func (ss *StreamSession) StartMultistreamTarget(ctx context.Context, target *streamplace.MultistreamTarget) error {
184
184
+
func (ss *StreamSession) StartMultistreamTarget(ctx context.Context, targetView *streamplace.MultistreamDefs_TargetView) error {
185
185
for {
186
186
-
err := ss.mm.RTMPPush(ctx, ss.repoDID, "source", target.Url)
186
186
+
err := ss.mm.RTMPPush(ctx, ss.repoDID, "source", targetView)
187
187
if err != nil {
188
188
log.Error(ctx, "failed to push to RTMP server", "error", err)
189
189
+
err := ss.statefulDB.CreateMultistreamEvent(targetView.Uri, err.Error(), "error")
190
190
+
if err != nil {
191
191
+
log.Error(ctx, "failed to create multistream event", "error", err)
192
192
+
}
189
193
}
190
194
select {
191
195
case <-ctx.Done():
+56
-2
pkg/media/rtmp_push.go
···
7
7
"io"
8
8
"net"
9
9
"net/url"
10
10
+
"reflect"
10
11
"strings"
12
12
+
"time"
11
13
12
14
"github.com/go-gst/go-gst/gst"
13
15
"github.com/google/uuid"
14
16
"stream.place/streamplace/pkg/bus"
15
17
"stream.place/streamplace/pkg/log"
18
18
+
"stream.place/streamplace/pkg/streamplace"
16
19
)
17
20
18
21
// This function remains in scope for the duration of a single users' playback
19
19
-
func (mm *MediaManager) RTMPPush(ctx context.Context, user string, rendition string, targetURL string) error {
22
22
+
func (mm *MediaManager) RTMPPush(ctx context.Context, user string, rendition string, targetView *streamplace.MultistreamDefs_TargetView) error {
20
23
uu, err := uuid.NewV7()
21
24
if err != nil {
22
25
return err
23
26
}
24
27
ctx, cancel := context.WithCancel(ctx)
25
28
defer cancel()
26
26
-
ctx = log.WithLogValues(ctx, "webrtcID", uu.String())
29
29
+
ctx = log.WithLogValues(ctx, "pushID", uu.String())
27
30
ctx = log.WithLogValues(ctx, "mediafunc", "RTMPPush")
31
31
+
rec, ok := targetView.Record.Val.(*streamplace.MultistreamTarget)
32
32
+
if !ok {
33
33
+
return fmt.Errorf("failed to convert target view to multistream target")
34
34
+
}
35
35
+
targetURL := rec.Url
28
36
29
37
pipelineSlice := []string{
30
38
"flvmux name=muxer ! rtmp2sink name=rtmp2sink",
···
65
73
} else {
66
74
return fmt.Errorf("invalid target URL scheme: %s", u.Scheme)
67
75
}
76
76
+
77
77
+
go func() {
78
78
+
pollFreq := time.Second * 1
79
79
+
for {
80
80
+
select {
81
81
+
case <-ctx.Done():
82
82
+
return
83
83
+
case <-time.After(pollFreq):
84
84
+
prop, err := rtmp2sink.GetProperty("stats")
85
85
+
if err != nil {
86
86
+
log.Error(ctx, "error getting rtmp2sink peak-kbps", "error", err)
87
87
+
continue
88
88
+
}
89
89
+
if prop == nil {
90
90
+
log.Error(ctx, "failed to get rtmp2sink peak-kbps", "prop", prop)
91
91
+
continue
92
92
+
}
93
93
+
log.Warn(ctx, "rtmp2sink peak-kbps", "prop", reflect.TypeOf(prop))
94
94
+
propVal, ok := prop.(*gst.Structure)
95
95
+
if !ok {
96
96
+
log.Error(ctx, "failed to convert rtmp2sink peak-kbps", "prop", prop)
97
97
+
continue
98
98
+
}
99
99
+
outBytesAcked, err := propVal.GetValue("out-bytes-acked")
100
100
+
if err != nil {
101
101
+
log.Error(ctx, "failed to get rtmp2sink out-bytes-acked", "error", err)
102
102
+
continue
103
103
+
}
104
104
+
outBytesAckedVal, ok := outBytesAcked.(uint64)
105
105
+
if !ok {
106
106
+
log.Error(ctx, "failed to convert rtmp2sink out-bytes-acked", "prop", prop)
107
107
+
continue
108
108
+
}
109
109
+
if outBytesAckedVal > 0 {
110
110
+
err = mm.atsync.StatefulDB.CreateMultistreamEvent(targetView.Uri, fmt.Sprintf("wrote %d bytes", outBytesAckedVal), "active")
111
111
+
if err != nil {
112
112
+
log.Error(ctx, "failed to create multistream event", "error", err)
113
113
+
}
114
114
+
// increase pollFreq, once it's working we don't need to spam the database
115
115
+
pollFreq = time.Second * 15
116
116
+
}
117
117
+
log.Debug(ctx, "rtmp2sink out-bytes-acked", "outBytesAckedVal", outBytesAckedVal)
118
118
+
}
119
119
+
120
120
+
}
121
121
+
}()
68
122
69
123
segBuffer := make(chan *bus.Seg, 1024)
70
124
go func() {
+34
pkg/statedb/multistream_event.go
···
1
1
+
package statedb
2
2
+
3
3
+
import (
4
4
+
"time"
5
5
+
6
6
+
"github.com/google/uuid"
7
7
+
)
8
8
+
9
9
+
type MultistreamEvent struct {
10
10
+
ID string `gorm:"column:id;primarykey"`
11
11
+
TargetURI string `gorm:"column:target_uri;primarykey;index:idx_target_created,priority:1"`
12
12
+
Message string `gorm:"column:message"`
13
13
+
Status string `gorm:"column:status"`
14
14
+
CreatedAt time.Time `gorm:"column:created_at;index:idx_target_created,priority:2"`
15
15
+
}
16
16
+
17
17
+
func (m *MultistreamEvent) TableName() string {
18
18
+
return "multistream_events"
19
19
+
}
20
20
+
21
21
+
func (state *StatefulDB) CreateMultistreamEvent(targetURI, message, status string) error {
22
22
+
uu, err := uuid.NewV7()
23
23
+
if err != nil {
24
24
+
return err
25
25
+
}
26
26
+
event := &MultistreamEvent{
27
27
+
ID: uu.String(),
28
28
+
TargetURI: targetURI,
29
29
+
Message: message,
30
30
+
Status: status,
31
31
+
CreatedAt: time.Now().UTC(),
32
32
+
}
33
33
+
return state.DB.Create(event).Error
34
34
+
}
+35
-8
pkg/statedb/multistream_target.go
···
3
3
import (
4
4
"bytes"
5
5
"fmt"
6
6
+
"time"
6
7
7
7
-
"github.com/bluesky-social/indigo/lex/util"
8
8
+
lexutil "github.com/bluesky-social/indigo/lex/util"
9
9
+
"github.com/bluesky-social/indigo/util"
8
10
"stream.place/streamplace/pkg/spid"
9
11
"stream.place/streamplace/pkg/streamplace"
10
12
)
···
76
78
return &streamplace.MultistreamDefs_TargetView{
77
79
Uri: uri,
78
80
Cid: cid.String(),
79
79
-
Record: &util.LexiconTypeDecoder{Val: input.MultistreamTarget},
81
81
+
Record: &lexutil.LexiconTypeDecoder{Val: input.MultistreamTarget},
80
82
}, nil
81
83
}
82
84
···
84
86
return nil, nil
85
87
}
86
88
89
89
+
type TargetWithEvent struct {
90
90
+
MultistreamTarget
91
91
+
LatestEventID *string `gorm:"column:latest_event_id"`
92
92
+
LatestEventStatus *string `gorm:"column:latest_event_status"`
93
93
+
LatestEventMessage *string `gorm:"column:latest_event_message"`
94
94
+
LatestEventCreatedAt *time.Time `gorm:"column:latest_event_created_at"`
95
95
+
}
96
96
+
87
97
func (state *StatefulDB) ListMultistreamTargets(repoDID string, limit int, offset int, active *bool) ([]*streamplace.MultistreamDefs_TargetView, error) {
88
88
-
var targets []MultistreamTarget
89
89
-
query := state.DB.Where("repo_did = ?", repoDID)
98
98
+
99
99
+
var targets []TargetWithEvent
100
100
+
query := state.DB.Table("multistream_targets").
101
101
+
Select("multistream_targets.*, me.id as latest_event_id, me.status as latest_event_status, me.message as latest_event_message, me.created_at as latest_event_created_at").
102
102
+
Joins(`LEFT JOIN multistream_events me ON multistream_targets.uri = me.target_uri
103
103
+
AND me.created_at = (SELECT MAX(created_at) FROM multistream_events WHERE target_uri = multistream_targets.uri)`).
104
104
+
Where("repo_did = ?", repoDID)
90
105
91
106
if active != nil {
92
107
query = query.Where("active = ?", *active)
···
103
118
result := make([]*streamplace.MultistreamDefs_TargetView, len(targets))
104
119
for i, target := range targets {
105
120
var multistreamTarget streamplace.MultistreamTarget
106
106
-
err = multistreamTarget.UnmarshalCBOR(bytes.NewReader(target.MultistreamTarget))
121
121
+
err = multistreamTarget.UnmarshalCBOR(bytes.NewReader(target.MultistreamTarget.MultistreamTarget))
107
122
if err != nil {
108
123
return nil, fmt.Errorf("failed to unmarshal multistream target: %w", err)
109
124
}
···
112
127
return nil, fmt.Errorf("failed to get CID: %w", err)
113
128
}
114
129
115
115
-
result[i] = &streamplace.MultistreamDefs_TargetView{
130
130
+
targetView := &streamplace.MultistreamDefs_TargetView{
116
131
Uri: target.URI,
117
132
Cid: cid.String(),
118
118
-
Record: &util.LexiconTypeDecoder{Val: &multistreamTarget},
133
133
+
Record: &lexutil.LexiconTypeDecoder{Val: &multistreamTarget},
134
134
+
}
135
135
+
136
136
+
// Add the latest event if it exists
137
137
+
if target.LatestEventID != nil {
138
138
+
event := &streamplace.MultistreamDefs_Event{
139
139
+
Status: *target.LatestEventStatus,
140
140
+
Message: *target.LatestEventMessage,
141
141
+
CreatedAt: target.LatestEventCreatedAt.Format(util.ISO8601),
142
142
+
}
143
143
+
targetView.LatestEvent = event
119
144
}
145
145
+
146
146
+
result[i] = targetView
120
147
}
121
148
122
149
return result, nil
···
177
204
return &streamplace.MultistreamDefs_TargetView{
178
205
Uri: uri,
179
206
Cid: cid.String(),
180
180
-
Record: &util.LexiconTypeDecoder{Val: input.MultistreamTarget},
207
207
+
Record: &lexutil.LexiconTypeDecoder{Val: input.MultistreamTarget},
181
208
}, nil
182
209
}
183
210
+1
pkg/statedb/statedb.go
···
48
48
Repo{},
49
49
Webhook{},
50
50
MultistreamTarget{},
51
51
+
MultistreamEvent{},
51
52
}
52
53
53
54
var NoPostgresDatabaseCode = "3D000"
+11
-3
pkg/streamplace/multistreamdefs.go
···
8
8
"github.com/bluesky-social/indigo/lex/util"
9
9
)
10
10
11
11
+
// MultistreamDefs_Event is a "event" in the place.stream.multistream.defs schema.
12
12
+
type MultistreamDefs_Event struct {
13
13
+
CreatedAt string `json:"createdAt" cborgen:"createdAt"`
14
14
+
Message string `json:"message" cborgen:"message"`
15
15
+
Status string `json:"status" cborgen:"status"`
16
16
+
}
17
17
+
11
18
// MultistreamDefs_TargetView is a "targetView" in the place.stream.multistream.defs schema.
12
19
type MultistreamDefs_TargetView struct {
13
13
-
Cid string `json:"cid" cborgen:"cid"`
14
14
-
Record *util.LexiconTypeDecoder `json:"record" cborgen:"record"`
15
15
-
Uri string `json:"uri" cborgen:"uri"`
20
20
+
Cid string `json:"cid" cborgen:"cid"`
21
21
+
LatestEvent *MultistreamDefs_Event `json:"latestEvent,omitempty" cborgen:"latestEvent,omitempty"`
22
22
+
Record *util.LexiconTypeDecoder `json:"record" cborgen:"record"`
23
23
+
Uri string `json:"uri" cborgen:"uri"`
16
24
}