An experimental pub/sub client and server project.

Merge pull request #7 from willdot/ack

Implement ack / nack of messages

authored by willdot.net and committed by

GitHub 9d1420b8 7d274c7d

+525 -129
+1 -4
example/main.go
··· 59 59 i := 0 60 60 for { 61 61 i++ 62 - msg := pubsub.Message{ 63 - Topic: "topic a", 64 - Data: []byte(fmt.Sprintf("message %d", i)), 65 - } 62 + msg := pubsub.NewMessage("topic a", []byte(fmt.Sprintf("message %d", i))) 66 63 67 64 err = publisher.PublishMessage(msg) 68 65 if err != nil {
+2 -1
example/server/main.go
··· 5 5 "os" 6 6 "os/signal" 7 7 "syscall" 8 + "time" 8 9 9 10 "github.com/willdot/messagebroker/server" 10 11 ) 11 12 12 13 func main() { 13 - srv, err := server.New(":3000") 14 + srv, err := server.New(":3000", time.Second, time.Second*2) 14 15 if err != nil { 15 16 log.Fatal(err) 16 17 }
+16
pubsub/message.go
··· 4 4 type Message struct { 5 5 Topic string `json:"topic"` 6 6 Data []byte `json:"data"` 7 + 8 + ack chan bool 9 + } 10 + 11 + // NewMessage creates a new message 12 + func NewMessage(topic string, data []byte) *Message { 13 + return &Message{ 14 + Topic: topic, 15 + Data: data, 16 + ack: make(chan bool), 17 + } 18 + } 19 + 20 + // Ack will send the provided value of the ack to the server 21 + func (m *Message) Ack(ack bool) { 22 + m.ack <- ack 7 23 }
+1 -1
pubsub/publisher.go
··· 39 39 } 40 40 41 41 // Publish will publish the given message to the server 42 - func (p *Publisher) PublishMessage(message Message) error { 42 + func (p *Publisher) PublishMessage(message *Message) error { 43 43 op := func(conn net.Conn) error { 44 44 // send topic first 45 45 topic := fmt.Sprintf("topic:%s", message.Topic)
+28 -17
pubsub/subscriber.go
··· 143 143 // Consumer allows the consumption of messages. If during the consumer receiving messages from the 144 144 // server an error occurs, it will be stored in Err 145 145 type Consumer struct { 146 - msgs chan Message 146 + msgs chan *Message 147 147 // TODO: better error handling? Maybe a channel of errors? 148 148 Err error 149 149 } 150 150 151 151 // Messages returns a channel in which this consumer will put messages onto. It is safe to range over the channel since it will be closed once 152 152 // the consumer has finished either due to an error or from being cancelled. 153 - func (c *Consumer) Messages() <-chan Message { 153 + func (c *Consumer) Messages() <-chan *Message { 154 154 return c.msgs 155 155 } 156 156 ··· 158 158 // to read the messages 159 159 func (s *Subscriber) Consume(ctx context.Context) *Consumer { 160 160 consumer := &Consumer{ 161 - msgs: make(chan Message), 161 + msgs: make(chan *Message), 162 162 } 163 163 164 164 go s.consume(ctx, consumer) ··· 173 173 return 174 174 } 175 175 176 - msg, err := s.readMessage() 176 + err := s.readMessage(ctx, consumer.msgs) 177 177 if err != nil { 178 178 consumer.Err = err 179 179 return 180 - } 181 - 182 - if msg != nil { 183 - consumer.msgs <- *msg 184 180 } 185 181 } 186 182 } 187 183 188 - func (s *Subscriber) readMessage() (*Message, error) { 189 - var msg *Message 184 + func (s *Subscriber) readMessage(ctx context.Context, msgChan chan *Message) error { 190 185 op := func(conn net.Conn) error { 191 186 err := s.conn.SetReadDeadline(time.Now().Add(time.Second)) 192 187 if err != nil { ··· 225 220 return err 226 221 } 227 222 228 - msg = &Message{ 229 - Data: dataBuf, 230 - Topic: string(topicBuf), 223 + msg := NewMessage(string(topicBuf), dataBuf) 224 + 225 + msgChan <- msg 226 + 227 + var ack bool 228 + select { 229 + case <-ctx.Done(): 230 + return ctx.Err() 231 + case ack = <-msg.ack: 231 232 } 233 + //ack := <-msg.ack 232 234 233 - return nil 235 + ackMessage := server.Nack 236 + if ack { 237 + ackMessage = server.Ack 238 + } 234 239 240 + err = binary.Write(s.conn, binary.BigEndian, ackMessage) 241 + if err != nil { 242 + return fmt.Errorf("failed to ack/nack message: %w", err) 243 + } 244 + 245 + return nil 235 246 } 236 247 237 248 err := s.connOperation(op) 238 249 if err != nil { 239 250 var neterr net.Error 240 251 if errors.As(err, &neterr) && neterr.Timeout() { 241 - return nil, nil 252 + return nil 242 253 } 243 - return nil, err 254 + return err 244 255 } 245 256 246 - return msg, err 257 + return err 247 258 } 248 259 249 260 func (s *Subscriber) connOperation(op connOpp) error {
+90 -37
pubsub/subscriber_test.go
··· 19 19 ) 20 20 21 21 func createServer(t *testing.T) { 22 - server, err := server.New(serverAddr) 22 + server, err := server.New(serverAddr, time.Millisecond*100, time.Millisecond*100) 23 23 require.NoError(t, err) 24 24 25 25 t.Cleanup(func() { ··· 105 105 consumer := sub.Consume(ctx) 106 106 require.NoError(t, err) 107 107 108 - var receivedMessages []Message 108 + var receivedMessages []*Message 109 109 consumerFinCh := make(chan struct{}) 110 110 go func() { 111 111 for msg := range consumer.Messages() { 112 + msg.Ack(true) 112 113 receivedMessages = append(receivedMessages, msg) 113 114 } 114 115 115 - require.NoError(t, err) 116 116 consumerFinCh <- struct{}{} 117 117 }() 118 118 ··· 125 125 publisher.Close() 126 126 }) 127 127 128 - msg := Message{ 129 - Topic: topicA, 130 - Data: []byte("hello world"), 131 - } 128 + msg := NewMessage(topicA, []byte("hello world")) 132 129 133 130 err = publisher.PublishMessage(msg) 134 131 require.NoError(t, err) ··· 137 134 err = publisher.PublishMessage(msg) 138 135 require.NoError(t, err) 139 136 137 + time.Sleep(time.Second) 140 138 cancel() 141 139 142 140 select { ··· 151 149 } 152 150 153 151 func TestPublishAndSubscribe(t *testing.T) { 154 - createServer(t) 155 - 156 - sub, err := NewSubscriber(serverAddr) 157 - require.NoError(t, err) 158 - 159 - t.Cleanup(func() { 160 - sub.Close() 161 - }) 162 - 163 - topics := []string{topicA, topicB} 164 - 165 - err = sub.SubscribeToTopics(topics) 166 - require.NoError(t, err) 167 - 168 - ctx, cancel := context.WithCancel(context.Background()) 169 - t.Cleanup(func() { 170 - cancel() 171 - }) 172 - 173 - consumer := sub.Consume(ctx) 174 - require.NoError(t, err) 152 + consumer, cancel := setupConsumer(t) 175 153 176 - var receivedMessages []Message 154 + var receivedMessages []*Message 177 155 178 156 consumerFinCh := make(chan struct{}) 179 157 go func() { 180 158 for msg := range consumer.Messages() { 159 + msg.Ack(true) 181 160 receivedMessages = append(receivedMessages, msg) 182 161 } 183 162 184 - require.NoError(t, err) 185 163 consumerFinCh <- struct{}{} 186 164 }() 187 165 ··· 192 170 }) 193 171 194 172 // send some messages 195 - sentMessages := make([]Message, 0, 10) 173 + sentMessages := make([]*Message, 0, 10) 196 174 for i := 0; i < 10; i++ { 197 - msg := Message{ 198 - Topic: topicA, 199 - Data: []byte(fmt.Sprintf("message %d", i)), 200 - } 175 + msg := NewMessage(topicA, []byte(fmt.Sprintf("message %d", i))) 201 176 202 177 sentMessages = append(sentMessages, msg) 203 178 ··· 206 181 } 207 182 208 183 // give the consumer some time to read the messages -- TODO: make better! 209 - time.Sleep(time.Millisecond * 500) 184 + time.Sleep(time.Second) 210 185 cancel() 211 186 212 187 select { 213 188 case <-consumerFinCh: 214 189 break 215 - case <-time.After(time.Second): 190 + case <-time.After(time.Second * 5): 216 191 t.Fatal("timed out waiting for consumer to read messages") 217 192 } 218 193 194 + // THIS IS SO HACKY 195 + for _, msg := range receivedMessages { 196 + msg.ack = nil 197 + } 198 + 199 + for _, msg := range sentMessages { 200 + msg.ack = nil 201 + } 202 + 219 203 assert.ElementsMatch(t, receivedMessages, sentMessages) 220 204 } 205 + 206 + func TestPublishAndSubscribeNackMessage(t *testing.T) { 207 + consumer, cancel := setupConsumer(t) 208 + 209 + var receivedMessages []*Message 210 + 211 + consumerFinCh := make(chan struct{}) 212 + timesMsgWasReceived := 0 213 + go func() { 214 + for msg := range consumer.Messages() { 215 + msg.Ack(false) 216 + timesMsgWasReceived++ 217 + } 218 + 219 + consumerFinCh <- struct{}{} 220 + }() 221 + 222 + publisher, err := NewPublisher("localhost:9999") 223 + require.NoError(t, err) 224 + t.Cleanup(func() { 225 + publisher.Close() 226 + }) 227 + 228 + // send a message 229 + msg := NewMessage(topicA, []byte("hello world")) 230 + 231 + err = publisher.PublishMessage(msg) 232 + require.NoError(t, err) 233 + 234 + // give the consumer some time to read the messages -- TODO: make better! 235 + time.Sleep(time.Second) 236 + cancel() 237 + 238 + select { 239 + case <-consumerFinCh: 240 + break 241 + case <-time.After(time.Second * 5): 242 + t.Fatal("timed out waiting for consumer to read messages") 243 + } 244 + 245 + assert.Empty(t, receivedMessages) 246 + assert.Equal(t, 5, timesMsgWasReceived) 247 + } 248 + 249 + func setupConsumer(t *testing.T) (*Consumer, context.CancelFunc) { 250 + createServer(t) 251 + 252 + sub, err := NewSubscriber(serverAddr) 253 + require.NoError(t, err) 254 + 255 + t.Cleanup(func() { 256 + sub.Close() 257 + }) 258 + 259 + topics := []string{topicA, topicB} 260 + 261 + err = sub.SubscribeToTopics(topics) 262 + require.NoError(t, err) 263 + 264 + ctx, cancel := context.WithCancel(context.Background()) 265 + t.Cleanup(func() { 266 + cancel() 267 + }) 268 + 269 + consumer := sub.Consume(ctx) 270 + require.NoError(t, err) 271 + 272 + return consumer, cancel 273 + }
+27 -10
server/server.go
··· 23 23 Subscribe Action = 1 24 24 Unsubscribe Action = 2 25 25 Publish Action = 3 26 + Ack Action = 4 27 + Nack Action = 5 26 28 ) 27 29 28 30 // Status represents the status of a request ··· 54 56 55 57 mu sync.Mutex 56 58 topics map[string]*topic 59 + 60 + ackDelay time.Duration 61 + ackTimeout time.Duration 57 62 } 58 63 59 64 // New creates and starts a new server 60 - func New(Addr string) (*Server, error) { 65 + func New(Addr string, ackDelay, ackTimeout time.Duration) (*Server, error) { 61 66 lis, err := net.Listen("tcp", Addr) 62 67 if err != nil { 63 68 return nil, fmt.Errorf("failed to listen: %w", err) 64 69 } 65 70 66 71 srv := &Server{ 67 - lis: lis, 68 - topics: map[string]*topic{}, 72 + lis: lis, 73 + topics: map[string]*topic{}, 74 + ackDelay: ackDelay, 75 + ackTimeout: ackTimeout, 69 76 } 70 77 71 78 go srv.start() ··· 337 344 t = newTopic(topicName) 338 345 } 339 346 340 - t.subscriptions[peer.Addr()] = subscriber{ 341 - peer: peer, 342 - currentOffset: 0, 343 - } 347 + t.subscriptions[peer.Addr()] = newSubscriber(peer, topicName, s.ackDelay, s.ackTimeout) 344 348 345 349 s.topics[topicName] = t 346 350 } ··· 360 364 if !ok { 361 365 return 362 366 } 363 - 367 + sub, ok := t.subscriptions[peer.Addr()] 368 + if !ok { 369 + return 370 + } 371 + sub.unsubscribe() 364 372 delete(t.subscriptions, peer.Addr()) 365 373 } 366 374 ··· 369 377 defer s.mu.Unlock() 370 378 371 379 for _, topic := range s.topics { 380 + sub, ok := topic.subscriptions[peer.Addr()] 381 + if !ok { 382 + continue 383 + } 384 + sub.unsubscribe() 372 385 delete(topic.subscriptions, peer.Addr()) 373 386 } 374 387 } ··· 388 401 var action Action 389 402 op := func(conn net.Conn) error { 390 403 if timeout > 0 { 391 - err := conn.SetReadDeadline(time.Now().Add(timeout)) 392 - if err != nil { 404 + if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil { 393 405 slog.Error("failed to set connection read deadline", "error", err, "peer", peer.Addr()) 394 406 } 407 + defer func() { 408 + if err := conn.SetReadDeadline(time.Time{}); err != nil { 409 + slog.Error("failed to reset connection read deadline", "error", err, "peer", peer.Addr()) 410 + } 411 + }() 395 412 } 396 413 397 414 err := binary.Read(conn, binary.BigEndian, &action)
+217 -2
server/server_test.go
··· 18 18 topicC = "topic c" 19 19 20 20 serverAddr = ":6666" 21 + 22 + ackDelay = time.Millisecond * 100 23 + ackTimeout = time.Millisecond * 100 21 24 ) 22 25 23 26 func createServer(t *testing.T) *Server { 24 - srv, err := New(serverAddr) 27 + srv, err := New(serverAddr, ackDelay, ackTimeout) 25 28 require.NoError(t, err) 26 29 27 30 t.Cleanup(func() { ··· 35 38 srv := createServer(t) 36 39 srv.topics[topicName] = &topic{ 37 40 name: topicName, 38 - subscriptions: make(map[net.Addr]subscriber), 41 + subscriptions: make(map[net.Addr]*subscriber), 39 42 } 40 43 41 44 return srv ··· 268 271 buf := make([]byte, dataLen) 269 272 n, err := conn.Read(buf) 270 273 require.NoError(t, err) 274 + 271 275 require.Equal(t, int(dataLen), n) 272 276 273 277 assert.Equal(t, messageData, string(buf)) 278 + 279 + err = binary.Write(conn, binary.BigEndian, Ack) 280 + require.NoError(t, err) 274 281 } 275 282 } 276 283 ··· 314 321 require.Equal(t, int(dataLen), n) 315 322 316 323 results = append(results, string(buf)) 324 + 325 + err = binary.Write(subscriberConn, binary.BigEndian, Ack) 326 + require.NoError(t, err) 317 327 } 318 328 319 329 assert.ElementsMatch(t, results, messages) ··· 346 356 t.Fatal(fmt.Errorf("timed out waiting for subscriber to read messages")) 347 357 } 348 358 } 359 + 360 + func TestSendsDataToTopicSubscriberNacksThenAcks(t *testing.T) { 361 + _ = createServer(t) 362 + 363 + subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}) 364 + 365 + publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 366 + require.NoError(t, err) 367 + 368 + err = binary.Write(publisherConn, binary.BigEndian, Publish) 369 + require.NoError(t, err) 370 + 371 + topic := fmt.Sprintf("topic:%s", topicA) 372 + messageData := "hello world" 373 + 374 + // send topic first 375 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 376 + require.NoError(t, err) 377 + _, err = publisherConn.Write([]byte(topic)) 378 + require.NoError(t, err) 379 + 380 + // now send the data 381 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(messageData))) 382 + require.NoError(t, err) 383 + n, err := publisherConn.Write([]byte(messageData)) 384 + require.NoError(t, err) 385 + require.Equal(t, len(messageData), n) 386 + 387 + // check the subsribers got the data 388 + readMessage := func(conn net.Conn, ack Action) { 389 + var topicLen uint64 390 + err = binary.Read(conn, binary.BigEndian, &topicLen) 391 + require.NoError(t, err) 392 + 393 + topicBuf := make([]byte, topicLen) 394 + _, err = conn.Read(topicBuf) 395 + require.NoError(t, err) 396 + assert.Equal(t, topicA, string(topicBuf)) 397 + 398 + var dataLen uint64 399 + err = binary.Read(conn, binary.BigEndian, &dataLen) 400 + require.NoError(t, err) 401 + 402 + buf := make([]byte, dataLen) 403 + n, err := conn.Read(buf) 404 + require.NoError(t, err) 405 + 406 + require.Equal(t, int(dataLen), n) 407 + 408 + assert.Equal(t, messageData, string(buf)) 409 + 410 + err = binary.Write(conn, binary.BigEndian, ack) 411 + require.NoError(t, err) 412 + } 413 + 414 + // NACK the message and then ack it 415 + readMessage(subscriberConn, Nack) 416 + readMessage(subscriberConn, Ack) 417 + // reading for another message should now timeout but give enough time for the ack delay to kick in 418 + // should the second read of the message not have been ack'd properly 419 + var topicLen uint64 420 + _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100)) 421 + err = binary.Read(subscriberConn, binary.BigEndian, &topicLen) 422 + require.Error(t, err) 423 + } 424 + 425 + func TestSendsDataToTopicSubscriberDoesntAckMessage(t *testing.T) { 426 + _ = createServer(t) 427 + 428 + subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}) 429 + 430 + publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 431 + require.NoError(t, err) 432 + 433 + err = binary.Write(publisherConn, binary.BigEndian, Publish) 434 + require.NoError(t, err) 435 + 436 + topic := fmt.Sprintf("topic:%s", topicA) 437 + messageData := "hello world" 438 + 439 + // send topic first 440 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 441 + require.NoError(t, err) 442 + _, err = publisherConn.Write([]byte(topic)) 443 + require.NoError(t, err) 444 + 445 + // now send the data 446 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(messageData))) 447 + require.NoError(t, err) 448 + n, err := publisherConn.Write([]byte(messageData)) 449 + require.NoError(t, err) 450 + require.Equal(t, len(messageData), n) 451 + 452 + // check the subsribers got the data 453 + readMessage := func(conn net.Conn, ack bool) { 454 + var topicLen uint64 455 + err = binary.Read(conn, binary.BigEndian, &topicLen) 456 + require.NoError(t, err) 457 + 458 + topicBuf := make([]byte, topicLen) 459 + _, err = conn.Read(topicBuf) 460 + require.NoError(t, err) 461 + assert.Equal(t, topicA, string(topicBuf)) 462 + 463 + var dataLen uint64 464 + err = binary.Read(conn, binary.BigEndian, &dataLen) 465 + require.NoError(t, err) 466 + 467 + buf := make([]byte, dataLen) 468 + n, err := conn.Read(buf) 469 + require.NoError(t, err) 470 + 471 + require.Equal(t, int(dataLen), n) 472 + 473 + assert.Equal(t, messageData, string(buf)) 474 + 475 + if ack { 476 + err = binary.Write(conn, binary.BigEndian, Ack) 477 + require.NoError(t, err) 478 + return 479 + } 480 + } 481 + 482 + // don't send ack or nack and then ack on the second attempt 483 + readMessage(subscriberConn, false) 484 + readMessage(subscriberConn, true) 485 + 486 + // reading for another message should now timeout but give enough time for the ack delay to kick in 487 + // should the second read of the message not have been ack'd properly 488 + var topicLen uint64 489 + _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100)) 490 + err = binary.Read(subscriberConn, binary.BigEndian, &topicLen) 491 + require.Error(t, err) 492 + } 493 + 494 + func TestSendsDataToTopicSubscriberDeliveryCountTooHighWithNoAck(t *testing.T) { 495 + _ = createServer(t) 496 + 497 + subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}) 498 + 499 + publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 500 + require.NoError(t, err) 501 + 502 + err = binary.Write(publisherConn, binary.BigEndian, Publish) 503 + require.NoError(t, err) 504 + 505 + topic := fmt.Sprintf("topic:%s", topicA) 506 + messageData := "hello world" 507 + 508 + // send topic first 509 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 510 + require.NoError(t, err) 511 + _, err = publisherConn.Write([]byte(topic)) 512 + require.NoError(t, err) 513 + 514 + // now send the data 515 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(messageData))) 516 + require.NoError(t, err) 517 + n, err := publisherConn.Write([]byte(messageData)) 518 + require.NoError(t, err) 519 + require.Equal(t, len(messageData), n) 520 + 521 + // check the subsribers got the data 522 + readMessage := func(conn net.Conn, ack bool) { 523 + var topicLen uint64 524 + err = binary.Read(conn, binary.BigEndian, &topicLen) 525 + require.NoError(t, err) 526 + 527 + topicBuf := make([]byte, topicLen) 528 + _, err = conn.Read(topicBuf) 529 + require.NoError(t, err) 530 + assert.Equal(t, topicA, string(topicBuf)) 531 + 532 + var dataLen uint64 533 + err = binary.Read(conn, binary.BigEndian, &dataLen) 534 + require.NoError(t, err) 535 + 536 + buf := make([]byte, dataLen) 537 + n, err := conn.Read(buf) 538 + require.NoError(t, err) 539 + 540 + require.Equal(t, int(dataLen), n) 541 + 542 + assert.Equal(t, messageData, string(buf)) 543 + 544 + if ack { 545 + err = binary.Write(conn, binary.BigEndian, Ack) 546 + require.NoError(t, err) 547 + return 548 + } 549 + } 550 + 551 + // nack the message 5 times 552 + readMessage(subscriberConn, false) 553 + readMessage(subscriberConn, false) 554 + readMessage(subscriberConn, false) 555 + readMessage(subscriberConn, false) 556 + readMessage(subscriberConn, false) 557 + 558 + // reading for the message should now timeout as we have nack'd the message too many times 559 + var topicLen uint64 560 + _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100)) 561 + err = binary.Read(subscriberConn, binary.BigEndian, &topicLen) 562 + require.Error(t, err) 563 + }
+140
server/subscriber.go
··· 1 + package server 2 + 3 + import ( 4 + "encoding/binary" 5 + "fmt" 6 + "log/slog" 7 + "net" 8 + "time" 9 + 10 + "github.com/willdot/messagebroker/server/peer" 11 + ) 12 + 13 + type subscriber struct { 14 + peer *peer.Peer 15 + topic string 16 + messages chan message 17 + unsubscribeCh chan struct{} 18 + 19 + ackDelay time.Duration 20 + ackTimeout time.Duration 21 + } 22 + 23 + type message struct { 24 + data []byte 25 + deliveryCount int 26 + } 27 + 28 + func newMessage(data []byte) message { 29 + return message{data: data, deliveryCount: 1} 30 + } 31 + 32 + func newSubscriber(peer *peer.Peer, topic string, ackDelay, ackTimeout time.Duration) *subscriber { 33 + s := &subscriber{ 34 + peer: peer, 35 + topic: topic, 36 + messages: make(chan message), 37 + ackDelay: ackDelay, 38 + ackTimeout: ackTimeout, 39 + unsubscribeCh: make(chan struct{}), 40 + } 41 + 42 + go s.sendMessages() 43 + 44 + return s 45 + } 46 + 47 + func (s *subscriber) sendMessages() { 48 + for { 49 + select { 50 + case <-s.unsubscribeCh: 51 + return 52 + case msg := <-s.messages: 53 + ack, err := s.sendMessage(s.topic, msg) 54 + if err != nil { 55 + slog.Error("failed to send to message", "error", err, "peer", s.peer.Addr()) 56 + } 57 + 58 + if ack { 59 + continue 60 + } 61 + 62 + if msg.deliveryCount >= 5 { 63 + slog.Error("max delivery count for message. Dropping", "peer", s.peer.Addr()) 64 + continue 65 + } 66 + 67 + msg.deliveryCount++ 68 + s.addMessage(msg, s.ackDelay) 69 + } 70 + } 71 + } 72 + 73 + func (s *subscriber) addMessage(msg message, delay time.Duration) { 74 + go func() { 75 + timer := time.NewTimer(delay) 76 + defer timer.Stop() 77 + 78 + select { 79 + case <-s.unsubscribeCh: 80 + return 81 + case <-timer.C: 82 + s.messages <- msg 83 + } 84 + }() 85 + } 86 + 87 + func (s *subscriber) sendMessage(topic string, msg message) (bool, error) { 88 + var ack bool 89 + op := func(conn net.Conn) error { 90 + topicLen := uint64(len(topic)) 91 + err := binary.Write(conn, binary.BigEndian, topicLen) 92 + if err != nil { 93 + return fmt.Errorf("failed to send topic length: %w", err) 94 + } 95 + _, err = conn.Write([]byte(topic)) 96 + if err != nil { 97 + return fmt.Errorf("failed to send topic: %w", err) 98 + } 99 + 100 + dataLen := uint64(len(msg.data)) 101 + 102 + err = binary.Write(conn, binary.BigEndian, dataLen) 103 + if err != nil { 104 + return fmt.Errorf("failed to send data length: %w", err) 105 + } 106 + 107 + _, err = conn.Write(msg.data) 108 + if err != nil { 109 + return fmt.Errorf("failed to write to peer: %w", err) 110 + } 111 + 112 + var ackRes Action 113 + if err := conn.SetReadDeadline(time.Now().Add(s.ackTimeout)); err != nil { 114 + slog.Error("failed to set connection read deadline", "error", err, "peer", s.peer.Addr()) 115 + } 116 + defer func() { 117 + if err := conn.SetReadDeadline(time.Time{}); err != nil { 118 + slog.Error("failed to reset connection read deadline", "error", err, "peer", s.peer.Addr()) 119 + } 120 + }() 121 + err = binary.Read(conn, binary.BigEndian, &ackRes) 122 + if err != nil { 123 + return fmt.Errorf("failed to read ack from peer: %w", err) 124 + } 125 + 126 + if ackRes == Ack { 127 + ack = true 128 + } 129 + 130 + return nil 131 + } 132 + 133 + err := s.peer.RunConnOperation(op) 134 + 135 + return ack, err 136 + } 137 + 138 + func (s *subscriber) unsubscribe() { 139 + close(s.unsubscribeCh) 140 + }
+3 -57
server/topic.go
··· 1 1 package server 2 2 3 3 import ( 4 - "encoding/binary" 5 - "fmt" 6 - "log/slog" 7 4 "net" 8 5 "sync" 9 - 10 - "github.com/willdot/messagebroker/server/peer" 11 6 ) 12 7 13 8 type topic struct { 14 9 name string 15 - subscriptions map[net.Addr]subscriber 10 + subscriptions map[net.Addr]*subscriber 16 11 mu sync.Mutex 17 12 } 18 13 19 - type subscriber struct { 20 - peer *peer.Peer 21 - currentOffset int 22 - } 23 - 24 14 func newTopic(name string) *topic { 25 15 return &topic{ 26 16 name: name, 27 - subscriptions: make(map[net.Addr]subscriber), 17 + subscriptions: make(map[net.Addr]*subscriber), 28 18 } 29 19 } 30 20 ··· 33 23 subscribers := t.subscriptions 34 24 t.mu.Unlock() 35 25 36 - var wg sync.WaitGroup 37 - 38 26 for _, subscriber := range subscribers { 39 - wg.Add(1) 40 - sub := subscriber 41 - go func() { 42 - defer wg.Done() 43 - sendMessage(sub, t.name, msgData) 44 - }() 45 - } 46 - 47 - wg.Wait() 48 - } 49 - 50 - func sendMessage(sub subscriber, topicName string, message []byte) { 51 - err := sub.peer.RunConnOperation(sendMessageOp(topicName, message)) 52 - if err != nil { 53 - slog.Error("failed to send to message", "error", err, "peer", sub.peer.Addr()) 54 - return 55 - } 56 - } 57 - 58 - func sendMessageOp(topic string, data []byte) peer.ConnOpp { 59 - return func(conn net.Conn) error { 60 - topicLen := uint64(len(topic)) 61 - err := binary.Write(conn, binary.BigEndian, topicLen) 62 - if err != nil { 63 - return fmt.Errorf("failed to send topic length: %w", err) 64 - } 65 - _, err = conn.Write([]byte(topic)) 66 - if err != nil { 67 - return fmt.Errorf("failed to send topic: %w", err) 68 - } 69 - 70 - dataLen := uint64(len(data)) 71 - 72 - err = binary.Write(conn, binary.BigEndian, dataLen) 73 - if err != nil { 74 - return fmt.Errorf("failed to send data length: %w", err) 75 - } 76 - 77 - _, err = conn.Write(data) 78 - if err != nil { 79 - return fmt.Errorf("failed to write to peer: %w", err) 80 - } 81 - return nil 27 + subscriber.addMessage(newMessage(msgData), 0) 82 28 } 83 29 }