Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: fix forward test with pd mode client #6290

Merged
merged 8 commits into from
May 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,14 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
}

func (c *pdServiceDiscovery) updateServiceModeLoop() {
defer c.wg.Done()
failpoint.Inject("skipUpdateServiceMode", func() {
failpoint.Return()
})
defer c.wg.Done()
failpoint.Inject("usePDServiceMode", func() {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
failpoint.Return()
})

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
Expand Down
16 changes: 6 additions & 10 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ const (
AllocStep = uint64(100)
// AllocLabel is used to label keyspace idAllocator's metrics.
AllocLabel = "keyspace-idAlloc"
// DefaultKeyspaceName is the name reserved for default keyspace.
DefaultKeyspaceName = "DEFAULT"
// DefaultKeyspaceID is the id of default keyspace.
DefaultKeyspaceID = uint32(0)
// regionLabelIDPrefix is used to prefix the keyspace region label.
regionLabelIDPrefix = "keyspaces/"
// regionLabelKey is the key for keyspace id in keyspace region label.
Expand Down Expand Up @@ -111,13 +107,13 @@ func NewKeyspaceManager(
// Bootstrap saves default keyspace info.
func (manager *Manager) Bootstrap() error {
// Split Keyspace Region for default keyspace.
if err := manager.splitKeyspaceRegion(DefaultKeyspaceID); err != nil {
if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID); err != nil {
return err
}
now := time.Now().Unix()
defaultKeyspaceMeta := &keyspacepb.KeyspaceMeta{
Id: DefaultKeyspaceID,
Name: DefaultKeyspaceName,
Id: utils.DefaultKeyspaceID,
Name: utils.DefaultKeyspaceName,
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
Expand Down Expand Up @@ -430,7 +426,7 @@ func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation)
// It returns error if saving failed, operation not allowed, or if keyspace not exists.
func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) {
// Changing the state of default keyspace is not allowed.
if name == DefaultKeyspaceName {
if name == utils.DefaultKeyspaceName {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
)
Expand Down Expand Up @@ -482,7 +478,7 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key
// It returns error if saving failed, operation not allowed, or if keyspace not exists.
func (manager *Manager) UpdateKeyspaceStateByID(id uint32, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) {
// Changing the state of default keyspace is not allowed.
if id == DefaultKeyspaceID {
if id == utils.DefaultKeyspaceID {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
)
Expand Down Expand Up @@ -586,7 +582,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, DefaultKeyspaceID, 0)
keyspaces, err := manager.store.LoadRangeKeyspace(txn, utils.DefaultKeyspaceID, 0)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfig() {
re.Error(err)
}
// Changing config of DEFAULT keyspace is allowed.
updated, err := manager.UpdateKeyspaceConfig(DefaultKeyspaceName, mutations)
updated, err := manager.UpdateKeyspaceConfig(utils.DefaultKeyspaceName, mutations)
re.NoError(err)
// remove auto filled fields
delete(updated.Config, TSOKeyspaceGroupIDKey)
Expand Down Expand Up @@ -204,7 +204,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() {
_, err = manager.UpdateKeyspaceState(createRequest.Name, keyspacepb.KeyspaceState_ENABLED, newTime)
re.Error(err)
// Changing state of DEFAULT keyspace is not allowed.
_, err = manager.UpdateKeyspaceState(DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime)
_, err = manager.UpdateKeyspaceState(utils.DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime)
re.Error(err)
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/tikv/pd/pkg/codec"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/storage/endpoint"
)
Expand Down Expand Up @@ -77,7 +78,7 @@ func validateID(id uint32) error {
if id > spaceIDMax {
return errors.Errorf("illegal keyspace id %d, larger than spaceID Max %d", id, spaceIDMax)
}
if id == DefaultKeyspaceID {
if id == utils.DefaultKeyspaceID {
return errors.Errorf("illegal keyspace id %d, collides with default keyspace id", id)
}
return nil
Expand All @@ -94,7 +95,7 @@ func validateName(name string) error {
if !isValid {
return errors.Errorf("illegal keyspace name %s, should contain only alphanumerical and underline", name)
}
if name == DefaultKeyspaceName {
if name == utils.DefaultKeyspaceName {
return errors.Errorf("illegal keyspace name %s, collides with default keyspace name", name)
}
return nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/keyspace/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/codec"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/labeler"
)

Expand All @@ -30,7 +31,7 @@ func TestValidateID(t *testing.T) {
id uint32
hasErr bool
}{
{DefaultKeyspaceID, true}, // Reserved id should result in error.
{utils.DefaultKeyspaceID, true}, // Reserved id should result in error.
{100, false},
{spaceIDMax - 1, false},
{spaceIDMax, false},
Expand All @@ -48,7 +49,7 @@ func TestValidateName(t *testing.T) {
name string
hasErr bool
}{
{DefaultKeyspaceName, true}, // Reserved name should result in error.
{utils.DefaultKeyspaceName, true}, // Reserved name should result in error.
{"keyspaceName1", false},
{"keyspace_name_1", false},
{"10", false},
Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
// LeaderTickInterval is the interval to check leader
LeaderTickInterval = 50 * time.Millisecond

// DefaultKeyspaceName is the name reserved for default keyspace.
DefaultKeyspaceName = "DEFAULT"

// DefaultKeyspaceID is the default key space id.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
// 5. Check leadership again before we returning the response.
if !gta.member.GetLeadership().Check() {
tsoCounter.WithLabelValues("not_leader_anymore", gta.timestampOracle.dcLocation).Inc()
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("not the pd leader anymore")
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr))
}
// 6. Calibrate the logical part to make the TSO unique globally by giving it a unique suffix in the whole cluster
globalTSOResp.Logical = gta.timestampOracle.calibrateLogical(globalTSOResp.GetLogical(), suffixBits)
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, s
}
// In case lease expired after the first check.
if !leadership.Check() {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("not the pd or local tso allocator leader anymore")
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr))
}
resp.SuffixBits = uint32(suffixBits)
return resp, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/tsoutil/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (s *TSODispatcher) dispatch(
zap.String("forwarded-host", forwardedHost),
errs.ZapError(errs.ErrGRPCSend, err))
if needUpdateServicePrimaryAddr {
if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
select {
case updateServicePrimaryAddrChs[0] <- struct{}{}:
default:
Expand Down
25 changes: 18 additions & 7 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ import (
)

const (
heartbeatSendTimeout = 5 * time.Second
maxRetryTimesGetGlobalTSOFromTSOServer = 3
heartbeatSendTimeout = 5 * time.Second
maxRetryTimesRequestTSOServer = 3
retryIntervalRequestTSOServer = 500 * time.Millisecond
)

// gRPC errors
Expand Down Expand Up @@ -1774,10 +1775,6 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
}

func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timestamp, error) {
forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
}
request := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: s.clusterID,
Expand All @@ -1787,18 +1784,32 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest
Count: 1,
}
var (
forwardedHost string
forwardStream tsopb.TSO_TsoClient
ts *tsopb.TsoResponse
err error
)
for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ {
for i := 0; i < maxRetryTimesRequestTSOServer; i++ {
forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName)
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
}
forwardStream, err = s.getTSOForwardStream(forwardedHost)
if err != nil {
return pdpb.Timestamp{}, err
}
forwardStream.Send(request)
ts, err = forwardStream.Recv()
if err != nil {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
select {
case s.updateServicePrimaryAddrCh <- struct{}{}:
log.Info("update service primary address when meet not leader error")
default:
}
time.Sleep(retryIntervalRequestTSOServer)
continue
}
if strings.Contains(err.Error(), codes.Unavailable.String()) {
s.tsoClientPool.Lock()
delete(s.tsoClientPool.clients, forwardedHost)
Expand Down
6 changes: 6 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1840,6 +1840,12 @@ func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int6
return revision, nil
}

// SetServicePrimaryAddr sets the primary address directly.
// Note: This function is only used for test.
func (s *Server) SetServicePrimaryAddr(serviceName, addr string) {
s.servicePrimaryMap.Store(serviceName, addr)
}

func (s *Server) servicePrimaryKey(serviceName string) string {
return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary")
}
Expand Down
9 changes: 5 additions & 4 deletions tests/integrations/client/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server"
)
Expand Down Expand Up @@ -61,10 +62,10 @@ func (suite *clientTestSuite) TestLoadKeyspace() {
_, err := suite.client.LoadKeyspace(suite.ctx, "non-existing keyspace")
re.Error(err)
// Loading default keyspace should be successful.
keyspaceDefault, err := suite.client.LoadKeyspace(suite.ctx, keyspace.DefaultKeyspaceName)
keyspaceDefault, err := suite.client.LoadKeyspace(suite.ctx, utils.DefaultKeyspaceName)
re.NoError(err)
re.Equal(keyspace.DefaultKeyspaceID, keyspaceDefault.GetId())
re.Equal(keyspace.DefaultKeyspaceName, keyspaceDefault.GetName())
re.Equal(utils.DefaultKeyspaceID, keyspaceDefault.GetId())
re.Equal(utils.DefaultKeyspaceName, keyspaceDefault.GetName())
}

func (suite *clientTestSuite) TestWatchKeyspaces() {
Expand Down Expand Up @@ -105,7 +106,7 @@ func (suite *clientTestSuite) TestWatchKeyspaces() {
loaded = <-watchChan
re.Equal([]*keyspacepb.KeyspaceMeta{expected}, loaded)
// Updates to default keyspace's config should also be captured.
expected, err = suite.srv.GetKeyspaceManager().UpdateKeyspaceConfig(keyspace.DefaultKeyspaceName, []*keyspace.Mutation{
expected, err = suite.srv.GetKeyspaceManager().UpdateKeyspaceConfig(utils.DefaultKeyspaceName, []*keyspace.Mutation{
{
Op: keyspace.OpPut,
Key: "config",
Expand Down
Loading