Live video on the AT Protocol

iroh: kv cleanup

+45 -22
+45 -22
pkg/replication/iroh_replicator/kv.go
··· 15 15 ) 16 16 17 17 type IrohSwarm struct { 18 - Node *iroh_streamplace.Node 19 - DB *iroh_streamplace.Db 20 - w *iroh_streamplace.WriteScope 21 - mm *media.MediaManager 22 - segChan chan *media.NewSegmentNotification 23 - nodeId string 18 + Node *iroh_streamplace.Node 19 + DB *iroh_streamplace.Db 20 + w *iroh_streamplace.WriteScope 21 + mm *media.MediaManager 22 + segChan chan *media.NewSegmentNotification 23 + nodeId string 24 + activeSubs map[string]*OriginInfo 24 25 } 25 26 26 27 // A message saying "hey I ingested node data at this time" ··· 41 42 log.Log(ctx, "Config created", "config", config) 42 43 43 44 swarm := IrohSwarm{ 44 - mm: mm, 45 + mm: mm, 46 + activeSubs: make(map[string]*OriginInfo), 45 47 } 46 48 47 49 node, err := iroh_streamplace.NodeReceiver(config, &swarm) ··· 72 74 return &swarm, nil 73 75 } 74 76 75 - var activeSubs = make(map[string]bool) 76 - 77 77 func (swarm *IrohSwarm) Start(ctx context.Context, tickets []string) error { 78 78 if len(tickets) > 0 { 79 79 err := swarm.Node.JoinPeers(tickets) ··· 110 110 return fmt.Errorf("failed to get next subscription event: %w", err) 111 111 } 112 112 if ev == nil { 113 - log.Log(ctx, "Got empty event from sub.NextRaw(), pausing for a second") 113 + log.Debug(ctx, "Got empty event from sub.NextRaw(), pausing for a second") 114 114 time.Sleep(1 * time.Second) 115 115 continue 116 116 } ··· 118 118 case iroh_streamplace.SubscribeItemEntry: 119 119 keyStr := string(item.Key) 120 120 valueStr := string(item.Value) 121 - log.Log(ctx, "SubscribeItemEntry", "key", keyStr, "value", valueStr) 121 + log.Debug(ctx, "SubscribeItemEntry", "key", keyStr, "value", valueStr) 122 + if len(keyStr) > 0 && keyStr[0] != '{' { 123 + // not JSON, it's one of the rust messages 124 + continue 125 + } 122 126 var info OriginInfo 123 127 err := json.Unmarshal(item.Value, &info) 124 128 if err != nil { 125 129 log.Error(ctx, "could not unmarshal origin info", "error", err) 126 130 continue 127 131 } 128 - if !activeSubs[keyStr] { 129 - if info.NodeID == swarm.nodeId { 130 - activeSubs[keyStr] = true 132 + oldSub, ok := swarm.activeSubs[keyStr] 133 + if ok { 134 + if oldSub.NodeID == info.NodeID { 135 + // mmyep. same node still has the stream. great news. 131 136 continue 132 137 } 133 - pubKey, err := iroh_streamplace.PublicKeyFromString(info.NodeID) 138 + log.Log(ctx, "Stream origin changed, swapping to new node", "old_node", oldSub.NodeID, "new_node", info.NodeID, "streamer", keyStr) 139 + pubKey, err := iroh_streamplace.PublicKeyFromString(oldSub.NodeID) 134 140 if err != nil { 135 141 log.Error(ctx, "could not create public key", "error", err) 136 142 continue 137 143 } 138 - activeSubs[keyStr] = true 139 - err = swarm.Node.Subscribe(keyStr, pubKey) 144 + // different node has the stream. we need to unsubscribe from the old node. 145 + err = swarm.Node.Unsubscribe(keyStr, pubKey) 140 146 if err != nil { 141 - log.Error(ctx, "could not subscribe to key", "error", err) 147 + log.Error(ctx, "could not unsubscribe from key", "error", err) 142 148 continue 143 149 } 150 + delete(swarm.activeSubs, keyStr) 144 151 } 152 + if info.NodeID == swarm.nodeId { 153 + // oh, i have this stream. cool. do nothing. 154 + continue 155 + } 156 + log.Log(ctx, "Subscribing to stream", "new_node", info.NodeID, "streamer", keyStr) 157 + pubKey, err := iroh_streamplace.PublicKeyFromString(info.NodeID) 158 + if err != nil { 159 + log.Error(ctx, "could not create public key", "error", err) 160 + continue 161 + } 162 + err = swarm.Node.Subscribe(keyStr, pubKey) 163 + if err != nil { 164 + log.Error(ctx, "could not subscribe to key", "error", err) 165 + continue 166 + } 167 + swarm.activeSubs[keyStr] = &info 145 168 146 169 case iroh_streamplace.SubscribeItemCurrentDone: 147 - log.Log(ctx, "SubscribeItemCurrentDone", "currentDone", item) 170 + log.Debug(ctx, "SubscribeItemCurrentDone", "currentDone", item) 148 171 case iroh_streamplace.SubscribeItemExpired: 149 - log.Log(ctx, "SubscribeItemExpired", "expired", item) 172 + log.Debug(ctx, "SubscribeItemExpired", "expired", item) 150 173 case iroh_streamplace.SubscribeItemOther: 151 - log.Log(ctx, "SubscribeItemOther", "other", item) 174 + log.Debug(ctx, "SubscribeItemOther", "other", item) 152 175 } 153 176 } 154 177 } ··· 172 195 func (swarm *IrohSwarm) HandleData(topic string, data []byte) { 173 196 err := swarm.mm.ValidateMP4(context.Background(), bytes.NewReader(data), false) 174 197 if err != nil { 175 - log.Error(context.Background(), "could not validate segment", "error", err) 198 + log.Error(context.Background(), "could not validate segment", "error", err, "topic", topic, "data", len(data)) 176 199 } 177 200 } 178 201