tangled
alpha
login
or
join now
willdot.net
/
message-broker
2
fork
atom
An experimental pub/sub client and server project.
2
fork
atom
overview
issues
pulls
pipelines
Refactor. Huge refactor to make conns synchronous
willdot.net
2 years ago
0cbdc735
a4a90b1b
+605
-399
15 changed files
expand all
collapse all
unified
split
.gitignore
dockerfile.example-server
example
main.go
server
main.go
go.mod
go.sum
pubsub
message.go
publisher.go
subscriber.go
subscriber_test.go
server
peer.go
server.go
server_test.go
subscriber.go
topic.go
+2
-1
.gitignore
···
1
1
-
.DS_STORE
1
1
+
.DS_STORE
2
2
+
example/example
+20
dockerfile.example-server
···
1
1
+
FROM golang:latest as builder
2
2
+
3
3
+
WORKDIR /app
4
4
+
5
5
+
COPY go.mod go.sum ./
6
6
+
COPY example/server/ ./
7
7
+
RUN go mod download
8
8
+
9
9
+
COPY . .
10
10
+
11
11
+
RUN CGO_ENABLED=0 go build -o message-broker-server .
12
12
+
13
13
+
FROM alpine:latest
14
14
+
15
15
+
RUN apk --no-cache add ca-certificates
16
16
+
17
17
+
WORKDIR /root/
18
18
+
COPY --from=builder /app/message-broker-server .
19
19
+
20
20
+
CMD ["./message-broker-server"]
+9
-9
example/main.go
···
2
2
3
3
import (
4
4
"context"
5
5
+
"flag"
5
6
"fmt"
6
7
"log/slog"
7
8
8
8
-
"github.com/willdot/messagebroker"
9
9
"github.com/willdot/messagebroker/pubsub"
10
10
-
"github.com/willdot/messagebroker/server"
11
10
)
12
11
12
12
+
var consumeOnly *bool
13
13
+
13
14
func main() {
14
14
-
server, err := server.New(context.Background(), ":3000")
15
15
-
if err != nil {
16
16
-
panic(err)
17
17
-
}
18
18
-
defer server.Shutdown()
15
15
+
consumeOnly = flag.Bool("consume-only", false, "just consumes (doesn't start server and doesn't publish)")
16
16
+
flag.Parse()
19
17
20
20
-
go sendMessages()
18
18
+
if *consumeOnly == false {
19
19
+
go sendMessages()
20
20
+
}
21
21
22
22
sub, err := pubsub.NewSubscriber(":3000")
23
23
if err != nil {
···
49
49
i := 0
50
50
for {
51
51
i++
52
52
-
msg := messagebroker.Message{
52
52
+
msg := pubsub.Message{
53
53
Topic: "topic a",
54
54
Data: []byte(fmt.Sprintf("message %d", i)),
55
55
}
+23
example/server/main.go
···
1
1
+
package main
2
2
+
3
3
+
import (
4
4
+
"log"
5
5
+
"os"
6
6
+
"os/signal"
7
7
+
"syscall"
8
8
+
9
9
+
"github.com/willdot/messagebroker/server"
10
10
+
)
11
11
+
12
12
+
func main() {
13
13
+
srv, err := server.New(":3000")
14
14
+
if err != nil {
15
15
+
log.Fatal(err)
16
16
+
}
17
17
+
defer srv.Shutdown()
18
18
+
19
19
+
signals := make(chan os.Signal, 1)
20
20
+
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
21
21
+
22
22
+
<-signals
23
23
+
}
+5
-1
go.mod
···
2
2
3
3
go 1.21.0
4
4
5
5
-
require github.com/stretchr/testify v1.8.4
5
5
+
require (
6
6
+
github.com/docker/distribution v2.8.3+incompatible
7
7
+
github.com/google/uuid v1.4.0
8
8
+
github.com/stretchr/testify v1.8.4
9
9
+
)
6
10
7
11
require (
8
12
github.com/davecgh/go-spew v1.1.1 // indirect
+4
go.sum
···
1
1
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
2
2
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3
3
+
github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk=
4
4
+
github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
5
5
+
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
6
6
+
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
3
7
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
4
8
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5
9
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+1
-1
message.go
pubsub/message.go
···
1
1
-
package messagebroker
1
1
+
package pubsub
2
2
3
3
// Message represents a message that can be published or consumed
4
4
type Message struct {
+33
-16
pubsub/publisher.go
···
2
2
3
3
import (
4
4
"encoding/binary"
5
5
-
"encoding/json"
6
5
"fmt"
7
6
"net"
7
7
+
"sync"
8
8
9
9
-
"github.com/willdot/messagebroker"
10
9
"github.com/willdot/messagebroker/server"
11
10
)
12
11
13
12
// Publisher allows messages to be published to a server
14
13
type Publisher struct {
15
15
-
conn net.Conn
14
14
+
conn net.Conn
15
15
+
connMu sync.Mutex
16
16
}
17
17
18
18
// NewPublisher connects to the server at the given address and registers as a publisher
···
39
39
}
40
40
41
41
// Publish will publish the given message to the server
42
42
-
func (p *Publisher) PublishMessage(message messagebroker.Message) error {
43
43
-
b, err := json.Marshal(message)
44
44
-
if err != nil {
45
45
-
return fmt.Errorf("failed to marshal message: %w", err)
42
42
+
func (p *Publisher) PublishMessage(message Message) error {
43
43
+
op := func(conn net.Conn) error {
44
44
+
// send topic first
45
45
+
topic := fmt.Sprintf("topic:%s", message.Topic)
46
46
+
err := binary.Write(p.conn, binary.BigEndian, uint32(len(topic)))
47
47
+
if err != nil {
48
48
+
return fmt.Errorf("failed to write topic size to server")
49
49
+
}
50
50
+
51
51
+
_, err = p.conn.Write([]byte(topic))
52
52
+
if err != nil {
53
53
+
return fmt.Errorf("failed to write topic to server")
54
54
+
}
55
55
+
56
56
+
err = binary.Write(p.conn, binary.BigEndian, uint32(len(message.Data)))
57
57
+
if err != nil {
58
58
+
return fmt.Errorf("failed to write message size to server")
59
59
+
}
60
60
+
61
61
+
_, err = p.conn.Write(message.Data)
62
62
+
if err != nil {
63
63
+
return fmt.Errorf("failed to publish data to server")
64
64
+
}
65
65
+
return nil
46
66
}
47
67
48
48
-
err = binary.Write(p.conn, binary.BigEndian, uint32(len(b)))
49
49
-
if err != nil {
50
50
-
return fmt.Errorf("failed to write message size to server")
51
51
-
}
68
68
+
return p.connOperation(op)
69
69
+
}
52
70
53
53
-
_, err = p.conn.Write(b)
54
54
-
if err != nil {
55
55
-
return fmt.Errorf("failed to publish data to server")
56
56
-
}
71
71
+
func (p *Publisher) connOperation(op connOpp) error {
72
72
+
p.connMu.Lock()
73
73
+
defer p.connMu.Unlock()
57
74
58
58
-
return nil
75
75
+
return op(p.conn)
59
76
}
+141
-97
pubsub/subscriber.go
···
4
4
"context"
5
5
"encoding/binary"
6
6
"encoding/json"
7
7
+
"errors"
7
8
"fmt"
8
8
-
"log/slog"
9
9
"net"
10
10
+
"sync"
10
11
"time"
11
12
12
12
-
"github.com/willdot/messagebroker"
13
13
"github.com/willdot/messagebroker/server"
14
14
)
15
15
16
16
+
type connOpp func(conn net.Conn) error
17
17
+
16
18
// Subscriber allows subscriptions to a server and the consumption of messages
17
19
type Subscriber struct {
18
18
-
conn net.Conn
20
20
+
conn net.Conn
21
21
+
connMu sync.Mutex
19
22
}
20
23
21
24
// NewSubscriber will connect to the server at the given address
···
37
40
38
41
// SubscribeToTopics will subscribe to the provided topics
39
42
func (s *Subscriber) SubscribeToTopics(topicNames []string) error {
40
40
-
err := binary.Write(s.conn, binary.BigEndian, server.Subscribe)
41
41
-
if err != nil {
42
42
-
return fmt.Errorf("failed to subscribe: %w", err)
43
43
-
}
43
43
+
op := func(conn net.Conn) error {
44
44
+
err := binary.Write(conn, binary.BigEndian, server.Subscribe)
45
45
+
if err != nil {
46
46
+
return fmt.Errorf("failed to subscribe: %w", err)
47
47
+
}
44
48
45
45
-
b, err := json.Marshal(topicNames)
46
46
-
if err != nil {
47
47
-
return fmt.Errorf("failed to marshal topic names: %w", err)
48
48
-
}
49
49
+
b, err := json.Marshal(topicNames)
50
50
+
if err != nil {
51
51
+
return fmt.Errorf("failed to marshal topic names: %w", err)
52
52
+
}
49
53
50
50
-
err = binary.Write(s.conn, binary.BigEndian, uint32(len(b)))
51
51
-
if err != nil {
52
52
-
return fmt.Errorf("failed to write topic data length: %w", err)
53
53
-
}
54
54
+
err = binary.Write(conn, binary.BigEndian, uint32(len(b)))
55
55
+
if err != nil {
56
56
+
return fmt.Errorf("failed to write topic data length: %w", err)
57
57
+
}
54
58
55
55
-
_, err = s.conn.Write(b)
56
56
-
if err != nil {
57
57
-
return fmt.Errorf("failed to subscribe to topics: %w", err)
58
58
-
}
59
59
+
_, err = conn.Write(b)
60
60
+
if err != nil {
61
61
+
return fmt.Errorf("failed to subscribe to topics: %w", err)
62
62
+
}
59
63
60
60
-
var resp server.Status
61
61
-
err = binary.Read(s.conn, binary.BigEndian, &resp)
62
62
-
if err != nil {
63
63
-
return fmt.Errorf("failed to read confirmation of subscription: %w", err)
64
64
-
}
64
64
+
var resp server.Status
65
65
+
err = binary.Read(conn, binary.BigEndian, &resp)
66
66
+
if err != nil {
67
67
+
return fmt.Errorf("failed to read confirmation of subscription: %w", err)
68
68
+
}
69
69
+
70
70
+
if resp == server.Subscribed {
71
71
+
return nil
72
72
+
}
65
73
66
66
-
if resp == server.Subscribed {
67
67
-
return nil
68
68
-
}
74
74
+
var dataLen uint32
75
75
+
err = binary.Read(conn, binary.BigEndian, &dataLen)
76
76
+
if err != nil {
77
77
+
return fmt.Errorf("received status %s:", resp)
78
78
+
}
69
79
70
70
-
var dataLen uint32
71
71
-
err = binary.Read(s.conn, binary.BigEndian, &dataLen)
72
72
-
if err != nil {
73
73
-
return fmt.Errorf("received status %s:", resp)
74
74
-
}
80
80
+
buf := make([]byte, dataLen)
81
81
+
_, err = conn.Read(buf)
82
82
+
if err != nil {
83
83
+
return fmt.Errorf("received status %s:", resp)
84
84
+
}
75
85
76
76
-
buf := make([]byte, dataLen)
77
77
-
_, err = s.conn.Read(buf)
78
78
-
if err != nil {
79
79
-
return fmt.Errorf("received status %s:", resp)
86
86
+
return fmt.Errorf("received status %s - %s", resp, buf)
80
87
}
81
88
82
82
-
return fmt.Errorf("received status %s - %s", resp, buf)
89
89
+
return s.connOperation(op)
83
90
}
84
91
85
92
// UnsubscribeToTopics will unsubscribe to the provided topics
86
93
func (s *Subscriber) UnsubscribeToTopics(topicNames []string) error {
87
87
-
err := binary.Write(s.conn, binary.BigEndian, server.Unsubscribe)
88
88
-
if err != nil {
89
89
-
return fmt.Errorf("failed to unsubscribe: %w", err)
90
90
-
}
94
94
+
op := func(conn net.Conn) error {
95
95
+
err := binary.Write(conn, binary.BigEndian, server.Unsubscribe)
96
96
+
if err != nil {
97
97
+
return fmt.Errorf("failed to unsubscribe: %w", err)
98
98
+
}
91
99
92
92
-
b, err := json.Marshal(topicNames)
93
93
-
if err != nil {
94
94
-
return fmt.Errorf("failed to marshal topic names: %w", err)
95
95
-
}
100
100
+
b, err := json.Marshal(topicNames)
101
101
+
if err != nil {
102
102
+
return fmt.Errorf("failed to marshal topic names: %w", err)
103
103
+
}
96
104
97
97
-
err = binary.Write(s.conn, binary.BigEndian, uint32(len(b)))
98
98
-
if err != nil {
99
99
-
return fmt.Errorf("failed to write topic data length: %w", err)
100
100
-
}
105
105
+
err = binary.Write(conn, binary.BigEndian, uint32(len(b)))
106
106
+
if err != nil {
107
107
+
return fmt.Errorf("failed to write topic data length: %w", err)
108
108
+
}
101
109
102
102
-
_, err = s.conn.Write(b)
103
103
-
if err != nil {
104
104
-
return fmt.Errorf("failed to unsubscribe to topics: %w", err)
105
105
-
}
110
110
+
_, err = conn.Write(b)
111
111
+
if err != nil {
112
112
+
return fmt.Errorf("failed to unsubscribe to topics: %w", err)
113
113
+
}
106
114
107
107
-
var resp server.Status
108
108
-
err = binary.Read(s.conn, binary.BigEndian, &resp)
109
109
-
if err != nil {
110
110
-
return fmt.Errorf("failed to read confirmation of unsubscription: %w", err)
111
111
-
}
115
115
+
var resp server.Status
116
116
+
err = binary.Read(conn, binary.BigEndian, &resp)
117
117
+
if err != nil {
118
118
+
return fmt.Errorf("failed to read confirmation of unsubscription: %w", err)
119
119
+
}
112
120
113
113
-
if resp == server.Unsubscribed {
114
114
-
return nil
115
115
-
}
121
121
+
if resp == server.Unsubscribed {
122
122
+
return nil
123
123
+
}
116
124
117
117
-
var dataLen uint32
118
118
-
err = binary.Read(s.conn, binary.BigEndian, &dataLen)
119
119
-
if err != nil {
120
120
-
return fmt.Errorf("received status %s:", resp)
121
121
-
}
125
125
+
var dataLen uint32
126
126
+
err = binary.Read(conn, binary.BigEndian, &dataLen)
127
127
+
if err != nil {
128
128
+
return fmt.Errorf("received status %s:", resp)
129
129
+
}
122
130
123
123
-
buf := make([]byte, dataLen)
124
124
-
_, err = s.conn.Read(buf)
125
125
-
if err != nil {
126
126
-
return fmt.Errorf("received status %s:", resp)
131
131
+
buf := make([]byte, dataLen)
132
132
+
_, err = conn.Read(buf)
133
133
+
if err != nil {
134
134
+
return fmt.Errorf("received status %s:", resp)
135
135
+
}
136
136
+
137
137
+
return fmt.Errorf("received status %s - %s", resp, buf)
127
138
}
128
139
129
129
-
return fmt.Errorf("received status %s - %s", resp, buf)
140
140
+
return s.connOperation(op)
130
141
}
131
142
132
143
// Consumer allows the consumption of messages. If during the consumer receiving messages from the
133
144
// server an error occurs, it will be stored in Err
134
145
type Consumer struct {
135
135
-
msgs chan messagebroker.Message
146
146
+
msgs chan Message
136
147
// TODO: better error handling? Maybe a channel of errors?
137
148
Err error
138
149
}
139
150
140
151
// Messages returns a channel in which this consumer will put messages onto. It is safe to range over the channel since it will be closed once
141
152
// the consumer has finished either due to an error or from being cancelled.
142
142
-
func (c *Consumer) Messages() <-chan messagebroker.Message {
153
153
+
func (c *Consumer) Messages() <-chan Message {
143
154
return c.msgs
144
155
}
145
156
···
147
158
// to read the messages
148
159
func (s *Subscriber) Consume(ctx context.Context) *Consumer {
149
160
consumer := &Consumer{
150
150
-
msgs: make(chan messagebroker.Message),
161
161
+
msgs: make(chan Message),
151
162
}
152
163
153
164
go s.consume(ctx, consumer)
···
174
185
}
175
186
}
176
187
177
177
-
func (s *Subscriber) readMessage() (*messagebroker.Message, error) {
178
178
-
err := s.conn.SetReadDeadline(time.Now().Add(time.Second))
179
179
-
if err != nil {
180
180
-
return nil, err
188
188
+
func (s *Subscriber) readMessage() (*Message, error) {
189
189
+
var msg *Message
190
190
+
op := func(conn net.Conn) error {
191
191
+
err := s.conn.SetReadDeadline(time.Now().Add(time.Second))
192
192
+
if err != nil {
193
193
+
return err
194
194
+
}
195
195
+
196
196
+
var topicLen uint64
197
197
+
err = binary.Read(s.conn, binary.BigEndian, &topicLen)
198
198
+
if err != nil {
199
199
+
// TODO: check if this is needed elsewhere. I'm not sure where the read deadline resets....
200
200
+
if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
201
201
+
return nil
202
202
+
}
203
203
+
return err
204
204
+
}
205
205
+
206
206
+
topicBuf := make([]byte, topicLen)
207
207
+
_, err = s.conn.Read(topicBuf)
208
208
+
if err != nil {
209
209
+
return err
210
210
+
}
211
211
+
212
212
+
var dataLen uint64
213
213
+
err = binary.Read(s.conn, binary.BigEndian, &dataLen)
214
214
+
if err != nil {
215
215
+
return err
216
216
+
}
217
217
+
218
218
+
if dataLen <= 0 {
219
219
+
return nil
220
220
+
}
221
221
+
222
222
+
dataBuf := make([]byte, dataLen)
223
223
+
_, err = s.conn.Read(dataBuf)
224
224
+
if err != nil {
225
225
+
return err
226
226
+
}
227
227
+
228
228
+
msg = &Message{
229
229
+
Data: dataBuf,
230
230
+
Topic: string(topicBuf),
231
231
+
}
232
232
+
233
233
+
return nil
234
234
+
181
235
}
182
236
183
183
-
var dataLen uint64
184
184
-
err = binary.Read(s.conn, binary.BigEndian, &dataLen)
237
237
+
err := s.connOperation(op)
185
238
if err != nil {
186
186
-
if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
239
239
+
var neterr net.Error
240
240
+
if errors.As(err, &neterr) && neterr.Timeout() {
187
241
return nil, nil
188
242
}
189
243
return nil, err
190
244
}
191
245
192
192
-
if dataLen <= 0 {
193
193
-
return nil, nil
194
194
-
}
246
246
+
return msg, err
247
247
+
}
195
248
196
196
-
buf := make([]byte, dataLen)
197
197
-
_, err = s.conn.Read(buf)
198
198
-
if err != nil {
199
199
-
return nil, err
200
200
-
}
249
249
+
func (s *Subscriber) connOperation(op connOpp) error {
250
250
+
s.connMu.Lock()
251
251
+
defer s.connMu.Unlock()
201
252
202
202
-
var msg messagebroker.Message
203
203
-
err = json.Unmarshal(buf, &msg)
204
204
-
if err != nil {
205
205
-
slog.Error("failed to unmarshal message", "error", err)
206
206
-
return nil, nil
207
207
-
}
208
208
-
209
209
-
return &msg, nil
253
253
+
return op(s.conn)
210
254
}
+19
-22
pubsub/subscriber_test.go
···
8
8
9
9
"github.com/stretchr/testify/assert"
10
10
"github.com/stretchr/testify/require"
11
11
-
"github.com/willdot/messagebroker"
12
11
13
12
"github.com/willdot/messagebroker/server"
14
13
)
15
14
16
15
const (
17
17
-
serverAddr = ":3000"
16
16
+
serverAddr = ":9999"
17
17
+
topicA = "topic a"
18
18
+
topicB = "topic b"
18
19
)
19
20
20
21
func createServer(t *testing.T) {
21
21
-
server, err := server.New(context.Background(), serverAddr)
22
22
+
server, err := server.New(serverAddr)
22
23
require.NoError(t, err)
23
24
24
25
t.Cleanup(func() {
···
72
73
sub.Close()
73
74
})
74
75
75
75
-
topics := []string{"topic a", "topic b"}
76
76
+
topics := []string{topicA, topicB}
76
77
77
78
err = sub.SubscribeToTopics(topics)
78
79
require.NoError(t, err)
···
88
89
sub.Close()
89
90
})
90
91
91
91
-
topics := []string{"topic a", "topic b"}
92
92
+
topics := []string{topicA, topicB}
92
93
93
94
err = sub.SubscribeToTopics(topics)
94
95
require.NoError(t, err)
95
96
96
96
-
err = sub.UnsubscribeToTopics([]string{"topic a"})
97
97
+
err = sub.UnsubscribeToTopics([]string{topicA})
97
98
require.NoError(t, err)
98
99
99
100
ctx, cancel := context.WithCancel(context.Background())
···
104
105
consumer := sub.Consume(ctx)
105
106
require.NoError(t, err)
106
107
107
107
-
var receivedMessages []messagebroker.Message
108
108
+
var receivedMessages []Message
108
109
consumerFinCh := make(chan struct{})
109
110
go func() {
110
111
for msg := range consumer.Messages() {
···
118
119
// publish a message to both topics and check the subscriber only gets the message from the 1 topic
119
120
// and not the unsubscribed topic
120
121
121
121
-
publisher, err := NewPublisher("localhost:3000")
122
122
+
publisher, err := NewPublisher("localhost:9999")
122
123
require.NoError(t, err)
123
124
t.Cleanup(func() {
124
125
publisher.Close()
125
126
})
126
127
127
127
-
msg := messagebroker.Message{
128
128
-
Topic: "topic a",
128
128
+
msg := Message{
129
129
+
Topic: topicA,
129
130
Data: []byte("hello world"),
130
131
}
131
132
132
133
err = publisher.PublishMessage(msg)
133
134
require.NoError(t, err)
134
135
135
135
-
msg.Topic = "topic b"
136
136
+
msg.Topic = topicB
136
137
err = publisher.PublishMessage(msg)
137
138
require.NoError(t, err)
138
139
139
140
cancel()
140
141
141
141
-
// give the consumer some time to read the messages -- TODO: make better!
142
142
-
time.Sleep(time.Millisecond * 500)
143
143
-
cancel()
144
144
-
145
142
select {
146
143
case <-consumerFinCh:
147
144
break
···
150
147
}
151
148
152
149
assert.Len(t, receivedMessages, 1)
153
153
-
assert.Equal(t, "topic b", receivedMessages[0].Topic)
150
150
+
assert.Equal(t, topicB, receivedMessages[0].Topic)
154
151
}
155
152
156
153
func TestPublishAndSubscribe(t *testing.T) {
···
163
160
sub.Close()
164
161
})
165
162
166
166
-
topics := []string{"topic a", "topic b"}
163
163
+
topics := []string{topicA, topicB}
167
164
168
165
err = sub.SubscribeToTopics(topics)
169
166
require.NoError(t, err)
···
176
173
consumer := sub.Consume(ctx)
177
174
require.NoError(t, err)
178
175
179
179
-
var receivedMessages []messagebroker.Message
176
176
+
var receivedMessages []Message
180
177
181
178
consumerFinCh := make(chan struct{})
182
179
go func() {
···
188
185
consumerFinCh <- struct{}{}
189
186
}()
190
187
191
191
-
publisher, err := NewPublisher("localhost:3000")
188
188
+
publisher, err := NewPublisher("localhost:9999")
192
189
require.NoError(t, err)
193
190
t.Cleanup(func() {
194
191
publisher.Close()
195
192
})
196
193
197
194
// send some messages
198
198
-
sentMessages := make([]messagebroker.Message, 0, 10)
195
195
+
sentMessages := make([]Message, 0, 10)
199
196
for i := 0; i < 10; i++ {
200
200
-
msg := messagebroker.Message{
201
201
-
Topic: "topic a",
197
197
+
msg := Message{
198
198
+
Topic: topicA,
202
199
Data: []byte(fmt.Sprintf("message %d", i)),
203
200
}
204
201
+28
-65
server/peer.go
···
1
1
package server
2
2
3
3
import (
4
4
-
"encoding/binary"
5
5
-
"fmt"
6
4
"log/slog"
7
5
"net"
8
8
-
)
9
9
-
10
10
-
type peer struct {
11
11
-
conn net.Conn
12
12
-
}
13
13
-
14
14
-
func newPeer(conn net.Conn) peer {
15
15
-
return peer{
16
16
-
conn: conn,
17
17
-
}
18
18
-
}
6
6
+
"sync"
19
7
20
20
-
// Read wraps the peers underlying connections Read function to satisfy io.Reader
21
21
-
func (p *peer) Read(b []byte) (n int, err error) {
22
22
-
return p.conn.Read(b)
23
23
-
}
24
24
-
25
25
-
// Write wraps the peers underlying connections Write function to satisfy io.Writer
26
26
-
func (p *peer) Write(b []byte) (n int, err error) {
27
27
-
return p.conn.Write(b)
28
28
-
}
29
29
-
30
30
-
func (p *peer) addr() net.Addr {
31
31
-
return p.conn.LocalAddr()
32
32
-
}
33
33
-
34
34
-
func (p *peer) readAction() (Action, error) {
35
35
-
var action Action
36
36
-
err := binary.Read(p.conn, binary.BigEndian, &action)
37
37
-
if err != nil {
38
38
-
return 0, fmt.Errorf("failed to read action from peer: %w", err)
39
39
-
}
40
40
-
41
41
-
return action, nil
42
42
-
}
43
43
-
44
44
-
func (p *peer) readDataLength() (uint32, error) {
45
45
-
var dataLen uint32
46
46
-
err := binary.Read(p.conn, binary.BigEndian, &dataLen)
47
47
-
if err != nil {
48
48
-
return 0, fmt.Errorf("failed to read data length from peer: %w", err)
49
49
-
}
50
50
-
51
51
-
return dataLen, nil
52
52
-
}
8
8
+
"github.com/google/uuid"
9
9
+
)
53
10
54
11
// Status represents the status of a request
55
12
type Status uint8
···
73
30
return ""
74
31
}
75
32
76
76
-
func (p *peer) writeStatus(status Status, message string) {
77
77
-
err := binary.Write(p.conn, binary.BigEndian, status)
78
78
-
if err != nil {
79
79
-
slog.Error("failed to write status to peers connection", "error", err, "peer", p.addr())
80
80
-
return
81
81
-
}
33
33
+
type peer struct {
34
34
+
conn net.Conn
35
35
+
connMu sync.Mutex
36
36
+
name string
37
37
+
}
82
38
83
83
-
if message == "" {
84
84
-
return
39
39
+
func newPeer(conn net.Conn) peer {
40
40
+
return peer{
41
41
+
conn: conn,
42
42
+
name: uuid.New().String(),
85
43
}
44
44
+
}
86
45
87
87
-
msgBytes := []byte(message)
88
88
-
err = binary.Write(p.conn, binary.BigEndian, uint32(len(msgBytes)))
89
89
-
if err != nil {
90
90
-
slog.Error("failed to write message length to peers connection", "error", err, "peer", p.addr())
91
91
-
return
92
92
-
}
46
46
+
func (p *peer) addr() net.Addr {
47
47
+
return p.conn.RemoteAddr()
48
48
+
}
49
49
+
50
50
+
type connOpp func(conn net.Conn) error
51
51
+
52
52
+
func (p *peer) connOperation(op connOpp, from string) error {
53
53
+
slog.Info("operation running", "from", from, "peer", p.conn.RemoteAddr(), "name", p.name, "mu addr", &p.connMu)
54
54
+
55
55
+
p.connMu.Lock()
56
56
+
err := op(p.conn)
57
57
+
p.connMu.Unlock()
93
58
94
94
-
_, err = p.conn.Write(msgBytes)
95
95
-
if err != nil {
96
96
-
slog.Error("failed to write message to peers connection", "error", err, "peer", p.addr())
97
97
-
return
98
98
-
}
59
59
+
slog.Info("operation finished", "from", from, "peer", p.conn.RemoteAddr(), "name", p.name, "mu addr", &p.connMu)
60
60
+
61
61
+
return err
99
62
}
+199
-88
server/server.go
···
1
1
package server
2
2
3
3
import (
4
4
-
"context"
4
4
+
"encoding/binary"
5
5
"encoding/json"
6
6
"errors"
7
7
"fmt"
8
8
"log/slog"
9
9
"net"
10
10
+
"strings"
10
11
"sync"
11
11
-
12
12
-
"github.com/willdot/messagebroker"
12
12
+
"time"
13
13
)
14
14
15
15
// Action represents the type of action that a peer requests to do
···
31
31
}
32
32
33
33
// New creates and starts a new server
34
34
-
func New(ctx context.Context, addr string) (*Server, error) {
34
34
+
func New(addr string) (*Server, error) {
35
35
lis, err := net.Listen("tcp", addr)
36
36
if err != nil {
37
37
return nil, fmt.Errorf("failed to listen: %w", err)
···
42
42
topics: map[string]topic{},
43
43
}
44
44
45
45
-
go srv.start(ctx)
45
45
+
go srv.start()
46
46
47
47
return srv, nil
48
48
}
···
52
52
return s.lis.Close()
53
53
}
54
54
55
55
-
func (s *Server) start(ctx context.Context) {
55
55
+
func (s *Server) start() {
56
56
for {
57
57
conn, err := s.lis.Accept()
58
58
if err != nil {
···
70
70
71
71
func (s *Server) handleConn(conn net.Conn) {
72
72
peer := newPeer(conn)
73
73
-
action, err := peer.readAction()
73
73
+
action, err := readAction(peer)
74
74
if err != nil {
75
75
slog.Error("failed to read action from peer", "error", err, "peer", peer.addr())
76
76
return
···
85
85
s.handlePublish(peer)
86
86
default:
87
87
slog.Error("unknown action", "action", action, "peer", peer.addr())
88
88
-
peer.writeStatus(Error, "unknown action")
88
88
+
writeStatus(Error, "unknown action", peer.conn)
89
89
}
90
90
}
91
91
92
92
func (s *Server) handleSubscribe(peer peer) {
93
93
// subscribe the peer to the topic
94
94
-
s.subscribePeerToTopic(peer)
94
94
+
s.subscribePeerToTopic(&peer)
95
95
96
96
// keep handling the peers connection, getting the action from the peer when it wishes to do something else.
97
97
// once the peers connection ends, it will be unsubscribed from all topics and returned
98
98
for {
99
99
-
action, err := peer.readAction()
99
99
+
action, err := readAction(peer)
100
100
if err != nil {
101
101
+
var neterr net.Error
102
102
+
if errors.As(err, &neterr) && neterr.Timeout() {
103
103
+
time.Sleep(time.Second)
104
104
+
continue
105
105
+
}
101
106
// TODO: see if there's a way to check if the peers connection has been ended etc
102
107
slog.Error("failed to read action from subscriber", "error", err, "peer", peer.addr())
103
108
···
108
113
109
114
switch action {
110
115
case Subscribe:
111
111
-
s.subscribePeerToTopic(peer)
116
116
+
s.subscribePeerToTopic(&peer)
112
117
case Unsubscribe:
113
118
s.handleUnsubscribe(peer)
114
119
default:
115
120
slog.Error("unknown action for subscriber", "action", action, "peer", peer.addr())
116
116
-
peer.writeStatus(Error, "unknown action")
121
121
+
writeStatus(Error, "unknown action", peer.conn)
117
122
continue
118
123
}
119
124
}
120
125
}
121
126
122
122
-
func (s *Server) subscribePeerToTopic(peer peer) {
123
123
-
// get the topics the peer wishes to subscribe to
124
124
-
dataLen, err := peer.readDataLength()
125
125
-
if err != nil {
126
126
-
slog.Error(err.Error(), "peer", peer.addr())
127
127
-
peer.writeStatus(Error, "invalid data length of topics provided")
128
128
-
return
129
129
-
}
130
130
-
if dataLen == 0 {
131
131
-
peer.writeStatus(Error, "data length of topics is 0")
132
132
-
return
133
133
-
}
134
134
-
135
135
-
buf := make([]byte, dataLen)
136
136
-
_, err = peer.Read(buf)
137
137
-
if err != nil {
138
138
-
slog.Error("failed to read subscibers topic data", "error", err, "peer", peer.addr())
139
139
-
peer.writeStatus(Error, "failed to read topic data")
140
140
-
return
141
141
-
}
142
142
-
143
143
-
var topics []string
144
144
-
err = json.Unmarshal(buf, &topics)
145
145
-
if err != nil {
146
146
-
slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.addr())
147
147
-
peer.writeStatus(Error, "invalid topic data provided")
148
148
-
return
149
149
-
}
127
127
+
func (s *Server) subscribePeerToTopic(peer *peer) {
128
128
+
op := func(conn net.Conn) error {
129
129
+
// get the topics the peer wishes to subscribe to
130
130
+
dataLen, err := dataLength(conn)
131
131
+
if err != nil {
132
132
+
slog.Error(err.Error(), "peer", peer.addr())
133
133
+
writeStatus(Error, "invalid data length of topics provided", conn)
134
134
+
return nil
135
135
+
}
136
136
+
if dataLen == 0 {
137
137
+
writeStatus(Error, "data length of topics is 0", conn)
138
138
+
return nil
139
139
+
}
150
140
151
151
-
s.subscribeToTopics(peer, topics)
152
152
-
peer.writeStatus(Subscribed, "")
153
153
-
}
141
141
+
buf := make([]byte, dataLen)
142
142
+
_, err = conn.Read(buf)
143
143
+
if err != nil {
144
144
+
slog.Error("failed to read subscibers topic data", "error", err, "peer", peer.addr())
145
145
+
writeStatus(Error, "failed to read topic data", conn)
146
146
+
return nil
147
147
+
}
154
148
155
155
-
func (s *Server) handleUnsubscribe(peer peer) {
156
156
-
// get the topics the peer wishes to unsubscribe from
157
157
-
dataLen, err := peer.readDataLength()
158
158
-
if err != nil {
159
159
-
slog.Error(err.Error(), "peer", peer.addr())
160
160
-
peer.writeStatus(Error, "invalid data length of topics provided")
161
161
-
return
162
162
-
}
163
163
-
if dataLen == 0 {
164
164
-
peer.writeStatus(Error, "data length of topics is 0")
165
165
-
return
166
166
-
}
149
149
+
var topics []string
150
150
+
err = json.Unmarshal(buf, &topics)
151
151
+
if err != nil {
152
152
+
slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.addr())
153
153
+
writeStatus(Error, "invalid topic data provided", conn)
154
154
+
return nil
155
155
+
}
167
156
168
168
-
buf := make([]byte, dataLen)
169
169
-
_, err = peer.Read(buf)
170
170
-
if err != nil {
171
171
-
slog.Error("failed to read subscibers topic data", "error", err, "peer", peer.addr())
172
172
-
peer.writeStatus(Error, "failed to read topic data")
173
173
-
return
174
174
-
}
157
157
+
s.subscribeToTopics(peer, topics)
158
158
+
writeStatus(Subscribed, "", conn)
175
159
176
176
-
var topics []string
177
177
-
err = json.Unmarshal(buf, &topics)
178
178
-
if err != nil {
179
179
-
slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.addr())
180
180
-
peer.writeStatus(Error, "invalid topic data provided")
181
181
-
return
160
160
+
return nil
182
161
}
183
162
184
184
-
s.unsubscribeToTopics(peer, topics)
185
185
-
peer.writeStatus(Unsubscribed, "")
163
163
+
_ = peer.connOperation(op, "subscribe peer to topic")
186
164
}
187
165
188
188
-
func (s *Server) handlePublish(peer peer) {
189
189
-
for {
190
190
-
dataLen, err := peer.readDataLength()
166
166
+
func (s *Server) handleUnsubscribe(peer peer) {
167
167
+
op := func(conn net.Conn) error {
168
168
+
// get the topics the peer wishes to unsubscribe from
169
169
+
dataLen, err := dataLength(conn)
191
170
if err != nil {
192
171
slog.Error(err.Error(), "peer", peer.addr())
193
193
-
peer.writeStatus(Error, "invalid data length of data provided")
194
194
-
return
172
172
+
writeStatus(Error, "invalid data length of topics provided", conn)
173
173
+
return nil
195
174
}
196
175
if dataLen == 0 {
197
197
-
continue
176
176
+
writeStatus(Error, "data length of topics is 0", conn)
177
177
+
return nil
198
178
}
199
179
200
180
buf := make([]byte, dataLen)
201
201
-
_, err = peer.Read(buf)
181
181
+
_, err = conn.Read(buf)
202
182
if err != nil {
203
203
-
slog.Error("failed to read data from peer", "error", err, "peer", peer.addr())
204
204
-
peer.writeStatus(Error, "failed to read data")
205
205
-
return
183
183
+
slog.Error("failed to read subscibers topic data", "error", err, "peer", peer.addr())
184
184
+
writeStatus(Error, "failed to read topic data", conn)
185
185
+
return nil
206
186
}
207
187
208
208
-
var msg messagebroker.Message
209
209
-
err = json.Unmarshal(buf, &msg)
188
188
+
var topics []string
189
189
+
err = json.Unmarshal(buf, &topics)
210
190
if err != nil {
211
211
-
slog.Error("failed to unmarshal data to message", "error", err, "peer", peer.addr())
212
212
-
peer.writeStatus(Error, "invalid message")
191
191
+
slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.addr())
192
192
+
writeStatus(Error, "invalid topic data provided", conn)
193
193
+
return nil
194
194
+
}
195
195
+
196
196
+
s.unsubscribeToTopics(peer, topics)
197
197
+
writeStatus(Unsubscribed, "", conn)
198
198
+
199
199
+
return nil
200
200
+
}
201
201
+
202
202
+
_ = peer.connOperation(op, "handle unsubscribe")
203
203
+
}
204
204
+
205
205
+
type messageToSend struct {
206
206
+
topic string
207
207
+
data []byte
208
208
+
}
209
209
+
210
210
+
func (s *Server) handlePublish(peer peer) {
211
211
+
for {
212
212
+
var message *messageToSend
213
213
+
214
214
+
op := func(conn net.Conn) error {
215
215
+
dataLen, err := dataLength(conn)
216
216
+
if err != nil {
217
217
+
slog.Error(err.Error(), "peer", peer.addr())
218
218
+
writeStatus(Error, "invalid data length of data provided", conn)
219
219
+
return nil
220
220
+
}
221
221
+
if dataLen == 0 {
222
222
+
return nil
223
223
+
}
224
224
+
topicBuf := make([]byte, dataLen)
225
225
+
_, err = conn.Read(topicBuf)
226
226
+
if err != nil {
227
227
+
slog.Error("failed to read topic from peer", "error", err, "peer", peer.addr())
228
228
+
writeStatus(Error, "failed to read topic", conn)
229
229
+
return nil
230
230
+
}
231
231
+
232
232
+
topicStr := string(topicBuf)
233
233
+
if !strings.HasPrefix(topicStr, "topic:") {
234
234
+
slog.Error("topic data does not contain topic prefix", "peer", peer.addr())
235
235
+
writeStatus(Error, "topic data does not contain 'topic:' prefix", conn)
236
236
+
return nil
237
237
+
}
238
238
+
topicStr = strings.TrimPrefix(topicStr, "topic:")
239
239
+
240
240
+
dataLen, err = dataLength(conn)
241
241
+
if err != nil {
242
242
+
slog.Error(err.Error(), "peer", peer.addr())
243
243
+
writeStatus(Error, "invalid data length of data provided", conn)
244
244
+
return nil
245
245
+
}
246
246
+
if dataLen == 0 {
247
247
+
return nil
248
248
+
}
249
249
+
250
250
+
dataBuf := make([]byte, dataLen)
251
251
+
_, err = conn.Read(dataBuf)
252
252
+
if err != nil {
253
253
+
slog.Error("failed to read data from peer", "error", err, "peer", peer.addr())
254
254
+
writeStatus(Error, "failed to read data", conn)
255
255
+
return nil
256
256
+
}
257
257
+
258
258
+
message = &messageToSend{
259
259
+
topic: topicStr,
260
260
+
data: dataBuf,
261
261
+
}
262
262
+
return nil
263
263
+
}
264
264
+
265
265
+
_ = peer.connOperation(op, "handle publish")
266
266
+
267
267
+
if message == nil {
213
268
continue
214
269
}
270
270
+
// TODO: this can be done in a go routine because once we've got the message from the publisher, the publisher
271
271
+
// doesn't need to wait for us to send the message to all peers
215
272
216
216
-
topic := s.getTopic(msg.Topic)
273
273
+
topic := s.getTopic(message.topic)
217
274
if topic != nil {
218
218
-
topic.sendMessageToSubscribers(msg)
275
275
+
topic.sendMessageToSubscribers(message.data)
219
276
}
220
277
}
221
278
}
222
279
223
223
-
func (s *Server) subscribeToTopics(peer peer, topics []string) {
280
280
+
func (s *Server) subscribeToTopics(peer *peer, topics []string) {
224
281
for _, topic := range topics {
225
282
s.addSubsciberToTopic(topic, peer)
226
283
}
227
284
}
228
285
229
229
-
func (s *Server) addSubsciberToTopic(topicName string, peer peer) {
286
286
+
func (s *Server) addSubsciberToTopic(topicName string, peer *peer) {
230
287
s.mu.Lock()
231
288
defer s.mu.Unlock()
232
289
···
280
337
281
338
return nil
282
339
}
340
340
+
341
341
+
func readAction(peer peer) (Action, error) {
342
342
+
var action Action
343
343
+
op := func(conn net.Conn) error {
344
344
+
conn.SetReadDeadline(time.Now().Add(time.Second))
345
345
+
346
346
+
err := binary.Read(conn, binary.BigEndian, &action)
347
347
+
if err != nil {
348
348
+
return err
349
349
+
}
350
350
+
return nil
351
351
+
}
352
352
+
353
353
+
err := peer.connOperation(op, "read action")
354
354
+
if err != nil {
355
355
+
return 0, fmt.Errorf("failed to read action from peer: %w", err)
356
356
+
}
357
357
+
358
358
+
return action, nil
359
359
+
}
360
360
+
361
361
+
func dataLength(conn net.Conn) (uint32, error) {
362
362
+
var dataLen uint32
363
363
+
err := binary.Read(conn, binary.BigEndian, &dataLen)
364
364
+
if err != nil {
365
365
+
return 0, err
366
366
+
}
367
367
+
return dataLen, nil
368
368
+
}
369
369
+
370
370
+
func writeStatus(status Status, message string, conn net.Conn) {
371
371
+
err := binary.Write(conn, binary.BigEndian, status)
372
372
+
if err != nil {
373
373
+
slog.Error("failed to write status to peers connection", "error", err, "peer", conn.RemoteAddr())
374
374
+
return
375
375
+
}
376
376
+
377
377
+
if message == "" {
378
378
+
return
379
379
+
}
380
380
+
381
381
+
msgBytes := []byte(message)
382
382
+
err = binary.Write(conn, binary.BigEndian, uint32(len(msgBytes)))
383
383
+
if err != nil {
384
384
+
slog.Error("failed to write message length to peers connection", "error", err, "peer", conn.RemoteAddr())
385
385
+
return
386
386
+
}
387
387
+
388
388
+
_, err = conn.Write(msgBytes)
389
389
+
if err != nil {
390
390
+
slog.Error("failed to write message to peers connection", "error", err, "peer", conn.RemoteAddr())
391
391
+
return
392
392
+
}
393
393
+
}
+82
-62
server/server_test.go
···
1
1
package server
2
2
3
3
import (
4
4
-
"context"
5
4
"encoding/binary"
6
5
"encoding/json"
7
6
"fmt"
···
11
10
12
11
"github.com/stretchr/testify/assert"
13
12
"github.com/stretchr/testify/require"
14
14
-
"github.com/willdot/messagebroker"
13
13
+
)
14
14
+
15
15
+
const (
16
16
+
topicA = "topic a"
17
17
+
topicB = "topic b"
18
18
+
topicC = "topic c"
19
19
+
20
20
+
serverAddr = ":6666"
15
21
)
16
22
17
23
func createServer(t *testing.T) *Server {
18
18
-
srv, err := New(context.Background(), ":3000")
24
24
+
srv, err := New(serverAddr)
19
25
require.NoError(t, err)
20
26
21
27
t.Cleanup(func() {
···
36
42
}
37
43
38
44
func createConnectionAndSubscribe(t *testing.T, topics []string) net.Conn {
39
39
-
conn, err := net.Dial("tcp", "localhost:3000")
45
45
+
conn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
40
46
require.NoError(t, err)
41
47
42
48
err = binary.Write(conn, binary.BigEndian, Subscribe)
···
64
70
func TestSubscribeToTopics(t *testing.T) {
65
71
// create a server with an existing topic so we can test subscribing to a new and
66
72
// existing topic
67
67
-
srv := createServerWithExistingTopic(t, "topic a")
73
73
+
srv := createServerWithExistingTopic(t, topicA)
68
74
69
69
-
_ = createConnectionAndSubscribe(t, []string{"topic a", "topic b"})
75
75
+
_ = createConnectionAndSubscribe(t, []string{topicA, topicB})
70
76
71
77
assert.Len(t, srv.topics, 2)
72
72
-
assert.Len(t, srv.topics["topic a"].subscriptions, 1)
73
73
-
assert.Len(t, srv.topics["topic b"].subscriptions, 1)
78
78
+
assert.Len(t, srv.topics[topicA].subscriptions, 1)
79
79
+
assert.Len(t, srv.topics[topicB].subscriptions, 1)
74
80
}
75
81
76
82
func TestUnsubscribesFromTopic(t *testing.T) {
77
77
-
srv := createServerWithExistingTopic(t, "topic a")
83
83
+
srv := createServerWithExistingTopic(t, topicA)
78
84
79
79
-
conn := createConnectionAndSubscribe(t, []string{"topic a", "topic b", "topic c"})
85
85
+
conn := createConnectionAndSubscribe(t, []string{topicA, topicB, topicC})
80
86
81
87
assert.Len(t, srv.topics, 3)
82
82
-
assert.Len(t, srv.topics["topic a"].subscriptions, 1)
83
83
-
assert.Len(t, srv.topics["topic b"].subscriptions, 1)
84
84
-
assert.Len(t, srv.topics["topic c"].subscriptions, 1)
88
88
+
assert.Len(t, srv.topics[topicA].subscriptions, 1)
89
89
+
assert.Len(t, srv.topics[topicB].subscriptions, 1)
90
90
+
assert.Len(t, srv.topics[topicC].subscriptions, 1)
85
91
86
92
err := binary.Write(conn, binary.BigEndian, Unsubscribe)
87
93
require.NoError(t, err)
88
94
89
89
-
topics := []string{"topic a", "topic b"}
95
95
+
topics := []string{topicA, topicB}
90
96
rawTopics, err := json.Marshal(topics)
91
97
require.NoError(t, err)
92
98
···
104
110
assert.Equal(t, expectedRes, int(resp))
105
111
106
112
assert.Len(t, srv.topics, 3)
107
107
-
assert.Len(t, srv.topics["topic a"].subscriptions, 0)
108
108
-
assert.Len(t, srv.topics["topic b"].subscriptions, 0)
109
109
-
assert.Len(t, srv.topics["topic c"].subscriptions, 1)
113
113
+
assert.Len(t, srv.topics[topicA].subscriptions, 0)
114
114
+
assert.Len(t, srv.topics[topicB].subscriptions, 0)
115
115
+
assert.Len(t, srv.topics[topicC].subscriptions, 1)
110
116
}
111
117
112
118
func TestSubscriberClosesWithoutUnsubscribing(t *testing.T) {
113
119
srv := createServer(t)
114
120
115
115
-
conn := createConnectionAndSubscribe(t, []string{"topic a", "topic b"})
121
121
+
conn := createConnectionAndSubscribe(t, []string{topicA, topicB})
116
122
117
123
assert.Len(t, srv.topics, 2)
118
118
-
assert.Len(t, srv.topics["topic a"].subscriptions, 1)
119
119
-
assert.Len(t, srv.topics["topic b"].subscriptions, 1)
124
124
+
assert.Len(t, srv.topics[topicA].subscriptions, 1)
125
125
+
assert.Len(t, srv.topics[topicB].subscriptions, 1)
120
126
121
127
// close the conn
122
128
err := conn.Close()
123
129
require.NoError(t, err)
124
130
125
125
-
publisherConn, err := net.Dial("tcp", "localhost:3000")
131
131
+
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
126
132
require.NoError(t, err)
127
133
128
134
err = binary.Write(publisherConn, binary.BigEndian, Publish)
···
137
143
require.Equal(t, len(data), n)
138
144
139
145
assert.Len(t, srv.topics, 2)
140
140
-
assert.Len(t, srv.topics["topic a"].subscriptions, 0)
141
141
-
assert.Len(t, srv.topics["topic b"].subscriptions, 0)
146
146
+
assert.Len(t, srv.topics[topicA].subscriptions, 0)
147
147
+
assert.Len(t, srv.topics[topicB].subscriptions, 0)
142
148
}
143
149
144
150
func TestInvalidAction(t *testing.T) {
145
151
_ = createServer(t)
146
152
147
147
-
conn, err := net.Dial("tcp", "localhost:3000")
153
153
+
conn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
148
154
require.NoError(t, err)
149
155
150
156
err = binary.Write(conn, binary.BigEndian, uint8(99))
···
170
176
assert.Equal(t, expectedMessage, string(buf))
171
177
}
172
178
173
173
-
func TestInvalidMessagePublished(t *testing.T) {
179
179
+
func TestInvalidTopicDataPublished(t *testing.T) {
174
180
_ = createServer(t)
175
181
176
176
-
publisherConn, err := net.Dial("tcp", "localhost:3000")
182
182
+
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
177
183
require.NoError(t, err)
178
184
179
185
err = binary.Write(publisherConn, binary.BigEndian, Publish)
180
186
require.NoError(t, err)
181
187
182
182
-
// send some data
183
183
-
data := []byte("this isn't wrapped in a message type")
184
184
-
185
185
-
// send data length first
186
186
-
err = binary.Write(publisherConn, binary.BigEndian, uint32(len(data)))
188
188
+
// send topic
189
189
+
topic := topicA
190
190
+
err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic)))
187
191
require.NoError(t, err)
188
188
-
n, err := publisherConn.Write(data)
192
192
+
_, err = publisherConn.Write([]byte(topic))
189
193
require.NoError(t, err)
190
190
-
require.Equal(t, len(data), n)
191
194
192
195
expectedRes := Error
193
196
···
196
199
197
200
assert.Equal(t, expectedRes, int(resp))
198
201
199
199
-
expectedMessage := "invalid message"
202
202
+
expectedMessage := "topic data does not contain 'topic:' prefix"
200
203
201
204
var dataLen uint32
202
205
err = binary.Read(publisherConn, binary.BigEndian, &dataLen)
···
212
215
func TestSendsDataToTopicSubscribers(t *testing.T) {
213
216
_ = createServer(t)
214
217
215
215
-
subscribers := make([]net.Conn, 0, 5)
216
216
-
for i := 0; i < 5; i++ {
217
217
-
subscriberConn := createConnectionAndSubscribe(t, []string{"topic a", "topic b"})
218
218
+
subscribers := make([]net.Conn, 0, 1)
219
219
+
for i := 0; i < 1; i++ {
220
220
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
218
221
219
222
subscribers = append(subscribers, subscriberConn)
220
223
}
221
224
222
222
-
publisherConn, err := net.Dial("tcp", "localhost:3000")
225
225
+
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
223
226
require.NoError(t, err)
224
227
225
228
err = binary.Write(publisherConn, binary.BigEndian, Publish)
226
229
require.NoError(t, err)
227
230
228
228
-
// send a message
229
229
-
msg := messagebroker.Message{
230
230
-
Topic: "topic a",
231
231
-
Data: []byte("hello world"),
232
232
-
}
231
231
+
topic := fmt.Sprintf("topic:%s", topicA)
232
232
+
messageData := "hello world"
233
233
234
234
-
rawMsg, err := json.Marshal(msg)
234
234
+
// send topic first
235
235
+
err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic)))
236
236
+
require.NoError(t, err)
237
237
+
_, err = publisherConn.Write([]byte(topic))
235
238
require.NoError(t, err)
236
239
237
237
-
// send data length first
238
238
-
err = binary.Write(publisherConn, binary.BigEndian, uint32(len(rawMsg)))
240
240
+
// now send the data
241
241
+
err = binary.Write(publisherConn, binary.BigEndian, uint32(len(messageData)))
239
242
require.NoError(t, err)
240
240
-
n, err := publisherConn.Write(rawMsg)
243
243
+
n, err := publisherConn.Write([]byte(messageData))
241
244
require.NoError(t, err)
242
242
-
require.Equal(t, len(rawMsg), n)
245
245
+
require.Equal(t, len(messageData), n)
243
246
244
247
// check the subsribers got the data
245
248
for _, conn := range subscribers {
249
249
+
var topicLen uint64
250
250
+
err = binary.Read(conn, binary.BigEndian, &topicLen)
251
251
+
require.NoError(t, err)
252
252
+
253
253
+
topicBuf := make([]byte, topicLen)
254
254
+
_, err = conn.Read(topicBuf)
255
255
+
require.NoError(t, err)
256
256
+
assert.Equal(t, topicA, string(topicBuf))
246
257
247
258
var dataLen uint64
248
259
err = binary.Read(conn, binary.BigEndian, &dataLen)
···
253
264
require.NoError(t, err)
254
265
require.Equal(t, int(dataLen), n)
255
266
256
256
-
assert.Equal(t, rawMsg, buf)
267
267
+
assert.Equal(t, messageData, string(buf))
257
268
}
258
269
}
259
270
260
271
func TestPublishMultipleTimes(t *testing.T) {
261
272
_ = createServer(t)
262
273
263
263
-
publisherConn, err := net.Dial("tcp", "localhost:3000")
274
274
+
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
264
275
require.NoError(t, err)
265
276
266
277
err = binary.Write(publisherConn, binary.BigEndian, Publish)
···
268
279
269
280
messages := make([][]byte, 0, 10)
270
281
for i := 0; i < 10; i++ {
271
271
-
msg := messagebroker.Message{
272
272
-
Topic: "topic a",
273
273
-
Data: []byte(fmt.Sprintf("message %d", i)),
274
274
-
}
275
275
-
276
276
-
rawMsg, err := json.Marshal(msg)
277
277
-
require.NoError(t, err)
278
278
-
279
279
-
messages = append(messages, rawMsg)
282
282
+
messages = append(messages, []byte(fmt.Sprintf("message %d", i)))
280
283
}
281
284
282
285
subscribeFinCh := make(chan struct{})
283
286
// create a subscriber that will read messages
284
284
-
subscriberConn := createConnectionAndSubscribe(t, []string{"topic a", "topic b"})
287
287
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
285
288
go func() {
286
289
// check subscriber got all messages
287
290
for _, msg := range messages {
291
291
+
var topicLen uint64
292
292
+
err = binary.Read(subscriberConn, binary.BigEndian, &topicLen)
293
293
+
require.NoError(t, err)
294
294
+
295
295
+
topicBuf := make([]byte, topicLen)
296
296
+
_, err = subscriberConn.Read(topicBuf)
297
297
+
require.NoError(t, err)
298
298
+
assert.Equal(t, topicA, string(topicBuf))
299
299
+
288
300
var dataLen uint64
289
301
err = binary.Read(subscriberConn, binary.BigEndian, &dataLen)
290
302
require.NoError(t, err)
···
300
312
subscribeFinCh <- struct{}{}
301
313
}()
302
314
315
315
+
topic := fmt.Sprintf("topic:%s", topicA)
316
316
+
303
317
// send multiple messages
304
318
for _, msg := range messages {
305
305
-
// send data length first
319
319
+
// send topic first
320
320
+
err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic)))
321
321
+
require.NoError(t, err)
322
322
+
_, err = publisherConn.Write([]byte(topic))
323
323
+
require.NoError(t, err)
324
324
+
325
325
+
// now send the data
306
326
err = binary.Write(publisherConn, binary.BigEndian, uint32(len(msg)))
307
327
require.NoError(t, err)
308
308
-
n, err := publisherConn.Write(msg)
328
328
+
n, err := publisherConn.Write([]byte(msg))
309
329
require.NoError(t, err)
310
330
require.Equal(t, len(msg), n)
311
331
}
-26
server/subscriber.go
···
1
1
-
package server
2
2
-
3
3
-
import (
4
4
-
"encoding/binary"
5
5
-
"fmt"
6
6
-
)
7
7
-
8
8
-
type subscriber struct {
9
9
-
peer peer
10
10
-
currentOffset int
11
11
-
}
12
12
-
13
13
-
func (s *subscriber) sendMessage(msg []byte) error {
14
14
-
dataLen := uint64(len(msg))
15
15
-
16
16
-
err := binary.Write(&s.peer, binary.BigEndian, dataLen)
17
17
-
if err != nil {
18
18
-
return fmt.Errorf("failed to send data length: %w", err)
19
19
-
}
20
20
-
21
21
-
_, err = s.peer.Write(msg)
22
22
-
if err != nil {
23
23
-
return fmt.Errorf("failed to write to peer: %w", err)
24
24
-
}
25
25
-
return nil
26
26
-
}
+39
-11
server/topic.go
···
1
1
package server
2
2
3
3
import (
4
4
-
"encoding/json"
4
4
+
"encoding/binary"
5
5
+
"fmt"
5
6
"log/slog"
6
7
"net"
7
8
"sync"
8
8
-
9
9
-
"github.com/willdot/messagebroker"
10
9
)
11
10
12
11
type topic struct {
···
15
14
mu sync.Mutex
16
15
}
17
16
17
17
+
type subscriber struct {
18
18
+
peer *peer
19
19
+
currentOffset int
20
20
+
}
21
21
+
18
22
func newTopic(name string) topic {
19
23
return topic{
20
24
name: name,
···
30
34
delete(t.subscriptions, addr)
31
35
}
32
36
33
33
-
func (t *topic) sendMessageToSubscribers(msg messagebroker.Message) {
37
37
+
func (t *topic) sendMessageToSubscribers(msgData []byte) {
34
38
t.mu.Lock()
35
39
subscribers := t.subscriptions
36
40
t.mu.Unlock()
37
41
38
38
-
msgData, err := json.Marshal(msg)
39
39
-
if err != nil {
40
40
-
slog.Error("failed to marshal message for subscribers", "error", err)
41
41
-
}
42
42
+
for addr, subscriber := range subscribers {
43
43
+
//sendMessageOpFunc := sendMessageOp(t.name, msgData)
42
44
43
43
-
for addr, subscriber := range subscribers {
44
44
-
err := subscriber.sendMessage(msgData)
45
45
+
err := subscriber.peer.connOperation(sendMessageOp(t.name, msgData), "send message to subscribers")
45
46
if err != nil {
46
47
slog.Error("failed to send to message", "error", err, "peer", addr)
47
47
-
continue
48
48
+
return
48
49
}
49
50
}
50
51
}
52
52
+
53
53
+
func sendMessageOp(topic string, data []byte) connOpp {
54
54
+
return func(conn net.Conn) error {
55
55
+
topicLen := uint64(len(topic))
56
56
+
err := binary.Write(conn, binary.BigEndian, topicLen)
57
57
+
if err != nil {
58
58
+
return fmt.Errorf("failed to send topic length: %w", err)
59
59
+
}
60
60
+
_, err = conn.Write([]byte(topic))
61
61
+
if err != nil {
62
62
+
return fmt.Errorf("failed to send topic: %w", err)
63
63
+
}
64
64
+
65
65
+
dataLen := uint64(len(data))
66
66
+
67
67
+
err = binary.Write(conn, binary.BigEndian, dataLen)
68
68
+
if err != nil {
69
69
+
return fmt.Errorf("failed to send data length: %w", err)
70
70
+
}
71
71
+
72
72
+
_, err = conn.Write(data)
73
73
+
if err != nil {
74
74
+
return fmt.Errorf("failed to write to peer: %w", err)
75
75
+
}
76
76
+
return nil
77
77
+
}
78
78
+
}