Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: introduce the dc-location config #2925

Merged
merged 7 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ type Config struct {
LogFileDeprecated string `toml:"log-file" json:"log-file,omitempty"`
LogLevelDeprecated string `toml:"log-level" json:"log-level,omitempty"`

// TsoSaveInterval is the interval to save timestamp.
TsoSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`
// TSOSaveInterval is the interval to save timestamp.
TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`

// Local TSO service related configuration.
LocalTSO LocalTSOConfig `toml:"local-tso" json:"local-tso"`

Metric metricutil.MetricConfig `toml:"metric" json:"metric"`

Expand Down Expand Up @@ -492,7 +495,11 @@ func (c *Config) Adjust(meta *toml.MetaData) error {

adjustInt64(&c.LeaderLease, defaultLeaderLease)

adjustDuration(&c.TsoSaveInterval, time.Duration(defaultLeaderLease)*time.Second)
adjustDuration(&c.TSOSaveInterval, time.Duration(defaultLeaderLease)*time.Second)

if err := c.LocalTSO.Validate(); err != nil {
return err
}

if c.nextRetryDelay == 0 {
c.nextRetryDelay = defaultNextRetryDelay
Expand Down Expand Up @@ -1019,7 +1026,7 @@ func (c *ReplicationConfig) adjust(meta *configMetaData) error {
type PDServerConfig struct {
// UseRegionStorage enables the independent region storage.
UseRegionStorage bool `toml:"use-region-storage" json:"use-region-storage,string"`
// MaxResetTSGap is the max gap to reset the tso.
// MaxResetTSGap is the max gap to reset the TSO.
MaxResetTSGap typeutil.Duration `toml:"max-gap-reset-ts" json:"max-gap-reset-ts"`
// KeyType is option to specify the type of keys.
// There are some types supported: ["table", "raw", "txn"], default: "table"
Expand Down Expand Up @@ -1320,3 +1327,28 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configMetaData) {
c.WaitSyncTimeout = typeutil.Duration{Duration: defaultDRWaitSyncTimeout}
}
}

// GlobalDCLocation is the Global TSO Allocator's dc-location label.
const GlobalDCLocation = "global"

// LocalTSOConfig is the configuration for Local TSO service.
type LocalTSOConfig struct {
// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the dc-location configuration for
// each PD server.
EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"`
// DCLocation indicates that which data center a PD server is in. According to it,
// the PD cluster can elect a TSO allocator to generate local TSO for
// DC-level transactions. It shouldn't be the same with GlobalDCLocation.
DCLocation string `toml:"dc-location" json:"dc-location"`
}

// Validate is used to validate if some TSO configurations are right.
func (c *LocalTSOConfig) Validate() error {
if c.DCLocation == GlobalDCLocation {
errMsg := fmt.Sprintf("dc-location %s is the PD reserved label to represent the PD leader, please try another one.", GlobalDCLocation)
return errors.New(errMsg)
}
return nil
}
6 changes: 3 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/tso"
"github.com/tikv/pd/server/versioninfo"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -93,7 +93,7 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tsoAllocatorManager.HandleTSORequest(tso.GlobalDCLocation, count)
ts, err := s.tsoAllocatorManager.HandleTSORequest(config.GlobalDCLocation, count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -794,7 +794,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
}
}

nowTSO, err := s.tsoAllocatorManager.HandleTSORequest(tso.GlobalDCLocation, 1)
nowTSO, err := s.tsoAllocatorManager.HandleTSORequest(config.GlobalDCLocation, 1)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/tikv/pd/server/schedule/storelimit"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/tso"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -826,7 +825,7 @@ func (h *Handler) GetEmptyRegion() ([]*core.RegionInfo, error) {

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64) error {
tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(config.GlobalDCLocation)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (s *Server) startServer(ctx context.Context) error {
s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue())
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member, s.rootPath, s.cfg.TsoSaveInterval.Duration,
s.member, s.rootPath, s.cfg.TSOSaveInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
)
kvBase := kv.NewEtcdKVBase(s.client, s.rootPath)
Expand Down Expand Up @@ -1133,7 +1133,7 @@ func (s *Server) campaignLeader() {
log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name()))

log.Info("setting up the global TSO allocator")
if err := s.tsoAllocatorManager.SetUpAllocator(ctx, cancel, tso.GlobalDCLocation, s.member.GetLeadership()); err != nil {
if err := s.tsoAllocatorManager.SetUpAllocator(ctx, cancel, config.GlobalDCLocation, s.member.GetLeadership()); err != nil {
log.Error("failed to set up the global TSO allocator", errs.ZapError(err))
return
}
Expand Down
2 changes: 1 addition & 1 deletion server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewTestSingleConfig(c *check.C) *config.Config {
InitialClusterState: embed.ClusterStateFlagNew,

LeaderLease: 1,
TsoSaveInterval: typeutil.NewDuration(200 * time.Millisecond),
TSOSaveInterval: typeutil.NewDuration(200 * time.Millisecond),
}

cfg.AdvertiseClientUrls = cfg.ClientUrls
Expand Down
9 changes: 4 additions & 5 deletions server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/election"
"github.com/tikv/pd/server/member"
"go.uber.org/zap"
)

const (
// GlobalDCLocation is the Global TSO Allocator's dc-location label.
GlobalDCLocation = "global"
leaderTickInterval = 50 * time.Millisecond
defaultAllocatorLeaderLease = 3
)
Expand Down Expand Up @@ -91,7 +90,7 @@ func NewAllocatorManager(m *member.Member, rootPath string, saveInterval time.Du
// SetUpAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.
func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, parentCancel context.CancelFunc, dcLocation string, leadership *election.Leadership) error {
var allocator Allocator
if dcLocation == GlobalDCLocation {
if dcLocation == config.GlobalDCLocation {
allocator = NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.maxResetTSGap)
} else {
allocator = NewLocalTSOAllocator(am.member, leadership, am.getAllocatorPath(dcLocation), dcLocation, am.saveInterval, am.maxResetTSGap)
Expand All @@ -109,7 +108,7 @@ func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, parentCanc
// Different kinds of allocators have different setup works to do
switch dcLocation {
// For Global TSO Allocator
case GlobalDCLocation:
case config.GlobalDCLocation:
// Because Global TSO Allocator only depends on PD leader's leadership,
// so we can directly initialize it here.
if err := am.allocatorGroups[dcLocation].allocator.Initialize(); err != nil {
Expand All @@ -126,7 +125,7 @@ func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, parentCanc

func (am *AllocatorManager) getAllocatorPath(dcLocation string) string {
// For backward compatibility, the global timestamp's store path will still use the old one
if dcLocation == GlobalDCLocation {
if dcLocation == config.GlobalDCLocation {
return am.rootPath
}
return path.Join(am.rootPath, dcLocation)
Expand Down
2 changes: 1 addition & 1 deletion server/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (t *timestampOracle) ResetUserTimestamp(leadership *election.Leadership, ts
// 1. When the logical time is going to be used up, increase the current physical time.
// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time
// will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and
// we also need to save the next physical time plus `TsoSaveInterval` into etcd.
// we also need to save the next physical time plus `TSOSaveInterval` into etcd.
//
// Here is some constraints that this function must satisfy:
// 1. The saved time is monotonically increasing.
Expand Down
5 changes: 3 additions & 2 deletions tests/server/tso/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
. "github.com/pingcap/check"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/election"
"github.com/tikv/pd/server/tso"
"github.com/tikv/pd/tests"
Expand Down Expand Up @@ -76,7 +77,7 @@ func (s *testAllocatorSuite) TestAllocatorLeader(c *C) {
allAllocatorLeaders := make([]tso.Allocator, 0, dcLocationNum)
for _, server := range cluster.GetServers() {
// Filter out Global TSO Allocator and uninitialized Local TSO Allocator
allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(tso.GlobalDCLocation), tso.FilterUninitialized())
allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(config.GlobalDCLocation), tso.FilterUninitialized())
// One PD server will have at most three initialized Local TSO Allocators,
// which also means three allocator leaders
c.Assert(len(allocators), LessEqual, dcLocationNum)
Expand All @@ -103,7 +104,7 @@ func (s *testAllocatorSuite) TestAllocatorLeader(c *C) {
}
for _, server := range cluster.GetServers() {
// Filter out Global TSO Allocator
allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(tso.GlobalDCLocation))
allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(config.GlobalDCLocation))
c.Assert(len(allocators), Equals, dcLocationNum)
for _, allocator := range allocators {
allocatorFollower, _ := allocator.(*tso.LocalTSOAllocator)
Expand Down
4 changes: 2 additions & 2 deletions tools/pd-simulator/simulator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
defaultStoreVersion = "2.1.0"
// server
defaultLeaderLease = 1
defaultTsoSaveInterval = 200 * time.Millisecond
defaultTSOSaveInterval = 200 * time.Millisecond
defaultTickInterval = 100 * time.Millisecond
defaultElectionInterval = 3 * time.Second
defaultLeaderPriorityCheckInterval = 100 * time.Millisecond
Expand Down Expand Up @@ -88,7 +88,7 @@ func (sc *SimConfig) Adjust() error {
adjustInt64(&sc.StoreIOMBPerSecond, defaultStoreIOMBPerSecond)
adjustString(&sc.StoreVersion, defaultStoreVersion)
adjustInt64(&sc.ServerConfig.LeaderLease, defaultLeaderLease)
adjustDuration(&sc.ServerConfig.TsoSaveInterval, defaultTsoSaveInterval)
adjustDuration(&sc.ServerConfig.TSOSaveInterval, defaultTSOSaveInterval)
adjustDuration(&sc.ServerConfig.TickInterval, defaultTickInterval)
adjustDuration(&sc.ServerConfig.ElectionInterval, defaultElectionInterval)
adjustDuration(&sc.ServerConfig.LeaderPriorityCheckInterval, defaultLeaderPriorityCheckInterval)
Expand Down