tangled
alpha
login
or
join now
stream.place
/
streamplace
74
fork
atom
Live video on the AT Protocol
74
fork
atom
overview
issues
1
pulls
pipelines
feat: update to latest version with error handling
dignifiedquire
7 months ago
6b4b6c09
f97d68d4
+37
-16
5 changed files
expand all
collapse all
unified
split
go.mod
go.sum
pkg
cmd
streamplace.go
replication
iroh
iroh.go
subprojects
iroh_streamplace.wrap
+1
-1
go.mod
···
306
306
github.com/multiformats/go-multibase v0.2.0 // indirect
307
307
github.com/multiformats/go-varint v0.0.7 // indirect
308
308
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
309
309
-
github.com/n0-computer/iroh-streamplace v0.0.0-20250805111327-449fe3719c26 // indirect
309
309
+
github.com/n0-computer/iroh-streamplace v0.0.0-20250813141041-0b3bbbf8912a // indirect
310
310
github.com/nakabonne/nestif v0.3.1 // indirect
311
311
github.com/nishanths/exhaustive v0.12.0 // indirect
312
312
github.com/nishanths/predeclared v0.2.2 // indirect
+2
go.sum
···
739
739
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
740
740
github.com/n0-computer/iroh-streamplace v0.0.0-20250805111327-449fe3719c26 h1:QUZ2LCu5a8mRhGWZHS5hnZiEiVXq6uQP8RZdSSPJ140=
741
741
github.com/n0-computer/iroh-streamplace v0.0.0-20250805111327-449fe3719c26/go.mod h1:rDqM/Vg0c1Vd49Uztot7HQ5VYghhKkf7tZZegO1smlM=
742
742
+
github.com/n0-computer/iroh-streamplace v0.0.0-20250813141041-0b3bbbf8912a h1:yDQET+YM00t8cWptO1C2fUGXC3EJYMHDTBsCV4en2EM=
743
743
+
github.com/n0-computer/iroh-streamplace v0.0.0-20250813141041-0b3bbbf8912a/go.mod h1:rDqM/Vg0c1Vd49Uztot7HQ5VYghhKkf7tZZegO1smlM=
742
744
github.com/nakabonne/nestif v0.3.1 h1:wm28nZjhQY5HyYPx+weN3Q65k6ilSBxDb8v5S81B81U=
743
745
github.com/nakabonne/nestif v0.3.1/go.mod h1:9EtoZochLn5iUprVDmDjqGKPofoUEBL8U4Ngq6aY7OE=
744
746
github.com/nishanths/exhaustive v0.12.0 h1:vIY9sALmw6T/yxiASewa4TQcFsVYZQQRUQJhKRf3Swg=
+12
-3
pkg/cmd/streamplace.go
···
27
27
"stream.place/streamplace/pkg/log"
28
28
"stream.place/streamplace/pkg/media"
29
29
"stream.place/streamplace/pkg/notifications"
30
30
-
"stream.place/streamplace/pkg/replication"
31
30
"stream.place/streamplace/pkg/replication/iroh"
32
31
"stream.place/streamplace/pkg/resync"
33
32
"stream.place/streamplace/pkg/rtmps"
···
268
267
log.Log(ctx, "successfully initialized hardware signer", "address", addr)
269
268
signer = hwsigner
270
269
}
271
271
-
var rep replication.Replicator = iroh.NewIrohReplicator(cli.Peers)
270
270
+
irohEndpoint, err2 := irohStreamplace.NewEndpoint()
271
271
+
if err2.AsError() != nil {
272
272
+
return err2.AsError()
273
273
+
}
274
274
+
rep, err := iroh.NewIrohReplicator(ctx, irohEndpoint, cli.Peers)
275
275
+
if err != nil {
276
276
+
return err
277
277
+
}
272
278
mod, err := model.MakeDB(cli.DBPath)
273
279
if err != nil {
274
280
return err
···
312
318
return err
313
319
}
314
320
315
315
-
rec := irohStreamplace.NewReceiverEndpoint(mm)
321
321
+
rec, err2 := irohStreamplace.NewReceiver(irohEndpoint, mm)
322
322
+
if err2.AsError() != nil {
323
323
+
return err2.AsError()
324
324
+
}
316
325
go func() {
317
326
// for now to make sure things are still alive, just print our info every 15 seconds
318
327
for {
+21
-11
pkg/replication/iroh/iroh.go
···
11
11
// IrohReplicator implements the replication mechanism using iroh
12
12
type IrohReplicator struct {
13
13
peers []*irohStreamplace.PublicKey
14
14
-
endpoint *irohStreamplace.SenderEndpoint
14
14
+
sender *irohStreamplace.Sender
15
15
}
16
16
17
17
-
func NewIrohReplicator(peers []string) *IrohReplicator {
18
18
-
endpoint := irohStreamplace.NewSenderEndpoint()
17
17
+
func NewIrohReplicator(ctx context.Context, ep *irohStreamplace.Endpoint, peers []string) (*IrohReplicator, error) {
18
18
+
sender, err := irohStreamplace.NewSender(ep)
19
19
+
if err.AsError() != nil {
20
20
+
return nil, err.AsError()
21
21
+
}
19
22
20
23
nodeIds := make([]*irohStreamplace.PublicKey, len(peers))
21
24
for i := range(peers) {
22
22
-
nodeId := irohStreamplace.PublicKeyFromString(peers[i])
25
25
+
nodeId, err := irohStreamplace.PublicKeyFromString(peers[i])
26
26
+
if err.AsError() != nil {
27
27
+
log.Log(ctx, "invalid Node ID", "warning", err.Error())
28
28
+
continue
29
29
+
}
23
30
nodeAddr := irohStreamplace.NewNodeAddr(nodeId, nil, nil)
24
24
-
endpoint.AddPeer(nodeAddr)
31
31
+
err = sender.AddPeer(nodeAddr)
32
32
+
if err.AsError() != nil {
33
33
+
log.Log(ctx, "failed to connect to peer", "warning", err.Error())
34
34
+
continue
35
35
+
}
25
36
nodeIds[i] = nodeId
26
37
}
27
38
28
39
return &IrohReplicator {
29
40
peers: nodeIds,
30
30
-
endpoint: endpoint,
31
31
-
}
41
41
+
sender: sender,
42
42
+
}, nil
32
43
}
33
44
34
45
func (rep *IrohReplicator) NewSegment(ctx context.Context, bs []byte) {
35
46
for _, p := range rep.peers {
36
47
go func(peer *irohStreamplace.PublicKey) {
37
37
-
err := sendSegment(rep.endpoint, peer, bs)
48
48
+
err := sendSegment(rep.sender, peer, bs)
38
49
if err != nil {
39
50
log.Log(ctx, "error replicating segment", "error", err)
40
51
}
···
42
53
}
43
54
}
44
55
45
45
-
func sendSegment(endpoint *irohStreamplace.SenderEndpoint, peer *irohStreamplace.PublicKey, bs []byte) error {
46
46
-
endpoint.Send(peer, bs)
47
47
-
return nil
56
56
+
func sendSegment(endpoint *irohStreamplace.Sender, peer *irohStreamplace.PublicKey, bs []byte) error {
57
57
+
return endpoint.Send(peer, bs).AsError()
48
58
}
+1
-1
subprojects/iroh_streamplace.wrap
···
1
1
[wrap-git]
2
2
url = https://github.com/n0-computer/iroh-streamplace.git
3
3
-
revision = HEAD
3
3
+
revision = 0b3bbbf8912a1d2d6a843a1473fed0d295567f81
4
4
depth = 1