Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: add basic election logic for Local TSO Allocator #2894

Merged
merged 21 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (s *Server) startServer(ctx context.Context) error {
s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue())
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member.Etcd(), s.client, s.rootPath, s.cfg.TsoSaveInterval.Duration,
s.member, s.rootPath, s.cfg.TsoSaveInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
)
kvBase := kv.NewEtcdKVBase(s.client, s.rootPath)
Expand All @@ -360,7 +360,7 @@ func (s *Server) startServer(ctx context.Context) error {
}
s.storage = core.NewStorage(kvBase).SetRegionStorage(regionStorage)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, s.GetClusterRootPath(), s.clusterID, syncer.NewRegionSyncer(s), s.client, s.httpClient)
s.cluster = cluster.NewRaftCluster(ctx, s.GetClusterRaftPath(), s.clusterID, syncer.NewRegionSyncer(s), s.client, s.httpClient)
s.hbStreams = newHeartbeatStreams(ctx, s.clusterID, s.cluster)

// Run callbacks
Expand Down Expand Up @@ -537,7 +537,7 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe
if err != nil {
return nil, errors.WithStack(err)
}
clusterRootPath := s.GetClusterRootPath()
clusterRootPath := s.GetClusterRaftPath()

var ops []clientv3.Op
ops = append(ops, clientv3.OpPut(clusterRootPath, string(clusterValue)))
Expand Down Expand Up @@ -688,6 +688,11 @@ func (s *Server) GetAllocator() *id.AllocatorImpl {
return s.idAllocator
}

// GetTSOAllocatorManager returns the manager of TSO Allocator.
// The manager maintains the Global TSO Allocator and any Local TSO Allocators.
func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.tsoAllocatorManager
}

// Name returns the unique etcd Name for this server in etcd cluster.
func (s *Server) Name() string {
return s.cfg.Name
Expand Down Expand Up @@ -943,6 +948,11 @@ func (s *Server) GetSecurityConfig() *grpcutil.SecurityConfig {

// GetClusterRootPath returns the cluster root path,
// i.e. the etcd key prefix under which this server stores its data.
func (s *Server) GetClusterRootPath() string {
return s.rootPath
}

// GetClusterRaftPath returns the cluster raft path:
// the "raft" sub-path joined under the cluster root path.
func (s *Server) GetClusterRaftPath() string {
return path.Join(s.rootPath, "raft")
}

Expand Down
162 changes: 142 additions & 20 deletions server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/election"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"github.com/tikv/pd/server/member"
"go.uber.org/zap"
)

// GlobalDCLocation is the Global TSO Allocator's dc-location label.
const GlobalDCLocation = "global"
const (
// GlobalDCLocation is the Global TSO Allocator's dc-location label.
GlobalDCLocation = "global"
// leaderTickInterval is how often an allocator leader re-checks that its
// leadership is still valid before stepping down.
leaderTickInterval = 50 * time.Millisecond
// defaultAllocatorLeaderLease is the lease value passed to
// CampaignAllocatorLeader; presumably seconds (etcd lease TTL) — confirm.
defaultAllocatorLeaderLease = 3
)

type allocatorGroup struct {
dcLocation string
Expand Down Expand Up @@ -63,21 +66,19 @@ type AllocatorManager struct {
// 2. Local TSO Allocator, servers for DC-level transactions.
// dc-location/global (string) -> TSO Allocator
allocatorGroups map[string]*allocatorGroup
// etcd and its client
etcd *embed.Etcd
client *clientv3.Client
// for election use
member *member.Member
// tso config
rootPath string
saveInterval time.Duration
maxResetTSGap func() time.Duration
}

// NewAllocatorManager creates a new TSO Allocator Manager.
func NewAllocatorManager(etcd *embed.Etcd, client *clientv3.Client, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) *AllocatorManager {
func NewAllocatorManager(m *member.Member, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) *AllocatorManager {
allocatorManager := &AllocatorManager{
allocatorGroups: make(map[string]*allocatorGroup),
etcd: etcd,
client: client,
member: m,
rootPath: rootPath,
saveInterval: saveInterval,
maxResetTSGap: maxResetTSGap,
Expand All @@ -95,27 +96,125 @@ func (am *AllocatorManager) getAllocatorPath(dcLocation string) string {

// SetUpAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.
func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, parentCancel context.CancelFunc, dcLocation string, leadership *election.Leadership) error {
var allocator Allocator
if dcLocation == GlobalDCLocation {
allocator = NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.maxResetTSGap)
nolouch marked this conversation as resolved.
Show resolved Hide resolved
} else {
allocator = NewLocalTSOAllocator(am.member, leadership, am.getAllocatorPath(dcLocation), dcLocation, am.saveInterval, am.maxResetTSGap)
}
am.Lock()
defer am.Unlock()
// Update or create a new allocatorGroup
am.allocatorGroups[dcLocation] = &allocatorGroup{
dcLocation: dcLocation,
parentCtx: parentCtx,
parentCancel: parentCancel,
leadership: leadership,
allocator: allocator,
}
// Different kinds of allocators have different setup works to do
switch dcLocation {
// For Global TSO Allocator
case GlobalDCLocation:
am.allocatorGroups[dcLocation] = &allocatorGroup{
dcLocation: dcLocation,
parentCtx: parentCtx,
parentCancel: parentCancel,
leadership: leadership,
allocator: NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.maxResetTSGap),
}
// Because Global TSO Allocator only depends on PD leader's leadership,
// so we can directly initialize it here.
if err := am.allocatorGroups[dcLocation].allocator.Initialize(); err != nil {
return err
}
am.allocatorGroups[dcLocation].isInitialized = true
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
// For Local TSO Allocator
default:
// Todo: set up a Local TSO Allocator
// Join in a Local TSO Allocator election
localTSOAllocator, _ := allocator.(*LocalTSOAllocator)
go am.allocatorLeaderLoop(parentCtx, localTSOAllocator)
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

// allocatorLeaderLoop drives the Local TSO Allocator leader election for one
// dc-location until the parent context is canceled. Similar logic with
// leaderLoop in server/server.go.
func (am *AllocatorManager) allocatorLeaderLoop(parentCtx context.Context, allocator *LocalTSOAllocator) {
	for {
		// Bail out promptly once the server shuts down.
		select {
		case <-parentCtx.Done():
			log.Info("server is closed, return local tso allocator leader loop",
				zap.String("dc-location", allocator.dcLocation),
				zap.String("local-tso-allocator-name", am.member.Member().Name))
			return
		default:
		}

		leader, rev, checkAgain := allocator.CheckAllocatorLeader()
		if checkAgain {
			continue
		}
		if leader != nil {
			log.Info("start to watch allocator leader",
				zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.dcLocation), leader),
				zap.String("local-tso-allocator-name", am.member.Member().Name))
			// WatchAllocatorLeader blocks here and only returns once the
			// current Local TSO Allocator leader has changed.
			allocator.WatchAllocatorLeader(parentCtx, leader, rev)
			log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
				zap.String("dc-location", allocator.dcLocation))
		}
		am.campaignAllocatorLeader(parentCtx, allocator)
	}
}
JmPotato marked this conversation as resolved.
Show resolved Hide resolved

func (am *AllocatorManager) campaignAllocatorLeader(parentCtx context.Context, allocator *LocalTSOAllocator) {
log.Info("start to campaign local tso allocator leader",
zap.String("dc-location", allocator.dcLocation),
zap.String("campaign-local-tso-allocator-leader-name", am.member.Member().Name))
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
if err := allocator.CampaignAllocatorLeader(defaultAllocatorLeaderLease); err != nil {
log.Error("failed to campaign local tso allocator leader", errs.ZapError(err))
return
}

// Start keepalive the Local TSO Allocator leadership and enable Local TSO service.
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()
// maintain the Local TSO Allocator leader
go allocator.KeepAllocatorLeader(ctx)
log.Info("campaign local tso allocator leader ok",
zap.String("dc-location", allocator.dcLocation),
zap.String("campaign-local-tso-allocator-leader-name", am.member.Member().Name))

log.Info("initialize the local TSO allocator", zap.String("dc-location", allocator.dcLocation))
if err := allocator.Initialize(); err != nil {
log.Error("failed to initialize the local TSO allocator", errs.ZapError(err))
return
}
am.Lock()
am.allocatorGroups[allocator.dcLocation].isInitialized = true
am.Unlock()
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
allocator.EnableAllocatorLeader()
log.Info("local tso allocator leader is ready to serve",
zap.String("dc-location", allocator.dcLocation),
zap.String("campaign-local-tso-allocator-leader-name", am.member.Member().Name))

leaderTicker := time.NewTicker(leaderTickInterval)
defer leaderTicker.Stop()

for {
select {
case <-leaderTicker.C:
if !allocator.IsStillAllocatorLeader() {
log.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down",
zap.String("dc-location", allocator.dcLocation))
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed, reset the local tso allocator", zap.String("dc-location", allocator.dcLocation))
am.Lock()
allocator.Reset()
am.allocatorGroups[allocator.dcLocation].leadership.Reset()
am.allocatorGroups[allocator.dcLocation].isInitialized = false
am.Unlock()
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}

// GetAllocator get the allocator by dc-location.
func (am *AllocatorManager) GetAllocator(dcLocation string) (Allocator, error) {
am.RLock()
Expand All @@ -127,6 +226,26 @@ func (am *AllocatorManager) GetAllocator(dcLocation string) (Allocator, error) {
return allocatorGroup.allocator, nil
}

// GetAllocators get all allocators with some filters.
func (am *AllocatorManager) GetAllocators(withGlobal, withLocal, withInitialized bool) []Allocator {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
am.RLock()
defer am.RUnlock()
var allocators []Allocator
for dcLocation, allocatorGroup := range am.allocatorGroups {
if !withGlobal && dcLocation == GlobalDCLocation {
continue
}
if !withLocal && dcLocation != GlobalDCLocation {
continue
}
if withInitialized && !allocatorGroup.isInitialized {
continue
}
allocators = append(allocators, allocatorGroup.allocator)
}
return allocators
}

func (am *AllocatorManager) getAllocatorGroups() []*allocatorGroup {
am.RLock()
defer am.RUnlock()
Expand All @@ -149,8 +268,11 @@ func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
allocatorGroups := am.getAllocatorGroups()
// Update each allocator concurrently
for _, ag := range allocatorGroups {
am.RLock()
// Filter allocators without leadership and uninitialized
if ag.isInitialized && ag.leadership.Check() {
notFilterd := ag.isInitialized && ag.leadership.Check()
am.RUnlock()
if notFilterd {
am.wg.Add(1)
go am.updateAllocator(ag)
}
Expand Down Expand Up @@ -180,7 +302,7 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
return
}
if err := ag.allocator.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp", zap.String("dc-location", ag.dcLocation), zap.Error(err))
log.Warn("failed to update allocator's timestamp", zap.String("dc-location", ag.dcLocation), errs.ZapError(err))
ag.parentCancel()
return
}
Expand Down
Loading