tso: introduce the AllocatorManager #2859

Merged: 13 commits, Sep 2, 2020
1 change: 1 addition & 0 deletions pkg/errs/errno.go
@@ -18,6 +18,7 @@ import "github.com/pingcap/errors"
// The internal errors which are generated in the PD project.
// tso errors
var (
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrInvalidTimestamp = errors.Normalize("invalid timestamp", errors.RFCCodeText("PD:tso:ErrInvalidTimestamp"))
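
The new entry follows the existing errs convention: the error is normalized once with an RFC code and instantiated at call sites with FastGenByArgs, exactly as the AllocatorManager does further down in this diff. A minimal, self-contained sketch of such a call site (lookupAllocator and the map are illustrative; only errs.ErrGetAllocator comes from this change):

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/errs"
)

// lookupAllocator is a hypothetical helper mirroring how the AllocatorManager
// reports an unknown dc-location via the new ErrGetAllocator.
func lookupAllocator(allocators map[string]struct{}, dcLocation string) error {
	if _, ok := allocators[dcLocation]; !ok {
		// FastGenByArgs fills the "%s" placeholder declared in errno.go and
		// keeps the RFC code "PD:tso:ErrGetAllocator" attached to the error.
		return errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found", dcLocation))
	}
	return nil
}

func main() {
	fmt.Println(lookupAllocator(map[string]struct{}{"global": {}}, "dc-1"))
}
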
5 changes: 3 additions & 2 deletions server/grpc_service.go
@@ -29,6 +29,7 @@ import (
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/tso"
"github.com/tikv/pd/server/versioninfo"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
@@ -91,7 +92,7 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tsoAllocator.GenerateTSO(count)
ts, err := s.tsoAllocatorManager.HandleTSORequest(tso.GlobalDCLocation, count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
@@ -776,7 +777,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
}
}

nowTSO, err := s.tsoAllocator.GenerateTSO(1)
nowTSO, err := s.tsoAllocatorManager.HandleTSORequest(tso.GlobalDCLocation, 1)
if err != nil {
return nil, err
}
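
Both gRPC call sites now go through the manager rather than a single allocator, with the dc-location string acting as the routing key. A minimal sketch of the new call pattern (requestGlobalTSO is a hypothetical wrapper; HandleTSORequest and GlobalDCLocation come from this PR):

package example

import (
	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/tikv/pd/server/tso"
)

// requestGlobalTSO is a hypothetical wrapper showing how a handler asks the
// manager for `count` timestamps instead of calling an allocator directly.
func requestGlobalTSO(am *tso.AllocatorManager, count uint32) (pdpb.Timestamp, error) {
	// "global" selects the Global TSO Allocator that the PD leader sets up
	// in campaignLeader; a Local TSO Allocator would use its own dc-location.
	return am.HandleTSORequest(tso.GlobalDCLocation, count)
}
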
6 changes: 5 additions & 1 deletion server/handler.go
@@ -38,6 +38,7 @@ import (
"github.com/tikv/pd/server/schedule/storelimit"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/tso"
"go.uber.org/zap"
)

@@ -826,7 +827,10 @@ func (h *Handler) GetEmptyRegion() ([]*core.RegionInfo, error) {

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64) error {
tsoAllocator := h.s.tsoAllocator
tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
return err
}
if tsoAllocator == nil {
return ErrServerNotStarted
}
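
The handler now resolves the Global TSO Allocator through the manager before resetting the timestamp. A condensed sketch of the lookup-then-use flow (resetTS is illustrative, and the SetTSO call on the returned Allocator is assumed here; the PR itself only shows the lookup):

package example

import (
	"errors"

	"github.com/tikv/pd/server/tso"
)

// resetTS is a hypothetical, trimmed version of Handler.ResetTS showing the
// lookup-then-use pattern introduced by the AllocatorManager.
func resetTS(am *tso.AllocatorManager, ts uint64) error {
	tsoAllocator, err := am.GetAllocator(tso.GlobalDCLocation)
	if err != nil {
		return err
	}
	if tsoAllocator == nil {
		return errors.New("server not started")
	}
	// SetTSO is assumed to stay on the Allocator interface; it resets the
	// in-memory timestamp to the user-specified value.
	return tsoAllocator.SetTSO(ts)
}
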
36 changes: 19 additions & 17 deletions server/server.go
@@ -119,7 +119,7 @@ type Server struct {
// for basicCluster operation.
basicCluster *core.BasicCluster
// for tso.
tsoAllocator tso.Allocator
tsoAllocatorManager *tso.AllocatorManager
// for raft cluster
cluster *cluster.RaftCluster
// For async region heartbeat.
@@ -348,10 +348,8 @@ func (s *Server) startServer(ctx context.Context) error {
s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)
s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue())
s.tsoAllocator = tso.NewGlobalTSOAllocator(
s.member.GetLeadership(),
s.rootPath,
s.cfg.TsoSaveInterval.Duration,
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member.Etcd(), s.client, s.rootPath, s.cfg.TsoSaveInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
)
kvBase := kv.NewEtcdKVBase(s.client, s.rootPath)
@@ -472,10 +470,11 @@ func (s *Server) LoopContext() context.Context {

func (s *Server) startServerLoop(ctx context.Context) {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx)
s.serverLoopWg.Add(3)
s.serverLoopWg.Add(4)
go s.leaderLoop()
go s.etcdLeaderLoop()
go s.serverMetricsLoop()
go s.tsoAllocatorLoop()
}

func (s *Server) stopServerLoop() {
@@ -500,6 +499,17 @@ func (s *Server) serverMetricsLoop() {
}
}

// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
func (s *Server) tsoAllocatorLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
s.tsoAllocatorManager.AllocatorDaemon(ctx)
log.Info("server is closed, exit allocator loop")
}

func (s *Server) collectEtcdStateMetrics() {
etcdStateGauge.WithLabelValues("term").Set(float64(s.member.Etcd().Server.Term()))
etcdStateGauge.WithLabelValues("appliedIndex").Set(float64(s.member.Etcd().Server.AppliedIndex()))
@@ -1112,12 +1122,12 @@ func (s *Server) campaignLeader() {
go s.member.KeepLeader(ctx)
log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name()))

log.Info("initialize the global TSO allocator")
if err := s.tsoAllocator.Initialize(); err != nil {
log.Error("failed to initialize the global TSO allocator", zap.Error(err))
log.Info("setting up the global TSO allocator")
if err := s.tsoAllocatorManager.SetUpAllocator(ctx, cancel, tso.GlobalDCLocation, s.member.GetLeadership()); err != nil {
log.Error("failed to set up the global TSO allocator", zap.Error(err))
return
}
defer s.tsoAllocator.Reset()

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", zap.Error(err))
@@ -1135,8 +1144,6 @@
CheckPDVersion(s.persistOptions)
log.Info("PD cluster leader is ready to serve", zap.String("pd-leader-name", s.Name()))

tsTicker := time.NewTicker(tso.UpdateTimestampStep)
defer tsTicker.Stop()
leaderTicker := time.NewTicker(leaderTickInterval)
defer leaderTicker.Stop()

@@ -1152,11 +1159,6 @@
log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name()))
return
}
case <-tsTicker.C:
if err := s.tsoAllocator.UpdateTSO(); err != nil {
log.Error("failed to update timestamp", zap.Error(err))
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed")
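
The per-leader tsTicker in campaignLeader is gone; keeping timestamps fresh is now the job of a server-lifetime loop that startServerLoop registers alongside the other three daemons. The wiring is the standard context-plus-WaitGroup pattern; a small standalone sketch of the shape (names and intervals are illustrative):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// daemon plays the role of tsoAllocatorLoop/AllocatorDaemon: it ticks until
// the server loop context is cancelled, then exits and releases the WaitGroup.
func daemon(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("update allocators") // stand-in for UpdateTSO on each allocator
		case <-ctx.Done():
			fmt.Println("server is closed, exit allocator loop")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1) // PD does serverLoopWg.Add(4) for its four loops
	go daemon(ctx, &wg)
	time.Sleep(120 * time.Millisecond)
	cancel()  // equivalent of stopServerLoop
	wg.Wait() // the server blocks here until every loop has exited
}
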
199 changes: 199 additions & 0 deletions server/tso/allocator_manager.go
@@ -0,0 +1,199 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"context"
"fmt"
"path"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/election"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
)

// GlobalDCLocation is the Global TSO Allocator's dc-location label.
const GlobalDCLocation = "global"

type allocatorGroup struct {
dcLocation string
// The allocator's parent ctx and cancel function, which are used to
// control the allocator's behavior in AllocatorDaemon and to pass the
// cancel signal back to the parent as soon as possible, since it is
// critical for the parent goroutine to know whether the allocator is
// still able to work properly.
parentCtx context.Context
parentCancel context.CancelFunc
// For the Global TSO Allocator, leadership is a PD leader's
// leadership, and for the Local TSO Allocator, leadership
// is a DC-level certificate to allow an allocator to generate
// TSO for local transactions in its DC.
leadership *election.Leadership
allocator Allocator
// the flag indicates whether this allocator is initialized
isInitialized bool
}

// AllocatorManager is used to manage the TSO Allocators a PD server holds.
// It is in charge of maintaining TSO allocators' leadership, checking election
// priority, and forwarding TSO allocation requests to the correct TSO Allocators.
type AllocatorManager struct {
sync.RWMutex
wg sync.WaitGroup
// There are two kinds of TSO Allocators:
// 1. Global TSO Allocator, as a global single point to allocate
// TSO for global transactions, such as cross-region cases.
// 2. Local TSO Allocator, which serves DC-level transactions.
// dc-location/global (string) -> TSO Allocator
allocatorGroups map[string]*allocatorGroup
// etcd and its client
etcd *embed.Etcd
client *clientv3.Client
// tso config
rootPath string
saveInterval time.Duration
maxResetTSGap func() time.Duration
}

// NewAllocatorManager creates a new TSO Allocator Manager.
func NewAllocatorManager(etcd *embed.Etcd, client *clientv3.Client, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) *AllocatorManager {
allocatorManager := &AllocatorManager{
allocatorGroups: make(map[string]*allocatorGroup),
etcd: etcd,
client: client,
rootPath: rootPath,
saveInterval: saveInterval,
maxResetTSGap: maxResetTSGap,
}
return allocatorManager
}

func (am *AllocatorManager) getAllocatorPath(dcLocation string) string {
// For backward compatibility, the global timestamp's store path will still use the old one
if dcLocation == GlobalDCLocation {
return am.rootPath
}
return path.Join(am.rootPath, dcLocation)
}

// SetUpAllocator is used to set up an allocator, which will initialize the allocator and put it into the allocator daemon.
func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, parentCancel context.CancelFunc, dcLocation string, leadership *election.Leadership) error {
am.Lock()
defer am.Unlock()
switch dcLocation {
case GlobalDCLocation:
am.allocatorGroups[dcLocation] = &allocatorGroup{
dcLocation: dcLocation,
parentCtx: parentCtx,
parentCancel: parentCancel,
leadership: leadership,
allocator: NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.maxResetTSGap),
}
if err := am.allocatorGroups[dcLocation].allocator.Initialize(); err != nil {
return err
}
am.allocatorGroups[dcLocation].isInitialized = true
default:
// TODO: set up a Local TSO Allocator
}
return nil
}

// GetAllocator gets the allocator by dc-location.
func (am *AllocatorManager) GetAllocator(dcLocation string) (Allocator, error) {
am.RLock()
defer am.RUnlock()
allocatorGroup, exist := am.allocatorGroups[dcLocation]
if !exist {
return nil, errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found", dcLocation))
}
return allocatorGroup.allocator, nil
}

func (am *AllocatorManager) getAllocatorGroups() []*allocatorGroup {
am.RLock()
defer am.RUnlock()
allocatorGroups := make([]*allocatorGroup, 0, len(am.allocatorGroups))
for _, ag := range am.allocatorGroups {
allocatorGroups = append(allocatorGroups, ag)
}
return allocatorGroups
}

// AllocatorDaemon is used to update every allocator's TSO.
func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
tsTicker := time.NewTicker(UpdateTimestampStep)
defer tsTicker.Stop()

for {
select {
case <-tsTicker.C:
// Collect all dc-locations first
allocatorGroups := am.getAllocatorGroups()
// Update each allocator concurrently
for _, ag := range allocatorGroups {
// Skip allocators that are uninitialized or have no leadership
if ag.isInitialized && ag.leadership.Check() {
am.wg.Add(1)
go am.updateAllocator(ag)
}
}
am.wg.Wait()
case <-serverCtx.Done():
return
}
}
}

// updateAllocator is used to update the allocator in the group.
func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
defer am.wg.Done()
select {
case <-ag.parentCtx.Done():
// The allocator needs to be initialized again before the next use
ag.isInitialized = false
// Resetting the allocator will clear TSO in memory
ag.allocator.Reset()
return
default:
}
if !ag.leadership.Check() {
log.Info("allocator doesn't campaign leadership yet", zap.String("dc-location", ag.dcLocation))
time.Sleep(200 * time.Millisecond)
return
}
if err := ag.allocator.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp", zap.String("dc-location", ag.dcLocation), zap.Error(err))
ag.parentCancel()
return
}
}

// HandleTSORequest forwards TSO allocation requests to the correct TSO Allocator.
func (am *AllocatorManager) HandleTSORequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
am.RLock()
defer am.RUnlock()
allocatorGroup, exist := am.allocatorGroups[dcLocation]
if !exist {
err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation))
return pdpb.Timestamp{}, err
}
return allocatorGroup.allocator.GenerateTSO(count)
}
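
Taken together, the intended lifecycle is: build the manager at startup, set up the Global TSO Allocator once PD leadership is won, let AllocatorDaemon keep its timestamp fresh, and serve requests through HandleTSORequest. A compile-only sketch of that sequence with the etcd server, client, and leadership passed in as placeholders (in PD these steps are spread across startServer, startServerLoop, and campaignLeader; the root path and intervals below are illustrative):

package example

import (
	"context"
	"time"

	"github.com/tikv/pd/server/election"
	"github.com/tikv/pd/server/tso"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/embed"
)

// runGlobalTSO is a hypothetical wiring function that strings the new API
// together; the real dependencies come from the PD member and server config.
func runGlobalTSO(etcd *embed.Etcd, client *clientv3.Client, leadership *election.Leadership) error {
	am := tso.NewAllocatorManager(
		etcd, client,
		"/pd/0/root",  // illustrative root path
		3*time.Second, // TsoSaveInterval
		func() time.Duration { return 24 * time.Hour }, // max gap allowed when resetting TS
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Done by the PD leader right after winning the election; a failure here
	// aborts the campaign, mirroring campaignLeader above.
	if err := am.SetUpAllocator(ctx, cancel, tso.GlobalDCLocation, leadership); err != nil {
		return err
	}

	// Runs for the whole server lifetime; only allocators that are
	// initialized and still hold leadership get their TSO updated.
	go am.AllocatorDaemon(ctx)

	// Serving path: requests are routed by dc-location.
	_, err := am.HandleTSORequest(tso.GlobalDCLocation, 1)
	return err
}
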
3 changes: 2 additions & 1 deletion server/tso/tso.go
@@ -238,8 +238,9 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
return nil
}

// ResetTimestamp is used to reset the timestamp.
// ResetTimestamp is used to reset the timestamp in memory.
func (t *timestampOracle) ResetTimestamp() {
log.Info("reset the timestamp in memory")
zero := &atomicObject{
physical: typeutil.ZeroTime,
}
2 changes: 1 addition & 1 deletion tests/server/tso/tso_test.go
@@ -212,7 +212,7 @@ func (s *testTsoSuite) TestRequestFollower(c *C) {

// In some cases, when a TSO request arrives, the SyncTimestamp may not have finished yet.
// This test is used to simulate this situation and verify that the retry mechanism works.
func (s *testTsoSuite) TestDeplaySyncTimestamp(c *C) {
func (s *testTsoSuite) TestDelaySyncTimestamp(c *C) {
cluster, err := tests.NewTestCluster(s.ctx, 2)
c.Assert(err, IsNil)
defer cluster.Destroy()