Live video on the AT Protocol

iroh: add AddTickets

+115 -11
+43
pkg/iroh/generated/iroh_streamplace/iroh_streamplace.go
··· 498 498 } 499 499 { 500 500 checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 501 + return C.uniffi_iroh_streamplace_checksum_method_node_add_tickets() 502 + }) 503 + if checksum != 8701 { 504 + // If this happens try cleaning and rebuilding your project 505 + panic("iroh_streamplace: uniffi_iroh_streamplace_checksum_method_node_add_tickets: UniFFI API checksum mismatch") 506 + } 507 + } 508 + { 509 + checksum := rustCall(func(_uniffiStatus *C.RustCallStatus) C.uint16_t { 501 510 return C.uniffi_iroh_streamplace_checksum_method_node_db() 502 511 }) 503 512 if checksum != 39096 { ··· 1568 1577 1569 1578 // Iroh-streamplace node that can send, forward or receive stream segments. 1570 1579 type NodeInterface interface { 1580 + // Add tickets for remote peers 1581 + AddTickets(peers []string) error 1571 1582 // Get a handle to the db to watch for changes locally or globally. 1572 1583 Db() *Db 1573 1584 // Join peers by their node tickets. ··· 1682 1693 } 1683 1694 1684 1695 return res, err 1696 + } 1697 + 1698 + // Add tickets for remote peers 1699 + func (_self *Node) AddTickets(peers []string) error { 1700 + _pointer := _self.ffiObject.incrementPointer("*Node") 1701 + defer _self.ffiObject.decrementPointer() 1702 + _, err := uniffiRustCallAsync[JoinPeersError]( 1703 + FfiConverterJoinPeersErrorINSTANCE, 1704 + // completeFn 1705 + func(handle C.uint64_t, status *C.RustCallStatus) struct{} { 1706 + C.ffi_iroh_streamplace_rust_future_complete_void(handle, status) 1707 + return struct{}{} 1708 + }, 1709 + // liftFn 1710 + func(_ struct{}) struct{} { return struct{}{} }, 1711 + C.uniffi_iroh_streamplace_fn_method_node_add_tickets( 1712 + _pointer, FfiConverterSequenceStringINSTANCE.Lower(peers)), 1713 + // pollFn 1714 + func(handle C.uint64_t, continuation C.UniffiRustFutureContinuationCallback, data C.uint64_t) { 1715 + C.ffi_iroh_streamplace_rust_future_poll_void(handle, continuation, data) 1716 + }, 1717 + // freeFn 1718 + func(handle C.uint64_t) { 1719 + C.ffi_iroh_streamplace_rust_future_free_void(handle) 1720 + }, 1721 + ) 1722 + 1723 + if err == nil { 1724 + return nil 1725 + } 1726 + 1727 + return err 1685 1728 } 1686 1729 1687 1730 // Get a handle to the db to watch for changes locally or globally.
+11
pkg/iroh/generated/iroh_streamplace/iroh_streamplace.h
··· 568 568 uint64_t uniffi_iroh_streamplace_fn_constructor_node_sender(RustBuffer config 569 569 ); 570 570 #endif 571 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_NODE_ADD_TICKETS 572 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_NODE_ADD_TICKETS 573 + uint64_t uniffi_iroh_streamplace_fn_method_node_add_tickets(void* ptr, RustBuffer peers 574 + ); 575 + #endif 571 576 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_NODE_DB 572 577 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_FN_METHOD_NODE_DB 573 578 void* uniffi_iroh_streamplace_fn_method_node_db(void* ptr, RustCallStatus *out_status ··· 1106 1111 #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_GOSIGNER_SIGN 1107 1112 #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_GOSIGNER_SIGN 1108 1113 uint16_t uniffi_iroh_streamplace_checksum_method_gosigner_sign(void 1114 + 1115 + ); 1116 + #endif 1117 + #ifndef UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_NODE_ADD_TICKETS 1118 + #define UNIFFI_FFIDEF_UNIFFI_IROH_STREAMPLACE_CHECKSUM_METHOD_NODE_ADD_TICKETS 1119 + uint16_t uniffi_iroh_streamplace_checksum_method_node_add_tickets(void 1109 1120 1110 1121 ); 1111 1122 #endif
+20 -11
pkg/replication/iroh_replicator/kv.go
··· 15 15 ) 16 16 17 17 type IrohSwarm struct { 18 - Node *iroh_streamplace.Node 19 - DB *iroh_streamplace.Db 20 - w *iroh_streamplace.WriteScope 21 - mm *media.MediaManager 22 - segChan chan *media.NewSegmentNotification 23 - nodeId string 24 - activeSubs map[string]*OriginInfo 18 + Node *iroh_streamplace.Node 19 + DB *iroh_streamplace.Db 20 + w *iroh_streamplace.WriteScope 21 + mm *media.MediaManager 22 + segChan chan *media.NewSegmentNotification 23 + nodeId string 24 + activeSubs map[string]*OriginInfo 25 + handleDataScoped func(topic string, data []byte) 25 26 } 26 27 27 28 // A message saying "hey I ingested node data at this time" ··· 44 45 swarm := IrohSwarm{ 45 46 mm: mm, 46 47 activeSubs: make(map[string]*OriginInfo), 48 + } 49 + 50 + // workaround to get context into the HandleData callback 51 + swarm.handleDataScoped = func(topic string, data []byte) { 52 + if ctx.Err() != nil { 53 + return 54 + } 55 + err := swarm.mm.ValidateMP4(context.Background(), bytes.NewReader(data), false) 56 + if err != nil { 57 + log.Error(ctx, "could not validate segment", "error", err, "topic", topic, "data", len(data)) 58 + } 47 59 } 48 60 49 61 node, err := iroh_streamplace.NodeReceiver(config, &swarm) ··· 193 205 } 194 206 195 207 func (swarm *IrohSwarm) HandleData(topic string, data []byte) { 196 - err := swarm.mm.ValidateMP4(context.Background(), bytes.NewReader(data), false) 197 - if err != nil { 198 - log.Error(context.Background(), "could not validate segment", "error", err, "topic", topic, "data", len(data)) 199 - } 208 + swarm.handleDataScoped(topic, data) 200 209 } 201 210 202 211 func (swarm *IrohSwarm) SendSegment(ctx context.Context, not *media.NewSegmentNotification) error {
+41
rust/iroh-streamplace/src/node/streams.rs
··· 98 98 } 99 99 100 100 #[derive(Debug, Serialize, Deserialize)] 101 + pub struct AddTickets { 102 + pub peers: Vec<NodeAddr>, 103 + } 104 + 105 + #[derive(Debug, Serialize, Deserialize)] 101 106 pub struct GetNodeAddr; 102 107 103 108 // Use the macro to generate both the Protocol and Message enums ··· 113 118 SendSegment(SendSegment), 114 119 #[rpc(tx=oneshot::Sender<()>)] 115 120 JoinPeers(JoinPeers), 121 + #[rpc(tx=oneshot::Sender<()>)] 122 + AddTickets(AddTickets), 116 123 #[rpc(tx=oneshot::Sender<NodeAddr>)] 117 124 GetNodeAddr(GetNodeAddr), 118 125 } ··· 477 484 self.subscriptions.remove(&key); 478 485 tx.send(()).await.ok(); 479 486 } 487 + ApiMessage::AddTickets(msg) => { 488 + trace!("{:?}", msg.inner); 489 + let WithChannels { 490 + tx, 491 + inner: api::AddTickets { peers }, 492 + .. 493 + } = msg; 494 + for addr in &peers { 495 + self.router.endpoint().add_node_addr(addr.clone()).ok(); 496 + } 497 + // self.client.inner().join_peers(ids).await.ok(); 498 + tx.send(()).await.ok(); 499 + } 480 500 ApiMessage::JoinPeers(msg) => { 481 501 trace!("{:?}", msg.inner); 482 502 let WithChannels { ··· 702 722 .collect::<Vec<_>>(); 703 723 self.api 704 724 .rpc(api::JoinPeers { peers: addrs }) 725 + .await 726 + .map_err(|e| JoinPeersError::Irpc { 727 + message: e.to_string(), 728 + }) 729 + } 730 + 731 + /// Add tickets for remote peers 732 + pub async fn add_tickets(&self, peers: Vec<String>) -> Result<(), JoinPeersError> { 733 + let peers = peers 734 + .iter() 735 + .map(|p| NodeTicket::from_str(p)) 736 + .collect::<Result<Vec<_>, _>>() 737 + .map_err(|e| JoinPeersError::Ticket { 738 + message: e.to_string(), 739 + })?; 740 + let addrs = peers 741 + .iter() 742 + .map(|t| t.node_addr().clone()) 743 + .collect::<Vec<_>>(); 744 + self.api 745 + .rpc(api::AddTickets { peers: addrs }) 705 746 .await 706 747 .map_err(|e| JoinPeersError::Irpc { 707 748 message: e.to_string(),