Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: fix the corner case that may cause TSO fallback (#4885) #4892

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,14 @@ func (s *Server) campaignLeader() {
log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
return
}
defer s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
defer func() {
s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
failpoint.Inject("updateAfterResetTSO", func() {
if err = alllocator.UpdateTSO(); err != nil {
panic(err)
}
})
}()

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
Expand Down
13 changes: 10 additions & 3 deletions server/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,13 @@ type timestampOracle struct {
dcLocation string
}

func (t *timestampOracle) setTSOPhysical(next time.Time) {
func (t *timestampOracle) setTSOPhysical(next time.Time, force bool) {
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
// Do not update the zero physical time if the `force` flag is false.
if t.tsoMux.physical == typeutil.ZeroTime && !force {
return
}
// make sure the ts won't fall back
if typeutil.SubTSOPhysicalByWallClock(next, t.tsoMux.physical) > 0 {
t.tsoMux.physical = next
Expand Down Expand Up @@ -217,7 +221,7 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
tsoCounter.WithLabelValues("sync_ok", t.dcLocation).Inc()
log.Info("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
// save into memory
t.setTSOPhysical(next)
t.setTSOPhysical(next, true)
return nil
}

Expand Down Expand Up @@ -294,6 +298,9 @@ func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, ts
// 1. The saved time is monotonically increasing.
// 2. The physical time is monotonically increasing.
// 3. The physical time is always less than the saved timestamp.
//
// NOTICE: this function should be called after the TSO in memory has been initialized
// and should not be called when the TSO in memory has been reset anymore.
func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error {
prevPhysical, prevLogical := t.getTSO()
tsoGauge.WithLabelValues("tso", t.dcLocation).Set(float64(prevPhysical.UnixNano() / int64(time.Millisecond)))
Expand Down Expand Up @@ -344,7 +351,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
}
}
// save into memory
t.setTSOPhysical(next)
t.setTSOPhysical(next, false)

return nil
}
Expand Down
38 changes: 38 additions & 0 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,44 @@ func (s *clientTestSuite) TestLeaderTransfer(c *C) {
wg.Wait()
}

// More details can be found in this issue: https://github.com/tikv/pd/issues/4884
func (s *clientTestSuite) TestUpdateAfterResetTSO(c *C) {
cluster, err := tests.NewTestCluster(s.ctx, 2)
c.Assert(err, IsNil)
defer cluster.Destroy()

endpoints := s.runServer(c, cluster)
cli := setupCli(c, s.ctx, endpoints)

testutil.WaitUntil(c, func() bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
// Transfer leader to trigger the TSO resetting.
c.Assert(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"), IsNil)
oldLeaderName := cluster.WaitLeader()
err = cluster.GetServer(oldLeaderName).ResignLeader()
c.Assert(err, IsNil)
c.Assert(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"), IsNil)
newLeaderName := cluster.WaitLeader()
c.Assert(newLeaderName, Not(Equals), oldLeaderName)
// Request a new TSO.
testutil.WaitUntil(c, func() bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
// Transfer leader back.
c.Assert(failpoint.Enable("github.com/tikv/pd/server/tso/delaySyncTimestamp", `return(true)`), IsNil)
err = cluster.GetServer(newLeaderName).ResignLeader()
c.Assert(err, IsNil)
// Should NOT panic here.
testutil.WaitUntil(c, func() bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
c.Assert(failpoint.Disable("github.com/tikv/pd/server/tso/delaySyncTimestamp"), IsNil)
}

func (s *clientTestSuite) TestTSOAllocatorLeader(c *C) {
dcLocationConfig := map[string]string{
"pd1": "dc-1",
Expand Down