tangled
alpha
login
or
join now
willdot.net
/
message-broker
2
fork
atom
An experimental pub/sub client and server project.
2
fork
atom
overview
issues
pulls
pipelines
some tweaks and typos
willdot.net
2 years ago
2d9f92db
215cfc23
+20
-47
4 changed files
expand all
collapse all
unified
split
server
message_store.go
server.go
subscriber.go
topic.go
+8
-11
server/message_store.go
···
1
1
package server
2
2
3
3
import (
4
4
-
"fmt"
5
4
"sync"
6
5
)
7
6
8
7
// Memory store allows messages to be stored in memory
9
8
type MemoryStore struct {
10
10
-
mu sync.Mutex
11
11
-
msgs map[int]message
12
12
-
offset int
9
9
+
mu sync.Mutex
10
10
+
msgs map[int]message
11
11
+
nextOffset int
13
12
}
14
13
15
14
// New memory store initializes a new in memory store
···
24
23
m.mu.Lock()
25
24
defer m.mu.Unlock()
26
25
27
27
-
m.msgs[m.offset] = msg
26
26
+
m.msgs[m.nextOffset] = msg
28
27
29
29
-
m.offset++
28
28
+
m.nextOffset++
30
29
31
30
return nil
32
31
}
33
32
34
33
// ReadFrom will read messages from (and including) the provided offset and pass them to the provided handler
35
35
-
func (m *MemoryStore) ReadFrom(offset int, handleFunc func(msg message)) error {
36
36
-
if offset < 0 || offset > m.offset {
37
37
-
return fmt.Errorf("invalid offset provided")
34
34
+
func (m *MemoryStore) ReadFrom(offset int, handleFunc func(msg message)) {
35
35
+
if offset < 0 || offset >= m.nextOffset {
36
36
+
return
38
37
}
39
38
40
39
m.mu.Lock()
···
43
42
for i := offset; i < len(m.msgs); i++ {
44
43
handleFunc(m.msgs[i])
45
44
}
46
46
-
47
47
-
return nil
48
45
}
+4
-22
server/server.go
···
27
27
Nack Action = 5
28
28
)
29
29
30
30
-
func (a Action) String() string {
31
31
-
switch a {
32
32
-
case Subscribe:
33
33
-
return "subscribe"
34
34
-
case Unsubscribe:
35
35
-
return "unsubscribe"
36
36
-
case Publish:
37
37
-
return "publish"
38
38
-
case Ack:
39
39
-
return "ack"
40
40
-
case Nack:
41
41
-
return "nack"
42
42
-
}
43
43
-
44
44
-
return ""
45
45
-
}
46
46
-
47
30
// Status represents the status of a request
48
31
type Status uint16
49
32
···
70
53
type StartAtType uint16
71
54
72
55
const (
73
73
-
Begining StartAtType = 0
74
74
-
Current StartAtType = 1
75
75
-
From StartAtType = 2
56
56
+
Beginning StartAtType = 0
57
57
+
Current StartAtType = 1
58
58
+
From StartAtType = 2
76
59
)
77
60
78
61
// Server accepts subscribe and publish connections and passes messages around
···
234
217
var startAt int
235
218
switch startAtType {
236
219
case From:
237
237
-
// read the from
238
220
var s uint16
239
221
err = binary.Read(conn, binary.BigEndian, &s)
240
222
if err != nil {
···
243
225
return nil
244
226
}
245
227
startAt = int(s)
246
246
-
case Begining:
228
228
+
case Beginning:
247
229
startAt = 0
248
230
case Current:
249
231
startAt = -1
+7
-13
server/subscriber.go
···
41
41
42
42
go s.sendMessages()
43
43
44
44
-
offset := startAt
45
45
-
46
44
go func() {
47
47
-
if startAt < 0 {
48
48
-
return
49
49
-
}
50
50
-
51
51
-
err := topic.messageStore.ReadFrom(offset, func(msg message) {
52
52
-
s.messages <- msg
45
45
+
topic.messageStore.ReadFrom(startAt, func(msg message) {
46
46
+
select {
47
47
+
case s.messages <- msg:
48
48
+
return
49
49
+
case <-s.unsubscribeCh:
50
50
+
return
51
51
+
}
53
52
})
54
54
-
if err != nil {
55
55
-
slog.Error("failed to replay messages from offset", "error", err, "offset", offset)
56
56
-
}
57
53
}()
58
54
59
55
return s
···
94
90
case <-s.unsubscribeCh:
95
91
return
96
92
case <-timer.C:
97
97
-
fmt.Printf("waiting to put message on queue: %s\n", msg.data)
98
93
s.messages <- msg
99
99
-
fmt.Printf("put message on queue: %s\n", msg.data)
100
94
}
101
95
}()
102
96
}
+1
-1
server/topic.go
···
8
8
9
9
type Store interface {
10
10
Write(msg message) error
11
11
-
ReadFrom(offset int, handleFunc func(msg message)) error
11
11
+
ReadFrom(offset int, handleFunc func(msg message))
12
12
}
13
13
14
14
type topic struct {