diff --git a/server/election/leadership.go b/server/election/leadership.go index f3f6d5d7e743..ae03c29e55d1 100644 --- a/server/election/leadership.go +++ b/server/election/leadership.go @@ -75,9 +75,9 @@ func (ls *Leadership) setLease(lease *lease) { ls.lease.Store(lease) } -// ResetLease sets the lease of leadership to nil. -func (ls *Leadership) resetLease() { - ls.setLease(nil) +// GetClient is used to get the etcd client. +func (ls *Leadership) GetClient() *clientv3.Client { + return ls.client } // Campaign is used to campaign the leader with given lease and returns a leadership @@ -147,5 +147,4 @@ func (ls *Leadership) DeleteLeader() error { // Reset does some defer job such as closing lease, resetting lease etc. func (ls *Leadership) Reset() { ls.getLease().Close() - ls.resetLease() } diff --git a/server/election/lease.go b/server/election/lease.go index a8dc7fe098df..a90992006c83 100644 --- a/server/election/lease.go +++ b/server/election/lease.go @@ -78,6 +78,9 @@ func (l *lease) Close() error { // IsExpired checks if the lease is expired. If it returns true, // current leader should step down and try to re-elect again. func (l *lease) IsExpired() bool { + if l.expireTime.Load() == nil { + return false + } return time.Now().After(l.expireTime.Load().(time.Time)) } diff --git a/server/grpc_service.go b/server/grpc_service.go index 75b2fcfd778e..bc5494c09baf 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/tsoutil" "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/versioninfo" @@ -90,7 +91,7 @@ func (s *Server) Tso(stream pdpb.PD_TsoServer) error { return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId()) } count := request.GetCount() - ts, err := s.tso.GetRespTS(count) + ts, err := s.tsoAllocator.GenerateTSO(count) if err != nil { return status.Errorf(codes.Unknown, err.Error()) } @@ -775,10 +776,11 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd } } - now, err := s.tso.Now() + nowTSO, err := s.tsoAllocator.GenerateTSO(1) if err != nil { return nil, err } + now, _ := tsoutil.ParseTimestamp(nowTSO) min, err := s.storage.LoadMinServiceGCSafePoint(now) if err != nil { return nil, err diff --git a/server/handler.go b/server/handler.go index 8abf22cf2aed..b4efec205f51 100644 --- a/server/handler.go +++ b/server/handler.go @@ -825,11 +825,11 @@ func (h *Handler) GetEmptyRegion() ([]*core.RegionInfo, error) { // ResetTS resets the ts with specified tso. func (h *Handler) ResetTS(ts uint64) error { - tsoServer := h.s.tso - if tsoServer == nil { + tsoAllocator := h.s.tsoAllocator + if tsoAllocator == nil { return ErrServerNotStarted } - return tsoServer.ResetUserTimestamp(ts) + return tsoAllocator.SetTSO(ts) } // SetStoreLimitScene sets the limit values for differents scenes diff --git a/server/server.go b/server/server.go index b3d3cb0d505c..b2a5e3d959d6 100644 --- a/server/server.go +++ b/server/server.go @@ -100,9 +100,7 @@ type Server struct { serverLoopCancel func() serverLoopWg sync.WaitGroup - // Member roles need to be elected - // In a PD cluster, there will be two kinds of election. - // One is for PD leader, another is for TSO Allocator + // for PD leader election. member *member.Member // etcd client client *clientv3.Client @@ -121,7 +119,7 @@ type Server struct { // for baiscCluster operation. basicCluster *core.BasicCluster // for tso. - tso *tso.TimestampOracle + tsoAllocator tso.Allocator // for raft cluster cluster *cluster.RaftCluster // For async region heartbeat. @@ -350,10 +348,9 @@ func (s *Server) startServer(ctx context.Context) error { s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion) s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash) s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue()) - s.tso = tso.NewTimestampOracle( - s.client, + s.tsoAllocator = tso.NewGlobalTSOAllocator( + s.member.Leadership, s.rootPath, - s.member.MemberValue(), s.cfg.TsoSaveInterval.Duration, func() time.Duration { return s.persistOptions.GetMaxResetTSGap() }, ) @@ -1113,12 +1110,12 @@ func (s *Server) campaignLeader() { go s.member.Leadership.Keep(ctx) log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name())) - log.Debug("sync timestamp for tso") - if err := s.tso.SyncTimestamp(s.member.Leadership); err != nil { - log.Error("failed to sync timestamp", zap.Error(err)) + log.Info("initialize the global TSO allocator") + if err := s.tsoAllocator.Initialize(); err != nil { + log.Error("failed to initialize the global TSO allocator", zap.Error(err)) return } - defer s.tso.ResetTimestamp() + defer s.tsoAllocator.Reset() if err := s.reloadConfigFromKV(); err != nil { log.Error("failed to reload configuration", zap.Error(err)) @@ -1155,7 +1152,7 @@ func (s *Server) campaignLeader() { return } case <-tsTicker.C: - if err := s.tso.UpdateTimestamp(); err != nil { + if err := s.tsoAllocator.UpdateTSO(); err != nil { log.Error("failed to update timestamp", zap.Error(err)) return } diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go new file mode 100755 index 000000000000..e03b5c40483f --- /dev/null +++ b/server/tso/global_allocator.go @@ -0,0 +1,147 @@ +// Copyright 2020 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso + +import ( + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/typeutil" + "github.com/tikv/pd/server/election" + "go.uber.org/zap" +) + +// Allocator is a Timestamp Orcale allocator. +type Allocator interface { + // Initialize is used to initialize a TSO allocator. + // It will synchronize TSO with etcd and initialize the + // memory for later allocation work. + Initialize() error + // UpdateTSO is used to update the TSO in memory and the time window in etcd. + UpdateTSO() error + // SetTSO sets the physical part with given tso. It's mainly used for BR restore + // and can not forcibly set the TSO smaller than now. + SetTSO(tso uint64) error + // GenerateTSO is used to generate a given number of TSOs. + // Make sure you have initialized the TSO allocator before calling. + GenerateTSO(count uint32) (pdpb.Timestamp, error) + // Reset is uesed to reset the TSO allocator. + Reset() +} + +// GlobalTSOAllocator is the global single point TSO allocator. +type GlobalTSOAllocator struct { + // leadership is used to check the current PD server's leadership + // to determine whether a tso request could be processed and + // it's stored as *election.Leadership + leadership atomic.Value + timestampOracle *timestampOracle +} + +// NewGlobalTSOAllocator creates a new global TSO allocator. +func NewGlobalTSOAllocator(leadership *election.Leadership, rootPath string, saveInterval time.Duration, maxResetTSGap func() time.Duration) Allocator { + gta := &GlobalTSOAllocator{ + timestampOracle: ×tampOracle{ + client: leadership.GetClient(), + rootPath: rootPath, + saveInterval: saveInterval, + maxResetTSGap: maxResetTSGap, + }, + } + gta.setLeadership(leadership) + return gta +} + +func (gta *GlobalTSOAllocator) getLeadership() *election.Leadership { + leadership := gta.leadership.Load() + if leadership == nil { + return nil + } + return leadership.(*election.Leadership) +} + +func (gta *GlobalTSOAllocator) setLeadership(leadership *election.Leadership) { + gta.leadership.Store(leadership) +} + +// Initialize will initialize the created global TSO allocator. +func (gta *GlobalTSOAllocator) Initialize() error { + return gta.timestampOracle.SyncTimestamp(gta.getLeadership()) +} + +// UpdateTSO is used to update the TSO in memory and the time window in etcd. +func (gta *GlobalTSOAllocator) UpdateTSO() error { + return gta.timestampOracle.UpdateTimestamp(gta.getLeadership()) +} + +// SetTSO sets the physical part with given tso. +func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error { + return gta.timestampOracle.ResetUserTimestamp(gta.getLeadership(), tso) +} + +// GenerateTSO is used to generate a given number of TSOs. +// Make sure you have initialized the TSO allocator before calling. +func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) { + var resp pdpb.Timestamp + + if count == 0 { + return resp, errors.New("tso count should be positive") + } + + maxRetryCount := 10 + failpoint.Inject("skipRetryGetTS", func() { + maxRetryCount = 1 + }) + + for i := 0; i < maxRetryCount; i++ { + current := (*atomicObject)(atomic.LoadPointer(>a.timestampOracle.TSO)) + if current == nil || current.physical == typeutil.ZeroTime { + // If it's leader, maybe SyncTimestamp hasn't completed yet + if gta.getLeadership().Check() { + log.Info("sync hasn't completed yet, wait for a while") + time.Sleep(200 * time.Millisecond) + continue + } + log.Error("invalid timestamp", zap.Any("timestamp", current), zap.Error(errs.ErrInvalidTimestamp.FastGenByArgs())) + return pdpb.Timestamp{}, errors.New("can not get timestamp, may be not leader") + } + + resp.Physical = current.physical.UnixNano() / int64(time.Millisecond) + resp.Logical = atomic.AddInt64(¤t.logical, int64(count)) + if resp.Logical >= maxLogical { + log.Error("logical part outside of max logical interval, please check ntp time", + zap.Reflect("response", resp), + zap.Int("retry-count", i), zap.Error(errs.ErrLogicOverflow.FastGenByArgs())) + tsoCounter.WithLabelValues("logical_overflow").Inc() + time.Sleep(UpdateTimestampStep) + continue + } + // In case lease expired after the first check. + if !gta.getLeadership().Check() { + return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired") + } + return resp, nil + } + return resp, errors.New("can not get timestamp") +} + +// Reset is uesed to reset the TSO allocator. +func (gta *GlobalTSOAllocator) Reset() { + gta.timestampOracle.ResetTimestamp() +} diff --git a/server/tso/tso.go b/server/tso/tso.go old mode 100644 new mode 100755 index a9cbf7df38e1..c1c4d996e930 --- a/server/tso/tso.go +++ b/server/tso/tso.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/etcdutil" @@ -34,61 +33,38 @@ import ( const ( // UpdateTimestampStep is used to update timestamp. - UpdateTimestampStep = 50 * time.Millisecond + UpdateTimestampStep = 50 * time.Millisecond + // updateTimestampGuard is the min timestamp interval. updateTimestampGuard = time.Millisecond - maxLogical = int64(1 << 18) + // maxLogical is the max upper limit for logical time. + // When a TSO's logical time reachs this limit, + // the physical time will be forced to increase. + maxLogical = int64(1 << 18) ) -// TimestampOracle is used to maintain the logic of tso. -type TimestampOracle struct { - // leadership is used to check the current PD server's leadership - // to determine whether a tso request could be processed and - // it's stored as *election.Leadership - leadership atomic.Value - // For tso, set after pd becomes leader. - ts unsafe.Pointer - lastSavedTime atomic.Value - rootPath string - member string - client *clientv3.Client - saveInterval time.Duration - maxResetTSGap func() time.Duration -} - -// NewTimestampOracle creates a new TimestampOracle. -// TODO: remove saveInterval -func NewTimestampOracle(client *clientv3.Client, rootPath string, member string, saveInterval time.Duration, maxResetTSGap func() time.Duration) *TimestampOracle { - return &TimestampOracle{ - rootPath: rootPath, - client: client, - saveInterval: saveInterval, - maxResetTSGap: maxResetTSGap, - member: member, - } -} - -func (t *TimestampOracle) getLeadership() *election.Leadership { - leadership := t.leadership.Load() - if leadership == nil { - return nil - } - return leadership.(*election.Leadership) -} - -func (t *TimestampOracle) setLeadership(leadership *election.Leadership) { - t.leadership.Store(leadership) -} - +// atomicObject is used to store the current TSO in memory. type atomicObject struct { physical time.Time logical int64 } -func (t *TimestampOracle) getTimestampPath() string { +// timestampOracle is used to maintain the logic of tso. +type timestampOracle struct { + client *clientv3.Client + rootPath string + // TODO: remove saveInterval + saveInterval time.Duration + maxResetTSGap func() time.Duration + // For tso, set after the PD becomes a leader. + TSO unsafe.Pointer + lastSavedTime atomic.Value +} + +func (t *timestampOracle) getTimestampPath() string { return path.Join(t.rootPath, "timestamp") } -func (t *TimestampOracle) loadTimestamp() (time.Time, error) { +func (t *timestampOracle) loadTimestamp() (time.Time, error) { data, err := etcdutil.GetValue(t.client, t.getTimestampPath()) if err != nil { return typeutil.ZeroTime, err @@ -101,12 +77,10 @@ func (t *TimestampOracle) loadTimestamp() (time.Time, error) { // save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it, // otherwise, update it. -func (t *TimestampOracle) saveTimestamp(ts time.Time) error { +func (t *timestampOracle) saveTimestamp(leadership *election.Leadership, ts time.Time) error { key := t.getTimestampPath() data := typeutil.Uint64ToBytes(uint64(ts.UnixNano())) - leaderPath := path.Join(t.rootPath, "leader") - resp, err := t.getLeadership(). - LeaderTxn(clientv3.Compare(clientv3.Value(leaderPath), "=", t.member)). + resp, err := leadership.LeaderTxn(). Then(clientv3.OpPut(key, string(data))). Commit() if err != nil { @@ -122,11 +96,9 @@ func (t *TimestampOracle) saveTimestamp(ts time.Time) error { } // SyncTimestamp is used to synchronize the timestamp. -func (t *TimestampOracle) SyncTimestamp(leadership *election.Leadership) error { +func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error { tsoCounter.WithLabelValues("sync").Inc() - t.setLeadership(leadership) - failpoint.Inject("delaySyncTimestamp", func() { time.Sleep(time.Second) }) @@ -149,7 +121,7 @@ func (t *TimestampOracle) SyncTimestamp(leadership *election.Leadership) error { } save := next.Add(t.saveInterval) - if err = t.saveTimestamp(save); err != nil { + if err = t.saveTimestamp(leadership, save); err != nil { tsoCounter.WithLabelValues("err_save_sync_ts").Inc() return err } @@ -160,20 +132,20 @@ func (t *TimestampOracle) SyncTimestamp(leadership *election.Leadership) error { current := &atomicObject{ physical: next, } - atomic.StorePointer(&t.ts, unsafe.Pointer(current)) + atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) return nil } // ResetUserTimestamp update the physical part with specified tso. -func (t *TimestampOracle) ResetUserTimestamp(tso uint64) error { - if !t.getLeadership().Check() { +func (t *timestampOracle) ResetUserTimestamp(leadership *election.Leadership, tso uint64) error { + if !leadership.Check() { tsoCounter.WithLabelValues("err_lease_reset_ts").Inc() return errors.New("Setup timestamp failed, lease expired") } physical, _ := tsoutil.ParseTS(tso) next := physical.Add(time.Millisecond) - prev := (*atomicObject)(atomic.LoadPointer(&t.ts)) + prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) // do not update if typeutil.SubTimeByWallClock(next, prev.physical) <= 3*updateTimestampGuard { @@ -187,31 +159,31 @@ func (t *TimestampOracle) ResetUserTimestamp(tso uint64) error { } save := next.Add(t.saveInterval) - if err := t.saveTimestamp(save); err != nil { + if err := t.saveTimestamp(leadership, save); err != nil { tsoCounter.WithLabelValues("err_save_reset_ts").Inc() return err } update := &atomicObject{ physical: next, } - atomic.CompareAndSwapPointer(&t.ts, unsafe.Pointer(prev), unsafe.Pointer(update)) + atomic.CompareAndSwapPointer(&t.TSO, unsafe.Pointer(prev), unsafe.Pointer(update)) tsoCounter.WithLabelValues("reset_tso_ok").Inc() return nil } // UpdateTimestamp is used to update the timestamp. // This function will do two things: -// 1. When the logical time is going to be used up, the current physical time needs to increase. -// 2. If the time window is not enough, which means the saved etcd time minus the next physical time -// is less than or equal to `updateTimestampGuard`, it will need to be updated and save the -// next physical time plus `TsoSaveInterval` into etcd. +// 1. When the logical time is going to be used up, increase the current physical time. +// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time +// will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and +// we also need to save the next physical time plus `TsoSaveInterval` into etcd. // // Here is some constraints that this function must satisfy: -// 1. The physical time is monotonically increasing. -// 2. The saved time is monotonically increasing. +// 1. The saved time is monotonically increasing. +// 2. The physical time is monotonically increasing. // 3. The physical time is always less than the saved timestamp. -func (t *TimestampOracle) UpdateTimestamp() error { - prev := (*atomicObject)(atomic.LoadPointer(&t.ts)) +func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error { + prev := (*atomicObject)(atomic.LoadPointer(&t.TSO)) now := time.Now() failpoint.Inject("fallBackUpdate", func() { @@ -250,7 +222,7 @@ func (t *TimestampOracle) UpdateTimestamp() error { // The time window needs to be updated and saved to etcd. if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard { save := next.Add(t.saveInterval) - if err := t.saveTimestamp(save); err != nil { + if err := t.saveTimestamp(leadership, save); err != nil { tsoCounter.WithLabelValues("err_save_update_ts").Inc() return err } @@ -261,73 +233,16 @@ func (t *TimestampOracle) UpdateTimestamp() error { logical: 0, } - atomic.StorePointer(&t.ts, unsafe.Pointer(current)) + atomic.StorePointer(&t.TSO, unsafe.Pointer(current)) tsoGauge.WithLabelValues("tso").Set(float64(next.Unix())) return nil } // ResetTimestamp is used to reset the timestamp. -func (t *TimestampOracle) ResetTimestamp() { +func (t *timestampOracle) ResetTimestamp() { zero := &atomicObject{ physical: typeutil.ZeroTime, } - atomic.StorePointer(&t.ts, unsafe.Pointer(zero)) - t.setLeadership(nil) -} - -var maxRetryCount = 10 - -// GetRespTS is used to get a timestamp. -func (t *TimestampOracle) GetRespTS(count uint32) (pdpb.Timestamp, error) { - var resp pdpb.Timestamp - - if count == 0 { - return resp, errors.New("tso count should be positive") - } - - failpoint.Inject("skipRetryGetTS", func() { - maxRetryCount = 1 - }) - - for i := 0; i < maxRetryCount; i++ { - current := (*atomicObject)(atomic.LoadPointer(&t.ts)) - if current == nil || current.physical == typeutil.ZeroTime { - // If it's leader, maybe SyncTimestamp hasn't completed yet - if t.getLeadership().Check() { - log.Info("sync hasn't completed yet, wait for a while") - time.Sleep(200 * time.Millisecond) - continue - } - log.Error("invalid timestamp", zap.Any("timestamp", current), zap.Error(errs.ErrInvalidTimestamp.FastGenByArgs())) - return pdpb.Timestamp{}, errors.New("can not get timestamp, may be not leader") - } - - resp.Physical = current.physical.UnixNano() / int64(time.Millisecond) - resp.Logical = atomic.AddInt64(¤t.logical, int64(count)) - if resp.Logical >= maxLogical { - log.Error("logical part outside of max logical interval, please check ntp time", - zap.Reflect("response", resp), - zap.Int("retry-count", i), zap.Error(errs.ErrLogicOverflow.FastGenByArgs())) - tsoCounter.WithLabelValues("logical_overflow").Inc() - time.Sleep(UpdateTimestampStep) - continue - } - // In case lease expired after the first check. - if !t.getLeadership().Check() { - return pdpb.Timestamp{}, errors.New("alloc timestamp failed, lease expired") - } - return resp, nil - } - return resp, errors.New("can not get timestamp") -} - -// Now returns the current tso time. -func (t *TimestampOracle) Now() (time.Time, error) { - resp, err := t.GetRespTS(1) - if err != nil { - return time.Time{}, err - } - tm, _ := tsoutil.ParseTimestamp(resp) - return tm, nil + atomic.StorePointer(&t.TSO, unsafe.Pointer(zero)) }