diff --git a/server/config/config.go b/server/config/config.go index 850b1722669..5dfe3c5f903 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -82,8 +82,11 @@ type Config struct { LogFileDeprecated string `toml:"log-file" json:"log-file,omitempty"` LogLevelDeprecated string `toml:"log-level" json:"log-level,omitempty"` - // TsoSaveInterval is the interval to save timestamp. - TsoSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"` + // TSOSaveInterval is the interval to save timestamp. + TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"` + + // Local TSO service related configuration. + LocalTSO LocalTSOConfig `toml:"local-tso" json:"local-tso"` Metric metricutil.MetricConfig `toml:"metric" json:"metric"` @@ -492,7 +495,11 @@ func (c *Config) Adjust(meta *toml.MetaData) error { adjustInt64(&c.LeaderLease, defaultLeaderLease) - adjustDuration(&c.TsoSaveInterval, time.Duration(defaultLeaderLease)*time.Second) + adjustDuration(&c.TSOSaveInterval, time.Duration(defaultLeaderLease)*time.Second) + + if err := c.LocalTSO.Validate(); err != nil { + return err + } if c.nextRetryDelay == 0 { c.nextRetryDelay = defaultNextRetryDelay @@ -1019,7 +1026,7 @@ func (c *ReplicationConfig) adjust(meta *configMetaData) error { type PDServerConfig struct { // UseRegionStorage enables the independent region storage. UseRegionStorage bool `toml:"use-region-storage" json:"use-region-storage,string"` - // MaxResetTSGap is the max gap to reset the tso. + // MaxResetTSGap is the max gap to reset the TSO. MaxResetTSGap typeutil.Duration `toml:"max-gap-reset-ts" json:"max-gap-reset-ts"` // KeyType is option to specify the type of keys. // There are some types supported: ["table", "raw", "txn"], default: "table" @@ -1320,3 +1327,28 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configMetaData) { c.WaitSyncTimeout = typeutil.Duration{Duration: defaultDRWaitSyncTimeout} } } + +// GlobalDCLocation is the Global TSO Allocator's dc-location label. +const GlobalDCLocation = "global" + +// LocalTSOConfig is the configuration for Local TSO service. +type LocalTSOConfig struct { + // EnableLocalTSO is used to enable the Local TSO Allocator feature, + // which allows the PD server to generate local TSO for certain DC-level transactions. + // To make this feature meaningful, user has to set the dc-location configuration for + // each PD server. + EnableLocalTSO bool `toml:"enable-local-tso" json:"enable-local-tso"` + // DCLocation indicates that which data center a PD server is in. According to it, + // the PD cluster can elect a TSO allocator to generate local TSO for + // DC-level transactions. It shouldn't be the same with GlobalDCLocation. + DCLocation string `toml:"dc-location" json:"dc-location"` +} + +// Validate is used to validate if some TSO configurations are right. +func (c *LocalTSOConfig) Validate() error { + if c.DCLocation == GlobalDCLocation { + errMsg := fmt.Sprintf("dc-location %s is the PD reserved label to represent the PD leader, please try another one.", GlobalDCLocation) + return errors.New(errMsg) + } + return nil +} diff --git a/server/grpc_service.go b/server/grpc_service.go index 919199f5685..3e1eb1dbc11 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -29,8 +29,8 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/tsoutil" "github.com/tikv/pd/server/cluster" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" - "github.com/tikv/pd/server/tso" "github.com/tikv/pd/server/versioninfo" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -93,7 +93,7 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error { return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId()) } count := request.GetCount() - ts, err := s.tsoAllocatorManager.HandleTSORequest(tso.GlobalDCLocation, count) + ts, err := s.tsoAllocatorManager.HandleTSORequest(config.GlobalDCLocation, count) if err != nil { return status.Errorf(codes.Unknown, err.Error()) } @@ -794,7 +794,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd } } - nowTSO, err := s.tsoAllocatorManager.HandleTSORequest(tso.GlobalDCLocation, 1) + nowTSO, err := s.tsoAllocatorManager.HandleTSORequest(config.GlobalDCLocation, 1) if err != nil { return nil, err } diff --git a/server/handler.go b/server/handler.go index ed0b4a50564..cf727e57686 100644 --- a/server/handler.go +++ b/server/handler.go @@ -37,7 +37,6 @@ import ( "github.com/tikv/pd/server/schedule/storelimit" "github.com/tikv/pd/server/schedulers" "github.com/tikv/pd/server/statistics" - "github.com/tikv/pd/server/tso" "go.uber.org/zap" ) @@ -826,7 +825,7 @@ func (h *Handler) GetEmptyRegion() ([]*core.RegionInfo, error) { // ResetTS resets the ts with specified tso. func (h *Handler) ResetTS(ts uint64) error { - tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) + tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(config.GlobalDCLocation) if err != nil { return err } diff --git a/server/server.go b/server/server.go index 38c8ceddadd..593d2225311 100644 --- a/server/server.go +++ b/server/server.go @@ -349,7 +349,7 @@ func (s *Server) startServer(ctx context.Context) error { s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash) s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue()) s.tsoAllocatorManager = tso.NewAllocatorManager( - s.member, s.rootPath, s.cfg.TsoSaveInterval.Duration, + s.member, s.rootPath, s.cfg.TSOSaveInterval.Duration, func() time.Duration { return s.persistOptions.GetMaxResetTSGap() }, ) kvBase := kv.NewEtcdKVBase(s.client, s.rootPath) @@ -1133,7 +1133,7 @@ func (s *Server) campaignLeader() { log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name())) log.Info("setting up the global TSO allocator") - if err := s.tsoAllocatorManager.SetUpAllocator(ctx, cancel, tso.GlobalDCLocation, s.member.GetLeadership()); err != nil { + if err := s.tsoAllocatorManager.SetUpAllocator(ctx, cancel, config.GlobalDCLocation, s.member.GetLeadership()); err != nil { log.Error("failed to set up the global TSO allocator", errs.ZapError(err)) return } diff --git a/server/testutil.go b/server/testutil.go index eab0690488a..545f409e86d 100644 --- a/server/testutil.go +++ b/server/testutil.go @@ -71,7 +71,7 @@ func NewTestSingleConfig(c *check.C) *config.Config { InitialClusterState: embed.ClusterStateFlagNew, LeaderLease: 1, - TsoSaveInterval: typeutil.NewDuration(200 * time.Millisecond), + TSOSaveInterval: typeutil.NewDuration(200 * time.Millisecond), } cfg.AdvertiseClientUrls = cfg.ClientUrls diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index b658044349c..854acf90427 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -24,14 +24,13 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/election" "github.com/tikv/pd/server/member" "go.uber.org/zap" ) const ( - // GlobalDCLocation is the Global TSO Allocator's dc-location label. - GlobalDCLocation = "global" leaderTickInterval = 50 * time.Millisecond defaultAllocatorLeaderLease = 3 ) @@ -91,7 +90,7 @@ func NewAllocatorManager(m *member.Member, rootPath string, saveInterval time.Du // SetUpAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon. func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, parentCancel context.CancelFunc, dcLocation string, leadership *election.Leadership) error { var allocator Allocator - if dcLocation == GlobalDCLocation { + if dcLocation == config.GlobalDCLocation { allocator = NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.maxResetTSGap) } else { allocator = NewLocalTSOAllocator(am.member, leadership, am.getAllocatorPath(dcLocation), dcLocation, am.saveInterval, am.maxResetTSGap) @@ -109,7 +108,7 @@ func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, parentCanc // Different kinds of allocators have different setup works to do switch dcLocation { // For Global TSO Allocator - case GlobalDCLocation: + case config.GlobalDCLocation: // Because Global TSO Allocator only depends on PD leader's leadership, // so we can directly initialize it here. if err := am.allocatorGroups[dcLocation].allocator.Initialize(); err != nil { @@ -126,7 +125,7 @@ func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, parentCanc func (am *AllocatorManager) getAllocatorPath(dcLocation string) string { // For backward compatibility, the global timestamp's store path will still use the old one - if dcLocation == GlobalDCLocation { + if dcLocation == config.GlobalDCLocation { return am.rootPath } return path.Join(am.rootPath, dcLocation) diff --git a/server/tso/tso.go b/server/tso/tso.go index 92494df79f8..37b1558ec0e 100644 --- a/server/tso/tso.go +++ b/server/tso/tso.go @@ -187,7 +187,7 @@ func (t *timestampOracle) ResetUserTimestamp(leadership *election.Leadership, ts // 1. When the logical time is going to be used up, increase the current physical time. // 2. When the time window is not big enough, which means the saved etcd time minus the next physical time // will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and -// we also need to save the next physical time plus `TsoSaveInterval` into etcd. +// we also need to save the next physical time plus `TSOSaveInterval` into etcd. // // Here is some constraints that this function must satisfy: // 1. The saved time is monotonically increasing. diff --git a/tests/server/tso/allocator_test.go b/tests/server/tso/allocator_test.go index 681a595ba06..254d5d64937 100644 --- a/tests/server/tso/allocator_test.go +++ b/tests/server/tso/allocator_test.go @@ -21,6 +21,7 @@ import ( . "github.com/pingcap/check" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/server" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/election" "github.com/tikv/pd/server/tso" "github.com/tikv/pd/tests" @@ -76,7 +77,7 @@ func (s *testAllocatorSuite) TestAllocatorLeader(c *C) { allAllocatorLeaders := make([]tso.Allocator, 0, dcLocationNum) for _, server := range cluster.GetServers() { // Filter out Global TSO Allocator and uninitialized Local TSO Allocator - allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(tso.GlobalDCLocation), tso.FilterUninitialized()) + allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(config.GlobalDCLocation), tso.FilterUninitialized()) // One PD server will have at most three initialized Local TSO Allocators, // which also means three allocator leaders c.Assert(len(allocators), LessEqual, dcLocationNum) @@ -103,7 +104,7 @@ func (s *testAllocatorSuite) TestAllocatorLeader(c *C) { } for _, server := range cluster.GetServers() { // Filter out Global TSO Allocator - allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(tso.GlobalDCLocation)) + allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(config.GlobalDCLocation)) c.Assert(len(allocators), Equals, dcLocationNum) for _, allocator := range allocators { allocatorFollower, _ := allocator.(*tso.LocalTSOAllocator) diff --git a/tools/pd-simulator/simulator/config.go b/tools/pd-simulator/simulator/config.go index 84ef4391393..c49727ad4ff 100644 --- a/tools/pd-simulator/simulator/config.go +++ b/tools/pd-simulator/simulator/config.go @@ -20,7 +20,7 @@ const ( defaultStoreVersion = "2.1.0" // server defaultLeaderLease = 1 - defaultTsoSaveInterval = 200 * time.Millisecond + defaultTSOSaveInterval = 200 * time.Millisecond defaultTickInterval = 100 * time.Millisecond defaultElectionInterval = 3 * time.Second defaultLeaderPriorityCheckInterval = 100 * time.Millisecond @@ -88,7 +88,7 @@ func (sc *SimConfig) Adjust() error { adjustInt64(&sc.StoreIOMBPerSecond, defaultStoreIOMBPerSecond) adjustString(&sc.StoreVersion, defaultStoreVersion) adjustInt64(&sc.ServerConfig.LeaderLease, defaultLeaderLease) - adjustDuration(&sc.ServerConfig.TsoSaveInterval, defaultTsoSaveInterval) + adjustDuration(&sc.ServerConfig.TSOSaveInterval, defaultTSOSaveInterval) adjustDuration(&sc.ServerConfig.TickInterval, defaultTickInterval) adjustDuration(&sc.ServerConfig.ElectionInterval, defaultElectionInterval) adjustDuration(&sc.ServerConfig.LeaderPriorityCheckInterval, defaultLeaderPriorityCheckInterval)