Skip to content

Commit

Permalink
lightning: revert custom gRPC resolver (#33115)
Browse files Browse the repository at this point in the history
close #33114
  • Loading branch information
sleepymole authored Mar 16, 2022
1 parent cce729e commit 78523d2
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 243 deletions.
16 changes: 10 additions & 6 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/resolver"
split "github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
Expand Down Expand Up @@ -128,22 +127,23 @@ type importClientFactoryImpl struct {
conns *common.GRPCConns
splitCli split.SplitClient
tls *common.TLS
resolveBuilder *resolver.Builder
tcpConcurrency int
}

func newImportClientFactoryImpl(splitCli split.SplitClient, tls *common.TLS, tcpConcurrency int) *importClientFactoryImpl {
resolveBuilder := resolver.NewBuilder(splitCli)
return &importClientFactoryImpl{
conns: common.NewGRPCConns(),
splitCli: splitCli,
tls: tls,
resolveBuilder: resolveBuilder,
tcpConcurrency: tcpConcurrency,
}
}

func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
store, err := f.splitCli.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
opt := grpc.WithInsecure()
if f.tls.TLSConfig() != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig()))
Expand All @@ -152,17 +152,21 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64)

bfConf := backoff.DefaultConfig
bfConf.MaxDelay = gRPCBackOffMaxDelay
// we should use peer address for tiflash. for tikv, peer address is empty
addr := store.GetPeerAddress()
if addr == "" {
addr = store.GetAddress()
}
conn, err := grpc.DialContext(
ctx,
f.resolveBuilder.Target(storeID),
addr,
opt,
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: gRPCKeepAliveTime,
Timeout: gRPCKeepAliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithResolvers(f.resolveBuilder),
)
cancel()
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,6 @@ func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
return nil
}

func (c *testClient) InvalidateStoreCache(storeID uint64) {}

func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo {
r := &metapb.Region{}
if region.Region != nil {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
if walkErr == nil {
return common.ErrEmptySourceDir.GenWithStackByArgs(taskCfg.Mydumper.SourceDir)
}
return common.NormalizeError(err)
return common.NormalizeOrWrapErr(common.ErrStorageUnknown, walkErr)
}

loadTask := log.L().Begin(zap.InfoLevel, "load data source")
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func NewRestoreControllerWithPauser(

backend, err = local.NewLocalBackend(ctx, tls, cfg, g, maxOpenFiles, errorMgr)
if err != nil {
return nil, errors.Annotate(err, "build local backend failed")
return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
}
err = verifyLocalFile(ctx, cpdb, cfg.TikvImporter.SortedKVDir)
if err != nil {
Expand Down
118 changes: 0 additions & 118 deletions br/pkg/resolver/resolver.go

This file was deleted.

81 changes: 0 additions & 81 deletions br/pkg/resolver/resolver_test.go

This file was deleted.

44 changes: 14 additions & 30 deletions br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (

const (
splitRegionMaxRetryTime = 4
defaultDialTimeout = time.Minute
)

// SplitClient is an external client used by RegionSplitter.
Expand Down Expand Up @@ -76,8 +75,6 @@ type SplitClient interface {
// SetStoresLabel add or update specified label of stores. If labelValue
// is empty, it clears the label.
SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error
// InvalidateStoreCache invalidate store cache for the given store id.
InvalidateStoreCache(storeID uint64)
}

// pdClient is a wrapper of pd client, can be used by RegionSplitter.
Expand Down Expand Up @@ -195,7 +192,11 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
peer = regionInfo.Region.Peers[0]
}
storeID := peer.GetStoreId()
conn, err := c.dialStore(ctx, storeID)
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -311,7 +312,15 @@ func (c *pdClient) sendSplitRegionRequest(
peer = regionInfo.Region.Peers[0]
}
storeID := peer.GetStoreId()
conn, err := c.dialStore(ctx, storeID)
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
opt := grpc.WithInsecure()
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
Expand Down Expand Up @@ -368,25 +377,6 @@ func (c *pdClient) sendSplitRegionRequest(
return nil, errors.Trace(splitErrors)
}

func (c *pdClient) dialStore(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
credsOpt := grpc.WithInsecure()
if c.tlsConf != nil {
credsOpt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
subCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
conn, err := grpc.DialContext(subCtx, store.GetAddress(), credsOpt, grpc.WithReturnConnectionError())
if err != nil {
c.InvalidateStoreCache(storeID)
return nil, errors.Trace(err)
}
return conn, nil
}

func (c *pdClient) BatchSplitRegionsWithOrigin(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) (*RegionInfo, []*RegionInfo, error) {
Expand Down Expand Up @@ -610,12 +600,6 @@ func (c *pdClient) getPDAPIAddr() string {
return strings.TrimRight(addr, "/")
}

func (c *pdClient) InvalidateStoreCache(storeID uint64) {
c.mu.Lock()
delete(c.storeCache, storeID)
c.mu.Unlock()
}

func checkRegionEpoch(_new, _old *RegionInfo) bool {
return _new.Region.GetId() == _old.Region.GetId() &&
_new.Region.GetRegionEpoch().GetVersion() == _old.Region.GetRegionEpoch().GetVersion() &&
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ func (c *TestClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
return nil
}

func (c *TestClient) InvalidateStoreCache(storeID uint64) {}

type assertRetryLessThanBackoffer struct {
max int
already int
Expand Down
2 changes: 1 addition & 1 deletion br/tests/br_split_region_fail/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ BR_LOG_TO_TERM=1

grep "a error occurs on split region" $LOG && \
grep "split region meet not leader error" $LOG && \
grep "Full restore success" $LOG && \
grep "Full Restore success" $LOG && \
grep "find new leader" $LOG

if [ $? -ne 0 ]; then
Expand Down
2 changes: 1 addition & 1 deletion br/tests/lightning_fail_fast/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

set -eux

export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownWriteRows=sleep(50);github.com/pingcap/tidb/br/pkg/lightning/restore/SetMinDeliverBytes=return(1)'
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownWriteRows=sleep(100);github.com/pingcap/tidb/br/pkg/lightning/restore/SetMinDeliverBytes=return(1)'

for CFG in chunk engine; do
rm -f "$TEST_DIR/lightning-tidb.log"
Expand Down

0 comments on commit 78523d2

Please sign in to comment.