Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: revert custom gRPC resolver #33115

Merged
merged 10 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/resolver"
split "github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
Expand Down Expand Up @@ -128,22 +127,23 @@ type importClientFactoryImpl struct {
conns *common.GRPCConns
splitCli split.SplitClient
tls *common.TLS
resolveBuilder *resolver.Builder
tcpConcurrency int
}

func newImportClientFactoryImpl(splitCli split.SplitClient, tls *common.TLS, tcpConcurrency int) *importClientFactoryImpl {
resolveBuilder := resolver.NewBuilder(splitCli)
return &importClientFactoryImpl{
conns: common.NewGRPCConns(),
splitCli: splitCli,
tls: tls,
resolveBuilder: resolveBuilder,
tcpConcurrency: tcpConcurrency,
}
}

func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
store, err := f.splitCli.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
opt := grpc.WithInsecure()
if f.tls.TLSConfig() != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig()))
Expand All @@ -152,17 +152,21 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64)

bfConf := backoff.DefaultConfig
bfConf.MaxDelay = gRPCBackOffMaxDelay
// we should use peer address for tiflash. for tikv, peer address is empty
addr := store.GetPeerAddress()
if addr == "" {
addr = store.GetAddress()
}
conn, err := grpc.DialContext(
ctx,
f.resolveBuilder.Target(storeID),
addr,
opt,
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: gRPCKeepAliveTime,
Timeout: gRPCKeepAliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithResolvers(f.resolveBuilder),
)
cancel()
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,6 @@ func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
return nil
}

func (c *testClient) InvalidateStoreCache(storeID uint64) {}

func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo {
r := &metapb.Region{}
if region.Region != nil {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func NewRestoreControllerWithPauser(

backend, err = local.NewLocalBackend(ctx, tls, cfg, g, maxOpenFiles, errorMgr)
if err != nil {
return nil, errors.Annotate(err, "build local backend failed")
return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
}
err = verifyLocalFile(ctx, cpdb, cfg.TikvImporter.SortedKVDir)
if err != nil {
Expand Down
118 changes: 0 additions & 118 deletions br/pkg/resolver/resolver.go

This file was deleted.

81 changes: 0 additions & 81 deletions br/pkg/resolver/resolver_test.go

This file was deleted.

44 changes: 14 additions & 30 deletions br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (

const (
splitRegionMaxRetryTime = 4
defaultDialTimeout = time.Minute
)

// SplitClient is an external client used by RegionSplitter.
Expand Down Expand Up @@ -76,8 +75,6 @@ type SplitClient interface {
// SetStoresLabel add or update specified label of stores. If labelValue
// is empty, it clears the label.
SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error
// InvalidateStoreCache invalidate store cache for the given store id.
InvalidateStoreCache(storeID uint64)
}

// pdClient is a wrapper of pd client, can be used by RegionSplitter.
Expand Down Expand Up @@ -195,7 +192,11 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
peer = regionInfo.Region.Peers[0]
}
storeID := peer.GetStoreId()
conn, err := c.dialStore(ctx, storeID)
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -311,7 +312,15 @@ func (c *pdClient) sendSplitRegionRequest(
peer = regionInfo.Region.Peers[0]
}
storeID := peer.GetStoreId()
conn, err := c.dialStore(ctx, storeID)
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
opt := grpc.WithInsecure()
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
Expand Down Expand Up @@ -368,25 +377,6 @@ func (c *pdClient) sendSplitRegionRequest(
return nil, errors.Trace(splitErrors)
}

func (c *pdClient) dialStore(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
credsOpt := grpc.WithInsecure()
if c.tlsConf != nil {
credsOpt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
subCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
conn, err := grpc.DialContext(subCtx, store.GetAddress(), credsOpt, grpc.WithReturnConnectionError())
if err != nil {
c.InvalidateStoreCache(storeID)
return nil, errors.Trace(err)
}
return conn, nil
}

func (c *pdClient) BatchSplitRegionsWithOrigin(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) (*RegionInfo, []*RegionInfo, error) {
Expand Down Expand Up @@ -610,12 +600,6 @@ func (c *pdClient) getPDAPIAddr() string {
return strings.TrimRight(addr, "/")
}

func (c *pdClient) InvalidateStoreCache(storeID uint64) {
c.mu.Lock()
delete(c.storeCache, storeID)
c.mu.Unlock()
}

func checkRegionEpoch(_new, _old *RegionInfo) bool {
return _new.Region.GetId() == _old.Region.GetId() &&
_new.Region.GetRegionEpoch().GetVersion() == _old.Region.GetRegionEpoch().GetVersion() &&
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ func (c *TestClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
return nil
}

func (c *TestClient) InvalidateStoreCache(storeID uint64) {}

type assertRetryLessThanBackoffer struct {
max int
already int
Expand Down
2 changes: 1 addition & 1 deletion br/tests/br_split_region_fail/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ BR_LOG_TO_TERM=1

grep "a error occurs on split region" $LOG && \
grep "split region meet not leader error" $LOG && \
grep "Full restore success" $LOG && \
grep "Full Restore success" $LOG && \
grep "find new leader" $LOG

if [ $? -ne 0 ]; then
Expand Down
2 changes: 1 addition & 1 deletion br/tests/lightning_fail_fast/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

set -eux

export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownWriteRows=sleep(50);github.com/pingcap/tidb/br/pkg/lightning/restore/SetMinDeliverBytes=return(1)'
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownWriteRows=sleep(100);github.com/pingcap/tidb/br/pkg/lightning/restore/SetMinDeliverBytes=return(1)'

for CFG in chunk engine; do
rm -f "$TEST_DIR/lightning-tidb.log"
Expand Down