From cc5ad709318491cae0b359fdba88f0dd9786eab4 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 21 Oct 2024 17:10:27 +0800 Subject: [PATCH] simplify the auto recover Signed-off-by: Wei Liu --- configs/milvus.yaml | 1 + go.mod | 2 +- go.sum | 4 +- internal/querycoordv2/meta/resource_group.go | 57 +++--- .../querycoordv2/meta/resource_group_test.go | 10 +- .../querycoordv2/meta/resource_manager.go | 172 ++++++++---------- .../meta/resource_manager_test.go | 63 ++----- 7 files changed, 128 insertions(+), 181 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index c24c0201871a5..6cfde5a29dce0 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -477,6 +477,7 @@ queryNode: bloomFilterApplyParallelFactor: 4 # parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM workerPooling: size: 10 # the size for worker querynode client pool + labelAwareQueryNodeBalance: QUERYNODE_LOCATION # the labels of querynode which can be used for node assignment in resource group ip: # TCP/IP address of queryNode. If not specified, use the first unicastable address port: 21123 # TCP port of queryNode grpc: diff --git a/go.mod b/go.mod index 669eea3d3db23..b0281af878d22 100644 --- a/go.mod +++ b/go.mod @@ -275,4 +275,4 @@ replace ( exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211108044248-fe3b7c4e445b -replace github.com/milvus-io/milvus-proto/go-api/v2 => github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021030130-6ef46f943d19 +replace github.com/milvus-io/milvus-proto/go-api/v2 => github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021072851-f06e7148817b diff --git a/go.sum b/go.sum index 391d353e3d59b..2fdbec402a106 100644 --- a/go.sum +++ b/go.sum @@ -907,8 +907,8 @@ github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLr github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= -github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021030130-6ef46f943d19 h1:Rsk8HBYftRrmeG+55tdfRipjc9V6UtyN2kcX4mBzcbc= -github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021030130-6ef46f943d19/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021072851-f06e7148817b h1:WPI6209CYRkuQNDlF8K/MC5lfKq741WlPFgCUn+rGHs= +github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021072851-f06e7148817b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= diff --git a/internal/querycoordv2/meta/resource_group.go b/internal/querycoordv2/meta/resource_group.go index aa5df52fec053..cea4fc4525a99 100644 --- a/internal/querycoordv2/meta/resource_group.go +++ b/internal/querycoordv2/meta/resource_group.go @@ -126,25 +126,6 @@ func (rg *ResourceGroup) MissingNumOfNodes() int { return missing } -func (rg *ResourceGroup) MissingNumOfPreferNodes(nodeMgr *session.NodeManager) int { - if len(rg.GetConfig().GetNodeFilter().GetPreferNodeLabels()) == 0 { - return rg.MissingNumOfNodes() - } - - preferNodesNum := rg.GetNodesByFilter(func(nodeID int64) bool { - if nodeMgr.Get(nodeID) == nil { - return false - } - return rg.PreferAcceptNode(nodeMgr.Get(nodeID)) - }) - - missing := int(rg.cfg.Requests.NodeNum) - len(preferNodesNum) - if missing < 0 { - return 0 - } - return missing -} - // ReachLimitNumOfNodes return reach limit nodes count. `limits - len(node)` func (rg *ResourceGroup) ReachLimitNumOfNodes() int { reachLimit := int(rg.cfg.Limits.NodeNum) - len(rg.nodes) @@ -163,6 +144,20 @@ func (rg *ResourceGroup) RedundantNumOfNodes() int { return redundant } +func (rg *ResourceGroup) DirtyNodes(nodeMgr *session.NodeManager) []int64 { + // filter out nodes that do not meet the node filter + dirtyNodes := make([]int64, 0) + rg.nodes.Range(func(nodeID int64) bool { + nodeInfo := nodeMgr.Get(nodeID) + if nodeInfo == nil || !rg.AcceptNode(nodeInfo) { + dirtyNodes = append(dirtyNodes, nodeID) + } + return true + }) + + return dirtyNodes +} + func (rg *ResourceGroup) GetNodesByFilter(f func(nodeID int64) bool) []int64 { ret := make([]int64, 0) rg.nodes.Range(func(nodeID int64) bool { @@ -175,19 +170,22 @@ func (rg *ResourceGroup) GetNodesByFilter(f func(nodeID int64) bool) []int64 { return ret } -func (rg *ResourceGroup) PreferAcceptNode(nodeInfo *session.NodeInfo) bool { +func (rg *ResourceGroup) AcceptNode(nodeInfo *session.NodeInfo) bool { if nodeInfo == nil { return false } - rgPreferLabels := rg.GetConfig().GetNodeFilter().GetPreferNodeLabels() - nodeLabels := nodeInfo.Labels() + requiredNodeLabels := rg.GetConfig().GetNodeFilter().GetNodeLabels() + if len(requiredNodeLabels) == 0 { + return true + } - if len(nodeLabels) == 0 || len(rgPreferLabels) == 0 { + nodeLabels := nodeInfo.Labels() + if len(nodeLabels) == 0 { return false } - for _, labelPair := range rgPreferLabels { + for _, labelPair := range requiredNodeLabels { valueInNode, ok := nodeLabels[labelPair.Key] if !ok || valueInNode != labelPair.Value { return false @@ -259,16 +257,7 @@ func (rg *ResourceGroup) MeetRequirement(nodeMgr *session.NodeManager) error { // check rg node filter if rg.cfg.NodeFilter != nil { - // filter out nodes that do not meet the node filter - dirtyNodes := make([]int64, 0) - rg.nodes.Range(func(nodeID int64) bool { - nodeInfo := nodeMgr.Get(nodeID) - if nodeInfo == nil || !rg.PreferAcceptNode(nodeInfo) { - dirtyNodes = append(dirtyNodes, nodeID) - } - return true - }) - + dirtyNodes := rg.DirtyNodes(nodeMgr) if len(dirtyNodes) > 0 { return errors.Errorf( "has dirty nodes[%v], not meet node filter", diff --git a/internal/querycoordv2/meta/resource_group_test.go b/internal/querycoordv2/meta/resource_group_test.go index a23beecbdb727..6f823fc33e94a 100644 --- a/internal/querycoordv2/meta/resource_group_test.go +++ b/internal/querycoordv2/meta/resource_group_test.go @@ -348,7 +348,7 @@ func TestRGNodeFilter(t *testing.T) { NodeNum: 3, }, NodeFilter: &rgpb.ResourceGroupNodeFilter{ - PreferNodeLabels: []*commonpb.KeyValuePair{ + NodeLabels: []*commonpb.KeyValuePair{ { Key: "dc_name", Value: "dc1", @@ -383,9 +383,9 @@ func TestRGNodeFilter(t *testing.T) { nodeMgr.Add(nodeInfo2) nodeMgr.Add(nodeInfo3) - assert.True(t, rg.PreferAcceptNode(nodeInfo1)) - assert.True(t, rg.PreferAcceptNode(nodeInfo2)) - assert.False(t, rg.PreferAcceptNode(nodeInfo3)) + assert.True(t, rg.AcceptNode(nodeInfo1)) + assert.True(t, rg.AcceptNode(nodeInfo2)) + assert.False(t, rg.AcceptNode(nodeInfo3)) assert.Error(t, rg.MeetRequirement(nodeMgr)) nodeFilter := func(nodeID int64) bool { @@ -393,7 +393,7 @@ func TestRGNodeFilter(t *testing.T) { if nodeInfo == nil { return false } - return rg.PreferAcceptNode(nodeInfo) + return rg.AcceptNode(nodeInfo) } assert.Len(t, rg.GetNodesByFilter(nodeFilter), 2) } diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index 629f9e802e623..6bd06689d5d05 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -533,44 +533,49 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) error { return nil } - err := rm.recoverMissingNodeRG(rgName) - if err != nil { - return err + // try to move out all dirty nodes before recover rg + if len(rg.DirtyNodes(rm.nodeMgr)) > 0 { + err := rm.recoverDirtyNodeRG(rgName) + if err != nil { + return err + } + } + + if rg.MissingNumOfNodes() > 0 { + return rm.recoverMissingNodeRG(rgName) } // DefaultResourceGroup is the backup resource group of redundant recovery, // So after all other resource group is reach the `limits`, rest redundant node will be transfer to DefaultResourceGroup. - err = rm.recoverRedundantNodeRG(rgName) - if err != nil { - return err + if rg.RedundantNumOfNodes() > 0 { + return rm.recoverRedundantNodeRG(rgName) } - return nil } // recoverMissingNodeRG recover resource group by transfer node from other resource group. func (rm *ResourceManager) recoverMissingNodeRG(rgName string) error { - preferNodeFilter := func(nodeID int64) bool { - nodeInfo := rm.nodeMgr.Get(nodeID) - if nodeInfo == nil { - return false + rgFilter := func(sourceRG *ResourceGroup) bool { + nodeFilter := func(nodeID int64) bool { + nodeInfo := rm.nodeMgr.Get(nodeID) + if nodeInfo == nil { + return false + } + return rm.groups[rgName].AcceptNode(nodeInfo) } - return rm.groups[rgName].PreferAcceptNode(nodeInfo) - } - preferRGFilter := func(sourceRG *ResourceGroup) bool { - return len(sourceRG.GetNodesByFilter(preferNodeFilter)) > 0 + return len(sourceRG.GetNodesByFilter(nodeFilter)) > 0 } for rm.groups[rgName].MissingNumOfNodes() > 0 { targetRG := rm.groups[rgName] - sourceRG := rm.selectMissingRecoverSourceRG(targetRG, preferRGFilter) + sourceRG := rm.selectMissingRecoverSourceRG(targetRG, rgFilter) if sourceRG == nil { log.Warn("fail to select source resource group", zap.String("rgName", targetRG.GetName())) return ErrNodeNotEnough } - nodeID, err := rm.transferOneNodeFromRGToRG(sourceRG, targetRG, preferNodeFilter) + nodeID, err := rm.transferOneNodeFromRGToRG(sourceRG, targetRG) if err != nil { log.Warn("failed to recover missing node by transfer node from other resource group", zap.String("sourceRG", sourceRG.GetName()), @@ -584,47 +589,17 @@ func (rm *ResourceManager) recoverMissingNodeRG(rgName string) error { zap.Int64("nodeID", nodeID), ) } - - for rm.groups[rgName].MissingNumOfPreferNodes(rm.nodeMgr) > 0 { - targetRG := rm.groups[rgName] - sourceRG := rm.selectMissingRecoverSourceRG(targetRG, preferRGFilter) - if sourceRG == nil { - log.Warn("fail to select source resource group", zap.String("rgName", targetRG.GetName())) - return ErrNodeNotEnough - } - - if !preferRGFilter(sourceRG) { - break - } - - nodeID, err := rm.transferOneNodeFromRGToRG(sourceRG, targetRG, preferNodeFilter) - if err != nil { - log.Warn("failed to recover missing node by transfer node from other resource group", - zap.String("sourceRG", sourceRG.GetName()), - zap.String("targetRG", targetRG.GetName()), - zap.Error(err)) - return err - } - log.Info("recover missing prefer node by transfer node from other resource group", - zap.String("sourceRG", sourceRG.GetName()), - zap.String("targetRG", targetRG.GetName()), - zap.Int64("nodeID", nodeID), - ) - } return nil } // selectMissingRecoverSourceRG select source resource group for recover missing resource group. -func (rm *ResourceManager) selectMissingRecoverSourceRG(rg *ResourceGroup, preferRGFilter func(rg *ResourceGroup) bool) *ResourceGroup { +func (rm *ResourceManager) selectMissingRecoverSourceRG(rg *ResourceGroup, rgFilter func(rg *ResourceGroup) bool) *ResourceGroup { // First, Transfer node from most redundant resource group first. `len(nodes) > limits` if redundantRG := rm.findMaxRGWithGivenFilter( func(sourceRG *ResourceGroup) bool { - return rg.GetName() != sourceRG.GetName() && sourceRG.RedundantNumOfNodes() > 0 + return rg.GetName() != sourceRG.GetName() && sourceRG.RedundantNumOfNodes() > 0 && rgFilter(sourceRG) }, func(sourceRG *ResourceGroup) int { - if preferRGFilter(sourceRG) { - return sourceRG.RedundantNumOfNodes() * preferNodeTransferBoost - } return sourceRG.RedundantNumOfNodes() }, ); redundantRG != nil { @@ -635,13 +610,9 @@ func (rm *ResourceManager) selectMissingRecoverSourceRG(rg *ResourceGroup, prefe // `TransferFrom` configured resource group at high priority. return rm.findMaxRGWithGivenFilter( func(sourceRG *ResourceGroup) bool { - return rg.GetName() != sourceRG.GetName() && sourceRG.OversizedNumOfNodes() > 0 + return rg.GetName() != sourceRG.GetName() && sourceRG.OversizedNumOfNodes() > 0 && rgFilter(sourceRG) }, func(sourceRG *ResourceGroup) int { - if preferRGFilter(sourceRG) { - return sourceRG.OversizedNumOfNodes() * preferNodeTransferBoost - } - if rg.HasFrom(sourceRG.GetName()) { // give a boost if sourceRG is configured as `TransferFrom` to set as high priority to select. return sourceRG.OversizedNumOfNodes() * resourceGroupTransferBoost @@ -652,11 +623,11 @@ func (rm *ResourceManager) selectMissingRecoverSourceRG(rg *ResourceGroup, prefe // recoverRedundantNodeRG recover resource group by transfer node to other resource group. func (rm *ResourceManager) recoverRedundantNodeRG(rgName string) error { - preferRGFilter := func(targetRG *ResourceGroup) bool { + rgFilter := func(targetRG *ResourceGroup) bool { sourceRG := rm.groups[rgName] for _, node := range sourceRG.GetNodes() { nodeInfo := rm.nodeMgr.Get(node) - if nodeInfo != nil && !sourceRG.PreferAcceptNode(nodeInfo) && targetRG.PreferAcceptNode(nodeInfo) { + if nodeInfo != nil && targetRG.AcceptNode(nodeInfo) { return true } } @@ -665,22 +636,53 @@ func (rm *ResourceManager) recoverRedundantNodeRG(rgName string) error { for rm.groups[rgName].RedundantNumOfNodes() > 0 { sourceRG := rm.groups[rgName] - targetRG := rm.selectRedundantRecoverTargetRG(sourceRG, preferRGFilter) + targetRG := rm.selectRedundantRecoverTargetRG(sourceRG, rgFilter) if targetRG == nil { log.Info("failed to select redundant recover target resource group, please check resource group configuration if as expected.", zap.String("rgName", sourceRG.GetName())) return errors.New("all resource group reach limits") } - preferNodeFilter := func(nodeID int64) bool { - nodeInfo := rm.nodeMgr.Get(nodeID) - if nodeInfo == nil { - return false + nodeID, err := rm.transferOneNodeFromRGToRG(sourceRG, targetRG) + if err != nil { + log.Warn("failed to recover redundant node by transfer node to other resource group", + zap.String("sourceRG", sourceRG.GetName()), + zap.String("targetRG", targetRG.GetName()), + zap.Error(err)) + return err + } + log.Info("recover redundant node by transfer node to other resource group", + zap.String("sourceRG", sourceRG.GetName()), + zap.String("targetRG", targetRG.GetName()), + zap.Int64("nodeID", nodeID), + ) + } + return nil +} + +// recoverDirtyNodeRG recover resource group by move out all node which doesn't meet node label requirement. +func (rm *ResourceManager) recoverDirtyNodeRG(rgName string) error { + rgFilter := func(targetRG *ResourceGroup) bool { + sourceRG := rm.groups[rgName] + for _, node := range sourceRG.GetNodes() { + nodeInfo := rm.nodeMgr.Get(node) + if nodeInfo != nil && targetRG.AcceptNode(nodeInfo) { + return true } - return !sourceRG.PreferAcceptNode(nodeInfo) } + return false + } - nodeID, err := rm.transferOneNodeFromRGToRG(sourceRG, targetRG, preferNodeFilter) + for len(rm.groups[rgName].DirtyNodes(rm.nodeMgr)) > 0 { + sourceRG := rm.groups[rgName] + targetRG := rm.selectRedundantRecoverTargetRG(sourceRG, rgFilter) + if targetRG == nil { + log.Info("failed to select redundant recover target resource group, please check resource group configuration if as expected.", + zap.String("rgName", sourceRG.GetName())) + return errors.New("all resource group reach limits") + } + + nodeID, err := rm.transferOneNodeFromRGToRG(sourceRG, targetRG) if err != nil { log.Warn("failed to recover redundant node by transfer node to other resource group", zap.String("sourceRG", sourceRG.GetName()), @@ -688,7 +690,7 @@ func (rm *ResourceManager) recoverRedundantNodeRG(rgName string) error { zap.Error(err)) return err } - log.Info("recover redundant node by transfer node to other resource group", + log.Info("recover dirty node by transfer node to other resource group", zap.String("sourceRG", sourceRG.GetName()), zap.String("targetRG", targetRG.GetName()), zap.Int64("nodeID", nodeID), @@ -698,16 +700,13 @@ func (rm *ResourceManager) recoverRedundantNodeRG(rgName string) error { } // selectRedundantRecoverTargetRG select target resource group for recover redundant resource group. -func (rm *ResourceManager) selectRedundantRecoverTargetRG(rg *ResourceGroup, preferRGFilter func(rg *ResourceGroup) bool) *ResourceGroup { +func (rm *ResourceManager) selectRedundantRecoverTargetRG(rg *ResourceGroup, rgFilter func(rg *ResourceGroup) bool) *ResourceGroup { // First, Transfer node to most missing resource group first. if missingRG := rm.findMaxRGWithGivenFilter( func(targetRG *ResourceGroup) bool { - return rg.GetName() != targetRG.GetName() && targetRG.MissingNumOfNodes() > 0 + return rg.GetName() != targetRG.GetName() && targetRG.MissingNumOfNodes() > 0 && rgFilter(targetRG) }, func(targetRG *ResourceGroup) int { - if preferRGFilter(targetRG) { - return targetRG.MissingNumOfNodes() * preferNodeTransferBoost - } return targetRG.MissingNumOfNodes() }, ); missingRG != nil { @@ -718,12 +717,9 @@ func (rm *ResourceManager) selectRedundantRecoverTargetRG(rg *ResourceGroup, pre // `TransferTo` configured resource group at high priority. if selectRG := rm.findMaxRGWithGivenFilter( func(targetRG *ResourceGroup) bool { - return rg.GetName() != targetRG.GetName() && targetRG.ReachLimitNumOfNodes() > 0 + return rg.GetName() != targetRG.GetName() && targetRG.ReachLimitNumOfNodes() > 0 && rgFilter(targetRG) }, func(targetRG *ResourceGroup) int { - if preferRGFilter(targetRG) { - return targetRG.ReachLimitNumOfNodes() * preferNodeTransferBoost - } if rg.HasTo(targetRG.GetName()) { // give a boost if targetRG is configured as `TransferTo` to set as high priority to select. return targetRG.ReachLimitNumOfNodes() * resourceGroupTransferBoost @@ -742,19 +738,13 @@ func (rm *ResourceManager) selectRedundantRecoverTargetRG(rg *ResourceGroup, pre } // transferOneNodeFromRGToRG transfer one node from source resource group to target resource group. -func (rm *ResourceManager) transferOneNodeFromRGToRG(sourceRG *ResourceGroup, targetRG *ResourceGroup, preferNodeFilter func(nodeID int64) bool) (int64, error) { +func (rm *ResourceManager) transferOneNodeFromRGToRG(sourceRG *ResourceGroup, targetRG *ResourceGroup) (int64, error) { if sourceRG.NodeNum() == 0 { return -1, ErrNodeNotEnough } - // prefer to select node which match the preferNodeFilter - nodes := sourceRG.GetNodesByFilter(preferNodeFilter) - if len(nodes) == 0 { - nodes = sourceRG.GetNodes() - } - // TODO: select node by some load strategy, such as segment loaded. - node := nodes[0] + node := sourceRG.GetNodes()[0] if err := rm.transferNode(targetRG.GetName(), node); err != nil { return -1, err } @@ -798,12 +788,12 @@ func (rm *ResourceManager) assignIncomingNode(node int64) (string, error) { if nodeInfo == nil { return "", merr.WrapErrNodeNotAvailable(node) } - preferRGFilter := func(rg *ResourceGroup) bool { - return rg.PreferAcceptNode(nodeInfo) + rgFilter := func(rg *ResourceGroup) bool { + return rg.AcceptNode(nodeInfo) } // select a resource group to assign incoming node. - rg = rm.mustSelectAssignIncomingNodeTargetRG(preferRGFilter) + rg = rm.mustSelectAssignIncomingNodeTargetRG(rgFilter) if err := rm.transferNode(rg.GetName(), node); err != nil { return "", errors.Wrap(err, "at finally assign to default resource group") } @@ -811,17 +801,13 @@ func (rm *ResourceManager) assignIncomingNode(node int64) (string, error) { } // mustSelectAssignIncomingNodeTargetRG select resource group for assign incoming node. -func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG(preferRGFilter func(rg *ResourceGroup) bool) *ResourceGroup { +func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG(rgFilter func(rg *ResourceGroup) bool) *ResourceGroup { // First, Assign it to rg with the most missing nodes at high priority. if rg := rm.findMaxRGWithGivenFilter( func(rg *ResourceGroup) bool { - return rg.MissingNumOfNodes() > 0 + return rg.MissingNumOfNodes() > 0 && rgFilter(rg) }, func(rg *ResourceGroup) int { - // give the rg which match preferRGFilter a boost to select. - if preferRGFilter(rg) { - return rg.MissingNumOfNodes() * preferNodeTransferBoost - } return rg.MissingNumOfNodes() }, ); rg != nil { @@ -831,13 +817,9 @@ func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG(preferRGFilter f // Second, assign it to rg do not reach limit. if rg := rm.findMaxRGWithGivenFilter( func(rg *ResourceGroup) bool { - return rg.ReachLimitNumOfNodes() > 0 + return rg.ReachLimitNumOfNodes() > 0 && rgFilter(rg) }, func(rg *ResourceGroup) int { - // give the rg which match preferRGFilter a boost to select. - if preferRGFilter(rg) { - return rg.ReachLimitNumOfNodes() * preferNodeTransferBoost - } return rg.ReachLimitNumOfNodes() }, ); rg != nil { diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index 122b077096ac6..8d168c657d62f 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -622,7 +622,7 @@ func (suite *ResourceManagerSuite) TestUnassignFail() { }) } -func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeAssign() { +func (suite *ResourceManagerSuite) TestNodeLabels_NodeAssign() { suite.manager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{ Requests: &rgpb.ResourceGroupLimit{ NodeNum: 10, @@ -631,7 +631,7 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeAssign() { NodeNum: 10, }, NodeFilter: &rgpb.ResourceGroupNodeFilter{ - PreferNodeLabels: []*commonpb.KeyValuePair{ + NodeLabels: []*commonpb.KeyValuePair{ { Key: "dc_name", Value: "label1", @@ -648,7 +648,7 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeAssign() { NodeNum: 10, }, NodeFilter: &rgpb.ResourceGroupNodeFilter{ - PreferNodeLabels: []*commonpb.KeyValuePair{ + NodeLabels: []*commonpb.KeyValuePair{ { Key: "dc_name", Value: "label2", @@ -665,7 +665,7 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeAssign() { NodeNum: 10, }, NodeFilter: &rgpb.ResourceGroupNodeFilter{ - PreferNodeLabels: []*commonpb.KeyValuePair{ + NodeLabels: []*commonpb.KeyValuePair{ { Key: "dc_name", Value: "label3", @@ -687,9 +687,9 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeAssign() { suite.manager.HandleNodeUp(int64(i)) } suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum()) - suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) - suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) - suite.Equal(0, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) + suite.Equal(0, suite.manager.GetResourceGroup("rg2").NodeNum()) + suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum()) + suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) // test new querynode with label2 for i := 31; i <= 40; i++ { @@ -705,22 +705,9 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeAssign() { } suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum()) suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) - suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) - suite.Equal(10, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) + suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum()) + suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) nodesInRG, _ := suite.manager.GetNodes("rg2") - for _, node := range nodesInRG { - suite.Equal("label1", suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) - } - - suite.manager.AutoRecoverResourceGroup("rg1") - suite.manager.AutoRecoverResourceGroup("rg2") - suite.manager.AutoRecoverResourceGroup("rg3") - suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName) - suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum()) - suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) - suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) - suite.Equal(10, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) - nodesInRG, _ = suite.manager.GetNodes("rg2") for _, node := range nodesInRG { suite.Equal("label2", suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) } @@ -742,19 +729,6 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeAssign() { suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) nodesInRG, _ = suite.manager.GetNodes("rg3") - for _, node := range nodesInRG { - suite.Equal("label1", suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) - } - - suite.manager.AutoRecoverResourceGroup("rg1") - suite.manager.AutoRecoverResourceGroup("rg2") - suite.manager.AutoRecoverResourceGroup("rg3") - suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName) - suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum()) - suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) - suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) - suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) - nodesInRG, _ = suite.manager.GetNodes("rg3") for _, node := range nodesInRG { suite.Equal("label3", suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) } @@ -769,7 +743,7 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeAssign() { NodeNum: 10, }, NodeFilter: &rgpb.ResourceGroupNodeFilter{ - PreferNodeLabels: []*commonpb.KeyValuePair{ + NodeLabels: []*commonpb.KeyValuePair{ { Key: "dc_name", Value: "label2", @@ -785,7 +759,7 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeAssign() { NodeNum: 10, }, NodeFilter: &rgpb.ResourceGroupNodeFilter{ - PreferNodeLabels: []*commonpb.KeyValuePair{ + NodeLabels: []*commonpb.KeyValuePair{ { Key: "dc_name", Value: "label3", @@ -801,7 +775,7 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeAssign() { NodeNum: 10, }, NodeFilter: &rgpb.ResourceGroupNodeFilter{ - PreferNodeLabels: []*commonpb.KeyValuePair{ + NodeLabels: []*commonpb.KeyValuePair{ { Key: "dc_name", Value: "label1", @@ -838,7 +812,7 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeAssign() { } } -func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeDown() { +func (suite *ResourceManagerSuite) TestNodeLabels_NodeDown() { suite.manager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{ Requests: &rgpb.ResourceGroupLimit{ NodeNum: 10, @@ -847,7 +821,7 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeDown() { NodeNum: 10, }, NodeFilter: &rgpb.ResourceGroupNodeFilter{ - PreferNodeLabels: []*commonpb.KeyValuePair{ + NodeLabels: []*commonpb.KeyValuePair{ { Key: "dc_name", Value: "label1", @@ -864,7 +838,7 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeDown() { NodeNum: 10, }, NodeFilter: &rgpb.ResourceGroupNodeFilter{ - PreferNodeLabels: []*commonpb.KeyValuePair{ + NodeLabels: []*commonpb.KeyValuePair{ { Key: "dc_name", Value: "label2", @@ -881,7 +855,7 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeDown() { NodeNum: 10, }, NodeFilter: &rgpb.ResourceGroupNodeFilter{ - PreferNodeLabels: []*commonpb.KeyValuePair{ + NodeLabels: []*commonpb.KeyValuePair{ { Key: "dc_name", Value: "label3", @@ -948,9 +922,10 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeDown() { }, })) suite.manager.HandleNodeUp(int64(101)) - suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum()) + suite.Equal(9, suite.manager.GetResourceGroup("rg1").NodeNum()) suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) + suite.Equal(1, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) // test node up with label1 suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ @@ -966,7 +941,7 @@ func (suite *ResourceManagerSuite) TestPreferNodeLabels_NodeDown() { suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) suite.Equal(1, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) - nodesInRG, _ := suite.manager.GetNodes(DefaultResourceGroupName) + nodesInRG, _ := suite.manager.GetNodes("rg1") for _, node := range nodesInRG { suite.Equal("label1", suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) }