Live video on the AT Protocol

iroh: working segment replication!!

+106 -1470
+1 -1
pkg/api/api.go
··· 497 497 498 498 func (a *StreamplaceAPI) HandleSegment(ctx context.Context) http.HandlerFunc { 499 499 return func(w http.ResponseWriter, req *http.Request) { 500 - err := a.MediaManager.ValidateMP4(ctx, req.Body) 500 + err := a.MediaManager.ValidateMP4(ctx, req.Body, false) 501 501 if err != nil { 502 502 apierrors.WriteHTTPBadRequest(w, "could not ingest segment", err) 503 503 return
+3 -6
pkg/cmd/streamplace.go
··· 32 32 "stream.place/streamplace/pkg/log" 33 33 "stream.place/streamplace/pkg/media" 34 34 "stream.place/streamplace/pkg/notifications" 35 - "stream.place/streamplace/pkg/replication" 36 - "stream.place/streamplace/pkg/replication/boring" 37 35 "stream.place/streamplace/pkg/replication/iroh_replicator" 38 36 "stream.place/streamplace/pkg/rtmps" 39 37 v0 "stream.place/streamplace/pkg/schema/v0" ··· 307 305 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 308 306 signer = hwsigner 309 307 } 310 - var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 311 308 312 309 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 313 310 if err != nil { ··· 363 360 return fmt.Errorf("failed to migrate: %w", err) 364 361 } 365 362 366 - mm, err := media.MakeMediaManager(ctx, &cli, signer, rep, mod, b, atsync) 363 + mm, err := media.MakeMediaManager(ctx, &cli, signer, mod, b, atsync) 367 364 if err != nil { 368 365 return err 369 366 } ··· 403 400 return err 404 401 } 405 402 secret := buf.Bytes() 406 - swarm, err := iroh_replicator.StartKV(ctx, cli.Tickets, secret) 403 + swarm, err := iroh_replicator.NewSwarm(ctx, cli.Tickets, secret, mm) 407 404 if err != nil { 408 405 return err 409 406 } ··· 419 416 DownstreamJWK: cli.AccessJWK, 420 417 ClientMetadata: clientMetadata, 421 418 }) 422 - d := director.NewDirector(mm, mod, &cli, b, op, state, swarm) 419 + d := director.NewDirector(mm, mod, &cli, b, op, state) 423 420 a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op) 424 421 if err != nil { 425 422 return err
+1 -1
pkg/config/config.go
··· 167 167 fs.StringVar(&cli.PublicHost, "public-host", "", "public host for this streamplace node (excluding https:// e.g. stream.place)") 168 168 fs.BoolVar(&cli.Thumbnail, "thumbnail", true, "enable thumbnail generation") 169 169 fs.BoolVar(&cli.SmearAudio, "smear-audio", false, "enable audio smearing to create 'perfect' segment timestamps") 170 - fs.BoolVar(&cli.ExternalSigning, "external-signing", true, "enable external signing via exec (prevents potential memory leak)") 170 + fs.BoolVar(&cli.ExternalSigning, "external-signing", false, "enable external signing via exec (prevents potential memory leak)") 171 171 fs.StringVar(&cli.TracingEndpoint, "tracing-endpoint", "", "gRPC endpoint to send traces to") 172 172 fs.IntVar(&cli.RateLimitPerSecond, "rate-limit-per-second", 0, "rate limit for requests per second per ip") 173 173 fs.IntVar(&cli.RateLimitBurst, "rate-limit-burst", 0, "rate limit burst for requests per ip")
+2 -32
pkg/director/director.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "encoding/json" 6 5 "fmt" 7 6 "sync" 8 7 9 - "github.com/bluesky-social/indigo/util" 10 8 "github.com/streamplace/oatproxy/pkg/oatproxy" 11 9 "golang.org/x/sync/errgroup" 12 10 "stream.place/streamplace/pkg/bus" ··· 14 12 "stream.place/streamplace/pkg/log" 15 13 "stream.place/streamplace/pkg/media" 16 14 "stream.place/streamplace/pkg/model" 17 - "stream.place/streamplace/pkg/replication/iroh_replicator" 18 15 "stream.place/streamplace/pkg/statedb" 19 16 ) 20 17 ··· 33 30 streamSessionsMu sync.Mutex 34 31 op *oatproxy.OATProxy 35 32 statefulDB *statedb.StatefulDB 36 - swarm *iroh_replicator.SwarmKV 37 33 } 38 34 39 - func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB, swarm *iroh_replicator.SwarmKV) *Director { 35 + func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB) *Director { 40 36 return &Director{ 41 37 mm: mm, 42 38 mod: mod, ··· 46 42 streamSessionsMu: sync.Mutex{}, 47 43 op: op, 48 44 statefulDB: statefulDB, 49 - swarm: swarm, 50 45 } 51 46 } 52 47 53 48 func (d *Director) Start(ctx context.Context) error { 54 - nodeId, err := d.swarm.Node.NodeId() 55 - if err != nil { 56 - return fmt.Errorf("failed to get node id: %w", err) 57 - } 58 - 59 49 newSeg := d.mm.NewSegment() 60 50 ctx, cancel := context.WithCancel(ctx) 61 51 defer cancel() ··· 96 86 }) 97 87 } 98 88 d.streamSessionsMu.Unlock() 99 - go func() { 100 - originInfo := iroh_replicator.OriginInfo{ 101 - NodeID: nodeId.String(), 102 - Time: not.Segment.StartTime.Format(util.ISO8601), 103 - } 104 - bs, err := json.Marshal(originInfo) 105 - if err != nil { 106 - log.Error(ctx, "could not marshal origin info", "error", err) 107 - return 108 - } 109 - err = d.swarm.Put(ctx, not.Segment.RepoDID, bs) 110 - if err != nil { 111 - log.Error(ctx, "could not put segment to swarm", "error", err) 112 - return 113 - } 114 - err = d.swarm.Node.SendSegment(not.Segment.RepoDID, not.Data) 115 - if err != nil { 116 - log.Error(ctx, "could not send segment to swarm", "error", err) 117 - return 118 - } 119 - }() 89 + 120 90 err := ss.NewSegment(ctx, not) 121 91 if err != nil { 122 92 log.Error(ctx, "could not add segment to stream session", "error", err)
-681
pkg/iroh/generated/iroh_streamplace/iroh_streamplace.go
··· 337 337 func init() { 338 338 339 339 FfiConverterDataHandlerINSTANCE.register() 340 - FfiConverterDataHandlerOldINSTANCE.register() 341 340 FfiConverterGoSignerINSTANCE.register() 342 341 uniffiCheckChecksums() 343 342 } ··· 391 390 } 392 391 { 393 392 checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 394 - return C.uniffi_iroh_streamplace_checksum_method_datahandlerold_handle_data() 395 - }) 396 - if checksum != 20343 { 397 - // If this happens try cleaning and rebuilding your project 398 - panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_datahandlerold_handle_data: UniFFI API checksum mismatch") 399 - } 400 - } 401 - { 402 - checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 403 393 return C.uniffi_iroh_streamplace_checksum_method_db_iter_with_opts() 404 394 }) 405 395 if checksum != 3815 { ··· 432 422 if checksum != 5247 { 433 423 // If this happens try cleaning and rebuilding your project 434 424 panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_db_write: UniFFI API checksum mismatch") 435 - } 436 - } 437 - { 438 - checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 439 - return C.uniffi_iroh_streamplace_checksum_method_endpoint_node_addr() 440 - }) 441 - if checksum != 17254 { 442 - // If this happens try cleaning and rebuilding your project 443 - panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_endpoint_node_addr: UniFFI API checksum mismatch") 444 425 } 445 426 } 446 427 { ··· 652 633 } 653 634 { 654 635 checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 655 - return C.uniffi_iroh_streamplace_checksum_method_receiver_node_addr() 656 - }) 657 - if checksum != 10730 { 658 - // If this happens try cleaning and rebuilding your project 659 - panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_receiver_node_addr: UniFFI API checksum mismatch") 660 - } 661 - } 662 - { 663 - checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 664 - return C.uniffi_iroh_streamplace_checksum_method_receiver_subscribe() 665 - }) 666 - if checksum != 24145 { 667 - // If this happens try cleaning and rebuilding your project 668 - panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_receiver_subscribe: UniFFI API checksum mismatch") 669 - } 670 - } 671 - { 672 - checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 673 - return C.uniffi_iroh_streamplace_checksum_method_receiver_unsubscribe() 674 - }) 675 - if checksum != 21760 { 676 - // If this happens try cleaning and rebuilding your project 677 - panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_receiver_unsubscribe: UniFFI API checksum mismatch") 678 - } 679 - } 680 - { 681 - checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 682 - return C.uniffi_iroh_streamplace_checksum_method_sender_node_addr() 683 - }) 684 - if checksum != 38541 { 685 - // If this happens try cleaning and rebuilding your project 686 - panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_sender_node_addr: UniFFI API checksum mismatch") 687 - } 688 - } 689 - { 690 - checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 691 - return C.uniffi_iroh_streamplace_checksum_method_sender_send() 692 - }) 693 - if checksum != 23930 { 694 - // If this happens try cleaning and rebuilding your project 695 - panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_sender_send: UniFFI API checksum mismatch") 696 - } 697 - } 698 - { 699 - checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 700 636 return C.uniffi_iroh_streamplace_checksum_method_subscriberesponse_next_raw() 701 637 }) 702 638 if checksum != 55650 { ··· 711 647 if checksum != 50543 { 712 648 // If this happens try cleaning and rebuilding your project 713 649 panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_writescope_put: UniFFI API checksum mismatch") 714 - } 715 - } 716 - { 717 - checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 718 - return C.uniffi_iroh_streamplace_checksum_constructor_endpoint_new() 719 - }) 720 - if checksum != 60672 { 721 - // If this happens try cleaning and rebuilding your project 722 - panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_constructor_endpoint_new: UniFFI API checksum mismatch") 723 650 } 724 651 } 725 652 { ··· 785 712 panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_constructor_publickey_from_string: UniFFI API checksum mismatch") 786 713 } 787 714 } 788 - { 789 - checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 790 - return C.uniffi_iroh_streamplace_checksum_constructor_receiver_new() 791 - }) 792 - if checksum != 18072 { 793 - // If this happens try cleaning and rebuilding your project 794 - panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_constructor_receiver_new: UniFFI API checksum mismatch") 795 - } 796 - } 797 - { 798 - checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 799 - return C.uniffi_iroh_streamplace_checksum_constructor_sender_new() 800 - }) 801 - if checksum != 56457 { 802 - // If this happens try cleaning and rebuilding your project 803 - panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_constructor_sender_new: UniFFI API checksum mismatch") 804 - } 805 - } 806 715 } 807 716 808 717 type FfiConverterUint64 struct{} ··· 1251 1160 C.uniffi_iroh_streamplace_fn_init_callback_vtable_datahandler(&UniffiVTableCallbackInterfaceDataHandlerINSTANCE) 1252 1161 } 1253 1162 1254 - type DataHandlerOld interface { 1255 - HandleData(topic string, data []byte) 1256 - } 1257 - type DataHandlerOldImpl struct { 1258 - ffiObject FfiObject 1259 - } 1260 - 1261 - func (_self *DataHandlerOldImpl) HandleData(topic string, data []byte) { 1262 - _pointer := _self.ffiObject.incrementPointer("DataHandlerOld") 1263 - defer _self.ffiObject.decrementPointer() 1264 - uniffiRustCallAsync[error]( 1265 - nil, 1266 - // completeFn 1267 - func(handle C.uint64_t, status *C.RustCallStatus) struct{} { 1268 - C.ffi_iroh_streamplace_rust_future_complete_void(handle, status) 1269 - return struct{}{} 1270 - }, 1271 - // liftFn 1272 - func(_ struct{}) struct{} { return struct{}{} }, 1273 - C.uniffi_iroh_streamplace_fn_method_datahandlerold_handle_data( 1274 - _pointer, FfiConverterStringINSTANCE.Lower(topic), FfiConverterBytesINSTANCE.Lower(data)), 1275 - // pollFn 1276 - func(handle C.uint64_t, continuation C.UniffiRustFutureContinuationCallback, data C.uint64_t) { 1277 - C.ffi_iroh_streamplace_rust_future_poll_void(handle, continuation, data) 1278 - }, 1279 - // freeFn 1280 - func(handle C.uint64_t) { 1281 - C.ffi_iroh_streamplace_rust_future_free_void(handle) 1282 - }, 1283 - ) 1284 - 1285 - } 1286 - func (object *DataHandlerOldImpl) Destroy() { 1287 - runtime.SetFinalizer(object, nil) 1288 - object.ffiObject.destroy() 1289 - } 1290 - 1291 - type FfiConverterDataHandlerOld struct { 1292 - handleMap *concurrentHandleMap[DataHandlerOld] 1293 - } 1294 - 1295 - var FfiConverterDataHandlerOldINSTANCE = FfiConverterDataHandlerOld{ 1296 - handleMap: newConcurrentHandleMap[DataHandlerOld](), 1297 - } 1298 - 1299 - func (c FfiConverterDataHandlerOld) Lift(pointer unsafe.Pointer) DataHandlerOld { 1300 - result := &DataHandlerOldImpl{ 1301 - newFfiObject( 1302 - pointer, 1303 - func(pointer unsafe.Pointer, status *C.RustCallStatus) unsafe.Pointer { 1304 - return C.uniffi_iroh_streamplace_fn_clone_datahandlerold(pointer, status) 1305 - }, 1306 - func(pointer unsafe.Pointer, status *C.RustCallStatus) { 1307 - C.uniffi_iroh_streamplace_fn_free_datahandlerold(pointer, status) 1308 - }, 1309 - ), 1310 - } 1311 - runtime.SetFinalizer(result, (*DataHandlerOldImpl).Destroy) 1312 - return result 1313 - } 1314 - 1315 - func (c FfiConverterDataHandlerOld) Read(reader io.Reader) DataHandlerOld { 1316 - return c.Lift(unsafe.Pointer(uintptr(readUint64(reader)))) 1317 - } 1318 - 1319 - func (c FfiConverterDataHandlerOld) Lower(value DataHandlerOld) unsafe.Pointer { 1320 - // TODO: this is bad - all synchronization from ObjectRuntime.go is discarded here, 1321 - // because the pointer will be decremented immediately after this function returns, 1322 - // and someone will be left holding onto a non-locked pointer. 1323 - pointer := unsafe.Pointer(uintptr(c.handleMap.insert(value))) 1324 - return pointer 1325 - 1326 - } 1327 - 1328 - func (c FfiConverterDataHandlerOld) Write(writer io.Writer, value DataHandlerOld) { 1329 - writeUint64(writer, uint64(uintptr(c.Lower(value)))) 1330 - } 1331 - 1332 - type FfiDestroyerDataHandlerOld struct{} 1333 - 1334 - func (_ FfiDestroyerDataHandlerOld) Destroy(value DataHandlerOld) { 1335 - if val, ok := value.(*DataHandlerOldImpl); ok { 1336 - val.Destroy() 1337 - } else { 1338 - panic("Expected *DataHandlerOldImpl") 1339 - } 1340 - } 1341 - 1342 - //export iroh_streamplace_cgo_dispatchCallbackInterfaceDataHandlerOldMethod0 1343 - func iroh_streamplace_cgo_dispatchCallbackInterfaceDataHandlerOldMethod0(uniffiHandle C.uint64_t, topic C.RustBuffer, data C.RustBuffer, uniffiFutureCallback C.UniffiForeignFutureCompleteVoid, uniffiCallbackData C.uint64_t, uniffiOutReturn *C.UniffiForeignFuture) { 1344 - handle := uint64(uniffiHandle) 1345 - uniffiObj, ok := FfiConverterDataHandlerOldINSTANCE.handleMap.tryGet(handle) 1346 - if !ok { 1347 - panic(fmt.Errorf("no callback in handle map: %d", handle)) 1348 - } 1349 - 1350 - result := make(chan C.UniffiForeignFutureStructVoid, 1) 1351 - cancel := make(chan struct{}, 1) 1352 - guardHandle := cgo.NewHandle(cancel) 1353 - *uniffiOutReturn = C.UniffiForeignFuture{ 1354 - handle: C.uint64_t(guardHandle), 1355 - free: C.UniffiForeignFutureFree(C.iroh_streamplace_uniffiFreeGorutine), 1356 - } 1357 - 1358 - // Wait for compleation or cancel 1359 - go func() { 1360 - select { 1361 - case <-cancel: 1362 - case res := <-result: 1363 - C.call_UniffiForeignFutureCompleteVoid(uniffiFutureCallback, uniffiCallbackData, res) 1364 - } 1365 - }() 1366 - 1367 - // Eval callback asynchroniously 1368 - go func() { 1369 - asyncResult := &C.UniffiForeignFutureStructVoid{} 1370 - defer func() { 1371 - result <- *asyncResult 1372 - }() 1373 - 1374 - uniffiObj.HandleData( 1375 - FfiConverterStringINSTANCE.Lift(GoRustBuffer{ 1376 - inner: topic, 1377 - }), 1378 - FfiConverterBytesINSTANCE.Lift(GoRustBuffer{ 1379 - inner: data, 1380 - }), 1381 - ) 1382 - 1383 - }() 1384 - } 1385 - 1386 - var UniffiVTableCallbackInterfaceDataHandlerOldINSTANCE = C.UniffiVTableCallbackInterfaceDataHandlerOld{ 1387 - handleData: (C.UniffiCallbackInterfaceDataHandlerOldMethod0)(C.iroh_streamplace_cgo_dispatchCallbackInterfaceDataHandlerOldMethod0), 1388 - 1389 - uniffiFree: (C.UniffiCallbackInterfaceFree)(C.iroh_streamplace_cgo_dispatchCallbackInterfaceDataHandlerOldFree), 1390 - } 1391 - 1392 - //export iroh_streamplace_cgo_dispatchCallbackInterfaceDataHandlerOldFree 1393 - func iroh_streamplace_cgo_dispatchCallbackInterfaceDataHandlerOldFree(handle C.uint64_t) { 1394 - FfiConverterDataHandlerOldINSTANCE.handleMap.remove(uint64(handle)) 1395 - } 1396 - 1397 - func (c FfiConverterDataHandlerOld) register() { 1398 - C.uniffi_iroh_streamplace_fn_init_callback_vtable_datahandlerold(&UniffiVTableCallbackInterfaceDataHandlerOldINSTANCE) 1399 - } 1400 - 1401 1163 // Iroh-streamplace specific metadata database. 1402 1164 type DbInterface interface { 1403 1165 IterWithOpts(filter *Filter) ([]Entry, error) ··· 1529 1291 value.Destroy() 1530 1292 } 1531 1293 1532 - type EndpointInterface interface { 1533 - NodeAddr() *NodeAddr 1534 - } 1535 - type Endpoint struct { 1536 - ffiObject FfiObject 1537 - } 1538 - 1539 - // Create a new endpoint. 1540 - func NewEndpoint() (*Endpoint, error) { 1541 - res, err := uniffiRustCallAsync[Error]( 1542 - FfiConverterErrorINSTANCE, 1543 - // completeFn 1544 - func(handle C.uint64_t, status *C.RustCallStatus) unsafe.Pointer { 1545 - res := C.ffi_iroh_streamplace_rust_future_complete_pointer(handle, status) 1546 - return res 1547 - }, 1548 - // liftFn 1549 - func(ffi unsafe.Pointer) *Endpoint { 1550 - return FfiConverterEndpointINSTANCE.Lift(ffi) 1551 - }, 1552 - C.uniffi_iroh_streamplace_fn_constructor_endpoint_new(), 1553 - // pollFn 1554 - func(handle C.uint64_t, continuation C.UniffiRustFutureContinuationCallback, data C.uint64_t) { 1555 - C.ffi_iroh_streamplace_rust_future_poll_pointer(handle, continuation, data) 1556 - }, 1557 - // freeFn 1558 - func(handle C.uint64_t) { 1559 - C.ffi_iroh_streamplace_rust_future_free_pointer(handle) 1560 - }, 1561 - ) 1562 - 1563 - if err == nil { 1564 - return res, nil 1565 - } 1566 - 1567 - return res, err 1568 - } 1569 - 1570 - func (_self *Endpoint) NodeAddr() *NodeAddr { 1571 - _pointer := _self.ffiObject.incrementPointer("*Endpoint") 1572 - defer _self.ffiObject.decrementPointer() 1573 - res, _ := uniffiRustCallAsync[error]( 1574 - nil, 1575 - // completeFn 1576 - func(handle C.uint64_t, status *C.RustCallStatus) unsafe.Pointer { 1577 - res := C.ffi_iroh_streamplace_rust_future_complete_pointer(handle, status) 1578 - return res 1579 - }, 1580 - // liftFn 1581 - func(ffi unsafe.Pointer) *NodeAddr { 1582 - return FfiConverterNodeAddrINSTANCE.Lift(ffi) 1583 - }, 1584 - C.uniffi_iroh_streamplace_fn_method_endpoint_node_addr( 1585 - _pointer), 1586 - // pollFn 1587 - func(handle C.uint64_t, continuation C.UniffiRustFutureContinuationCallback, data C.uint64_t) { 1588 - C.ffi_iroh_streamplace_rust_future_poll_pointer(handle, continuation, data) 1589 - }, 1590 - // freeFn 1591 - func(handle C.uint64_t) { 1592 - C.ffi_iroh_streamplace_rust_future_free_pointer(handle) 1593 - }, 1594 - ) 1595 - 1596 - return res 1597 - } 1598 - func (object *Endpoint) Destroy() { 1599 - runtime.SetFinalizer(object, nil) 1600 - object.ffiObject.destroy() 1601 - } 1602 - 1603 - type FfiConverterEndpoint struct{} 1604 - 1605 - var FfiConverterEndpointINSTANCE = FfiConverterEndpoint{} 1606 - 1607 - func (c FfiConverterEndpoint) Lift(pointer unsafe.Pointer) *Endpoint { 1608 - result := &Endpoint{ 1609 - newFfiObject( 1610 - pointer, 1611 - func(pointer unsafe.Pointer, status *C.RustCallStatus) unsafe.Pointer { 1612 - return C.uniffi_iroh_streamplace_fn_clone_endpoint(pointer, status) 1613 - }, 1614 - func(pointer unsafe.Pointer, status *C.RustCallStatus) { 1615 - C.uniffi_iroh_streamplace_fn_free_endpoint(pointer, status) 1616 - }, 1617 - ), 1618 - } 1619 - runtime.SetFinalizer(result, (*Endpoint).Destroy) 1620 - return result 1621 - } 1622 - 1623 - func (c FfiConverterEndpoint) Read(reader io.Reader) *Endpoint { 1624 - return c.Lift(unsafe.Pointer(uintptr(readUint64(reader)))) 1625 - } 1626 - 1627 - func (c FfiConverterEndpoint) Lower(value *Endpoint) unsafe.Pointer { 1628 - // TODO: this is bad - all synchronization from ObjectRuntime.go is discarded here, 1629 - // because the pointer will be decremented immediately after this function returns, 1630 - // and someone will be left holding onto a non-locked pointer. 1631 - pointer := value.ffiObject.incrementPointer("*Endpoint") 1632 - defer value.ffiObject.decrementPointer() 1633 - return pointer 1634 - 1635 - } 1636 - 1637 - func (c FfiConverterEndpoint) Write(writer io.Writer, value *Endpoint) { 1638 - writeUint64(writer, uint64(uintptr(c.Lower(value)))) 1639 - } 1640 - 1641 - type FfiDestroyerEndpoint struct{} 1642 - 1643 - func (_ FfiDestroyerEndpoint) Destroy(value *Endpoint) { 1644 - value.Destroy() 1645 - } 1646 - 1647 1294 // A filter for subscriptions and iteration. 1648 1295 type FilterInterface interface { 1649 1296 // Restrict to the global namespace, no per stream data. ··· 2559 2206 type FfiDestroyerPublicKey struct{} 2560 2207 2561 2208 func (_ FfiDestroyerPublicKey) Destroy(value *PublicKey) { 2562 - value.Destroy() 2563 - } 2564 - 2565 - type ReceiverInterface interface { 2566 - NodeAddr() *NodeAddr 2567 - // Subscribe to the given topic on the remote. 2568 - Subscribe(remoteId *PublicKey, topic string) error 2569 - // Unsubscribe from this topic on the remote. 2570 - Unsubscribe(remoteId *PublicKey, topic string) error 2571 - } 2572 - type Receiver struct { 2573 - ffiObject FfiObject 2574 - } 2575 - 2576 - // Create a new receiver. 2577 - func NewReceiver(endpoint *Endpoint, handler DataHandlerOld) (*Receiver, error) { 2578 - res, err := uniffiRustCallAsync[Error]( 2579 - FfiConverterErrorINSTANCE, 2580 - // completeFn 2581 - func(handle C.uint64_t, status *C.RustCallStatus) unsafe.Pointer { 2582 - res := C.ffi_iroh_streamplace_rust_future_complete_pointer(handle, status) 2583 - return res 2584 - }, 2585 - // liftFn 2586 - func(ffi unsafe.Pointer) *Receiver { 2587 - return FfiConverterReceiverINSTANCE.Lift(ffi) 2588 - }, 2589 - C.uniffi_iroh_streamplace_fn_constructor_receiver_new(FfiConverterEndpointINSTANCE.Lower(endpoint), FfiConverterDataHandlerOldINSTANCE.Lower(handler)), 2590 - // pollFn 2591 - func(handle C.uint64_t, continuation C.UniffiRustFutureContinuationCallback, data C.uint64_t) { 2592 - C.ffi_iroh_streamplace_rust_future_poll_pointer(handle, continuation, data) 2593 - }, 2594 - // freeFn 2595 - func(handle C.uint64_t) { 2596 - C.ffi_iroh_streamplace_rust_future_free_pointer(handle) 2597 - }, 2598 - ) 2599 - 2600 - if err == nil { 2601 - return res, nil 2602 - } 2603 - 2604 - return res, err 2605 - } 2606 - 2607 - func (_self *Receiver) NodeAddr() *NodeAddr { 2608 - _pointer := _self.ffiObject.incrementPointer("*Receiver") 2609 - defer _self.ffiObject.decrementPointer() 2610 - res, _ := uniffiRustCallAsync[error]( 2611 - nil, 2612 - // completeFn 2613 - func(handle C.uint64_t, status *C.RustCallStatus) unsafe.Pointer { 2614 - res := C.ffi_iroh_streamplace_rust_future_complete_pointer(handle, status) 2615 - return res 2616 - }, 2617 - // liftFn 2618 - func(ffi unsafe.Pointer) *NodeAddr { 2619 - return FfiConverterNodeAddrINSTANCE.Lift(ffi) 2620 - }, 2621 - C.uniffi_iroh_streamplace_fn_method_receiver_node_addr( 2622 - _pointer), 2623 - // pollFn 2624 - func(handle C.uint64_t, continuation C.UniffiRustFutureContinuationCallback, data C.uint64_t) { 2625 - C.ffi_iroh_streamplace_rust_future_poll_pointer(handle, continuation, data) 2626 - }, 2627 - // freeFn 2628 - func(handle C.uint64_t) { 2629 - C.ffi_iroh_streamplace_rust_future_free_pointer(handle) 2630 - }, 2631 - ) 2632 - 2633 - return res 2634 - } 2635 - 2636 - // Subscribe to the given topic on the remote. 2637 - func (_self *Receiver) Subscribe(remoteId *PublicKey, topic string) error { 2638 - _pointer := _self.ffiObject.incrementPointer("*Receiver") 2639 - defer _self.ffiObject.decrementPointer() 2640 - _, err := uniffiRustCallAsync[Error]( 2641 - FfiConverterErrorINSTANCE, 2642 - // completeFn 2643 - func(handle C.uint64_t, status *C.RustCallStatus) struct{} { 2644 - C.ffi_iroh_streamplace_rust_future_complete_void(handle, status) 2645 - return struct{}{} 2646 - }, 2647 - // liftFn 2648 - func(_ struct{}) struct{} { return struct{}{} }, 2649 - C.uniffi_iroh_streamplace_fn_method_receiver_subscribe( 2650 - _pointer, FfiConverterPublicKeyINSTANCE.Lower(remoteId), FfiConverterStringINSTANCE.Lower(topic)), 2651 - // pollFn 2652 - func(handle C.uint64_t, continuation C.UniffiRustFutureContinuationCallback, data C.uint64_t) { 2653 - C.ffi_iroh_streamplace_rust_future_poll_void(handle, continuation, data) 2654 - }, 2655 - // freeFn 2656 - func(handle C.uint64_t) { 2657 - C.ffi_iroh_streamplace_rust_future_free_void(handle) 2658 - }, 2659 - ) 2660 - 2661 - if err == nil { 2662 - return nil 2663 - } 2664 - 2665 - return err 2666 - } 2667 - 2668 - // Unsubscribe from this topic on the remote. 2669 - func (_self *Receiver) Unsubscribe(remoteId *PublicKey, topic string) error { 2670 - _pointer := _self.ffiObject.incrementPointer("*Receiver") 2671 - defer _self.ffiObject.decrementPointer() 2672 - _, err := uniffiRustCallAsync[Error]( 2673 - FfiConverterErrorINSTANCE, 2674 - // completeFn 2675 - func(handle C.uint64_t, status *C.RustCallStatus) struct{} { 2676 - C.ffi_iroh_streamplace_rust_future_complete_void(handle, status) 2677 - return struct{}{} 2678 - }, 2679 - // liftFn 2680 - func(_ struct{}) struct{} { return struct{}{} }, 2681 - C.uniffi_iroh_streamplace_fn_method_receiver_unsubscribe( 2682 - _pointer, FfiConverterPublicKeyINSTANCE.Lower(remoteId), FfiConverterStringINSTANCE.Lower(topic)), 2683 - // pollFn 2684 - func(handle C.uint64_t, continuation C.UniffiRustFutureContinuationCallback, data C.uint64_t) { 2685 - C.ffi_iroh_streamplace_rust_future_poll_void(handle, continuation, data) 2686 - }, 2687 - // freeFn 2688 - func(handle C.uint64_t) { 2689 - C.ffi_iroh_streamplace_rust_future_free_void(handle) 2690 - }, 2691 - ) 2692 - 2693 - if err == nil { 2694 - return nil 2695 - } 2696 - 2697 - return err 2698 - } 2699 - func (object *Receiver) Destroy() { 2700 - runtime.SetFinalizer(object, nil) 2701 - object.ffiObject.destroy() 2702 - } 2703 - 2704 - type FfiConverterReceiver struct{} 2705 - 2706 - var FfiConverterReceiverINSTANCE = FfiConverterReceiver{} 2707 - 2708 - func (c FfiConverterReceiver) Lift(pointer unsafe.Pointer) *Receiver { 2709 - result := &Receiver{ 2710 - newFfiObject( 2711 - pointer, 2712 - func(pointer unsafe.Pointer, status *C.RustCallStatus) unsafe.Pointer { 2713 - return C.uniffi_iroh_streamplace_fn_clone_receiver(pointer, status) 2714 - }, 2715 - func(pointer unsafe.Pointer, status *C.RustCallStatus) { 2716 - C.uniffi_iroh_streamplace_fn_free_receiver(pointer, status) 2717 - }, 2718 - ), 2719 - } 2720 - runtime.SetFinalizer(result, (*Receiver).Destroy) 2721 - return result 2722 - } 2723 - 2724 - func (c FfiConverterReceiver) Read(reader io.Reader) *Receiver { 2725 - return c.Lift(unsafe.Pointer(uintptr(readUint64(reader)))) 2726 - } 2727 - 2728 - func (c FfiConverterReceiver) Lower(value *Receiver) unsafe.Pointer { 2729 - // TODO: this is bad - all synchronization from ObjectRuntime.go is discarded here, 2730 - // because the pointer will be decremented immediately after this function returns, 2731 - // and someone will be left holding onto a non-locked pointer. 2732 - pointer := value.ffiObject.incrementPointer("*Receiver") 2733 - defer value.ffiObject.decrementPointer() 2734 - return pointer 2735 - 2736 - } 2737 - 2738 - func (c FfiConverterReceiver) Write(writer io.Writer, value *Receiver) { 2739 - writeUint64(writer, uint64(uintptr(c.Lower(value)))) 2740 - } 2741 - 2742 - type FfiDestroyerReceiver struct{} 2743 - 2744 - func (_ FfiDestroyerReceiver) Destroy(value *Receiver) { 2745 - value.Destroy() 2746 - } 2747 - 2748 - type SenderInterface interface { 2749 - NodeAddr() *NodeAddr 2750 - // Sends the given data to all subscribers that have subscribed to this `key`. 2751 - Send(key string, data []byte) error 2752 - } 2753 - type Sender struct { 2754 - ffiObject FfiObject 2755 - } 2756 - 2757 - // Create a new sender. 2758 - func NewSender(endpoint *Endpoint) *Sender { 2759 - res, _ := uniffiRustCallAsync[error]( 2760 - nil, 2761 - // completeFn 2762 - func(handle C.uint64_t, status *C.RustCallStatus) unsafe.Pointer { 2763 - res := C.ffi_iroh_streamplace_rust_future_complete_pointer(handle, status) 2764 - return res 2765 - }, 2766 - // liftFn 2767 - func(ffi unsafe.Pointer) *Sender { 2768 - return FfiConverterSenderINSTANCE.Lift(ffi) 2769 - }, 2770 - C.uniffi_iroh_streamplace_fn_constructor_sender_new(FfiConverterEndpointINSTANCE.Lower(endpoint)), 2771 - // pollFn 2772 - func(handle C.uint64_t, continuation C.UniffiRustFutureContinuationCallback, data C.uint64_t) { 2773 - C.ffi_iroh_streamplace_rust_future_poll_pointer(handle, continuation, data) 2774 - }, 2775 - // freeFn 2776 - func(handle C.uint64_t) { 2777 - C.ffi_iroh_streamplace_rust_future_free_pointer(handle) 2778 - }, 2779 - ) 2780 - 2781 - return res 2782 - } 2783 - 2784 - func (_self *Sender) NodeAddr() *NodeAddr { 2785 - _pointer := _self.ffiObject.incrementPointer("*Sender") 2786 - defer _self.ffiObject.decrementPointer() 2787 - res, _ := uniffiRustCallAsync[error]( 2788 - nil, 2789 - // completeFn 2790 - func(handle C.uint64_t, status *C.RustCallStatus) unsafe.Pointer { 2791 - res := C.ffi_iroh_streamplace_rust_future_complete_pointer(handle, status) 2792 - return res 2793 - }, 2794 - // liftFn 2795 - func(ffi unsafe.Pointer) *NodeAddr { 2796 - return FfiConverterNodeAddrINSTANCE.Lift(ffi) 2797 - }, 2798 - C.uniffi_iroh_streamplace_fn_method_sender_node_addr( 2799 - _pointer), 2800 - // pollFn 2801 - func(handle C.uint64_t, continuation C.UniffiRustFutureContinuationCallback, data C.uint64_t) { 2802 - C.ffi_iroh_streamplace_rust_future_poll_pointer(handle, continuation, data) 2803 - }, 2804 - // freeFn 2805 - func(handle C.uint64_t) { 2806 - C.ffi_iroh_streamplace_rust_future_free_pointer(handle) 2807 - }, 2808 - ) 2809 - 2810 - return res 2811 - } 2812 - 2813 - // Sends the given data to all subscribers that have subscribed to this `key`. 2814 - func (_self *Sender) Send(key string, data []byte) error { 2815 - _pointer := _self.ffiObject.incrementPointer("*Sender") 2816 - defer _self.ffiObject.decrementPointer() 2817 - _, err := uniffiRustCallAsync[Error]( 2818 - FfiConverterErrorINSTANCE, 2819 - // completeFn 2820 - func(handle C.uint64_t, status *C.RustCallStatus) struct{} { 2821 - C.ffi_iroh_streamplace_rust_future_complete_void(handle, status) 2822 - return struct{}{} 2823 - }, 2824 - // liftFn 2825 - func(_ struct{}) struct{} { return struct{}{} }, 2826 - C.uniffi_iroh_streamplace_fn_method_sender_send( 2827 - _pointer, FfiConverterStringINSTANCE.Lower(key), FfiConverterBytesINSTANCE.Lower(data)), 2828 - // pollFn 2829 - func(handle C.uint64_t, continuation C.UniffiRustFutureContinuationCallback, data C.uint64_t) { 2830 - C.ffi_iroh_streamplace_rust_future_poll_void(handle, continuation, data) 2831 - }, 2832 - // freeFn 2833 - func(handle C.uint64_t) { 2834 - C.ffi_iroh_streamplace_rust_future_free_void(handle) 2835 - }, 2836 - ) 2837 - 2838 - if err == nil { 2839 - return nil 2840 - } 2841 - 2842 - return err 2843 - } 2844 - func (object *Sender) Destroy() { 2845 - runtime.SetFinalizer(object, nil) 2846 - object.ffiObject.destroy() 2847 - } 2848 - 2849 - type FfiConverterSender struct{} 2850 - 2851 - var FfiConverterSenderINSTANCE = FfiConverterSender{} 2852 - 2853 - func (c FfiConverterSender) Lift(pointer unsafe.Pointer) *Sender { 2854 - result := &Sender{ 2855 - newFfiObject( 2856 - pointer, 2857 - func(pointer unsafe.Pointer, status *C.RustCallStatus) unsafe.Pointer { 2858 - return C.uniffi_iroh_streamplace_fn_clone_sender(pointer, status) 2859 - }, 2860 - func(pointer unsafe.Pointer, status *C.RustCallStatus) { 2861 - C.uniffi_iroh_streamplace_fn_free_sender(pointer, status) 2862 - }, 2863 - ), 2864 - } 2865 - runtime.SetFinalizer(result, (*Sender).Destroy) 2866 - return result 2867 - } 2868 - 2869 - func (c FfiConverterSender) Read(reader io.Reader) *Sender { 2870 - return c.Lift(unsafe.Pointer(uintptr(readUint64(reader)))) 2871 - } 2872 - 2873 - func (c FfiConverterSender) Lower(value *Sender) unsafe.Pointer { 2874 - // TODO: this is bad - all synchronization from ObjectRuntime.go is discarded here, 2875 - // because the pointer will be decremented immediately after this function returns, 2876 - // and someone will be left holding onto a non-locked pointer. 2877 - pointer := value.ffiObject.incrementPointer("*Sender") 2878 - defer value.ffiObject.decrementPointer() 2879 - return pointer 2880 - 2881 - } 2882 - 2883 - func (c FfiConverterSender) Write(writer io.Writer, value *Sender) { 2884 - writeUint64(writer, uint64(uintptr(c.Lower(value)))) 2885 - } 2886 - 2887 - type FfiDestroyerSender struct{} 2888 - 2889 - func (_ FfiDestroyerSender) Destroy(value *Sender) { 2890 2209 value.Destroy() 2891 2210 } 2892 2211
-180
pkg/iroh/generated/iroh_streamplace/iroh_streamplace.h
··· 392 392 393 393 394 394 #endif 395 - #ifndef UNIFFI_FFIDEF_CALLBACK_INTERFACE_DATA_HANDLER_OLD_METHOD0 396 - #define UNIFFI_FFIDEF_CALLBACK_INTERFACE_DATA_HANDLER_OLD_METHOD0 397 - typedef void (*UniffiCallbackInterfaceDataHandlerOldMethod0)(uint64_t uniffi_handle, RustBuffer topic, RustBuffer data, UniffiForeignFutureCompleteVoid uniffi_future_callback, uint64_t uniffi_callback_data, UniffiForeignFuture* uniffi_out_return); 398 - 399 - // Making function static works arround: 400 - // https://github.com/golang/go/issues/11263 401 - static void call_UniffiCallbackInterfaceDataHandlerOldMethod0( 402 - UniffiCallbackInterfaceDataHandlerOldMethod0 cb, uint64_t uniffi_handle, RustBuffer topic, RustBuffer data, UniffiForeignFutureCompleteVoid uniffi_future_callback, uint64_t uniffi_callback_data, UniffiForeignFuture* uniffi_out_return) 403 - { 404 - return cb(uniffi_handle, topic, data, uniffi_future_callback, uniffi_callback_data, uniffi_out_return); 405 - } 406 - 407 - 408 - #endif 409 395 #ifndef UNIFFI_FFIDEF_CALLBACK_INTERFACE_GO_SIGNER_METHOD0 410 396 #define UNIFFI_FFIDEF_CALLBACK_INTERFACE_GO_SIGNER_METHOD0 411 397 typedef void (*UniffiCallbackInterfaceGoSignerMethod0)(uint64_t uniffi_handle, RustBuffer data, RustBuffer* uniffi_out_return, RustCallStatus* callStatus ); ··· 428 414 } UniffiVTableCallbackInterfaceDataHandler; 429 415 430 416 #endif 431 - #ifndef UNIFFI_FFIDEF_V_TABLE_CALLBACK_INTERFACE_DATA_HANDLER_OLD 432 - #define UNIFFI_FFIDEF_V_TABLE_CALLBACK_INTERFACE_DATA_HANDLER_OLD 433 - typedef struct UniffiVTableCallbackInterfaceDataHandlerOld { 434 - UniffiCallbackInterfaceDataHandlerOldMethod0 handleData; 435 - UniffiCallbackInterfaceFree uniffiFree; 436 - } UniffiVTableCallbackInterfaceDataHandlerOld; 437 - 438 - #endif 439 417 #ifndef UNIFFI_FFIDEF_V_TABLE_CALLBACK_INTERFACE_GO_SIGNER 440 418 #define UNIFFI_FFIDEF_V_TABLE_CALLBACK_INTERFACE_GO_SIGNER 441 419 typedef struct UniffiVTableCallbackInterfaceGoSigner { ··· 464 442 uint64_t uniffi_iroh_streamplace_fn_method_datahandler_handle_data(void* ptr, RustBuffer topic, RustBuffer data 465 443 ); 466 444 #endif 467 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_DATAHANDLEROLD 468 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_DATAHANDLEROLD 469 - void* uniffi_iroh_streamplace_fn_clone_datahandlerold(void* ptr, RustCallStatus *out_status 470 - ); 471 - #endif 472 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FREE_DATAHANDLEROLD 473 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FREE_DATAHANDLEROLD 474 - void uniffi_iroh_streamplace_fn_free_datahandlerold(void* ptr, RustCallStatus *out_status 475 - ); 476 - #endif 477 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_INIT_CALLBACK_VTABLE_DATAHANDLEROLD 478 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_INIT_CALLBACK_VTABLE_DATAHANDLEROLD 479 - void uniffi_iroh_streamplace_fn_init_callback_vtable_datahandlerold(UniffiVTableCallbackInterfaceDataHandlerOld* vtable 480 - ); 481 - #endif 482 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_DATAHANDLEROLD_HANDLE_DATA 483 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_DATAHANDLEROLD_HANDLE_DATA 484 - uint64_t uniffi_iroh_streamplace_fn_method_datahandlerold_handle_data(void* ptr, RustBuffer topic, RustBuffer data 485 - ); 486 - #endif 487 445 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_DB 488 446 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_DB 489 447 void* uniffi_iroh_streamplace_fn_clone_db(void* ptr, RustCallStatus *out_status ··· 512 470 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_DB_WRITE 513 471 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_DB_WRITE 514 472 void* uniffi_iroh_streamplace_fn_method_db_write(void* ptr, RustBuffer secret, RustCallStatus *out_status 515 - ); 516 - #endif 517 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_ENDPOINT 518 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_ENDPOINT 519 - void* uniffi_iroh_streamplace_fn_clone_endpoint(void* ptr, RustCallStatus *out_status 520 - ); 521 - #endif 522 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FREE_ENDPOINT 523 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FREE_ENDPOINT 524 - void uniffi_iroh_streamplace_fn_free_endpoint(void* ptr, RustCallStatus *out_status 525 - ); 526 - #endif 527 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CONSTRUCTOR_ENDPOINT_NEW 528 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CONSTRUCTOR_ENDPOINT_NEW 529 - uint64_t uniffi_iroh_streamplace_fn_constructor_endpoint_new(void 530 - 531 - ); 532 - #endif 533 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_ENDPOINT_NODE_ADDR 534 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_ENDPOINT_NODE_ADDR 535 - uint64_t uniffi_iroh_streamplace_fn_method_endpoint_node_addr(void* ptr 536 473 ); 537 474 #endif 538 475 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_FILTER ··· 746 683 RustBuffer uniffi_iroh_streamplace_fn_method_publickey_uniffi_trait_display(void* ptr, RustCallStatus *out_status 747 684 ); 748 685 #endif 749 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_RECEIVER 750 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_RECEIVER 751 - void* uniffi_iroh_streamplace_fn_clone_receiver(void* ptr, RustCallStatus *out_status 752 - ); 753 - #endif 754 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FREE_RECEIVER 755 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FREE_RECEIVER 756 - void uniffi_iroh_streamplace_fn_free_receiver(void* ptr, RustCallStatus *out_status 757 - ); 758 - #endif 759 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CONSTRUCTOR_RECEIVER_NEW 760 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CONSTRUCTOR_RECEIVER_NEW 761 - uint64_t uniffi_iroh_streamplace_fn_constructor_receiver_new(void* endpoint, void* handler 762 - ); 763 - #endif 764 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_RECEIVER_NODE_ADDR 765 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_RECEIVER_NODE_ADDR 766 - uint64_t uniffi_iroh_streamplace_fn_method_receiver_node_addr(void* ptr 767 - ); 768 - #endif 769 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_RECEIVER_SUBSCRIBE 770 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_RECEIVER_SUBSCRIBE 771 - uint64_t uniffi_iroh_streamplace_fn_method_receiver_subscribe(void* ptr, void* remote_id, RustBuffer topic 772 - ); 773 - #endif 774 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_RECEIVER_UNSUBSCRIBE 775 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_RECEIVER_UNSUBSCRIBE 776 - uint64_t uniffi_iroh_streamplace_fn_method_receiver_unsubscribe(void* ptr, void* remote_id, RustBuffer topic 777 - ); 778 - #endif 779 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_SENDER 780 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_SENDER 781 - void* uniffi_iroh_streamplace_fn_clone_sender(void* ptr, RustCallStatus *out_status 782 - ); 783 - #endif 784 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FREE_SENDER 785 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_FREE_SENDER 786 - void uniffi_iroh_streamplace_fn_free_sender(void* ptr, RustCallStatus *out_status 787 - ); 788 - #endif 789 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CONSTRUCTOR_SENDER_NEW 790 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CONSTRUCTOR_SENDER_NEW 791 - uint64_t uniffi_iroh_streamplace_fn_constructor_sender_new(void* endpoint 792 - ); 793 - #endif 794 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_SENDER_NODE_ADDR 795 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_SENDER_NODE_ADDR 796 - uint64_t uniffi_iroh_streamplace_fn_method_sender_node_addr(void* ptr 797 - ); 798 - #endif 799 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_SENDER_SEND 800 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_SENDER_SEND 801 - uint64_t uniffi_iroh_streamplace_fn_method_sender_send(void* ptr, RustBuffer key, RustBuffer data 802 - ); 803 - #endif 804 686 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_SUBSCRIBERESPONSE 805 687 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_CLONE_SUBSCRIBERESPONSE 806 688 void* uniffi_iroh_streamplace_fn_clone_subscriberesponse(void* ptr, RustCallStatus *out_status ··· 1155 1037 1156 1038 ); 1157 1039 #endif 1158 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_DATAHANDLEROLD_HANDLE_DATA 1159 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_DATAHANDLEROLD_HANDLE_DATA 1160 - uint16_t uniffi_iroh_streamplace_checksum_method_datahandlerold_handle_data(void 1161 - 1162 - ); 1163 - #endif 1164 1040 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_DB_ITER_WITH_OPTS 1165 1041 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_DB_ITER_WITH_OPTS 1166 1042 uint16_t uniffi_iroh_streamplace_checksum_method_db_iter_with_opts(void ··· 1182 1058 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_DB_WRITE 1183 1059 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_DB_WRITE 1184 1060 uint16_t uniffi_iroh_streamplace_checksum_method_db_write(void 1185 - 1186 - ); 1187 - #endif 1188 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_ENDPOINT_NODE_ADDR 1189 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_ENDPOINT_NODE_ADDR 1190 - uint16_t uniffi_iroh_streamplace_checksum_method_endpoint_node_addr(void 1191 1061 1192 1062 ); 1193 1063 #endif ··· 1329 1199 1330 1200 ); 1331 1201 #endif 1332 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_RECEIVER_NODE_ADDR 1333 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_RECEIVER_NODE_ADDR 1334 - uint16_t uniffi_iroh_streamplace_checksum_method_receiver_node_addr(void 1335 - 1336 - ); 1337 - #endif 1338 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_RECEIVER_SUBSCRIBE 1339 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_RECEIVER_SUBSCRIBE 1340 - uint16_t uniffi_iroh_streamplace_checksum_method_receiver_subscribe(void 1341 - 1342 - ); 1343 - #endif 1344 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_RECEIVER_UNSUBSCRIBE 1345 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_RECEIVER_UNSUBSCRIBE 1346 - uint16_t uniffi_iroh_streamplace_checksum_method_receiver_unsubscribe(void 1347 - 1348 - ); 1349 - #endif 1350 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_SENDER_NODE_ADDR 1351 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_SENDER_NODE_ADDR 1352 - uint16_t uniffi_iroh_streamplace_checksum_method_sender_node_addr(void 1353 - 1354 - ); 1355 - #endif 1356 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_SENDER_SEND 1357 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_SENDER_SEND 1358 - uint16_t uniffi_iroh_streamplace_checksum_method_sender_send(void 1359 - 1360 - ); 1361 - #endif 1362 1202 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_SUBSCRIBERESPONSE_NEXT_RAW 1363 1203 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_SUBSCRIBERESPONSE_NEXT_RAW 1364 1204 uint16_t uniffi_iroh_streamplace_checksum_method_subscriberesponse_next_raw(void ··· 1371 1211 1372 1212 ); 1373 1213 #endif 1374 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_CONSTRUCTOR_ENDPOINT_NEW 1375 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_CONSTRUCTOR_ENDPOINT_NEW 1376 - uint16_t uniffi_iroh_streamplace_checksum_constructor_endpoint_new(void 1377 - 1378 - ); 1379 - #endif 1380 1214 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_CONSTRUCTOR_FILTER_NEW 1381 1215 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_CONSTRUCTOR_FILTER_NEW 1382 1216 uint16_t uniffi_iroh_streamplace_checksum_constructor_filter_new(void ··· 1419 1253 1420 1254 ); 1421 1255 #endif 1422 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_CONSTRUCTOR_RECEIVER_NEW 1423 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_CONSTRUCTOR_RECEIVER_NEW 1424 - uint16_t uniffi_iroh_streamplace_checksum_constructor_receiver_new(void 1425 - 1426 - ); 1427 - #endif 1428 - #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_CONSTRUCTOR_SENDER_NEW 1429 - #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_CONSTRUCTOR_SENDER_NEW 1430 - uint16_t uniffi_iroh_streamplace_checksum_constructor_sender_new(void 1431 - 1432 - ); 1433 - #endif 1434 1256 #ifndef UNIFFI_FFIDEF_FFI_IROH_STREAMPLACE_UNIFFI_CONTRACT_VERSION 1435 1257 #define UNIFFI_FFIDEF_FFI_IROH_STREAMPLACE_UNIFFI_CONTRACT_VERSION 1436 1258 uint32_t ffi_iroh_streamplace_uniffi_contract_version(void ··· 1440 1262 1441 1263 void iroh_streamplace_cgo_dispatchCallbackInterfaceDataHandlerMethod0(uint64_t uniffi_handle, RustBuffer topic, RustBuffer data, UniffiForeignFutureCompleteVoid uniffi_future_callback, uint64_t uniffi_callback_data, UniffiForeignFuture* uniffi_out_return); 1442 1264 void iroh_streamplace_cgo_dispatchCallbackInterfaceDataHandlerFree(uint64_t handle); 1443 - void iroh_streamplace_cgo_dispatchCallbackInterfaceDataHandlerOldMethod0(uint64_t uniffi_handle, RustBuffer topic, RustBuffer data, UniffiForeignFutureCompleteVoid uniffi_future_callback, uint64_t uniffi_callback_data, UniffiForeignFuture* uniffi_out_return); 1444 - void iroh_streamplace_cgo_dispatchCallbackInterfaceDataHandlerOldFree(uint64_t handle); 1445 1265 void iroh_streamplace_cgo_dispatchCallbackInterfaceGoSignerMethod0(uint64_t uniffi_handle, RustBuffer data, RustBuffer* uniffi_out_return, RustCallStatus* callStatus ); 1446 1266 void iroh_streamplace_cgo_dispatchCallbackInterfaceGoSignerFree(uint64_t handle); 1447 1267
-56
pkg/iroh/iroh_streamplace_test.go
··· 1 - package iroh_streamplace 2 - 3 - import ( 4 - "testing" 5 - 6 - "github.com/stretchr/testify/assert" 7 - 8 - iroh "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 9 - ) 10 - 11 - type Message struct { 12 - topic string 13 - data []byte 14 - } 15 - 16 - type TestHandler struct { 17 - messages chan Message 18 - } 19 - 20 - func (handler TestHandler) HandleData(topic string, data []byte) { 21 - handler.messages <- Message{topic, data} 22 - } 23 - 24 - func TestBasicRoundtrip(t *testing.T) { 25 - ep1, err := iroh.NewEndpoint() 26 - assert.Nil(t, err) 27 - sender := iroh.NewSender(ep1) 28 - 29 - messages := make(chan Message, 5) 30 - handler := TestHandler{messages: messages} 31 - 32 - ep2, err := iroh.NewEndpoint() 33 - assert.NoError(t, err) 34 - receiver, err := iroh.NewReceiver(ep2, &handler) 35 - assert.NoError(t, err) 36 - 37 - senderAddr := sender.NodeAddr() 38 - senderId := senderAddr.NodeId() 39 - 40 - // subscribe 41 - err = receiver.Subscribe(senderId, "foo") 42 - assert.NoError(t, err) 43 - 44 - // send a few messages 45 - for i := range 5 { 46 - err = sender.Send("foo", []byte{byte(i), 0, 0, 0}) 47 - assert.NoError(t, err) 48 - } 49 - 50 - // make sure the receiver got them 51 - for i := range 5 { 52 - msg := <-messages 53 - assert.Equal(t, msg.topic, "foo") 54 - assert.Equal(t, msg.data, []byte{byte(i), 0, 0, 0}) 55 - } 56 - }
+3 -3
pkg/media/media.go
··· 58 58 Segment *model.Segment 59 59 Data []byte 60 60 Metadata *SegmentMetadata 61 + Local bool 61 62 } 62 63 63 64 func RunSelfTest(ctx context.Context) error { ··· 65 66 return SelfTest(ctx) 66 67 } 67 68 68 - func MakeMediaManager(ctx context.Context, cli *config.CLI, signer crypto.Signer, rep replication.Replicator, mod model.Model, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer) (*MediaManager, error) { 69 + func MakeMediaManager(ctx context.Context, cli *config.CLI, signer crypto.Signer, mod model.Model, bus *bus.Bus, atsync *atproto.ATProtoSynchronizer) (*MediaManager, error) { 69 70 gstinit.InitGST() 70 71 err := SelfTest(ctx) 71 72 if err != nil { ··· 121 122 122 123 return &MediaManager{ 123 124 cli: cli, 124 - replicator: rep, 125 125 hlsRunning: map[string]*M3U8{}, 126 126 httpPipes: map[string]io.Writer{}, 127 127 model: mod, ··· 135 135 func (mm *MediaManager) HandleData(node *irohStreamplace.PublicKey, data []byte) { 136 136 r := bytes.NewReader(data) 137 137 ctx := context.Background() 138 - err := mm.ValidateMP4(ctx, r) 138 + err := mm.ValidateMP4(ctx, r, true) 139 139 if err != nil { 140 140 log.Log(ctx, "invalid incoming segment", "error", err) 141 141 }
+2 -3
pkg/media/media_test.go
··· 13 13 "stream.place/streamplace/pkg/config" 14 14 ct "stream.place/streamplace/pkg/config/configtesting" 15 15 "stream.place/streamplace/pkg/model" 16 - "stream.place/streamplace/pkg/replication/boring" 17 16 ) 18 17 19 18 func getFixture(name string) string { ··· 40 39 StatefulDB: nil, // Test doesn't need StatefulDB for now 41 40 Bus: bus.NewBus(), 42 41 } 43 - mm, err := MakeMediaManager(context.Background(), cli, nil, &boring.BoringReplicator{}, mod, bus.NewBus(), atsync) 42 + mm, err := MakeMediaManager(context.Background(), cli, nil, mod, bus.NewBus(), atsync) 44 43 require.NoError(t, err) 45 44 // ms, err := MakeMediaSigner(context.Background(), cli, "test-person", signer) 46 45 // require.NoError(t, err) ··· 126 125 f, err := os.Open(getFixture("sample-segment.mp4")) 127 126 require.NoError(t, err) 128 127 mm, _ := getStaticTestMediaManager(t) 129 - err = mm.ValidateMP4(context.Background(), f) 128 + err = mm.ValidateMP4(context.Background(), f, true) 130 129 require.NoError(t, err) 131 130 }
+1 -2
pkg/media/rtcrec_test.go
··· 11 11 "stream.place/streamplace/pkg/config" 12 12 "stream.place/streamplace/pkg/crypto/spkey" 13 13 "stream.place/streamplace/pkg/globalerror" 14 - "stream.place/streamplace/pkg/replication/boring" 15 14 "stream.place/streamplace/pkg/rtcrec" 16 15 ) 17 16 ··· 26 25 fs := cli.NewFlagSet("rtcrec-test") 27 26 err = cli.Parse(fs, []string{"--data-dir", dir, "-wide-open=true"}) 28 27 require.NoError(t, err) 29 - mm, err := MakeMediaManager(context.Background(), cli, nil, &boring.BoringReplicator{}, nil, nil, nil) 28 + mm, err := MakeMediaManager(context.Background(), cli, nil, nil, nil, nil) 30 29 require.NoError(t, err) 31 30 priv, pub, err := spkey.GenerateStreamKey() 32 31 require.NoError(t, err)
+1 -1
pkg/media/segmenter.go
··· 87 87 return 88 88 } 89 89 90 - err = mm.ValidateMP4(ctx, bytes.NewReader(bs)) 90 + err = mm.ValidateMP4(ctx, bytes.NewReader(bs), true) 91 91 if err != nil { 92 92 log.Error(ctx, "error validating segment", "error", err) 93 93 globalerror.GlobalError(err)
+3 -2
pkg/media/validate.go
··· 23 23 Cert string `json:"cert"` 24 24 } 25 25 26 - func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader) error { 26 + func (mm *MediaManager) ValidateMP4(ctx context.Context, input io.Reader, local bool) error { 27 27 ctx, span := otel.Tracer("signer").Start(ctx, "ValidateMP4") 28 28 defer span.End() 29 29 buf, err := io.ReadAll(input) ··· 82 82 return err 83 83 } 84 84 defer fd.Close() 85 - go mm.replicator.NewSegment(ctx, buf) 85 + 86 86 r := bytes.NewReader(buf) 87 87 if _, err := io.Copy(fd, r); err != nil { 88 88 return err ··· 102 102 Segment: seg, 103 103 Data: buf, 104 104 Metadata: meta, 105 + Local: local, 105 106 } 106 107 for _, ch := range mm.newSegmentSubs { 107 108 go func() { ch <- not }()
+2 -21
pkg/replication/iroh_replicator/iroh.go
··· 2 2 3 3 import ( 4 4 "context" 5 - 6 - "stream.place/streamplace/pkg/log" 7 - 8 - irohStreamplace "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 9 5 ) 10 6 11 7 // IrohReplicator implements the replication mechanism using iroh 12 8 type IrohReplicator struct { 13 - topic string 14 - sender *irohStreamplace.Sender 15 9 } 16 10 17 - func NewIrohReplicator(ctx context.Context, ep *irohStreamplace.Endpoint, topic string) (*IrohReplicator, error) { 18 - sender := irohStreamplace.NewSender(ep) 11 + func NewIrohReplicator(ctx context.Context) (*IrohReplicator, error) { 19 12 20 - return &IrohReplicator{ 21 - topic: topic, 22 - sender: sender, 23 - }, nil 13 + return &IrohReplicator{}, nil 24 14 } 25 15 26 16 func (rep *IrohReplicator) NewSegment(ctx context.Context, bs []byte) { 27 - go func(topic string) { 28 - err := sendSegment(rep.sender, topic, bs) 29 - if err != nil { 30 - log.Log(ctx, "error replicating segment", "error", err) 31 - } 32 - }(rep.topic) 33 - } 34 17 35 - func sendSegment(endpoint *irohStreamplace.Sender, topic string, bs []byte) error { 36 - return endpoint.Send(topic, bs) 37 18 }
+86 -26
pkg/replication/iroh_replicator/kv.go
··· 1 1 package iroh_replicator 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "encoding/json" 6 7 "fmt" 7 8 "time" 8 9 10 + "github.com/bluesky-social/indigo/util" 11 + "golang.org/x/sync/errgroup" 9 12 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 10 13 "stream.place/streamplace/pkg/log" 14 + "stream.place/streamplace/pkg/media" 11 15 ) 12 16 13 - type SwarmKV struct { 14 - Node *iroh_streamplace.Node 15 - DB *iroh_streamplace.Db 16 - w *iroh_streamplace.WriteScope 17 + type IrohSwarm struct { 18 + Node *iroh_streamplace.Node 19 + DB *iroh_streamplace.Db 20 + w *iroh_streamplace.WriteScope 21 + mm *media.MediaManager 22 + segChan chan *media.NewSegmentNotification 23 + nodeId string 17 24 } 18 25 19 26 // A message saying "hey I ingested node data at this time" ··· 22 29 Time string `json:"time"` 23 30 } 24 31 25 - type DataHandler struct{} 26 - 27 - func (handler *DataHandler) HandleData(topic string, data []byte) { 28 - log.Log(context.Background(), "HandleData", "topic", topic, "data", len(data)) 29 - } 30 - 31 - func StartKV(ctx context.Context, tickets []string, secret []byte) (*SwarmKV, error) { 32 - handler := &DataHandler{} 32 + func NewSwarm(ctx context.Context, tickets []string, secret []byte, mm *media.MediaManager) (*IrohSwarm, error) { 33 33 ctx = log.WithLogValues(ctx, "func", "StartKV") 34 34 35 35 log.Log(ctx, "Starting with tickets", "tickets", tickets) ··· 39 39 MaxSendDuration: 1000_000_000, // 1s 40 40 } 41 41 log.Log(ctx, "Config created", "config", config) 42 - node, err := iroh_streamplace.NodeReceiver(config, handler) 42 + 43 + swarm := IrohSwarm{ 44 + mm: mm, 45 + } 46 + 47 + node, err := iroh_streamplace.NodeReceiver(config, &swarm) 43 48 if err != nil { 44 49 return nil, fmt.Errorf("failed to create NodeSender: %w", err) 45 50 } ··· 47 52 db := node.Db() 48 53 w := node.NodeScope() 49 54 50 - node_id, err := node.NodeId() 55 + swarm.DB = db 56 + swarm.w = w 57 + swarm.Node = node 58 + 59 + nodeId, err := node.NodeId() 51 60 if err != nil { 52 61 return nil, fmt.Errorf("failed to get NodeId: %w", err) 53 62 } 54 - log.Log(ctx, "Node ID:", "node_id", node_id) 63 + log.Log(ctx, "Node ID:", "node_id", nodeId) 64 + swarm.nodeId = nodeId.String() 55 65 56 66 ticket, err := node.Ticket() 57 67 if err != nil { ··· 59 69 } 60 70 log.Log(ctx, "Ticket:", "ticket", ticket) 61 71 62 - swarm := SwarmKV{ 63 - Node: node, 64 - DB: db, 65 - w: w, 66 - } 67 72 return &swarm, nil 68 73 } 69 74 70 75 var activeSubs = make(map[string]bool) 71 76 72 - func (swarm *SwarmKV) Start(ctx context.Context, tickets []string) error { 77 + func (swarm *IrohSwarm) Start(ctx context.Context, tickets []string) error { 73 78 if len(tickets) > 0 { 74 79 err := swarm.Node.JoinPeers(tickets) 75 80 if err != nil { ··· 84 89 nodeIdStr := nodeId.String() 85 90 log.Log(ctx, "Node ID:", "node_id", nodeIdStr) 86 91 92 + g, ctx := errgroup.WithContext(ctx) 93 + g.Go(func() error { 94 + return swarm.startKV(ctx) 95 + }) 96 + g.Go(func() error { 97 + return swarm.startSegmentSender(ctx) 98 + }) 99 + return g.Wait() 100 + } 101 + 102 + func (swarm *IrohSwarm) startKV(ctx context.Context) error { 87 103 sub := swarm.DB.Subscribe(iroh_streamplace.NewFilter()) 88 104 for { 89 105 if ctx.Err() != nil { ··· 110 126 continue 111 127 } 112 128 if !activeSubs[keyStr] { 113 - if info.NodeID == nodeIdStr { 129 + if info.NodeID == swarm.nodeId { 114 130 activeSubs[keyStr] = true 115 131 continue 116 132 } ··· 137 153 } 138 154 } 139 155 140 - func (swarm *SwarmKV) Put(ctx context.Context, key string, value []byte) error { 141 - // streamerBs := []byte(streamer) 142 - keyBs := []byte(key) 143 - return swarm.w.Put(nil, keyBs, value) 156 + func (swarm *IrohSwarm) startSegmentSender(ctx context.Context) error { 157 + ch := swarm.mm.NewSegment() 158 + for { 159 + select { 160 + case <-ctx.Done(): 161 + return ctx.Err() 162 + case not := <-ch: 163 + err := swarm.SendSegment(ctx, not) 164 + if err != nil { 165 + log.Error(ctx, "could not send segment to swarm", "error", err) 166 + } 167 + continue 168 + } 169 + } 170 + } 171 + 172 + func (swarm *IrohSwarm) HandleData(topic string, data []byte) { 173 + err := swarm.mm.ValidateMP4(context.Background(), bytes.NewReader(data), false) 174 + if err != nil { 175 + log.Error(context.Background(), "could not validate segment", "error", err) 176 + } 177 + } 178 + 179 + func (swarm *IrohSwarm) SendSegment(ctx context.Context, not *media.NewSegmentNotification) error { 180 + if !not.Local { 181 + return nil 182 + } 183 + originInfo := OriginInfo{ 184 + NodeID: swarm.nodeId, 185 + Time: not.Segment.StartTime.Format(util.ISO8601), 186 + } 187 + bs, err := json.Marshal(originInfo) 188 + if err != nil { 189 + log.Error(ctx, "could not marshal origin info", "error", err) 190 + return err 191 + } 192 + keyBs := []byte(not.Segment.RepoDID) 193 + err = swarm.w.Put(nil, keyBs, bs) 194 + if err != nil { 195 + log.Error(ctx, "could not put segment to swarm", "error", err) 196 + return err 197 + } 198 + err = swarm.Node.SendSegment(not.Segment.RepoDID, not.Data) 199 + if err != nil { 200 + log.Error(ctx, "could not send segment to swarm", "error", err) 201 + return err 202 + } 203 + return nil 144 204 }
-226
rust/iroh-streamplace/src/api.rs
··· 1 - //! Protocol API 2 - 3 - use std::collections::{BTreeMap, BTreeSet}; 4 - 5 - use bytes::Bytes; 6 - use iroh::{Endpoint, NodeId, protocol::ProtocolHandler}; 7 - use irpc::{Client, WithChannels, channel::oneshot, rpc::RemoteService, rpc_requests}; 8 - use irpc_iroh::{IrohProtocol, IrohRemoteConnection}; 9 - use n0_future::future::Boxed; 10 - use serde::{Deserialize, Serialize}; 11 - use tracing::{debug, warn}; 12 - 13 - /// Subscribe to the given `key` 14 - #[derive(Debug, Serialize, Deserialize)] 15 - struct Subscribe { 16 - key: String, 17 - // TODO: verify 18 - remote_id: NodeId, 19 - } 20 - 21 - /// Unsubscribe from the given `key` 22 - #[derive(Debug, Serialize, Deserialize)] 23 - struct Unsubscribe { 24 - key: String, 25 - // TODO: verify 26 - remote_id: NodeId, 27 - } 28 - 29 - #[derive(Debug, Serialize, Deserialize)] 30 - struct SendSegment { 31 - key: String, 32 - data: Bytes, 33 - } 34 - 35 - #[derive(Debug, Clone, Serialize, Deserialize)] 36 - struct RecvSegment { 37 - key: String, 38 - data: Bytes, 39 - } 40 - 41 - // Use the macro to generate both the Protocol and Message enums 42 - // plus implement Channels for each type 43 - #[rpc_requests(message = Message)] 44 - #[derive(Serialize, Deserialize, Debug)] 45 - enum Protocol { 46 - #[rpc(tx=oneshot::Sender<()>)] 47 - Subscribe(Subscribe), 48 - #[rpc(tx=oneshot::Sender<()>)] 49 - Unsubscribe(Unsubscribe), 50 - #[rpc(tx=oneshot::Sender<()>)] 51 - SendSegment(SendSegment), 52 - #[rpc(tx=oneshot::Sender<()>)] 53 - RecvSegment(RecvSegment), 54 - } 55 - 56 - struct Actor { 57 - endpoint: iroh::Endpoint, 58 - recv: tokio::sync::mpsc::Receiver<Message>, 59 - subscriptions: BTreeMap<String, BTreeSet<NodeId>>, 60 - connections: BTreeMap<NodeId, Connection>, 61 - handler: Box<dyn Fn(String, Vec<u8>) -> Boxed<()> + Send + Sync + 'static>, 62 - } 63 - 64 - #[derive(Debug)] 65 - struct Connection { 66 - _id: NodeId, 67 - rpc: Client<Protocol>, 68 - } 69 - 70 - impl Actor { 71 - fn spawn( 72 - endpoint: &iroh::Endpoint, 73 - handler: impl Fn(String, Vec<u8>) -> Boxed<()> + Send + Sync + 'static, 74 - ) -> Api { 75 - let (tx, rx) = tokio::sync::mpsc::channel(1); 76 - let actor = Self { 77 - endpoint: endpoint.clone(), 78 - recv: rx, 79 - subscriptions: BTreeMap::new(), 80 - connections: BTreeMap::new(), 81 - handler: Box::new(handler), 82 - }; 83 - n0_future::task::spawn(actor.run()); 84 - Api { 85 - inner: Client::local(tx), 86 - } 87 - } 88 - 89 - async fn run(mut self) { 90 - while let Some(msg) = self.recv.recv().await { 91 - self.handle(msg).await; 92 - } 93 - } 94 - 95 - async fn handle(&mut self, msg: Message) { 96 - match msg { 97 - Message::Subscribe(sub) => { 98 - debug!("subscribe {:?}", sub); 99 - let WithChannels { tx, inner, .. } = sub; 100 - 101 - self.subscriptions 102 - .entry(inner.key) 103 - .or_default() 104 - .insert(inner.remote_id); 105 - 106 - tx.send(()).await.ok(); 107 - } 108 - Message::Unsubscribe(sub) => { 109 - debug!("unsubscribe {:?}", sub); 110 - let WithChannels { tx, inner, .. } = sub; 111 - 112 - if let Some(e) = self.subscriptions.get_mut(&inner.key) { 113 - e.remove(&inner.remote_id); 114 - } 115 - 116 - tx.send(()).await.ok(); 117 - } 118 - Message::SendSegment(segment) => { 119 - debug!("send segment {:?}", segment); 120 - let WithChannels { tx, inner, .. } = segment; 121 - 122 - let msg = RecvSegment { 123 - key: inner.key.clone(), 124 - data: inner.data.clone(), 125 - }; 126 - 127 - for (key, remotes) in &self.subscriptions { 128 - if key == &inner.key { 129 - for remote in remotes { 130 - debug!("sending to topic {}: {}", key, remote); 131 - 132 - // ensure connection 133 - if !self.connections.contains_key(remote) { 134 - let conn = IrohRemoteConnection::new( 135 - self.endpoint.clone(), 136 - (*remote).into(), 137 - Api::ALPN.to_vec(), 138 - ); 139 - 140 - let conn = Connection { 141 - rpc: Client::boxed(conn), 142 - _id: *remote, 143 - }; 144 - self.connections.insert(*remote, conn); 145 - } 146 - let conn = self.connections.get(remote).expect("just checked"); 147 - 148 - if let Err(err) = conn.rpc.rpc(msg.clone()).await { 149 - warn!("failed to send to {}: {:?}", remote, err); 150 - // remove conn 151 - self.connections.remove(remote); 152 - } 153 - } 154 - } 155 - } 156 - 157 - tx.send(()).await.ok(); 158 - } 159 - Message::RecvSegment(segment) => { 160 - debug!("recv segment {:?}", segment); 161 - let WithChannels { tx, inner, .. } = segment; 162 - (self.handler)(inner.key, inner.data.to_vec()).await; 163 - tx.send(()).await.ok(); 164 - } 165 - } 166 - } 167 - } 168 - 169 - /// The actual API to interact with 170 - pub(crate) struct Api { 171 - inner: Client<Protocol>, 172 - } 173 - 174 - impl Api { 175 - pub(crate) const ALPN: &[u8] = b"/iroh/streamplace/1"; 176 - 177 - pub(crate) fn spawn(endpoint: &iroh::Endpoint) -> Self { 178 - Actor::spawn(endpoint, |_, _| Box::pin(async move {})) 179 - } 180 - 181 - pub(crate) fn spawn_with_handler( 182 - endpoint: &iroh::Endpoint, 183 - handler: impl Fn(String, Vec<u8>) -> Boxed<()> + Send + Sync + 'static, 184 - ) -> Self { 185 - Actor::spawn(endpoint, handler) 186 - } 187 - 188 - pub(crate) fn connect(endpoint: Endpoint, addr: impl Into<iroh::NodeAddr>) -> Api { 189 - let conn = IrohRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec()); 190 - Api { 191 - inner: Client::boxed(conn), 192 - } 193 - } 194 - 195 - pub(crate) fn expose(&self) -> impl ProtocolHandler { 196 - let local = self 197 - .inner 198 - .as_local() 199 - .expect("can not listen on remote service"); 200 - IrohProtocol::new(Protocol::remote_handler(local)) 201 - } 202 - 203 - pub(crate) async fn subscribe(&self, key: String, self_id: NodeId) -> irpc::Result<()> { 204 - self.inner 205 - .rpc(Subscribe { 206 - key, 207 - remote_id: self_id, 208 - }) 209 - .await 210 - } 211 - 212 - pub(crate) async fn unsubscribe(&self, key: String, self_id: NodeId) -> irpc::Result<()> { 213 - self.inner 214 - .rpc(Unsubscribe { 215 - key, 216 - remote_id: self_id, 217 - }) 218 - .await 219 - } 220 - 221 - /// Send this segment to all subscriptions. 222 - pub(crate) async fn send_segment(&self, key: String, data: Bytes) -> irpc::Result<()> { 223 - let msg = SendSegment { key, data }; 224 - self.inner.rpc(msg).await 225 - } 226 - }
-30
rust/iroh-streamplace/src/endpoint.rs
··· 1 - use iroh::Watcher; 2 - 3 - use crate::{error::Error, node_addr::NodeAddr}; 4 - 5 - #[derive(uniffi::Object, Debug, Clone)] 6 - pub struct Endpoint { 7 - pub(crate) endpoint: iroh::Endpoint, 8 - } 9 - 10 - #[uniffi::export] 11 - impl Endpoint { 12 - /// Create a new endpoint. 13 - #[uniffi::constructor(async_runtime = "tokio")] 14 - pub async fn new() -> Result<Self, Error> { 15 - let endpoint = iroh::Endpoint::builder() 16 - .discovery_n0() 17 - .discovery_local_network() 18 - .bind() 19 - .await?; 20 - 21 - Ok(Self { endpoint }) 22 - } 23 - 24 - #[uniffi::method(async_runtime = "tokio")] 25 - pub async fn node_addr(&self) -> NodeAddr { 26 - let _ = self.endpoint.home_relay().initialized().await; 27 - let addr = self.endpoint.node_addr().initialized().await; 28 - addr.into() 29 - } 30 - }
+1 -6
rust/iroh-streamplace/src/lib.rs
··· 1 1 uniffi::setup_scaffolding!(); 2 2 3 3 pub mod c2pa; 4 - pub mod endpoint; 5 4 pub mod error; 6 - pub mod public_key; 7 5 pub mod node; 8 - pub mod receiver; 9 - pub mod sender; 10 6 pub mod node_addr; 11 - 12 - mod api; 7 + pub mod public_key;
-150
rust/iroh-streamplace/src/receiver.rs
··· 1 - use std::sync::Arc; 2 - 3 - use iroh::protocol::Router; 4 - 5 - use crate::{api::Api, endpoint::Endpoint, error::Error, public_key::PublicKey, node_addr::NodeAddr}; 6 - 7 - #[derive(uniffi::Object)] 8 - pub struct Receiver { 9 - endpoint: Endpoint, 10 - _api: Api, 11 - _router: iroh::protocol::Router, 12 - } 13 - 14 - #[uniffi::export] 15 - impl Receiver { 16 - /// Create a new receiver. 17 - #[uniffi::constructor(async_runtime = "tokio")] 18 - pub async fn new( 19 - endpoint: &Endpoint, 20 - handler: Arc<dyn DataHandlerOld>, 21 - ) -> Result<Receiver, Error> { 22 - let api = Api::spawn_with_handler(&endpoint.endpoint, move |id, data| { 23 - let handler = handler.clone(); 24 - Box::pin(async move { 25 - handler.handle_data(id, data).await; 26 - }) 27 - }); 28 - let router = Router::builder(endpoint.endpoint.clone()) 29 - .accept(Api::ALPN, api.expose()) 30 - .spawn(); 31 - 32 - Ok(Receiver { 33 - endpoint: endpoint.clone(), 34 - _api: api, 35 - _router: router, 36 - }) 37 - } 38 - 39 - /// Subscribe to the given topic on the remote. 40 - #[uniffi::method(async_runtime = "tokio")] 41 - pub async fn subscribe(&self, remote_id: Arc<PublicKey>, topic: &str) -> Result<(), Error> { 42 - let remote_id: iroh::NodeId = remote_id.as_ref().into(); 43 - let api = Api::connect(self.endpoint.endpoint.clone(), remote_id); 44 - api.subscribe(topic.to_string(), self.endpoint.endpoint.node_id()) 45 - .await?; 46 - Ok(()) 47 - } 48 - 49 - /// Unsubscribe from this topic on the remote. 50 - #[uniffi::method(async_runtime = "tokio")] 51 - pub async fn unsubscribe( 52 - &self, 53 - remote_id: Arc<PublicKey>, 54 - topic: &str, 55 - ) -> Result<(), Error> { 56 - let remote_id: iroh::NodeId = remote_id.as_ref().into(); 57 - let api = Api::connect(self.endpoint.endpoint.clone(), remote_id); 58 - api.unsubscribe(topic.to_string(), self.endpoint.endpoint.node_id()) 59 - .await?; 60 - Ok(()) 61 - } 62 - 63 - #[uniffi::method(async_runtime = "tokio")] 64 - pub async fn node_addr(&self) -> NodeAddr { 65 - self.endpoint.node_addr().await 66 - } 67 - } 68 - 69 - #[uniffi::export(with_foreign)] 70 - #[async_trait::async_trait] 71 - pub trait DataHandlerOld: Send + Sync { 72 - async fn handle_data(&self, topic: String, data: Vec<u8>); 73 - } 74 - 75 - #[cfg(test)] 76 - mod tests { 77 - 78 - use super::*; 79 - use crate::sender::Sender; 80 - 81 - #[tokio::test] 82 - async fn test_roundtrip() { 83 - tracing_subscriber::fmt() 84 - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) 85 - .init(); 86 - 87 - let ep1 = Endpoint::new().await.unwrap(); 88 - let sender = Sender::new(&ep1).await.unwrap(); 89 - 90 - let (s, mut r) = tokio::sync::mpsc::channel(5); 91 - 92 - #[derive(Debug, Clone)] 93 - struct TestHandler { 94 - messages: tokio::sync::mpsc::Sender<(String, Vec<u8>)>, 95 - } 96 - 97 - #[async_trait::async_trait] 98 - impl DataHandlerOld for TestHandler { 99 - async fn handle_data(&self, topic: String, data: Vec<u8>) { 100 - self.messages.send((topic, data)).await.unwrap(); 101 - } 102 - } 103 - 104 - let handler = TestHandler { messages: s }; 105 - let ep2 = Endpoint::new().await.unwrap(); 106 - let receiver = Receiver::new(&ep2, Arc::new(handler.clone())) 107 - .await 108 - .unwrap(); 109 - 110 - let sender_addr = sender.node_addr().await; 111 - println!("sender addr: {sender_addr:?}"); 112 - 113 - let receiver_addr = receiver.node_addr().await; 114 - println!("recv addr: {receiver_addr:?}"); 115 - 116 - // subscribe 117 - receiver 118 - .subscribe(Arc::new(sender_addr.node_id()), "foo") 119 - .await 120 - .unwrap(); 121 - 122 - // send a few messages 123 - for i in 0u8..5 { 124 - sender.send("foo", &[i, 0, 0, 0]).await.unwrap(); 125 - } 126 - 127 - // make sure the receiver got them 128 - for i in 0u8..5 { 129 - let (topic, msg) = r.recv().await.unwrap(); 130 - assert_eq!(topic, "foo"); 131 - assert_eq!(msg, vec![i, 0, 0, 0]); 132 - } 133 - 134 - // unsubscribe 135 - receiver 136 - .unsubscribe(Arc::new(sender_addr.node_id()), "foo") 137 - .await 138 - .unwrap(); 139 - 140 - // send a message, shouldn't error 141 - sender.send("foo", &[1]).await.unwrap(); 142 - 143 - // no message received, times out 144 - let res = tokio::time::timeout(std::time::Duration::from_millis(200), async { 145 - r.recv().await.unwrap(); 146 - }) 147 - .await; 148 - assert!(res.is_err()); 149 - } 150 - }
-43
rust/iroh-streamplace/src/sender.rs
··· 1 - use bytes::Bytes; 2 - use iroh::protocol::Router; 3 - 4 - use crate::{api::Api, c2pa::SPError, endpoint::Endpoint, error::Error, node_addr::NodeAddr}; 5 - 6 - #[derive(uniffi::Object)] 7 - pub struct Sender { 8 - endpoint: Endpoint, 9 - api: Api, 10 - _router: iroh::protocol::Router, 11 - } 12 - 13 - #[uniffi::export] 14 - impl Sender { 15 - /// Create a new sender. 16 - #[uniffi::constructor(async_runtime = "tokio")] 17 - pub async fn new(endpoint: &Endpoint) -> Sender { 18 - let api = Api::spawn(&endpoint.endpoint); 19 - let router = Router::builder(endpoint.endpoint.clone()) 20 - .accept(Api::ALPN, api.expose()) 21 - .spawn(); 22 - 23 - Sender { 24 - endpoint: endpoint.clone(), 25 - api, 26 - _router: router, 27 - } 28 - } 29 - 30 - /// Sends the given data to all subscribers that have subscribed to this `key`. 31 - #[uniffi::method(async_runtime = "tokio")] 32 - pub async fn send(&self, key: &str, data: &[u8]) -> Result<(), Error> { 33 - self.api 34 - .send_segment(key.to_string(), Bytes::copy_from_slice(data)) 35 - .await?; 36 - Ok(()) 37 - } 38 - 39 - #[uniffi::method(async_runtime = "tokio")] 40 - pub async fn node_addr(&self) -> NodeAddr { 41 - self.endpoint.node_addr().await 42 - } 43 - }