diff --git a/server/server.go b/server/server.go index 164b2549fd8..38c8ceddadd 100644 --- a/server/server.go +++ b/server/server.go @@ -349,7 +349,7 @@ func (s *Server) startServer(ctx context.Context) error { s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash) s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue()) s.tsoAllocatorManager = tso.NewAllocatorManager( - s.member.Etcd(), s.client, s.rootPath, s.cfg.TsoSaveInterval.Duration, + s.member, s.rootPath, s.cfg.TsoSaveInterval.Duration, func() time.Duration { return s.persistOptions.GetMaxResetTSGap() }, ) kvBase := kv.NewEtcdKVBase(s.client, s.rootPath) @@ -688,6 +688,11 @@ func (s *Server) GetAllocator() *id.AllocatorImpl { return s.idAllocator } +// GetTSOAllocatorManager returns the manager of TSO Allocator. +func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager { + return s.tsoAllocatorManager +} + // Name returns the unique etcd Name for this server in etcd cluster. func (s *Server) Name() string { return s.cfg.Name @@ -941,6 +946,11 @@ func (s *Server) GetSecurityConfig() *grpcutil.SecurityConfig { return &s.cfg.Security } +// GetServerRootPath returns the server root path. +func (s *Server) GetServerRootPath() string { + return s.rootPath +} + // GetClusterRootPath returns the cluster root path. func (s *Server) GetClusterRootPath() string { return path.Join(s.rootPath, "raft") diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index 2249c503329..b658044349c 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -23,14 +23,21 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/server/election" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/embed" + "github.com/tikv/pd/server/member" "go.uber.org/zap" ) -// GlobalDCLocation is the Global TSO Allocator's dc-location label. -const GlobalDCLocation = "global" +const ( + // GlobalDCLocation is the Global TSO Allocator's dc-location label. + GlobalDCLocation = "global" + leaderTickInterval = 50 * time.Millisecond + defaultAllocatorLeaderLease = 3 +) + +// AllocatorGroupFilter is used to select AllocatorGroup. +type AllocatorGroupFilter func(ag *allocatorGroup) bool type allocatorGroup struct { dcLocation string @@ -47,8 +54,6 @@ type allocatorGroup struct { // TSO for local transactions in its DC. leadership *election.Leadership allocator Allocator - // the flag indicates whether this allocator is initialized - isInitialized bool } // AllocatorManager is used to manage the TSO Allocators a PD server holds. @@ -63,9 +68,8 @@ type AllocatorManager struct { // 2. Local TSO Allocator, servers for DC-level transactions. // dc-location/global (string) -> TSO Allocator allocatorGroups map[string]*allocatorGroup - // etcd and its client - etcd *embed.Etcd - client *clientv3.Client + // for election use + member *member.Member // tso config rootPath string saveInterval time.Duration @@ -73,11 +77,10 @@ type AllocatorManager struct { } // NewAllocatorManager creates a new TSO Allocator Manager. -func NewAllocatorManager(etcd *embed.Etcd, client *clientv3.Client, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) *AllocatorManager { +func NewAllocatorManager(m *member.Member, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) *AllocatorManager { allocatorManager := &AllocatorManager{ allocatorGroups: make(map[string]*allocatorGroup), - etcd: etcd, - client: client, + member: m, rootPath: rootPath, saveInterval: saveInterval, maxResetTSGap: maxResetTSGap, @@ -85,56 +88,130 @@ func NewAllocatorManager(etcd *embed.Etcd, client *clientv3.Client, rootPath str return allocatorManager } -func (am *AllocatorManager) getAllocatorPath(dcLocation string) string { - // For backward compatibility, the global timestamp's store path will still use the old one - if dcLocation == GlobalDCLocation { - return am.rootPath - } - return path.Join(am.rootPath, dcLocation) -} - // SetUpAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon. func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, parentCancel context.CancelFunc, dcLocation string, leadership *election.Leadership) error { + var allocator Allocator + if dcLocation == GlobalDCLocation { + allocator = NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.maxResetTSGap) + } else { + allocator = NewLocalTSOAllocator(am.member, leadership, am.getAllocatorPath(dcLocation), dcLocation, am.saveInterval, am.maxResetTSGap) + } am.Lock() defer am.Unlock() + // Update or create a new allocatorGroup + am.allocatorGroups[dcLocation] = &allocatorGroup{ + dcLocation: dcLocation, + parentCtx: parentCtx, + parentCancel: parentCancel, + leadership: leadership, + allocator: allocator, + } + // Different kinds of allocators have different setup works to do switch dcLocation { + // For Global TSO Allocator case GlobalDCLocation: - am.allocatorGroups[dcLocation] = &allocatorGroup{ - dcLocation: dcLocation, - parentCtx: parentCtx, - parentCancel: parentCancel, - leadership: leadership, - allocator: NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.maxResetTSGap), - } + // Because Global TSO Allocator only depends on PD leader's leadership, + // so we can directly initialize it here. if err := am.allocatorGroups[dcLocation].allocator.Initialize(); err != nil { return err } - am.allocatorGroups[dcLocation].isInitialized = true + // For Local TSO Allocator default: - // Todo: set up a Local TSO Allocator + // Join in a Local TSO Allocator election + localTSOAllocator, _ := allocator.(*LocalTSOAllocator) + go am.allocatorLeaderLoop(parentCtx, localTSOAllocator) } return nil } -// GetAllocator get the allocator by dc-location. -func (am *AllocatorManager) GetAllocator(dcLocation string) (Allocator, error) { - am.RLock() - defer am.RUnlock() - allocatorGroup, exist := am.allocatorGroups[dcLocation] - if !exist { - return nil, errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found", dcLocation)) +func (am *AllocatorManager) getAllocatorPath(dcLocation string) string { + // For backward compatibility, the global timestamp's store path will still use the old one + if dcLocation == GlobalDCLocation { + return am.rootPath } - return allocatorGroup.allocator, nil + return path.Join(am.rootPath, dcLocation) } -func (am *AllocatorManager) getAllocatorGroups() []*allocatorGroup { - am.RLock() - defer am.RUnlock() - allocatorGroups := make([]*allocatorGroup, 0, len(am.allocatorGroups)) - for _, ag := range am.allocatorGroups { - allocatorGroups = append(allocatorGroups, ag) +// similar logic with leaderLoop in server/server.go +func (am *AllocatorManager) allocatorLeaderLoop(parentCtx context.Context, allocator *LocalTSOAllocator) { + for { + select { + case <-parentCtx.Done(): + log.Info("server is closed, return local tso allocator leader loop", + zap.String("dc-location", allocator.dcLocation), + zap.String("local-tso-allocator-name", am.member.Member().Name)) + return + default: + } + + allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader() + if checkAgain { + continue + } + if allocatorLeader != nil { + log.Info("start to watch allocator leader", + zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.dcLocation), allocatorLeader), + zap.String("local-tso-allocator-name", am.member.Member().Name)) + // WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed. + allocator.WatchAllocatorLeader(parentCtx, allocatorLeader, rev) + log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader", + zap.String("dc-location", allocator.dcLocation)) + } + am.campaignAllocatorLeader(parentCtx, allocator) + } +} + +func (am *AllocatorManager) campaignAllocatorLeader(parentCtx context.Context, allocator *LocalTSOAllocator) { + log.Info("start to campaign local tso allocator leader", + zap.String("dc-location", allocator.dcLocation), + zap.String("name", am.member.Member().Name)) + if err := allocator.CampaignAllocatorLeader(defaultAllocatorLeaderLease); err != nil { + log.Error("failed to campaign local tso allocator leader", errs.ZapError(err)) + return + } + + // Start keepalive the Local TSO Allocator leadership and enable Local TSO service. + ctx, cancel := context.WithCancel(parentCtx) + defer cancel() + defer am.resetAllocatorGroup(allocator.dcLocation) + // maintain the Local TSO Allocator leader + go allocator.KeepAllocatorLeader(ctx) + log.Info("campaign local tso allocator leader ok", + zap.String("dc-location", allocator.dcLocation), + zap.String("name", am.member.Member().Name)) + + log.Info("initialize the local TSO allocator", + zap.String("dc-location", allocator.dcLocation), + zap.String("name", am.member.Member().Name)) + if err := allocator.Initialize(); err != nil { + log.Error("failed to initialize the local TSO allocator", errs.ZapError(err)) + return + } + allocator.EnableAllocatorLeader() + log.Info("local tso allocator leader is ready to serve", + zap.String("dc-location", allocator.dcLocation), + zap.String("name", am.member.Member().Name)) + + leaderTicker := time.NewTicker(leaderTickInterval) + defer leaderTicker.Stop() + + for { + select { + case <-leaderTicker.C: + if !allocator.IsStillAllocatorLeader() { + log.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down", + zap.String("dc-location", allocator.dcLocation), + zap.String("name", am.member.Member().Name)) + return + } + case <-ctx.Done(): + // Server is closed and it should return nil. + log.Info("server is closed, reset the local tso allocator", + zap.String("dc-location", allocator.dcLocation), + zap.String("name", am.member.Member().Name)) + return + } } - return allocatorGroups } // AllocatorDaemon is used to update every allocator's TSO. @@ -145,15 +222,12 @@ func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) { for { select { case <-tsTicker.C: - // Collect all dc-locations first - allocatorGroups := am.getAllocatorGroups() + // Filter out allocators without leadership and uninitialized + allocatorGroups := am.getAllocatorGroups(FilterUninitialized(), FilterUnavailableLeadership()) // Update each allocator concurrently for _, ag := range allocatorGroups { - // Filter allocators without leadership and uninitialized - if ag.isInitialized && ag.leadership.Check() { - am.wg.Add(1) - go am.updateAllocator(ag) - } + am.wg.Add(1) + go am.updateAllocator(ag) } am.wg.Wait() case <-serverCtx.Done(): @@ -167,8 +241,6 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) { defer am.wg.Done() select { case <-ag.parentCtx.Done(): - // Need to initialize first before next use - ag.isInitialized = false // Resetting the allocator will clear TSO in memory ag.allocator.Reset() return @@ -197,3 +269,47 @@ func (am *AllocatorManager) HandleTSORequest(dcLocation string, count uint32) (p } return allocatorGroup.allocator.GenerateTSO(count) } + +func (am *AllocatorManager) resetAllocatorGroup(dcLocation string) { + am.Lock() + defer am.Unlock() + if allocatorGroup, exist := am.allocatorGroups[dcLocation]; exist { + allocatorGroup.allocator.Reset() + allocatorGroup.leadership.Reset() + } +} + +func (am *AllocatorManager) getAllocatorGroups(filters ...AllocatorGroupFilter) []*allocatorGroup { + am.RLock() + defer am.RUnlock() + allocatorGroups := make([]*allocatorGroup, 0) + for _, ag := range am.allocatorGroups { + if ag == nil { + continue + } + if slice.NoneOf(filters, func(i int) bool { return filters[i](ag) }) { + allocatorGroups = append(allocatorGroups, ag) + } + } + return allocatorGroups +} + +// GetAllocator get the allocator by dc-location. +func (am *AllocatorManager) GetAllocator(dcLocation string) (Allocator, error) { + am.RLock() + defer am.RUnlock() + allocatorGroup, exist := am.allocatorGroups[dcLocation] + if !exist { + return nil, errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found", dcLocation)) + } + return allocatorGroup.allocator, nil +} + +// GetAllocators get all allocators with some filters. +func (am *AllocatorManager) GetAllocators(filters ...AllocatorGroupFilter) []Allocator { + var allocators []Allocator + for _, ag := range am.getAllocatorGroups(filters...) { + allocators = append(allocators, ag.allocator) + } + return allocators +} diff --git a/server/tso/filter.go b/server/tso/filter.go new file mode 100644 index 00000000000..14ec8907b64 --- /dev/null +++ b/server/tso/filter.go @@ -0,0 +1,29 @@ +// Copyright 2020 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso + +// FilterDCLocation will filter out the allocatorGroup with a given dcLocation. +func FilterDCLocation(dcLocation string) func(ag *allocatorGroup) bool { + return func(ag *allocatorGroup) bool { return ag.dcLocation == dcLocation } +} + +// FilterUninitialized will filter out the allocatorGroup uninitialized. +func FilterUninitialized() func(ag *allocatorGroup) bool { + return func(ag *allocatorGroup) bool { return !ag.allocator.IsInitialize() } +} + +// FilterUnavailableLeadership will filter out the allocatorGroup whose leadership is unavailable. +func FilterUnavailableLeadership() func(ag *allocatorGroup) bool { + return func(ag *allocatorGroup) bool { return !ag.leadership.Check() } +} diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go index 8674d6cc735..fafcba242b1 100644 --- a/server/tso/global_allocator.go +++ b/server/tso/global_allocator.go @@ -32,9 +32,11 @@ type Allocator interface { // It will synchronize TSO with etcd and initialize the // memory for later allocation work. Initialize() error + // IsInitialize is used to indicates whether this allocator is initialized. + IsInitialize() bool // UpdateTSO is used to update the TSO in memory and the time window in etcd. UpdateTSO() error - // SetTSO sets the physical part with given tso. It's mainly used for BR restore + // SetTSO sets the physical part with given TSO. It's mainly used for BR restore // and can not forcibly set the TSO smaller than now. SetTSO(tso uint64) error // GenerateTSO is used to generate a given number of TSOs. @@ -47,7 +49,7 @@ type Allocator interface { // GlobalTSOAllocator is the global single point TSO allocator. type GlobalTSOAllocator struct { // leadership is used to check the current PD server's leadership - // to determine whether a tso request could be processed. + // to determine whether a TSO request could be processed. leadership *election.Leadership timestampOracle *timestampOracle } @@ -70,12 +72,17 @@ func (gta *GlobalTSOAllocator) Initialize() error { return gta.timestampOracle.SyncTimestamp(gta.leadership) } +// IsInitialize is used to indicates whether this allocator is initialized. +func (gta *GlobalTSOAllocator) IsInitialize() bool { + return gta.timestampOracle.isInitialized() +} + // UpdateTSO is used to update the TSO in memory and the time window in etcd. func (gta *GlobalTSOAllocator) UpdateTSO() error { return gta.timestampOracle.UpdateTimestamp(gta.leadership) } -// SetTSO sets the physical part with given tso. +// SetTSO sets the physical part with given TSO. func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error { return gta.timestampOracle.ResetUserTimestamp(gta.leadership, tso) } diff --git a/server/tso/local_allocator.go b/server/tso/local_allocator.go new file mode 100644 index 00000000000..af9fc80ef57 --- /dev/null +++ b/server/tso/local_allocator.go @@ -0,0 +1,188 @@ +// Copyright 2020 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso + +import ( + "context" + "sync/atomic" + "time" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/server/election" + "github.com/tikv/pd/server/member" + "go.uber.org/zap" +) + +// LocalTSOAllocator is the DC-level local TSO allocator, +// which is only used to allocate TSO in one DC each. +// One PD server may hold multiple Local TSO Allocators. +type LocalTSOAllocator struct { + // leadership is used to campaign the corresponding DC's Local TSO Allocator. + leadership *election.Leadership + timestampOracle *timestampOracle + // for election use, notice that the leadership that member holds is + // the leadership for PD leader. Local TSO Allocator's leadership is for the + // election of Local TSO Allocator leader among several PD servers and + // Local TSO Allocator only use member's some etcd and pbpd.Member info. + // So it's not conflicted. + member *member.Member + rootPath string + dcLocation string + allocatorLeader atomic.Value // stored as *pdpb.Member +} + +// NewLocalTSOAllocator creates a new local TSO allocator. +func NewLocalTSOAllocator(member *member.Member, leadership *election.Leadership, rootPath, dcLocation string, saveInterval time.Duration, maxResetTSGap func() time.Duration) Allocator { + return &LocalTSOAllocator{ + leadership: leadership, + timestampOracle: ×tampOracle{ + client: leadership.GetClient(), + rootPath: rootPath, + saveInterval: saveInterval, + maxResetTSGap: maxResetTSGap, + }, + member: member, + rootPath: rootPath, + dcLocation: dcLocation, + } +} + +// Initialize will initialize the created local TSO allocator. +func (lta *LocalTSOAllocator) Initialize() error { + return lta.timestampOracle.SyncTimestamp(lta.leadership) +} + +// IsInitialize is used to indicates whether this allocator is initialized. +func (lta *LocalTSOAllocator) IsInitialize() bool { + return lta.timestampOracle.isInitialized() +} + +// UpdateTSO is used to update the TSO in memory and the time window in etcd +// for all local TSO allocators this PD server hold. +func (lta *LocalTSOAllocator) UpdateTSO() error { + return lta.timestampOracle.UpdateTimestamp(lta.leadership) +} + +// SetTSO sets the physical part with given TSO. +func (lta *LocalTSOAllocator) SetTSO(tso uint64) error { + return lta.timestampOracle.ResetUserTimestamp(lta.leadership, tso) +} + +// GenerateTSO is used to generate a given number of TSOs. +// Make sure you have initialized the TSO allocator before calling. +func (lta *LocalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) { + // Todo: update both Local and Global TSO generating logic + return pdpb.Timestamp{}, nil +} + +// Reset is used to reset the TSO allocator. +func (lta *LocalTSOAllocator) Reset() { + lta.timestampOracle.ResetTimestamp() +} + +// setAllocatorLeader sets the current Local TSO Allocator leader. +func (lta *LocalTSOAllocator) setAllocatorLeader(member *pdpb.Member) { + lta.allocatorLeader.Store(member) +} + +// unsetAllocatorLeader unsets the current Local TSO Allocator leader. +func (lta *LocalTSOAllocator) unsetAllocatorLeader() { + lta.allocatorLeader.Store(&pdpb.Member{}) +} + +// GetAllocatorLeader returns the Local TSO Allocator leader. +func (lta *LocalTSOAllocator) GetAllocatorLeader() *pdpb.Member { + allocatorLeader := lta.allocatorLeader.Load() + if allocatorLeader == nil { + return nil + } + return allocatorLeader.(*pdpb.Member) +} + +// GetMember returns the Local TSO Allocator's member value. +func (lta *LocalTSOAllocator) GetMember() *pdpb.Member { + return lta.member.Member() +} + +// EnableAllocatorLeader sets the Local TSO Allocator itself to a leader. +func (lta *LocalTSOAllocator) EnableAllocatorLeader() { + lta.setAllocatorLeader(lta.member.Member()) +} + +// CampaignAllocatorLeader is used to campaign a Local TSO Allocator's leadership. +func (lta *LocalTSOAllocator) CampaignAllocatorLeader(leaseTimeout int64) error { + return lta.leadership.Campaign(leaseTimeout, lta.member.MemberValue()) +} + +// KeepAllocatorLeader is used to keep the PD leader's leadership. +func (lta *LocalTSOAllocator) KeepAllocatorLeader(ctx context.Context) { + lta.leadership.Keep(ctx) +} + +// IsStillAllocatorLeader returns whether the allocator is still a +// Local TSO Allocator leader by checking its leadership's lease. +func (lta *LocalTSOAllocator) IsStillAllocatorLeader() bool { + return lta.leadership.Check() +} + +// isSameLeader checks whether a server is the leader itself. +func (lta *LocalTSOAllocator) isSameAllocatorLeader(leader *pdpb.Member) bool { + return leader.GetMemberId() == lta.member.Member().MemberId +} + +// CheckAllocatorLeader checks who is the current Local TSO Allocator leader, and returns true if it is needed to check later. +func (lta *LocalTSOAllocator) CheckAllocatorLeader() (*pdpb.Member, int64, bool) { + if lta.member.GetEtcdLeader() == 0 { + log.Error("no etcd leader, check local tso allocator leader later", + zap.String("dc-location", lta.dcLocation), errs.ZapError(errs.ErrEtcdLeaderNotFound)) + time.Sleep(200 * time.Millisecond) + return nil, 0, true + } + + allocatorLeader, rev, err := election.GetLeader(lta.leadership.GetClient(), lta.rootPath) + if err != nil { + log.Error("getting local tso allocator leader meets error", + zap.String("dc-location", lta.dcLocation), errs.ZapError(err)) + time.Sleep(200 * time.Millisecond) + return nil, 0, true + } + if allocatorLeader != nil { + if lta.isSameAllocatorLeader(allocatorLeader) { + // oh, we are already a Local TSO Allocator leader, which indicates we may meet something wrong + // in previous CampaignAllocatorLeader. We should delete the leadership and campaign again. + // In normal case, if a Local TSO Allocator become an allocator leader, it will keep looping + // in the campaignAllocatorLeader to maintain its leadership. However, the potential failure + // may occur after an allocator get the leadership and it will return from the campaignAllocatorLeader, + // which means the election and initialization are not completed fully. By this mean, we should + // re-campaign by deleting the current allocator leader. + log.Warn("the local tso allocator leader has not changed, delete and campaign again", + zap.String("dc-location", lta.dcLocation), zap.Stringer("old-pd-leader", allocatorLeader)) + if err = lta.leadership.DeleteLeader(); err != nil { + log.Error("deleting local tso allocator leader key meets error", errs.ZapError(err)) + time.Sleep(200 * time.Millisecond) + return nil, 0, true + } + } + } + return allocatorLeader, rev, false +} + +// WatchAllocatorLeader is used to watch the changes of the Local TSO Allocator leader. +func (lta *LocalTSOAllocator) WatchAllocatorLeader(serverCtx context.Context, allocatorLeader *pdpb.Member, revision int64) { + lta.setAllocatorLeader(allocatorLeader) + lta.leadership.Watch(serverCtx, revision) + lta.unsetAllocatorLeader() +} diff --git a/server/tso/tso.go b/server/tso/tso.go index e0129cf47ef..92494df79f8 100644 --- a/server/tso/tso.go +++ b/server/tso/tso.go @@ -47,14 +47,14 @@ type atomicObject struct { logical int64 } -// timestampOracle is used to maintain the logic of tso. +// timestampOracle is used to maintain the logic of TSO. type timestampOracle struct { client *clientv3.Client rootPath string // TODO: remove saveInterval saveInterval time.Duration maxResetTSGap func() time.Duration - // For tso, set after the PD becomes a leader. + // For TSO, set after the PD becomes a leader. TSO unsafe.Pointer lastSavedTime atomic.Value } @@ -136,7 +136,19 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error { return nil } -// ResetUserTimestamp update the physical part with specified tso. +// isInitialized is used to check whether the timestampOracle is initialized. +// There are two situations we have an uninitialized timestampOracle: +// 1. When the SyncTimestamp has not been called yet. +// 2. When the ResetUserTimestamp has been called already. +func (t *timestampOracle) isInitialized() bool { + tsoNow := (*atomicObject)(atomic.LoadPointer(&t.TSO)) + if tsoNow == nil || tsoNow.physical == typeutil.ZeroTime { + return false + } + return true +} + +// ResetUserTimestamp update the physical part with specified TSO. func (t *timestampOracle) ResetUserTimestamp(leadership *election.Leadership, tso uint64) error { if !leadership.Check() { tsoCounter.WithLabelValues("err_lease_reset_ts").Inc() diff --git a/tests/cluster.go b/tests/cluster.go index 3a7a35a4bc4..1b314d6d3a9 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -35,6 +35,7 @@ import ( "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/id" "github.com/tikv/pd/server/join" + "github.com/tikv/pd/server/tso" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) @@ -342,6 +343,11 @@ func (s *TestServer) WaitLeader() bool { return false } +// GetTSOAllocatorManager returns the server's TSO Allocator Manager. +func (s *TestServer) GetTSOAllocatorManager() *tso.AllocatorManager { + return s.server.GetTSOAllocatorManager() +} + // TestCluster is only for test. type TestCluster struct { config *clusterConfig diff --git a/tests/server/tso/allocator_test.go b/tests/server/tso/allocator_test.go new file mode 100644 index 00000000000..681a595ba06 --- /dev/null +++ b/tests/server/tso/allocator_test.go @@ -0,0 +1,118 @@ +// Copyright 2020 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso_test + +import ( + "context" + "path" + "time" + + . "github.com/pingcap/check" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/server" + "github.com/tikv/pd/server/election" + "github.com/tikv/pd/server/tso" + "github.com/tikv/pd/tests" +) + +const waitAllocatorCheckInterval = 1 * time.Second + +var _ = Suite(&testAllocatorSuite{}) + +type testAllocatorSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testAllocatorSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) + server.EnableZap = true +} + +func (s *testAllocatorSuite) TearDownSuite(c *C) { + s.cancel() +} + +// Make sure we have the correct number of allocator leaders. +func (s *testAllocatorSuite) TestAllocatorLeader(c *C) { + var err error + cluster, err := tests.NewTestCluster(s.ctx, 6) + defer cluster.Destroy() + c.Assert(err, IsNil) + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + // There will be three Local TSO Allocator leaders elected + testDCLocations := []string{"dc-1", "dc-2", "dc-3"} + dcLocationNum := len(testDCLocations) + for _, dcLocation := range testDCLocations { + for _, server := range cluster.GetServers() { + tsoAllocatorManager := server.GetTSOAllocatorManager() + leadership := election.NewLeadership( + server.GetEtcdClient(), + path.Join(server.GetServer().GetServerRootPath(), dcLocation), + "campaign-local-allocator-test") + tsoAllocatorManager.SetUpAllocator(ctx, cancel, dcLocation, leadership) + + } + } + // Wait for a while to check + time.Sleep(waitAllocatorCheckInterval) + // To check whether we have enough Local TSO Allocator leaders + allAllocatorLeaders := make([]tso.Allocator, 0, dcLocationNum) + for _, server := range cluster.GetServers() { + // Filter out Global TSO Allocator and uninitialized Local TSO Allocator + allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(tso.GlobalDCLocation), tso.FilterUninitialized()) + // One PD server will have at most three initialized Local TSO Allocators, + // which also means three allocator leaders + c.Assert(len(allocators), LessEqual, dcLocationNum) + if len(allocators) == 0 { + continue + } + if len(allAllocatorLeaders) == 0 { + allAllocatorLeaders = append(allAllocatorLeaders, allocators...) + continue + } + for _, allocator := range allocators { + if slice.NoneOf(allAllocatorLeaders, func(i int) bool { return allAllocatorLeaders[i] == allocator }) { + allAllocatorLeaders = append(allAllocatorLeaders, allocator) + } + } + } + // At the end, we should have three initialized Local TSO Allocator, + // i.e., the Local TSO Allocator leaders for all dc-locations in testDCLocations + c.Assert(len(allAllocatorLeaders), Equals, dcLocationNum) + allocatorLeaderMemberIDs := make([]uint64, 0, dcLocationNum) + for _, allocator := range allAllocatorLeaders { + allocatorLeader, _ := allocator.(*tso.LocalTSOAllocator) + allocatorLeaderMemberIDs = append(allocatorLeaderMemberIDs, allocatorLeader.GetMember().GetMemberId()) + } + for _, server := range cluster.GetServers() { + // Filter out Global TSO Allocator + allocators := server.GetTSOAllocatorManager().GetAllocators(tso.FilterDCLocation(tso.GlobalDCLocation)) + c.Assert(len(allocators), Equals, dcLocationNum) + for _, allocator := range allocators { + allocatorFollower, _ := allocator.(*tso.LocalTSOAllocator) + allocatorFollowerMemberID := allocatorFollower.GetAllocatorLeader().GetMemberId() + c.Assert( + slice.AnyOf( + allocatorLeaderMemberIDs, + func(i int) bool { return allocatorLeaderMemberIDs[i] == allocatorFollowerMemberID }), + IsTrue) + } + } +}