From c8ce2658392df6e5bd4760ad5f3065e5cf27d783 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Justas=20=C4=8Cerniauskas?= Date: Mon, 1 Feb 2021 16:59:56 +0200 Subject: [PATCH] [aggregator] Prevent tcp client panic on nil placement (#3139) --- src/aggregator/client/tcp_client.go | 17 +++++++++++++++- src/aggregator/client/tcp_client_test.go | 25 ++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/src/aggregator/client/tcp_client.go b/src/aggregator/client/tcp_client.go index 38209e20f7..b2484e055e 100644 --- a/src/aggregator/client/tcp_client.go +++ b/src/aggregator/client/tcp_client.go @@ -21,6 +21,7 @@ package client import ( + "errors" "fmt" "math" "time" @@ -39,7 +40,11 @@ import ( xerrors "github.com/m3db/m3/src/x/errors" ) -var _ AdminClient = (*TCPClient)(nil) +var ( + _ AdminClient = (*TCPClient)(nil) + + errNilPlacement = errors.New("placement is nil") +) // TCPClient sends metrics to M3 Aggregator via over custom TCP protocol. type TCPClient struct { @@ -229,6 +234,9 @@ func (c *TCPClient) ActivePlacement() (placement.Placement, int, error) { return nil, 0, err } defer onStagedPlacementDoneFn() + if stagedPlacement == nil { + return nil, 0, errNilPlacement + } placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement() if err != nil { @@ -247,6 +255,9 @@ func (c *TCPClient) ActivePlacementVersion() (int, error) { return 0, err } defer onStagedPlacementDoneFn() + if stagedPlacement == nil { + return 0, errNilPlacement + } return stagedPlacement.Version(), nil } @@ -274,6 +285,10 @@ func (c *TCPClient) write( if err != nil { return err } + if stagedPlacement == nil { + onStagedPlacementDoneFn() + return errNilPlacement + } placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement() if err != nil { onStagedPlacementDoneFn() diff --git a/src/aggregator/client/tcp_client_test.go b/src/aggregator/client/tcp_client_test.go index a6f4a4af36..3990e7ada3 100644 --- a/src/aggregator/client/tcp_client_test.go +++ b/src/aggregator/client/tcp_client_test.go @@ -241,6 +241,31 @@ func TestTCPClientWriteUntimedMetricActiveStagedPlacementError(t *testing.T) { } } +func TestTCPClientWriteUntimedMetricActiveStagedPlacementNil(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + watcher := placement.NewMockStagedPlacementWatcher(ctrl) + watcher.EXPECT().ActiveStagedPlacement(). + Return(nil, func() {}, nil). + MinTimes(1) + c := mustNewTestTCPClient(t, testOptions()) + c.placementWatcher = watcher + + for _, input := range []unaggregated.MetricUnion{testCounter, testBatchTimer, testGauge} { + var err error + switch input.Type { + case metric.CounterType: + err = c.WriteUntimedCounter(input.Counter(), testStagedMetadatas) + case metric.TimerType: + err = c.WriteUntimedBatchTimer(input.BatchTimer(), testStagedMetadatas) + case metric.GaugeType: + err = c.WriteUntimedGauge(input.Gauge(), testStagedMetadatas) + } + require.Equal(t, errNilPlacement, err) + } +} + func TestTCPClientWriteUntimedMetricActivePlacementError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish()