From 363d524a01d681308c0f848ab50fb234c367949e Mon Sep 17 00:00:00 2001 From: myzhan Date: Tue, 29 Aug 2023 15:47:38 +0800 Subject: [PATCH] fix: remove flaky tests that need to listen on addr --- .github/workflows/unittest.yml | 2 +- boomer.go | 2 + boomer_test.go | 49 ++++--- client_gomq.go | 85 ++++++++++++- client_gomq_test.go | 225 ++++++--------------------------- go.mod | 10 +- go.sum | 58 +++------ runner_test.go | 32 ++--- 8 files changed, 182 insertions(+), 281 deletions(-) diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index def3915..378ae3a 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -10,7 +10,7 @@ jobs: strategy: fail-fast: false matrix: - os: [macos-latest, windows-latest] + os: [macos-latest, windows-latest, ubuntu-latest] runs-on: ${{ matrix.os }} steps: - name: Install Go diff --git a/boomer.go b/boomer.go index 61e7f88..98741a8 100644 --- a/boomer.go +++ b/boomer.go @@ -115,12 +115,14 @@ func (b *Boomer) Run(tasks ...*Task) { switch b.mode { case DistributedMode: b.slaveRunner = newSlaveRunner(b.masterHost, b.masterPort, tasks, b.rateLimiter) + println("new slave runner") for _, o := range b.outputs { b.slaveRunner.addOutput(o) } b.slaveRunner.run() case StandaloneMode: b.localRunner = newLocalRunner(tasks, b.rateLimiter, b.spawnCount, b.spawnRate) + println("new local runner") for _, o := range b.outputs { b.localRunner.addOutput(o) } diff --git a/boomer_test.go b/boomer_test.go index a9afb26..ef0454f 100644 --- a/boomer_test.go +++ b/boomer_test.go @@ -2,14 +2,13 @@ package boomer import ( "flag" - "log" "math" - "math/rand" "os" "runtime" "sync/atomic" "time" + "github.com/myzhan/gomq/zmtp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -95,13 +94,8 @@ var _ = Describe("Test Boomer", func() { }) It("test distributed run", func() { - masterHost := "0.0.0.0" - rand.Seed(Now()) - masterPort := rand.Intn(1000) + 10240 - - server := newTestServer(masterHost, masterPort) - server.start() - defer server.close() + masterHost := "mock:0.0.0.0" + masterPort := 10240 b := NewBoomer(masterHost, masterPort) @@ -116,15 +110,21 @@ var _ = Describe("Test Boomer", func() { b.Run(taskA) defer b.Quit() - server.toClient <- newGenericMessage("spawn", map[string]interface{}{ + serverMessage := newGenericMessage("spawn", map[string]interface{}{ "user_classes_count": map[interface{}]interface{}{ "Dummy": int64(5), "Dummy2": int64(5), }, }, b.slaveRunner.nodeID) + serverMessageInBytes, _ := serverMessage.serialize() + serverZmtpMessage := &zmtp.Message{ + MessageType: zmtp.UserMessage, + Body: [][]byte{serverMessageInBytes}, + } + MockGomqDealerInstance.RecvChannel() <- serverZmtpMessage time.Sleep(4 * time.Second) - Expect(count).To(BeEquivalentTo(10)) + Expect(count).Should(BeEquivalentTo(10)) }) It("test run tasks for test", func() { @@ -181,17 +181,8 @@ var _ = Describe("Test Boomer", func() { It("test run", func() { flag.Parse() - - masterHost = "0.0.0.0" - rand.Seed(Now()) - masterPort = rand.Intn(1000) + 10240 - - server := newTestServer(masterHost, masterPort) - - log.Printf("Starting to serve on %s:%d\n", masterHost, masterPort) - server.start() - defer server.close() - + masterHost = "mock:0.0.0.0" + masterPort = 1234 count := int64(0) taskA := &Task{ Name: "increaseCount", @@ -202,19 +193,23 @@ var _ = Describe("Test Boomer", func() { } go Run(taskA) - time.Sleep(20 * time.Millisecond) + time.Sleep(50 * time.Millisecond) + defer defaultBoomer.Quit() - server.toClient <- newGenericMessage("spawn", map[string]interface{}{ + serverMessage := newGenericMessage("spawn", map[string]interface{}{ "user_classes_count": map[interface{}]interface{}{ "Dummy": int64(5), "Dummy2": int64(5), }, }, defaultBoomer.slaveRunner.nodeID) + serverMessageInBytes, _ := serverMessage.serialize() + serverZmtpMessage := &zmtp.Message{ + MessageType: zmtp.UserMessage, + Body: [][]byte{serverMessageInBytes}, + } + MockGomqDealerInstance.RecvChannel() <- serverZmtpMessage time.Sleep(4 * time.Second) - - defaultBoomer.Quit() - Expect(count).To(BeEquivalentTo(10)) }) diff --git a/client_gomq.go b/client_gomq.go index ed8616b..d63c0d9 100644 --- a/client_gomq.go +++ b/client_gomq.go @@ -1,3 +1,4 @@ +//go:build !goczmq // +build !goczmq package boomer @@ -5,11 +6,87 @@ package boomer import ( "fmt" "log" + "strings" + "time" "github.com/myzhan/gomq" "github.com/myzhan/gomq/zmtp" ) +type MockGomqDealer struct { + connectErr error + sendChannel chan []byte + receiveChannel chan *zmtp.Message +} + +var MockGomqDealerInstance *MockGomqDealer = &MockGomqDealer{ + sendChannel: make(chan []byte, 10), + receiveChannel: make(chan *zmtp.Message, 10), +} + +func (m *MockGomqDealer) SetConnectError(err error) { + m.connectErr = err +} + +func (m *MockGomqDealer) Connect(add string) (err error) { + if m.connectErr != nil { + return m.connectErr + } + return nil +} + +func (m *MockGomqDealer) AddConnection(*gomq.Connection) { + +} + +func (m *MockGomqDealer) RemoveConnection(string) { +} + +func (m *MockGomqDealer) SendChannel() chan []byte { + return m.sendChannel +} + +func (m *MockGomqDealer) Send(payload []byte) (err error) { + m.sendChannel <- payload + return nil +} + +func (m *MockGomqDealer) SendMultipart(payload [][]byte) (err error) { + return nil +} + +func (m *MockGomqDealer) RecvChannel() chan *zmtp.Message { + return m.receiveChannel +} + +func (m *MockGomqDealer) Recv() ([]byte, error) { + return nil, nil +} + +func (m *MockGomqDealer) RecvMultipart() ([][]byte, error) { + return nil, nil +} + +func (m *MockGomqDealer) Close() { + +} + +func (m *MockGomqDealer) RetryInterval() time.Duration { + return time.Second +} + +func (m *MockGomqDealer) SocketType() zmtp.SocketType { + return zmtp.DealerSocketType +} + +func (m *MockGomqDealer) SocketIdentity() zmtp.SocketIdentity { + return nil +} + +func (m *MockGomqDealer) SecurityMechanism() zmtp.SecurityMechanism { + return nil +} + type gomqSocketClient struct { masterHost string masterPort int @@ -39,7 +116,13 @@ func newClient(masterHost string, masterPort int, identity string) (client *gomq func (c *gomqSocketClient) connect() (err error) { addr := fmt.Sprintf("tcp://%s:%d", c.masterHost, c.masterPort) - c.dealerSocket = gomq.NewDealer(zmtp.NewSecurityNull(), c.identity) + + if strings.HasPrefix(c.masterHost, "mock:") { + // for unittest + c.dealerSocket = MockGomqDealerInstance + } else { + c.dealerSocket = gomq.NewDealer(zmtp.NewSecurityNull(), c.identity) + } if err = c.dealerSocket.Connect(addr); err != nil { return err diff --git a/client_gomq_test.go b/client_gomq_test.go index 7aea602..f5fa65a 100644 --- a/client_gomq_test.go +++ b/client_gomq_test.go @@ -4,207 +4,62 @@ package boomer import ( - "fmt" - "log" - "math/rand" - "net" - "runtime/debug" - "strings" + "errors" - "github.com/myzhan/gomq" "github.com/myzhan/gomq/zmtp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) -// Router is a gomq interface used for router sockets. -// It implements the Socket interface along with a -// Bind method for binding to endpoints. -type Router interface { - gomq.ZeroMQSocket - Bind(endpoint string) (net.Addr, error) -} - -// BindRouter accepts a Router interface and an endpoint -// in the format ://
:. It then attempts -// to bind to the endpoint. -func BindRouter(r Router, endpoint string) (net.Addr, error) { - var addr net.Addr - parts := strings.Split(endpoint, "://") - - ln, err := net.Listen(parts[0], parts[1]) - if err != nil { - return addr, err - } - - netConn, err := ln.Accept() - if err != nil { - return addr, err - } - - zmtpConn := zmtp.NewConnection(netConn) - _, err = zmtpConn.Prepare(r.SecurityMechanism(), r.SocketType(), r.SocketIdentity(), true, nil) - if err != nil { - return netConn.LocalAddr(), err - } - - conn := gomq.NewConnection(netConn, zmtpConn) - - r.AddConnection(conn) - zmtpConn.Recv(r.RecvChannel()) - return netConn.LocalAddr(), nil -} - -// RouteSocket is a ZMQ_ROUTER socket type. -// See: https://rfc.zeromq.org/spec:28/REQREP/ -type RouterSocket struct { - *gomq.Socket -} - -// NewRouter accepts a zmtp.SecurityMechanism and an ID. -// It returns a RouterSocket as a gomq.Router interface. -func NewRouter(mechanism zmtp.SecurityMechanism, id string) Router { - return &RouterSocket{ - Socket: gomq.NewSocket(false, zmtp.RouterSocketType, zmtp.SocketIdentity(id), mechanism), - } -} - -// Bind accepts a zeromq endpoint and binds the -// server socket to it. Currently the only transport -// supported is TCP. The endpoint string should be -// in the format "tcp://
:". -func (r *RouterSocket) Bind(endpoint string) (net.Addr, error) { - return BindRouter(r, endpoint) -} - -type testServer struct { - bindHost string - bindPort int - nodeID string - fromClient chan message - toClient chan message - routerSocket Router - shutdownSignal chan bool -} +var _ = Describe("Test gomq client", func() { -func newTestServer(bindHost string, bindPort int) (server *testServer) { - return &testServer{ - bindHost: bindHost, - bindPort: bindPort, - nodeID: getNodeID(), - fromClient: make(chan message, 100), - toClient: make(chan message, 100), - shutdownSignal: make(chan bool, 1), - } -} + It("test connect with error", func() { + client := newClient("mock:0.0.0.0", 1234, "testing") + MockGomqDealerInstance.SetConnectError(errors.New("connect error")) + defer MockGomqDealerInstance.SetConnectError(nil) + err := client.connect() + Expect(err).To(HaveOccurred()) + }) -func (s *testServer) bind() (net.Addr, error) { - s.routerSocket = NewRouter(zmtp.NewSecurityNull(), s.nodeID) - go s.recv() - go s.send() - return BindRouter(s.routerSocket, fmt.Sprintf("tcp://%s:%d", s.bindHost, s.bindPort)) -} + It("test server send generic message", func() { + client := newClient("mock:0.0.0.0", 1234, "testing") + client.connect() + defer client.close() -func (s *testServer) send() { - for { - select { - case <-s.shutdownSignal: - s.routerSocket.Close() - return - case msg := <-s.toClient: - s.sendMessage(msg) - } - } -} + pingMessage := newGenericMessage("ping", nil, "testing") + pingMessageInBytes, _ := pingMessage.serialize() + pongMessage := newGenericMessage("pong", nil, "testing") + pongMessageInBytes, _ := pongMessage.serialize() -func (s *testServer) sendMessage(msg message) { - defer func() { - // don't panic - err := recover() - if err != nil { - log.Printf("%v\n", err) - debug.PrintStack() - } - }() - serializedMessage, err := msg.serialize() - if err != nil { - log.Println("Msgpack encode fail:", err) - return - } - err = s.routerSocket.Send(serializedMessage) - if err != nil { - log.Printf("Error sending to client: %v\n", err) - } -} + client.sendChannel() <- pingMessage + Eventually(MockGomqDealerInstance.SendChannel()).Should(Receive(Equal(pingMessageInBytes))) -func (s *testServer) recv() { - for { - select { - case <-s.shutdownSignal: - s.routerSocket.Close() - return - default: - msg, err := s.routerSocket.RecvMultipart() - if err != nil { - log.Printf("Error reading: %v\n", err) - } else { - msgFromClient, err := newGenericMessageFromBytes(msg[0]) - if err != nil { - clientReadyMessage, err2 := newClientReadyMessageFromBytes(msg[0]) - if err2 != nil { - log.Println("Msgpack decode fail:", err) - } else { - // Send ack message - data := map[string]interface{}{ - "index": 1, - } - clientId := clientReadyMessage.NodeID - s.toClient <- newGenericMessage("ack", data, clientId) - s.fromClient <- clientReadyMessage - } - } else { - s.fromClient <- msgFromClient - } - } + serverMessage := &zmtp.Message{ + MessageType: zmtp.UserMessage, + Body: [][]byte{pongMessageInBytes}, } - } -} - -func (s *testServer) close() { - close(s.shutdownSignal) -} - -func (s *testServer) start() { - go s.bind() -} - -var _ = Describe("Test gomq client", func() { - - It("test ping pong", func() { - masterHost := "0.0.0.0" - rand.Seed(Now()) - masterPort := rand.Intn(1000) + 10240 - - server := newTestServer(masterHost, masterPort) - defer server.close() - - log.Printf("Starting to serve on %s:%d\n", masterHost, masterPort) - server.start() + MockGomqDealerInstance.RecvChannel() <- serverMessage + Eventually(client.recvChannel()).Should(Receive(Equal(pongMessage))) + }) - // start client - client := newClient(masterHost, masterPort, "testing ping pong") + It("test server send custom message", func() { + client := newClient("mock:0.0.0.0", 1234, "testing") client.connect() defer client.close() - client.sendChannel() <- newGenericMessage("ping", nil, "testing ping pong") - var msg *genericMessage - Eventually(server.fromClient).Should(Receive(&msg)) - Expect(msg.Type).To(Equal("ping")) - Expect(msg.NodeID).To(Equal("testing ping pong")) + pingMessage := newGenericMessage("ping", nil, "testing") + pingMessageInBytes, _ := pingMessage.serialize() + pongMessage := newCustomMessage("pong", int64(123), "testing") + pongMessageInBytes, _ := pongMessage.serialize() + + client.sendChannel() <- pingMessage + Eventually(MockGomqDealerInstance.SendChannel()).Should(Receive(Equal(pingMessageInBytes))) - server.toClient <- newGenericMessage("pong", nil, "testing ping pong") - Eventually(client.recvChannel()).Should(Receive(&msg)) - Expect(msg.Type).To(Equal("pong")) - Expect(msg.NodeID).To(Equal("testing ping pong")) + serverZmtpMessage := &zmtp.Message{ + MessageType: zmtp.UserMessage, + Body: [][]byte{pongMessageInBytes}, + } + MockGomqDealerInstance.RecvChannel() <- serverZmtpMessage + Eventually(client.recvChannel()).Should(Receive(Equal(pongMessage))) }) }) diff --git a/go.mod b/go.mod index 96ac5a4..014324c 100644 --- a/go.mod +++ b/go.mod @@ -8,14 +8,12 @@ require ( github.com/myzhan/gomq v0.0.0-20220926014711-4eea0d4a1e75 github.com/myzhan/gomq/zmtp v0.0.0-20220926014711-4eea0d4a1e75 github.com/olekukonko/tablewriter v0.0.5 - github.com/onsi/ginkgo/v2 v2.11.0 - github.com/onsi/gomega v1.27.10 + github.com/onsi/ginkgo/v2 v2.9.1 + github.com/onsi/gomega v1.27.4 github.com/prometheus/client_golang v1.14.0 github.com/shirou/gopsutil v3.21.11+incompatible - github.com/stretchr/testify v1.6.1 - github.com/tklauser/go-sysconf v0.3.11 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/ugorji/go/codec v1.2.8 - github.com/yusufpapurcu/wmi v1.2.2 // indirect + github.com/yusufpapurcu/wmi v1.2.3 // indirect github.com/zeromq/goczmq v0.0.0-20190906225145-a7546843a315 - golang.org/x/sys v0.11.0 // indirect ) diff --git a/go.sum b/go.sum index b592926..f337eb8 100644 --- a/go.sum +++ b/go.sum @@ -73,15 +73,13 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= -github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -198,12 +196,8 @@ github.com/onsi/ginkgo/v2 v2.5.0/go.mod h1:Luc4sArBICYCS8THh8v3i3i5CuSZO+RaQRaJo github.com/onsi/ginkgo/v2 v2.7.0/go.mod h1:yjiuMwPokqY1XauOgju45q3sJt6VzQ/Fict1LFVcsAo= github.com/onsi/ginkgo/v2 v2.8.1/go.mod h1:N1/NbDngAFcSLdyZ+/aYTYGSlq9qMCS/cNKGJjy+csc= github.com/onsi/ginkgo/v2 v2.9.0/go.mod h1:4xkjoL/tZv4SMWeww56BU5kAt19mVB47gTWxmrTcxyk= +github.com/onsi/ginkgo/v2 v2.9.1 h1:zie5Ly042PD3bsCvsSOPvRnFwyo3rKe64TJlD6nu0mk= github.com/onsi/ginkgo/v2 v2.9.1/go.mod h1:FEcmzVcCHl+4o9bQZVab+4dC9+j+91t2FHSzmGAPfuo= -github.com/onsi/ginkgo/v2 v2.9.2/go.mod h1:WHcJJG2dIlcCqVfBAwUCrJxSPFb6v4azBwgxeMeDuts= -github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3RonqW57k= -github.com/onsi/ginkgo/v2 v2.9.7/go.mod h1:cxrmXWykAwTwhQsJOPfdIDiJ+l2RYq7U8hFU+M/1uw0= -github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= -github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= @@ -216,12 +210,8 @@ github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmv github.com/onsi/gomega v1.26.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.27.1/go.mod h1:aHX5xOykVYzWOV4WqQy0sy8BQptgukenXpCXfadcIAw= github.com/onsi/gomega v1.27.3/go.mod h1:5vG284IBtfDAmDyrK+eGyZmUgUlmi+Wngqo557cZ6Gw= +github.com/onsi/gomega v1.27.4 h1:Z2AnStgsdSayCMDiCU42qIz+HLqEPcgiOCXjAU/w+8E= github.com/onsi/gomega v1.27.4/go.mod h1:riYq/GJKh8hhoM01HN6Vmuy93AarCXCBGpvFDK3q3fQ= -github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= -github.com/onsi/gomega v1.27.7/go.mod h1:1p8OOlwo2iUUDsHnOrjE5UKYJ+e3W8eQ3qSlRahPmr4= -github.com/onsi/gomega v1.27.8/go.mod h1:2J8vzI/s+2shY9XHRApDkdgPo1TKT7P2u6fXeJKFnNQ= -github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= -github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -264,13 +254,12 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= -github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= -github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms= -github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/ugorji/go/codec v1.2.8 h1:sgBJS6COt0b/P40VouWKdseidkDgHxYGm0SAglUHfP0= github.com/ugorji/go/codec v1.2.8/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -279,8 +268,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= -github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= +github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zeromq/goczmq v0.0.0-20190906225145-a7546843a315 h1:Mnki1bwiVDLVh9/gMqjI+3MdbVmAbswzayK/bzRmNaE= github.com/zeromq/goczmq v0.0.0-20190906225145-a7546843a315/go.mod h1:jBJgSEDlcqrdShbpgYc2S+mTo1Rs6pac+8zpUQFgsvg= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -296,7 +285,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= -golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -332,9 +320,8 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91 golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= -golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -380,10 +367,8 @@ golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= -golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -404,9 +389,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= -golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -464,8 +448,6 @@ golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -476,8 +458,6 @@ golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -489,10 +469,8 @@ golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= -golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -542,10 +520,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= -golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= -golang.org/x/tools v0.9.3 h1:Gn1I8+64MsuTb/HpH+LmQtNas23LhUVr3rYZ0eKuaMM= -golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -641,8 +617,8 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/runner_test.go b/runner_test.go index 82c6a39..2beb727 100644 --- a/runner_test.go +++ b/runner_test.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "time" + "github.com/myzhan/gomq/zmtp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -440,13 +441,9 @@ var _ = Describe("Test runner", func() { }) It("test get ready", func() { - masterHost := "127.0.0.1" + masterHost := "mock:127.0.0.1" masterPort := 6557 - server := newTestServer(masterHost, masterPort) - server.start() - defer server.close() - rateLimiter := NewStableRateLimiter(100, time.Second) r := newSlaveRunner(masterHost, masterPort, nil, rateLimiter) defer r.shutdown() @@ -454,21 +451,16 @@ var _ = Describe("Test runner", func() { r.run() - clientReady := <-server.fromClient - crm := clientReady.(*clientReadyMessage) - Expect(crm.Type).To(Equal("client_ready")) - - r.numClients = 10 - // it's not really running - r.state = stateRunning - data := make(map[string]interface{}) - r.stats.messageToRunnerChan <- data + clientReadyMessage := newClientReadyMessage("client_ready", -1, r.nodeID) + clientReadyMessageInBytes, _ := clientReadyMessage.serialize() + Eventually(MockGomqDealerInstance.SendChannel()).Should(Receive(Equal(clientReadyMessageInBytes))) - msg := <-server.fromClient - m := msg.(*genericMessage) - Expect(m.Type).To(Equal("stats")) - - userCount := m.Data["user_count"].(int64) - Expect(userCount).To(BeEquivalentTo(10)) + ackMessage := newGenericMessage("ack", nil, r.nodeID) + ackMessageInBytes, _ := ackMessage.serialize() + ackZmtpMessage := &zmtp.Message{ + MessageType: zmtp.UserMessage, + Body: [][]byte{ackMessageInBytes}, + } + MockGomqDealerInstance.RecvChannel() <- ackZmtpMessage }) })