Skip to content

Commit

Permalink
mcs: pick some updates and bug fixes (tikv#256)
Browse files Browse the repository at this point in the history
* etcdutil, mcs: fix the issue loading label rules is too slow (tikv#7718)

Signed-off-by: lhy1024 <[email protected]>

* ci: run `make check` with longer timeout (tikv#7271)

ref tikv#4399

Signed-off-by: lhy1024 <[email protected]>

* mcs: add a switch to dynamically enable scheduling service (tikv#7595)

ref tikv#5839

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* schedule: prevent suddenly scheduling (tikv#7714)

ref tikv#7671

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* makefile: update golangci (tikv#7556)

close tikv#7551

Signed-off-by: husharp <[email protected]>

* fix conflict

Signed-off-by: lhy1024 <[email protected]>

---------

Signed-off-by: lhy1024 <[email protected]>
Signed-off-by: husharp <[email protected]>
Co-authored-by: Ryan Leung <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Co-authored-by: Hu# <[email protected]>
  • Loading branch information
4 people authored Jan 18, 2024
1 parent 19a7726 commit 5bf0c14
Show file tree
Hide file tree
Showing 23 changed files with 443 additions and 121 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ concurrency:
jobs:
statics:
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 20
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)

install-tools:
@mkdir -p $(GO_TOOLS_BIN_PATH)
@which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.51.2
@which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.55.2
@grep '_' tools.go | sed 's/"//g' | awk '{print $$2}' | xargs go install

.PHONY: install-tools
Expand Down
6 changes: 3 additions & 3 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func MakeRegionBound(id uint32) *RegionBound {
}
}

// makeKeyRanges encodes keyspace ID to correct LabelRule data.
func makeKeyRanges(id uint32, skipRaw bool) []interface{} {
// MakeKeyRanges encodes keyspace ID to correct LabelRule data.
func MakeKeyRanges(id uint32, skipRaw bool) []interface{} {
regionBound := MakeRegionBound(id)
if skipRaw {
return []interface{}{
Expand Down Expand Up @@ -215,7 +215,7 @@ func makeLabelRule(id uint32, skipRaw bool) *labeler.LabelRule {
},
},
RuleType: labeler.KeyRange,
Data: makeKeyRanges(id, skipRaw),
Data: MakeKeyRanges(id, skipRaw),
}
}

Expand Down
25 changes: 18 additions & 7 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (rw *Watcher) initializeRuleWatcher() error {
var suspectKeyRanges *core.KeyRanges

preEventsFn := func(events []*clientv3.Event) error {
// It will be locked until the postFn is finished.
// It will be locked until the postEventsFn is finished.
rw.ruleManager.Lock()
rw.patch = rw.ruleManager.BeginPatch()
suspectKeyRanges = &core.KeyRanges{}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (rw *Watcher) initializeRuleWatcher() error {
}
postEventsFn := func(events []*clientv3.Event) error {
defer rw.ruleManager.Unlock()
if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil {
if err := rw.ruleManager.TryCommitPatchLocked(rw.patch); err != nil {
log.Error("failed to commit patch", zap.Error(err))
return err
}
Expand All @@ -212,26 +212,37 @@ func (rw *Watcher) initializeRuleWatcher() error {

func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
// TODO: use txn in region labeler.
preEventsFn := func(events []*clientv3.Event) error {
// It will be locked until the postEventsFn is finished.
rw.regionLabeler.Lock()
return nil
}
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
log.Debug("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := labeler.NewLabelRuleFromJSON(kv.Value)
if err != nil {
return err
}
return rw.regionLabeler.SetLabelRule(rule)
return rw.regionLabeler.SetLabelRuleLocked(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete region label rule", zap.String("key", key))
return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim))
return rw.regionLabeler.DeleteLabelRuleLocked(strings.TrimPrefix(key, prefixToTrim))
}
postEventsFn := func(events []*clientv3.Event) error {
defer rw.regionLabeler.Unlock()
rw.regionLabeler.BuildRangeListLocked()
return nil
}
rw.labelWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-region-label-watcher", rw.regionLabelPathPrefix,
func([]*clientv3.Event) error { return nil },
preEventsFn,
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
postEventsFn,
true, /* withPrefix */
)
rw.labelWatcher.StartWatchLoop()
Expand Down
113 changes: 113 additions & 0 deletions pkg/mcs/scheduling/server/rule/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rule

import (
"context"
"encoding/json"
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)

const (
clusterID = uint64(20240117)
rulesNum = 16384
)

func TestLoadLargeRules(t *testing.T) {
re := require.New(t)
ctx, client, clean := prepare(t)
defer clean()
runWatcherLoadLabelRule(ctx, re, client)
}

func BenchmarkLoadLargeRules(b *testing.B) {
re := require.New(b)
ctx, client, clean := prepare(b)
defer clean()

b.ResetTimer() // Resets the timer to ignore initialization time in the benchmark

for n := 0; n < b.N; n++ {
runWatcherLoadLabelRule(ctx, re, client)
}
}

func runWatcherLoadLabelRule(ctx context.Context, re *require.Assertions, client *clientv3.Client) {
storage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
labelerManager, err := labeler.NewRegionLabeler(ctx, storage, time.Hour)
re.NoError(err)
ctx, cancel := context.WithCancel(ctx)
rw := &Watcher{
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: client,
ruleStorage: storage,
regionLabeler: labelerManager,
}
err = rw.initializeRegionLabelWatcher()
re.NoError(err)
re.Len(labelerManager.GetAllLabelRules(), rulesNum)
cancel()
}

func prepare(t require.TestingT) (context.Context, *clientv3.Client, func()) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
cfg := etcdutil.NewTestSingleConfig()
cfg.Dir = os.TempDir() + "/test_etcd"
os.RemoveAll(cfg.Dir)
etcd, err := embed.StartEtcd(cfg)
re.NoError(err)
client, err := etcdutil.CreateEtcdClient(nil, cfg.LCUrls)
re.NoError(err)
<-etcd.Server.ReadyNotify()

for i := 1; i < rulesNum+1; i++ {
rule := &labeler.LabelRule{
ID: "test_" + strconv.Itoa(i),
Labels: []labeler.RegionLabel{{Key: "test", Value: "test"}},
RuleType: labeler.KeyRange,
Data: keyspace.MakeKeyRanges(uint32(i), true),
}
value, err := json.Marshal(rule)
re.NoError(err)
key := endpoint.RegionLabelPathPrefix(clusterID) + "/" + rule.ID
_, err = clientv3.NewKV(client).Put(ctx, key, string(value))
re.NoError(err)
}

return ctx, client, func() {
cancel()
client.Close()
etcd.Close()
os.RemoveAll(cfg.Dir)
}
}
33 changes: 25 additions & 8 deletions pkg/schedule/labeler/labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (l *RegionLabeler) checkAndClearExpiredLabels() {
}
}
if deleted {
l.buildRangeList()
l.BuildRangeListLocked()
}
}

Expand Down Expand Up @@ -128,11 +128,12 @@ func (l *RegionLabeler) loadRules() error {
return err
}
}
l.buildRangeList()
l.BuildRangeListLocked()
return nil
}

func (l *RegionLabeler) buildRangeList() {
// BuildRangeListLocked builds the range list.
func (l *RegionLabeler) BuildRangeListLocked() {
builder := rangelist.NewBuilder()
l.minExpire = nil
for _, rule := range l.labelRules {
Expand Down Expand Up @@ -206,31 +207,47 @@ func (l *RegionLabeler) getAndCheckRule(id string, now time.Time) *LabelRule {

// SetLabelRule inserts or updates a LabelRule.
func (l *RegionLabeler) SetLabelRule(rule *LabelRule) error {
l.Lock()
defer l.Unlock()
if err := l.SetLabelRuleLocked(rule); err != nil {
return err
}
l.BuildRangeListLocked()
return nil
}

// SetLabelRuleLocked inserts or updates a LabelRule but not buildRangeList.
func (l *RegionLabeler) SetLabelRuleLocked(rule *LabelRule) error {
if err := rule.checkAndAdjust(); err != nil {
return err
}
l.Lock()
defer l.Unlock()
if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil {
return err
}
l.labelRules[rule.ID] = rule
l.buildRangeList()
return nil
}

// DeleteLabelRule removes a LabelRule.
func (l *RegionLabeler) DeleteLabelRule(id string) error {
l.Lock()
defer l.Unlock()
if err := l.DeleteLabelRuleLocked(id); err != nil {
return err
}
l.BuildRangeListLocked()
return nil
}

// DeleteLabelRuleLocked removes a LabelRule but not buildRangeList.
func (l *RegionLabeler) DeleteLabelRuleLocked(id string) error {
if _, ok := l.labelRules[id]; !ok {
return errs.ErrRegionRuleNotFound.FastGenByArgs(id)
}
if err := l.storage.DeleteRegionRule(id); err != nil {
return err
}
delete(l.labelRules, id)
l.buildRangeList()
return nil
}

Expand Down Expand Up @@ -264,7 +281,7 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error {
for _, rule := range patch.SetRules {
l.labelRules[rule.ID] = rule
}
l.buildRangeList()
l.BuildRangeListLocked()
return nil
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/schedule/placement/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (m *RuleManager) SetRule(rule *Rule) error {
defer m.Unlock()
p := m.BeginPatch()
p.SetRule(rule)
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("placement rule updated", zap.String("rule", fmt.Sprint(rule)))
Expand All @@ -322,7 +322,7 @@ func (m *RuleManager) DeleteRule(group, id string) error {
defer m.Unlock()
p := m.BeginPatch()
p.DeleteRule(group, id)
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("placement rule is removed", zap.String("group", group), zap.String("id", id))
Expand Down Expand Up @@ -467,8 +467,8 @@ func (m *RuleManager) BeginPatch() *RuleConfigPatch {
return m.ruleConfig.beginPatch()
}

// TryCommitPatch tries to commit a patch.
func (m *RuleManager) TryCommitPatch(patch *RuleConfigPatch) error {
// TryCommitPatchLocked tries to commit a patch.
func (m *RuleManager) TryCommitPatchLocked(patch *RuleConfigPatch) error {
patch.adjust()

ruleList, err := buildRuleList(patch)
Expand Down Expand Up @@ -533,7 +533,7 @@ func (m *RuleManager) SetRules(rules []*Rule) error {
}
p.SetRule(r)
}
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}

Expand Down Expand Up @@ -596,7 +596,7 @@ func (m *RuleManager) Batch(todo []RuleOp) error {
}
}

if err := m.TryCommitPatch(patch); err != nil {
if err := m.TryCommitPatchLocked(patch); err != nil {
return err
}

Expand Down Expand Up @@ -632,7 +632,7 @@ func (m *RuleManager) SetRuleGroup(group *RuleGroup) error {
defer m.Unlock()
p := m.BeginPatch()
p.SetGroup(group)
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("group config updated", zap.String("group", fmt.Sprint(group)))
Expand All @@ -645,7 +645,7 @@ func (m *RuleManager) DeleteRuleGroup(id string) error {
defer m.Unlock()
p := m.BeginPatch()
p.DeleteGroup(id)
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("group config reset", zap.String("group", id))
Expand Down Expand Up @@ -735,7 +735,7 @@ func (m *RuleManager) SetAllGroupBundles(groups []GroupBundle, override bool) er
p.SetRule(r)
}
}
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("full config reset", zap.String("config", fmt.Sprint(groups)))
Expand Down Expand Up @@ -766,7 +766,7 @@ func (m *RuleManager) SetGroupBundle(group GroupBundle) error {
}
p.SetRule(r)
}
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("group is reset", zap.String("group", fmt.Sprint(group)))
Expand Down Expand Up @@ -798,7 +798,7 @@ func (m *RuleManager) DeleteGroupBundle(id string, regex bool) error {
p.DeleteGroup(g.ID)
}
}
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("groups are removed", zap.String("id", id), zap.Bool("regexp", regex))
Expand Down
Loading

0 comments on commit 5bf0c14

Please sign in to comment.