tso: introduce the AllocatorManager (#2859)
Signed-off-by: JmPotato <[email protected]>
JmPotato authored Sep 2, 2020
1 parent ba4499d commit 7e7ea1a
Showing 7 changed files with 230 additions and 22 deletions.
1 change: 1 addition & 0 deletions pkg/errs/errno.go
@@ -18,6 +18,7 @@ import "github.com/pingcap/errors"
// The internal error which is generated in PD project.
// tso errors
var (
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrInvalidTimestamp = errors.Normalize("invalid timestamp", errors.RFCCodeText("PD:tso:ErrInvalidTimestamp"))
5 changes: 3 additions & 2 deletions server/grpc_service.go
@@ -29,6 +29,7 @@ import (
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/tso"
"github.com/tikv/pd/server/versioninfo"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
@@ -91,7 +92,7 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tsoAllocator.GenerateTSO(count)
ts, err := s.tsoAllocatorManager.HandleTSORequest(tso.GlobalDCLocation, count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
@@ -776,7 +777,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
}
}

nowTSO, err := s.tsoAllocator.GenerateTSO(1)
nowTSO, err := s.tsoAllocatorManager.HandleTSORequest(tso.GlobalDCLocation, 1)
if err != nil {
return nil, err
}
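
For illustration (not part of this diff): both hunks above replace the direct tsoAllocator.GenerateTSO call with the manager's HandleTSORequest, keyed by a dc-location. A minimal caller-side sketch, assuming the tsoutil.ParseTimestamp helper imported above converts a pdpb.Timestamp into wall-clock time:

import (
	"time"

	"github.com/tikv/pd/pkg/tsoutil"
	"github.com/tikv/pd/server/tso"
)

// currentPhysicalTime is a hypothetical helper: it asks the Global TSO
// Allocator (through the manager) for a single timestamp and converts it
// to wall-clock time, mirroring how UpdateServiceGCSafePoint uses nowTSO.
func currentPhysicalTime(am *tso.AllocatorManager) (time.Time, error) {
	nowTSO, err := am.HandleTSORequest(tso.GlobalDCLocation, 1)
	if err != nil {
		return time.Time{}, err
	}
	return tsoutil.ParseTimestamp(nowTSO)
}
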
6 changes: 5 additions & 1 deletion server/handler.go
@@ -38,6 +38,7 @@ import (
"github.com/tikv/pd/server/schedule/storelimit"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/tso"
"go.uber.org/zap"
)

@@ -826,7 +827,10 @@ func (h *Handler) GetEmptyRegion() ([]*core.RegionInfo, error) {

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64) error {
tsoAllocator := h.s.tsoAllocator
tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
return err
}
if tsoAllocator == nil {
return ErrServerNotStarted
}
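
The hunk above ends before the function does; judging from the surrounding code, the rest of ResetTS most likely just forwards ts to the allocator it obtained. A sketch of the whole function after this change, with the trailing SetTSO call assumed from the existing tso.Allocator interface rather than taken from this diff:

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64) error {
	tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
	if err != nil {
		return err
	}
	if tsoAllocator == nil {
		return ErrServerNotStarted
	}
	// Assumed: push the user-specified timestamp into the allocator.
	return tsoAllocator.SetTSO(ts)
}
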
36 changes: 19 additions & 17 deletions server/server.go
@@ -119,7 +119,7 @@ type Server struct {
// for basicCluster operation.
basicCluster *core.BasicCluster
// for tso.
tsoAllocator tso.Allocator
tsoAllocatorManager *tso.AllocatorManager
// for raft cluster
cluster *cluster.RaftCluster
// For async region heartbeat.
@@ -348,10 +348,8 @@ func (s *Server) startServer(ctx context.Context) error {
s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)
s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue())
s.tsoAllocator = tso.NewGlobalTSOAllocator(
s.member.GetLeadership(),
s.rootPath,
s.cfg.TsoSaveInterval.Duration,
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member.Etcd(), s.client, s.rootPath, s.cfg.TsoSaveInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
)
kvBase := kv.NewEtcdKVBase(s.client, s.rootPath)
@@ -472,10 +470,11 @@ func (s *Server) LoopContext() context.Context {

func (s *Server) startServerLoop(ctx context.Context) {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx)
s.serverLoopWg.Add(3)
s.serverLoopWg.Add(4)
go s.leaderLoop()
go s.etcdLeaderLoop()
go s.serverMetricsLoop()
go s.tsoAllocatorLoop()
}

func (s *Server) stopServerLoop() {
@@ -500,6 +499,17 @@ func (s *Server) serverMetricsLoop() {
}
}

// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
func (s *Server) tsoAllocatorLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
s.tsoAllocatorManager.AllocatorDaemon(ctx)
log.Info("server is closed, exit allocator loop")
}

func (s *Server) collectEtcdStateMetrics() {
etcdStateGauge.WithLabelValues("term").Set(float64(s.member.Etcd().Server.Term()))
etcdStateGauge.WithLabelValues("appliedIndex").Set(float64(s.member.Etcd().Server.AppliedIndex()))
@@ -1112,12 +1122,12 @@ func (s *Server) campaignLeader() {
go s.member.KeepLeader(ctx)
log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name()))

log.Info("initialize the global TSO allocator")
if err := s.tsoAllocator.Initialize(); err != nil {
log.Error("failed to initialize the global TSO allocator", zap.Error(err))
log.Info("setting up the global TSO allocator")
if err := s.tsoAllocatorManager.SetUpAllocator(ctx, cancel, tso.GlobalDCLocation, s.member.GetLeadership()); err != nil {
log.Error("failed to set up the global TSO allocator", zap.Error(err))
return
}
defer s.tsoAllocator.Reset()

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", zap.Error(err))
@@ -1135,8 +1144,6 @@ CheckPDVersion(s.persistOptions)
CheckPDVersion(s.persistOptions)
log.Info("PD cluster leader is ready to serve", zap.String("pd-leader-name", s.Name()))

tsTicker := time.NewTicker(tso.UpdateTimestampStep)
defer tsTicker.Stop()
leaderTicker := time.NewTicker(leaderTickInterval)
defer leaderTicker.Stop()

@@ -1152,11 +1159,6 @@
log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name()))
return
}
case <-tsTicker.C:
if err := s.tsoAllocator.UpdateTSO(); err != nil {
log.Error("failed to update timestamp", zap.Error(err))
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed")
199 changes: 199 additions & 0 deletions server/tso/allocator_manager.go
@@ -0,0 +1,199 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"context"
"fmt"
"path"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/election"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
)

// GlobalDCLocation is the Global TSO Allocator's dc-location label.
const GlobalDCLocation = "global"

type allocatorGroup struct {
dcLocation string
// allocator's parent ctx and cancel function, which is to
// control the allocator's behavior in AllocatorDaemon and
// pass the cancel signal to its parent as soon as possible
// since it is critical to let parent goroutine know whether
// the allocator is still able to work well.
parentCtx context.Context
parentCancel context.CancelFunc
// For the Global TSO Allocator, leadership is a PD leader's
// leadership, and for the Local TSO Allocator, leadership
// is a DC-level certificate to allow an allocator to generate
// TSO for local transactions in its DC.
leadership *election.Leadership
allocator Allocator
// the flag indicates whether this allocator is initialized
isInitialized bool
}

// AllocatorManager is used to manage the TSO Allocators a PD server holds.
// It is in charge of maintaining TSO allocators' leadership, checking election
// priority, and forwarding TSO allocation requests to correct TSO Allocators.
type AllocatorManager struct {
sync.RWMutex
wg sync.WaitGroup
// There are two kinds of TSO Allocators:
// 1. Global TSO Allocator, as a global single point to allocate
// TSO for global transactions, such as cross-region cases.
// 2. Local TSO Allocator, servers for DC-level transactions.
// dc-location/global (string) -> TSO Allocator
allocatorGroups map[string]*allocatorGroup
// etcd and its client
etcd *embed.Etcd
client *clientv3.Client
// tso config
rootPath string
saveInterval time.Duration
maxResetTSGap func() time.Duration
}

// NewAllocatorManager creates a new TSO Allocator Manager.
func NewAllocatorManager(etcd *embed.Etcd, client *clientv3.Client, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) *AllocatorManager {
allocatorManager := &AllocatorManager{
allocatorGroups: make(map[string]*allocatorGroup),
etcd: etcd,
client: client,
rootPath: rootPath,
saveInterval: saveInterval,
maxResetTSGap: maxResetTSGap,
}
return allocatorManager
}

func (am *AllocatorManager) getAllocatorPath(dcLocation string) string {
// For backward compatibility, the global timestamp's store path will still use the old one
if dcLocation == GlobalDCLocation {
return am.rootPath
}
return path.Join(am.rootPath, dcLocation)
}

// SetUpAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.
func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, parentCancel context.CancelFunc, dcLocation string, leadership *election.Leadership) error {
am.Lock()
defer am.Unlock()
switch dcLocation {
case GlobalDCLocation:
am.allocatorGroups[dcLocation] = &allocatorGroup{
dcLocation: dcLocation,
parentCtx: parentCtx,
parentCancel: parentCancel,
leadership: leadership,
allocator: NewGlobalTSOAllocator(leadership, am.getAllocatorPath(dcLocation), am.saveInterval, am.maxResetTSGap),
}
if err := am.allocatorGroups[dcLocation].allocator.Initialize(); err != nil {
return err
}
am.allocatorGroups[dcLocation].isInitialized = true
default:
// Todo: set up a Local TSO Allocator
}
return nil
}

// GetAllocator get the allocator by dc-location.
func (am *AllocatorManager) GetAllocator(dcLocation string) (Allocator, error) {
am.RLock()
defer am.RUnlock()
allocatorGroup, exist := am.allocatorGroups[dcLocation]
if !exist {
return nil, errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found", dcLocation))
}
return allocatorGroup.allocator, nil
}

func (am *AllocatorManager) getAllocatorGroups() []*allocatorGroup {
am.RLock()
defer am.RUnlock()
allocatorGroups := make([]*allocatorGroup, 0, len(am.allocatorGroups))
for _, ag := range am.allocatorGroups {
allocatorGroups = append(allocatorGroups, ag)
}
return allocatorGroups
}

// AllocatorDaemon is used to update every allocator's TSO.
func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
tsTicker := time.NewTicker(UpdateTimestampStep)
defer tsTicker.Stop()

for {
select {
case <-tsTicker.C:
// Collect all dc-locations first
allocatorGroups := am.getAllocatorGroups()
// Update each allocator concurrently
for _, ag := range allocatorGroups {
// Filter allocators without leadership and uninitialized
if ag.isInitialized && ag.leadership.Check() {
am.wg.Add(1)
go am.updateAllocator(ag)
}
}
am.wg.Wait()
case <-serverCtx.Done():
return
}
}
}

// updateAllocator is used to update the allocator in the group.
func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
defer am.wg.Done()
select {
case <-ag.parentCtx.Done():
// Need to initialize first before next use
ag.isInitialized = false
// Resetting the allocator will clear TSO in memory
ag.allocator.Reset()
return
default:
}
if !ag.leadership.Check() {
log.Info("allocator doesn't campaign leadership yet", zap.String("dc-location", ag.dcLocation))
time.Sleep(200 * time.Millisecond)
return
}
if err := ag.allocator.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp", zap.String("dc-location", ag.dcLocation), zap.Error(err))
ag.parentCancel()
return
}
}

// HandleTSORequest forwards TSO allocation requests to correct TSO Allocators.
func (am *AllocatorManager) HandleTSORequest(dcLocation string, count uint32) (pdpb.Timestamp, error) {
am.RLock()
defer am.RUnlock()
allocatorGroup, exist := am.allocatorGroups[dcLocation]
if !exist {
err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation))
return pdpb.Timestamp{}, err
}
return allocatorGroup.allocator.GenerateTSO(count)
}
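
Putting the new API together: the manager is constructed once at server start, its daemon runs for the server's lifetime, and allocators are registered on demand (currently only the global one, on a successful PD leader campaign). A wiring sketch under those assumptions; the root path, intervals, and function name are illustrative, not taken from this commit:

import (
	"context"
	"time"

	"github.com/tikv/pd/server/election"
	"github.com/tikv/pd/server/tso"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/embed"
)

// runTSOSketch shows the expected call order; in PD these steps are split
// between startServer, startServerLoop, and campaignLeader.
func runTSOSketch(etcdSrv *embed.Etcd, client *clientv3.Client, leadership *election.Leadership) error {
	am := tso.NewAllocatorManager(
		etcdSrv, client,
		"/pd/1",       // hypothetical root path
		3*time.Second, // hypothetical TSO save interval
		func() time.Duration { return 24 * time.Hour }, // hypothetical max reset-TS gap
	)

	// Runs until the server shuts down; each tick updates every initialized
	// allocator whose leadership check still passes.
	serverCtx, serverCancel := context.WithCancel(context.Background())
	defer serverCancel()
	go am.AllocatorDaemon(serverCtx)

	// On winning the PD leader election: register the Global TSO Allocator.
	// The cancel function is handed over so a failed UpdateTSO can abort the
	// leader's campaign context (see updateAllocator above).
	leaderCtx, leaderCancel := context.WithCancel(serverCtx)
	defer leaderCancel()
	if err := am.SetUpAllocator(leaderCtx, leaderCancel, tso.GlobalDCLocation, leadership); err != nil {
		return err
	}

	// Serving path: requests are routed by dc-location.
	ts, err := am.HandleTSORequest(tso.GlobalDCLocation, 10)
	if err != nil {
		return err
	}
	_ = ts // the physical/logical parts cover the requested count of 10
	return nil
}

If an UpdateTSO call fails, updateAllocator cancels the registered parent context; on the next tick the group is marked uninitialized and its allocator is Reset, so the timestamp state has to be set up again by the next successful campaign.
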
3 changes: 2 additions & 1 deletion server/tso/tso.go
@@ -238,8 +238,9 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
return nil
}

// ResetTimestamp is used to reset the timestamp.
// ResetTimestamp is used to reset the timestamp in memory.
func (t *timestampOracle) ResetTimestamp() {
log.Info("reset the timestamp in memory")
zero := &atomicObject{
physical: typeutil.ZeroTime,
}
2 changes: 1 addition & 1 deletion tests/server/tso/tso_test.go
@@ -212,7 +212,7 @@ func (s *testTsoSuite) TestRequestFollower(c *C) {

// In some cases, when a TSO request arrives, the SyncTimestamp may not finish yet.
// This test is used to simulate this situation and verify that the retry mechanism.
func (s *testTsoSuite) TestDeplaySyncTimestamp(c *C) {
func (s *testTsoSuite) TestDelaySyncTimestamp(c *C) {
cluster, err := tests.NewTestCluster(s.ctx, 2)
c.Assert(err, IsNil)
defer cluster.Destroy()
