tangled
alpha
login
or
join now
willdot.net
/
message-broker
2
fork
atom
An experimental pub/sub client and server project.
2
fork
atom
overview
issues
pulls
pipelines
better handling when unsubscribing
willdot.net
2 years ago
3aae3cde
69f44053
+52
-27
2 changed files
expand all
collapse all
unified
split
server
server.go
subscriber.go
+10
-1
server/server.go
···
364
364
if !ok {
365
365
return
366
366
}
367
367
-
367
367
+
sub, ok := t.subscriptions[peer.Addr()]
368
368
+
if !ok {
369
369
+
return
370
370
+
}
371
371
+
sub.unsubscribe()
368
372
delete(t.subscriptions, peer.Addr())
369
373
}
370
374
···
373
377
defer s.mu.Unlock()
374
378
375
379
for _, topic := range s.topics {
380
380
+
sub, ok := topic.subscriptions[peer.Addr()]
381
381
+
if !ok {
382
382
+
continue
383
383
+
}
384
384
+
sub.unsubscribe()
376
385
delete(topic.subscriptions, peer.Addr())
377
386
}
378
387
}
+42
-26
server/subscriber.go
···
11
11
)
12
12
13
13
type subscriber struct {
14
14
-
peer *peer.Peer
15
15
-
topic string
16
16
-
messages chan message
14
14
+
peer *peer.Peer
15
15
+
topic string
16
16
+
messages chan message
17
17
+
unsubscribeCh chan struct{}
17
18
18
19
ackDelay time.Duration
19
20
ackTimeout time.Duration
···
30
31
31
32
func newSubscriber(peer *peer.Peer, topic string, ackDelay, ackTimeout time.Duration) *subscriber {
32
33
s := &subscriber{
33
33
-
peer: peer,
34
34
-
topic: topic,
35
35
-
messages: make(chan message),
36
36
-
ackDelay: ackDelay,
37
37
-
ackTimeout: ackTimeout,
34
34
+
peer: peer,
35
35
+
topic: topic,
36
36
+
messages: make(chan message),
37
37
+
ackDelay: ackDelay,
38
38
+
ackTimeout: ackTimeout,
39
39
+
unsubscribeCh: make(chan struct{}),
38
40
}
39
41
40
42
go s.sendMessages()
···
43
45
}
44
46
45
47
func (s *subscriber) sendMessages() {
46
46
-
// TODO: should think about how to break out of this if the subsciber closes its connection etc
47
47
-
for msg := range s.messages {
48
48
-
ack, err := s.sendMessage(s.topic, msg)
49
49
-
if err != nil {
50
50
-
slog.Error("failed to send to message", "error", err, "peer", s.peer.Addr())
51
51
-
}
48
48
+
for {
49
49
+
select {
50
50
+
case <-s.unsubscribeCh:
51
51
+
return
52
52
+
case msg := <-s.messages:
53
53
+
ack, err := s.sendMessage(s.topic, msg)
54
54
+
if err != nil {
55
55
+
slog.Error("failed to send to message", "error", err, "peer", s.peer.Addr())
56
56
+
}
52
57
53
53
-
if ack {
54
54
-
continue
55
55
-
}
58
58
+
if ack {
59
59
+
continue
60
60
+
}
56
61
57
57
-
if msg.deliveryCount >= 5 {
58
58
-
slog.Error("max delivery count for message. Dropping", "peer", s.peer.Addr())
59
59
-
continue
60
60
-
}
62
62
+
if msg.deliveryCount >= 5 {
63
63
+
slog.Error("max delivery count for message. Dropping", "peer", s.peer.Addr())
64
64
+
continue
65
65
+
}
61
66
62
62
-
msg.deliveryCount++
63
63
-
s.addMessage(msg, s.ackDelay)
67
67
+
msg.deliveryCount++
68
68
+
s.addMessage(msg, s.ackDelay)
69
69
+
}
64
70
}
65
71
}
66
72
67
73
func (s *subscriber) addMessage(msg message, delay time.Duration) {
68
74
go func() {
69
69
-
time.Sleep(delay)
70
70
-
// TODO: should think about how to break out of this if the subsciber closes its connection etc
71
71
-
s.messages <- msg
75
75
+
timer := time.NewTimer(delay)
76
76
+
defer timer.Stop()
77
77
+
78
78
+
select {
79
79
+
case <-s.unsubscribeCh:
80
80
+
return
81
81
+
case <-timer.C:
82
82
+
s.messages <- msg
83
83
+
}
72
84
}()
73
85
}
74
86
···
122
134
123
135
return ack, err
124
136
}
137
137
+
138
138
+
func (s *subscriber) unsubscribe() {
139
139
+
close(s.unsubscribeCh)
140
140
+
}