Skip to content

Commit

Permalink
tso: refactor the tso service (#2803)
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Aug 21, 2020
1 parent b35148e commit 7a6a9da
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 149 deletions.
7 changes: 3 additions & 4 deletions server/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ func (ls *Leadership) setLease(lease *lease) {
ls.lease.Store(lease)
}

// ResetLease sets the lease of leadership to nil.
func (ls *Leadership) resetLease() {
ls.setLease(nil)
// GetClient is used to get the etcd client.
func (ls *Leadership) GetClient() *clientv3.Client {
return ls.client
}

// Campaign is used to campaign the leader with given lease and returns a leadership
Expand Down Expand Up @@ -147,5 +147,4 @@ func (ls *Leadership) DeleteLeader() error {
// Reset does some defer job such as closing lease, resetting lease etc.
func (ls *Leadership) Reset() {
ls.getLease().Close()
ls.resetLease()
}
3 changes: 3 additions & 0 deletions server/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func (l *lease) Close() error {
// IsExpired checks if the lease is expired. If it returns true,
// current leader should step down and try to re-elect again.
func (l *lease) IsExpired() bool {
if l.expireTime.Load() == nil {
return false
}
return time.Now().After(l.expireTime.Load().(time.Time))
}

Expand Down
6 changes: 4 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/versioninfo"
Expand Down Expand Up @@ -90,7 +91,7 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tso.GetRespTS(count)
ts, err := s.tsoAllocator.GenerateTSO(count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -775,10 +776,11 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
}
}

now, err := s.tso.Now()
nowTSO, err := s.tsoAllocator.GenerateTSO(1)
if err != nil {
return nil, err
}
now, _ := tsoutil.ParseTimestamp(nowTSO)
min, err := s.storage.LoadMinServiceGCSafePoint(now)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,11 +825,11 @@ func (h *Handler) GetEmptyRegion() ([]*core.RegionInfo, error) {

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64) error {
tsoServer := h.s.tso
if tsoServer == nil {
tsoAllocator := h.s.tsoAllocator
if tsoAllocator == nil {
return ErrServerNotStarted
}
return tsoServer.ResetUserTimestamp(ts)
return tsoAllocator.SetTSO(ts)
}

// SetStoreLimitScene sets the limit values for differents scenes
Expand Down
21 changes: 9 additions & 12 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ type Server struct {
serverLoopCancel func()
serverLoopWg sync.WaitGroup

// Member roles need to be elected
// In a PD cluster, there will be two kinds of election.
// One is for PD leader, another is for TSO Allocator
// for PD leader election.
member *member.Member
// etcd client
client *clientv3.Client
Expand All @@ -121,7 +119,7 @@ type Server struct {
// for baiscCluster operation.
basicCluster *core.BasicCluster
// for tso.
tso *tso.TimestampOracle
tsoAllocator tso.Allocator
// for raft cluster
cluster *cluster.RaftCluster
// For async region heartbeat.
Expand Down Expand Up @@ -350,10 +348,9 @@ func (s *Server) startServer(ctx context.Context) error {
s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)
s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue())
s.tso = tso.NewTimestampOracle(
s.client,
s.tsoAllocator = tso.NewGlobalTSOAllocator(
s.member.Leadership,
s.rootPath,
s.member.MemberValue(),
s.cfg.TsoSaveInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
)
Expand Down Expand Up @@ -1113,12 +1110,12 @@ func (s *Server) campaignLeader() {
go s.member.Leadership.Keep(ctx)
log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name()))

log.Debug("sync timestamp for tso")
if err := s.tso.SyncTimestamp(s.member.Leadership); err != nil {
log.Error("failed to sync timestamp", zap.Error(err))
log.Info("initialize the global TSO allocator")
if err := s.tsoAllocator.Initialize(); err != nil {
log.Error("failed to initialize the global TSO allocator", zap.Error(err))
return
}
defer s.tso.ResetTimestamp()
defer s.tsoAllocator.Reset()

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", zap.Error(err))
Expand Down Expand Up @@ -1155,7 +1152,7 @@ func (s *Server) campaignLeader() {
return
}
case <-tsTicker.C:
if err := s.tso.UpdateTimestamp(); err != nil {
if err := s.tsoAllocator.UpdateTSO(); err != nil {
log.Error("failed to update timestamp", zap.Error(err))
return
}
Expand Down
147 changes: 147 additions & 0 deletions server/tso/global_allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/election"
"go.uber.org/zap"
)

// Allocator is a Timestamp Orcale allocator.
type Allocator interface {
// Initialize is used to initialize a TSO allocator.
// It will synchronize TSO with etcd and initialize the
// memory for later allocation work.
Initialize() error
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
UpdateTSO() error
// SetTSO sets the physical part with given tso. It's mainly used for BR restore
// and can not forcibly set the TSO smaller than now.
SetTSO(tso uint64) error
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
GenerateTSO(count uint32) (pdpb.Timestamp, error)
// Reset is uesed to reset the TSO allocator.
Reset()
}

// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalTSOAllocator struct {
// leadership is used to check the current PD server's leadership
// to determine whether a tso request could be processed and
// it's stored as *election.Leadership
leadership atomic.Value
timestampOracle *timestampOracle
}

// NewGlobalTSOAllocator creates a new global TSO allocator.
func NewGlobalTSOAllocator(leadership *election.Leadership, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) Allocator {
gta := &GlobalTSOAllocator{
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
rootPath: rootPath,
saveInterval: saveInterval,
maxResetTSGap: maxResetTSGap,
},
}
gta.setLeadership(leadership)
return gta
}

func (gta *GlobalTSOAllocator) getLeadership() *election.Leadership {
leadership := gta.leadership.Load()
if leadership == nil {
return nil
}
return leadership.(*election.Leadership)
}

func (gta *GlobalTSOAllocator) setLeadership(leadership *election.Leadership) {
gta.leadership.Store(leadership)
}

// Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize() error {
return gta.timestampOracle.SyncTimestamp(gta.getLeadership())
}

// UpdateTSO is used to update the TSO in memory and the time window in etcd.
func (gta *GlobalTSOAllocator) UpdateTSO() error {
return gta.timestampOracle.UpdateTimestamp(gta.getLeadership())
}

// SetTSO sets the physical part with given tso.
func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error {
return gta.timestampOracle.ResetUserTimestamp(gta.getLeadership(), tso)
}

// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
var resp pdpb.Timestamp

if count == 0 {
return resp, errors.New("tso count should be positive")
}

maxRetryCount := 10
failpoint.Inject("skipRetryGetTS", func() {
maxRetryCount = 1
})

for i := 0; i < maxRetryCount; i++ {
current := (*atomicObject)(atomic.LoadPointer(&gta.timestampOracle.TSO))
if current == nil || current.physical == typeutil.ZeroTime {
// If it's leader, maybe SyncTimestamp hasn't completed yet
if gta.getLeadership().Check() {
log.Info("sync hasn't completed yet, wait for a while")
time.Sleep(200 * time.Millisecond)
continue
}
log.Error("invalid timestamp", zap.Any("timestamp", current), zap.Error(errs.ErrInvalidTimestamp.FastGenByArgs()))
return pdpb.Timestamp{}, errors.New("can not get timestamp, may be not leader")
}

resp.Physical = current.physical.UnixNano() / int64(time.Millisecond)
resp.Logical = atomic.AddInt64(&current.logical, int64(count))
if resp.Logical >= maxLogical {
log.Error("logical part outside of max logical interval, please check ntp time",
zap.Reflect("response", resp),
zap.Int("retry-count", i), zap.Error(errs.ErrLogicOverflow.FastGenByArgs()))
tsoCounter.WithLabelValues("logical_overflow").Inc()
time.Sleep(UpdateTimestampStep)
continue
}
// In case lease expired after the first check.
if !gta.getLeadership().Check() {
return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
}
return resp, nil
}
return resp, errors.New("can not get timestamp")
}

// Reset is uesed to reset the TSO allocator.
func (gta *GlobalTSOAllocator) Reset() {
gta.timestampOracle.ResetTimestamp()
}
Loading

0 comments on commit 7a6a9da

Please sign in to comment.