Skip to content

Commit

Permalink
mcs: fix forward test with pd mode client (tikv#6290)
Browse files Browse the repository at this point in the history
ref tikv#5895, ref tikv#6279, close tikv#6289

Signed-off-by: lhy1024 <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
  • Loading branch information
lhy1024 authored and zeminzhou committed May 10, 2023
1 parent 7ea3ffc commit 95d1ea2
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 41 deletions.
6 changes: 5 additions & 1 deletion client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,14 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
}

func (c *pdServiceDiscovery) updateServiceModeLoop() {
defer c.wg.Done()
failpoint.Inject("skipUpdateServiceMode", func() {
failpoint.Return()
})
defer c.wg.Done()
failpoint.Inject("usePDServiceMode", func() {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
failpoint.Return()
})

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
Expand Down
16 changes: 6 additions & 10 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ const (
AllocStep = uint64(100)
// AllocLabel is used to label keyspace idAllocator's metrics.
AllocLabel = "keyspace-idAlloc"
// DefaultKeyspaceName is the name reserved for default keyspace.
DefaultKeyspaceName = "DEFAULT"
// DefaultKeyspaceID is the id of default keyspace.
DefaultKeyspaceID = uint32(0)
// regionLabelIDPrefix is used to prefix the keyspace region label.
regionLabelIDPrefix = "keyspaces/"
// regionLabelKey is the key for keyspace id in keyspace region label.
Expand Down Expand Up @@ -111,13 +107,13 @@ func NewKeyspaceManager(
// Bootstrap saves default keyspace info.
func (manager *Manager) Bootstrap() error {
// Split Keyspace Region for default keyspace.
if err := manager.splitKeyspaceRegion(DefaultKeyspaceID); err != nil {
if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID); err != nil {
return err
}
now := time.Now().Unix()
defaultKeyspaceMeta := &keyspacepb.KeyspaceMeta{
Id: DefaultKeyspaceID,
Name: DefaultKeyspaceName,
Id: utils.DefaultKeyspaceID,
Name: utils.DefaultKeyspaceName,
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
Expand Down Expand Up @@ -430,7 +426,7 @@ func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation)
// It returns error if saving failed, operation not allowed, or if keyspace not exists.
func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) {
// Changing the state of default keyspace is not allowed.
if name == DefaultKeyspaceName {
if name == utils.DefaultKeyspaceName {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
)
Expand Down Expand Up @@ -482,7 +478,7 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key
// It returns error if saving failed, operation not allowed, or if keyspace not exists.
func (manager *Manager) UpdateKeyspaceStateByID(id uint32, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) {
// Changing the state of default keyspace is not allowed.
if id == DefaultKeyspaceID {
if id == utils.DefaultKeyspaceID {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
)
Expand Down Expand Up @@ -586,7 +582,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, DefaultKeyspaceID, 0)
keyspaces, err := manager.store.LoadRangeKeyspace(txn, utils.DefaultKeyspaceID, 0)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfig() {
re.Error(err)
}
// Changing config of DEFAULT keyspace is allowed.
updated, err := manager.UpdateKeyspaceConfig(DefaultKeyspaceName, mutations)
updated, err := manager.UpdateKeyspaceConfig(utils.DefaultKeyspaceName, mutations)
re.NoError(err)
// remove auto filled fields
delete(updated.Config, TSOKeyspaceGroupIDKey)
Expand Down Expand Up @@ -204,7 +204,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() {
_, err = manager.UpdateKeyspaceState(createRequest.Name, keyspacepb.KeyspaceState_ENABLED, newTime)
re.Error(err)
// Changing state of DEFAULT keyspace is not allowed.
_, err = manager.UpdateKeyspaceState(DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime)
_, err = manager.UpdateKeyspaceState(utils.DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime)
re.Error(err)
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/tikv/pd/pkg/codec"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/storage/endpoint"
)
Expand Down Expand Up @@ -77,7 +78,7 @@ func validateID(id uint32) error {
if id > spaceIDMax {
return errors.Errorf("illegal keyspace id %d, larger than spaceID Max %d", id, spaceIDMax)
}
if id == DefaultKeyspaceID {
if id == utils.DefaultKeyspaceID {
return errors.Errorf("illegal keyspace id %d, collides with default keyspace id", id)
}
return nil
Expand All @@ -94,7 +95,7 @@ func validateName(name string) error {
if !isValid {
return errors.Errorf("illegal keyspace name %s, should contain only alphanumerical and underline", name)
}
if name == DefaultKeyspaceName {
if name == utils.DefaultKeyspaceName {
return errors.Errorf("illegal keyspace name %s, collides with default keyspace name", name)
}
return nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/keyspace/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/codec"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/labeler"
)

Expand All @@ -30,7 +31,7 @@ func TestValidateID(t *testing.T) {
id uint32
hasErr bool
}{
{DefaultKeyspaceID, true}, // Reserved id should result in error.
{utils.DefaultKeyspaceID, true}, // Reserved id should result in error.
{100, false},
{spaceIDMax - 1, false},
{spaceIDMax, false},
Expand All @@ -48,7 +49,7 @@ func TestValidateName(t *testing.T) {
name string
hasErr bool
}{
{DefaultKeyspaceName, true}, // Reserved name should result in error.
{utils.DefaultKeyspaceName, true}, // Reserved name should result in error.
{"keyspaceName1", false},
{"keyspace_name_1", false},
{"10", false},
Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
// LeaderTickInterval is the interval to check leader
LeaderTickInterval = 50 * time.Millisecond

// DefaultKeyspaceName is the name reserved for default keyspace.
DefaultKeyspaceName = "DEFAULT"

// DefaultKeyspaceID is the default key space id.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
// 5. Check leadership again before we returning the response.
if !gta.member.GetLeadership().Check() {
tsoCounter.WithLabelValues("not_leader_anymore", gta.timestampOracle.dcLocation).Inc()
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("not the pd leader anymore")
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr))
}
// 6. Calibrate the logical part to make the TSO unique globally by giving it a unique suffix in the whole cluster
globalTSOResp.Logical = gta.timestampOracle.calibrateLogical(globalTSOResp.GetLogical(), suffixBits)
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, s
}
// In case lease expired after the first check.
if !leadership.Check() {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("not the pd or local tso allocator leader anymore")
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr))
}
resp.SuffixBits = uint32(suffixBits)
return resp, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/tsoutil/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (s *TSODispatcher) dispatch(
zap.String("forwarded-host", forwardedHost),
errs.ZapError(errs.ErrGRPCSend, err))
if needUpdateServicePrimaryAddr {
if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
select {
case updateServicePrimaryAddrChs[0] <- struct{}{}:
default:
Expand Down
25 changes: 18 additions & 7 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ import (
)

const (
heartbeatSendTimeout = 5 * time.Second
maxRetryTimesGetGlobalTSOFromTSOServer = 3
heartbeatSendTimeout = 5 * time.Second
maxRetryTimesRequestTSOServer = 3
retryIntervalRequestTSOServer = 500 * time.Millisecond
)

// gRPC errors
Expand Down Expand Up @@ -1774,10 +1775,6 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
}

func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timestamp, error) {
forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
}
request := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: s.clusterID,
Expand All @@ -1787,18 +1784,32 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest
Count: 1,
}
var (
forwardedHost string
forwardStream tsopb.TSO_TsoClient
ts *tsopb.TsoResponse
err error
)
for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ {
for i := 0; i < maxRetryTimesRequestTSOServer; i++ {
forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
}
forwardStream, err = s.getTSOForwardStream(forwardedHost)
if err != nil {
return pdpb.Timestamp{}, err
}
forwardStream.Send(request)
ts, err = forwardStream.Recv()
if err != nil {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
select {
case s.updateServicePrimaryAddrCh <- struct{}{}:
log.Info("update service primary address when meet not leader error")
default:
}
time.Sleep(retryIntervalRequestTSOServer)
continue
}
if strings.Contains(err.Error(), codes.Unavailable.String()) {
s.tsoClientPool.Lock()
delete(s.tsoClientPool.clients, forwardedHost)
Expand Down
6 changes: 6 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1840,6 +1840,12 @@ func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int6
return revision, nil
}

// SetServicePrimaryAddr sets the primary address directly.
// Note: This function is only used for test.
func (s *Server) SetServicePrimaryAddr(serviceName, addr string) {
s.servicePrimaryMap.Store(serviceName, addr)
}

func (s *Server) servicePrimaryKey(serviceName string) string {
return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary")
}
Expand Down
9 changes: 5 additions & 4 deletions tests/integrations/client/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server"
)
Expand Down Expand Up @@ -61,10 +62,10 @@ func (suite *clientTestSuite) TestLoadKeyspace() {
_, err := suite.client.LoadKeyspace(suite.ctx, "non-existing keyspace")
re.Error(err)
// Loading default keyspace should be successful.
keyspaceDefault, err := suite.client.LoadKeyspace(suite.ctx, keyspace.DefaultKeyspaceName)
keyspaceDefault, err := suite.client.LoadKeyspace(suite.ctx, utils.DefaultKeyspaceName)
re.NoError(err)
re.Equal(keyspace.DefaultKeyspaceID, keyspaceDefault.GetId())
re.Equal(keyspace.DefaultKeyspaceName, keyspaceDefault.GetName())
re.Equal(utils.DefaultKeyspaceID, keyspaceDefault.GetId())
re.Equal(utils.DefaultKeyspaceName, keyspaceDefault.GetName())
}

func (suite *clientTestSuite) TestWatchKeyspaces() {
Expand Down Expand Up @@ -105,7 +106,7 @@ func (suite *clientTestSuite) TestWatchKeyspaces() {
loaded = <-watchChan
re.Equal([]*keyspacepb.KeyspaceMeta{expected}, loaded)
// Updates to default keyspace's config should also be captured.
expected, err = suite.srv.GetKeyspaceManager().UpdateKeyspaceConfig(keyspace.DefaultKeyspaceName, []*keyspace.Mutation{
expected, err = suite.srv.GetKeyspaceManager().UpdateKeyspaceConfig(utils.DefaultKeyspaceName, []*keyspace.Mutation{
{
Op: keyspace.OpPut,
Key: "config",
Expand Down
Loading

0 comments on commit 95d1ea2

Please sign in to comment.