Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: introduce the AllocatorManager #2859

Merged
merged 13 commits into from
Sep 2, 2020
2 changes: 2 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import "github.com/pingcap/errors"

// tso errors
var (
ErrSetAllocator = errors.Normalize("set allocator failed, %s", errors.RFCCodeText("PD:tso:ErrSetAllocator"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrInvalidTimestamp = errors.Normalize("invalid timestamp", errors.RFCCodeText("PD:tso:ErrInvalidTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrIncorrectSystemTime = errors.Normalize("incorrect system time", errors.RFCCodeText("PD:tso:ErrIncorrectSystemTime"))
Expand Down
4 changes: 2 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tsoAllocator.GenerateTSO(count)
ts, err := s.tsoAllocatorManager.HandleTSORequest("global", count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -776,7 +776,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
}
}

nowTSO, err := s.tsoAllocator.GenerateTSO(1)
nowTSO, err := s.tsoAllocatorManager.HandleTSORequest("global", 1)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ func (h *Handler) GetEmptyRegion() ([]*core.RegionInfo, error) {

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64) error {
tsoAllocator := h.s.tsoAllocator
tsoAllocator, _ := h.s.tsoAllocatorManager.GetAllocator("global")
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
if tsoAllocator == nil {
return ErrServerNotStarted
}
Expand Down
22 changes: 6 additions & 16 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type Server struct {
// for basicCluster operation.
basicCluster *core.BasicCluster
// for tso.
tsoAllocator tso.Allocator
tsoAllocatorManager *tso.AllocatorManager
// for raft cluster
cluster *cluster.RaftCluster
// For async region heartbeat.
Expand Down Expand Up @@ -348,10 +348,8 @@ func (s *Server) startServer(ctx context.Context) error {
s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)
s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue())
s.tsoAllocator = tso.NewGlobalTSOAllocator(
s.member.GetLeadership(),
s.rootPath,
s.cfg.TsoSaveInterval.Duration,
s.tsoAllocatorManager = tso.NewAllocatorManager(
ctx, s.member.Etcd(), s.client, s.rootPath, s.cfg.TsoSaveInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
)
kvBase := kv.NewEtcdKVBase(s.client, s.rootPath)
Expand Down Expand Up @@ -1112,12 +1110,11 @@ func (s *Server) campaignLeader() {
go s.member.KeepLeader(ctx)
log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name()))

log.Info("initialize the global TSO allocator")
if err := s.tsoAllocator.Initialize(); err != nil {
log.Error("failed to initialize the global TSO allocator", zap.Error(err))
log.Info("setting up the global TSO allocator")
if err := s.tsoAllocatorManager.SetUpAllocator(ctx, cancel, "global", s.member.GetLeadership()); err != nil {
log.Error("failed to set up the global TSO allocator", zap.Error(err))
return
}
defer s.tsoAllocator.Reset()

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", zap.Error(err))
Expand All @@ -1135,8 +1132,6 @@ func (s *Server) campaignLeader() {
CheckPDVersion(s.persistOptions)
log.Info("PD cluster leader is ready to serve", zap.String("pd-leader-name", s.Name()))

tsTicker := time.NewTicker(tso.UpdateTimestampStep)
defer tsTicker.Stop()
leaderTicker := time.NewTicker(leaderTickInterval)
defer leaderTicker.Stop()

Expand All @@ -1152,11 +1147,6 @@ func (s *Server) campaignLeader() {
log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name()))
return
}
case <-tsTicker.C:
if err := s.tsoAllocator.UpdateTSO(); err != nil {
log.Error("failed to update timestamp", zap.Error(err))
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed")
Expand Down
168 changes: 168 additions & 0 deletions server/tso/allocator_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"context"
"fmt"
"path"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/election"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
)

// allocatorGroup bundles an Allocator with the context/cancel pair and the
// leadership that control its lifecycle inside AllocatorDaemon.
type allocatorGroup struct {
	// allocator's ctx and cancel function, which is to
	// control the allocator's behaviour in AllocatorDaemon:
	// cancelling ctx makes the daemon mark the group
	// uninitialized and reset the allocator.
	ctx    context.Context
	cancel context.CancelFunc
	// For the Global TSO Allocator, leadership is a PD leader's
	// leadership, and for the Local TSO Allocator, leadership
	// is a DC-level certificate to allow an allocator to generate
	// TSO for local transactions in its DC.
	leadership *election.Leadership
	allocator  Allocator
	// isInitialized indicates whether allocator.Initialize() has
	// succeeded; AllocatorDaemon skips TSO updates until it is true.
	isInitialized bool
}

// AllocatorManager is used to manage the TSO Allocators a PD server holds.
// It is in charge of maintaining TSO allocators' leadership, checking election
// priority, and forwarding TSO allocation requests to correct TSO Allocators.
type AllocatorManager struct {
	// The embedded RWMutex guards allocatorGroups and the state of the
	// groups it holds.
	sync.RWMutex
	// There are two kinds of TSO Allocators:
	// 1. Global TSO Allocator, as a global single point to allocate
	//    TSO for global transactions, such as cross-region cases.
	// 2. Local TSO Allocator, servers for DC-level transactions.
	// dc-location/global (string) -> TSO Allocator
	allocatorGroups map[string]*allocatorGroup
	// etcd and its client
	etcd   *embed.Etcd
	client *clientv3.Client
	// tso config: the etcd prefix allocators persist under, how often the
	// max TSO is saved, and the largest gap ResetTS may bridge.
	rootPath      string
	saveInterval  time.Duration
	maxResetTSGap func() time.Duration
}

// NewAllocatorManager creates a new TSO Allocator Manager.
func NewAllocatorManager(serverCtx context.Context, etcd *embed.Etcd, client *clientv3.Client, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) *AllocatorManager {
allocatorManager := &AllocatorManager{
allocatorGroups: make(map[string]*allocatorGroup),
etcd: etcd,
client: client,
rootPath: path.Join(rootPath, "allocator"),
saveInterval: saveInterval,
maxResetTSGap: maxResetTSGap,
}
// Start the deamon for allocators
go allocatorManager.AllocatorDaemon(serverCtx)
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
return allocatorManager
}

// getAllocatorPath returns the etcd path under which the allocator for the
// given dc-location persists its data, e.g. "<rootPath>/allocator/global".
func (am *AllocatorManager) getAllocatorPath(dcLocation string) string {
	allocatorPath := path.Join(am.rootPath, dcLocation)
	return allocatorPath
}

// SetUpAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.
func (am *AllocatorManager) SetUpAllocator(ctx context.Context, cancel context.CancelFunc, dcLocation string, leadership *election.Leadership) error {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
am.Lock()
defer am.Unlock()
switch dcLocation {
case "global":
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
am.allocatorGroups[dcLocation] = &allocatorGroup{
ctx: ctx,
cancel: cancel,
leadership: leadership,
allocator: NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.maxResetTSGap),
}
if err := am.allocatorGroups[dcLocation].allocator.Initialize(); err != nil {
return errs.ErrSetAllocator.FastGenByArgs(err)
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
}
am.allocatorGroups[dcLocation].isInitialized = true
default:
// Todo: set up a Local TSO Allocator
}
return nil
}

// GetAllocator returns the allocator serving the given dc-location, or an
// ErrGetAllocator error if no allocator has been set up for it.
func (am *AllocatorManager) GetAllocator(dcLocation string) (Allocator, error) {
	am.RLock()
	defer am.RUnlock()
	group, ok := am.allocatorGroups[dcLocation]
	if !ok {
		return nil, errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found", dcLocation))
	}
	return group.allocator, nil
}

// AllocatorDaemon is used to update every allocator's TSO.
func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
tsTicker := time.NewTicker(UpdateTimestampStep)
defer tsTicker.Stop()

for {
select {
case <-tsTicker.C:
am.RLock()
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
for dcLocation, allocatorGroup := range am.allocatorGroups {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-allocatorGroup.ctx.Done():
// Need to initialize first before next use
am.allocatorGroups[dcLocation].isInitialized = false
// Resetting the allocator will clear TSO in memory
allocatorGroup.allocator.Reset()
continue
default:
}
if !allocatorGroup.isInitialized {
log.Info("allocator has not been initialized yet", zap.String("dc-location", dcLocation))
continue
}
if !allocatorGroup.leadership.Check() {
continue
}
if err := allocatorGroup.allocator.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp", zap.String("dc-location", dcLocation), zap.Error(err))
allocatorGroup.cancel()
continue
}
}
am.RUnlock()
case <-serverCtx.Done():
return
}
}
}

// HandleTSORequest forwards a TSO allocation request to the allocator that
// serves the given dc-location and returns the generated timestamp.
func (am *AllocatorManager) HandleTSORequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
	am.RLock()
	defer am.RUnlock()
	group, ok := am.allocatorGroups[dcLocation]
	if !ok {
		return pdpb.Timestamp{}, errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, can not get timestamp", dcLocation))
	}
	return group.allocator.GenerateTSO(count)
}
2 changes: 1 addition & 1 deletion tools/pd-backup/pdbackup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func GetBackupInfo(client *clientv3.Client, pdAddr string) (*BackupInfo, error)

backInfo.AllocIDMax = allocIDMax

timestampPath := path.Join(rootPath, "timestamp")
timestampPath := path.Join(rootPath, "allocator", "global", "timestamp")
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
resp, err = etcdutil.EtcdKVGet(client, timestampPath)
if err != nil {
return nil, err
Expand Down