An experimental pub/sub client and server project.
1package server
2
3import (
4 "encoding/binary"
5 "encoding/json"
6 "fmt"
7 "net"
8 "testing"
9 "time"
10
11 "github.com/stretchr/testify/assert"
12 "github.com/stretchr/testify/require"
13 "github.com/willdot/messagebroker/internal/messagestore"
14)
15
16const (
17 topicA = "topic a"
18 topicB = "topic b"
19 topicC = "topic c"
20
21 serverAddr = ":6666"
22
23 ackDelay = time.Millisecond * 100
24 ackTimeout = time.Millisecond * 100
25)
26
27func createServer(t *testing.T) *Server {
28 srv, err := New(serverAddr, ackDelay, ackTimeout)
29 require.NoError(t, err)
30
31 t.Cleanup(func() {
32 _ = srv.Shutdown()
33 })
34
35 return srv
36}
37
38func createServerWithExistingTopic(t *testing.T, topicName string) *Server {
39 srv := createServer(t)
40 srv.topics[topicName] = &topic{
41 name: topicName,
42 subscriptions: make(map[net.Addr]*subscriber),
43 messageStore: messagestore.NewMemoryStore(),
44 }
45
46 return srv
47}
48
49func createConnectionAndSubscribe(t *testing.T, topics []string, startAtType StartAtType, startAtIndex int) net.Conn {
50 conn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
51 require.NoError(t, err)
52
53 subscribeToTopics(t, conn, topics, startAtType, startAtIndex)
54
55 expectedRes := Subscribed
56
57 var resp Status
58 err = binary.Read(conn, binary.BigEndian, &resp)
59 require.NoError(t, err)
60
61 assert.Equal(t, expectedRes, resp)
62
63 return conn
64}
65
66func sendMessage(t *testing.T, conn net.Conn, topic string, message []byte) {
67 topicLenB := make([]byte, 4)
68 binary.BigEndian.PutUint32(topicLenB, uint32(len(topic)))
69
70 headers := topicLenB
71 headers = append(headers, []byte(topic)...)
72
73 messageLenB := make([]byte, 4)
74 binary.BigEndian.PutUint32(messageLenB, uint32(len(message)))
75 headers = append(headers, messageLenB...)
76
77 _, err := conn.Write(append(headers, message...))
78 require.NoError(t, err)
79}
80
81func subscribeToTopics(t *testing.T, conn net.Conn, topics []string, startAtType StartAtType, startAtIndex int) {
82 actionB := make([]byte, 2)
83 binary.BigEndian.PutUint16(actionB, uint16(Subscribe))
84 headers := actionB
85
86 b, err := json.Marshal(topics)
87 require.NoError(t, err)
88
89 topicNamesB := make([]byte, 4)
90 binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
91 headers = append(headers, topicNamesB...)
92 headers = append(headers, b...)
93
94 startAtTypeB := make([]byte, 2)
95 binary.BigEndian.PutUint16(startAtTypeB, uint16(startAtType))
96 headers = append(headers, startAtTypeB...)
97
98 if startAtType == From {
99 fromB := make([]byte, 2)
100 binary.BigEndian.PutUint16(fromB, uint16(startAtIndex))
101 headers = append(headers, fromB...)
102 }
103
104 _, err = conn.Write(headers)
105 require.NoError(t, err)
106}
107
108func unsubscribetoTopics(t *testing.T, conn net.Conn, topics []string) {
109 actionB := make([]byte, 2)
110 binary.BigEndian.PutUint16(actionB, uint16(Unsubscribe))
111 headers := actionB
112
113 b, err := json.Marshal(topics)
114 require.NoError(t, err)
115
116 topicNamesB := make([]byte, 4)
117 binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
118 headers = append(headers, topicNamesB...)
119
120 _, err = conn.Write(append(headers, b...))
121 require.NoError(t, err)
122}
123
124func TestSubscribeToTopics(t *testing.T) {
125 // create a server with an existing topic so we can test subscribing to a new and
126 // existing topic
127 srv := createServerWithExistingTopic(t, topicA)
128
129 _ = createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
130
131 srv.mu.Lock()
132 defer srv.mu.Unlock()
133 assert.Len(t, srv.topics, 2)
134 assert.Len(t, srv.topics[topicA].subscriptions, 1)
135 assert.Len(t, srv.topics[topicB].subscriptions, 1)
136}
137
138func TestUnsubscribesFromTopic(t *testing.T) {
139 srv := createServerWithExistingTopic(t, topicA)
140
141 conn := createConnectionAndSubscribe(t, []string{topicA, topicB, topicC}, Current, 0)
142
143 assert.Len(t, srv.topics, 3)
144
145 srv.mu.Lock()
146 assert.Len(t, srv.topics[topicA].subscriptions, 1)
147 assert.Len(t, srv.topics[topicB].subscriptions, 1)
148 assert.Len(t, srv.topics[topicC].subscriptions, 1)
149 srv.mu.Unlock()
150
151 topics := []string{topicA, topicB}
152
153 unsubscribetoTopics(t, conn, topics)
154
155 expectedRes := Unsubscribed
156
157 var resp Status
158 err := binary.Read(conn, binary.BigEndian, &resp)
159 require.NoError(t, err)
160
161 assert.Equal(t, expectedRes, resp)
162
163 assert.Len(t, srv.topics, 3)
164
165 srv.mu.Lock()
166 assert.Len(t, srv.topics[topicA].subscriptions, 0)
167 assert.Len(t, srv.topics[topicB].subscriptions, 0)
168 assert.Len(t, srv.topics[topicC].subscriptions, 1)
169 srv.mu.Unlock()
170}
171
172func TestSubscriberClosesWithoutUnsubscribing(t *testing.T) {
173 srv := createServer(t)
174
175 conn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
176
177 assert.Len(t, srv.topics, 2)
178
179 srv.mu.Lock()
180 assert.Len(t, srv.topics[topicA].subscriptions, 1)
181 assert.Len(t, srv.topics[topicB].subscriptions, 1)
182 srv.mu.Unlock()
183
184 // close the conn
185 err := conn.Close()
186 require.NoError(t, err)
187
188 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
189 require.NoError(t, err)
190
191 err = binary.Write(publisherConn, binary.BigEndian, Publish)
192 require.NoError(t, err)
193
194 data := []byte("hello world")
195
196 sendMessage(t, publisherConn, topicA, data)
197
198 // the timeout for a connection is 100 milliseconds, so we should wait at least this long before checking the unsubscribe
199 // TODO: see if theres a better way, but without this, the test is flakey
200 time.Sleep(time.Millisecond * 100)
201
202 assert.Len(t, srv.topics, 2)
203
204 srv.mu.Lock()
205 assert.Len(t, srv.topics[topicA].subscriptions, 0)
206 assert.Len(t, srv.topics[topicB].subscriptions, 0)
207 srv.mu.Unlock()
208}
209
210func TestInvalidAction(t *testing.T) {
211 _ = createServer(t)
212
213 conn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
214 require.NoError(t, err)
215
216 err = binary.Write(conn, binary.BigEndian, uint16(99))
217 require.NoError(t, err)
218
219 expectedRes := Error
220
221 var resp Status
222 err = binary.Read(conn, binary.BigEndian, &resp)
223 require.NoError(t, err)
224
225 assert.Equal(t, expectedRes, resp)
226
227 expectedMessage := "unknown action"
228
229 var dataLen uint16
230 err = binary.Read(conn, binary.BigEndian, &dataLen)
231 require.NoError(t, err)
232 assert.Equal(t, len(expectedMessage), int(dataLen))
233
234 buf := make([]byte, dataLen)
235 _, err = conn.Read(buf)
236 require.NoError(t, err)
237
238 assert.Equal(t, expectedMessage, string(buf))
239}
240
241func TestInvalidTopicDataPublished(t *testing.T) {
242 _ = createServer(t)
243
244 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
245 require.NoError(t, err)
246
247 err = binary.Write(publisherConn, binary.BigEndian, Publish)
248 require.NoError(t, err)
249
250 // send topic
251 topic := topicA
252 err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic)))
253 require.NoError(t, err)
254 _, err = publisherConn.Write([]byte(topic))
255 require.NoError(t, err)
256
257 expectedRes := Error
258
259 var resp Status
260 err = binary.Read(publisherConn, binary.BigEndian, &resp)
261 require.NoError(t, err)
262
263 assert.Equal(t, expectedRes, resp)
264
265 expectedMessage := "topic data does not contain 'topic:' prefix"
266
267 var dataLen uint16
268 err = binary.Read(publisherConn, binary.BigEndian, &dataLen)
269 require.NoError(t, err)
270 assert.Equal(t, len(expectedMessage), int(dataLen))
271
272 buf := make([]byte, dataLen)
273 _, err = publisherConn.Read(buf)
274 require.NoError(t, err)
275
276 assert.Equal(t, expectedMessage, string(buf))
277}
278
279func TestSendsDataToTopicSubscribers(t *testing.T) {
280 _ = createServer(t)
281
282 subscribers := make([]net.Conn, 0, 10)
283 for i := 0; i < 10; i++ {
284 subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
285
286 subscribers = append(subscribers, subscriberConn)
287 }
288
289 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
290 require.NoError(t, err)
291
292 err = binary.Write(publisherConn, binary.BigEndian, Publish)
293 require.NoError(t, err)
294
295 topic := fmt.Sprintf("topic:%s", topicA)
296 messageData := "hello world"
297
298 sendMessage(t, publisherConn, topic, []byte(messageData))
299
300 // check the subsribers got the data
301 for _, conn := range subscribers {
302 msg := readMessage(t, conn)
303 assert.Equal(t, messageData, string(msg))
304 }
305}
306
307func TestPublishMultipleTimes(t *testing.T) {
308 _ = createServer(t)
309
310 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
311 require.NoError(t, err)
312
313 err = binary.Write(publisherConn, binary.BigEndian, Publish)
314 require.NoError(t, err)
315
316 messages := make([]string, 0, 10)
317 for i := 0; i < 10; i++ {
318 messages = append(messages, fmt.Sprintf("message %d", i))
319 }
320
321 subscribeFinCh := make(chan struct{})
322 // create a subscriber that will read messages
323 subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
324 go func() {
325 // check subscriber got all messages
326 results := make([]string, 0, len(messages))
327 for i := 0; i < len(messages); i++ {
328 msg := readMessage(t, subscriberConn)
329 results = append(results, string(msg))
330 }
331
332 assert.ElementsMatch(t, results, messages)
333
334 subscribeFinCh <- struct{}{}
335 }()
336
337 topic := fmt.Sprintf("topic:%s", topicA)
338
339 // send multiple messages
340 for _, msg := range messages {
341 sendMessage(t, publisherConn, topic, []byte(msg))
342 }
343
344 select {
345 case <-subscribeFinCh:
346 break
347 case <-time.After(time.Second):
348 t.Fatal(fmt.Errorf("timed out waiting for subscriber to read messages"))
349 }
350}
351
352func TestSendsDataToTopicSubscriberNacksThenAcks(t *testing.T) {
353 _ = createServer(t)
354
355 subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
356
357 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
358 require.NoError(t, err)
359
360 err = binary.Write(publisherConn, binary.BigEndian, Publish)
361 require.NoError(t, err)
362
363 topic := fmt.Sprintf("topic:%s", topicA)
364 messageData := "hello world"
365
366 sendMessage(t, publisherConn, topic, []byte(messageData))
367
368 // check the subsribers got the data
369 readMessage := func(conn net.Conn, ack Action) {
370 var topicLen uint16
371 err = binary.Read(conn, binary.BigEndian, &topicLen)
372 require.NoError(t, err)
373
374 topicBuf := make([]byte, topicLen)
375 _, err = conn.Read(topicBuf)
376 require.NoError(t, err)
377 assert.Equal(t, topicA, string(topicBuf))
378
379 var dataLen uint64
380 err = binary.Read(conn, binary.BigEndian, &dataLen)
381 require.NoError(t, err)
382
383 buf := make([]byte, dataLen)
384 n, err := conn.Read(buf)
385 require.NoError(t, err)
386
387 require.Equal(t, int(dataLen), n)
388
389 assert.Equal(t, messageData, string(buf))
390
391 err = binary.Write(conn, binary.BigEndian, ack)
392 require.NoError(t, err)
393 }
394
395 // NACK the message and then ack it
396 readMessage(subscriberConn, Nack)
397 readMessage(subscriberConn, Ack)
398 // reading for another message should now timeout but give enough time for the ack delay to kick in
399 // should the second read of the message not have been ack'd properly
400 var topicLen uint16
401 _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100))
402 err = binary.Read(subscriberConn, binary.BigEndian, &topicLen)
403 require.Error(t, err)
404}
405
406func TestSendsDataToTopicSubscriberDoesntAckMessage(t *testing.T) {
407 _ = createServer(t)
408
409 subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
410
411 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
412 require.NoError(t, err)
413
414 err = binary.Write(publisherConn, binary.BigEndian, Publish)
415 require.NoError(t, err)
416
417 topic := fmt.Sprintf("topic:%s", topicA)
418 messageData := "hello world"
419
420 sendMessage(t, publisherConn, topic, []byte(messageData))
421
422 // check the subsribers got the data
423 readMessage := func(conn net.Conn, ack bool) {
424 var topicLen uint16
425 err = binary.Read(conn, binary.BigEndian, &topicLen)
426 require.NoError(t, err)
427
428 topicBuf := make([]byte, topicLen)
429 _, err = conn.Read(topicBuf)
430 require.NoError(t, err)
431 assert.Equal(t, topicA, string(topicBuf))
432
433 var dataLen uint64
434 err = binary.Read(conn, binary.BigEndian, &dataLen)
435 require.NoError(t, err)
436
437 buf := make([]byte, dataLen)
438 n, err := conn.Read(buf)
439 require.NoError(t, err)
440
441 require.Equal(t, int(dataLen), n)
442
443 assert.Equal(t, messageData, string(buf))
444
445 if ack {
446 err = binary.Write(conn, binary.BigEndian, Ack)
447 require.NoError(t, err)
448 return
449 }
450 }
451
452 // don't send ack or nack and then ack on the second attempt
453 readMessage(subscriberConn, false)
454 readMessage(subscriberConn, true)
455
456 // reading for another message should now timeout but give enough time for the ack delay to kick in
457 // should the second read of the message not have been ack'd properly
458 var topicLen uint16
459 _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100))
460 err = binary.Read(subscriberConn, binary.BigEndian, &topicLen)
461 require.Error(t, err)
462}
463
464func TestSendsDataToTopicSubscriberDeliveryCountTooHighWithNoAck(t *testing.T) {
465 _ = createServer(t)
466
467 subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
468
469 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
470 require.NoError(t, err)
471
472 err = binary.Write(publisherConn, binary.BigEndian, Publish)
473 require.NoError(t, err)
474
475 topic := fmt.Sprintf("topic:%s", topicA)
476 messageData := "hello world"
477
478 sendMessage(t, publisherConn, topic, []byte(messageData))
479
480 // check the subsribers got the data
481 readMessage := func(conn net.Conn, ack bool) {
482 var topicLen uint16
483 err = binary.Read(conn, binary.BigEndian, &topicLen)
484 require.NoError(t, err)
485
486 topicBuf := make([]byte, topicLen)
487 _, err = conn.Read(topicBuf)
488 require.NoError(t, err)
489 assert.Equal(t, topicA, string(topicBuf))
490
491 var dataLen uint64
492 err = binary.Read(conn, binary.BigEndian, &dataLen)
493 require.NoError(t, err)
494
495 buf := make([]byte, dataLen)
496 n, err := conn.Read(buf)
497 require.NoError(t, err)
498
499 require.Equal(t, int(dataLen), n)
500
501 assert.Equal(t, messageData, string(buf))
502
503 if ack {
504 err = binary.Write(conn, binary.BigEndian, Ack)
505 require.NoError(t, err)
506 return
507 }
508 }
509
510 // nack the message 5 times
511 readMessage(subscriberConn, false)
512 readMessage(subscriberConn, false)
513 readMessage(subscriberConn, false)
514 readMessage(subscriberConn, false)
515 readMessage(subscriberConn, false)
516
517 // reading for the message should now timeout as we have nack'd the message too many times
518 var topicLen uint16
519 _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100))
520 err = binary.Read(subscriberConn, binary.BigEndian, &topicLen)
521 require.Error(t, err)
522}
523
524func TestSubscribeAndReplaysFromStart(t *testing.T) {
525 _ = createServer(t)
526
527 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
528 require.NoError(t, err)
529
530 err = binary.Write(publisherConn, binary.BigEndian, Publish)
531 require.NoError(t, err)
532
533 messages := make([]string, 0, 10)
534 for i := 0; i < 10; i++ {
535 messages = append(messages, fmt.Sprintf("message %d", i))
536 }
537
538 topic := fmt.Sprintf("topic:%s", topicA)
539
540 for _, msg := range messages {
541 sendMessage(t, publisherConn, topic, []byte(msg))
542 }
543
544 // send some messages for topic B as well
545 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 1"))
546 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 2"))
547 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 3"))
548
549 subscriberConn := createConnectionAndSubscribe(t, []string{topicA}, From, 0)
550 results := make([]string, 0, len(messages))
551 for i := 0; i < len(messages); i++ {
552 msg := readMessage(t, subscriberConn)
553 results = append(results, string(msg))
554 }
555 assert.ElementsMatch(t, results, messages)
556}
557
558func TestSubscribeAndReplaysFromIndex(t *testing.T) {
559 _ = createServer(t)
560
561 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
562 require.NoError(t, err)
563
564 err = binary.Write(publisherConn, binary.BigEndian, Publish)
565 require.NoError(t, err)
566
567 messages := make([]string, 0, 10)
568 for i := 0; i < 10; i++ {
569 messages = append(messages, fmt.Sprintf("message %d", i))
570 }
571
572 topic := fmt.Sprintf("topic:%s", topicA)
573
574 // send multiple messages
575 for _, msg := range messages {
576 sendMessage(t, publisherConn, topic, []byte(msg))
577 }
578
579 // send some messages for topic B as well
580 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 1"))
581 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 2"))
582 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 3"))
583
584 subscriberConn := createConnectionAndSubscribe(t, []string{topicA}, From, 3)
585
586 // now that the subscriber has subecribed send another message that should arrive after all the other messages were consumed
587 sendMessage(t, publisherConn, topic, []byte("hello there"))
588
589 results := make([]string, 0, len(messages))
590 for i := 0; i < len(messages)-3; i++ {
591 msg := readMessage(t, subscriberConn)
592 results = append(results, string(msg))
593 }
594 require.Len(t, results, 7)
595 expMessages := make([]string, 0, 7)
596 for i, msg := range messages {
597 if i < 3 {
598 continue
599 }
600 expMessages = append(expMessages, msg)
601 }
602 assert.Equal(t, expMessages, results)
603
604 // now check we can get the message that was sent after the subscription was created
605 msg := readMessage(t, subscriberConn)
606 assert.Equal(t, "hello there", string(msg))
607}
608
609func readMessage(t *testing.T, subscriberConn net.Conn) []byte {
610 var topicLen uint16
611 err := binary.Read(subscriberConn, binary.BigEndian, &topicLen)
612 require.NoError(t, err)
613
614 topicBuf := make([]byte, topicLen)
615 _, err = subscriberConn.Read(topicBuf)
616 require.NoError(t, err)
617 assert.Equal(t, topicA, string(topicBuf))
618
619 var dataLen uint64
620 err = binary.Read(subscriberConn, binary.BigEndian, &dataLen)
621 require.NoError(t, err)
622
623 buf := make([]byte, dataLen)
624 n, err := subscriberConn.Read(buf)
625 require.NoError(t, err)
626 require.Equal(t, int(dataLen), n)
627
628 err = binary.Write(subscriberConn, binary.BigEndian, Ack)
629 require.NoError(t, err)
630
631 return buf
632}