An experimental pub/sub client and server project.

comments

+13 -7
+7 -1
server/peer/peer.go
··· 5 5 "sync" 6 6 ) 7 7 8 + // Peer represents a remote connection to the server such as a publisher or subscriber 8 9 type Peer struct { 9 10 conn net.Conn 10 11 connMu sync.Mutex 11 12 } 12 13 14 + // New returns a new peer 13 15 func New(conn net.Conn) *Peer { 14 16 return &Peer{ 15 17 conn: conn, 16 18 } 17 19 } 18 20 21 + // Addr returns the peers connections address 19 22 func (p *Peer) Addr() net.Addr { 20 23 return p.conn.RemoteAddr() 21 24 } 22 25 26 + // ConnOpp represents a set of actions on a connection that can be used synchrnously 23 27 type ConnOpp func(conn net.Conn) error 24 28 25 - func (p *Peer) ConnOperation(op ConnOpp) error { 29 + // RunConnOperation will run the provided operation. It ensures that it is the only operation that is being 30 + // run on the connection to ensure any other operations don't get mixed up. 31 + func (p *Peer) RunConnOperation(op ConnOpp) error { 26 32 p.connMu.Lock() 27 33 defer p.connMu.Unlock() 28 34
+5 -5
server/server.go
··· 185 185 return nil 186 186 } 187 187 188 - _ = peer.ConnOperation(op) 188 + _ = peer.RunConnOperation(op) 189 189 } 190 190 191 191 func (s *Server) handleUnsubscribe(peer *peer.Peer) { ··· 224 224 return nil 225 225 } 226 226 227 - _ = peer.ConnOperation(op) 227 + _ = peer.RunConnOperation(op) 228 228 } 229 229 230 230 type messageToSend struct { ··· 287 287 return nil 288 288 } 289 289 290 - _ = peer.ConnOperation(op) 290 + _ = peer.RunConnOperation(op) 291 291 292 292 if message == nil { 293 293 continue ··· 377 377 return nil 378 378 } 379 379 380 - err := peer.ConnOperation(op) 380 + err := peer.RunConnOperation(op) 381 381 if err != nil { 382 382 return 0, fmt.Errorf("failed to read action from peer: %w", err) 383 383 } ··· 391 391 return nil 392 392 } 393 393 394 - _ = peer.ConnOperation(op) 394 + _ = peer.RunConnOperation(op) 395 395 } 396 396 397 397 func dataLength(conn net.Conn) (uint32, error) {
+1 -1
server/topic.go
··· 42 42 t.mu.Unlock() 43 43 44 44 for addr, subscriber := range subscribers { 45 - err := subscriber.peer.ConnOperation(sendMessageOp(t.name, msgData)) 45 + err := subscriber.peer.RunConnOperation(sendMessageOp(t.name, msgData)) 46 46 if err != nil { 47 47 slog.Error("failed to send to message", "error", err, "peer", addr) 48 48 return