diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index a44ba6b628595..7d32e821c501d 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -39,6 +39,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -178,6 +179,8 @@ func (q *QuotaCenter) clearMetrics() { // syncMetrics sends GetMetrics requests to DataCoord and QueryCoord to sync the metrics in DataNodes and QueryNodes. func (q *QuotaCenter) syncMetrics() error { + oldDataNodes := typeutil.NewSet(lo.Keys(q.dataNodeMetrics)...) + oldQueryNodes := typeutil.NewSet(lo.Keys(q.queryNodeMetrics)...) q.clearMetrics() ctx, cancel := context.WithTimeout(context.Background(), GetMetricsTimeout) defer cancel() @@ -191,12 +194,9 @@ func (q *QuotaCenter) syncMetrics() error { // get Query cluster metrics group.Go(func() error { rsp, err := q.queryCoord.GetMetrics(ctx, req) - if err != nil { + if err := merr.CheckRPCCall(rsp, err); err != nil { return err } - if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return fmt.Errorf("quotaCenter get Query cluster failed, err = %s", rsp.GetStatus().GetReason()) - } queryCoordTopology := &metricsinfo.QueryCoordTopology{} err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), queryCoordTopology) if err != nil { @@ -206,6 +206,7 @@ func (q *QuotaCenter) syncMetrics() error { collections := typeutil.NewUniqueSet() for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes { if queryNodeMetric.QuotaMetrics != nil { + oldQueryNodes.Remove(queryNodeMetric.ID) q.queryNodeMetrics[queryNodeMetric.ID] = queryNodeMetric.QuotaMetrics collections.Insert(queryNodeMetric.QuotaMetrics.Effect.CollectionIDs...) } @@ -216,12 +217,9 @@ func (q *QuotaCenter) syncMetrics() error { // get Data cluster metrics group.Go(func() error { rsp, err := q.dataCoord.GetMetrics(ctx, req) - if err != nil { + if err := merr.CheckRPCCall(rsp, err); err != nil { return err } - if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return fmt.Errorf("quotaCenter get Data cluster failed, err = %s", rsp.GetStatus().GetReason()) - } dataCoordTopology := &metricsinfo.DataCoordTopology{} err = metricsinfo.UnmarshalTopology(rsp.GetResponse(), dataCoordTopology) if err != nil { @@ -231,6 +229,7 @@ func (q *QuotaCenter) syncMetrics() error { collections := typeutil.NewUniqueSet() for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes { if dataNodeMetric.QuotaMetrics != nil { + oldDataNodes.Remove(dataNodeMetric.ID) q.dataNodeMetrics[dataNodeMetric.ID] = dataNodeMetric.QuotaMetrics collections.Insert(dataNodeMetric.QuotaMetrics.Effect.CollectionIDs...) } @@ -266,11 +265,13 @@ func (q *QuotaCenter) syncMetrics() error { if err != nil { return err } - // log.Debug("QuotaCenter sync metrics done", - // zap.Any("dataNodeMetrics", q.dataNodeMetrics), - // zap.Any("queryNodeMetrics", q.queryNodeMetrics), - // zap.Any("proxyMetrics", q.proxyMetrics), - // zap.Any("dataCoordMetrics", q.dataCoordMetrics)) + + for oldDN := range oldDataNodes { + metrics.RootCoordTtDelay.DeleteLabelValues(typeutil.DataNodeRole, strconv.FormatInt(oldDN, 10)) + } + for oldQN := range oldQueryNodes { + metrics.RootCoordTtDelay.DeleteLabelValues(typeutil.QueryNodeRole, strconv.FormatInt(oldQN, 10)) + } return nil } diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index fb5d073465cf1..a8aa8c1f5f90e 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -24,8 +24,10 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -35,10 +37,12 @@ import ( mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/ratelimitutil" + "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -669,3 +673,375 @@ func TestQuotaCenter(t *testing.T) { assert.Equal(t, float64(quotaCenter.currentRates[1][internalpb.RateType_DMLUpsert]), float64(6*1024*1024)) }) } + +type QuotaCenterSuite struct { + testutils.PromMetricsSuite + + core *Core + + pcm *proxyutil.MockProxyClientManager + dc *mocks.MockDataCoordClient + qc *mocks.MockQueryCoordClient + meta *mockrootcoord.IMetaTable +} + +func (s *QuotaCenterSuite) SetupSuite() { + paramtable.Init() + + var err error + s.core, err = NewCore(context.Background(), nil) + + s.Require().NoError(err) +} + +func (s *QuotaCenterSuite) SetupTest() { + s.pcm = proxyutil.NewMockProxyClientManager(s.T()) + s.dc = mocks.NewMockDataCoordClient(s.T()) + s.qc = mocks.NewMockQueryCoordClient(s.T()) + s.meta = mockrootcoord.NewIMetaTable(s.T()) +} + +func (s *QuotaCenterSuite) getEmptyQCMetricsRsp() string { + metrics := &metricsinfo.QueryCoordTopology{ + Cluster: metricsinfo.QueryClusterTopology{}, + } + + resp, err := metricsinfo.MarshalTopology(metrics) + s.Require().NoError(err) + return resp +} + +func (s *QuotaCenterSuite) getEmptyDCMetricsRsp() string { + metrics := &metricsinfo.DataCoordTopology{ + Cluster: metricsinfo.DataClusterTopology{}, + } + + resp, err := metricsinfo.MarshalTopology(metrics) + s.Require().NoError(err) + return resp +} + +func (s *QuotaCenterSuite) TestSyncMetricsSuccess() { + pcm := s.pcm + dc := s.dc + qc := s.qc + meta := s.meta + core := s.core + + s.Run("querycoord_cluster", func() { + pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once() + dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: s.getEmptyDCMetricsRsp(), + }, nil).Once() + + metrics := &metricsinfo.QueryCoordTopology{ + Cluster: metricsinfo.QueryClusterTopology{ + ConnectedNodes: []metricsinfo.QueryNodeInfos{ + {BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 1}, QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{Effect: metricsinfo.NodeEffect{NodeID: 1, CollectionIDs: []int64{100, 200}}}}, + {BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 2}, QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{Effect: metricsinfo.NodeEffect{NodeID: 2, CollectionIDs: []int64{200, 300}}}}, + }, + }, + } + + resp, err := metricsinfo.MarshalTopology(metrics) + s.Require().NoError(err) + + qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: resp, + }, nil).Once() + + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + + err = quotaCenter.syncMetrics() + s.Require().NoError(err) + + s.ElementsMatch([]int64{100, 200, 300}, quotaCenter.readableCollections) + nodes := lo.Keys(quotaCenter.queryNodeMetrics) + s.ElementsMatch([]int64{1, 2}, nodes) + }) + + s.Run("datacoord_cluster", func() { + pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once() + qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: s.getEmptyQCMetricsRsp(), + }, nil).Once() + + metrics := &metricsinfo.DataCoordTopology{ + Cluster: metricsinfo.DataClusterTopology{ + ConnectedDataNodes: []metricsinfo.DataNodeInfos{ + {BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 1}, QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{Effect: metricsinfo.NodeEffect{NodeID: 1, CollectionIDs: []int64{100, 200}}}}, + {BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 2}, QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{Effect: metricsinfo.NodeEffect{NodeID: 2, CollectionIDs: []int64{200, 300}}}}, + }, + }, + } + + resp, err := metricsinfo.MarshalTopology(metrics) + s.Require().NoError(err) + + dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: resp, + }, nil).Once() + + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + + err = quotaCenter.syncMetrics() + s.Require().NoError(err) + + s.ElementsMatch([]int64{100, 200, 300}, quotaCenter.writableCollections) + nodes := lo.Keys(quotaCenter.dataNodeMetrics) + s.ElementsMatch([]int64{1, 2}, nodes) + }) +} + +func (s *QuotaCenterSuite) TestSyncMetricsFailure() { + pcm := s.pcm + dc := s.dc + qc := s.qc + meta := s.meta + core := s.core + + s.Run("querycoord_failure", func() { + pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once() + dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: s.getEmptyDCMetricsRsp(), + }, nil).Once() + qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("mock")).Once() + + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + + err := quotaCenter.syncMetrics() + s.Error(err) + }) + + s.Run("querycoord_bad_response", func() { + pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once() + dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: s.getEmptyDCMetricsRsp(), + }, nil).Once() + + qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: "abc", + }, nil).Once() + + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + + err := quotaCenter.syncMetrics() + s.Error(err) + }) + + s.Run("datacoord_failure", func() { + pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once() + qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: s.getEmptyQCMetricsRsp(), + }, nil).Once() + + dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("mocked")).Once() + + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + err := quotaCenter.syncMetrics() + s.Error(err) + }) + + s.Run("datacoord_bad_response", func() { + pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil).Once() + qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: s.getEmptyQCMetricsRsp(), + }, nil).Once() + + dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: "abc", + }, nil).Once() + + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + err := quotaCenter.syncMetrics() + s.Error(err) + }) + + s.Run("proxy_manager_return_failure", func() { + qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: s.getEmptyQCMetricsRsp(), + }, nil).Once() + dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: s.getEmptyDCMetricsRsp(), + }, nil).Once() + + pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, errors.New("mocked")).Once() + + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + err := quotaCenter.syncMetrics() + s.Error(err) + }) + + s.Run("proxy_manager_bad_response", func() { + qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: s.getEmptyQCMetricsRsp(), + }, nil).Once() + dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: s.getEmptyDCMetricsRsp(), + }, nil).Once() + + pcm.EXPECT().GetProxyMetrics(mock.Anything).Return([]*milvuspb.GetMetricsResponse{ + { + Status: merr.Status(nil), + Response: "abc", + }, + }, nil).Once() + + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + err := quotaCenter.syncMetrics() + s.Error(err) + }) +} + +func (s *QuotaCenterSuite) TestNodeOffline() { + pcm := s.pcm + dc := s.dc + qc := s.qc + meta := s.meta + core := s.core + + metrics.RootCoordTtDelay.Reset() + Params.Save(Params.QuotaConfig.TtProtectionEnabled.Key, "true") + defer Params.Reset(Params.QuotaConfig.TtProtectionEnabled.Key) + + // proxy + pcm.EXPECT().GetProxyMetrics(mock.Anything).Return(nil, nil) + + // qc first time + qcMetrics := &metricsinfo.QueryCoordTopology{ + Cluster: metricsinfo.QueryClusterTopology{ + ConnectedNodes: []metricsinfo.QueryNodeInfos{ + { + BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 1}, + QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{ + Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"}, + Effect: metricsinfo.NodeEffect{ + NodeID: 1, CollectionIDs: []int64{100, 200}, + }, + }, + }, + { + BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 2}, + QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{ + Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"}, + Effect: metricsinfo.NodeEffect{ + NodeID: 2, CollectionIDs: []int64{100, 200}, + }, + }, + }, + }, + }, + } + resp, err := metricsinfo.MarshalTopology(qcMetrics) + s.Require().NoError(err) + + qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: resp, + }, nil).Once() + + // dc first time + dcMetrics := &metricsinfo.DataCoordTopology{ + Cluster: metricsinfo.DataClusterTopology{ + ConnectedDataNodes: []metricsinfo.DataNodeInfos{ + { + BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 3}, + QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{ + Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"}, + Effect: metricsinfo.NodeEffect{NodeID: 3, CollectionIDs: []int64{100, 200}}, + }, + }, + { + BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 4}, + QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{ + Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"}, + Effect: metricsinfo.NodeEffect{NodeID: 4, CollectionIDs: []int64{200, 300}}, + }, + }, + }, + }, + } + + resp, err = metricsinfo.MarshalTopology(dcMetrics) + s.Require().NoError(err) + dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: resp, + }, nil).Once() + + quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta) + err = quotaCenter.syncMetrics() + s.Require().NoError(err) + + quotaCenter.getTimeTickDelayFactor(tsoutil.ComposeTSByTime(time.Now(), 0)) + + s.CollectCntEqual(metrics.RootCoordTtDelay, 4) + + // qc second time + qcMetrics = &metricsinfo.QueryCoordTopology{ + Cluster: metricsinfo.QueryClusterTopology{ + ConnectedNodes: []metricsinfo.QueryNodeInfos{ + { + BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 2}, + QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{ + Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"}, + Effect: metricsinfo.NodeEffect{NodeID: 2, CollectionIDs: []int64{200, 300}}, + }, + }, + }, + }, + } + resp, err = metricsinfo.MarshalTopology(qcMetrics) + s.Require().NoError(err) + + qc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: resp, + }, nil).Once() + + // dc second time + dcMetrics = &metricsinfo.DataCoordTopology{ + Cluster: metricsinfo.DataClusterTopology{ + ConnectedDataNodes: []metricsinfo.DataNodeInfos{ + { + BaseComponentInfos: metricsinfo.BaseComponentInfos{ID: 4}, + QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{ + Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 2, MinFlowGraphChannel: "dml_0"}, + Effect: metricsinfo.NodeEffect{NodeID: 2, CollectionIDs: []int64{200, 300}}, + }, + }, + }, + }, + } + + resp, err = metricsinfo.MarshalTopology(dcMetrics) + s.Require().NoError(err) + dc.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(&milvuspb.GetMetricsResponse{ + Status: merr.Status(nil), + Response: resp, + }, nil).Once() + + err = quotaCenter.syncMetrics() + s.Require().NoError(err) + + quotaCenter.getTimeTickDelayFactor(tsoutil.ComposeTSByTime(time.Now(), 0)) + s.CollectCntEqual(metrics.RootCoordTtDelay, 2) +} + +func TestQuotaCenterSuite(t *testing.T) { + suite.Run(t, new(QuotaCenterSuite)) +} diff --git a/pkg/util/testutils/prometheus_metric.go b/pkg/util/testutils/prometheus_metric.go index a30464175a00d..42da5836899b4 100644 --- a/pkg/util/testutils/prometheus_metric.go +++ b/pkg/util/testutils/prometheus_metric.go @@ -15,3 +15,8 @@ func (suite *PromMetricsSuite) MetricsEqual(c prometheus.Collector, expect float value := testutil.ToFloat64(c) return suite.Suite.Equal(expect, value, msgAndArgs...) } + +func (suite *PromMetricsSuite) CollectCntEqual(c prometheus.Collector, expect int, msgAndArgs ...any) bool { + cnt := testutil.CollectAndCount(c) + return suite.Suite.EqualValues(expect, cnt, msgAndArgs...) +}