Skip to content

Commit

Permalink
simplify the auto recover
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Oct 21, 2024
1 parent 258c763 commit cc5ad70
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 181 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ queryNode:
bloomFilterApplyParallelFactor: 4 # parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM
workerPooling:
size: 10 # the size for worker querynode client pool
labelAwareQueryNodeBalance: QUERYNODE_LOCATION # the labels of querynode which can be used for node assignment in resource group
ip: # TCP/IP address of queryNode. If not specified, use the first unicastable address
port: 21123 # TCP port of queryNode
grpc:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -275,4 +275,4 @@ replace (

exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211108044248-fe3b7c4e445b

replace github.com/milvus-io/milvus-proto/go-api/v2 => github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021030130-6ef46f943d19
replace github.com/milvus-io/milvus-proto/go-api/v2 => github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021072851-f06e7148817b
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -907,8 +907,8 @@ github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLr
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021030130-6ef46f943d19 h1:Rsk8HBYftRrmeG+55tdfRipjc9V6UtyN2kcX4mBzcbc=
github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021030130-6ef46f943d19/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021072851-f06e7148817b h1:WPI6209CYRkuQNDlF8K/MC5lfKq741WlPFgCUn+rGHs=
github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20241021072851-f06e7148817b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
Expand Down
57 changes: 23 additions & 34 deletions internal/querycoordv2/meta/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,25 +126,6 @@ func (rg *ResourceGroup) MissingNumOfNodes() int {
return missing
}

func (rg *ResourceGroup) MissingNumOfPreferNodes(nodeMgr *session.NodeManager) int {
if len(rg.GetConfig().GetNodeFilter().GetPreferNodeLabels()) == 0 {
return rg.MissingNumOfNodes()
}

preferNodesNum := rg.GetNodesByFilter(func(nodeID int64) bool {
if nodeMgr.Get(nodeID) == nil {
return false
}
return rg.PreferAcceptNode(nodeMgr.Get(nodeID))
})

missing := int(rg.cfg.Requests.NodeNum) - len(preferNodesNum)
if missing < 0 {
return 0
}
return missing
}

// ReachLimitNumOfNodes return reach limit nodes count. `limits - len(node)`
func (rg *ResourceGroup) ReachLimitNumOfNodes() int {
reachLimit := int(rg.cfg.Limits.NodeNum) - len(rg.nodes)
Expand All @@ -163,6 +144,20 @@ func (rg *ResourceGroup) RedundantNumOfNodes() int {
return redundant
}

func (rg *ResourceGroup) DirtyNodes(nodeMgr *session.NodeManager) []int64 {
// filter out nodes that do not meet the node filter
dirtyNodes := make([]int64, 0)
rg.nodes.Range(func(nodeID int64) bool {
nodeInfo := nodeMgr.Get(nodeID)
if nodeInfo == nil || !rg.AcceptNode(nodeInfo) {
dirtyNodes = append(dirtyNodes, nodeID)
}
return true
})

return dirtyNodes
}

func (rg *ResourceGroup) GetNodesByFilter(f func(nodeID int64) bool) []int64 {
ret := make([]int64, 0)
rg.nodes.Range(func(nodeID int64) bool {
Expand All @@ -175,19 +170,22 @@ func (rg *ResourceGroup) GetNodesByFilter(f func(nodeID int64) bool) []int64 {
return ret
}

func (rg *ResourceGroup) PreferAcceptNode(nodeInfo *session.NodeInfo) bool {
func (rg *ResourceGroup) AcceptNode(nodeInfo *session.NodeInfo) bool {
if nodeInfo == nil {
return false
}

rgPreferLabels := rg.GetConfig().GetNodeFilter().GetPreferNodeLabels()
nodeLabels := nodeInfo.Labels()
requiredNodeLabels := rg.GetConfig().GetNodeFilter().GetNodeLabels()
if len(requiredNodeLabels) == 0 {
return true
}

if len(nodeLabels) == 0 || len(rgPreferLabels) == 0 {
nodeLabels := nodeInfo.Labels()
if len(nodeLabels) == 0 {
return false
}

for _, labelPair := range rgPreferLabels {
for _, labelPair := range requiredNodeLabels {
valueInNode, ok := nodeLabels[labelPair.Key]
if !ok || valueInNode != labelPair.Value {
return false
Expand Down Expand Up @@ -259,16 +257,7 @@ func (rg *ResourceGroup) MeetRequirement(nodeMgr *session.NodeManager) error {

// check rg node filter
if rg.cfg.NodeFilter != nil {
// filter out nodes that do not meet the node filter
dirtyNodes := make([]int64, 0)
rg.nodes.Range(func(nodeID int64) bool {
nodeInfo := nodeMgr.Get(nodeID)
if nodeInfo == nil || !rg.PreferAcceptNode(nodeInfo) {
dirtyNodes = append(dirtyNodes, nodeID)
}
return true
})

dirtyNodes := rg.DirtyNodes(nodeMgr)
if len(dirtyNodes) > 0 {
return errors.Errorf(
"has dirty nodes[%v], not meet node filter",
Expand Down
10 changes: 5 additions & 5 deletions internal/querycoordv2/meta/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func TestRGNodeFilter(t *testing.T) {
NodeNum: 3,
},
NodeFilter: &rgpb.ResourceGroupNodeFilter{
PreferNodeLabels: []*commonpb.KeyValuePair{
NodeLabels: []*commonpb.KeyValuePair{
{
Key: "dc_name",
Value: "dc1",
Expand Down Expand Up @@ -383,17 +383,17 @@ func TestRGNodeFilter(t *testing.T) {
nodeMgr.Add(nodeInfo2)
nodeMgr.Add(nodeInfo3)

assert.True(t, rg.PreferAcceptNode(nodeInfo1))
assert.True(t, rg.PreferAcceptNode(nodeInfo2))
assert.False(t, rg.PreferAcceptNode(nodeInfo3))
assert.True(t, rg.AcceptNode(nodeInfo1))
assert.True(t, rg.AcceptNode(nodeInfo2))
assert.False(t, rg.AcceptNode(nodeInfo3))
assert.Error(t, rg.MeetRequirement(nodeMgr))

nodeFilter := func(nodeID int64) bool {
nodeInfo := nodeMgr.Get(nodeID)
if nodeInfo == nil {
return false
}
return rg.PreferAcceptNode(nodeInfo)
return rg.AcceptNode(nodeInfo)
}
assert.Len(t, rg.GetNodesByFilter(nodeFilter), 2)
}
Loading

0 comments on commit cc5ad70

Please sign in to comment.