···18181919### Sending data via a connection
20202121-When sending a message representing an action (subscribe, publish etc) then a uint8 binary message is sent.
2121+When sending a message representing an action (subscribe, publish etc) then a uint16 binary message is sent.
22222323When sending any other data, the length of the data is to be sent first using a binary uint32 and then the actual data sent afterwards.
+1
example/main.go
···41414242 for msg := range consumer.Messages() {
4343 slog.Info("received message", "message", string(msg.Data))
4444+ msg.Ack(true)
4445 }
45464647}
+8-13
pubsub/publisher.go
···4343 op := func(conn net.Conn) error {
4444 // send topic first
4545 topic := fmt.Sprintf("topic:%s", message.Topic)
4646- err := binary.Write(p.conn, binary.BigEndian, uint32(len(topic)))
4747- if err != nil {
4848- return fmt.Errorf("failed to write topic size to server")
4949- }
4646+4747+ topicLenB := make([]byte, 4)
4848+ binary.BigEndian.PutUint32(topicLenB, uint32(len(topic)))
50495151- _, err = p.conn.Write([]byte(topic))
5252- if err != nil {
5353- return fmt.Errorf("failed to write topic to server")
5454- }
5050+ headers := append(topicLenB, []byte(topic)...)
55515656- err = binary.Write(p.conn, binary.BigEndian, uint32(len(message.Data)))
5757- if err != nil {
5858- return fmt.Errorf("failed to write message size to server")
5959- }
5252+ messageLenB := make([]byte, 4)
5353+ binary.BigEndian.PutUint32(messageLenB, uint32(len(message.Data)))
5454+ headers = append(headers, messageLenB...)
60556161- _, err = p.conn.Write(message.Data)
5656+ _, err := conn.Write(append(headers, message.Data...))
6257 if err != nil {
6358 return fmt.Errorf("failed to publish data to server")
6459 }
+14-20
pubsub/subscriber.go
···4141// SubscribeToTopics will subscribe to the provided topics
4242func (s *Subscriber) SubscribeToTopics(topicNames []string) error {
4343 op := func(conn net.Conn) error {
4444- err := binary.Write(conn, binary.BigEndian, server.Subscribe)
4545- if err != nil {
4646- return fmt.Errorf("failed to subscribe: %w", err)
4747- }
4444+ actionB := make([]byte, 2)
4545+ binary.BigEndian.PutUint16(actionB, server.Subscribed)
4646+ headers := actionB
48474948 b, err := json.Marshal(topicNames)
5049 if err != nil {
5150 return fmt.Errorf("failed to marshal topic names: %w", err)
5251 }
53525454- err = binary.Write(conn, binary.BigEndian, uint32(len(b)))
5555- if err != nil {
5656- return fmt.Errorf("failed to write topic data length: %w", err)
5757- }
5353+ topicNamesB := make([]byte, 4)
5454+ binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
5555+ headers = append(headers, topicNamesB...)
58565959- _, err = conn.Write(b)
5757+ _, err = conn.Write(append(headers, b...))
6058 if err != nil {
6159 return fmt.Errorf("failed to subscribe to topics: %w", err)
6260 }
···9290// UnsubscribeToTopics will unsubscribe to the provided topics
9391func (s *Subscriber) UnsubscribeToTopics(topicNames []string) error {
9492 op := func(conn net.Conn) error {
9595- err := binary.Write(conn, binary.BigEndian, server.Unsubscribe)
9696- if err != nil {
9797- return fmt.Errorf("failed to unsubscribe: %w", err)
9898- }
9393+ actionB := make([]byte, 2)
9494+ binary.BigEndian.PutUint16(actionB, uint16(server.Unsubscribe))
9595+ headers := actionB
999610097 b, err := json.Marshal(topicNames)
10198 if err != nil {
10299 return fmt.Errorf("failed to marshal topic names: %w", err)
103100 }
104101105105- err = binary.Write(conn, binary.BigEndian, uint32(len(b)))
106106- if err != nil {
107107- return fmt.Errorf("failed to write topic data length: %w", err)
108108- }
102102+ topicNamesB := make([]byte, 4)
103103+ binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
104104+ headers = append(headers, topicNamesB...)
109105110110- _, err = conn.Write(b)
106106+ _, err = conn.Write(append(headers, b...))
111107 if err != nil {
112108 return fmt.Errorf("failed to unsubscribe to topics: %w", err)
113109 }
···230226 return ctx.Err()
231227 case ack = <-msg.ack:
232228 }
233233- //ack := <-msg.ack
234234-235229 ackMessage := server.Nack
236230 if ack {
237231 ackMessage = server.Ack
+14-19
server/server.go
···1717)
18181919// Action represents the type of action that a peer requests to do
2020-type Action uint8
2020+type Action uint16
21212222const (
2323 Subscribe Action = 1
···2828)
29293030// Status represents the status of a request
3131-type Status uint8
3131+type Status uint16
32323333const (
3434 Subscribed = 1
···445445}
446446447447func writeStatus(status Status, message string, conn net.Conn) {
448448- err := binary.Write(conn, binary.BigEndian, status)
449449- if err != nil {
450450- if !errors.Is(err, syscall.EPIPE) {
451451- slog.Error("failed to write status to peers connection", "error", err, "peer", conn.RemoteAddr())
452452- }
453453- return
454454- }
448448+ statusB := make([]byte, 2)
449449+ binary.BigEndian.PutUint16(statusB, uint16(status))
450450+451451+ headers := statusB
455452456456- if message == "" {
457457- return
453453+ if len(message) > 0 {
454454+ sizeB := make([]byte, 4)
455455+ binary.BigEndian.PutUint32(sizeB, uint32(len(message)))
456456+ headers = append(headers, sizeB...)
458457 }
459458460459 msgBytes := []byte(message)
461461- err = binary.Write(conn, binary.BigEndian, uint32(len(msgBytes)))
462462- if err != nil {
463463- slog.Error("failed to write message length to peers connection", "error", err, "peer", conn.RemoteAddr())
464464- return
465465- }
466466-467467- _, err = conn.Write(msgBytes)
460460+ _, err := conn.Write(append(headers, msgBytes...))
468461 if err != nil {
469469- slog.Error("failed to write message to peers connection", "error", err, "peer", conn.RemoteAddr())
462462+ if !errors.Is(err, syscall.EPIPE) {
463463+ slog.Error("failed to write status to peers connection", "error", err, "peer", conn.RemoteAddr())
464464+ }
470465 return
471466 }
472467}