fixes for bugs and flaky tests (tikv#71)
* *: fix `TestConcurrentlyReset` (tikv#6318)

close tikv#6275

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>

* *: fix unexpected log (tikv#6286)

close tikv#6285

Signed-off-by: Ryan Leung <[email protected]>

* *: make TestGlobalAndLocalTSO stable (tikv#6292)

close tikv#6250

Signed-off-by: Ryan Leung <[email protected]>

* *: fix `TestLogicalOverflow` (tikv#6320)

close tikv#6277

Signed-off-by: Ryan Leung <[email protected]>

---------

Signed-off-by: Ryan Leung <[email protected]>
Co-authored-by: Ryan Leung <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
3 people authored Apr 18, 2023
1 parent 913e24f commit df2e3bf
Showing 6 changed files with 33 additions and 19 deletions.
4 changes: 2 additions & 2 deletions client/pd_service_discovery.go
@@ -210,7 +210,7 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
 			failpoint.Continue()
 		})
 		if err := c.updateMember(); err != nil {
-			log.Error("[pd] failed to update member", errs.ZapError(err))
+			log.Error("[pd] failed to update member", zap.Strings("urls", c.GetURLs()), errs.ZapError(err))
 		}
 	}
 }
@@ -434,7 +434,7 @@ func (c *pdServiceDiscovery) updateMember() error {
 		// the error of `switchTSOAllocatorLeader` will be returned.
 		return errTSO
 	}
-	return errs.ErrClientGetMember.FastGenByArgs(c.GetURLs())
+	return errs.ErrClientGetMember.FastGenByArgs()
 }
 
 func (c *pdServiceDiscovery) getClusterInfo(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetClusterInfoResponse, error) {
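The two hunks above move the member URLs out of the error value and into a structured log field, which is what silences the unexpected log from tikv#6286. A minimal, runnable sketch of that pattern, assuming only the zap API; updateMember and the URL list here are hypothetical stand-ins, not the real PD client:

package main

import (
	"errors"

	"go.uber.org/zap"
)

// updateMember is a stand-in for pdServiceDiscovery.updateMember.
func updateMember() error {
	return errors.New("failed to get member") // the error value stays small
}

func main() {
	logger, _ := zap.NewDevelopment()
	urls := []string{"http://127.0.0.1:2379", "http://127.0.0.1:2382"}
	if err := updateMember(); err != nil {
		// The URL list is attached once, at the log site, instead of being
		// baked into every error the client generates.
		logger.Error("[pd] failed to update member",
			zap.Strings("urls", urls), zap.Error(err))
	}
}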
5 changes: 5 additions & 0 deletions pkg/mcs/tso/server/server.go
@@ -279,6 +279,11 @@ func (s *Server) campaignLeader() {
 	}
 
 	s.participant.EnableLeader()
+	defer resetLeaderOnce.Do(func() {
+		cancel()
+		s.participant.ResetLeader()
+	})
+
 	// TODO: if enable-local-tso is true, check the cluster dc-location after the primary/leader is elected
 	// go s.tsoAllocatorManager.ClusterDCLocationChecker()
 	log.Info("tso primary is ready to serve", zap.String("tso-primary-name", s.participant.Name()))
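The added defer guarantees the TSO primary steps down and cancels its campaign context on every exit path, and the sync.Once makes the reset idempotent. A runnable sketch of the same defer plus sync.Once shape (participant, campaignLeader, and resetLeaderOnce mirror the names above, but the types are simplified stand-ins):

package main

import (
	"context"
	"fmt"
	"sync"
)

type participant struct{ leader bool }

func (p *participant) EnableLeader() { p.leader = true }
func (p *participant) ResetLeader()  { p.leader = false }

func campaignLeader(ctx context.Context, p *participant) {
	ctx, cancel := context.WithCancel(ctx)
	var resetLeaderOnce sync.Once
	p.EnableLeader()
	// Runs on every exit path from campaignLeader; the Once guards against
	// a double reset if some path also gives up leadership explicitly.
	defer resetLeaderOnce.Do(func() {
		cancel()
		p.ResetLeader()
	})
	<-ctx.Done() // serve as primary until the campaign is cancelled
}

func main() {
	p := &participant{}
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // end the campaign immediately for the demo
	campaignLeader(ctx, p)
	fmt.Println("leader after exit:", p.leader) // false: the defer fired
}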
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
@@ -288,7 +288,7 @@ type syncResp struct {
 
 // SyncMaxTS is used to sync MaxTS with all Local TSO Allocator leaders in dcLocationMap.
 // If maxTSO is the biggest TSO among all Local TSO Allocators, it will be written into
-// each allocator and remines the same after the synchronization.
+// each allocator and remains the same after the synchronization.
 // If not, it will be replaced with the new max Local TSO and return.
 func (gta *GlobalTSOAllocator) SyncMaxTS(
 	ctx context.Context,
24 changes: 16 additions & 8 deletions tests/client/client_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/stretchr/testify/suite"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/pkg/core"
+	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/mock/mockid"
 	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/tso"
@@ -346,7 +347,7 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
 		defer wg.Done()
 		leader := cluster.GetServer(cluster.GetLeader())
 		leader.Stop()
-		cluster.WaitLeader()
+		re.NotEmpty(cluster.WaitLeader())
 		leaderReadyTime = time.Now()
 		cluster.RunServers([]*tests.TestServer{leader})
 	}()
@@ -362,7 +363,7 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
 		leader := cluster.GetServer(cluster.GetLeader())
 		re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
 		leader.Stop()
-		cluster.WaitLeader()
+		re.NotEmpty(cluster.WaitLeader())
 		re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"))
 		leaderReadyTime = time.Now()
 	}()
@@ -420,7 +421,7 @@ func TestGlobalAndLocalTSO(t *testing.T) {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateMember", `return(true)`))
 	err = cluster.ResignLeader()
 	re.NoError(err)
-	cluster.WaitLeader()
+	re.NotEmpty(cluster.WaitLeader())
 	_, _, err = cli.GetTS(ctx)
 	re.Error(err)
 	re.True(pd.IsLeaderChange(err))
@@ -452,13 +453,20 @@ func requestGlobalAndLocalTSO(
 	var lastTS uint64
 	for i := 0; i < tsoRequestRound; i++ {
 		globalPhysical1, globalLogical1, err := cli.GetTS(context.TODO())
-		re.NoError(err)
+		// The allocator leader may be changed due to the environment issue.
+		if err != nil {
+			re.ErrorContains(err, errs.NotLeaderErr)
+		}
 		globalTS1 := tsoutil.ComposeTS(globalPhysical1, globalLogical1)
 		localPhysical, localLogical, err := cli.GetLocalTS(context.TODO(), dc)
-		re.NoError(err)
+		if err != nil {
+			re.ErrorContains(err, errs.NotLeaderErr)
+		}
 		localTS := tsoutil.ComposeTS(localPhysical, localLogical)
 		globalPhysical2, globalLogical2, err := cli.GetTS(context.TODO())
-		re.NoError(err)
+		if err != nil {
+			re.ErrorContains(err, errs.NotLeaderErr)
+		}
 		globalTS2 := tsoutil.ComposeTS(globalPhysical2, globalLogical2)
 		re.Less(lastTS, globalTS1)
 		re.Less(globalTS1, localTS)
@@ -587,7 +595,7 @@ func TestGetTsoFromFollowerClient2(t *testing.T) {
 
 	lastTS = checkTS(re, cli, lastTS)
 	re.NoError(cluster.GetServer(cluster.GetLeader()).ResignLeader())
-	cluster.WaitLeader()
+	re.NotEmpty(cluster.WaitLeader())
 	lastTS = checkTS(re, cli, lastTS)
 
 	re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"))
@@ -611,7 +619,7 @@ func checkTS(re *require.Assertions, cli pd.Client, lastTS uint64) uint64 {
 func runServer(re *require.Assertions, cluster *tests.TestCluster) []string {
 	err := cluster.RunInitialServers()
 	re.NoError(err)
-	cluster.WaitLeader()
+	re.NotEmpty(cluster.WaitLeader())
 	leaderServer := cluster.GetServer(cluster.GetLeader())
 	re.NoError(leaderServer.BootstrapCluster())
 
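Two hardenings recur through this file: each cluster.WaitLeader() result is now asserted non-empty so a failed election fails the test immediately, and TSO calls tolerate exactly one transient error, the not-leader error raised while allocator leadership moves. A testify-style sketch of both; waitLeader, getTS, and notLeaderErr are hypothetical stand-ins for the PD test helpers:

package example

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/require"
)

const notLeaderErr = "not leader" // stand-in for errs.NotLeaderErr

func waitLeader() string     { return "pd-1" }                                 // stand-in for cluster.WaitLeader
func getTS() (uint64, error) { return 0, errors.New("rpc error: not leader") } // stand-in for cli.GetTS

func TestLeaderTolerance(t *testing.T) {
	re := require.New(t)
	// Before the fix the return value was discarded, so a failed election
	// could surface later as a confusing failure; now the test stops here.
	re.NotEmpty(waitLeader())

	if _, err := getTS(); err != nil {
		// Only the transient leadership-change error is acceptable;
		// any other error still fails the test.
		re.ErrorContains(err, notLeaderErr)
	}
}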
13 changes: 7 additions & 6 deletions tests/server/tso/global_tso_test.go
@@ -154,7 +154,7 @@ func TestLogicalOverflow(t *testing.T) {
 		defer tsoClient.CloseSend()
 
 		begin := time.Now()
-		for i := 0; i < 2; i += 1 { // the 2nd request may (but not must) overflow, as max logical interval is 262144
+		for i := 0; i < 3; i++ {
 			req := &pdpb.TsoRequest{
 				Header: testutil.NewRequestHeader(clusterID),
 				Count:  150000,
@@ -163,12 +163,13 @@
 			re.NoError(tsoClient.Send(req))
 			_, err = tsoClient.Recv()
 			re.NoError(err)
+			if i == 1 {
+				// the 2nd request may (but not must) overflow, as max logical interval is 262144
+				re.Less(time.Since(begin), updateInterval+20*time.Millisecond) // additional 20ms for gRPC latency
+			}
 		}
-		elapse := time.Since(begin)
-		if updateInterval >= 20*time.Millisecond { // on small interval, the physical may update before overflow
-			re.GreaterOrEqual(elapse, updateInterval)
-		}
-		re.Less(elapse, updateInterval+20*time.Millisecond) // additional 20ms for gRPC latency
+		// the 3rd request must overflow
+		re.GreaterOrEqual(time.Since(begin), updateInterval)
 	}
 
 	for _, updateInterval := range []int{1, 5, 30, 50} {
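The rewritten assertions rest on TSO arithmetic: the logical part of a timestamp is 18 bits (note the physical << 18 composition in the next file), so one physical tick serves at most 262144 logical timestamps. At 150000 per request, the second request may overflow while the third must, which is why only the third is guaranteed to wait out updateInterval. A quick, runnable check of that arithmetic (constants mirror the test, not the PD implementation):

package main

import "fmt"

func main() {
	const logicalBits = 18
	const maxLogical = 1 << logicalBits // 262144 logical timestamps per physical tick
	const perRequest = 150000           // Count used by each TsoRequest in the test

	for i := 1; i <= 3; i++ {
		consumed := i * perRequest
		fmt.Printf("after request %d: %6d of %d consumed, overflow possible: %v\n",
			i, consumed, maxLogical, consumed > maxLogical)
	}
	// after request 1: 150000 of 262144 -> no overflow
	// after request 2: 300000 of 262144 -> may overflow (masked if the
	//   physical part already advanced on a small update interval)
	// after request 3: 450000 of 262144 -> must overflow, so the test can
	//   assert elapsed >= updateInterval
}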
4 changes: 2 additions & 2 deletions tests/tso/server_test.go
@@ -153,8 +153,8 @@ func (suite *tsoServerTestSuite) TestConcurrentlyReset() {
 	for i := 0; i < 2; i++ {
 		go func() {
 			defer wg.Done()
-			for i := 0; i <= 100; i++ {
-				physical := now.Add(time.Duration(2*i)*time.Minute).UnixNano() / int64(time.Millisecond)
+			for j := 0; j <= 100; j++ {
+				physical := now.Add(time.Duration(2*j)*time.Minute).UnixNano() / int64(time.Millisecond)
 				ts := uint64(physical << 18)
 				suite.resetTS(ts, false, false)
 			}
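The rename fixes the inner loop redeclaring i inside each goroutine, shadowing the i of the spawning loop. The old code happened to be harmless, but it was easy to misread or break in later edits. A runnable sketch of the corrected shape, with suite.resetTS replaced by a hypothetical print:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	now := time.Now()
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			// A distinct name for the inner index: before the fix this loop
			// also used i, hiding the outer loop variable.
			for j := 0; j <= 2; j++ {
				physical := now.Add(time.Duration(2*j)*time.Minute).UnixNano() / int64(time.Millisecond)
				ts := uint64(physical << 18) // 18-bit logical part left zero
				fmt.Printf("worker %d would reset TS to %d\n", worker, ts)
			}
		}(i) // pass the outer index explicitly (safe capture before Go 1.22)
	}
	wg.Wait()
}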
