From 7cf414ce84c6f4711d50a97d997a4a59e625dcfe Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Thu, 7 Nov 2024 00:09:26 +0800 Subject: [PATCH] enhance: Enable node assign policy on resource group Signed-off-by: Wei Liu --- go.mod | 2 +- go.sum | 4 +- internal/querycoordv2/meta/resource_group.go | 145 ++++++-- .../querycoordv2/meta/resource_group_test.go | 129 ++++++- .../querycoordv2/meta/resource_manager.go | 203 ++++++----- .../meta/resource_manager_test.go | 337 ++++++++++++++++++ .../observers/resource_observer_test.go | 6 +- internal/querycoordv2/server.go | 2 + internal/querycoordv2/session/node_manager.go | 5 + internal/util/sessionutil/session_util.go | 34 +- .../util/sessionutil/session_util_test.go | 15 + 11 files changed, 743 insertions(+), 139 deletions(-) diff --git a/go.mod b/go.mod index 766a359265af5..7d5fb276b6d82 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7 + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104030616-2810a94b3c80 github.com/minio/minio-go/v7 v7.0.73 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index 98ffb403f2456..ac476e551b0e1 100644 --- a/go.sum +++ b/go.sum @@ -627,8 +627,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7 h1:HwAitQk+V59QdYUwwVVYHTujd4QZrebg2Cc2hmcjhAg= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104030616-2810a94b3c80 h1:Lt3W/8liX5YkzZFGkD0jhXPEIOgLj68n6eDQToxs43s= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104030616-2810a94b3c80/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/querycoordv2/meta/resource_group.go b/internal/querycoordv2/meta/resource_group.go index e9bbd87551652..4d7a5a43702c8 100644 --- a/internal/querycoordv2/meta/resource_group.go +++ b/internal/querycoordv2/meta/resource_group.go @@ -6,6 +6,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -30,23 +31,25 @@ func newResourceGroupConfig(request int32, limit int32) *rgpb.ResourceGroupConfi } type ResourceGroup struct { - name string - nodes typeutil.UniqueSet - cfg *rgpb.ResourceGroupConfig + name string + nodes typeutil.UniqueSet + cfg *rgpb.ResourceGroupConfig + nodeMgr *session.NodeManager } // 
NewResourceGroup create resource group. -func NewResourceGroup(name string, cfg *rgpb.ResourceGroupConfig) *ResourceGroup { +func NewResourceGroup(name string, cfg *rgpb.ResourceGroupConfig, nodeMgr *session.NodeManager) *ResourceGroup { rg := &ResourceGroup{ - name: name, - nodes: typeutil.NewUniqueSet(), - cfg: cfg, + name: name, + nodes: typeutil.NewUniqueSet(), + cfg: cfg, + nodeMgr: nodeMgr, } return rg } // NewResourceGroupFromMeta create resource group from meta. -func NewResourceGroupFromMeta(meta *querypb.ResourceGroup) *ResourceGroup { +func NewResourceGroupFromMeta(meta *querypb.ResourceGroup, nodeMgr *session.NodeManager) *ResourceGroup { // Backward compatibility, recover the config from capacity. if meta.Config == nil { // If meta.Config is nil, which means the meta is from old version. @@ -57,7 +60,7 @@ func NewResourceGroupFromMeta(meta *querypb.ResourceGroup) *ResourceGroup { meta.Config = newResourceGroupConfig(meta.Capacity, meta.Capacity) } } - rg := NewResourceGroup(meta.Name, meta.Config) + rg := NewResourceGroup(meta.Name, meta.Config, nodeMgr) for _, node := range meta.GetNodes() { rg.nodes.Insert(node) } @@ -91,14 +94,27 @@ func (rg *ResourceGroup) GetConfigCloned() *rgpb.ResourceGroupConfig { return proto.Clone(rg.cfg).(*rgpb.ResourceGroupConfig) } -// GetNodes return nodes of resource group. +// GetNodes return nodes of resource group which match required node labels func (rg *ResourceGroup) GetNodes() []int64 { - return rg.nodes.Collect() + requiredNodeLabels := rg.GetConfig().GetNodeFilter().GetNodeLabels() + if len(requiredNodeLabels) == 0 { + return rg.nodes.Collect() + } + + ret := make([]int64, 0) + rg.nodes.Range(func(nodeID int64) bool { + if rg.AcceptNode(nodeID) { + ret = append(ret, nodeID) + } + return true + }) + + return ret } -// NodeNum return node count of resource group. +// NodeNum return node count of resource group which match required node labels func (rg *ResourceGroup) NodeNum() int { - return rg.nodes.Len() + return len(rg.GetNodes()) } // ContainNode return whether resource group contain node. @@ -106,40 +122,104 @@ func (rg *ResourceGroup) ContainNode(id int64) bool { return rg.nodes.Contain(id) } -// OversizedNumOfNodes return oversized nodes count. `len(node) - requests` +// OversizedNumOfNodes return oversized nodes count. `NodeNum - requests` func (rg *ResourceGroup) OversizedNumOfNodes() int { - oversized := rg.nodes.Len() - int(rg.cfg.Requests.NodeNum) + oversized := rg.NodeNum() - int(rg.cfg.Requests.NodeNum) if oversized < 0 { - return 0 + oversized = 0 } - return oversized + return oversized + len(rg.getDirtyNode()) } -// MissingNumOfNodes return lack nodes count. `requests - len(node)` +// MissingNumOfNodes return lack nodes count. `requests - NodeNum` func (rg *ResourceGroup) MissingNumOfNodes() int { - missing := int(rg.cfg.Requests.NodeNum) - len(rg.nodes) + missing := int(rg.cfg.Requests.NodeNum) - rg.NodeNum() if missing < 0 { return 0 } return missing } -// ReachLimitNumOfNodes return reach limit nodes count. `limits - len(node)` +// ReachLimitNumOfNodes return reach limit nodes count. `limits - NodeNum` func (rg *ResourceGroup) ReachLimitNumOfNodes() int { - reachLimit := int(rg.cfg.Limits.NodeNum) - len(rg.nodes) + reachLimit := int(rg.cfg.Limits.NodeNum) - rg.NodeNum() if reachLimit < 0 { return 0 } return reachLimit } -// RedundantOfNodes return redundant nodes count. `len(node) - limits` +// RedundantOfNodes return redundant nodes count. 
`len(node) - limits` or len(dirty_nodes)
 func (rg *ResourceGroup) RedundantNumOfNodes() int {
-	redundant := len(rg.nodes) - int(rg.cfg.Limits.NodeNum)
+	redundant := rg.NodeNum() - int(rg.cfg.Limits.NodeNum)
 	if redundant < 0 {
-		return 0
+		redundant = 0
+	}
+	return redundant + len(rg.getDirtyNode())
+}
+
+func (rg *ResourceGroup) getDirtyNode() []int64 {
+	dirtyNodes := make([]int64, 0)
+	rg.nodes.Range(func(nodeID int64) bool {
+		if !rg.AcceptNode(nodeID) {
+			dirtyNodes = append(dirtyNodes, nodeID)
+		}
+		return true
+	})
+
+	return dirtyNodes
+}
+
+func (rg *ResourceGroup) SelectNodeForRG(targetRG *ResourceGroup) int64 {
+	// try to move out dirty node
+	for _, node := range rg.getDirtyNode() {
+		if targetRG.AcceptNode(node) {
+			return node
+		}
+	}
+
+	// try to move out oversized node
+	oversized := rg.NodeNum() - int(rg.cfg.Requests.NodeNum)
+	if oversized > 0 {
+		for _, node := range rg.GetNodes() {
+			if targetRG.AcceptNode(node) {
+				return node
+			}
+		}
+	}
+
+	return -1
+}
+
+// AcceptNode checks whether the given node matches this resource group's node filter; the default resource group accepts any node.
+func (rg *ResourceGroup) AcceptNode(nodeID int64) bool {
+	if rg.GetName() == DefaultResourceGroupName {
+		return true
+	}
+
+	nodeInfo := rg.nodeMgr.Get(nodeID)
+	if nodeInfo == nil {
+		return false
 	}
-	return redundant
+
+	requiredNodeLabels := rg.GetConfig().GetNodeFilter().GetNodeLabels()
+	if len(requiredNodeLabels) == 0 {
+		return true
+	}
+
+	nodeLabels := nodeInfo.Labels()
+	if len(nodeLabels) == 0 {
+		return false
+	}
+
+	for _, labelPair := range requiredNodeLabels {
+		valueInNode, ok := nodeLabels[labelPair.Key]
+		if !ok || valueInNode != labelPair.Value {
+			return false
+		}
+	}
+
+	return true
 }
 
 // HasFrom return whether given resource group is in `from` of rg.
@@ -176,9 +256,10 @@ func (rg *ResourceGroup) GetMeta() *querypb.ResourceGroup {
 // Snapshot return a snapshot of resource group.
 func (rg *ResourceGroup) Snapshot() *ResourceGroup {
 	return &ResourceGroup{
-		name:  rg.name,
-		nodes: rg.nodes.Clone(),
-		cfg:   rg.GetConfigCloned(),
+		name:    rg.name,
+		nodes:   rg.nodes.Clone(),
+		cfg:     rg.GetConfigCloned(),
+		nodeMgr: rg.nodeMgr,
 	}
 }
 
@@ -186,18 +267,18 @@ func (rg *ResourceGroup) Snapshot() *ResourceGroup {
 // Return error with reason if not meet requirement.
 func (rg *ResourceGroup) MeetRequirement() error {
 	// if len(node) is less than requests, new node need to be assigned.
-	if rg.nodes.Len() < int(rg.cfg.Requests.NodeNum) {
+	if rg.MissingNumOfNodes() > 0 {
 		return errors.Errorf(
 			"has %d nodes, less than request %d",
-			rg.nodes.Len(),
+			rg.NodeNum(),
 			rg.cfg.Requests.NodeNum,
 		)
 	}
 	// if len(node) is greater than limits, node need to be removed.
- if rg.nodes.Len() > int(rg.cfg.Limits.NodeNum) { + if rg.RedundantNumOfNodes() > 0 { return errors.Errorf( "has %d nodes, greater than limit %d", - rg.nodes.Len(), + rg.NodeNum(), rg.cfg.Requests.NodeNum, ) } diff --git a/internal/querycoordv2/meta/resource_group_test.go b/internal/querycoordv2/meta/resource_group_test.go index 2e34ab16b5691..ea92e41c1a11c 100644 --- a/internal/querycoordv2/meta/resource_group_test.go +++ b/internal/querycoordv2/meta/resource_group_test.go @@ -5,8 +5,11 @@ import ( "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) func TestResourceGroup(t *testing.T) { @@ -24,7 +27,10 @@ func TestResourceGroup(t *testing.T) { ResourceGroup: "rg3", }}, } - rg := NewResourceGroup("rg1", cfg) + + nodeMgr := session.NewNodeManager() + + rg := NewResourceGroup("rg1", cfg, nodeMgr) cfg2 := rg.GetConfig() assert.Equal(t, cfg.Requests.NodeNum, cfg2.Requests.NodeNum) @@ -84,6 +90,9 @@ func TestResourceGroup(t *testing.T) { } assertion() + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + })) // Test AddNode mrg = rg.CopyForWrite() mrg.AssignNode(1) @@ -108,6 +117,9 @@ func TestResourceGroup(t *testing.T) { } assertion() + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + })) // Test AddNode until meet requirement. mrg = rg.CopyForWrite() mrg.AssignNode(2) @@ -132,6 +144,12 @@ func TestResourceGroup(t *testing.T) { } assertion() + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + })) + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 4, + })) // Test AddNode until exceed requirement. 
mrg = rg.CopyForWrite() mrg.AssignNode(3) @@ -202,12 +220,21 @@ func TestResourceGroup(t *testing.T) { } func TestResourceGroupMeta(t *testing.T) { + nodeMgr := session.NewNodeManager() + + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + })) + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + })) + rgMeta := &querypb.ResourceGroup{ Name: "rg1", Capacity: 1, Nodes: []int64{1, 2}, } - rg := NewResourceGroupFromMeta(rgMeta) + rg := NewResourceGroupFromMeta(rgMeta, nodeMgr) assert.Equal(t, "rg1", rg.GetName()) assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes()) assert.Equal(t, 2, rg.NodeNum()) @@ -225,6 +252,9 @@ func TestResourceGroupMeta(t *testing.T) { assert.False(t, rg.ContainNode(4)) assert.Error(t, rg.MeetRequirement()) + nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 4, + })) rgMeta = &querypb.ResourceGroup{ Name: "rg1", Capacity: 1, @@ -244,7 +274,7 @@ func TestResourceGroupMeta(t *testing.T) { }}, }, } - rg = NewResourceGroupFromMeta(rgMeta) + rg = NewResourceGroupFromMeta(rgMeta, nodeMgr) assert.Equal(t, "rg1", rg.GetName()) assert.ElementsMatch(t, []int64{1, 2, 4}, rg.GetNodes()) assert.Equal(t, 3, rg.NodeNum()) @@ -271,7 +301,7 @@ func TestResourceGroupMeta(t *testing.T) { Capacity: defaultResourceGroupCapacity, Nodes: []int64{1, 2}, } - rg = NewResourceGroupFromMeta(rgMeta) + rg = NewResourceGroupFromMeta(rgMeta, nodeMgr) assert.Equal(t, DefaultResourceGroupName, rg.GetName()) assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes()) assert.Equal(t, 2, rg.NodeNum()) @@ -311,7 +341,7 @@ func TestResourceGroupMeta(t *testing.T) { }}, }, } - rg = NewResourceGroupFromMeta(rgMeta) + rg = NewResourceGroupFromMeta(rgMeta, nodeMgr) assert.Equal(t, DefaultResourceGroupName, rg.GetName()) assert.ElementsMatch(t, []int64{1, 2}, rg.GetNodes()) assert.Equal(t, 2, rg.NodeNum()) @@ -332,3 +362,92 @@ func TestResourceGroupMeta(t *testing.T) { newMeta = rg.GetMeta() assert.Equal(t, int32(1000000), newMeta.Capacity) } + +func TestRGNodeFilter(t *testing.T) { + nodeMgr := session.NewNodeManager() + + rg := NewResourceGroup("rg1", &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 3, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 3, + }, + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "dc1", + }, + }, + }, + }, nodeMgr) + + rg.nodes = typeutil.NewSet[int64](1, 2, 3) + + nodeInfo1 := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Labels: map[string]string{ + "dc_name": "dc1", + }, + }) + nodeInfo2 := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Labels: map[string]string{ + "dc_name": "dc1", + }, + }) + nodeInfo3 := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 3, + Labels: map[string]string{ + "dc_name": "dc2", + }, + }) + + nodeMgr.Add(nodeInfo1) + nodeMgr.Add(nodeInfo2) + nodeMgr.Add(nodeInfo3) + + assert.True(t, rg.AcceptNode(1)) + assert.True(t, rg.AcceptNode(2)) + assert.False(t, rg.AcceptNode(3)) + assert.Error(t, rg.MeetRequirement()) + assert.Equal(t, rg.NodeNum(), 2) + assert.Len(t, rg.GetNodes(), 2) + + rg2 := NewResourceGroup("rg2", &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "dc2", + }, + }, + }, + }, nodeMgr) + assert.Equal(t, rg.SelectNodeForRG(rg2), int64(3)) + + rg3 := 
NewResourceGroup("rg2", &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "dc1", + }, + }, + }, + }, nodeMgr) + assert.Equal(t, rg.SelectNodeForRG(rg3), int64(-1)) +} diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index 060f287bc1689..d9f72c074d627 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -60,7 +60,7 @@ type ResourceManager struct { func NewResourceManager(catalog metastore.QueryCoordCatalog, nodeMgr *session.NodeManager) *ResourceManager { groups := make(map[string]*ResourceGroup) // Always create a default resource group to keep compatibility. - groups[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupName, newResourceGroupConfig(0, defaultResourceGroupCapacity)) + groups[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupName, newResourceGroupConfig(0, defaultResourceGroupCapacity), nodeMgr) return &ResourceManager{ incomingNode: typeutil.NewUniqueSet(), groups: groups, @@ -89,7 +89,7 @@ func (rm *ResourceManager) Recover() error { for _, meta := range rgs { needUpgrade := meta.Config == nil - rg := NewResourceGroupFromMeta(meta) + rg := NewResourceGroupFromMeta(meta, rm.nodeMgr) rm.groups[rg.GetName()] = rg for _, node := range rg.GetNodes() { if _, ok := rm.nodeIDMap[node]; ok { @@ -145,7 +145,7 @@ func (rm *ResourceManager) AddResourceGroup(rgName string, cfg *rgpb.ResourceGro return err } - rg := NewResourceGroup(rgName, cfg) + rg := NewResourceGroup(rgName, cfg, rm.nodeMgr) if err := rm.catalog.SaveResourceGroup(rg.GetMeta()); err != nil { log.Warn("failed to add resource group", zap.String("rgName", rgName), @@ -548,135 +548,158 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) error { // recoverMissingNodeRG recover resource group by transfer node from other resource group. func (rm *ResourceManager) recoverMissingNodeRG(rgName string) error { for rm.groups[rgName].MissingNumOfNodes() > 0 { - rg := rm.groups[rgName] - sourceRG := rm.selectMissingRecoverSourceRG(rg) + targetRG := rm.groups[rgName] + node, sourceRG := rm.selectNodeForMissingRecover(targetRG) if sourceRG == nil { - log.Warn("fail to select source resource group", zap.String("rgName", rg.GetName())) + log.Warn("fail to select source resource group", zap.String("rgName", targetRG.GetName())) return ErrNodeNotEnough } - nodeID, err := rm.transferOneNodeFromRGToRG(sourceRG, rg) + + err := rm.transferNode(targetRG.GetName(), node) if err != nil { log.Warn("failed to recover missing node by transfer node from other resource group", zap.String("sourceRG", sourceRG.GetName()), - zap.String("targetRG", rg.GetName()), + zap.String("targetRG", targetRG.GetName()), + zap.Int64("nodeID", node), zap.Error(err)) return err } log.Info("recover missing node by transfer node from other resource group", zap.String("sourceRG", sourceRG.GetName()), - zap.String("targetRG", rg.GetName()), - zap.Int64("nodeID", nodeID), + zap.String("targetRG", targetRG.GetName()), + zap.Int64("nodeID", node), ) } return nil } -// selectMissingRecoverSourceRG select source resource group for recover missing resource group. 
-func (rm *ResourceManager) selectMissingRecoverSourceRG(rg *ResourceGroup) *ResourceGroup {
-	// First, Transfer node from most redundant resource group first. `len(nodes) > limits`
-	if redundantRG := rm.findMaxRGWithGivenFilter(
-		func(sourceRG *ResourceGroup) bool {
-			return rg.GetName() != sourceRG.GetName() && sourceRG.RedundantNumOfNodes() > 0
-		},
-		func(sourceRG *ResourceGroup) int {
-			return sourceRG.RedundantNumOfNodes()
-		},
-	); redundantRG != nil {
-		return redundantRG
+// selectNodeForMissingRecover selects a node for missing recovery.
+// It takes a target ResourceGroup and returns the selected node's ID and the source ResourceGroup with highest priority.
+func (rm *ResourceManager) selectNodeForMissingRecover(targetRG *ResourceGroup) (int64, *ResourceGroup) {
+	computeRGPriority := func(rg *ResourceGroup) int {
+		// If the ResourceGroup has redundant nodes, boost its priority by 1,000,000.
+		if rg.RedundantNumOfNodes() > 0 {
+			return rg.RedundantNumOfNodes() * 1000000
+		}
+		// If the target ResourceGroup has a 'from' relationship with the current ResourceGroup,
+		// boost its priority by 100,000.
+		if targetRG.HasFrom(rg.GetName()) {
+			return rg.OversizedNumOfNodes() * 100000
+		}
+		return rg.OversizedNumOfNodes()
 	}
 
-	// Second, Transfer node from most oversized resource group. `len(nodes) > requests`
-	// `TransferFrom` configured resource group at high priority.
-	return rm.findMaxRGWithGivenFilter(
-		func(sourceRG *ResourceGroup) bool {
-			return rg.GetName() != sourceRG.GetName() && sourceRG.OversizedNumOfNodes() > 0
-		},
-		func(sourceRG *ResourceGroup) int {
-			if rg.HasFrom(sourceRG.GetName()) {
-				// give a boost if sourceRG is configured as `TransferFrom` to set as high priority to select.
-				return sourceRG.OversizedNumOfNodes() * resourceGroupTransferBoost
+	maxPriority := 0
+	var sourceRG *ResourceGroup
+	candidateNode := int64(-1)
+
+	for _, rg := range rm.groups {
+		if rg.GetName() == targetRG.GetName() {
+			continue
+		}
+		if rg.OversizedNumOfNodes() <= 0 {
+			continue
+		}
+
+		priority := computeRGPriority(rg)
+		if priority > maxPriority {
+			// Select a node from the current resource group that is preferred to be removed and assigned to the target resource group.
+			node := rg.SelectNodeForRG(targetRG)
+			// If no such node is found, skip the current resource group.
+			if node == -1 {
+				continue
 			}
-			return sourceRG.OversizedNumOfNodes()
-		})
+
+			sourceRG = rg
+			candidateNode = node
+			maxPriority = priority
+		}
+	}
+
+	return candidateNode, sourceRG
 }
 
 // recoverRedundantNodeRG recover resource group by transfer node to other resource group.
 func (rm *ResourceManager) recoverRedundantNodeRG(rgName string) error {
 	for rm.groups[rgName].RedundantNumOfNodes() > 0 {
-		rg := rm.groups[rgName]
-		targetRG := rm.selectRedundantRecoverTargetRG(rg)
-		if targetRG == nil {
+		sourceRG := rm.groups[rgName]
+		node, targetRG := rm.selectNodeForRedundantRecover(sourceRG)
+		if node == -1 {
 			log.Info("failed to select redundant recover target resource group, please check resource group configuration if as expected.",
-				zap.String("rgName", rg.GetName()))
+				zap.String("rgName", sourceRG.GetName()))
 			return errors.New("all resource group reach limits")
 		}
 
-		nodeID, err := rm.transferOneNodeFromRGToRG(rg, targetRG)
-		if err != nil {
+		if err := rm.transferNode(targetRG.GetName(), node); err != nil {
 			log.Warn("failed to recover redundant node by transfer node to other resource group",
-				zap.String("sourceRG", rg.GetName()),
+				zap.String("sourceRG", sourceRG.GetName()),
 				zap.String("targetRG", targetRG.GetName()),
+				zap.Int64("nodeID", node),
 				zap.Error(err))
 			return err
 		}
 
 		log.Info("recover redundant node by transfer node to other resource group",
-			zap.String("sourceRG", rg.GetName()),
+			zap.String("sourceRG", sourceRG.GetName()),
 			zap.String("targetRG", targetRG.GetName()),
-			zap.Int64("nodeID", nodeID),
+			zap.Int64("nodeID", node),
 		)
 	}
 	return nil
 }
 
-// selectRedundantRecoverTargetRG select target resource group for recover redundant resource group.
-func (rm *ResourceManager) selectRedundantRecoverTargetRG(rg *ResourceGroup) *ResourceGroup {
-	// First, Transfer node to most missing resource group first.
-	if missingRG := rm.findMaxRGWithGivenFilter(
-		func(targetRG *ResourceGroup) bool {
-			return rg.GetName() != targetRG.GetName() && targetRG.MissingNumOfNodes() > 0
-		},
-		func(targetRG *ResourceGroup) int {
-			return targetRG.MissingNumOfNodes()
-		},
-	); missingRG != nil {
-		return missingRG
+// selectNodeForRedundantRecover selects a node for redundant recovery.
+// It takes a source ResourceGroup and returns the selected node's ID and the target ResourceGroup with highest priority.
+func (rm *ResourceManager) selectNodeForRedundantRecover(sourceRG *ResourceGroup) (int64, *ResourceGroup) {
+	// computeRGPriority calculates the priority of a ResourceGroup based on certain conditions.
+	computeRGPriority := func(rg *ResourceGroup) int {
+		// If the ResourceGroup is missing nodes, boost its priority by 1,000,000.
+		if rg.MissingNumOfNodes() > 0 {
+			return rg.MissingNumOfNodes() * 1000000
+		}
+		// If the source ResourceGroup has a 'to' relationship with the current ResourceGroup,
+		// boost its priority by 100,000.
+		if sourceRG.HasTo(rg.GetName()) {
+			return rg.ReachLimitNumOfNodes() * 100000
+		} else {
+			return rg.ReachLimitNumOfNodes()
+		}
 	}
 
-	// Second, Transfer node to max reachLimit resource group.
-	// `TransferTo` configured resource group at high priority.
-	if selectRG := rm.findMaxRGWithGivenFilter(
-		func(targetRG *ResourceGroup) bool {
-			return rg.GetName() != targetRG.GetName() && targetRG.ReachLimitNumOfNodes() > 0
-		},
-		func(targetRG *ResourceGroup) int {
-			if rg.HasTo(targetRG.GetName()) {
-				// give a boost if targetRG is configured as `TransferTo` to set as high priority to select.
- return targetRG.ReachLimitNumOfNodes() * resourceGroupTransferBoost - } - return targetRG.ReachLimitNumOfNodes() - }, - ); selectRG != nil { - return selectRG - } + maxPriority := 0 + var targetRG *ResourceGroup + candidateNode := int64(-1) + for _, rg := range rm.groups { + if rg.GetName() == sourceRG.GetName() { + continue + } - // Finally, Always transfer node to default resource group. - if rg.GetName() != DefaultResourceGroupName { - return rm.groups[DefaultResourceGroupName] - } - return nil -} + if rg.ReachLimitNumOfNodes() <= 0 { + continue + } -// transferOneNodeFromRGToRG transfer one node from source resource group to target resource group. -func (rm *ResourceManager) transferOneNodeFromRGToRG(sourceRG *ResourceGroup, targetRG *ResourceGroup) (int64, error) { - if sourceRG.NodeNum() == 0 { - return -1, ErrNodeNotEnough + // Calculate the priority of the current resource group. + priority := computeRGPriority(rg) + if priority > maxPriority { + // select a node from it that is preferred to be removed and assigned to the target resource group. + node := sourceRG.SelectNodeForRG(rg) + // If no such node is found, skip the current resource group. + if node == -1 { + continue + } + candidateNode = node + targetRG = rg + maxPriority = priority + } } - // TODO: select node by some load strategy, such as segment loaded. - node := sourceRG.GetNodes()[0] - if err := rm.transferNode(targetRG.GetName(), node); err != nil { - return -1, err + + // Finally, always transfer the node to the default resource group if no other target resource group is found. + if targetRG == nil && sourceRG.GetName() != DefaultResourceGroupName { + targetRG = rm.groups[DefaultResourceGroupName] + if sourceRG != nil { + candidateNode = sourceRG.SelectNodeForRG(targetRG) + } } - return node, nil + return candidateNode, targetRG } // assignIncomingNodeWithNodeCheck assign node to resource group with node status check. @@ -713,7 +736,7 @@ func (rm *ResourceManager) assignIncomingNode(node int64) (string, error) { } // select a resource group to assign incoming node. - rg = rm.mustSelectAssignIncomingNodeTargetRG() + rg = rm.mustSelectAssignIncomingNodeTargetRG(node) if err := rm.transferNode(rg.GetName(), node); err != nil { return "", errors.Wrap(err, "at finally assign to default resource group") } @@ -721,11 +744,11 @@ func (rm *ResourceManager) assignIncomingNode(node int64) (string, error) { } // mustSelectAssignIncomingNodeTargetRG select resource group for assign incoming node. -func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG() *ResourceGroup { +func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG(nodeID int64) *ResourceGroup { // First, Assign it to rg with the most missing nodes at high priority. if rg := rm.findMaxRGWithGivenFilter( func(rg *ResourceGroup) bool { - return rg.MissingNumOfNodes() > 0 + return rg.MissingNumOfNodes() > 0 && rg.AcceptNode(nodeID) }, func(rg *ResourceGroup) int { return rg.MissingNumOfNodes() @@ -737,7 +760,7 @@ func (rm *ResourceManager) mustSelectAssignIncomingNodeTargetRG() *ResourceGroup // Second, assign it to rg do not reach limit. 
if rg := rm.findMaxRGWithGivenFilter( func(rg *ResourceGroup) bool { - return rg.ReachLimitNumOfNodes() > 0 + return rg.ReachLimitNumOfNodes() > 0 && rg.AcceptNode(nodeID) }, func(rg *ResourceGroup) int { return rg.ReachLimitNumOfNodes() diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index ca58a8e899e23..1f6086c6552e4 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/kv/mocks" @@ -28,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/pkg/kv" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -205,6 +207,7 @@ func (suite *ResourceManagerSuite) TestManipulateResourceGroup() { suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{ "rg2": newResourceGroupConfig(0, 0), }) + log.Info("xxxxx") // RemoveResourceGroup will remove all nodes from the resource group. err = suite.manager.RemoveResourceGroup("rg2") suite.NoError(err) @@ -619,3 +622,337 @@ func (suite *ResourceManagerSuite) TestUnassignFail() { suite.manager.HandleNodeDown(1) }) } + +func (suite *ResourceManagerSuite) TestNodeLabels_NodeAssign() { + suite.manager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "label1", + }, + }, + }, + }) + + suite.manager.AddResourceGroup("rg2", &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "label2", + }, + }, + }, + }) + + suite.manager.AddResourceGroup("rg3", &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "label3", + }, + }, + }, + }) + + // test that all query nodes has been marked label1 + for i := 1; i <= 30; i++ { + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(i), + Address: "localhost", + Hostname: "localhost", + Labels: map[string]string{ + "dc_name": "label1", + }, + })) + suite.manager.HandleNodeUp(int64(i)) + } + suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum()) + suite.Equal(0, suite.manager.GetResourceGroup("rg2").NodeNum()) + suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum()) + suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) + + // test new querynode with label2 + for i := 31; i <= 40; i++ { + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(i), + Address: "localhost", + Hostname: "localhost", + Labels: 
map[string]string{ + "dc_name": "label2", + }, + })) + suite.manager.HandleNodeUp(int64(i)) + } + suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) + suite.Equal(0, suite.manager.GetResourceGroup("rg3").NodeNum()) + suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) + nodesInRG, _ := suite.manager.GetNodes("rg2") + for _, node := range nodesInRG { + suite.Equal("label2", suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) + } + + // test new querynode with label3 + for i := 41; i <= 50; i++ { + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(i), + Address: "localhost", + Hostname: "localhost", + Labels: map[string]string{ + "dc_name": "label3", + }, + })) + suite.manager.HandleNodeUp(int64(i)) + } + suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) + suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) + nodesInRG, _ = suite.manager.GetNodes("rg3") + for _, node := range nodesInRG { + suite.Equal("label3", suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) + } + + // test swap rg's label + suite.manager.UpdateResourceGroups(map[string]*rgpb.ResourceGroupConfig{ + "rg1": { + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "label2", + }, + }, + }, + }, + "rg2": { + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "label3", + }, + }, + }, + }, + "rg3": { + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "label1", + }, + }, + }, + }, + }) + + log.Info("test swap rg's label") + for i := 0; i < 4; i++ { + suite.manager.AutoRecoverResourceGroup("rg1") + suite.manager.AutoRecoverResourceGroup("rg2") + suite.manager.AutoRecoverResourceGroup("rg3") + suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName) + } + suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) + suite.Equal(20, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) + nodesInRG, _ = suite.manager.GetNodes("rg1") + for _, node := range nodesInRG { + suite.Equal("label2", suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) + } + + nodesInRG, _ = suite.manager.GetNodes("rg2") + for _, node := range nodesInRG { + suite.Equal("label3", suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) + } + + nodesInRG, _ = suite.manager.GetNodes("rg3") + for _, node := range nodesInRG { + suite.Equal("label1", suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) + } +} + +func (suite *ResourceManagerSuite) TestNodeLabels_NodeDown() { + suite.manager.AddResourceGroup("rg1", &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + 
NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "label1", + }, + }, + }, + }) + + suite.manager.AddResourceGroup("rg2", &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "label2", + }, + }, + }, + }) + + suite.manager.AddResourceGroup("rg3", &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 10, + }, + NodeFilter: &rgpb.ResourceGroupNodeFilter{ + NodeLabels: []*commonpb.KeyValuePair{ + { + Key: "dc_name", + Value: "label3", + }, + }, + }, + }) + + // test that all query nodes has been marked label1 + for i := 1; i <= 10; i++ { + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(i), + Address: "localhost", + Hostname: "localhost", + Labels: map[string]string{ + "dc_name": "label1", + }, + })) + suite.manager.HandleNodeUp(int64(i)) + } + + // test new querynode with label2 + for i := 31; i <= 40; i++ { + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(i), + Address: "localhost", + Hostname: "localhost", + Labels: map[string]string{ + "dc_name": "label2", + }, + })) + suite.manager.HandleNodeUp(int64(i)) + } + // test new querynode with label3 + for i := 41; i <= 50; i++ { + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(i), + Address: "localhost", + Hostname: "localhost", + Labels: map[string]string{ + "dc_name": "label3", + }, + })) + suite.manager.HandleNodeUp(int64(i)) + } + suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) + + // test node down with label1 + suite.manager.HandleNodeDown(int64(1)) + suite.manager.nodeMgr.Remove(int64(1)) + suite.Equal(9, suite.manager.GetResourceGroup("rg1").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) + + // test node up with label2 + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(101), + Address: "localhost", + Hostname: "localhost", + Labels: map[string]string{ + "dc_name": "label2", + }, + })) + suite.manager.HandleNodeUp(int64(101)) + suite.Equal(9, suite.manager.GetResourceGroup("rg1").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) + suite.Equal(1, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) + + // test node up with label1 + suite.manager.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: int64(102), + Address: "localhost", + Hostname: "localhost", + Labels: map[string]string{ + "dc_name": "label1", + }, + })) + suite.manager.HandleNodeUp(int64(102)) + suite.Equal(10, suite.manager.GetResourceGroup("rg1").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg2").NodeNum()) + suite.Equal(10, suite.manager.GetResourceGroup("rg3").NodeNum()) + suite.Equal(1, suite.manager.GetResourceGroup(DefaultResourceGroupName).NodeNum()) + nodesInRG, _ := suite.manager.GetNodes("rg1") + for _, node := range nodesInRG { + suite.Equal("label1", 
suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) + } + + suite.manager.AutoRecoverResourceGroup("rg1") + suite.manager.AutoRecoverResourceGroup("rg2") + suite.manager.AutoRecoverResourceGroup("rg3") + suite.manager.AutoRecoverResourceGroup(DefaultResourceGroupName) + nodesInRG, _ = suite.manager.GetNodes(DefaultResourceGroupName) + for _, node := range nodesInRG { + suite.Equal("label2", suite.manager.nodeMgr.Get(node).Labels()["dc_name"]) + } +} diff --git a/internal/querycoordv2/observers/resource_observer_test.go b/internal/querycoordv2/observers/resource_observer_test.go index 07a5c41511247..9079d89e846ce 100644 --- a/internal/querycoordv2/observers/resource_observer_test.go +++ b/internal/querycoordv2/observers/resource_observer_test.go @@ -130,12 +130,8 @@ func (suite *ResourceObserverSuite) TestObserverRecoverOperation() { suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1")) suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2")) suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg3")) - // but new node with id 10 is not in - suite.nodeMgr.Remove(10) - suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1")) - suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg2")) - suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg3")) // new node is down, rg3 cannot use that node anymore. + suite.nodeMgr.Remove(10) suite.meta.ResourceManager.HandleNodeDown(10) suite.observer.checkAndRecoverResourceGroup() suite.NoError(suite.meta.ResourceManager.MeetRequirement("rg1")) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 960e8697db648..23ee460ff8ec2 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -474,6 +474,7 @@ func (s *Server) startQueryCoord() error { Address: node.Address, Hostname: node.HostName, Version: node.Version, + Labels: node.GetServerLabel(), })) s.taskScheduler.AddExecutor(node.ServerID) @@ -712,6 +713,7 @@ func (s *Server) watchNodes(revision int64) { Address: addr, Hostname: event.Session.HostName, Version: event.Session.Version, + Labels: event.Session.GetServerLabel(), })) s.nodeUpEventChan <- nodeID select { diff --git a/internal/querycoordv2/session/node_manager.go b/internal/querycoordv2/session/node_manager.go index 0341544d02b89..fafd16d6b3b43 100644 --- a/internal/querycoordv2/session/node_manager.go +++ b/internal/querycoordv2/session/node_manager.go @@ -111,6 +111,7 @@ type ImmutableNodeInfo struct { Address string Hostname string Version semver.Version + Labels map[string]string } const ( @@ -147,6 +148,10 @@ func (n *NodeInfo) Hostname() string { return n.immutableInfo.Hostname } +func (n *NodeInfo) Labels() map[string]string { + return n.immutableInfo.Labels +} + func (n *NodeInfo) SegmentCnt() int { n.mu.RLock() defer n.mu.RUnlock() diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 828ea2d2cb425..f7326174fa92e 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -48,7 +48,8 @@ const ( // DefaultServiceRoot default root path used in kv by Session DefaultServiceRoot = "session/" // DefaultIDKey default id key for Session - DefaultIDKey = "id" + DefaultIDKey = "id" + SupportedLabelPrefix = "MILVUS_SERVER_LABEL_" ) // SessionEventType session event type @@ -100,8 +101,9 @@ type SessionRaw struct { IndexEngineVersion IndexEngineVersion `json:"IndexEngineVersion,omitempty"` LeaseID *clientv3.LeaseID `json:"LeaseID,omitempty"` - HostName 
string `json:"HostName,omitempty"` - EnableDisk bool `json:"EnableDisk,omitempty"` + HostName string `json:"HostName,omitempty"` + EnableDisk bool `json:"EnableDisk,omitempty"` + ServerLabels map[string]string `json:"ServerLabels,omitempty"` } func (s *SessionRaw) GetAddress() string { @@ -112,6 +114,10 @@ func (s *SessionRaw) GetServerID() int64 { return s.ServerID } +func (s *SessionRaw) GetServerLabel() map[string]string { + return s.ServerLabels +} + func (s *SessionRaw) IsTriggerKill() bool { return s.TriggerKill } @@ -286,7 +292,8 @@ func (s *Session) Init(serverName, address string, exclusive bool, triggerKill b panic(err) } s.ServerID = serverID - log.Info("start server", zap.String("name", serverName), zap.String("address", address), zap.Int64("id", s.ServerID)) + s.ServerLabels = GetServerLabelsFromEnv(serverName) + log.Info("start server", zap.String("name", serverName), zap.String("address", address), zap.Int64("id", s.ServerID), zap.Any("server_labels", s.ServerLabels)) } // String makes Session struct able to be logged by zap @@ -329,6 +336,25 @@ func (s *Session) getServerID() (int64, error) { return nodeID, nil } +func GetServerLabelsFromEnv(role string) map[string]string { + ret := make(map[string]string) + switch role { + case "querynode": + for _, value := range os.Environ() { + rs := []rune(value) + in := strings.Index(value, "=") + key := string(rs[0:in]) + value := string(rs[in+1:]) + + if strings.HasPrefix(key, SupportedLabelPrefix) { + label := strings.TrimPrefix(key, SupportedLabelPrefix) + ret[label] = value + } + } + } + return ret +} + func (s *Session) checkIDExist() { s.etcdCli.Txn(s.ctx).If( clientv3.Compare( diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index d58ae5552a072..e379c959fcf68 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -1064,6 +1064,21 @@ func (s *SessionSuite) TestSafeCloseLiveCh() { }) } +func (s *SessionSuite) TestGetSessions() { + os.Setenv("MILVUS_SERVER_LABEL_key1", "value1") + os.Setenv("MILVUS_SERVER_LABEL_key2", "value2") + os.Setenv("key3", "value3") + + defer os.Unsetenv("MILVUS_SERVER_LABEL_key1") + defer os.Unsetenv("MILVUS_SERVER_LABEL_key2") + defer os.Unsetenv("key3") + + ret := GetServerLabelsFromEnv("querynode") + assert.Equal(s.T(), 2, len(ret)) + assert.Equal(s.T(), "value1", ret["key1"]) + assert.Equal(s.T(), "value2", ret["key2"]) +} + func TestSessionSuite(t *testing.T) { suite.Run(t, new(SessionSuite)) }
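Usage sketch (appended for review context; not part of the diff above). The patch wires two pieces together: a query node advertises labels read from environment variables prefixed with MILVUS_SERVER_LABEL_ (see GetServerLabelsFromEnv, which only applies to the querynode role), and a resource group declares the labels it requires through ResourceGroupConfig.NodeFilter, so the resource manager only assigns matching nodes to it. The snippet below shows how such a config could be built; the helper newLabeledRGConfig and the standalone main are illustrative assumptions, only the rgpb/commonpb types and AddResourceGroup come from the patch.

// Deployment side (assumption): export a label before the querynode starts,
// e.g. MILVUS_SERVER_LABEL_dc_name=dc1, which the session reports as
// ServerLabels{"dc_name": "dc1"}.
//
// Control side: build a ResourceGroupConfig whose NodeFilter only matches
// nodes carrying dc_name=dc1, then pass it to ResourceManager.AddResourceGroup.
package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
)

// newLabeledRGConfig is a hypothetical helper; it mirrors the configs built in
// the tests above.
func newLabeledRGConfig(nodeNum int32, labelKey, labelValue string) *rgpb.ResourceGroupConfig {
	return &rgpb.ResourceGroupConfig{
		Requests: &rgpb.ResourceGroupLimit{NodeNum: nodeNum},
		Limits:   &rgpb.ResourceGroupLimit{NodeNum: nodeNum},
		NodeFilter: &rgpb.ResourceGroupNodeFilter{
			NodeLabels: []*commonpb.KeyValuePair{
				{Key: labelKey, Value: labelValue},
			},
		},
	}
}

func main() {
	cfg := newLabeledRGConfig(3, "dc_name", "dc1")
	// Print the required labels; a node is accepted only if its labels match all of them.
	fmt.Println(cfg.GetNodeFilter().GetNodeLabels())
}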