Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: add basic election logic for Local TSO Allocator #2894

Merged
merged 21 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (s *Server) startServer(ctx context.Context) error {
s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue())
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member.Etcd(), s.client, s.rootPath, s.cfg.TsoSaveInterval.Duration,
s.member, s.rootPath, s.cfg.TsoSaveInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
)
kvBase := kv.NewEtcdKVBase(s.client, s.rootPath)
Expand Down Expand Up @@ -688,6 +688,11 @@ func (s *Server) GetAllocator() *id.AllocatorImpl {
return s.idAllocator
}

// GetTSOAllocatorManager returns the manager of TSO Allocator.
func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.tsoAllocatorManager
}

// Name returns the unique etcd Name for this server in etcd cluster.
func (s *Server) Name() string {
return s.cfg.Name
Expand Down Expand Up @@ -941,6 +946,11 @@ func (s *Server) GetSecurityConfig() *grpcutil.SecurityConfig {
return &s.cfg.Security
}

// GetServerRootPath returns the server root path.
func (s *Server) GetServerRootPath() string {
return s.rootPath
}

// GetClusterRootPath returns the cluster root path.
func (s *Server) GetClusterRootPath() string {
return path.Join(s.rootPath, "raft")
Expand Down
227 changes: 179 additions & 48 deletions server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,21 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server/election"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"github.com/tikv/pd/server/member"
"go.uber.org/zap"
)

// GlobalDCLocation is the Global TSO Allocator's dc-location label.
const GlobalDCLocation = "global"
const (
// GlobalDCLocation is the Global TSO Allocator's dc-location label.
GlobalDCLocation = "global"
leaderTickInterval = 50 * time.Millisecond
defaultAllocatorLeaderLease = 3
)

// AllocatorGroupFilter is used to select AllocatorGroup.
type AllocatorGroupFilter func(ag *allocatorGroup) bool

type allocatorGroup struct {
dcLocation string
Expand Down Expand Up @@ -63,78 +70,152 @@ type AllocatorManager struct {
// 2. Local TSO Allocator, servers for DC-level transactions.
// dc-location/global (string) -> TSO Allocator
allocatorGroups map[string]*allocatorGroup
// etcd and its client
etcd *embed.Etcd
client *clientv3.Client
// for election use
member *member.Member
// tso config
rootPath string
saveInterval time.Duration
maxResetTSGap func() time.Duration
}

// NewAllocatorManager creates a new TSO Allocator Manager.
func NewAllocatorManager(etcd *embed.Etcd, client *clientv3.Client, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) *AllocatorManager {
func NewAllocatorManager(m *member.Member, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) *AllocatorManager {
allocatorManager := &AllocatorManager{
allocatorGroups: make(map[string]*allocatorGroup),
etcd: etcd,
client: client,
member: m,
rootPath: rootPath,
saveInterval: saveInterval,
maxResetTSGap: maxResetTSGap,
}
return allocatorManager
}

func (am *AllocatorManager) getAllocatorPath(dcLocation string) string {
// For backward compatibility, the global timestamp's store path will still use the old one
if dcLocation == GlobalDCLocation {
return am.rootPath
}
return path.Join(am.rootPath, dcLocation)
}

// SetUpAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.
func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, parentCancel context.CancelFunc, dcLocation string, leadership *election.Leadership) error {
var allocator Allocator
if dcLocation == GlobalDCLocation {
allocator = NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.maxResetTSGap)
nolouch marked this conversation as resolved.
Show resolved Hide resolved
} else {
allocator = NewLocalTSOAllocator(am.member, leadership, am.getAllocatorPath(dcLocation), dcLocation, am.saveInterval, am.maxResetTSGap)
}
am.Lock()
defer am.Unlock()
// Update or create a new allocatorGroup
am.allocatorGroups[dcLocation] = &allocatorGroup{
dcLocation: dcLocation,
parentCtx: parentCtx,
parentCancel: parentCancel,
leadership: leadership,
allocator: allocator,
}
// Different kinds of allocators have different setup works to do
switch dcLocation {
// For Global TSO Allocator
case GlobalDCLocation:
am.allocatorGroups[dcLocation] = &allocatorGroup{
dcLocation: dcLocation,
parentCtx: parentCtx,
parentCancel: parentCancel,
leadership: leadership,
allocator: NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.maxResetTSGap),
}
// Because Global TSO Allocator only depends on PD leader's leadership,
// so we can directly initialize it here.
if err := am.allocatorGroups[dcLocation].allocator.Initialize(); err != nil {
return err
}
am.allocatorGroups[dcLocation].isInitialized = true
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
// For Local TSO Allocator
default:
// Todo: set up a Local TSO Allocator
// Join in a Local TSO Allocator election
localTSOAllocator, _ := allocator.(*LocalTSOAllocator)
go am.allocatorLeaderLoop(parentCtx, localTSOAllocator)
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

// GetAllocator get the allocator by dc-location.
func (am *AllocatorManager) GetAllocator(dcLocation string) (Allocator, error) {
am.RLock()
defer am.RUnlock()
allocatorGroup, exist := am.allocatorGroups[dcLocation]
if !exist {
return nil, errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found", dcLocation))
func (am *AllocatorManager) getAllocatorPath(dcLocation string) string {
// For backward compatibility, the global timestamp's store path will still use the old one
if dcLocation == GlobalDCLocation {
return am.rootPath
}
return allocatorGroup.allocator, nil
return path.Join(am.rootPath, dcLocation)
}

func (am *AllocatorManager) getAllocatorGroups() []*allocatorGroup {
am.RLock()
defer am.RUnlock()
allocatorGroups := make([]*allocatorGroup, 0, len(am.allocatorGroups))
for _, ag := range am.allocatorGroups {
allocatorGroups = append(allocatorGroups, ag)
// similar logic with leaderLoop in server/server.go
func (am *AllocatorManager) allocatorLeaderLoop(parentCtx context.Context, allocator *LocalTSOAllocator) {
for {
select {
case <-parentCtx.Done():
log.Info("server is closed, return local tso allocator leader loop",
zap.String("dc-location", allocator.dcLocation),
zap.String("local-tso-allocator-name", am.member.Member().Name))
return
default:
}

allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader()
if checkAgain {
continue
}
if allocatorLeader != nil {
log.Info("start to watch allocator leader",
zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.dcLocation), allocatorLeader),
zap.String("local-tso-allocator-name", am.member.Member().Name))
// WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
allocator.WatchAllocatorLeader(parentCtx, allocatorLeader, rev)
log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
zap.String("dc-location", allocator.dcLocation))
}
am.campaignAllocatorLeader(parentCtx, allocator)
}
}
JmPotato marked this conversation as resolved.
Show resolved Hide resolved

func (am *AllocatorManager) campaignAllocatorLeader(parentCtx context.Context, allocator *LocalTSOAllocator) {
log.Info("start to campaign local tso allocator leader",
zap.String("dc-location", allocator.dcLocation),
zap.String("name", am.member.Member().Name))
if err := allocator.CampaignAllocatorLeader(defaultAllocatorLeaderLease); err != nil {
log.Error("failed to campaign local tso allocator leader", errs.ZapError(err))
return
}

// Start keepalive the Local TSO Allocator leadership and enable Local TSO service.
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()
// maintain the Local TSO Allocator leader
go allocator.KeepAllocatorLeader(ctx)
log.Info("campaign local tso allocator leader ok",
zap.String("dc-location", allocator.dcLocation),
zap.String("name", am.member.Member().Name))

log.Info("initialize the local TSO allocator",
zap.String("dc-location", allocator.dcLocation),
zap.String("name", am.member.Member().Name))
if err := allocator.Initialize(); err != nil {
log.Error("failed to initialize the local TSO allocator", errs.ZapError(err))
return
}
am.setIsInitialized(allocator.dcLocation, true)
allocator.EnableAllocatorLeader()
log.Info("local tso allocator leader is ready to serve",
zap.String("dc-location", allocator.dcLocation),
zap.String("name", am.member.Member().Name))

leaderTicker := time.NewTicker(leaderTickInterval)
defer leaderTicker.Stop()

for {
select {
case <-leaderTicker.C:
if !allocator.IsStillAllocatorLeader() {
log.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down",
zap.String("dc-location", allocator.dcLocation),
zap.String("name", am.member.Member().Name))
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed, reset the local tso allocator",
zap.String("dc-location", allocator.dcLocation),
zap.String("name", am.member.Member().Name))
am.resetAllocatorGroup(allocator.dcLocation)
return
}
}
return allocatorGroups
}

// AllocatorDaemon is used to update every allocator's TSO.
Expand All @@ -145,15 +226,12 @@ func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
for {
select {
case <-tsTicker.C:
// Collect all dc-locations first
allocatorGroups := am.getAllocatorGroups()
// Filter out allocators without leadership and uninitialized
allocatorGroups := am.getAllocatorGroups(FilterUninitialized(), FilterUnavailableLeadership())
// Update each allocator concurrently
for _, ag := range allocatorGroups {
// Filter allocators without leadership and uninitialized
if ag.isInitialized && ag.leadership.Check() {
am.wg.Add(1)
go am.updateAllocator(ag)
}
am.wg.Add(1)
go am.updateAllocator(ag)
}
am.wg.Wait()
case <-serverCtx.Done():
Expand Down Expand Up @@ -197,3 +275,56 @@ func (am *AllocatorManager) HandleTSORequest(dcLocation string, count uint32) (p
}
return allocatorGroup.allocator.GenerateTSO(count)
}

func (am *AllocatorManager) setIsInitialized(dcLocation string, isInitialized bool) {
am.Lock()
defer am.Unlock()
if allocatorGroup, exist := am.allocatorGroups[dcLocation]; exist {
allocatorGroup.isInitialized = isInitialized
}
}

func (am *AllocatorManager) resetAllocatorGroup(dcLocation string) {
am.Lock()
defer am.Unlock()
if allocatorGroup, exist := am.allocatorGroups[dcLocation]; exist {
allocatorGroup.allocator.Reset()
allocatorGroup.leadership.Reset()
allocatorGroup.isInitialized = false
}
}

func (am *AllocatorManager) getAllocatorGroups(filters ...AllocatorGroupFilter) []*allocatorGroup {
am.RLock()
defer am.RUnlock()
allocatorGroups := make([]*allocatorGroup, 0)
for _, ag := range am.allocatorGroups {
if ag == nil {
continue
}
if slice.NoneOf(filters, func(i int) bool { return filters[i](ag) }) {
allocatorGroups = append(allocatorGroups, ag)
}
}
return allocatorGroups
}

// GetAllocator get the allocator by dc-location.
func (am *AllocatorManager) GetAllocator(dcLocation string) (Allocator, error) {
am.RLock()
defer am.RUnlock()
allocatorGroup, exist := am.allocatorGroups[dcLocation]
if !exist {
return nil, errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found", dcLocation))
}
return allocatorGroup.allocator, nil
}

// GetAllocators get all allocators with some filters.
func (am *AllocatorManager) GetAllocators(filters ...AllocatorGroupFilter) []Allocator {
var allocators []Allocator
for _, ag := range am.getAllocatorGroups(filters...) {
allocators = append(allocators, ag.allocator)
}
return allocators
}
29 changes: 29 additions & 0 deletions server/tso/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

// FilterDCLocation will filter out the allocatorGroup with a given dcLocation.
func FilterDCLocation(dcLocation string) func(ag *allocatorGroup) bool {
return func(ag *allocatorGroup) bool { return ag.dcLocation == dcLocation }
}

// FilterUninitialized will filter out the allocatorGroup uninitialized.
func FilterUninitialized() func(ag *allocatorGroup) bool {
return func(ag *allocatorGroup) bool { return !ag.isInitialized }
}

// FilterUnavailableLeadership will filter out the allocatorGroup whose leadership is unavailable.
func FilterUnavailableLeadership() func(ag *allocatorGroup) bool {
return func(ag *allocatorGroup) bool { return !ag.leadership.Check() }
}
Loading