Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: refactor the tso service #2803

Merged
merged 7 commits into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions server/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ func (ls *Leadership) setLease(lease *lease) {
ls.lease.Store(lease)
}

// ResetLease sets the lease of leadership to nil.
func (ls *Leadership) resetLease() {
ls.setLease(nil)
// GetClient is used to get the etcd client.
func (ls *Leadership) GetClient() *clientv3.Client {
return ls.client
}

// Campaign is used to campaign the leader with given lease and returns a leadership
Expand Down Expand Up @@ -147,5 +147,4 @@ func (ls *Leadership) DeleteLeader() error {
// Reset does some defer job such as closing lease, resetting lease etc.
func (ls *Leadership) Reset() {
ls.getLease().Close()
ls.resetLease()
}
3 changes: 3 additions & 0 deletions server/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func (l *lease) Close() error {
// IsExpired checks if the lease is expired. If it returns true,
// current leader should step down and try to re-elect again.
func (l *lease) IsExpired() bool {
if l.expireTime.Load() == nil {
return false
}
return time.Now().After(l.expireTime.Load().(time.Time))
}

Expand Down
6 changes: 4 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/versioninfo"
Expand Down Expand Up @@ -90,7 +91,7 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.tso.GetRespTS(count)
ts, err := s.tsoAllocator.GenerateTSO(count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -775,10 +776,11 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
}
}

now, err := s.tso.Now()
nowTSO, err := s.tsoAllocator.GenerateTSO(1)
if err != nil {
return nil, err
}
now, _ := tsoutil.ParseTimestamp(nowTSO)
min, err := s.storage.LoadMinServiceGCSafePoint(now)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,11 +825,11 @@ func (h *Handler) GetEmptyRegion() ([]*core.RegionInfo, error) {

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64) error {
tsoServer := h.s.tso
if tsoServer == nil {
tsoAllocator := h.s.tsoAllocator
if tsoAllocator == nil {
return ErrServerNotStarted
}
return tsoServer.ResetUserTimestamp(ts)
return tsoAllocator.SetTSO(ts)
}

// SetStoreLimitScene sets the limit values for differents scenes
Expand Down
21 changes: 9 additions & 12 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ type Server struct {
serverLoopCancel func()
serverLoopWg sync.WaitGroup

// Member roles need to be elected
// In a PD cluster, there will be two kinds of election.
// One is for PD leader, another is for TSO Allocator
// for PD leader election.
member *member.Member
// etcd client
client *clientv3.Client
Expand All @@ -121,7 +119,7 @@ type Server struct {
// for baiscCluster operation.
basicCluster *core.BasicCluster
// for tso.
tso *tso.TimestampOracle
tsoAllocator tso.Allocator
// for raft cluster
cluster *cluster.RaftCluster
// For async region heartbeat.
Expand Down Expand Up @@ -350,10 +348,9 @@ func (s *Server) startServer(ctx context.Context) error {
s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)
s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue())
s.tso = tso.NewTimestampOracle(
s.client,
s.tsoAllocator = tso.NewGlobalTSOAllocator(
s.member.Leadership,
s.rootPath,
s.member.MemberValue(),
s.cfg.TsoSaveInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
)
Expand Down Expand Up @@ -1113,12 +1110,12 @@ func (s *Server) campaignLeader() {
go s.member.Leadership.Keep(ctx)
log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name()))

log.Debug("sync timestamp for tso")
if err := s.tso.SyncTimestamp(s.member.Leadership); err != nil {
log.Error("failed to sync timestamp", zap.Error(err))
log.Info("initialize the global TSO allocator")
if err := s.tsoAllocator.Initialize(); err != nil {
log.Error("failed to initialize the global TSO allocator", zap.Error(err))
return
}
defer s.tso.ResetTimestamp()
defer s.tsoAllocator.Reset()

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", zap.Error(err))
Expand Down Expand Up @@ -1155,7 +1152,7 @@ func (s *Server) campaignLeader() {
return
}
case <-tsTicker.C:
if err := s.tso.UpdateTimestamp(); err != nil {
if err := s.tsoAllocator.UpdateTSO(); err != nil {
log.Error("failed to update timestamp", zap.Error(err))
return
}
Expand Down
147 changes: 147 additions & 0 deletions server/tso/global_allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/election"
"go.uber.org/zap"
)

// Allocator is a Timestamp Orcale allocator.
type Allocator interface {
// Initialize is used to initialize a TSO allocator.
// It will synchronize TSO with etcd and initialize the
// memory for later allocation work.
Initialize() error
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
UpdateTSO() error
// SetTSO sets the physical part with given tso. It's mainly used for BR restore
// and can not forcibly set the TSO smaller than now.
SetTSO(tso uint64) error
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
GenerateTSO(count uint32) (pdpb.Timestamp, error)
// Reset is uesed to reset the TSO allocator.
Reset()
}

// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalTSOAllocator struct {
// leadership is used to check the current PD server's leadership
// to determine whether a tso request could be processed and
// it's stored as *election.Leadership
leadership atomic.Value
timestampOracle *timestampOracle
}

// NewGlobalTSOAllocator creates a new global TSO allocator.
func NewGlobalTSOAllocator(leadership *election.Leadership, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) Allocator {
gta := &GlobalTSOAllocator{
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
rootPath: rootPath,
saveInterval: saveInterval,
maxResetTSGap: maxResetTSGap,
},
}
gta.setLeadership(leadership)
return gta
}

func (gta *GlobalTSOAllocator) getLeadership() *election.Leadership {
leadership := gta.leadership.Load()
if leadership == nil {
return nil
}
return leadership.(*election.Leadership)
}

func (gta *GlobalTSOAllocator) setLeadership(leadership *election.Leadership) {
gta.leadership.Store(leadership)
}

// Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize() error {
return gta.timestampOracle.SyncTimestamp(gta.getLeadership())
}

// UpdateTSO is used to update the TSO in memory and the time window in etcd.
func (gta *GlobalTSOAllocator) UpdateTSO() error {
return gta.timestampOracle.UpdateTimestamp(gta.getLeadership())
}

// SetTSO sets the physical part with given tso.
func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error {
return gta.timestampOracle.ResetUserTimestamp(gta.getLeadership(), tso)
}

// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
var resp pdpb.Timestamp

if count == 0 {
return resp, errors.New("tso count should be positive")
}

maxRetryCount := 10
failpoint.Inject("skipRetryGetTS", func() {
maxRetryCount = 1
})

for i := 0; i < maxRetryCount; i++ {
current := (*atomicObject)(atomic.LoadPointer(&gta.timestampOracle.TSO))
if current == nil || current.physical == typeutil.ZeroTime {
// If it's leader, maybe SyncTimestamp hasn't completed yet
if gta.getLeadership().Check() {
log.Info("sync hasn't completed yet, wait for a while")
time.Sleep(200 * time.Millisecond)
continue
}
log.Error("invalid timestamp", zap.Any("timestamp", current), zap.Error(errs.ErrInvalidTimestamp.FastGenByArgs()))
return pdpb.Timestamp{}, errors.New("can not get timestamp, may be not leader")
}

resp.Physical = current.physical.UnixNano() / int64(time.Millisecond)
resp.Logical = atomic.AddInt64(&current.logical, int64(count))
if resp.Logical >= maxLogical {
log.Error("logical part outside of max logical interval, please check ntp time",
zap.Reflect("response", resp),
zap.Int("retry-count", i), zap.Error(errs.ErrLogicOverflow.FastGenByArgs()))
tsoCounter.WithLabelValues("logical_overflow").Inc()
time.Sleep(UpdateTimestampStep)
continue
}
// In case lease expired after the first check.
if !gta.getLeadership().Check() {
return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired")
}
return resp, nil
}
return resp, errors.New("can not get timestamp")
}

// Reset is uesed to reset the TSO allocator.
func (gta *GlobalTSOAllocator) Reset() {
gta.timestampOracle.ResetTimestamp()
}
Loading