Skip to content

Commit

Permalink
enhance: Enable node assign policy on resource group
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Nov 6, 2024
1 parent 8714774 commit 7cf414c
Show file tree
Hide file tree
Showing 11 changed files with 743 additions and 139 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104030616-2810a94b3c80
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7 h1:HwAitQk+V59QdYUwwVVYHTujd4QZrebg2Cc2hmcjhAg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241025031121-4d5c88b00cf7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104030616-2810a94b3c80 h1:Lt3W/8liX5YkzZFGkD0jhXPEIOgLj68n6eDQToxs43s=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241104030616-2810a94b3c80/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
145 changes: 113 additions & 32 deletions internal/querycoordv2/meta/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand All @@ -30,23 +31,25 @@ func newResourceGroupConfig(request int32, limit int32) *rgpb.ResourceGroupConfi
}

type ResourceGroup struct {
name string
nodes typeutil.UniqueSet
cfg *rgpb.ResourceGroupConfig
name string
nodes typeutil.UniqueSet
cfg *rgpb.ResourceGroupConfig
nodeMgr *session.NodeManager
}

// NewResourceGroup create resource group.
func NewResourceGroup(name string, cfg *rgpb.ResourceGroupConfig) *ResourceGroup {
func NewResourceGroup(name string, cfg *rgpb.ResourceGroupConfig, nodeMgr *session.NodeManager) *ResourceGroup {
rg := &ResourceGroup{
name: name,
nodes: typeutil.NewUniqueSet(),
cfg: cfg,
name: name,
nodes: typeutil.NewUniqueSet(),
cfg: cfg,
nodeMgr: nodeMgr,
}
return rg
}

// NewResourceGroupFromMeta create resource group from meta.
func NewResourceGroupFromMeta(meta *querypb.ResourceGroup) *ResourceGroup {
func NewResourceGroupFromMeta(meta *querypb.ResourceGroup, nodeMgr *session.NodeManager) *ResourceGroup {
// Backward compatibility, recover the config from capacity.
if meta.Config == nil {
// If meta.Config is nil, which means the meta is from old version.
Expand All @@ -57,7 +60,7 @@ func NewResourceGroupFromMeta(meta *querypb.ResourceGroup) *ResourceGroup {
meta.Config = newResourceGroupConfig(meta.Capacity, meta.Capacity)
}
}
rg := NewResourceGroup(meta.Name, meta.Config)
rg := NewResourceGroup(meta.Name, meta.Config, nodeMgr)
for _, node := range meta.GetNodes() {
rg.nodes.Insert(node)
}
Expand Down Expand Up @@ -91,55 +94,132 @@ func (rg *ResourceGroup) GetConfigCloned() *rgpb.ResourceGroupConfig {
return proto.Clone(rg.cfg).(*rgpb.ResourceGroupConfig)
}

// GetNodes return nodes of resource group.
// GetNodes return nodes of resource group which match required node labels
func (rg *ResourceGroup) GetNodes() []int64 {
return rg.nodes.Collect()
requiredNodeLabels := rg.GetConfig().GetNodeFilter().GetNodeLabels()
if len(requiredNodeLabels) == 0 {
return rg.nodes.Collect()
}

ret := make([]int64, 0)
rg.nodes.Range(func(nodeID int64) bool {
if rg.AcceptNode(nodeID) {
ret = append(ret, nodeID)
}
return true
})

return ret
}

// NodeNum return node count of resource group.
// NodeNum return node count of resource group which match required node labels
func (rg *ResourceGroup) NodeNum() int {
return rg.nodes.Len()
return len(rg.GetNodes())
}

// ContainNode return whether resource group contain node.
func (rg *ResourceGroup) ContainNode(id int64) bool {
return rg.nodes.Contain(id)
}

// OversizedNumOfNodes return oversized nodes count. `len(node) - requests`
// OversizedNumOfNodes return oversized nodes count. `NodeNum - requests`
func (rg *ResourceGroup) OversizedNumOfNodes() int {
oversized := rg.nodes.Len() - int(rg.cfg.Requests.NodeNum)
oversized := rg.NodeNum() - int(rg.cfg.Requests.NodeNum)
if oversized < 0 {
return 0
oversized = 0
}
return oversized
return oversized + len(rg.getDirtyNode())
}

// MissingNumOfNodes return lack nodes count. `requests - len(node)`
// MissingNumOfNodes return lack nodes count. `requests - NodeNum`
func (rg *ResourceGroup) MissingNumOfNodes() int {
missing := int(rg.cfg.Requests.NodeNum) - len(rg.nodes)
missing := int(rg.cfg.Requests.NodeNum) - rg.NodeNum()
if missing < 0 {
return 0
}
return missing
}

// ReachLimitNumOfNodes return reach limit nodes count. `limits - len(node)`
// ReachLimitNumOfNodes return reach limit nodes count. `limits - NodeNum`
func (rg *ResourceGroup) ReachLimitNumOfNodes() int {
reachLimit := int(rg.cfg.Limits.NodeNum) - len(rg.nodes)
reachLimit := int(rg.cfg.Limits.NodeNum) - rg.NodeNum()
if reachLimit < 0 {
return 0
}
return reachLimit
}

// RedundantOfNodes return redundant nodes count. `len(node) - limits`
// RedundantOfNodes return redundant nodes count. `len(node) - limits` or len(dirty_nodes)
func (rg *ResourceGroup) RedundantNumOfNodes() int {
redundant := len(rg.nodes) - int(rg.cfg.Limits.NodeNum)
redundant := rg.NodeNum() - int(rg.cfg.Limits.NodeNum)
if redundant < 0 {
return 0
redundant = 0
}
return redundant + len(rg.getDirtyNode())
}

func (rg *ResourceGroup) getDirtyNode() []int64 {
dirtyNodes := make([]int64, 0)
rg.nodes.Range(func(nodeID int64) bool {
if !rg.AcceptNode(nodeID) {
dirtyNodes = append(dirtyNodes, nodeID)
}
return true
})

return dirtyNodes
}

func (rg *ResourceGroup) SelectNodeForRG(targetRG *ResourceGroup) int64 {
// try to move out dirty node
for _, node := range rg.getDirtyNode() {
if targetRG.AcceptNode(node) {
return node
}
}

// try to move out oversized node
oversized := rg.NodeNum() - int(rg.cfg.Requests.NodeNum)
if oversized > 0 {
for _, node := range rg.GetNodes() {
if targetRG.AcceptNode(node) {
return node
}
}
}

return -1
}

// return node and priority.
func (rg *ResourceGroup) AcceptNode(nodeID int64) bool {
if rg.GetName() == DefaultResourceGroupName {
return true
}

nodeInfo := rg.nodeMgr.Get(nodeID)
if nodeInfo == nil {
return false
}
return redundant

requiredNodeLabels := rg.GetConfig().GetNodeFilter().GetNodeLabels()
if len(requiredNodeLabels) == 0 {
return true
}

nodeLabels := nodeInfo.Labels()
if len(nodeLabels) == 0 {
return false
}

for _, labelPair := range requiredNodeLabels {
valueInNode, ok := nodeLabels[labelPair.Key]
if !ok || valueInNode != labelPair.Value {
return false
}
}

return true
}

// HasFrom return whether given resource group is in `from` of rg.
Expand Down Expand Up @@ -176,28 +256,29 @@ func (rg *ResourceGroup) GetMeta() *querypb.ResourceGroup {
// Snapshot return a snapshot of resource group.
func (rg *ResourceGroup) Snapshot() *ResourceGroup {
return &ResourceGroup{
name: rg.name,
nodes: rg.nodes.Clone(),
cfg: rg.GetConfigCloned(),
name: rg.name,
nodes: rg.nodes.Clone(),
cfg: rg.GetConfigCloned(),
nodeMgr: rg.nodeMgr,
}
}

// MeetRequirement return whether resource group meet requirement.
// Return error with reason if not meet requirement.
func (rg *ResourceGroup) MeetRequirement() error {
// if len(node) is less than requests, new node need to be assigned.
if rg.nodes.Len() < int(rg.cfg.Requests.NodeNum) {
if rg.MissingNumOfNodes() > 0 {
return errors.Errorf(
"has %d nodes, less than request %d",
rg.nodes.Len(),
rg.NodeNum(),
rg.cfg.Requests.NodeNum,
)
}
// if len(node) is greater than limits, node need to be removed.
if rg.nodes.Len() > int(rg.cfg.Limits.NodeNum) {
if rg.RedundantNumOfNodes() > 0 {
return errors.Errorf(
"has %d nodes, greater than limit %d",
rg.nodes.Len(),
rg.NodeNum(),
rg.cfg.Requests.NodeNum,
)
}
Expand Down
Loading

0 comments on commit 7cf414c

Please sign in to comment.