From 700931be2efebed8b2b682876f589f8cfad4a173 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 16 Mar 2022 11:17:14 +0800 Subject: [PATCH] lightning: revert custom gRPC resolver --- br/pkg/lightning/backend/local/local.go | 16 ++- .../backend/local/localhelper_test.go | 2 - br/pkg/lightning/restore/restore.go | 2 +- br/pkg/resolver/resolver.go | 118 ------------------ br/pkg/resolver/resolver_test.go | 81 ------------ br/pkg/restore/split_client.go | 48 +++---- br/pkg/restore/split_test.go | 2 - 7 files changed, 27 insertions(+), 242 deletions(-) delete mode 100644 br/pkg/resolver/resolver.go delete mode 100644 br/pkg/resolver/resolver_test.go diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 480d7eaf6f93e..ae5d292d04dc7 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -49,7 +49,6 @@ import ( "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/pdutil" - "github.com/pingcap/tidb/br/pkg/resolver" split "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" @@ -128,22 +127,23 @@ type importClientFactoryImpl struct { conns *common.GRPCConns splitCli split.SplitClient tls *common.TLS - resolveBuilder *resolver.Builder tcpConcurrency int } func newImportClientFactoryImpl(splitCli split.SplitClient, tls *common.TLS, tcpConcurrency int) *importClientFactoryImpl { - resolveBuilder := resolver.NewBuilder(splitCli) return &importClientFactoryImpl{ conns: common.NewGRPCConns(), splitCli: splitCli, tls: tls, - resolveBuilder: resolveBuilder, tcpConcurrency: tcpConcurrency, } } func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) { + store, err := f.splitCli.GetStore(ctx, storeID) + if err != nil { + return nil, errors.Trace(err) + } opt := grpc.WithInsecure() if f.tls.TLSConfig() != nil { opt = grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig())) @@ -152,9 +152,14 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) bfConf := backoff.DefaultConfig bfConf.MaxDelay = gRPCBackOffMaxDelay + // we should use peer address for tiflash. for tikv, peer address is empty + addr := store.GetPeerAddress() + if addr == "" { + addr = store.GetAddress() + } conn, err := grpc.DialContext( ctx, - f.resolveBuilder.Target(storeID), + addr, opt, grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}), grpc.WithKeepaliveParams(keepalive.ClientParameters{ @@ -162,7 +167,6 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) Timeout: gRPCKeepAliveTimeout, PermitWithoutStream: true, }), - grpc.WithResolvers(f.resolveBuilder), ) cancel() if err != nil { diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index cdaf939b81fbf..dd52b71493507 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -287,8 +287,6 @@ func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK return nil } -func (c *testClient) InvalidateStoreCache(storeID uint64) {} - func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo { r := &metapb.Region{} if region.Region != nil { diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 4804b90e09cf4..2ac8e0581fd4f 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -367,7 +367,7 @@ func NewRestoreControllerWithPauser( backend, err = local.NewLocalBackend(ctx, tls, cfg, g, maxOpenFiles, errorMgr) if err != nil { - return nil, errors.Annotate(err, "build local backend failed") + return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err) } err = verifyLocalFile(ctx, cpdb, cfg.TikvImporter.SortedKVDir) if err != nil { diff --git a/br/pkg/resolver/resolver.go b/br/pkg/resolver/resolver.go deleted file mode 100644 index 1656c1a9ae290..0000000000000 --- a/br/pkg/resolver/resolver.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package resolver - -import ( - "context" - "crypto/rand" - "encoding/hex" - "fmt" - "io" - "strconv" - "time" - - "github.com/pingcap/tidb/br/pkg/restore" - grpcresolver "google.golang.org/grpc/resolver" -) - -const ( - schemePrefix = "tikvstore-" - defaultGetStoreTimeout = time.Second * 30 -) - -type Builder struct { - scheme string - splitClient restore.SplitClient -} - -func NewBuilder(splitClient restore.SplitClient) *Builder { - return &Builder{ - scheme: generateScheme(), - splitClient: splitClient, - } -} - -func generateScheme() string { - var buf [8]byte - _, err := io.ReadFull(rand.Reader, buf[:]) - if err != nil { - panic(err) - } - return schemePrefix + hex.EncodeToString(buf[:]) -} - -func (b *Builder) Target(storeID uint64) string { - return fmt.Sprintf("%s://%d", b.scheme, storeID) -} - -func (b *Builder) Build(target grpcresolver.Target, conn grpcresolver.ClientConn, opts grpcresolver.BuildOptions) (grpcresolver.Resolver, error) { - storeID, err := strconv.ParseUint(target.URL.Host, 10, 64) - if err != nil { - return nil, fmt.Errorf("invalid tikv store target %s: %w", target.URL.String(), err) - } - ctx, cancel := context.WithCancel(context.Background()) - r := &storeResolver{ - ctx: ctx, - cancel: cancel, - storeID: storeID, - splitClient: b.splitClient, - conn: conn, - } - r.resolveNow() - return r, nil -} - -func (b *Builder) Scheme() string { - return b.scheme -} - -var _ grpcresolver.Builder = &Builder{} - -type storeResolver struct { - ctx context.Context - cancel context.CancelFunc - storeID uint64 - splitClient restore.SplitClient - conn grpcresolver.ClientConn -} - -func (s *storeResolver) resolveNow() { - ctx, cancel := context.WithTimeout(s.ctx, defaultGetStoreTimeout) - defer cancel() - store, err := s.splitClient.GetStore(ctx, s.storeID) - if err == nil { - // We should use peer address for tiflash. For tikv, peer address is empty. - addr := store.GetPeerAddress() - if addr == "" { - addr = store.GetAddress() - } - state := grpcresolver.State{Addresses: []grpcresolver.Address{{Addr: addr}}} - err = s.conn.UpdateState(state) - } - if err != nil { - s.conn.ReportError(err) - } -} - -func (s *storeResolver) ResolveNow(grpcresolver.ResolveNowOptions) { - s.splitClient.InvalidateStoreCache(s.storeID) - s.resolveNow() -} - -func (s *storeResolver) Close() { - s.cancel() -} - -var _ grpcresolver.Resolver = &storeResolver{} diff --git a/br/pkg/resolver/resolver_test.go b/br/pkg/resolver/resolver_test.go deleted file mode 100644 index d1fcff4a385c1..0000000000000 --- a/br/pkg/resolver/resolver_test.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package resolver - -import ( - "context" - "net/url" - "testing" - - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/br/pkg/restore" - "github.com/stretchr/testify/require" - grpcresolver "google.golang.org/grpc/resolver" -) - -type mockClientConn struct { - grpcresolver.ClientConn - state grpcresolver.State -} - -func (c *mockClientConn) UpdateState(state grpcresolver.State) error { - c.state = state - return nil -} - -type mockSplitClient struct { - restore.SplitClient - addr string - newAddr string -} - -func (c *mockSplitClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { - store := &metapb.Store{ - Id: storeID, - Address: c.addr, - } - return store, nil -} - -func (c *mockSplitClient) InvalidateStoreCache(storeID uint64) { - c.addr = c.newAddr -} - -func TestResolver(t *testing.T) { - oldAddr := "192.168.1.1:20160" - newAddr := "192.168.1.2:20160" - sc := &mockSplitClient{ - addr: oldAddr, - newAddr: newAddr, - } - b := NewBuilder(sc) - - target := b.Target(1) - u, err := url.Parse(target) - require.NoError(t, err) - require.Equal(t, b.Scheme(), u.Scheme) - require.Equal(t, "1", u.Host) - - conn := &mockClientConn{} - rs, err := b.Build(grpcresolver.Target{URL: *u}, conn, grpcresolver.BuildOptions{}) - require.NoError(t, err) - require.NotEmpty(t, conn.state.Addresses) - require.Equal(t, oldAddr, conn.state.Addresses[0].Addr) - - conn.state.Addresses = nil - rs.ResolveNow(grpcresolver.ResolveNowOptions{}) - require.NotEmpty(t, conn.state.Addresses) - require.Equal(t, newAddr, conn.state.Addresses[0].Addr) -} diff --git a/br/pkg/restore/split_client.go b/br/pkg/restore/split_client.go index 1cc6bcfc8e99e..c18c3969a8cdb 100755 --- a/br/pkg/restore/split_client.go +++ b/br/pkg/restore/split_client.go @@ -39,7 +39,6 @@ import ( const ( splitRegionMaxRetryTime = 4 - defaultDialTimeout = time.Minute ) // SplitClient is an external client used by RegionSplitter. @@ -64,7 +63,7 @@ type SplitClient interface { ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error // GetOperator gets the status of operator of the specified region. GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) - // ScanRegions gets a list of regions, starts from the region that contains key. + // ScanRegion gets a list of regions, starts from the region that contains key. // Limit limits the maximum number of regions returned. ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) // GetPlacementRule loads a placement rule from PD. @@ -73,11 +72,9 @@ type SplitClient interface { SetPlacementRule(ctx context.Context, rule pdtypes.Rule) error // DeletePlacementRule removes a placement rule from PD. DeletePlacementRule(ctx context.Context, groupID, ruleID string) error - // SetStoresLabel add or update specified label of stores. If labelValue + // SetStoreLabel add or update specified label of stores. If labelValue // is empty, it clears the label. SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error - // InvalidateStoreCache invalidate store cache for the given store id. - InvalidateStoreCache(storeID uint64) } // pdClient is a wrapper of pd client, can be used by RegionSplitter. @@ -195,7 +192,11 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key peer = regionInfo.Region.Peers[0] } storeID := peer.GetStoreId() - conn, err := c.dialStore(ctx, storeID) + store, err := c.GetStore(ctx, storeID) + if err != nil { + return nil, errors.Trace(err) + } + conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure()) if err != nil { return nil, errors.Trace(err) } @@ -311,7 +312,15 @@ func (c *pdClient) sendSplitRegionRequest( peer = regionInfo.Region.Peers[0] } storeID := peer.GetStoreId() - conn, err := c.dialStore(ctx, storeID) + store, err := c.GetStore(ctx, storeID) + if err != nil { + return nil, multierr.Append(splitErrors, err) + } + opt := grpc.WithInsecure() + if c.tlsConf != nil { + opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf)) + } + conn, err := grpc.Dial(store.GetAddress(), opt) if err != nil { return nil, multierr.Append(splitErrors, err) } @@ -368,25 +377,6 @@ func (c *pdClient) sendSplitRegionRequest( return nil, errors.Trace(splitErrors) } -func (c *pdClient) dialStore(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) { - store, err := c.GetStore(ctx, storeID) - if err != nil { - return nil, errors.Trace(err) - } - credsOpt := grpc.WithInsecure() - if c.tlsConf != nil { - credsOpt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf)) - } - subCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout) - defer cancel() - conn, err := grpc.DialContext(subCtx, store.GetAddress(), credsOpt, grpc.WithReturnConnectionError()) - if err != nil { - c.InvalidateStoreCache(storeID) - return nil, errors.Trace(err) - } - return conn, nil -} - func (c *pdClient) BatchSplitRegionsWithOrigin( ctx context.Context, regionInfo *RegionInfo, keys [][]byte, ) (*RegionInfo, []*RegionInfo, error) { @@ -610,12 +600,6 @@ func (c *pdClient) getPDAPIAddr() string { return strings.TrimRight(addr, "/") } -func (c *pdClient) InvalidateStoreCache(storeID uint64) { - c.mu.Lock() - delete(c.storeCache, storeID) - c.mu.Unlock() -} - func checkRegionEpoch(_new, _old *RegionInfo) bool { return _new.Region.GetId() == _old.Region.GetId() && _new.Region.GetRegionEpoch().GetVersion() == _old.Region.GetRegionEpoch().GetVersion() && diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index a029d09d789c4..5a4a8bbdad97b 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -237,8 +237,6 @@ func (c *TestClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK return nil } -func (c *TestClient) InvalidateStoreCache(storeID uint64) {} - type assertRetryLessThanBackoffer struct { max int already int