An experimental pub/sub client and server project.
at main 632 lines 18 kB view raw
1package server 2 3import ( 4 "encoding/binary" 5 "encoding/json" 6 "fmt" 7 "net" 8 "testing" 9 "time" 10 11 "github.com/stretchr/testify/assert" 12 "github.com/stretchr/testify/require" 13 "github.com/willdot/messagebroker/internal/messagestore" 14) 15 16const ( 17 topicA = "topic a" 18 topicB = "topic b" 19 topicC = "topic c" 20 21 serverAddr = ":6666" 22 23 ackDelay = time.Millisecond * 100 24 ackTimeout = time.Millisecond * 100 25) 26 27func createServer(t *testing.T) *Server { 28 srv, err := New(serverAddr, ackDelay, ackTimeout) 29 require.NoError(t, err) 30 31 t.Cleanup(func() { 32 _ = srv.Shutdown() 33 }) 34 35 return srv 36} 37 38func createServerWithExistingTopic(t *testing.T, topicName string) *Server { 39 srv := createServer(t) 40 srv.topics[topicName] = &topic{ 41 name: topicName, 42 subscriptions: make(map[net.Addr]*subscriber), 43 messageStore: messagestore.NewMemoryStore(), 44 } 45 46 return srv 47} 48 49func createConnectionAndSubscribe(t *testing.T, topics []string, startAtType StartAtType, startAtIndex int) net.Conn { 50 conn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 51 require.NoError(t, err) 52 53 subscribeToTopics(t, conn, topics, startAtType, startAtIndex) 54 55 expectedRes := Subscribed 56 57 var resp Status 58 err = binary.Read(conn, binary.BigEndian, &resp) 59 require.NoError(t, err) 60 61 assert.Equal(t, expectedRes, resp) 62 63 return conn 64} 65 66func sendMessage(t *testing.T, conn net.Conn, topic string, message []byte) { 67 topicLenB := make([]byte, 4) 68 binary.BigEndian.PutUint32(topicLenB, uint32(len(topic))) 69 70 headers := topicLenB 71 headers = append(headers, []byte(topic)...) 72 73 messageLenB := make([]byte, 4) 74 binary.BigEndian.PutUint32(messageLenB, uint32(len(message))) 75 headers = append(headers, messageLenB...) 76 77 _, err := conn.Write(append(headers, message...)) 78 require.NoError(t, err) 79} 80 81func subscribeToTopics(t *testing.T, conn net.Conn, topics []string, startAtType StartAtType, startAtIndex int) { 82 actionB := make([]byte, 2) 83 binary.BigEndian.PutUint16(actionB, uint16(Subscribe)) 84 headers := actionB 85 86 b, err := json.Marshal(topics) 87 require.NoError(t, err) 88 89 topicNamesB := make([]byte, 4) 90 binary.BigEndian.PutUint32(topicNamesB, uint32(len(b))) 91 headers = append(headers, topicNamesB...) 92 headers = append(headers, b...) 93 94 startAtTypeB := make([]byte, 2) 95 binary.BigEndian.PutUint16(startAtTypeB, uint16(startAtType)) 96 headers = append(headers, startAtTypeB...) 97 98 if startAtType == From { 99 fromB := make([]byte, 2) 100 binary.BigEndian.PutUint16(fromB, uint16(startAtIndex)) 101 headers = append(headers, fromB...) 102 } 103 104 _, err = conn.Write(headers) 105 require.NoError(t, err) 106} 107 108func unsubscribetoTopics(t *testing.T, conn net.Conn, topics []string) { 109 actionB := make([]byte, 2) 110 binary.BigEndian.PutUint16(actionB, uint16(Unsubscribe)) 111 headers := actionB 112 113 b, err := json.Marshal(topics) 114 require.NoError(t, err) 115 116 topicNamesB := make([]byte, 4) 117 binary.BigEndian.PutUint32(topicNamesB, uint32(len(b))) 118 headers = append(headers, topicNamesB...) 119 120 _, err = conn.Write(append(headers, b...)) 121 require.NoError(t, err) 122} 123 124func TestSubscribeToTopics(t *testing.T) { 125 // create a server with an existing topic so we can test subscribing to a new and 126 // existing topic 127 srv := createServerWithExistingTopic(t, topicA) 128 129 _ = createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0) 130 131 srv.mu.Lock() 132 defer srv.mu.Unlock() 133 assert.Len(t, srv.topics, 2) 134 assert.Len(t, srv.topics[topicA].subscriptions, 1) 135 assert.Len(t, srv.topics[topicB].subscriptions, 1) 136} 137 138func TestUnsubscribesFromTopic(t *testing.T) { 139 srv := createServerWithExistingTopic(t, topicA) 140 141 conn := createConnectionAndSubscribe(t, []string{topicA, topicB, topicC}, Current, 0) 142 143 assert.Len(t, srv.topics, 3) 144 145 srv.mu.Lock() 146 assert.Len(t, srv.topics[topicA].subscriptions, 1) 147 assert.Len(t, srv.topics[topicB].subscriptions, 1) 148 assert.Len(t, srv.topics[topicC].subscriptions, 1) 149 srv.mu.Unlock() 150 151 topics := []string{topicA, topicB} 152 153 unsubscribetoTopics(t, conn, topics) 154 155 expectedRes := Unsubscribed 156 157 var resp Status 158 err := binary.Read(conn, binary.BigEndian, &resp) 159 require.NoError(t, err) 160 161 assert.Equal(t, expectedRes, resp) 162 163 assert.Len(t, srv.topics, 3) 164 165 srv.mu.Lock() 166 assert.Len(t, srv.topics[topicA].subscriptions, 0) 167 assert.Len(t, srv.topics[topicB].subscriptions, 0) 168 assert.Len(t, srv.topics[topicC].subscriptions, 1) 169 srv.mu.Unlock() 170} 171 172func TestSubscriberClosesWithoutUnsubscribing(t *testing.T) { 173 srv := createServer(t) 174 175 conn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0) 176 177 assert.Len(t, srv.topics, 2) 178 179 srv.mu.Lock() 180 assert.Len(t, srv.topics[topicA].subscriptions, 1) 181 assert.Len(t, srv.topics[topicB].subscriptions, 1) 182 srv.mu.Unlock() 183 184 // close the conn 185 err := conn.Close() 186 require.NoError(t, err) 187 188 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 189 require.NoError(t, err) 190 191 err = binary.Write(publisherConn, binary.BigEndian, Publish) 192 require.NoError(t, err) 193 194 data := []byte("hello world") 195 196 sendMessage(t, publisherConn, topicA, data) 197 198 // the timeout for a connection is 100 milliseconds, so we should wait at least this long before checking the unsubscribe 199 // TODO: see if theres a better way, but without this, the test is flakey 200 time.Sleep(time.Millisecond * 100) 201 202 assert.Len(t, srv.topics, 2) 203 204 srv.mu.Lock() 205 assert.Len(t, srv.topics[topicA].subscriptions, 0) 206 assert.Len(t, srv.topics[topicB].subscriptions, 0) 207 srv.mu.Unlock() 208} 209 210func TestInvalidAction(t *testing.T) { 211 _ = createServer(t) 212 213 conn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 214 require.NoError(t, err) 215 216 err = binary.Write(conn, binary.BigEndian, uint16(99)) 217 require.NoError(t, err) 218 219 expectedRes := Error 220 221 var resp Status 222 err = binary.Read(conn, binary.BigEndian, &resp) 223 require.NoError(t, err) 224 225 assert.Equal(t, expectedRes, resp) 226 227 expectedMessage := "unknown action" 228 229 var dataLen uint16 230 err = binary.Read(conn, binary.BigEndian, &dataLen) 231 require.NoError(t, err) 232 assert.Equal(t, len(expectedMessage), int(dataLen)) 233 234 buf := make([]byte, dataLen) 235 _, err = conn.Read(buf) 236 require.NoError(t, err) 237 238 assert.Equal(t, expectedMessage, string(buf)) 239} 240 241func TestInvalidTopicDataPublished(t *testing.T) { 242 _ = createServer(t) 243 244 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 245 require.NoError(t, err) 246 247 err = binary.Write(publisherConn, binary.BigEndian, Publish) 248 require.NoError(t, err) 249 250 // send topic 251 topic := topicA 252 err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 253 require.NoError(t, err) 254 _, err = publisherConn.Write([]byte(topic)) 255 require.NoError(t, err) 256 257 expectedRes := Error 258 259 var resp Status 260 err = binary.Read(publisherConn, binary.BigEndian, &resp) 261 require.NoError(t, err) 262 263 assert.Equal(t, expectedRes, resp) 264 265 expectedMessage := "topic data does not contain 'topic:' prefix" 266 267 var dataLen uint16 268 err = binary.Read(publisherConn, binary.BigEndian, &dataLen) 269 require.NoError(t, err) 270 assert.Equal(t, len(expectedMessage), int(dataLen)) 271 272 buf := make([]byte, dataLen) 273 _, err = publisherConn.Read(buf) 274 require.NoError(t, err) 275 276 assert.Equal(t, expectedMessage, string(buf)) 277} 278 279func TestSendsDataToTopicSubscribers(t *testing.T) { 280 _ = createServer(t) 281 282 subscribers := make([]net.Conn, 0, 10) 283 for i := 0; i < 10; i++ { 284 subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0) 285 286 subscribers = append(subscribers, subscriberConn) 287 } 288 289 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 290 require.NoError(t, err) 291 292 err = binary.Write(publisherConn, binary.BigEndian, Publish) 293 require.NoError(t, err) 294 295 topic := fmt.Sprintf("topic:%s", topicA) 296 messageData := "hello world" 297 298 sendMessage(t, publisherConn, topic, []byte(messageData)) 299 300 // check the subsribers got the data 301 for _, conn := range subscribers { 302 msg := readMessage(t, conn) 303 assert.Equal(t, messageData, string(msg)) 304 } 305} 306 307func TestPublishMultipleTimes(t *testing.T) { 308 _ = createServer(t) 309 310 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 311 require.NoError(t, err) 312 313 err = binary.Write(publisherConn, binary.BigEndian, Publish) 314 require.NoError(t, err) 315 316 messages := make([]string, 0, 10) 317 for i := 0; i < 10; i++ { 318 messages = append(messages, fmt.Sprintf("message %d", i)) 319 } 320 321 subscribeFinCh := make(chan struct{}) 322 // create a subscriber that will read messages 323 subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0) 324 go func() { 325 // check subscriber got all messages 326 results := make([]string, 0, len(messages)) 327 for i := 0; i < len(messages); i++ { 328 msg := readMessage(t, subscriberConn) 329 results = append(results, string(msg)) 330 } 331 332 assert.ElementsMatch(t, results, messages) 333 334 subscribeFinCh <- struct{}{} 335 }() 336 337 topic := fmt.Sprintf("topic:%s", topicA) 338 339 // send multiple messages 340 for _, msg := range messages { 341 sendMessage(t, publisherConn, topic, []byte(msg)) 342 } 343 344 select { 345 case <-subscribeFinCh: 346 break 347 case <-time.After(time.Second): 348 t.Fatal(fmt.Errorf("timed out waiting for subscriber to read messages")) 349 } 350} 351 352func TestSendsDataToTopicSubscriberNacksThenAcks(t *testing.T) { 353 _ = createServer(t) 354 355 subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0) 356 357 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 358 require.NoError(t, err) 359 360 err = binary.Write(publisherConn, binary.BigEndian, Publish) 361 require.NoError(t, err) 362 363 topic := fmt.Sprintf("topic:%s", topicA) 364 messageData := "hello world" 365 366 sendMessage(t, publisherConn, topic, []byte(messageData)) 367 368 // check the subsribers got the data 369 readMessage := func(conn net.Conn, ack Action) { 370 var topicLen uint16 371 err = binary.Read(conn, binary.BigEndian, &topicLen) 372 require.NoError(t, err) 373 374 topicBuf := make([]byte, topicLen) 375 _, err = conn.Read(topicBuf) 376 require.NoError(t, err) 377 assert.Equal(t, topicA, string(topicBuf)) 378 379 var dataLen uint64 380 err = binary.Read(conn, binary.BigEndian, &dataLen) 381 require.NoError(t, err) 382 383 buf := make([]byte, dataLen) 384 n, err := conn.Read(buf) 385 require.NoError(t, err) 386 387 require.Equal(t, int(dataLen), n) 388 389 assert.Equal(t, messageData, string(buf)) 390 391 err = binary.Write(conn, binary.BigEndian, ack) 392 require.NoError(t, err) 393 } 394 395 // NACK the message and then ack it 396 readMessage(subscriberConn, Nack) 397 readMessage(subscriberConn, Ack) 398 // reading for another message should now timeout but give enough time for the ack delay to kick in 399 // should the second read of the message not have been ack'd properly 400 var topicLen uint16 401 _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100)) 402 err = binary.Read(subscriberConn, binary.BigEndian, &topicLen) 403 require.Error(t, err) 404} 405 406func TestSendsDataToTopicSubscriberDoesntAckMessage(t *testing.T) { 407 _ = createServer(t) 408 409 subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0) 410 411 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 412 require.NoError(t, err) 413 414 err = binary.Write(publisherConn, binary.BigEndian, Publish) 415 require.NoError(t, err) 416 417 topic := fmt.Sprintf("topic:%s", topicA) 418 messageData := "hello world" 419 420 sendMessage(t, publisherConn, topic, []byte(messageData)) 421 422 // check the subsribers got the data 423 readMessage := func(conn net.Conn, ack bool) { 424 var topicLen uint16 425 err = binary.Read(conn, binary.BigEndian, &topicLen) 426 require.NoError(t, err) 427 428 topicBuf := make([]byte, topicLen) 429 _, err = conn.Read(topicBuf) 430 require.NoError(t, err) 431 assert.Equal(t, topicA, string(topicBuf)) 432 433 var dataLen uint64 434 err = binary.Read(conn, binary.BigEndian, &dataLen) 435 require.NoError(t, err) 436 437 buf := make([]byte, dataLen) 438 n, err := conn.Read(buf) 439 require.NoError(t, err) 440 441 require.Equal(t, int(dataLen), n) 442 443 assert.Equal(t, messageData, string(buf)) 444 445 if ack { 446 err = binary.Write(conn, binary.BigEndian, Ack) 447 require.NoError(t, err) 448 return 449 } 450 } 451 452 // don't send ack or nack and then ack on the second attempt 453 readMessage(subscriberConn, false) 454 readMessage(subscriberConn, true) 455 456 // reading for another message should now timeout but give enough time for the ack delay to kick in 457 // should the second read of the message not have been ack'd properly 458 var topicLen uint16 459 _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100)) 460 err = binary.Read(subscriberConn, binary.BigEndian, &topicLen) 461 require.Error(t, err) 462} 463 464func TestSendsDataToTopicSubscriberDeliveryCountTooHighWithNoAck(t *testing.T) { 465 _ = createServer(t) 466 467 subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0) 468 469 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 470 require.NoError(t, err) 471 472 err = binary.Write(publisherConn, binary.BigEndian, Publish) 473 require.NoError(t, err) 474 475 topic := fmt.Sprintf("topic:%s", topicA) 476 messageData := "hello world" 477 478 sendMessage(t, publisherConn, topic, []byte(messageData)) 479 480 // check the subsribers got the data 481 readMessage := func(conn net.Conn, ack bool) { 482 var topicLen uint16 483 err = binary.Read(conn, binary.BigEndian, &topicLen) 484 require.NoError(t, err) 485 486 topicBuf := make([]byte, topicLen) 487 _, err = conn.Read(topicBuf) 488 require.NoError(t, err) 489 assert.Equal(t, topicA, string(topicBuf)) 490 491 var dataLen uint64 492 err = binary.Read(conn, binary.BigEndian, &dataLen) 493 require.NoError(t, err) 494 495 buf := make([]byte, dataLen) 496 n, err := conn.Read(buf) 497 require.NoError(t, err) 498 499 require.Equal(t, int(dataLen), n) 500 501 assert.Equal(t, messageData, string(buf)) 502 503 if ack { 504 err = binary.Write(conn, binary.BigEndian, Ack) 505 require.NoError(t, err) 506 return 507 } 508 } 509 510 // nack the message 5 times 511 readMessage(subscriberConn, false) 512 readMessage(subscriberConn, false) 513 readMessage(subscriberConn, false) 514 readMessage(subscriberConn, false) 515 readMessage(subscriberConn, false) 516 517 // reading for the message should now timeout as we have nack'd the message too many times 518 var topicLen uint16 519 _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100)) 520 err = binary.Read(subscriberConn, binary.BigEndian, &topicLen) 521 require.Error(t, err) 522} 523 524func TestSubscribeAndReplaysFromStart(t *testing.T) { 525 _ = createServer(t) 526 527 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 528 require.NoError(t, err) 529 530 err = binary.Write(publisherConn, binary.BigEndian, Publish) 531 require.NoError(t, err) 532 533 messages := make([]string, 0, 10) 534 for i := 0; i < 10; i++ { 535 messages = append(messages, fmt.Sprintf("message %d", i)) 536 } 537 538 topic := fmt.Sprintf("topic:%s", topicA) 539 540 for _, msg := range messages { 541 sendMessage(t, publisherConn, topic, []byte(msg)) 542 } 543 544 // send some messages for topic B as well 545 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 1")) 546 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 2")) 547 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 3")) 548 549 subscriberConn := createConnectionAndSubscribe(t, []string{topicA}, From, 0) 550 results := make([]string, 0, len(messages)) 551 for i := 0; i < len(messages); i++ { 552 msg := readMessage(t, subscriberConn) 553 results = append(results, string(msg)) 554 } 555 assert.ElementsMatch(t, results, messages) 556} 557 558func TestSubscribeAndReplaysFromIndex(t *testing.T) { 559 _ = createServer(t) 560 561 publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 562 require.NoError(t, err) 563 564 err = binary.Write(publisherConn, binary.BigEndian, Publish) 565 require.NoError(t, err) 566 567 messages := make([]string, 0, 10) 568 for i := 0; i < 10; i++ { 569 messages = append(messages, fmt.Sprintf("message %d", i)) 570 } 571 572 topic := fmt.Sprintf("topic:%s", topicA) 573 574 // send multiple messages 575 for _, msg := range messages { 576 sendMessage(t, publisherConn, topic, []byte(msg)) 577 } 578 579 // send some messages for topic B as well 580 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 1")) 581 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 2")) 582 sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 3")) 583 584 subscriberConn := createConnectionAndSubscribe(t, []string{topicA}, From, 3) 585 586 // now that the subscriber has subecribed send another message that should arrive after all the other messages were consumed 587 sendMessage(t, publisherConn, topic, []byte("hello there")) 588 589 results := make([]string, 0, len(messages)) 590 for i := 0; i < len(messages)-3; i++ { 591 msg := readMessage(t, subscriberConn) 592 results = append(results, string(msg)) 593 } 594 require.Len(t, results, 7) 595 expMessages := make([]string, 0, 7) 596 for i, msg := range messages { 597 if i < 3 { 598 continue 599 } 600 expMessages = append(expMessages, msg) 601 } 602 assert.Equal(t, expMessages, results) 603 604 // now check we can get the message that was sent after the subscription was created 605 msg := readMessage(t, subscriberConn) 606 assert.Equal(t, "hello there", string(msg)) 607} 608 609func readMessage(t *testing.T, subscriberConn net.Conn) []byte { 610 var topicLen uint16 611 err := binary.Read(subscriberConn, binary.BigEndian, &topicLen) 612 require.NoError(t, err) 613 614 topicBuf := make([]byte, topicLen) 615 _, err = subscriberConn.Read(topicBuf) 616 require.NoError(t, err) 617 assert.Equal(t, topicA, string(topicBuf)) 618 619 var dataLen uint64 620 err = binary.Read(subscriberConn, binary.BigEndian, &dataLen) 621 require.NoError(t, err) 622 623 buf := make([]byte, dataLen) 624 n, err := subscriberConn.Read(buf) 625 require.NoError(t, err) 626 require.Equal(t, int(dataLen), n) 627 628 err = binary.Write(subscriberConn, binary.BigEndian, Ack) 629 require.NoError(t, err) 630 631 return buf 632}