Skip to content

Commit

Permalink
mcs: add more tso tests (#6184)
Browse files Browse the repository at this point in the history
ref #5836

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
rleungx and ti-chi-bot authored Mar 29, 2023
1 parent c4a8b80 commit 6cf2bd3
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 9 deletions.
3 changes: 3 additions & 0 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
}

func (c *pdServiceDiscovery) updateServiceModeLoop() {
failpoint.Inject("skipUpdateServiceMode", func() {
failpoint.Return()
})
defer c.wg.Done()

ctx, cancel := context.WithCancel(c.ctx)
Expand Down
4 changes: 4 additions & 0 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,10 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
defer patrolTicker.Stop()
}
tsTicker := time.NewTicker(am.updatePhysicalInterval)
failpoint.Inject("fastUpdatePhysicalInterval", func() {
tsTicker.Stop()
tsTicker = time.NewTicker(time.Millisecond)
})
defer tsTicker.Stop()
checkerTicker := time.NewTicker(PriorityCheck)
defer checkerTicker.Stop()
Expand Down
2 changes: 0 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,13 @@ func (c *RaftCluster) Start(s Server) error {
if cluster == nil {
return nil
}

c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts())
if c.opt.IsPlacementRulesEnabled() {
err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels())
if err != nil {
return err
}
}

c.regionLabeler, err = labeler.NewRegionLabeler(c.ctx, c.storage, regionLabelGCInterval)
if err != nil {
return err
Expand Down
14 changes: 14 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,20 @@ func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServ
return s, nil
}

// JoinAPIServer is used to add a new TestAPIServer into the cluster.
func (c *TestCluster) JoinAPIServer(ctx context.Context, opts ...ConfigOption) (*TestServer, error) {
conf, err := c.config.Join().Generate(opts...)
if err != nil {
return nil, err
}
s, err := NewTestAPIServer(ctx, conf)
if err != nil {
return nil, err
}
c.servers[conf.Name] = s
return s, nil
}

// Destroy is used to destroy a TestCluster.
func (c *TestCluster) Destroy() {
for _, s := range c.servers {
Expand Down
11 changes: 9 additions & 2 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,20 @@ import (
bs "github.com/tikv/pd/pkg/basicserver"
)

// SetupTSOClient creates a TSO client for test.
func SetupTSOClient(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
// SetupClientWithKeyspace creates a TSO client for test.
func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
cli, err := pd.NewClientWithKeyspace(ctx, utils.DefaultKeyspaceID, endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}

// SetupClient creates a TSO client for test.
func SetupClient(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
cli, err := pd.NewClientWithContext(ctx, endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}

// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) {
cfg := rm.NewConfig()
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) {
_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc())
defer cleanup()

cli := mcs.SetupTSOClient(ctx, re, []string{backendEndpoints})
cli := mcs.SetupClientWithKeyspace(ctx, re, []string{backendEndpoints})
physical, logical, err := cli.GetTS(ctx)
re.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
Expand Down
148 changes: 144 additions & 4 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ package tso
import (
"context"
"math"
"math/rand"
"strings"
"sync"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/testutil"
Expand All @@ -44,6 +47,8 @@ type tsoClientTestSuite struct {
tsoServer *tso.Server
tsoServerCleanup func()

backendEndpoints string

client pd.TSOClient
}

Expand Down Expand Up @@ -74,13 +79,14 @@ func (suite *tsoClientTestSuite) SetupSuite() {
re.NoError(err)
leaderName := suite.cluster.WaitLeader()
pdLeader := suite.cluster.GetServer(leaderName)
backendEndpoints := pdLeader.GetAddr()
re.NoError(pdLeader.BootstrapCluster())
suite.backendEndpoints = pdLeader.GetAddr()
if suite.legacy {
suite.client, err = pd.NewClientWithContext(suite.ctx, strings.Split(backendEndpoints, ","), pd.SecurityOption{})
suite.client, err = pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{})
re.NoError(err)
} else {
suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc())
suite.client = mcs.SetupTSOClient(suite.ctx, re, strings.Split(backendEndpoints, ","))
suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
suite.client = mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ","))
}
}

Expand Down Expand Up @@ -168,3 +174,137 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
})
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp"))
}

func (suite *tsoClientTestSuite) TestRandomTransferLeader() {
re := suite.Require()

re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
r := rand.New(rand.NewSource(time.Now().UnixNano()))

ctx, cancel := context.WithCancel(suite.ctx)
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber + 1)
go func() {
defer wg.Done()
n := r.Intn(2) + 1
time.Sleep(time.Duration(n) * time.Second)
err := suite.cluster.ResignLeader()
re.NoError(err)
suite.cluster.WaitLeader()
cancel()
}()

checkTSO(ctx, re, &wg, suite.backendEndpoints)
wg.Wait()
}

func (suite *tsoClientTestSuite) TestRandomShutdown() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))

tsoSvr, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
defer cleanup()

ctx, cancel := context.WithCancel(suite.ctx)
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber + 1)
go func() {
defer wg.Done()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
n := r.Intn(2) + 1
time.Sleep(time.Duration(n) * time.Second)
if !suite.legacy {
// random close one of the tso servers
if r.Intn(2) == 0 {
tsoSvr.Close()
} else {
suite.tsoServer.Close()
}
} else {
// close pd leader server
suite.cluster.GetServer(suite.cluster.GetLeader()).GetServer().Close()
}
cancel()
}()

checkTSO(ctx, re, &wg, suite.backendEndpoints)
wg.Wait()
suite.TearDownSuite()
suite.SetupSuite()
}

// When we upgrade the PD cluster, there may be a period of time that the old and new PDs are running at the same time.
func TestMixedTSODeployment(t *testing.T) {
re := require.New(t)

re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)"))
defer re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)"))

ctx, cancel := context.WithCancel(context.Background())
cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
defer cancel()
defer cluster.Destroy()

err = cluster.RunInitialServers()
re.NoError(err)

leaderServer := cluster.GetServer(cluster.WaitLeader())
backendEndpoints := leaderServer.GetAddr()

apiSvr, err := cluster.JoinAPIServer(ctx)
re.NoError(err)
err = apiSvr.Run()
re.NoError(err)

_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc())
defer cleanup()

ctx1, cancel1 := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber + 1)
go func() {
defer wg.Done()
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < 2; i++ {
n := r.Intn(2) + 1
time.Sleep(time.Duration(n) * time.Second)
leaderServer.ResignLeader()
leaderServer = cluster.GetServer(cluster.WaitLeader())
}
cancel1()
}()
checkTSO(ctx1, re, &wg, backendEndpoints)
wg.Wait()
}

func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) {
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
cli := mcs.SetupClientWithKeyspace(context.Background(), re, strings.Split(backendEndpoints, ","))
var ts, lastTS uint64
for {
physical, logical, err := cli.GetTS(context.Background())
// omit the error check since there are many kinds of errors
if err == nil {
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
select {
case <-ctx.Done():
physical, logical, _ := cli.GetTS(context.Background())
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
return
default:
}
}
}()
}
}

0 comments on commit 6cf2bd3

Please sign in to comment.