Skip to content

Commit

Permalink
lightning: revert custom gRPC resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepymole committed Mar 16, 2022
1 parent 2ae7132 commit 700931b
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 242 deletions.
16 changes: 10 additions & 6 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/resolver"
split "github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
Expand Down Expand Up @@ -128,22 +127,23 @@ type importClientFactoryImpl struct {
conns *common.GRPCConns
splitCli split.SplitClient
tls *common.TLS
resolveBuilder *resolver.Builder
tcpConcurrency int
}

func newImportClientFactoryImpl(splitCli split.SplitClient, tls *common.TLS, tcpConcurrency int) *importClientFactoryImpl {
resolveBuilder := resolver.NewBuilder(splitCli)
return &importClientFactoryImpl{
conns: common.NewGRPCConns(),
splitCli: splitCli,
tls: tls,
resolveBuilder: resolveBuilder,
tcpConcurrency: tcpConcurrency,
}
}

func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
store, err := f.splitCli.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
opt := grpc.WithInsecure()
if f.tls.TLSConfig() != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig()))
Expand All @@ -152,17 +152,21 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64)

bfConf := backoff.DefaultConfig
bfConf.MaxDelay = gRPCBackOffMaxDelay
// we should use peer address for tiflash. for tikv, peer address is empty
addr := store.GetPeerAddress()
if addr == "" {
addr = store.GetAddress()
}
conn, err := grpc.DialContext(
ctx,
f.resolveBuilder.Target(storeID),
addr,
opt,
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: gRPCKeepAliveTime,
Timeout: gRPCKeepAliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithResolvers(f.resolveBuilder),
)
cancel()
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,6 @@ func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
return nil
}

func (c *testClient) InvalidateStoreCache(storeID uint64) {}

func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo {
r := &metapb.Region{}
if region.Region != nil {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func NewRestoreControllerWithPauser(

backend, err = local.NewLocalBackend(ctx, tls, cfg, g, maxOpenFiles, errorMgr)
if err != nil {
return nil, errors.Annotate(err, "build local backend failed")
return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
}
err = verifyLocalFile(ctx, cpdb, cfg.TikvImporter.SortedKVDir)
if err != nil {
Expand Down
118 changes: 0 additions & 118 deletions br/pkg/resolver/resolver.go

This file was deleted.

81 changes: 0 additions & 81 deletions br/pkg/resolver/resolver_test.go

This file was deleted.

48 changes: 16 additions & 32 deletions br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (

const (
splitRegionMaxRetryTime = 4
defaultDialTimeout = time.Minute
)

// SplitClient is an external client used by RegionSplitter.
Expand All @@ -64,7 +63,7 @@ type SplitClient interface {
ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
// ScanRegions gets a list of regions, starts from the region that contains key.
// ScanRegion gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error)
// GetPlacementRule loads a placement rule from PD.
Expand All @@ -73,11 +72,9 @@ type SplitClient interface {
SetPlacementRule(ctx context.Context, rule pdtypes.Rule) error
// DeletePlacementRule removes a placement rule from PD.
DeletePlacementRule(ctx context.Context, groupID, ruleID string) error
// SetStoresLabel add or update specified label of stores. If labelValue
// SetStoreLabel add or update specified label of stores. If labelValue
// is empty, it clears the label.
SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error
// InvalidateStoreCache invalidate store cache for the given store id.
InvalidateStoreCache(storeID uint64)
}

// pdClient is a wrapper of pd client, can be used by RegionSplitter.
Expand Down Expand Up @@ -195,7 +192,11 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
peer = regionInfo.Region.Peers[0]
}
storeID := peer.GetStoreId()
conn, err := c.dialStore(ctx, storeID)
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -311,7 +312,15 @@ func (c *pdClient) sendSplitRegionRequest(
peer = regionInfo.Region.Peers[0]
}
storeID := peer.GetStoreId()
conn, err := c.dialStore(ctx, storeID)
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
opt := grpc.WithInsecure()
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
Expand Down Expand Up @@ -368,25 +377,6 @@ func (c *pdClient) sendSplitRegionRequest(
return nil, errors.Trace(splitErrors)
}

func (c *pdClient) dialStore(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
credsOpt := grpc.WithInsecure()
if c.tlsConf != nil {
credsOpt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
subCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
conn, err := grpc.DialContext(subCtx, store.GetAddress(), credsOpt, grpc.WithReturnConnectionError())
if err != nil {
c.InvalidateStoreCache(storeID)
return nil, errors.Trace(err)
}
return conn, nil
}

func (c *pdClient) BatchSplitRegionsWithOrigin(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) (*RegionInfo, []*RegionInfo, error) {
Expand Down Expand Up @@ -610,12 +600,6 @@ func (c *pdClient) getPDAPIAddr() string {
return strings.TrimRight(addr, "/")
}

func (c *pdClient) InvalidateStoreCache(storeID uint64) {
c.mu.Lock()
delete(c.storeCache, storeID)
c.mu.Unlock()
}

func checkRegionEpoch(_new, _old *RegionInfo) bool {
return _new.Region.GetId() == _old.Region.GetId() &&
_new.Region.GetRegionEpoch().GetVersion() == _old.Region.GetRegionEpoch().GetVersion() &&
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ func (c *TestClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
return nil
}

func (c *TestClient) InvalidateStoreCache(storeID uint64) {}

type assertRetryLessThanBackoffer struct {
max int
already int
Expand Down

0 comments on commit 700931b

Please sign in to comment.