tangled
alpha
login
or
join now
willdot.net
/
message-broker
2
fork
atom
An experimental pub/sub client and server project.
2
fork
atom
overview
issues
pulls
pipelines
Refactor so that resp status are sent back to connections
willdot.net
2 years ago
731d8366
b8557aed
+133
-40
4 changed files
expand all
collapse all
unified
split
pubsub
subscriber.go
server
peer.go
server.go
server_test.go
+18
-6
pubsub/subscriber.go
···
56
56
if err != nil {
57
57
return fmt.Errorf("failed to subscribe to topics: %w", err)
58
58
}
59
59
-
buf := make([]byte, 512)
60
60
-
_, err = s.conn.Read(buf)
59
59
+
60
60
+
var resp server.Status
61
61
+
err = binary.Read(s.conn, binary.BigEndian, &resp)
61
62
if err != nil {
62
63
return fmt.Errorf("failed to read confirmation of subscription: %w", err)
63
64
}
64
65
65
65
-
// TODO: this is soooo hacky - need to have some sort of response code
66
66
-
if string(buf[:10]) != "subscribed" {
67
67
-
return fmt.Errorf("failed to subscribe: '%s'", string(buf))
66
66
+
if resp == server.Subscribed {
67
67
+
return nil
68
68
}
69
69
70
70
-
return nil
70
70
+
var dataLen uint32
71
71
+
err = binary.Read(s.conn, binary.BigEndian, &dataLen)
72
72
+
if err != nil {
73
73
+
return fmt.Errorf("received status %s:", resp)
74
74
+
}
75
75
+
76
76
+
buf := make([]byte, dataLen)
77
77
+
_, err = s.conn.Read(buf)
78
78
+
if err != nil {
79
79
+
return fmt.Errorf("received status %s:", resp)
80
80
+
}
81
81
+
82
82
+
return fmt.Errorf("received status %s - %s", resp, buf)
71
83
}
72
84
73
85
// Consumer allows the consumption of messages. It is thread safe to range over the Msgs channel to consume. If during the consumer
+48
server/peer.go
···
3
3
import (
4
4
"encoding/binary"
5
5
"fmt"
6
6
+
"log/slog"
6
7
"net"
7
8
)
8
9
···
49
50
50
51
return dataLen, nil
51
52
}
53
53
+
54
54
+
// Status represents the status of a request
55
55
+
type Status uint8
56
56
+
57
57
+
const (
58
58
+
Subscribed = 1
59
59
+
Unsubscribed = 2
60
60
+
Error = 3
61
61
+
)
62
62
+
63
63
+
func (s Status) String() string {
64
64
+
switch s {
65
65
+
case Subscribed:
66
66
+
return "subsribed"
67
67
+
case Unsubscribed:
68
68
+
return "unsubscribed"
69
69
+
case Error:
70
70
+
return "error"
71
71
+
}
72
72
+
73
73
+
return ""
74
74
+
}
75
75
+
76
76
+
func (p *peer) writeStatus(status Status, message string) {
77
77
+
err := binary.Write(p.conn, binary.BigEndian, status)
78
78
+
if err != nil {
79
79
+
slog.Error("failed to write status to peers connection", "error", err, "peer", p.addr())
80
80
+
return
81
81
+
}
82
82
+
83
83
+
if message == "" {
84
84
+
return
85
85
+
}
86
86
+
87
87
+
msgBytes := []byte(message)
88
88
+
err = binary.Write(p.conn, binary.BigEndian, uint32(len(msgBytes)))
89
89
+
if err != nil {
90
90
+
slog.Error("failed to write message length to peers connection", "error", err, "peer", p.addr())
91
91
+
return
92
92
+
}
93
93
+
94
94
+
_, err = p.conn.Write(msgBytes)
95
95
+
if err != nil {
96
96
+
slog.Error("failed to write message to peers connection", "error", err, "peer", p.addr())
97
97
+
return
98
98
+
}
99
99
+
}
+28
-15
server/server.go
···
84
84
s.handlePublish(peer)
85
85
default:
86
86
slog.Error("unknown action", "action", action, "peer", peer.addr())
87
87
-
_, _ = peer.Write([]byte("unknown action"))
87
87
+
peer.writeStatus(Error, "unknown action")
88
88
+
//_, _ = peer.Write([]byte("unknown action"))
88
89
}
89
90
}
90
91
···
122
123
dataLen, err := peer.readDataLength()
123
124
if err != nil {
124
125
slog.Error(err.Error(), "peer", peer.addr())
125
125
-
_, _ = peer.Write([]byte("invalid data length of topics provided"))
126
126
+
peer.writeStatus(Error, "invalid data length of topics provided")
127
127
+
// _, _ = peer.Write([]byte("invalid data length of topics provided"))
126
128
return
127
129
}
128
130
if dataLen == 0 {
129
129
-
_, _ = peer.Write([]byte("data length of topics is 0"))
131
131
+
peer.writeStatus(Error, "data length of topics is 0")
132
132
+
// _, _ = peer.Write([]byte("data length of topics is 0"))
130
133
return
131
134
}
132
135
···
134
137
_, err = peer.Read(buf)
135
138
if err != nil {
136
139
slog.Error("failed to read subscibers topic data", "error", err, "peer", peer.addr())
137
137
-
_, _ = peer.Write([]byte("failed to read topic data"))
140
140
+
peer.writeStatus(Error, "failed to read topic data")
141
141
+
//_, _ = peer.Write([]byte("failed to read topic data"))
138
142
return
139
143
}
140
144
···
142
146
err = json.Unmarshal(buf, &topics)
143
147
if err != nil {
144
148
slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.addr())
145
145
-
_, _ = peer.Write([]byte("invalid topic data provided"))
149
149
+
peer.writeStatus(Error, "invalid topic data provided")
150
150
+
//_, _ = peer.Write([]byte("invalid topic data provided"))
146
151
return
147
152
}
148
153
149
154
s.subscribeToTopics(peer, topics)
150
150
-
_, _ = peer.Write([]byte("subscribed"))
155
155
+
//_, _ = peer.Write([]byte("subscribed"))
156
156
+
peer.writeStatus(Subscribed, "")
151
157
}
152
158
153
159
func (s *Server) handleUnsubscribe(peer peer) {
···
155
161
dataLen, err := peer.readDataLength()
156
162
if err != nil {
157
163
slog.Error(err.Error(), "peer", peer.addr())
158
158
-
_, _ = peer.Write([]byte("invalid data length of topics provided"))
164
164
+
peer.writeStatus(Error, "invalid data length of topics provided")
165
165
+
//_, _ = peer.Write([]byte("invalid data length of topics provided"))
159
166
return
160
167
}
161
168
if dataLen == 0 {
162
162
-
_, _ = peer.Write([]byte("data length of topics is 0"))
169
169
+
peer.writeStatus(Error, "data length of topics is 0")
170
170
+
//_, _ = peer.Write([]byte("data length of topics is 0"))
163
171
return
164
172
}
165
173
···
167
175
_, err = peer.Read(buf)
168
176
if err != nil {
169
177
slog.Error("failed to read subscibers topic data", "error", err, "peer", peer.addr())
170
170
-
_, _ = peer.Write([]byte("failed to read topic data"))
178
178
+
peer.writeStatus(Error, "failed to read topic data")
179
179
+
//_, _ = peer.Write([]byte("failed to read topic data"))
171
180
return
172
181
}
173
182
···
175
184
err = json.Unmarshal(buf, &topics)
176
185
if err != nil {
177
186
slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.addr())
178
178
-
_, _ = peer.Write([]byte("invalid topic data provided"))
187
187
+
peer.writeStatus(Error, "invalid topic data provided")
188
188
+
//_, _ = peer.Write([]byte("invalid topic data provided"))
179
189
return
180
190
}
181
191
182
192
s.unsubscribeToTopics(peer, topics)
183
183
-
184
184
-
_, _ = peer.Write([]byte("unsubscribed"))
193
193
+
peer.writeStatus(Unsubscribed, "")
194
194
+
//_, _ = peer.Write([]byte("unsubscribed"))
185
195
}
186
196
187
197
func (s *Server) handlePublish(peer peer) {
···
189
199
dataLen, err := peer.readDataLength()
190
200
if err != nil {
191
201
slog.Error(err.Error(), "peer", peer.addr())
192
192
-
_, _ = peer.Write([]byte("invalid data length of data provided"))
202
202
+
peer.writeStatus(Error, "invalid data length of data provided")
203
203
+
//_, _ = peer.Write([]byte("invalid data length of data provided"))
193
204
return
194
205
}
195
206
if dataLen == 0 {
···
199
210
buf := make([]byte, dataLen)
200
211
_, err = peer.Read(buf)
201
212
if err != nil {
202
202
-
_, _ = peer.Write([]byte("failed to read data"))
203
213
slog.Error("failed to read data from peer", "error", err, "peer", peer.addr())
214
214
+
peer.writeStatus(Error, "failed to read data")
215
215
+
//_, _ = peer.Write([]byte("failed to read data"))
204
216
return
205
217
}
206
218
207
219
var msg messagebroker.Message
208
220
err = json.Unmarshal(buf, &msg)
209
221
if err != nil {
210
210
-
_, _ = peer.Write([]byte("invalid message"))
222
222
+
peer.writeStatus(Error, "invalid message")
223
223
+
//_, _ = peer.Write([]byte("invalid message"))
211
224
slog.Error("failed to unmarshal data to message", "error", err, "peer", peer.addr())
212
225
continue
213
226
}
+39
-19
server/server_test.go
···
51
51
_, err = conn.Write(rawTopics)
52
52
require.NoError(t, err)
53
53
54
54
-
expectedRes := "subscribed"
54
54
+
expectedRes := Subscribed
55
55
56
56
-
buf := make([]byte, len(expectedRes))
57
57
-
n, err := conn.Read(buf)
58
58
-
require.NoError(t, err)
59
59
-
require.Equal(t, len(expectedRes), n)
56
56
+
var resp Status
57
57
+
err = binary.Read(conn, binary.BigEndian, &resp)
60
58
61
61
-
assert.Equal(t, expectedRes, string(buf))
59
59
+
assert.Equal(t, expectedRes, int(resp))
62
60
63
61
return conn
64
62
}
···
98
96
_, err = conn.Write(rawTopics)
99
97
require.NoError(t, err)
100
98
101
101
-
expectedRes := "unsubscribed"
99
99
+
expectedRes := Unsubscribed
102
100
103
103
-
buf := make([]byte, len(expectedRes))
104
104
-
n, err := conn.Read(buf)
105
105
-
require.NoError(t, err)
106
106
-
require.Equal(t, len(expectedRes), n)
101
101
+
var resp Status
102
102
+
err = binary.Read(conn, binary.BigEndian, &resp)
107
103
108
108
-
assert.Equal(t, expectedRes, string(buf))
104
104
+
assert.Equal(t, expectedRes, int(resp))
109
105
110
106
assert.Len(t, srv.topics, 3)
111
107
assert.Len(t, srv.topics["topic a"].subscriptions, 0)
···
154
150
err = binary.Write(conn, binary.BigEndian, uint8(99))
155
151
require.NoError(t, err)
156
152
157
157
-
expectedRes := "unknown action"
153
153
+
expectedRes := Error
154
154
+
155
155
+
var resp Status
156
156
+
err = binary.Read(conn, binary.BigEndian, &resp)
157
157
+
158
158
+
assert.Equal(t, expectedRes, int(resp))
158
159
159
159
-
buf := make([]byte, len(expectedRes))
160
160
-
n, err := conn.Read(buf)
160
160
+
expectedMessage := "unknown action"
161
161
+
162
162
+
var dataLen uint32
163
163
+
err = binary.Read(conn, binary.BigEndian, &dataLen)
164
164
+
assert.Equal(t, len(expectedMessage), int(dataLen))
165
165
+
166
166
+
buf := make([]byte, dataLen)
167
167
+
_, err = conn.Read(buf)
161
168
require.NoError(t, err)
162
162
-
require.Equal(t, len(expectedRes), n)
163
169
164
164
-
assert.Equal(t, expectedRes, string(buf))
170
170
+
assert.Equal(t, expectedMessage, string(buf))
165
171
}
166
172
167
173
func TestInvalidMessagePublished(t *testing.T) {
···
183
189
require.NoError(t, err)
184
190
require.Equal(t, len(data), n)
185
191
186
186
-
buf := make([]byte, 15)
192
192
+
expectedRes := Error
193
193
+
194
194
+
var resp Status
195
195
+
err = binary.Read(publisherConn, binary.BigEndian, &resp)
196
196
+
197
197
+
assert.Equal(t, expectedRes, int(resp))
198
198
+
199
199
+
expectedMessage := "invalid message"
200
200
+
201
201
+
var dataLen uint32
202
202
+
err = binary.Read(publisherConn, binary.BigEndian, &dataLen)
203
203
+
assert.Equal(t, len(expectedMessage), int(dataLen))
204
204
+
205
205
+
buf := make([]byte, dataLen)
187
206
_, err = publisherConn.Read(buf)
188
207
require.NoError(t, err)
189
189
-
assert.Equal(t, "invalid message", string(buf))
208
208
+
209
209
+
assert.Equal(t, expectedMessage, string(buf))
190
210
}
191
211
192
212
func TestSendsDataToTopicSubscribers(t *testing.T) {