Skip to content

Commit

Permalink
Add a test case
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Apr 14, 2023
1 parent 2422bee commit d221414
Show file tree
Hide file tree
Showing 10 changed files with 385 additions and 209 deletions.
13 changes: 7 additions & 6 deletions pkg/mcs/tso/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package server
import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/tso"
"go.uber.org/zap"
)
Expand All @@ -31,14 +30,16 @@ func newHandler(s *Server) *Handler {
return &Handler{s: s}
}

// ResetTS resets the ts with specified tso.
// TODO: Support multiple keyspace groups.
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
// ResetTS resets the TSO with the specified one.
func (h *Handler) ResetTS(
ts uint64, ignoreSmaller, skipUpperBoundCheck bool, keyspaceGroupID uint32,
) error {
log.Info("reset-ts",
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeyspaceGroupID)
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck),
zap.Uint32("keyspace-group-id", keyspaceGroupID))
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(keyspaceGroupID)
if err != nil {
log.Error("failed to get allocator manager", errs.ZapError(err))
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/tso/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

// Handler defines the common behaviors of a basic tso handler.
type Handler interface {
ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error
ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, keyspaceGroupID uint32) error
}

// AdminHandler wrap the basic tso handler to provide http service.
Expand Down Expand Up @@ -93,7 +93,7 @@ func (h *AdminHandler) ResetTS(w http.ResponseWriter, r *http.Request) {
ignoreSmaller, skipUpperBoundCheck = true, true
}

if err = handler.ResetTS(ts, ignoreSmaller, skipUpperBoundCheck); err != nil {
if err = handler.ResetTS(ts, ignoreSmaller, skipUpperBoundCheck, 0); err != nil {
if err == errs.ErrServerNotStarted {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
} else {
Expand Down
3 changes: 3 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,9 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
const keyspaceGroupsAPIPrefix = "/pd/api/v2/tso/keyspace-groups"

func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
if kgm.httpClient == nil {
return nil
}
statusCode, err := apiutil.DoDelete(kgm.httpClient, kgm.cfg.GeBackendEndpoints()+keyspaceGroupsAPIPrefix+fmt.Sprintf("/%d/split", id))
if err != nil {
return err
Expand Down
10 changes: 2 additions & 8 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/json"
"fmt"
"math/rand"
"net/http"
"path"
"reflect"
"sort"
Expand Down Expand Up @@ -54,7 +53,6 @@ type keyspaceGroupManagerTestSuite struct {
cancel context.CancelFunc
backendEndpoints string
etcdClient *clientv3.Client
httpClient *http.Client
clean func()
cfg *TestServiceConfig
}
Expand All @@ -67,11 +65,7 @@ func (suite *keyspaceGroupManagerTestSuite) SetupSuite() {
t := suite.T()
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.backendEndpoints, suite.etcdClient, suite.clean = startEmbeddedEtcd(t)
suite.httpClient = &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
},
}

suite.cfg = &TestServiceConfig{
Name: "tso-test-name",
BackendEndpoints: suite.backendEndpoints,
Expand Down Expand Up @@ -544,7 +538,7 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager(
electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string,
) *KeyspaceGroupManager {
return NewKeyspaceGroupManager(
suite.ctx, tsoServiceID, suite.etcdClient, suite.httpClient,
suite.ctx, tsoServiceID, suite.etcdClient, nil,
electionNamePrefix, legacySvcRootPath, tsoSvcRootPath,
suite.cfg)
}
Expand Down
2 changes: 1 addition & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ func (h *Handler) GetOfflinePeer(typ statistics.RegionStatisticType) ([]*core.Re
}

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, _ uint32) error {
log.Info("reset-ts",
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
Expand Down
153 changes: 153 additions & 0 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"context"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/client/testutil"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/storage/endpoint"
tsopkg "github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers"
"google.golang.org/grpc"
)

type tsoKeyspaceGroupManagerTestSuite struct {
suite.Suite

ctx context.Context
cancel context.CancelFunc

// The PD cluster.
cluster *tests.TestCluster
// pdLeaderServer is the leader server of the PD cluster.
pdLeaderServer *tests.TestServer
// tsoServer is the TSO service provider.
tsoServer *tso.Server
tsoServerCleanup func()
tsoClientConn *grpc.ClientConn

tsoClient tsopb.TSOClient
}

func TestTSOKeyspaceGroupManager(t *testing.T) {
suite.Run(t, &tsoKeyspaceGroupManagerTestSuite{})
}

func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() {
re := suite.Require()

var err error
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1)
re.NoError(err)
err = suite.cluster.RunInitialServers()
re.NoError(err)
leaderName := suite.cluster.WaitLeader()
suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
re.NoError(suite.pdLeaderServer.BootstrapCluster())
backendEndpoints := suite.pdLeaderServer.GetAddr()
suite.tsoServer, suite.tsoServerCleanup = mcs.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc())
suite.tsoClientConn, suite.tsoClient = tso.MustNewGrpcClient(re, suite.tsoServer.GetAddr())
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownSuite() {
suite.cancel()
suite.tsoClientConn.Close()
suite.tsoServerCleanup()
suite.cluster.Destroy()
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
re := suite.Require()
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()
// Create the keyspace group 1 with keyspaces [111, 222, 333].
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
UserKind: endpoint.Standard.String(),
Members: []endpoint.KeyspaceGroupMember{{Address: suite.tsoServer.GetAddr()}},
Keyspaces: []uint32{111, 222, 333},
},
},
})
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1)
re.Equal(uint32(1), kg1.ID)
re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces)
re.False(kg1.InSplit())
// Get a TSO from the keyspace group 1.
var ts *pdpb.Timestamp
testutil.Eventually(re, func() bool {
resp, err := request(re, ctx, suite.tsoClient, 1, suite.pdLeaderServer.GetClusterID(), 222, 1)
ts = resp.GetTimestamp()
return err == nil && tsoutil.CompareTimestamp(ts, &pdpb.Timestamp{}) > 0
})
ts.Physical += time.Hour.Milliseconds()
// Set the TSO of the keyspace group 1 to a large value.
err := suite.tsoServer.GetHandler().ResetTS(tsoutil.GenerateTS(ts), false, true, 1)
re.NoError(err)
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
NewID: 2,
Keyspaces: []uint32{222, 333},
})
kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{222, 333}, kg2.Keyspaces)
re.True(kg2.IsSplitTo())
// Check the split TSO from keyspace group 2.
var splitTS *pdpb.Timestamp
testutil.Eventually(re, func() bool {
resp, err := request(re, ctx, suite.tsoClient, 1, suite.pdLeaderServer.GetClusterID(), 222, 2)
splitTS = resp.GetTimestamp()
return err == nil && tsoutil.CompareTimestamp(splitTS, &pdpb.Timestamp{}) > 0
})
re.Greater(tsoutil.CompareTimestamp(splitTS, ts), 0)
}

func request(
re *require.Assertions,
ctx context.Context, client tsopb.TSOClient, count uint32,
clusterID uint64, keyspaceID, keyspaceGroupID uint32,
) (ts *tsopb.TsoResponse, err error) {
req := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: clusterID,
KeyspaceId: keyspaceID,
KeyspaceGroupId: keyspaceGroupID,
},
DcLocation: tsopkg.GlobalDCLocation,
Count: count,
}
tsoClient, err := client.Tso(ctx)
re.NoError(err)
defer tsoClient.CloseSend()
re.NoError(tsoClient.Send(req))
return tsoClient.Recv()
}
4 changes: 2 additions & 2 deletions tests/integrations/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ func (suite *tsoServerTestSuite) getClusterID() uint64 {
func (suite *tsoServerTestSuite) resetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) {
var err error
if suite.legacy {
err = suite.pdLeaderServer.GetServer().GetHandler().ResetTS(ts, ignoreSmaller, skipUpperBoundCheck)
err = suite.pdLeaderServer.GetServer().GetHandler().ResetTS(ts, ignoreSmaller, skipUpperBoundCheck, 0)
} else {
err = suite.tsoServer.GetHandler().ResetTS(ts, ignoreSmaller, skipUpperBoundCheck)
err = suite.tsoServer.GetHandler().ResetTS(ts, ignoreSmaller, skipUpperBoundCheck, 0)
}
// Only this error is acceptable.
if err != nil {
Expand Down
Loading

0 comments on commit d221414

Please sign in to comment.