From 700931be2efebed8b2b682876f589f8cfad4a173 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 16 Mar 2022 11:17:14 +0800 Subject: [PATCH 1/5] lightning: revert custom gRPC resolver --- br/pkg/lightning/backend/local/local.go | 16 ++- .../backend/local/localhelper_test.go | 2 - br/pkg/lightning/restore/restore.go | 2 +- br/pkg/resolver/resolver.go | 118 ------------------ br/pkg/resolver/resolver_test.go | 81 ------------ br/pkg/restore/split_client.go | 48 +++---- br/pkg/restore/split_test.go | 2 - 7 files changed, 27 insertions(+), 242 deletions(-) delete mode 100644 br/pkg/resolver/resolver.go delete mode 100644 br/pkg/resolver/resolver_test.go diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 480d7eaf6f93e..ae5d292d04dc7 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -49,7 +49,6 @@ import ( "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/pdutil" - "github.com/pingcap/tidb/br/pkg/resolver" split "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" @@ -128,22 +127,23 @@ type importClientFactoryImpl struct { conns *common.GRPCConns splitCli split.SplitClient tls *common.TLS - resolveBuilder *resolver.Builder tcpConcurrency int } func newImportClientFactoryImpl(splitCli split.SplitClient, tls *common.TLS, tcpConcurrency int) *importClientFactoryImpl { - resolveBuilder := resolver.NewBuilder(splitCli) return &importClientFactoryImpl{ conns: common.NewGRPCConns(), splitCli: splitCli, tls: tls, - resolveBuilder: resolveBuilder, tcpConcurrency: tcpConcurrency, } } func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) { + store, err := f.splitCli.GetStore(ctx, storeID) + if err != nil { + return nil, errors.Trace(err) + } opt := grpc.WithInsecure() if f.tls.TLSConfig() != nil { opt = grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig())) @@ -152,9 +152,14 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) bfConf := backoff.DefaultConfig bfConf.MaxDelay = gRPCBackOffMaxDelay + // we should use peer address for tiflash. for tikv, peer address is empty + addr := store.GetPeerAddress() + if addr == "" { + addr = store.GetAddress() + } conn, err := grpc.DialContext( ctx, - f.resolveBuilder.Target(storeID), + addr, opt, grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}), grpc.WithKeepaliveParams(keepalive.ClientParameters{ @@ -162,7 +167,6 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) Timeout: gRPCKeepAliveTimeout, PermitWithoutStream: true, }), - grpc.WithResolvers(f.resolveBuilder), ) cancel() if err != nil { diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index cdaf939b81fbf..dd52b71493507 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -287,8 +287,6 @@ func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK return nil } -func (c *testClient) InvalidateStoreCache(storeID uint64) {} - func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo { r := &metapb.Region{} if region.Region != nil { diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 4804b90e09cf4..2ac8e0581fd4f 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -367,7 +367,7 @@ func NewRestoreControllerWithPauser( backend, err = local.NewLocalBackend(ctx, tls, cfg, g, maxOpenFiles, errorMgr) if err != nil { - return nil, errors.Annotate(err, "build local backend failed") + return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err) } err = verifyLocalFile(ctx, cpdb, cfg.TikvImporter.SortedKVDir) if err != nil { diff --git a/br/pkg/resolver/resolver.go b/br/pkg/resolver/resolver.go deleted file mode 100644 index 1656c1a9ae290..0000000000000 --- a/br/pkg/resolver/resolver.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package resolver - -import ( - "context" - "crypto/rand" - "encoding/hex" - "fmt" - "io" - "strconv" - "time" - - "github.com/pingcap/tidb/br/pkg/restore" - grpcresolver "google.golang.org/grpc/resolver" -) - -const ( - schemePrefix = "tikvstore-" - defaultGetStoreTimeout = time.Second * 30 -) - -type Builder struct { - scheme string - splitClient restore.SplitClient -} - -func NewBuilder(splitClient restore.SplitClient) *Builder { - return &Builder{ - scheme: generateScheme(), - splitClient: splitClient, - } -} - -func generateScheme() string { - var buf [8]byte - _, err := io.ReadFull(rand.Reader, buf[:]) - if err != nil { - panic(err) - } - return schemePrefix + hex.EncodeToString(buf[:]) -} - -func (b *Builder) Target(storeID uint64) string { - return fmt.Sprintf("%s://%d", b.scheme, storeID) -} - -func (b *Builder) Build(target grpcresolver.Target, conn grpcresolver.ClientConn, opts grpcresolver.BuildOptions) (grpcresolver.Resolver, error) { - storeID, err := strconv.ParseUint(target.URL.Host, 10, 64) - if err != nil { - return nil, fmt.Errorf("invalid tikv store target %s: %w", target.URL.String(), err) - } - ctx, cancel := context.WithCancel(context.Background()) - r := &storeResolver{ - ctx: ctx, - cancel: cancel, - storeID: storeID, - splitClient: b.splitClient, - conn: conn, - } - r.resolveNow() - return r, nil -} - -func (b *Builder) Scheme() string { - return b.scheme -} - -var _ grpcresolver.Builder = &Builder{} - -type storeResolver struct { - ctx context.Context - cancel context.CancelFunc - storeID uint64 - splitClient restore.SplitClient - conn grpcresolver.ClientConn -} - -func (s *storeResolver) resolveNow() { - ctx, cancel := context.WithTimeout(s.ctx, defaultGetStoreTimeout) - defer cancel() - store, err := s.splitClient.GetStore(ctx, s.storeID) - if err == nil { - // We should use peer address for tiflash. For tikv, peer address is empty. - addr := store.GetPeerAddress() - if addr == "" { - addr = store.GetAddress() - } - state := grpcresolver.State{Addresses: []grpcresolver.Address{{Addr: addr}}} - err = s.conn.UpdateState(state) - } - if err != nil { - s.conn.ReportError(err) - } -} - -func (s *storeResolver) ResolveNow(grpcresolver.ResolveNowOptions) { - s.splitClient.InvalidateStoreCache(s.storeID) - s.resolveNow() -} - -func (s *storeResolver) Close() { - s.cancel() -} - -var _ grpcresolver.Resolver = &storeResolver{} diff --git a/br/pkg/resolver/resolver_test.go b/br/pkg/resolver/resolver_test.go deleted file mode 100644 index d1fcff4a385c1..0000000000000 --- a/br/pkg/resolver/resolver_test.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package resolver - -import ( - "context" - "net/url" - "testing" - - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/br/pkg/restore" - "github.com/stretchr/testify/require" - grpcresolver "google.golang.org/grpc/resolver" -) - -type mockClientConn struct { - grpcresolver.ClientConn - state grpcresolver.State -} - -func (c *mockClientConn) UpdateState(state grpcresolver.State) error { - c.state = state - return nil -} - -type mockSplitClient struct { - restore.SplitClient - addr string - newAddr string -} - -func (c *mockSplitClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { - store := &metapb.Store{ - Id: storeID, - Address: c.addr, - } - return store, nil -} - -func (c *mockSplitClient) InvalidateStoreCache(storeID uint64) { - c.addr = c.newAddr -} - -func TestResolver(t *testing.T) { - oldAddr := "192.168.1.1:20160" - newAddr := "192.168.1.2:20160" - sc := &mockSplitClient{ - addr: oldAddr, - newAddr: newAddr, - } - b := NewBuilder(sc) - - target := b.Target(1) - u, err := url.Parse(target) - require.NoError(t, err) - require.Equal(t, b.Scheme(), u.Scheme) - require.Equal(t, "1", u.Host) - - conn := &mockClientConn{} - rs, err := b.Build(grpcresolver.Target{URL: *u}, conn, grpcresolver.BuildOptions{}) - require.NoError(t, err) - require.NotEmpty(t, conn.state.Addresses) - require.Equal(t, oldAddr, conn.state.Addresses[0].Addr) - - conn.state.Addresses = nil - rs.ResolveNow(grpcresolver.ResolveNowOptions{}) - require.NotEmpty(t, conn.state.Addresses) - require.Equal(t, newAddr, conn.state.Addresses[0].Addr) -} diff --git a/br/pkg/restore/split_client.go b/br/pkg/restore/split_client.go index 1cc6bcfc8e99e..c18c3969a8cdb 100755 --- a/br/pkg/restore/split_client.go +++ b/br/pkg/restore/split_client.go @@ -39,7 +39,6 @@ import ( const ( splitRegionMaxRetryTime = 4 - defaultDialTimeout = time.Minute ) // SplitClient is an external client used by RegionSplitter. @@ -64,7 +63,7 @@ type SplitClient interface { ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error // GetOperator gets the status of operator of the specified region. GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) - // ScanRegions gets a list of regions, starts from the region that contains key. + // ScanRegion gets a list of regions, starts from the region that contains key. // Limit limits the maximum number of regions returned. ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) // GetPlacementRule loads a placement rule from PD. @@ -73,11 +72,9 @@ type SplitClient interface { SetPlacementRule(ctx context.Context, rule pdtypes.Rule) error // DeletePlacementRule removes a placement rule from PD. DeletePlacementRule(ctx context.Context, groupID, ruleID string) error - // SetStoresLabel add or update specified label of stores. If labelValue + // SetStoreLabel add or update specified label of stores. If labelValue // is empty, it clears the label. SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error - // InvalidateStoreCache invalidate store cache for the given store id. - InvalidateStoreCache(storeID uint64) } // pdClient is a wrapper of pd client, can be used by RegionSplitter. @@ -195,7 +192,11 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key peer = regionInfo.Region.Peers[0] } storeID := peer.GetStoreId() - conn, err := c.dialStore(ctx, storeID) + store, err := c.GetStore(ctx, storeID) + if err != nil { + return nil, errors.Trace(err) + } + conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure()) if err != nil { return nil, errors.Trace(err) } @@ -311,7 +312,15 @@ func (c *pdClient) sendSplitRegionRequest( peer = regionInfo.Region.Peers[0] } storeID := peer.GetStoreId() - conn, err := c.dialStore(ctx, storeID) + store, err := c.GetStore(ctx, storeID) + if err != nil { + return nil, multierr.Append(splitErrors, err) + } + opt := grpc.WithInsecure() + if c.tlsConf != nil { + opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf)) + } + conn, err := grpc.Dial(store.GetAddress(), opt) if err != nil { return nil, multierr.Append(splitErrors, err) } @@ -368,25 +377,6 @@ func (c *pdClient) sendSplitRegionRequest( return nil, errors.Trace(splitErrors) } -func (c *pdClient) dialStore(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) { - store, err := c.GetStore(ctx, storeID) - if err != nil { - return nil, errors.Trace(err) - } - credsOpt := grpc.WithInsecure() - if c.tlsConf != nil { - credsOpt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf)) - } - subCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout) - defer cancel() - conn, err := grpc.DialContext(subCtx, store.GetAddress(), credsOpt, grpc.WithReturnConnectionError()) - if err != nil { - c.InvalidateStoreCache(storeID) - return nil, errors.Trace(err) - } - return conn, nil -} - func (c *pdClient) BatchSplitRegionsWithOrigin( ctx context.Context, regionInfo *RegionInfo, keys [][]byte, ) (*RegionInfo, []*RegionInfo, error) { @@ -610,12 +600,6 @@ func (c *pdClient) getPDAPIAddr() string { return strings.TrimRight(addr, "/") } -func (c *pdClient) InvalidateStoreCache(storeID uint64) { - c.mu.Lock() - delete(c.storeCache, storeID) - c.mu.Unlock() -} - func checkRegionEpoch(_new, _old *RegionInfo) bool { return _new.Region.GetId() == _old.Region.GetId() && _new.Region.GetRegionEpoch().GetVersion() == _old.Region.GetRegionEpoch().GetVersion() && diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index a029d09d789c4..5a4a8bbdad97b 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -237,8 +237,6 @@ func (c *TestClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK return nil } -func (c *TestClient) InvalidateStoreCache(storeID uint64) {} - type assertRetryLessThanBackoffer struct { max int already int From 672ecb108be1738822e2e81b960c403675b1e1df Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 16 Mar 2022 12:58:30 +0800 Subject: [PATCH 2/5] fix unstable test --- br/tests/br_split_region_fail/run.sh | 2 +- br/tests/lightning_fail_fast/run.sh | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/br/tests/br_split_region_fail/run.sh b/br/tests/br_split_region_fail/run.sh index be26823cd6810..13b511313ebfa 100644 --- a/br/tests/br_split_region_fail/run.sh +++ b/br/tests/br_split_region_fail/run.sh @@ -52,7 +52,7 @@ BR_LOG_TO_TERM=1 grep "a error occurs on split region" $LOG && \ grep "split region meet not leader error" $LOG && \ -grep "Full restore success" $LOG && \ +grep "Full Restore success" $LOG && \ grep "find new leader" $LOG if [ $? -ne 0 ]; then diff --git a/br/tests/lightning_fail_fast/run.sh b/br/tests/lightning_fail_fast/run.sh index 8894bd0c5c8f4..caba9f39b48e5 100755 --- a/br/tests/lightning_fail_fast/run.sh +++ b/br/tests/lightning_fail_fast/run.sh @@ -27,8 +27,7 @@ for CFG in chunk engine; do tail -n 10 $TEST_DIR/lightning-tidb.log | grep "ERROR" | tail -n 1 | grep -Fq "Error 1062: Duplicate entry '1-1' for key 'uq'" - ! grep -Fq "restore file completed" $TEST_DIR/lightning-tidb.log - [ $? -eq 0 ] + grep -Fq "restore file failed" $TEST_DIR/lightning-tidb.log ! grep -Fq "restore engine completed" $TEST_DIR/lightning-tidb.log [ $? -eq 0 ] From 81a44a9f6766e6ae34556d7106a051b9d0cb5f27 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 16 Mar 2022 13:59:40 +0800 Subject: [PATCH 3/5] fix typo --- br/pkg/restore/split_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/restore/split_client.go b/br/pkg/restore/split_client.go index c18c3969a8cdb..36f5d0164e35d 100755 --- a/br/pkg/restore/split_client.go +++ b/br/pkg/restore/split_client.go @@ -63,7 +63,7 @@ type SplitClient interface { ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error // GetOperator gets the status of operator of the specified region. GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) - // ScanRegion gets a list of regions, starts from the region that contains key. + // ScanRegions gets a list of regions, starts from the region that contains key. // Limit limits the maximum number of regions returned. ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) // GetPlacementRule loads a placement rule from PD. @@ -72,7 +72,7 @@ type SplitClient interface { SetPlacementRule(ctx context.Context, rule pdtypes.Rule) error // DeletePlacementRule removes a placement rule from PD. DeletePlacementRule(ctx context.Context, groupID, ruleID string) error - // SetStoreLabel add or update specified label of stores. If labelValue + // SetStoresLabel add or update specified label of stores. If labelValue // is empty, it clears the label. SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error } From 5e6a45507cd711a6bb7499e05a2f8e99b152f15a Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 16 Mar 2022 14:05:44 +0800 Subject: [PATCH 4/5] fix lightning_fail_fast --- br/tests/lightning_fail_fast/run.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/br/tests/lightning_fail_fast/run.sh b/br/tests/lightning_fail_fast/run.sh index caba9f39b48e5..a1723b3e2cffd 100755 --- a/br/tests/lightning_fail_fast/run.sh +++ b/br/tests/lightning_fail_fast/run.sh @@ -16,7 +16,7 @@ set -eux -export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownWriteRows=sleep(50);github.com/pingcap/tidb/br/pkg/lightning/restore/SetMinDeliverBytes=return(1)' +export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownWriteRows=sleep(100);github.com/pingcap/tidb/br/pkg/lightning/restore/SetMinDeliverBytes=return(1)' for CFG in chunk engine; do rm -f "$TEST_DIR/lightning-tidb.log" @@ -27,7 +27,8 @@ for CFG in chunk engine; do tail -n 10 $TEST_DIR/lightning-tidb.log | grep "ERROR" | tail -n 1 | grep -Fq "Error 1062: Duplicate entry '1-1' for key 'uq'" - grep -Fq "restore file failed" $TEST_DIR/lightning-tidb.log + ! grep -Fq "restore file completed" $TEST_DIR/lightning-tidb.log + [ $? -eq 0 ] ! grep -Fq "restore engine completed" $TEST_DIR/lightning-tidb.log [ $? -eq 0 ] From 37b47d79b638db2fed90c61498e491d2ad2776e2 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 16 Mar 2022 15:44:05 +0800 Subject: [PATCH 5/5] fix error --- br/pkg/lightning/lightning.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 62b41bafa6c21..907b316ab8399 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -306,7 +306,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. if walkErr == nil { return common.ErrEmptySourceDir.GenWithStackByArgs(taskCfg.Mydumper.SourceDir) } - return common.NormalizeError(err) + return common.NormalizeOrWrapErr(common.ErrStorageUnknown, walkErr) } loadTask := log.L().Begin(zap.InfoLevel, "load data source")