Skip to content

Commit

Permalink
Merge branch 'master' into pd-ctl-keyspace
Browse files Browse the repository at this point in the history
  • Loading branch information
AmoebaProtozoa committed Oct 8, 2023
2 parents c42c80a + f3ed1a0 commit 869d08b
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 18 deletions.
3 changes: 3 additions & 0 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
log.Info("update scheduling config", zap.Reflect("new", cfg))
cw.AdjustScheduleCfg(&cfg.Schedule)
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetScheduleConfig(&cfg.Schedule)
Expand Down Expand Up @@ -146,6 +147,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
log.Info("update scheduler config", zap.String("name", string(kv.Value)))
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
Expand All @@ -161,6 +163,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("remove scheduler config", zap.String("key", string(kv.Key)))
return cw.storage.RemoveSchedulerConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
)
Expand Down
8 changes: 8 additions & 0 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"strings"
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

// ruleStorage is an in-memory storage for Placement Rules,
Expand Down Expand Up @@ -163,12 +165,14 @@ func (rw *Watcher) initializeRuleWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRule(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
Expand All @@ -188,12 +192,14 @@ func (rw *Watcher) initializeRuleWatcher() error {
func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRuleGroup(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule group", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
Expand All @@ -213,12 +219,14 @@ func (rw *Watcher) initializeGroupWatcher() error {
func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRegionRule(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete region label rule", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
Expand Down
35 changes: 20 additions & 15 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,15 +427,15 @@ func (c *RaftCluster) runStoreConfigSync() {
defer c.wg.Done()

var (
synced, switchRaftV2Config bool
stores = c.GetStores()
synced, switchRaftV2Config, needPersist bool
stores = c.GetStores()
)
// Start the ticker with a second-level timer to accelerate
// the bootstrap stage.
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
synced, switchRaftV2Config = c.syncStoreConfig(stores)
synced, switchRaftV2Config, needPersist = c.syncStoreConfig(stores)
if switchRaftV2Config {
if err := c.opt.SwitchRaftV2(c.GetStorage()); err != nil {
log.Warn("store config persisted failed", zap.Error(err))
Expand All @@ -444,8 +444,11 @@ func (c *RaftCluster) runStoreConfigSync() {
// Update the stores if the synchronization is not completed.
if !synced {
stores = c.GetStores()
} else if err := c.opt.Persist(c.storage); err != nil {
log.Warn("store config persisted failed", zap.Error(err))
}
if needPersist {
if err := c.opt.Persist(c.storage); err != nil {
log.Warn("store config persisted failed", zap.Error(err))
}
}
select {
case <-c.ctx.Done():
Expand All @@ -459,7 +462,8 @@ func (c *RaftCluster) runStoreConfigSync() {
// syncStoreConfig syncs the store config from TiKV.
// - `synced` is true if sync config from one tikv.
// - `switchRaftV2` is true if the config of tikv engine is change to raft-kv2.
func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool) {
func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool, needPersist bool) {
var err error
for index := 0; index < len(stores); index++ {
select {
case <-c.ctx.Done():
Expand All @@ -479,7 +483,7 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw
}
// it will try next store if the current store is failed.
address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
switchRaftV2, err := c.observeStoreConfig(c.ctx, address)
switchRaftV2, needPersist, err = c.observeStoreConfig(c.ctx, address)
if err != nil {
// delete the store if it is failed and retry next store.
stores = append(stores[:index], stores[index+1:]...)
Expand All @@ -492,34 +496,35 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw
}
storeSyncConfigEvent.WithLabelValues(address, "succ").Inc()

return true, switchRaftV2
return true, switchRaftV2, needPersist
}
return false, false
return false, false, needPersist
}

// observeStoreConfig is used to observe the store config changes and
// return whether if the new config changes the engine to raft-kv2.
func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) {
func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (switchRaftV2 bool, needPersist bool, err error) {
cfg, err := c.fetchStoreConfigFromTiKV(ctx, address)
if err != nil {
return false, err
return false, false, err
}
oldCfg := c.opt.GetStoreConfig()
if cfg == nil || oldCfg.Equal(cfg) {
return false, nil
return false, false, nil
}
log.Info("sync the store config successful",
zap.String("store-address", address),
zap.String("store-config", cfg.String()),
zap.String("old-config", oldCfg.String()))
return c.updateStoreConfig(oldCfg, cfg)
return c.updateStoreConfig(oldCfg, cfg), true, nil
}

// updateStoreConfig updates the store config. This is extracted for testing.
func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (bool, error) {
func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (switchRaftV2 bool) {
cfg.Adjust()
c.opt.SetStoreConfig(cfg)
return oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2, nil
switchRaftV2 = oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2
return
}

// fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL.
Expand Down
10 changes: 7 additions & 3 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,8 +1428,10 @@ func TestSyncConfigContext(t *testing.T) {
// trip schema header
now := time.Now()
stores[0].GetMeta().StatusAddress = server.URL[7:]
synced, _ := tc.syncStoreConfig(tc.GetStores())
synced, switchRaftV2, needPersist := tc.syncStoreConfig(tc.GetStores())
re.False(synced)
re.False(switchRaftV2)
re.False(needPersist)
re.Less(time.Since(now), clientTimeout*2)
}

Expand All @@ -1450,15 +1452,17 @@ func TestStoreConfigSync(t *testing.T) {
re.Equal(uint64(144), tc.GetStoreConfig().GetRegionMaxSize())
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV", `return("10MiB")`))
// switchRaftV2 will be true.
synced, switchRaftV2 := tc.syncStoreConfig(tc.GetStores())
synced, switchRaftV2, needPersist := tc.syncStoreConfig(tc.GetStores())
re.True(synced)
re.True(switchRaftV2)
re.True(needPersist)
re.EqualValues(512, tc.opt.GetMaxMovableHotPeerSize())
re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize())
// switchRaftV2 will be false this time.
synced, switchRaftV2 = tc.syncStoreConfig(tc.GetStores())
synced, switchRaftV2, needPersist = tc.syncStoreConfig(tc.GetStores())
re.True(synced)
re.False(switchRaftV2)
re.False(needPersist)
re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize())
re.NoError(opt.Persist(tc.GetStorage()))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV"))
Expand Down

0 comments on commit 869d08b

Please sign in to comment.