Skip to content

Commit

Permalink
resolve conflict
Browse files Browse the repository at this point in the history
Signed-off-by: SpadeA-Tang <[email protected]>
  • Loading branch information
SpadeA-Tang committed Jun 29, 2022
2 parents 8e9d80c + f0d5f6e commit 770ae35
Show file tree
Hide file tree
Showing 130 changed files with 4,552 additions and 1,975 deletions.
44 changes: 0 additions & 44 deletions .github/workflows/integration-test-with-real-tikv.yml

This file was deleted.

8 changes: 8 additions & 0 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,14 @@ def go_deps():
sum = "h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=",
version = "v0.0.4",
)
go_repository(
name = "com_github_golangci_gofmt",
build_file_proto_mode = "disable",
importpath = "github.com/golangci/gofmt",
sum = "h1:iR3fYXUjHCR97qWS8ch1y9zPNsgXThGwjKPrYfqMPks=",
version = "v0.0.0-20190930125516-244bba706f1a",
)

go_repository(
name = "com_github_golangci_prealloc",
build_file_proto_mode = "disable",
Expand Down
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ The [community repository](https://github.com/pingcap/community) hosts all infor

Contributions are welcomed and greatly appreciated. All the contributors are welcomed to claim your reward by filing this [form](https://forms.pingcap.com/f/tidb-contribution-swag). See [Contribution to TiDB](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/introduction.html) for details on typical contribution workflows. For more contributing information, click on the contributor icon above.

## Adopters

View the current list of in-production TiDB adopters [here](https://docs.pingcap.com/tidb/stable/adopters).

## Case studies

- [English](https://pingcap.com/case-studies)
Expand Down
12 changes: 11 additions & 1 deletion br/pkg/errors/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "errors",
Expand All @@ -7,3 +7,13 @@ go_library(
visibility = ["//visibility:public"],
deps = ["@com_github_pingcap_errors//:errors"],
)

go_test(
name = "errors_test",
srcs = ["errors_test.go"],
deps = [
":errors",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
14 changes: 14 additions & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
package errors

import (
"context"
stderrors "errors"

"github.com/pingcap/errors"
)

Expand All @@ -15,6 +18,17 @@ func Is(err error, is *errors.Error) bool {
return errorFound != nil
}

// IsContextCanceled checks whether the is caused by context.Canceled.
// errors.Cause does not work for the error wrapped by %w in fmt.Errorf.
// So we need to call stderrors.Is to unwrap the error.
func IsContextCanceled(err error) bool {
err = errors.Cause(err)
if err == context.Canceled || err == context.DeadlineExceeded {
return true
}
return stderrors.Is(err, context.Canceled) || stderrors.Is(err, context.DeadlineExceeded)
}

// BR errors.
var (
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
Expand Down
24 changes: 24 additions & 0 deletions br/pkg/errors/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package errors_test

import (
"context"
"net/url"
"testing"

"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/stretchr/testify/require"
)

func TestIsContextCanceled(t *testing.T) {
require.False(t, berrors.IsContextCanceled(nil))
require.False(t, berrors.IsContextCanceled(errors.New("connection closed")))
require.True(t, berrors.IsContextCanceled(context.Canceled))
require.True(t, berrors.IsContextCanceled(context.DeadlineExceeded))
require.True(t, berrors.IsContextCanceled(errors.Trace(context.Canceled)))
require.True(t, berrors.IsContextCanceled(errors.Trace(context.DeadlineExceeded)))
require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.Canceled}))
require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.DeadlineExceeded}))
}
25 changes: 24 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,8 +1392,32 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
return err
}

if len(ranges) > 0 && local.pdCtl.CanPauseSchedulerByKeyRange() {
subCtx, cancel := context.WithCancel(ctx)
defer cancel()

var startKey, endKey []byte
if len(ranges[0].start) > 0 {
startKey = codec.EncodeBytes(nil, ranges[0].start)
}
if len(ranges[len(ranges)-1].end) > 0 {
endKey = codec.EncodeBytes(nil, ranges[len(ranges)-1].end)
}
done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
if err != nil {
return errors.Trace(err)
}
defer func() {
cancel()
<-done
}()
}

log.FromContext(ctx).Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))

failpoint.Inject("ReadyForImportEngine", func() {})

for {
unfinishedRanges := lf.unfinishedRanges(ranges)
if len(unfinishedRanges) == 0 {
Expand Down Expand Up @@ -1480,7 +1504,6 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t
logger.Warn("[resolve-dupe] skipping resolution due to selected algorithm. this table will become inconsistent!", zap.Stringer("algorithm", algorithm))
return nil
case config.DupeResAlgRemove:
break
default:
panic(fmt.Sprintf("[resolve-dupe] unknown resolution algorithm %v", algorithm))
}
Expand Down
59 changes: 29 additions & 30 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"database/sql"
"math"
"regexp"
"runtime"
"sort"
"strings"
Expand Down Expand Up @@ -332,14 +331,7 @@ func (local *local) SplitAndScatterRegionByRanges(
}

startTime := time.Now()
scatterCount := 0
for _, region := range scatterRegions {
local.waitForScatterRegion(ctx, region)
if time.Since(startTime) > split.ScatterWaitUpperInterval {
break
}
scatterCount++
}
scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions)
if scatterCount == len(scatterRegions) {
log.FromContext(ctx).Info("waiting for scattering regions done",
zap.Int("skipped_keys", skippedKeys),
Expand All @@ -349,7 +341,8 @@ func (local *local) SplitAndScatterRegionByRanges(
zap.Int("skipped_keys", skippedKeys),
zap.Int("scatterCount", scatterCount),
zap.Int("regions", len(scatterRegions)),
zap.Duration("take", time.Since(startTime)))
zap.Duration("take", time.Since(startTime)),
zap.Error(err))
}
return nil
}
Expand Down Expand Up @@ -447,28 +440,38 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
}
}

func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
if ok {
return
}
if err != nil {
if !common.IsRetryableError(err) {
log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
return
func (local *local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) {
subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval)
defer cancel()

for len(regions) > 0 {
var retryRegions []*split.RegionInfo
for _, region := range regions {
scattered, err := local.checkRegionScatteredOrReScatter(subCtx, region)
if scattered {
scatterCount++
continue
}
log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
if err != nil {
if !common.IsRetryableError(err) {
log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(region.Region), zap.Error(err))
return scatterCount, err
}
log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(region.Region), zap.Error(err))
}
retryRegions = append(retryRegions, region)
}
regions = retryRegions
select {
case <-time.After(time.Second):
case <-ctx.Done():
case <-subCtx.Done():
return
}
}
return scatterCount, nil
}

func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
if err != nil {
return false, err
Expand All @@ -478,13 +481,9 @@ func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, r
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
return true, nil
}
// don't return error if region replicate not complete
// TODO: should add a new error type to avoid this check by string matching
matches, _ := regexp.MatchString("region \\d+ is not fully replicated", respErr.Message)
if matches {
return false, nil
}
return false, errors.Errorf("get operator error: %s", respErr.GetType())
return false, errors.Errorf(
"failed to get region operator, error type: %s, error message: %s",
respErr.GetType().String(), respErr.GetMessage())
}
// If the current operator of the region is not 'scatter-region', we could assume
// that 'scatter-operator' has finished.
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"net"
"os"
"regexp"
"syscall"

"github.com/go-sql-driver/mysql"
Expand All @@ -30,6 +31,8 @@ import (
"google.golang.org/grpc/status"
)

var regionNotFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`)

// IsRetryableError returns whether the error is transient (e.g. network
// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This
// function returns `false` (irrecoverable) if `err == nil`.
Expand Down Expand Up @@ -88,6 +91,9 @@ func isSingleRetryableError(err error) bool {
}
return false
default:
if regionNotFullyReplicatedRe.MatchString(err.Error()) {
return true
}
switch status.Code(err) {
case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
return true
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,6 @@ func TestIsRetryableError(t *testing.T) {
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)))
require.True(t, IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})))
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})))

require.True(t, IsRetryableError(errors.Errorf("region %d is not fully replicated", 1234)))
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ func (cfg *Config) CheckAndAdjustSecurity() error {
return common.ErrInvalidConfig.GenWithStack("cannot set `tidb.tls` to 'cluster' without a [security] section")
}
case "false", "skip-verify", "preferred":
break
return nil
default:
return common.ErrInvalidConfig.GenWithStack("unsupported `tidb.tls` config %s", cfg.TiDB.TLS)
}
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

// Enable failpoint http API for testing.
failpoint.Inject("EnableTestAPI", func() {
mux.HandleFunc("/fail/", func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/fail")
new(failpoint.HttpHandler).ServeHTTP(w, r)
})
})

handleTasks := http.StripPrefix("/tasks", http.HandlerFunc(l.handleTask))
mux.Handle("/tasks", httpHandleWrapper(handleTasks.ServeHTTP))
mux.Handle("/tasks/", httpHandleWrapper(handleTasks.ServeHTTP))
Expand Down
26 changes: 12 additions & 14 deletions br/pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,19 @@ type MDDatabaseMeta struct {
}

func (m *MDDatabaseMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) string {
schema, err := ExportStatement(ctx, store, m.SchemaFile, m.charSet)
if err != nil {
log.FromContext(ctx).Warn("failed to extract table schema",
zap.String("Path", m.SchemaFile.FileMeta.Path),
log.ShortError(err),
)
schema = nil
}
schemaStr := strings.TrimSpace(string(schema))
// set default if schema sql is empty
if len(schemaStr) == 0 {
schemaStr = "CREATE DATABASE IF NOT EXISTS " + common.EscapeIdentifier(m.Name)
if m.SchemaFile.FileMeta.Path != "" {
schema, err := ExportStatement(ctx, store, m.SchemaFile, m.charSet)
if err != nil {
log.FromContext(ctx).Warn("failed to extract table schema",
zap.String("Path", m.SchemaFile.FileMeta.Path),
log.ShortError(err),
)
} else if schemaStr := strings.TrimSpace(string(schema)); schemaStr != "" {
return schemaStr
}
}

return schemaStr
// set default if schema sql is empty or failed to extract.
return "CREATE DATABASE IF NOT EXISTS " + common.EscapeIdentifier(m.Name)
}

type MDTableMeta struct {
Expand Down
Loading

0 comments on commit 770ae35

Please sign in to comment.