diff --git a/br/pkg/restore/internal/log_split/BUILD.bazel b/br/pkg/restore/internal/log_split/BUILD.bazel index ffe8b4f5f1dea..d929b04c003ad 100644 --- a/br/pkg/restore/internal/log_split/BUILD.bazel +++ b/br/pkg/restore/internal/log_split/BUILD.bazel @@ -10,10 +10,9 @@ go_library( visibility = ["//br/pkg/restore:__subpackages__"], deps = [ "//br/pkg/logutil", - "//br/pkg/restore/internal/utils", + "//br/pkg/restore/internal/snap_split", "//br/pkg/restore/split", "//br/pkg/restore/utils", - "//br/pkg/rtree", "//br/pkg/utils", "//pkg/kv", "//pkg/tablecodec", @@ -40,7 +39,7 @@ go_test( flaky = True, shard_count = 4, deps = [ - "//br/pkg/restore/internal/utils", + "//br/pkg/restore/internal/snap_split", "//br/pkg/restore/split", "//br/pkg/restore/utils", "//br/pkg/utiltest", diff --git a/br/pkg/restore/internal/log_split/split.go b/br/pkg/restore/internal/log_split/split.go index 1c2319d513425..eb9b3165ce761 100644 --- a/br/pkg/restore/internal/log_split/split.go +++ b/br/pkg/restore/internal/log_split/split.go @@ -24,10 +24,9 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/restore/internal/utils" + snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split" "github.com/pingcap/tidb/br/pkg/restore/split" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/codec" @@ -139,11 +138,11 @@ func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) { }) } -type splitFunc = func(context.Context, *utils.RegionSplitter, uint64, int64, *split.RegionInfo, []Valued) error +type splitFunc = func(context.Context, *snapsplit.RegionSplitter, uint64, int64, *split.RegionInfo, []Valued) error func (helper *LogSplitHelper) splitRegionByPoints( ctx context.Context, - regionSplitter *utils.RegionSplitter, + regionSplitter *snapsplit.RegionSplitter, initialLength uint64, initialNumber int64, region *split.RegionInfo, @@ -176,14 +175,10 @@ func (helper *LogSplitHelper) splitRegionByPoints( newRegions, errSplit := regionSplitter.SplitWaitAndScatter(ctx, region, splitPoints) if errSplit != nil { log.Warn("failed to split the scaned region", zap.Error(errSplit)) - _, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil) - ranges := make([]rtree.Range, 0, len(splitPoints)) - for _, point := range splitPoints { - ranges = append(ranges, rtree.Range{StartKey: startKey, EndKey: point}) - startKey = point - } - - return regionSplitter.ExecuteSplit(ctx, ranges) + sort.Slice(splitPoints, func(i, j int) bool { + return bytes.Compare(splitPoints[i], splitPoints[j]) < 0 + }) + return regionSplitter.ExecuteSplit(ctx, splitPoints) } select { case <-ctx.Done(): @@ -205,7 +200,7 @@ func SplitPoint( ) (err error) { // common status var ( - regionSplitter *utils.RegionSplitter = utils.NewRegionSplitter(client) + regionSplitter *snapsplit.RegionSplitter = snapsplit.NewRegionSplitter(client) ) // region traverse status var ( @@ -357,7 +352,7 @@ func (helper *LogSplitHelper) Split(ctx context.Context) error { } } - regionSplitter := utils.NewRegionSplitter(helper.client) + regionSplitter := snapsplit.NewRegionSplitter(helper.client) // It is too expensive to stop recovery and wait for a small number of regions // to complete scatter, so the maximum waiting time is reduced to 1 minute. 
_ = regionSplitter.WaitForScatterRegionsTimeout(ctx, scatterRegions, time.Minute) diff --git a/br/pkg/restore/internal/log_split/split_test.go b/br/pkg/restore/internal/log_split/split_test.go index 578d9eefb447d..acbf61ae12f29 100644 --- a/br/pkg/restore/internal/log_split/split_test.go +++ b/br/pkg/restore/internal/log_split/split_test.go @@ -23,7 +23,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/import_sstpb" logsplit "github.com/pingcap/tidb/br/pkg/restore/internal/log_split" - "github.com/pingcap/tidb/br/pkg/restore/internal/utils" + snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split" "github.com/pingcap/tidb/br/pkg/restore/split" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" "github.com/pingcap/tidb/br/pkg/utiltest" @@ -66,7 +66,7 @@ func TestSplitPoint(t *testing.T) { client.AppendRegion(keyWithTablePrefix(tableID, "j"), keyWithTablePrefix(tableID+1, "a")) iter := logsplit.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) - err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *utils.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error { + err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *snapsplit.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error { require.Equal(t, u, uint64(0)) require.Equal(t, o, int64(0)) require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) @@ -124,7 +124,7 @@ func TestSplitPoint2(t *testing.T) { firstSplit := true iter := logsplit.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) - err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *utils.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error { + err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *snapsplit.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error { if firstSplit { require.Equal(t, u, uint64(0)) require.Equal(t, o, int64(0)) diff --git a/br/pkg/restore/internal/utils/BUILD.bazel b/br/pkg/restore/internal/snap_split/BUILD.bazel similarity index 73% rename from br/pkg/restore/internal/utils/BUILD.bazel rename to br/pkg/restore/internal/snap_split/BUILD.bazel index 9d791b3d5fefd..ab6df360220d6 100644 --- a/br/pkg/restore/internal/utils/BUILD.bazel +++ b/br/pkg/restore/internal/snap_split/BUILD.bazel @@ -1,16 +1,12 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "utils", + name = "snap_split", srcs = ["split.go"], - importpath = "github.com/pingcap/tidb/br/pkg/restore/internal/utils", + importpath = "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split", visibility = ["//br/pkg/restore:__subpackages__"], deps = [ - "//br/pkg/errors", - "//br/pkg/logutil", "//br/pkg/restore/split", - "//br/pkg/rtree", - "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_log//:log", "@org_uber_go_zap//:zap", @@ -18,17 +14,16 @@ go_library( ) go_test( - name = "utils_test", + name = "snap_split_test", timeout = "short", srcs = ["split_test.go"], flaky = True, - shard_count = 5, + shard_count = 4, deps = [ - ":utils", + ":snap_split", "//br/pkg/restore/split", "//br/pkg/restore/utils", "//br/pkg/rtree", - "//pkg/tablecodec", "//pkg/util/codec", "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_stretchr_testify//require", diff --git 
a/br/pkg/restore/internal/utils/split.go b/br/pkg/restore/internal/snap_split/split.go similarity index 70% rename from br/pkg/restore/internal/utils/split.go rename to br/pkg/restore/internal/snap_split/split.go index 82ff17acc2817..fca4a69cb5e6b 100644 --- a/br/pkg/restore/internal/utils/split.go +++ b/br/pkg/restore/internal/snap_split/split.go @@ -1,18 +1,14 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. -package utils +package snapsplit import ( "context" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/log" - berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/restore/split" - "github.com/pingcap/tidb/br/pkg/rtree" "go.uber.org/zap" ) @@ -41,37 +37,15 @@ func (rs *RegionSplitter) SplitWaitAndScatter(ctx context.Context, region *split // note: all ranges and rewrite rules must have raw key. func (rs *RegionSplitter) ExecuteSplit( ctx context.Context, - ranges []rtree.Range, + sortedSplitKeys [][]byte, ) error { - if len(ranges) == 0 { - log.Info("skip split regions, no range") + if len(sortedSplitKeys) == 0 { + log.Info("skip split regions, no split keys") return nil } - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("RegionSplitter.Split", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - - // Sort the range for getting the min and max key of the ranges - // TODO: this sort may not needed if we sort tables after creatation outside. - sortedRanges, errSplit := SortRanges(ranges) - if errSplit != nil { - return errors.Trace(errSplit) - } - if len(sortedRanges) == 0 { - log.Info("skip split regions after sorted, no range") - return nil - } - sortedKeys := make([][]byte, 0, len(sortedRanges)) - totalRangeSize := uint64(0) - for _, r := range sortedRanges { - sortedKeys = append(sortedKeys, r.EndKey) - totalRangeSize += r.Size - } - // the range size must be greater than 0 here - return rs.executeSplitByRanges(ctx, sortedKeys) + log.Info("execute split sorted keys", zap.Int("keys count", len(sortedSplitKeys))) + return rs.executeSplitByRanges(ctx, sortedSplitKeys) } func (rs *RegionSplitter) executeSplitByRanges( @@ -151,20 +125,3 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi leftRegions, _ := rs.client.WaitRegionsScattered(ctx2, regionInfos) return leftRegions } - -// SortRanges checks if the range overlapped and sort them. 
-func SortRanges(ranges []rtree.Range) ([]rtree.Range, error) { - rangeTree := rtree.NewRangeTree() - for _, rg := range ranges { - if out := rangeTree.InsertRange(rg); out != nil { - log.Error("insert ranges overlapped", - logutil.Key("startKeyOut", out.StartKey), - logutil.Key("endKeyOut", out.EndKey), - logutil.Key("startKeyIn", rg.StartKey), - logutil.Key("endKeyIn", rg.EndKey)) - return nil, errors.Annotatef(berrors.ErrInvalidRange, "ranges overlapped") - } - } - sortedRanges := rangeTree.GetSortedRanges() - return sortedRanges, nil -} diff --git a/br/pkg/restore/internal/utils/split_test.go b/br/pkg/restore/internal/snap_split/split_test.go similarity index 52% rename from br/pkg/restore/internal/utils/split_test.go rename to br/pkg/restore/internal/snap_split/split_test.go index 4eeacb69f61c9..0507950d589c5 100644 --- a/br/pkg/restore/internal/utils/split_test.go +++ b/br/pkg/restore/internal/snap_split/split_test.go @@ -1,17 +1,18 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. -package utils_test +package snapsplit_test import ( + "bytes" "context" + "sort" "testing" "github.com/pingcap/kvproto/pkg/import_sstpb" - "github.com/pingcap/tidb/br/pkg/restore/internal/utils" + snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split" "github.com/pingcap/tidb/br/pkg/restore/split" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" "github.com/pingcap/tidb/br/pkg/rtree" - "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" "github.com/stretchr/testify/require" ) @@ -20,13 +21,13 @@ func TestScanEmptyRegion(t *testing.T) { mockPDCli := split.NewMockPDClientForSplit() mockPDCli.SetRegions([][]byte{{}, {12}, {34}, {}}) client := split.NewClient(mockPDCli, nil, nil, 100, 4) - ranges := initRanges() - // make ranges has only one - ranges = ranges[0:1] - regionSplitter := utils.NewRegionSplitter(client) + keys := initKeys() + // make keys has only one + keys = keys[0:1] + regionSplitter := snapsplit.NewRegionSplitter(client) ctx := context.Background() - err := regionSplitter.ExecuteSplit(ctx, ranges) + err := regionSplitter.ExecuteSplit(ctx, keys) // should not return error with only one range entry require.NoError(t, err) } @@ -35,7 +36,7 @@ func TestSplitEmptyRegion(t *testing.T) { mockPDCli := split.NewMockPDClientForSplit() mockPDCli.SetRegions([][]byte{{}, {12}, {34}, {}}) client := split.NewClient(mockPDCli, nil, nil, 100, 4) - regionSplitter := utils.NewRegionSplitter(client) + regionSplitter := snapsplit.NewRegionSplitter(client) err := regionSplitter.ExecuteSplit(context.Background(), nil) require.NoError(t, err) } @@ -53,17 +54,21 @@ func TestSplitAndScatter(t *testing.T) { mockPDCli := split.NewMockPDClientForSplit() mockPDCli.SetRegions(rangeBoundaries) client := split.NewClient(mockPDCli, nil, nil, 100, 4) - regionSplitter := utils.NewRegionSplitter(client) + regionSplitter := snapsplit.NewRegionSplitter(client) ctx := context.Background() ranges := initRanges() rules := initRewriteRules() - for i, rg := range ranges { + splitKeys := make([][]byte, 0, len(ranges)) + for _, rg := range ranges { tmp, err := restoreutils.RewriteRange(&rg, rules) require.NoError(t, err) - ranges[i] = *tmp + splitKeys = append(splitKeys, tmp.EndKey) } - err := regionSplitter.ExecuteSplit(ctx, ranges) + sort.Slice(splitKeys, func(i, j int) bool { + return bytes.Compare(splitKeys[i], splitKeys[j]) < 0 + }) + err := regionSplitter.ExecuteSplit(ctx, splitKeys) require.NoError(t, err) regions := mockPDCli.Regions.ScanRange(nil, nil, 100) 
expected := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbf"), []byte("bbh"), []byte("bbj"), []byte("cca"), []byte("xxe"), []byte("xxz"), []byte("")} @@ -86,20 +91,15 @@ func encodeBytes(keys [][]byte) { func TestRawSplit(t *testing.T) { // Fix issue #36490. - ranges := []rtree.Range{ - { - StartKey: []byte{0}, - EndKey: []byte{}, - }, - } + splitKeys := [][]byte{{}} ctx := context.Background() rangeBoundaries := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} mockPDCli := split.NewMockPDClientForSplit() mockPDCli.SetRegions(rangeBoundaries) client := split.NewClient(mockPDCli, nil, nil, 100, 4, split.WithRawKV()) - regionSplitter := utils.NewRegionSplitter(client) - err := regionSplitter.ExecuteSplit(ctx, ranges) + regionSplitter := snapsplit.NewRegionSplitter(client) + err := regionSplitter.ExecuteSplit(ctx, splitKeys) require.NoError(t, err) regions := mockPDCli.Regions.ScanRange(nil, nil, 100) @@ -110,6 +110,16 @@ func TestRawSplit(t *testing.T) { } } +// keys: aae, aaz, ccf, ccj +func initKeys() [][]byte { + return [][]byte{ + []byte("aae"), + []byte("aaz"), + []byte("ccf"), + []byte("ccj"), + } +} + // range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj) func initRanges() []rtree.Range { var ranges [4]rtree.Range @@ -146,89 +156,3 @@ func initRewriteRules() *restoreutils.RewriteRules { Data: rules[:], } } - -func TestSortRange(t *testing.T) { - dataRules := []*import_sstpb.RewriteRule{ - {OldKeyPrefix: tablecodec.GenTableRecordPrefix(1), NewKeyPrefix: tablecodec.GenTableRecordPrefix(4)}, - {OldKeyPrefix: tablecodec.GenTableRecordPrefix(2), NewKeyPrefix: tablecodec.GenTableRecordPrefix(5)}, - } - rewriteRules := &restoreutils.RewriteRules{ - Data: dataRules, - } - ranges1 := []rtree.Range{ - { - StartKey: append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...), - EndKey: append(tablecodec.GenTableRecordPrefix(1), []byte("bbb")...), Files: nil, - }, - } - for i, rg := range ranges1 { - tmp, _ := restoreutils.RewriteRange(&rg, rewriteRules) - ranges1[i] = *tmp - } - rs1, err := utils.SortRanges(ranges1) - require.NoErrorf(t, err, "sort range1 failed: %v", err) - rangeEquals(t, rs1, []rtree.Range{ - { - StartKey: append(tablecodec.GenTableRecordPrefix(4), []byte("aaa")...), - EndKey: append(tablecodec.GenTableRecordPrefix(4), []byte("bbb")...), Files: nil, - }, - }) - - ranges2 := []rtree.Range{ - { - StartKey: append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...), - EndKey: append(tablecodec.GenTableRecordPrefix(2), []byte("bbb")...), Files: nil, - }, - } - for _, rg := range ranges2 { - _, err := restoreutils.RewriteRange(&rg, rewriteRules) - require.Error(t, err) - require.Regexp(t, "table id mismatch.*", err.Error()) - } - - ranges3 := []rtree.Range{ - {StartKey: []byte("aaa"), EndKey: []byte("aae")}, - {StartKey: []byte("aae"), EndKey: []byte("aaz")}, - {StartKey: []byte("ccd"), EndKey: []byte("ccf")}, - {StartKey: []byte("ccf"), EndKey: []byte("ccj")}, - } - rewriteRules1 := &restoreutils.RewriteRules{ - Data: []*import_sstpb.RewriteRule{ - { - OldKeyPrefix: []byte("aa"), - NewKeyPrefix: []byte("xx"), - }, { - OldKeyPrefix: []byte("cc"), - NewKeyPrefix: []byte("bb"), - }, - }, - } - for i, rg := range ranges3 { - tmp, _ := restoreutils.RewriteRange(&rg, rewriteRules1) - ranges3[i] = *tmp - } - rs3, err := utils.SortRanges(ranges3) - require.NoErrorf(t, err, "sort range1 failed: %v", err) - rangeEquals(t, rs3, []rtree.Range{ - {StartKey: []byte("bbd"), EndKey: []byte("bbf"), Files: nil}, - {StartKey: 
[]byte("bbf"), EndKey: []byte("bbj"), Files: nil}, - {StartKey: []byte("xxa"), EndKey: []byte("xxe"), Files: nil}, - {StartKey: []byte("xxe"), EndKey: []byte("xxz"), Files: nil}, - }) - - // overlap ranges - ranges4 := []rtree.Range{ - {StartKey: []byte("aaa"), EndKey: []byte("aae")}, - {StartKey: []byte("aaa"), EndKey: []byte("aaz")}, - } - _, err = utils.SortRanges(ranges4) - require.Error(t, err) -} - -func rangeEquals(t *testing.T, obtained, expected []rtree.Range) { - require.Equal(t, len(expected), len(obtained)) - for i := range obtained { - require.Equal(t, expected[i].StartKey, obtained[i].StartKey) - require.Equal(t, expected[i].EndKey, obtained[i].EndKey) - } -} diff --git a/br/pkg/restore/snap_client/BUILD.bazel b/br/pkg/restore/snap_client/BUILD.bazel index 6f01151d99bc6..6bf12771e5ce0 100644 --- a/br/pkg/restore/snap_client/BUILD.bazel +++ b/br/pkg/restore/snap_client/BUILD.bazel @@ -5,11 +5,11 @@ go_library( srcs = [ "batcher.go", "client.go", - "context_manager.go", "import.go", "pipeline_items.go", + "placement_rule_manager.go", "systable_restore.go", - "zap.go", + "tikv_sender.go", ], importpath = "github.com/pingcap/tidb/br/pkg/restore/snap_client", visibility = ["//visibility:public"], @@ -27,7 +27,7 @@ go_library( "//br/pkg/restore/internal/import_client", "//br/pkg/restore/internal/prealloc_db", "//br/pkg/restore/internal/prealloc_table_id", - "//br/pkg/restore/internal/utils", + "//br/pkg/restore/internal/snap_split", "//br/pkg/restore/split", "//br/pkg/restore/utils", "//br/pkg/rtree", @@ -67,7 +67,6 @@ go_library( "@org_golang_x_sync//errgroup", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", - "@org_uber_go_zap//zapcore", ], ) @@ -75,29 +74,25 @@ go_test( name = "snap_client_test", timeout = "short", srcs = [ - "batcher_test.go", "client_test.go", - "context_manager_test.go", "export_test.go", "import_test.go", "main_test.go", - "pipeline_items_test.go", + "placement_rule_manager_test.go", "systable_restore_test.go", + "tikv_sender_test.go", ], embed = [":snap_client"], flaky = True, - shard_count = 23, + shard_count = 16, deps = [ "//br/pkg/errors", - "//br/pkg/glue", "//br/pkg/gluetidb", - "//br/pkg/logutil", "//br/pkg/metautil", "//br/pkg/mock", "//br/pkg/restore", "//br/pkg/restore/internal/import_client", "//br/pkg/restore/utils", - "//br/pkg/rtree", "//br/pkg/utils", "//br/pkg/utiltest", "//pkg/domain", @@ -115,11 +110,9 @@ go_test( "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", - "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", "@com_github_tikv_pd_client//:client", "@org_golang_x_exp//slices", "@org_uber_go_goleak//:goleak", - "@org_uber_go_zap//:zap", ], ) diff --git a/br/pkg/restore/snap_client/batcher.go b/br/pkg/restore/snap_client/batcher.go index 8795a3044397c..39ba91d36a8fb 100644 --- a/br/pkg/restore/snap_client/batcher.go +++ b/br/pkg/restore/snap_client/batcher.go @@ -3,223 +3,13 @@ package snapclient import ( - "context" - "sync" - "sync/atomic" - "time" - - "github.com/opentracing/opentracing-go" backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/restore/utils" "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/summary" - "go.uber.org/zap" ) -// SendType is the 'type' of a send. 
-// when we make a 'send' command to worker, we may want to flush all pending ranges (when auto commit enabled), -// or, we just want to clean overflowing ranges(when just adding a table to batcher). -type SendType int - -const ( - // SendUntilLessThanBatch will make the batcher send batch until - // its remaining range is less than its batchSizeThreshold. - SendUntilLessThanBatch SendType = iota - // SendAll will make the batcher send all pending ranges. - SendAll - // SendAllThenClose will make the batcher send all pending ranges and then close itself. - SendAllThenClose -) - -// Batcher collects ranges to restore and send batching split/ingest request. -type Batcher struct { - cachedTables []TableWithRange - cachedTablesMu *sync.Mutex - - // autoCommitJoiner is for joining the background batch sender. - autoCommitJoiner chan<- struct{} - // everythingIsDone is for waiting for worker done: that is, after we send a - // signal to autoCommitJoiner, we must give it enough time to get things done. - // Then, it should notify us by this wait group. - // Use wait group instead of a trivial channel for further extension. - everythingIsDone *sync.WaitGroup - // sendErr is for output error information. - sendErr chan<- error - // sendCh is for communiate with sendWorker. - sendCh chan<- SendType - // outCh is for output the restored table, so it can be sent to do something like checksum. - outCh chan<- *CreatedTable - - updateCh glue.Progress - - sender BatchSender - manager ContextManager - batchSizeThreshold int - size int32 - - checkpointSetWithTableID map[int64]map[string]struct{} -} - -// Len calculate the current size of this batcher. -func (b *Batcher) Len() int { - return int(atomic.LoadInt32(&b.size)) -} - -// contextCleaner is the worker goroutine that cleaning the 'context' -// (e.g. make regions leave restore mode). -func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTable) { - defer func() { - if ctx.Err() != nil { - log.Info("restore canceled, cleaning in background context") - b.manager.Close(context.Background()) - } else { - b.manager.Close(ctx) - } - }() - defer b.everythingIsDone.Done() - for { - select { - case <-ctx.Done(): - return - case tbls, ok := <-tables: - if !ok { - return - } - if err := b.manager.Leave(ctx, tbls); err != nil { - b.sendErr <- err - return - } - for _, tbl := range tbls { - cloneTable := tbl - b.outCh <- &cloneTable - } - } - } -} - -// NewBatcher creates a new batcher by a sender and a context manager. -// the former defines how the 'restore' a batch(i.e. send, or 'push down' the task to where). -// the context manager defines the 'lifetime' of restoring tables(i.e. how to enter 'restore' mode, and how to exit). -// this batcher will work background, send batches per second, or batch size reaches limit. -// and it will emit full-restored tables to the output channel returned. 
-func NewBatcher( - ctx context.Context, - sender BatchSender, - manager ContextManager, - errCh chan<- error, - updateCh glue.Progress, -) (*Batcher, chan *CreatedTable) { - outCh := defaultOutputTableChan() - sendChan := make(chan SendType, 2) - b := &Batcher{ - sendErr: errCh, - outCh: outCh, - sender: sender, - manager: manager, - sendCh: sendChan, - updateCh: updateCh, - cachedTablesMu: new(sync.Mutex), - everythingIsDone: new(sync.WaitGroup), - batchSizeThreshold: 1, - } - b.everythingIsDone.Add(2) - go b.sendWorker(ctx, sendChan) - restoredTables := make(chan []CreatedTable, defaultChannelSize) - go b.contextCleaner(ctx, restoredTables) - sink := chanTableSink{restoredTables, errCh} - sender.PutSink(sink) - return b, outCh -} - -// EnableAutoCommit enables the batcher commit batch periodically even batcher size isn't big enough. -// we make this function for disable AutoCommit in some case. -func (b *Batcher) EnableAutoCommit(ctx context.Context, delay time.Duration) { - if b.autoCommitJoiner != nil { - // IMO, making two auto commit goroutine wouldn't be a good idea. - // If desire(e.g. change the peroid of auto commit), please disable auto commit firstly. - log.L().DPanic("enabling auto commit on a batcher that auto commit has been enabled, which isn't allowed") - } - joiner := make(chan struct{}) - go b.autoCommitWorker(ctx, joiner, delay) - b.autoCommitJoiner = joiner -} - -// DisableAutoCommit blocks the current goroutine until the worker can gracefully stop, -// and then disable auto commit. -func (b *Batcher) DisableAutoCommit() { - b.joinAutoCommitWorker() - b.autoCommitJoiner = nil -} - -func (b *Batcher) waitUntilSendDone() { - b.sendCh <- SendAllThenClose - b.everythingIsDone.Wait() -} - -// joinAutoCommitWorker blocks the current goroutine until the worker can gracefully stop. -// return immediately when auto commit disabled. -func (b *Batcher) joinAutoCommitWorker() { - if b.autoCommitJoiner != nil { - log.Debug("gracefully stopping worker goroutine") - b.autoCommitJoiner <- struct{}{} - close(b.autoCommitJoiner) - log.Debug("gracefully stopped worker goroutine") - } -} - -// sendWorker is the 'worker' that send all ranges to TiKV. -// TODO since all operations are asynchronous now, it's possible to remove this worker. -func (b *Batcher) sendWorker(ctx context.Context, send <-chan SendType) { - sendUntil := func(lessOrEqual int) { - for b.Len() > lessOrEqual { - b.Send(ctx) - } - } - - for sendType := range send { - switch sendType { - case SendUntilLessThanBatch: - sendUntil(b.batchSizeThreshold) - case SendAll: - sendUntil(0) - case SendAllThenClose: - sendUntil(0) - b.sender.Close() - b.everythingIsDone.Done() - return - } - } -} - -func (b *Batcher) autoCommitWorker(ctx context.Context, joiner <-chan struct{}, delay time.Duration) { - tick := time.NewTicker(delay) - defer tick.Stop() - for { - select { - case <-joiner: - log.Debug("graceful stop signal received") - return - case <-ctx.Done(): - b.sendErr <- ctx.Err() - return - case <-tick.C: - if b.Len() > 0 { - log.Debug("sending batch because time limit exceed", zap.Int("size", b.Len())) - b.asyncSend(SendAll) - } - } - } -} - -func (b *Batcher) asyncSend(t SendType) { - // add a check here so we won't replica sending. - if len(b.sendCh) == 0 { - b.sendCh <- t - } -} - // DrainResult is the collection of some ranges and theirs metadata. type DrainResult struct { // TablesToSend are tables that would be send at this batch. 
@@ -229,7 +19,7 @@ type DrainResult struct { // RewriteRules are the rewrite rules for the tables. // the key is the table id after rewritten. RewriteRulesMap map[int64]*utils.RewriteRules - Ranges []rtree.Range + Ranges []rtree.RangeStats // Record which part of ranges belongs to the table TableEndOffsetInRanges []int } @@ -268,13 +58,13 @@ func newDrainResult() DrainResult { TablesToSend: make([]CreatedTable, 0), BlankTablesAfterSend: make([]CreatedTable, 0), RewriteRulesMap: utils.EmptyRewriteRulesMap(), - Ranges: make([]rtree.Range, 0), + Ranges: make([]rtree.RangeStats, 0), TableEndOffsetInRanges: make([]int, 0), } } // fileterOutRanges filter out the files from `drained-range` that exists in the checkpoint set. -func (b *Batcher) filterOutRanges(checkpointSet map[string]struct{}, drained []rtree.Range) []rtree.Range { +func filterOutRanges(checkpointSet map[string]struct{}, drained []rtree.RangeStats, updateCh glue.Progress) []rtree.RangeStats { progress := int(0) totalKVs := uint64(0) totalBytes := uint64(0) @@ -297,7 +87,7 @@ func (b *Batcher) filterOutRanges(checkpointSet map[string]struct{}, drained []r } if progress > 0 { // (split/scatter + download/ingest) / (default cf + write cf) - b.updateCh.IncBy(int64(progress) * 2 / 2) + updateCh.IncBy(int64(progress) * 2 / 2) summary.CollectSuccessUnit(summary.TotalKV, progress, totalKVs) summary.CollectSuccessUnit(summary.SkippedKVCountByCheckpoint, progress, totalKVs) summary.CollectSuccessUnit(summary.TotalBytes, progress, totalBytes) @@ -324,133 +114,29 @@ func (b *Batcher) filterOutRanges(checkpointSet map[string]struct{}, drained []r // |--|-------| // |t2|t3 | // as you can see, all restored ranges would be removed. -func (b *Batcher) drainRanges() DrainResult { +func drainRanges( + tableWithRanges []TableWithRange, + checkpointSetWithTableID map[int64]map[string]struct{}, + updateCh glue.Progress, +) DrainResult { result := newDrainResult() - b.cachedTablesMu.Lock() - defer b.cachedTablesMu.Unlock() - - for offset, thisTable := range b.cachedTables { - t, exists := b.checkpointSetWithTableID[thisTable.Table.ID] - thisTableLen := len(thisTable.Range) - collected := len(result.Ranges) + for offset, thisTable := range tableWithRanges { + t, exists := checkpointSetWithTableID[thisTable.Table.ID] result.RewriteRulesMap[thisTable.Table.ID] = thisTable.RewriteRule result.TablesToSend = append(result.TablesToSend, thisTable.CreatedTable) - - // the batch is full, we should stop here! - // we use strictly greater than because when we send a batch at equal, the offset should plus one. - // (because the last table is sent, we should put it in emptyTables), and this will introduce extra complex. - if thisTableLen+collected > b.batchSizeThreshold { - drainSize := b.batchSizeThreshold - collected - thisTableRanges := thisTable.Range - - var drained []rtree.Range - drained, b.cachedTables[offset].Range = thisTableRanges[:drainSize], thisTableRanges[drainSize:] - log.Debug("draining partial table to batch", - zap.Stringer("db", thisTable.OldTable.DB.Name), - zap.Stringer("table", thisTable.Table.Name), - zap.Int("size", thisTableLen), - zap.Int("drained", drainSize), - ) - // Firstly calculated the batcher size, and then - // filter out ranges by checkpoint. - atomic.AddInt32(&b.size, -int32(len(drained))) - if exists { - drained = b.filterOutRanges(t, drained) - } - result.Ranges = append(result.Ranges, drained...) 
- result.TableEndOffsetInRanges = append(result.TableEndOffsetInRanges, len(result.Ranges)) - b.cachedTables = b.cachedTables[offset:] - return result - } - result.BlankTablesAfterSend = append(result.BlankTablesAfterSend, thisTable.CreatedTable) - // Firstly calculated the batcher size, and then filter out ranges by checkpoint. - atomic.AddInt32(&b.size, -int32(len(thisTable.Range))) // let's 'drain' the ranges of current table. This op must not make the batch full. if exists { - result.Ranges = append(result.Ranges, b.filterOutRanges(t, thisTable.Range)...) + result.Ranges = append(result.Ranges, filterOutRanges(t, thisTable.Range, updateCh)...) } else { result.Ranges = append(result.Ranges, thisTable.Range...) } result.TableEndOffsetInRanges = append(result.TableEndOffsetInRanges, len(result.Ranges)) // clear the table length. - b.cachedTables[offset].Range = []rtree.Range{} - log.Debug("draining table to batch", - zap.Stringer("db", thisTable.OldTable.DB.Name), - zap.Stringer("table", thisTable.Table.Name), - zap.Int("size", thisTableLen), - ) + tableWithRanges[offset].Range = []rtree.RangeStats{} } - // all tables are drained. - b.cachedTables = []TableWithRange{} return result } - -// Send sends all pending requests in the batcher. -// returns tables sent FULLY in the current batch. -func (b *Batcher) Send(ctx context.Context) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("Batcher.Send", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - - drainResult := b.drainRanges() - tbs := drainResult.TablesToSend - ranges := drainResult.Ranges - log.Info("restore batch start", rtree.ZapRanges(ranges), zapTables(tbs)) - // Leave is called at b.contextCleaner - if err := b.manager.Enter(ctx, drainResult.TablesToSend); err != nil { - b.sendErr <- err - return - } - b.sender.RestoreBatch(drainResult) -} - -func (b *Batcher) sendIfFull() { - if b.Len() >= b.batchSizeThreshold { - log.Debug("sending batch because batcher is full", zap.Int("size", b.Len())) - b.asyncSend(SendUntilLessThanBatch) - } -} - -// Add adds a task to the Batcher. -func (b *Batcher) Add(tbs TableWithRange) { - b.cachedTablesMu.Lock() - log.Debug("adding table to batch", - zap.Stringer("db", tbs.OldTable.DB.Name), - zap.Stringer("table", tbs.Table.Name), - zap.Int64("old id", tbs.OldTable.Info.ID), - zap.Int64("new id", tbs.Table.ID), - zap.Int("table size", len(tbs.Range)), - zap.Int("batch size", b.Len()), - ) - b.cachedTables = append(b.cachedTables, tbs) - atomic.AddInt32(&b.size, int32(len(tbs.Range))) - b.cachedTablesMu.Unlock() - - b.sendIfFull() -} - -// Close closes the batcher, sending all pending requests, close updateCh. -func (b *Batcher) Close() { - log.Info("sending batch lastly on close", zap.Int("size", b.Len())) - b.DisableAutoCommit() - b.waitUntilSendDone() - close(b.outCh) - close(b.sendCh) -} - -// SetThreshold sets the threshold that how big the batch size reaching need to send batch. -// note this function isn't goroutine safe yet, -// just set threshold before anything starts(e.g. EnableAutoCommit), please. 
-func (b *Batcher) SetThreshold(newThreshold int) { - b.batchSizeThreshold = newThreshold -} - -func (b *Batcher) SetCheckpoint(sets map[int64]map[string]struct{}) { - b.checkpointSetWithTableID = sets -} diff --git a/br/pkg/restore/snap_client/batcher_test.go b/br/pkg/restore/snap_client/batcher_test.go deleted file mode 100644 index f18bf9ade1adb..0000000000000 --- a/br/pkg/restore/snap_client/batcher_test.go +++ /dev/null @@ -1,387 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package snapclient_test - -import ( - "bytes" - "context" - "sync" - "testing" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/import_sstpb" - "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/metautil" - snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client" - "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/br/pkg/rtree" - "github.com/pingcap/tidb/pkg/parser/model" - "github.com/stretchr/testify/require" - "go.uber.org/zap" -) - -type drySender struct { - mu *sync.Mutex - - rewriteRules *utils.RewriteRules - ranges []rtree.Range - nBatch int - - sink snapclient.TableSink -} - -func (sender *drySender) PutSink(sink snapclient.TableSink) { - sender.sink = sink -} - -func (sender *drySender) RestoreBatch(ranges snapclient.DrainResult) { - sender.mu.Lock() - defer sender.mu.Unlock() - log.Info("fake restore range", rtree.ZapRanges(ranges.Ranges)) - sender.nBatch++ - for _, r := range ranges.RewriteRulesMap { - sender.rewriteRules.Append(*r) - } - sender.ranges = append(sender.ranges, ranges.Ranges...) - sender.sink.EmitTables(ranges.BlankTablesAfterSend...) -} - -func (sender *drySender) Close() { - sender.sink.Close() -} - -func waitForSend() { - time.Sleep(50 * time.Millisecond) -} - -func (sender *drySender) Ranges() []rtree.Range { - return sender.ranges -} - -func newDrySender() *drySender { - return &drySender{ - rewriteRules: utils.EmptyRewriteRule(), - ranges: []rtree.Range{}, - mu: new(sync.Mutex), - } -} - -type recordCurrentTableManager struct { - lock sync.Mutex - m map[int64]bool -} - -func (manager *recordCurrentTableManager) Close(ctx context.Context) { - manager.lock.Lock() - defer manager.lock.Unlock() - if len(manager.m) > 0 { - log.Panic("When closing, there are still some tables doesn't be sent", - zap.Any("tables", manager.m)) - } -} - -func newMockManager() *recordCurrentTableManager { - return &recordCurrentTableManager{ - m: make(map[int64]bool), - } -} - -func (manager *recordCurrentTableManager) Enter(_ context.Context, tables []snapclient.CreatedTable) error { - manager.lock.Lock() - defer manager.lock.Unlock() - for _, t := range tables { - log.Info("entering", zap.Int64("table ID", t.Table.ID)) - manager.m[t.Table.ID] = true - } - return nil -} - -func (manager *recordCurrentTableManager) Leave(_ context.Context, tables []snapclient.CreatedTable) error { - manager.lock.Lock() - defer manager.lock.Unlock() - for _, t := range tables { - if !manager.m[t.Table.ID] { - return errors.Errorf("Table %d is removed before added", t.Table.ID) - } - log.Info("leaving", zap.Int64("table ID", t.Table.ID)) - delete(manager.m, t.Table.ID) - } - return nil -} - -func (manager *recordCurrentTableManager) Has(tables ...snapclient.TableWithRange) bool { - manager.lock.Lock() - defer manager.lock.Unlock() - ids := make([]int64, 0, len(tables)) - currentIDs := make([]int64, 0, len(manager.m)) - for _, t := range tables { - ids = append(ids, t.Table.ID) - } - for id, contains := range manager.m { - if contains { - 
currentIDs = append(currentIDs, id) - } - } - log.Info("testing", zap.Int64s("should has ID", ids), zap.Int64s("has ID", currentIDs)) - for _, i := range ids { - if !manager.m[i] { - return false - } - } - return true -} - -func (sender *drySender) HasRewriteRuleOfKey(prefix string) bool { - sender.mu.Lock() - defer sender.mu.Unlock() - for _, rule := range sender.rewriteRules.Data { - if bytes.Equal([]byte(prefix), rule.OldKeyPrefix) { - return true - } - } - return false -} - -func (sender *drySender) RangeLen() int { - sender.mu.Lock() - defer sender.mu.Unlock() - return len(sender.ranges) -} - -func (sender *drySender) BatchCount() int { - return sender.nBatch -} - -func fakeTableWithRange(id int64, rngs []rtree.Range) snapclient.TableWithRange { - tbl := &metautil.Table{ - DB: &model.DBInfo{}, - Info: &model.TableInfo{ - ID: id, - }, - } - tblWithRng := snapclient.TableWithRange{ - CreatedTable: snapclient.CreatedTable{ - RewriteRule: utils.EmptyRewriteRule(), - Table: tbl.Info, - OldTable: tbl, - }, - Range: rngs, - } - return tblWithRng -} - -func fakeRewriteRules(oldPrefix string, newPrefix string) *utils.RewriteRules { - return &utils.RewriteRules{ - Data: []*import_sstpb.RewriteRule{ - { - OldKeyPrefix: []byte(oldPrefix), - NewKeyPrefix: []byte(newPrefix), - }, - }, - } -} - -func fakeRange(startKey, endKey string) rtree.Range { - return rtree.Range{ - StartKey: []byte(startKey), - EndKey: []byte(endKey), - } -} - -func join(nested [][]rtree.Range) (plain []rtree.Range) { - for _, ranges := range nested { - plain = append(plain, ranges...) - } - return plain -} - -// TestBasic tests basic workflow of batcher. -func TestBasic(t *testing.T) { - ctx := context.Background() - errCh := make(chan error, 8) - sender := newDrySender() - manager := newMockManager() - batcher, _ := snapclient.NewBatcher(ctx, sender, manager, errCh, nil) - batcher.SetThreshold(2) - - tableRanges := [][]rtree.Range{ - {fakeRange("aaa", "aab")}, - {fakeRange("baa", "bab"), fakeRange("bac", "bad")}, - {fakeRange("caa", "cab"), fakeRange("cac", "cad")}, - } - - simpleTables := []snapclient.TableWithRange{} - for i, ranges := range tableRanges { - simpleTables = append(simpleTables, fakeTableWithRange(int64(i), ranges)) - } - for _, tbl := range simpleTables { - batcher.Add(tbl) - } - - batcher.Close() - rngs := sender.Ranges() - - require.Equal(t, rngs, join(tableRanges)) - select { - case err := <-errCh: - t.Fatal(errors.Trace(err)) - default: - } -} - -func TestAutoSend(t *testing.T) { - ctx := context.Background() - errCh := make(chan error, 8) - sender := newDrySender() - manager := newMockManager() - batcher, _ := snapclient.NewBatcher(ctx, sender, manager, errCh, nil) - batcher.SetThreshold(1024) - - simpleTable := fakeTableWithRange(1, []rtree.Range{fakeRange("caa", "cab"), fakeRange("cac", "cad")}) - - batcher.Add(simpleTable) - require.Greater(t, batcher.Len(), 0) - - // enable auto commit. 
- batcher.EnableAutoCommit(ctx, 100*time.Millisecond) - time.Sleep(200 * time.Millisecond) - - require.Greater(t, sender.RangeLen(), 0) - require.Equal(t, 0, batcher.Len()) - - batcher.Close() - - rngs := sender.Ranges() - require.Equal(t, simpleTable.Range, rngs) - select { - case err := <-errCh: - t.Fatal(errors.Trace(err)) - default: - } -} - -func TestSplitRangeOnSameTable(t *testing.T) { - ctx := context.Background() - errCh := make(chan error, 8) - sender := newDrySender() - manager := newMockManager() - batcher, _ := snapclient.NewBatcher(ctx, sender, manager, errCh, nil) - batcher.SetThreshold(2) - - simpleTable := fakeTableWithRange(1, []rtree.Range{ - fakeRange("caa", "cab"), fakeRange("cac", "cad"), - fakeRange("cae", "caf"), fakeRange("cag", "cai"), - fakeRange("caj", "cak"), fakeRange("cal", "cam"), - fakeRange("can", "cao"), fakeRange("cap", "caq"), - }) - - batcher.Add(simpleTable) - batcher.Close() - require.Equal(t, 4, sender.BatchCount()) - - rngs := sender.Ranges() - require.Equal(t, simpleTable.Range, rngs) - select { - case err := <-errCh: - t.Fatal(errors.Trace(err)) - default: - } -} - -func TestRewriteRules(t *testing.T) { - tableRanges := [][]rtree.Range{ - {fakeRange("aaa", "aab")}, - {fakeRange("baa", "bab"), fakeRange("bac", "bad")}, - { - fakeRange("caa", "cab"), fakeRange("cac", "cad"), - fakeRange("cae", "caf"), fakeRange("cag", "cai"), - fakeRange("caj", "cak"), fakeRange("cal", "cam"), - fakeRange("can", "cao"), fakeRange("cap", "caq"), - }, - } - rewriteRules := []*utils.RewriteRules{ - fakeRewriteRules("a", "ada"), - fakeRewriteRules("b", "bob"), - fakeRewriteRules("c", "cpp"), - } - - tables := make([]snapclient.TableWithRange, 0, len(tableRanges)) - for i, ranges := range tableRanges { - table := fakeTableWithRange(int64(i), ranges) - table.RewriteRule = rewriteRules[i] - tables = append(tables, table) - } - - ctx := context.Background() - errCh := make(chan error, 8) - sender := newDrySender() - manager := newMockManager() - batcher, _ := snapclient.NewBatcher(ctx, sender, manager, errCh, nil) - batcher.SetThreshold(2) - - batcher.Add(tables[0]) - waitForSend() - require.Equal(t, 0, sender.RangeLen()) - - batcher.Add(tables[1]) - waitForSend() - require.True(t, sender.HasRewriteRuleOfKey("a")) - require.True(t, sender.HasRewriteRuleOfKey("b")) - require.True(t, manager.Has(tables[1])) - require.Equal(t, 2, sender.RangeLen()) - - batcher.Add(tables[2]) - batcher.Close() - require.True(t, sender.HasRewriteRuleOfKey("c")) - require.Equal(t, join(tableRanges), sender.Ranges()) - - select { - case err := <-errCh: - t.Fatal(errors.Trace(err)) - default: - } -} - -func TestBatcherLen(t *testing.T) { - ctx := context.Background() - errCh := make(chan error, 8) - sender := newDrySender() - manager := newMockManager() - batcher, _ := snapclient.NewBatcher(ctx, sender, manager, errCh, nil) - batcher.SetThreshold(15) - - simpleTable := fakeTableWithRange(1, []rtree.Range{ - fakeRange("caa", "cab"), fakeRange("cac", "cad"), - fakeRange("cae", "caf"), fakeRange("cag", "cai"), - fakeRange("caj", "cak"), fakeRange("cal", "cam"), - fakeRange("can", "cao"), fakeRange("cap", "caq"), - }) - - simpleTable2 := fakeTableWithRange(2, []rtree.Range{ - fakeRange("caa", "cab"), fakeRange("cac", "cad"), - fakeRange("cae", "caf"), fakeRange("cag", "cai"), - fakeRange("caj", "cak"), fakeRange("cal", "cam"), - fakeRange("can", "cao"), fakeRange("cap", "caq"), - }) - - batcher.Add(simpleTable) - waitForSend() - require.Equal(t, 8, batcher.Len()) - require.False(t, 
manager.Has(simpleTable)) - require.False(t, manager.Has(simpleTable2)) - - batcher.Add(simpleTable2) - waitForSend() - require.Equal(t, 1, batcher.Len()) - require.True(t, manager.Has(simpleTable2)) - require.False(t, manager.Has(simpleTable)) - batcher.Close() - require.Equal(t, 0, batcher.Len()) - - select { - case err := <-errCh: - t.Fatal(errors.Trace(err)) - default: - } -} diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index 783687da1e164..0cf20cf42baee 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -20,7 +20,6 @@ import ( "context" "crypto/tls" "encoding/json" - "fmt" "slices" "strings" "sync" @@ -44,10 +43,8 @@ import ( importclient "github.com/pingcap/tidb/br/pkg/restore/internal/import_client" tidallocdb "github.com/pingcap/tidb/br/pkg/restore/internal/prealloc_db" tidalloc "github.com/pingcap/tidb/br/pkg/restore/internal/prealloc_table_id" - internalutils "github.com/pingcap/tidb/br/pkg/restore/internal/utils" "github.com/pingcap/tidb/br/pkg/restore/split" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" @@ -975,128 +972,6 @@ func (rc *SnapClient) setSpeedLimit(ctx context.Context, rateLimit uint64) error return nil } -func getFileRangeKey(f string) string { - // the backup date file pattern is `{store_id}_{region_id}_{epoch_version}_{key}_{ts}_{cf}.sst` - // so we need to compare with out the `_{cf}.sst` suffix - idx := strings.LastIndex(f, "_") - if idx < 0 { - panic(fmt.Sprintf("invalid backup data file name: '%s'", f)) - } - - return f[:idx] -} - -// isFilesBelongToSameRange check whether two files are belong to the same range with different cf. -func isFilesBelongToSameRange(f1, f2 string) bool { - return getFileRangeKey(f1) == getFileRangeKey(f2) -} - -func drainFilesByRange(files []*backuppb.File) ([]*backuppb.File, []*backuppb.File) { - if len(files) == 0 { - return nil, nil - } - idx := 1 - for idx < len(files) { - if !isFilesBelongToSameRange(files[idx-1].Name, files[idx].Name) { - break - } - idx++ - } - - return files[:idx], files[idx:] -} - -// RestoreSSTFiles tries to restore the files. 
-func (rc *SnapClient) RestoreSSTFiles( - ctx context.Context, - tableIDWithFiles []TableIDWithFiles, - updateCh glue.Progress, -) (err error) { - start := time.Now() - fileCount := 0 - defer func() { - elapsed := time.Since(start) - if err == nil { - log.Info("Restore files", zap.Duration("take", elapsed)) - summary.CollectSuccessUnit("files", fileCount, elapsed) - } - }() - - log.Debug("start to restore files", zap.Int("files", fileCount)) - - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("Client.RestoreSSTFiles", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - - eg, ectx := errgroup.WithContext(ctx) - err = rc.setSpeedLimit(ctx, rc.rateLimit) - if err != nil { - return errors.Trace(err) - } - - var rangeFiles []*backuppb.File - var leftFiles []*backuppb.File -LOOPFORTABLE: - for _, tableIDWithFile := range tableIDWithFiles { - tableID := tableIDWithFile.TableID - files := tableIDWithFile.Files - rules := tableIDWithFile.RewriteRules - fileCount += len(files) - for rangeFiles, leftFiles = drainFilesByRange(files); len(rangeFiles) != 0; rangeFiles, leftFiles = drainFilesByRange(leftFiles) { - if ectx.Err() != nil { - log.Warn("Restoring encountered error and already stopped, give up remained files.", - zap.Int("remained", len(leftFiles)), - logutil.ShortError(ectx.Err())) - // We will fetch the error from the errgroup then (If there were). - // Also note if the parent context has been canceled or something, - // breaking here directly is also a reasonable behavior. - break LOOPFORTABLE - } - filesReplica := rangeFiles - rc.fileImporter.WaitUntilUnblock() - rc.workerPool.ApplyOnErrorGroup(eg, func() (restoreErr error) { - fileStart := time.Now() - defer func() { - if restoreErr == nil { - log.Info("import files done", logutil.Files(filesReplica), - zap.Duration("take", time.Since(fileStart))) - updateCh.Inc() - } - }() - if importErr := rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion()); importErr != nil { - return errors.Trace(importErr) - } - - // the data of this range has been import done - if rc.checkpointRunner != nil && len(filesReplica) > 0 { - rangeKey := getFileRangeKey(filesReplica[0].Name) - // The checkpoint range shows this ranges of kvs has been restored into - // the table corresponding to the table-id. - if err := checkpoint.AppendRangesForRestore(ectx, rc.checkpointRunner, tableID, rangeKey); err != nil { - return errors.Trace(err) - } - } - return nil - }) - } - } - - if err := eg.Wait(); err != nil { - summary.CollectFailureUnit("file", err) - log.Error( - "restore files failed", - zap.Error(err), - ) - return errors.Trace(err) - } - // Once the parent context canceled and there is no task running in the errgroup, - // we may break the for loop without error in the errgroup. (Will this happen?) - // At that time, return the error in the context here. - return ctx.Err() -} - func (rc *SnapClient) execChecksum( ctx context.Context, tbl *CreatedTable, @@ -1222,33 +1097,3 @@ func (rc *SnapClient) RestoreRaw( ) return nil } - -// SplitRanges implements TiKVRestorer. It splits region by -// data range after rewrite. 
-func (rc *SnapClient) SplitRanges( - ctx context.Context, - ranges []rtree.Range, - updateCh glue.Progress, - isRawKv bool, -) error { - splitClientOpts := make([]split.ClientOptionalParameter, 0, 2) - splitClientOpts = append(splitClientOpts, split.WithOnSplit(func(keys [][]byte) { - for range keys { - updateCh.Inc() - } - })) - if isRawKv { - splitClientOpts = append(splitClientOpts, split.WithRawKV()) - } - - splitter := internalutils.NewRegionSplitter(split.NewClient( - rc.pdClient, - rc.pdHTTPClient, - rc.tlsConf, - maxSplitKeysOnce, - rc.storeCount+1, - splitClientOpts..., - )) - - return splitter.ExecuteSplit(ctx, ranges) -} diff --git a/br/pkg/restore/snap_client/export_test.go b/br/pkg/restore/snap_client/export_test.go index 3240ee1f77662..27f48efb8a4eb 100644 --- a/br/pkg/restore/snap_client/export_test.go +++ b/br/pkg/restore/snap_client/export_test.go @@ -35,6 +35,7 @@ var ( GetSSTMetaFromFile = getSSTMetaFromFile GetKeyRangeByMode = getKeyRangeByMode + MapTableToFiles = mapTableToFiles ) // MockClient create a fake Client used to test. diff --git a/br/pkg/restore/snap_client/pipeline_items.go b/br/pkg/restore/snap_client/pipeline_items.go index 55c7cb4d44e54..d31b4db6be738 100644 --- a/br/pkg/restore/snap_client/pipeline_items.go +++ b/br/pkg/restore/snap_client/pipeline_items.go @@ -16,14 +16,12 @@ package snapclient import ( "context" - "sync" "time" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/glue" - "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/metautil" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" "github.com/pingcap/tidb/br/pkg/rtree" @@ -45,31 +43,6 @@ const defaultChannelSize = 1024 // checksum tasks. const defaultChecksumConcurrency = 64 -// TableSink is the 'sink' of restored data by a sender. -type TableSink interface { - EmitTables(tables ...CreatedTable) - EmitError(error) - Close() -} - -type chanTableSink struct { - outCh chan<- []CreatedTable - errCh chan<- error -} - -func (sink chanTableSink) EmitTables(tables ...CreatedTable) { - sink.outCh <- tables -} - -func (sink chanTableSink) EmitError(err error) { - sink.errCh <- err -} - -func (sink chanTableSink) Close() { - // ErrCh may has multi sender part, don't close it. - close(sink.outCh) -} - // CreatedTable is a table created on restore process, // but not yet filled with data. type CreatedTable struct { @@ -87,7 +60,7 @@ type TableWithRange struct { CreatedTable // Range has been rewrited by rewrite rules. - Range []rtree.Range + Range []rtree.RangeStats } type TableIDWithFiles struct { @@ -100,251 +73,6 @@ type TableIDWithFiles struct { RewriteRules *restoreutils.RewriteRules } -// BatchSender is the abstract of how the batcher send a batch. -type BatchSender interface { - // PutSink sets the sink of this sender, user to this interface promise - // call this function at least once before first call to `RestoreBatch`. - PutSink(sink TableSink) - // RestoreBatch will send the restore request. - RestoreBatch(ranges DrainResult) - Close() -} - -// TiKVRestorer is the minimal methods required for restoring. -// It contains the primitive APIs extract from `restore.Client`, so some of arguments may seem redundant. -// Maybe TODO: make a better abstraction? -type TiKVRestorer interface { - // SplitRanges split regions implicated by the ranges and rewrite rules. - // After spliting, it also scatters the fresh regions. 
- SplitRanges(ctx context.Context, - ranges []rtree.Range, - updateCh glue.Progress, - isRawKv bool) error - // RestoreSSTFiles import the files to the TiKV. - RestoreSSTFiles(ctx context.Context, - tableIDWithFiles []TableIDWithFiles, - updateCh glue.Progress) error -} - -type tikvSender struct { - client TiKVRestorer - - updateCh glue.Progress - - sink TableSink - inCh chan<- DrainResult - - wg *sync.WaitGroup - - tableWaiters *sync.Map -} - -func (b *tikvSender) PutSink(sink TableSink) { - // don't worry about visibility, since we will call this before first call to - // RestoreBatch, which is a sync point. - b.sink = sink -} - -func (b *tikvSender) RestoreBatch(ranges DrainResult) { - log.Info("restore batch: waiting ranges", zap.Int("range", len(b.inCh))) - b.inCh <- ranges -} - -// NewTiKVSender make a sender that send restore requests to TiKV. -func NewTiKVSender( - ctx context.Context, - cli TiKVRestorer, - updateCh glue.Progress, - splitConcurrency uint, -) (BatchSender, error) { - inCh := make(chan DrainResult, defaultChannelSize) - midCh := make(chan drainResultAndDone, defaultChannelSize) - - sender := &tikvSender{ - client: cli, - updateCh: updateCh, - inCh: inCh, - wg: new(sync.WaitGroup), - tableWaiters: new(sync.Map), - } - - sender.wg.Add(2) - go sender.splitWorker(ctx, inCh, midCh, splitConcurrency) - outCh := make(chan drainResultAndDone, defaultChannelSize) - // block on splitting and scattering regions. - // in coarse-grained mode, wait all regions are split and scattered is - // no longer a time-consuming operation, then we can batch download files - // as much as enough and reduce the time of blocking restore. - go sender.blockPipelineWorker(ctx, midCh, outCh) - go sender.restoreWorker(ctx, outCh) - return sender, nil -} - -func (b *tikvSender) Close() { - close(b.inCh) - b.wg.Wait() - log.Debug("tikv sender closed") -} - -type drainResultAndDone struct { - result DrainResult - done func() -} - -func (b *tikvSender) blockPipelineWorker(ctx context.Context, - inCh <-chan drainResultAndDone, - outCh chan<- drainResultAndDone, -) { - defer close(outCh) - res := make([]drainResultAndDone, 0, defaultChannelSize) - for dr := range inCh { - res = append(res, dr) - } - - for _, dr := range res { - select { - case <-ctx.Done(): - return - default: - outCh <- dr - } - } -} - -func (b *tikvSender) splitWorker(ctx context.Context, - ranges <-chan DrainResult, - next chan<- drainResultAndDone, - concurrency uint, -) { - defer log.Debug("split worker closed") - eg, ectx := errgroup.WithContext(ctx) - defer func() { - b.wg.Done() - if err := eg.Wait(); err != nil { - b.sink.EmitError(err) - } - close(next) - log.Info("TiKV Sender: split worker exits.") - }() - - start := time.Now() - defer func() { - elapsed := time.Since(start) - summary.CollectDuration("split region", elapsed) - }() - - pool := tidbutil.NewWorkerPool(concurrency, "split") - for { - select { - case <-ectx.Done(): - return - case result, ok := <-ranges: - if !ok { - return - } - // When the batcher has sent all ranges from a table, it would - // mark this table 'all done'(BlankTablesAfterSend), and then we can send it to checksum. 
- // - // When there a sole worker sequentially running those batch tasks, everything is fine, however, - // in the context of multi-workers, that become buggy, for example: - // |------table 1, ranges 1------|------table 1, ranges 2------| - // The batcher send batches: [ - // {Ranges: ranges 1}, - // {Ranges: ranges 2, BlankTablesAfterSend: table 1} - // ] - // And there are two workers runs concurrently: - // worker 1: {Ranges: ranges 1} - // worker 2: {Ranges: ranges 2, BlankTablesAfterSend: table 1} - // And worker 2 finished its job before worker 1 done. Note the table wasn't restored fully, - // hence the checksum would fail. - done := b.registerTableIsRestoring(result.TablesToSend) - pool.ApplyOnErrorGroup(eg, func() error { - err := b.client.SplitRanges(ectx, result.Ranges, b.updateCh, false) - if err != nil { - log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err)) - return err - } - next <- drainResultAndDone{ - result: result, - done: done, - } - return nil - }) - } - } -} - -// registerTableIsRestoring marks some tables as 'current restoring'. -// Returning a function that mark the restore has been done. -func (b *tikvSender) registerTableIsRestoring(ts []CreatedTable) func() { - wgs := make([]*sync.WaitGroup, 0, len(ts)) - for _, t := range ts { - i, _ := b.tableWaiters.LoadOrStore(t.Table.ID, new(sync.WaitGroup)) - wg := i.(*sync.WaitGroup) - wg.Add(1) - wgs = append(wgs, wg) - } - return func() { - for _, wg := range wgs { - wg.Done() - } - } -} - -// waitTablesDone block the current goroutine, -// till all tables provided are no more ‘current restoring’. -func (b *tikvSender) waitTablesDone(ts []CreatedTable) { - for _, t := range ts { - wg, ok := b.tableWaiters.LoadAndDelete(t.Table.ID) - if !ok { - log.Panic("bug! table done before register!", - zap.Any("wait-table-map", b.tableWaiters), - zap.Stringer("table", t.Table.Name)) - } - wg.(*sync.WaitGroup).Wait() - } -} - -func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResultAndDone) { - eg, ectx := errgroup.WithContext(ctx) - defer func() { - log.Info("TiKV Sender: restore worker prepare to close.") - if err := eg.Wait(); err != nil { - b.sink.EmitError(err) - } - b.sink.Close() - b.wg.Done() - log.Info("TiKV Sender: restore worker exits.") - }() - for { - select { - case <-ectx.Done(): - return - case r, ok := <-ranges: - if !ok { - return - } - - files := r.result.Files() - // There has been a worker in the `RestoreSSTFiles` procedure. - // Spawning a raw goroutine won't make too many requests to TiKV. - eg.Go(func() error { - e := b.client.RestoreSSTFiles(ectx, files, b.updateCh) - if e != nil { - log.Error("restore batch meet error", logutil.ShortError(e), zapTableIDWithFiles(files)) - r.done() - return e - } - log.Info("restore batch done", rtree.ZapRanges(r.result.Ranges), zapTableIDWithFiles(files)) - r.done() - b.waitTablesDone(r.result.BlankTablesAfterSend) - b.sink.EmitTables(r.result.BlankTablesAfterSend...) - return nil - }) - } - } -} - func concurrentHandleTablesCh( ctx context.Context, inCh <-chan *CreatedTable, @@ -386,83 +114,6 @@ func concurrentHandleTablesCh( } } -// GoValidateFileRanges validate files by a stream of tables and yields -// tables with range. -func (rc *SnapClient) GoValidateFileRanges( - ctx context.Context, - tableStream <-chan CreatedTable, - fileOfTable map[int64][]*backuppb.File, - splitSizeBytes, splitKeyCount uint64, - errCh chan<- error, -) <-chan TableWithRange { - // Could we have a smaller outCh size? 
- outCh := make(chan TableWithRange, len(fileOfTable)) - go func() { - defer close(outCh) - defer log.Info("all range generated") - for { - select { - case <-ctx.Done(): - errCh <- ctx.Err() - return - case t, ok := <-tableStream: - if !ok { - return - } - files := fileOfTable[t.OldTable.Info.ID] - if partitions := t.OldTable.Info.Partition; partitions != nil { - log.Debug("table partition", - zap.Stringer("database", t.OldTable.DB.Name), - zap.Stringer("table", t.Table.Name), - zap.Any("partition info", partitions), - ) - for _, partition := range partitions.Definitions { - files = append(files, fileOfTable[partition.ID]...) - } - } - for _, file := range files { - err := restoreutils.ValidateFileRewriteRule(file, t.RewriteRule) - if err != nil { - errCh <- err - return - } - } - // Merge small ranges to reduce split and scatter regions. - ranges, stat, err := restoreutils.MergeAndRewriteFileRanges( - files, t.RewriteRule, splitSizeBytes, splitKeyCount) - if err != nil { - errCh <- err - return - } - log.Info("merge and validate file", - zap.Stringer("database", t.OldTable.DB.Name), - zap.Stringer("table", t.Table.Name), - zap.Int("Files(total)", stat.TotalFiles), - zap.Int("File(write)", stat.TotalWriteCFFile), - zap.Int("File(default)", stat.TotalDefaultCFFile), - zap.Int("Region(total)", stat.TotalRegions), - zap.Int("Regoin(keys avg)", stat.RegionKeysAvg), - zap.Int("Region(bytes avg)", stat.RegionBytesAvg), - zap.Int("Merged(regions)", stat.MergedRegions), - zap.Int("Merged(keys avg)", stat.MergedRegionKeysAvg), - zap.Int("Merged(bytes avg)", stat.MergedRegionBytesAvg)) - - tableWithRange := TableWithRange{ - CreatedTable: t, - Range: ranges, - } - log.Debug("sending range info", - zap.Stringer("table", t.Table.Name), - zap.Int("files", len(files)), - zap.Int("range size", len(ranges)), - zap.Int("output channel size", len(outCh))) - outCh <- tableWithRange - } - } - }() - return outCh -} - // GoValidateChecksum forks a goroutine to validate checksum after restore. // it returns a channel fires a struct{} when all things get done. func (rc *SnapClient) GoValidateChecksum( diff --git a/br/pkg/restore/snap_client/pipeline_items_test.go b/br/pkg/restore/snap_client/pipeline_items_test.go deleted file mode 100644 index 97660996b116a..0000000000000 --- a/br/pkg/restore/snap_client/pipeline_items_test.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package snapclient_test - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/pingcap/errors" - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/log" - berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/glue" - "github.com/pingcap/tidb/br/pkg/logutil" - snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client" - "github.com/pingcap/tidb/br/pkg/rtree" - "github.com/pingcap/tidb/pkg/parser/model" - "github.com/stretchr/testify/require" -) - -type fakeRestorer struct { - mu sync.Mutex - errorInSplit bool - splitRanges []rtree.Range - restoredFiles []*backuppb.File - tableIDIsInsequence bool -} - -func (f *fakeRestorer) SplitRanges(ctx context.Context, ranges []rtree.Range, updateCh glue.Progress, isRawKv bool) error { - f.mu.Lock() - defer f.mu.Unlock() - - if ctx.Err() != nil { - return ctx.Err() - } - f.splitRanges = append(f.splitRanges, ranges...) - if f.errorInSplit { - err := errors.Annotatef(berrors.ErrRestoreSplitFailed, - "the key space takes many efforts and finally get together, how dare you split them again... :<") - log.Error("error happens :3", logutil.ShortError(err)) - return err - } - return nil -} - -func (f *fakeRestorer) RestoreSSTFiles(ctx context.Context, tableIDWithFiles []snapclient.TableIDWithFiles, updateCh glue.Progress) error { - f.mu.Lock() - defer f.mu.Unlock() - - if ctx.Err() != nil { - return ctx.Err() - } - for i, tableIDWithFile := range tableIDWithFiles { - if int64(i) != tableIDWithFile.TableID { - f.tableIDIsInsequence = false - } - f.restoredFiles = append(f.restoredFiles, tableIDWithFile.Files...) - } - err := errors.Annotatef(berrors.ErrRestoreWriteAndIngest, "the files to restore are taken by a hijacker, meow :3") - log.Error("error happens :3", logutil.ShortError(err)) - return err -} - -func fakeRanges(keys ...string) (r snapclient.DrainResult) { - for i := range keys { - if i+1 == len(keys) { - return - } - r.Ranges = append(r.Ranges, rtree.Range{ - StartKey: []byte(keys[i]), - EndKey: []byte(keys[i+1]), - Files: []*backuppb.File{{Name: "fake.sst"}}, - }) - r.TableEndOffsetInRanges = append(r.TableEndOffsetInRanges, len(r.Ranges)) - r.TablesToSend = append(r.TablesToSend, snapclient.CreatedTable{ - Table: &model.TableInfo{ - ID: int64(i), - }, - }) - } - return -} - -type errorInTimeSink struct { - ctx context.Context - errCh chan error - t *testing.T -} - -func (e errorInTimeSink) EmitTables(tables ...snapclient.CreatedTable) {} - -func (e errorInTimeSink) EmitError(err error) { - e.errCh <- err -} - -func (e errorInTimeSink) Close() {} - -func (e errorInTimeSink) Wait() { - select { - case <-e.ctx.Done(): - e.t.Logf("The context is canceled but no error happen") - e.t.FailNow() - case <-e.errCh: - } -} - -func assertErrorEmitInTime(ctx context.Context, t *testing.T) errorInTimeSink { - errCh := make(chan error, 1) - return errorInTimeSink{ - ctx: ctx, - errCh: errCh, - t: t, - } -} - -func TestSplitFailed(t *testing.T) { - ranges := []snapclient.DrainResult{ - fakeRanges("aax", "abx", "abz"), - fakeRanges("abz", "bbz", "bcy"), - fakeRanges("bcy", "cad", "xxy"), - } - r := &fakeRestorer{errorInSplit: true, tableIDIsInsequence: true} - sender, err := snapclient.NewTiKVSender(context.TODO(), r, nil, 1) - require.NoError(t, err) - dctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - sink := assertErrorEmitInTime(dctx, t) - sender.PutSink(sink) - for _, r := range ranges { - sender.RestoreBatch(r) - } - sink.Wait() - sender.Close() - 
require.GreaterOrEqual(t, len(r.splitRanges), 2) - require.Len(t, r.restoredFiles, 0) - require.True(t, r.tableIDIsInsequence) -} - -func TestRestoreFailed(t *testing.T) { - ranges := []snapclient.DrainResult{ - fakeRanges("aax", "abx", "abz"), - fakeRanges("abz", "bbz", "bcy"), - fakeRanges("bcy", "cad", "xxy"), - } - r := &fakeRestorer{ - tableIDIsInsequence: true, - } - sender, err := snapclient.NewTiKVSender(context.TODO(), r, nil, 1) - require.NoError(t, err) - dctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - sink := assertErrorEmitInTime(dctx, t) - sender.PutSink(sink) - for _, r := range ranges { - sender.RestoreBatch(r) - } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - sink.Wait() - }() - sink.Close() - sender.Close() - wg.Wait() - require.GreaterOrEqual(t, len(r.restoredFiles), 1) - require.True(t, r.tableIDIsInsequence) -} diff --git a/br/pkg/restore/snap_client/context_manager.go b/br/pkg/restore/snap_client/placement_rule_manager.go similarity index 51% rename from br/pkg/restore/snap_client/context_manager.go rename to br/pkg/restore/snap_client/placement_rule_manager.go index 294f774630db6..70915b0e93f34 100644 --- a/br/pkg/restore/snap_client/context_manager.go +++ b/br/pkg/restore/snap_client/placement_rule_manager.go @@ -20,7 +20,6 @@ import ( "encoding/hex" "fmt" "strconv" - "sync" "time" "github.com/pingcap/errors" @@ -31,7 +30,6 @@ import ( "github.com/pingcap/tidb/br/pkg/conn/util" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/restore/split" - "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" pd "github.com/tikv/pd/client" @@ -39,143 +37,115 @@ import ( "go.uber.org/zap" ) -// ContextManager is the struct to manage a TiKV 'context' for restore. -// Batcher will call Enter when any table should be restore on batch, -// so you can do some prepare work here(e.g. set placement rules for online restore). -type ContextManager interface { - // Enter make some tables 'enter' this context(a.k.a., prepare for restore). - Enter(ctx context.Context, tables []CreatedTable) error - // Leave make some tables 'leave' this context(a.k.a., restore is done, do some post-works). - Leave(ctx context.Context, tables []CreatedTable) error - // Close closes the context manager, sometimes when the manager is 'killed' and should do some cleanup - // it would be call. - Close(ctx context.Context) +// PlacementRuleManager manages to set the placement rule of tables to label constraint key `exclusive`, +// and unset the rule. +type PlacementRuleManager interface { + SetPlacementRule(ctx context.Context, tables []*CreatedTable) error + ResetPlacementRules(ctx context.Context) error } -// NewBRContextManager makes a BR context manager, that is, -// set placement rules for online restore when enter(see ), -// unset them when leave. -func NewBRContextManager(ctx context.Context, pdClient pd.Client, pdHTTPCli pdhttp.Client, tlsConf *tls.Config, isOnline bool) (ContextManager, error) { - manager := &brContextManager{ - // toolClient reuse the split.SplitClient to do miscellaneous things. It doesn't - // call split related functions so set the arguments to arbitrary values. 
-		toolClient: split.NewClient(pdClient, pdHTTPCli, tlsConf, maxSplitKeysOnce, 3),
-		isOnline:   isOnline,
+const (
+	restoreLabelKey   = "exclusive"
+	restoreLabelValue = "restore"
+)
 
-		hasTable: make(map[int64]CreatedTable),
+// loadRestoreStores loads the stores used to restore data. This function is called only when the restore is online.
+func loadRestoreStores(ctx context.Context, pdClient util.StoreMeta) ([]uint64, error) {
+	restoreStores := make([]uint64, 0)
+	stores, err := conn.GetAllTiKVStoresWithRetry(ctx, pdClient, util.SkipTiFlash)
+	if err != nil {
+		return nil, errors.Trace(err)
 	}
-
-	err := manager.loadRestoreStores(ctx, pdClient)
-	return manager, errors.Trace(err)
-}
-
-type brContextManager struct {
-	toolClient    split.SplitClient
-	restoreStores []uint64
-	isOnline      bool
-
-	// This 'set' of table ID allow us to handle each table just once.
-	hasTable map[int64]CreatedTable
-	mu       sync.Mutex
-}
-
-func (manager *brContextManager) Close(ctx context.Context) {
-	tbls := make([]*model.TableInfo, 0, len(manager.hasTable))
-	for _, tbl := range manager.hasTable {
-		tbls = append(tbls, tbl.Table)
+	for _, s := range stores {
+		if s.GetState() != metapb.StoreState_Up {
+			continue
+		}
+		for _, l := range s.GetLabels() {
+			if l.GetKey() == restoreLabelKey && l.GetValue() == restoreLabelValue {
+				restoreStores = append(restoreStores, s.GetId())
+				break
+			}
+		}
 	}
-	manager.splitPostWork(ctx, tbls)
+	log.Info("load restore stores", zap.Uint64s("store-ids", restoreStores))
+	return restoreStores, nil
 }
 
-func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error {
-	placementRuleTables := make([]*model.TableInfo, 0, len(tables))
-	manager.mu.Lock()
-	defer manager.mu.Unlock()
+// NewPlacementRuleManager sets and unsets placement rules for online restore.
+func NewPlacementRuleManager(ctx context.Context, pdClient pd.Client, pdHTTPCli pdhttp.Client, tlsConf *tls.Config, isOnline bool) (PlacementRuleManager, error) {
+	if !isOnline {
+		return offlinePlacementRuleManager{}, nil
+	}
 
-	for _, tbl := range tables {
-		if _, ok := manager.hasTable[tbl.Table.ID]; !ok {
-			placementRuleTables = append(placementRuleTables, tbl.Table)
-		}
-		manager.hasTable[tbl.Table.ID] = tbl
+	restoreStores, err := loadRestoreStores(ctx, pdClient)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if len(restoreStores) == 0 {
+		log.Warn("The cluster does not have any TiKV node with the specified label, so skip setting placement rules",
+			zap.String("label-key", restoreLabelKey), zap.String("label-value", restoreLabelValue))
+		return offlinePlacementRuleManager{}, nil
 	}
 
-	return manager.splitPrepareWork(ctx, placementRuleTables)
+	return &onlinePlacementRuleManager{
+		// toolClient reuse the split.SplitClient to do miscellaneous things. It doesn't
+		// call split related functions so set the arguments to arbitrary values.
+		toolClient: split.NewClient(pdClient, pdHTTPCli, tlsConf, maxSplitKeysOnce, 3),
+
+		restoreStores: restoreStores,
+		restoreTables: make(map[int64]struct{}),
+	}, nil
 }
 
-func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTable) error {
-	manager.mu.Lock()
-	defer manager.mu.Unlock()
-	placementRuleTables := make([]*model.TableInfo, 0, len(tables))
+// An offline placement rule manager, which does nothing for placement rules.
+type offlinePlacementRuleManager struct{}
 
-	for _, table := range tables {
-		placementRuleTables = append(placementRuleTables, table.Table)
-	}
+// SetPlacementRule implements the interface `PlacementRuleManager`, it does nothing actually.
+func (offlinePlacementRuleManager) SetPlacementRule(ctx context.Context, tables []*CreatedTable) error { + return nil +} - manager.splitPostWork(ctx, placementRuleTables) - log.Info("restore table done", zapTables(tables)) - for _, tbl := range placementRuleTables { - delete(manager.hasTable, tbl.ID) - } +// ResetPlacementRules implements the interface `PlacementRuleManager`, it does nothing actually. +func (offlinePlacementRuleManager) ResetPlacementRules(ctx context.Context) error { return nil } -func (manager *brContextManager) splitPostWork(ctx context.Context, tables []*model.TableInfo) { - err := manager.resetPlacementRules(ctx, tables) - if err != nil { - log.Warn("reset placement rules failed", zap.Error(err)) - return - } +// An online placement rule manager, it sets the placement rule of tables to label constraint key `exclusive`, +// and unsets the rule. +type onlinePlacementRuleManager struct { + toolClient split.SplitClient + + restoreStores []uint64 + restoreTables map[int64]struct{} } -func (manager *brContextManager) splitPrepareWork(ctx context.Context, tables []*model.TableInfo) error { - err := manager.setupPlacementRules(ctx, tables) - if err != nil { - log.Error("setup placement rules failed", zap.Error(err)) - return errors.Trace(err) +// SetPlacementRule sets the placement rule of tables to label constraint key `exclusive`, +func (manager *onlinePlacementRuleManager) SetPlacementRule(ctx context.Context, tables []*CreatedTable) error { + for _, tbl := range tables { + manager.restoreTables[tbl.Table.ID] = struct{}{} + if tbl.Table.Partition != nil && tbl.Table.Partition.Definitions != nil { + for _, def := range tbl.Table.Partition.Definitions { + manager.restoreTables[def.ID] = struct{}{} + } + } } - err = manager.waitPlacementSchedule(ctx, tables) + err := manager.setupPlacementRules(ctx) if err != nil { - log.Error("wait placement schedule failed", zap.Error(err)) + log.Error("setup placement rules failed", zap.Error(err)) return errors.Trace(err) } - return nil -} -const ( - restoreLabelKey = "exclusive" - restoreLabelValue = "restore" -) - -// loadRestoreStores loads the stores used to restore data. This function is called only when is online. -func (manager *brContextManager) loadRestoreStores(ctx context.Context, pdClient util.StoreMeta) error { - if !manager.isOnline { - return nil - } - stores, err := conn.GetAllTiKVStoresWithRetry(ctx, pdClient, util.SkipTiFlash) + err = manager.waitPlacementSchedule(ctx) if err != nil { + log.Error("wait placement schedule failed", zap.Error(err)) return errors.Trace(err) } - for _, s := range stores { - if s.GetState() != metapb.StoreState_Up { - continue - } - for _, l := range s.GetLabels() { - if l.GetKey() == restoreLabelKey && l.GetValue() == restoreLabelValue { - manager.restoreStores = append(manager.restoreStores, s.GetId()) - break - } - } - } - log.Info("load restore stores", zap.Uint64s("store-ids", manager.restoreStores)) return nil } // SetupPlacementRules sets rules for the tables' regions. 
-func (manager *brContextManager) setupPlacementRules(ctx context.Context, tables []*model.TableInfo) error { - if !manager.isOnline || len(manager.restoreStores) == 0 { - return nil - } +func (manager *onlinePlacementRuleManager) setupPlacementRules(ctx context.Context) error { log.Info("start setting placement rules") rule, err := manager.toolClient.GetPlacementRule(ctx, "pd", "default") if err != nil { @@ -188,10 +158,10 @@ func (manager *brContextManager) setupPlacementRules(ctx context.Context, tables Op: "in", Values: []string{restoreLabelValue}, }) - for _, t := range tables { - rule.ID = getRuleID(t.ID) - rule.StartKeyHex = hex.EncodeToString(codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(t.ID))) - rule.EndKeyHex = hex.EncodeToString(codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(t.ID+1))) + for tableID := range manager.restoreTables { + rule.ID = getRuleID(tableID) + rule.StartKeyHex = hex.EncodeToString(codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID))) + rule.EndKeyHex = hex.EncodeToString(codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID+1))) err = manager.toolClient.SetPlacementRule(ctx, rule) if err != nil { return errors.Trace(err) @@ -201,22 +171,24 @@ func (manager *brContextManager) setupPlacementRules(ctx context.Context, tables return nil } -func (manager *brContextManager) checkRegions(ctx context.Context, tables []*model.TableInfo) (bool, string, error) { - for i, t := range tables { - start := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(t.ID)) - end := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(t.ID+1)) +func (manager *onlinePlacementRuleManager) checkRegions(ctx context.Context) (bool, string, error) { + progress := 0 + for tableID := range manager.restoreTables { + start := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID)) + end := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID+1)) ok, regionProgress, err := manager.checkRange(ctx, start, end) if err != nil { return false, "", errors.Trace(err) } if !ok { - return false, fmt.Sprintf("table %v/%v, %s", i, len(tables), regionProgress), nil + return false, fmt.Sprintf("table %v/%v, %s", progress, len(manager.restoreTables), regionProgress), nil } + progress += 1 } return true, "", nil } -func (manager *brContextManager) checkRange(ctx context.Context, start, end []byte) (bool, string, error) { +func (manager *onlinePlacementRuleManager) checkRange(ctx context.Context, start, end []byte) (bool, string, error) { regions, err := manager.toolClient.ScanRegions(ctx, start, end, -1) if err != nil { return false, "", errors.Trace(err) @@ -236,10 +208,7 @@ func (manager *brContextManager) checkRange(ctx context.Context, start, end []by } // waitPlacementSchedule waits PD to move tables to restore stores. 
-func (manager *brContextManager) waitPlacementSchedule(ctx context.Context, tables []*model.TableInfo) error { - if !manager.isOnline || len(manager.restoreStores) == 0 { - return nil - } +func (manager *onlinePlacementRuleManager) waitPlacementSchedule(ctx context.Context) error { log.Info("start waiting placement schedule") ticker := time.NewTicker(time.Second * 10) failpoint.Inject("wait-placement-schedule-quicker-ticker", func() { @@ -250,7 +219,7 @@ func (manager *brContextManager) waitPlacementSchedule(ctx context.Context, tabl for { select { case <-ticker.C: - ok, progress, err := manager.checkRegions(ctx, tables) + ok, progress, err := manager.checkRegions(ctx) if err != nil { return errors.Trace(err) } @@ -270,17 +239,14 @@ func getRuleID(tableID int64) string { } // resetPlacementRules removes placement rules for tables. -func (manager *brContextManager) resetPlacementRules(ctx context.Context, tables []*model.TableInfo) error { - if !manager.isOnline || len(manager.restoreStores) == 0 { - return nil - } +func (manager *onlinePlacementRuleManager) ResetPlacementRules(ctx context.Context) error { log.Info("start resetting placement rules") var failedTables []int64 - for _, t := range tables { - err := manager.toolClient.DeletePlacementRule(ctx, "pd", getRuleID(t.ID)) + for tableID := range manager.restoreTables { + err := manager.toolClient.DeletePlacementRule(ctx, "pd", getRuleID(tableID)) if err != nil { - log.Info("failed to delete placement rule for table", zap.Int64("table-id", t.ID)) - failedTables = append(failedTables, t.ID) + log.Info("failed to delete placement rule for table", zap.Int64("table-id", tableID)) + failedTables = append(failedTables, tableID) } } if len(failedTables) > 0 { diff --git a/br/pkg/restore/snap_client/context_manager_test.go b/br/pkg/restore/snap_client/placement_rule_manager_test.go similarity index 84% rename from br/pkg/restore/snap_client/context_manager_test.go rename to br/pkg/restore/snap_client/placement_rule_manager_test.go index c13326d21653a..8ff29e6dc0aa6 100644 --- a/br/pkg/restore/snap_client/context_manager_test.go +++ b/br/pkg/restore/snap_client/placement_rule_manager_test.go @@ -33,8 +33,8 @@ import ( pd "github.com/tikv/pd/client" ) -func generateTables() []snapclient.CreatedTable { - return []snapclient.CreatedTable{ +func generateTables() []*snapclient.CreatedTable { + return []*snapclient.CreatedTable{ { Table: &model.TableInfo{ ID: 1, @@ -56,26 +56,15 @@ func generateTables() []snapclient.CreatedTable { } } -func TestContextManagerOfflineLeave(t *testing.T) { +func TestContextManagerOffline(t *testing.T) { ctx := context.Background() - brContextManager, err := snapclient.NewBRContextManager(ctx, nil, nil, nil, false) + placementRuleManager, err := snapclient.NewPlacementRuleManager(ctx, nil, nil, nil, false) require.NoError(t, err) tables := generateTables() - err = brContextManager.Enter(ctx, tables) + err = placementRuleManager.SetPlacementRule(ctx, tables) require.NoError(t, err) - err = brContextManager.Leave(ctx, tables) + err = placementRuleManager.ResetPlacementRules(ctx) require.NoError(t, err) - brContextManager.Close(ctx) -} - -func TestContextManagerOfflineClose(t *testing.T) { - ctx := context.Background() - brContextManager, err := snapclient.NewBRContextManager(ctx, nil, nil, nil, false) - require.NoError(t, err) - tables := generateTables() - err = brContextManager.Enter(ctx, tables) - require.NoError(t, err) - brContextManager.Close(ctx) } func TestContextManagerOnlineNoStores(t *testing.T) { @@ -105,14 
+94,13 @@ func TestContextManagerOnlineNoStores(t *testing.T) { pdClient := utiltest.NewFakePDClient(stores, false, nil) pdHTTPCli := utiltest.NewFakePDHTTPClient() - brContextManager, err := snapclient.NewBRContextManager(ctx, pdClient, pdHTTPCli, nil, true) + placementRuleManager, err := snapclient.NewPlacementRuleManager(ctx, pdClient, pdHTTPCli, nil, true) require.NoError(t, err) tables := generateTables() - err = brContextManager.Enter(ctx, tables) + err = placementRuleManager.SetPlacementRule(ctx, tables) require.NoError(t, err) - err = brContextManager.Leave(ctx, tables) + err = placementRuleManager.ResetPlacementRules(ctx) require.NoError(t, err) - brContextManager.Close(ctx) } func generateRegions() []*pd.Region { @@ -248,12 +236,11 @@ func TestContextManagerOnlineLeave(t *testing.T) { pdClient := utiltest.NewFakePDClient(stores, false, nil) pdClient.SetRegions(regions) pdHTTPCli := utiltest.NewFakePDHTTPClient() - brContextManager, err := snapclient.NewBRContextManager(ctx, pdClient, pdHTTPCli, nil, true) + placementRuleManager, err := snapclient.NewPlacementRuleManager(ctx, pdClient, pdHTTPCli, nil, true) require.NoError(t, err) tables := generateTables() - err = brContextManager.Enter(ctx, tables) + err = placementRuleManager.SetPlacementRule(ctx, tables) require.NoError(t, err) - err = brContextManager.Leave(ctx, tables) + err = placementRuleManager.ResetPlacementRules(ctx) require.NoError(t, err) - brContextManager.Close(ctx) } diff --git a/br/pkg/restore/snap_client/tikv_sender.go b/br/pkg/restore/snap_client/tikv_sender.go new file mode 100644 index 0000000000000..89c535ad57fbf --- /dev/null +++ b/br/pkg/restore/snap_client/tikv_sender.go @@ -0,0 +1,328 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snapclient + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/errors" + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/checkpoint" + "github.com/pingcap/tidb/br/pkg/glue" + "github.com/pingcap/tidb/br/pkg/logutil" + snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split" + "github.com/pingcap/tidb/br/pkg/restore/split" + restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" + "github.com/pingcap/tidb/br/pkg/summary" + "github.com/pingcap/tidb/pkg/tablecodec" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +// mapTableToFiles makes a map that mapping table ID to its backup files. +// aware that one file can and only can hold one table. 
+func mapTableToFiles(files []*backuppb.File) (map[int64][]*backuppb.File, int) {
+	result := map[int64][]*backuppb.File{}
+	// count the write CF files as a hint for the split key slice size
+	maxSplitKeyCount := 0
+	for _, file := range files {
+		tableID := tablecodec.DecodeTableID(file.GetStartKey())
+		tableEndID := tablecodec.DecodeTableID(file.GetEndKey())
+		if tableID != tableEndID {
+			log.Panic("key range spread between many files.",
+				zap.String("file name", file.Name),
+				logutil.Key("startKey", file.StartKey),
+				logutil.Key("endKey", file.EndKey))
+		}
+		if tableID == 0 {
+			log.Panic("invalid table key of file",
+				zap.String("file name", file.Name),
+				logutil.Key("startKey", file.StartKey),
+				logutil.Key("endKey", file.EndKey))
+		}
+		result[tableID] = append(result[tableID], file)
+		if file.Cf == restoreutils.WriteCFName {
+			maxSplitKeyCount += 1
+		}
+	}
+	return result, maxSplitKeyCount
+}
+
+// SortAndValidateFileRanges sorts, merges and validates files by table, and yields tables with ranges.
+func SortAndValidateFileRanges(
+	createdTables []*CreatedTable,
+	allFiles []*backuppb.File,
+	splitSizeBytes, splitKeyCount uint64,
+) ([][]byte, []TableWithRange, error) {
+	// sort the created tables by downstream table id
+	sort.Slice(createdTables, func(a, b int) bool {
+		return createdTables[a].Table.ID < createdTables[b].Table.ID
+	})
+	// mapping table ID to its backup files
+	fileOfTable, hintSplitKeyCount := mapTableToFiles(allFiles)
+	// sort, merge, and validate files in each table, and generate split keys along the way
+	var (
+		// to generate region split keys, merge the small ranges over the adjacent tables
+		sortedSplitKeys = make([][]byte, 0, hintSplitKeyCount)
+
+		tableWithRanges = make([]TableWithRange, 0, len(createdTables))
+	)
+
+	log.Info("start to merge ranges", zap.Uint64("kv size threshold", splitSizeBytes), zap.Uint64("kv count threshold", splitKeyCount))
+	for _, table := range createdTables {
+		files := fileOfTable[table.OldTable.Info.ID]
+		if partitions := table.OldTable.Info.Partition; partitions != nil {
+			for _, partition := range partitions.Definitions {
+				files = append(files, fileOfTable[partition.ID]...)
+			}
+		}
+		for _, file := range files {
+			if err := restoreutils.ValidateFileRewriteRule(file, table.RewriteRule); err != nil {
+				return nil, nil, errors.Trace(err)
+			}
+		}
+		// Merge small ranges to reduce split and scatter regions.
+		// Notice that the files having the same start key and end key are in the same range.
+		sortedRanges, stat, err := restoreutils.MergeAndRewriteFileRanges(
+			files, table.RewriteRule, splitSizeBytes, splitKeyCount)
+		if err != nil {
+			return nil, nil, errors.Trace(err)
+		}
+		log.Info("merge and validate file",
+			zap.Stringer("database", table.OldTable.DB.Name),
+			zap.Stringer("table", table.Table.Name),
+			zap.Int("Files(total)", stat.TotalFiles),
+			zap.Int("File(write)", stat.TotalWriteCFFile),
+			zap.Int("File(default)", stat.TotalDefaultCFFile),
+			zap.Int("Region(total)", stat.TotalRegions),
+			zap.Int("Region(keys avg)", stat.RegionKeysAvg),
+			zap.Int("Region(bytes avg)", stat.RegionBytesAvg),
+			zap.Int("Merged(regions)", stat.MergedRegions),
+			zap.Int("Merged(keys avg)", stat.MergedRegionKeysAvg),
+			zap.Int("Merged(bytes avg)", stat.MergedRegionBytesAvg))
+
+		for _, rg := range sortedRanges {
+			sortedSplitKeys = append(sortedSplitKeys, rg.EndKey)
+		}
+
+		tableWithRanges = append(tableWithRanges, TableWithRange{
+			CreatedTable: *table,
+			Range:        sortedRanges,
+		})
+	}
+	return sortedSplitKeys, tableWithRanges, nil
+}
+
+func (rc *SnapClient) RestoreTables(
+	ctx context.Context,
+	placementRuleManager PlacementRuleManager,
+	createdTables []*CreatedTable,
+	allFiles []*backuppb.File,
+	checkpointSetWithTableID map[int64]map[string]struct{},
+	splitSizeBytes, splitKeyCount uint64,
+	updateCh glue.Progress,
+) error {
+	if err := placementRuleManager.SetPlacementRule(ctx, createdTables); err != nil {
+		return errors.Trace(err)
+	}
+	defer func() {
+		err := placementRuleManager.ResetPlacementRules(ctx)
+		if err != nil {
+			log.Warn("failed to reset placement rules", zap.Error(err))
+		}
+	}()
+
+	start := time.Now()
+	sortedSplitKeys, tableWithRanges, err := SortAndValidateFileRanges(createdTables, allFiles, splitSizeBytes, splitKeyCount)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	drainResult := drainRanges(tableWithRanges, checkpointSetWithTableID, updateCh)
+	log.Info("Merge ranges", zap.Duration("take", time.Since(start)))
+
+	start = time.Now()
+	if err = rc.SplitPoints(ctx, sortedSplitKeys, updateCh, false); err != nil {
+		return errors.Trace(err)
+	}
+	log.Info("Split regions", zap.Duration("take", time.Since(start)))
+
+	start = time.Now()
+	if err = rc.RestoreSSTFiles(ctx, drainResult.Files(), updateCh); err != nil {
+		return errors.Trace(err)
+	}
+	elapsed := time.Since(start)
+	log.Info("Restore files", zap.Duration("take", elapsed))
+
+	summary.CollectSuccessUnit("files", len(allFiles), elapsed)
+	return nil
+}
+
+// SplitPoints splits regions by the given sorted split keys,
+// i.e. the boundaries of the data ranges after rewrite.
+func (rc *SnapClient) SplitPoints(
+	ctx context.Context,
+	sortedSplitKeys [][]byte,
+	updateCh glue.Progress,
+	isRawKv bool,
+) error {
+	splitClientOpts := make([]split.ClientOptionalParameter, 0, 2)
+	splitClientOpts = append(splitClientOpts, split.WithOnSplit(func(keys [][]byte) {
+		for range keys {
+			updateCh.Inc()
+		}
+	}))
+	if isRawKv {
+		splitClientOpts = append(splitClientOpts, split.WithRawKV())
+	}
+
+	splitter := snapsplit.NewRegionSplitter(split.NewClient(
+		rc.pdClient,
+		rc.pdHTTPClient,
+		rc.tlsConf,
+		maxSplitKeysOnce,
+		rc.storeCount+1,
+		splitClientOpts...,
+	))
+
+	return splitter.ExecuteSplit(ctx, sortedSplitKeys)
+}
+
+func getFileRangeKey(f string) string {
+	// the backup data file pattern is `{store_id}_{region_id}_{epoch_version}_{key}_{ts}_{cf}.sst`
+	// so we need to compare without the `_{cf}.sst` suffix
+	idx := strings.LastIndex(f, "_")
+	if idx < 0 {
+		panic(fmt.Sprintf("invalid backup data file name: '%s'", f))
+	}
+
+	return f[:idx]
+}
+
+// isFilesBelongToSameRange checks whether two files belong to the same range with different cf.
+func isFilesBelongToSameRange(f1, f2 string) bool {
+	return getFileRangeKey(f1) == getFileRangeKey(f2)
+}
+
+func drainFilesByRange(files []*backuppb.File) ([]*backuppb.File, []*backuppb.File) {
+	if len(files) == 0 {
+		return nil, nil
+	}
+	idx := 1
+	for idx < len(files) {
+		if !isFilesBelongToSameRange(files[idx-1].Name, files[idx].Name) {
+			break
+		}
+		idx++
+	}
+
+	return files[:idx], files[idx:]
+}
+
+// RestoreSSTFiles tries to restore the files.
+func (rc *SnapClient) RestoreSSTFiles(
+	ctx context.Context,
+	tableIDWithFiles []TableIDWithFiles,
+	updateCh glue.Progress,
+) (err error) {
+	start := time.Now()
+	fileCount := 0
+	defer func() {
+		elapsed := time.Since(start)
+		if err == nil {
+			log.Info("Restore files", zap.Duration("take", elapsed))
+			summary.CollectSuccessUnit("files", fileCount, elapsed)
+		}
+	}()
+
+	log.Debug("start to restore files", zap.Int("files", fileCount))
+
+	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("Client.RestoreSSTFiles", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		ctx = opentracing.ContextWithSpan(ctx, span1)
+	}
+
+	eg, ectx := errgroup.WithContext(ctx)
+	err = rc.setSpeedLimit(ctx, rc.rateLimit)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	var rangeFiles []*backuppb.File
+	var leftFiles []*backuppb.File
+LOOPFORTABLE:
+	for _, tableIDWithFile := range tableIDWithFiles {
+		tableID := tableIDWithFile.TableID
+		files := tableIDWithFile.Files
+		rules := tableIDWithFile.RewriteRules
+		fileCount += len(files)
+		for rangeFiles, leftFiles = drainFilesByRange(files); len(rangeFiles) != 0; rangeFiles, leftFiles = drainFilesByRange(leftFiles) {
+			if ectx.Err() != nil {
+				log.Warn("Restoring encountered error and already stopped, give up the remaining files.",
+					zap.Int("remained", len(leftFiles)),
+					logutil.ShortError(ectx.Err()))
+				// We will fetch the error from the errgroup later (if there is one).
+				// Also note if the parent context has been canceled or something,
+				// breaking here directly is also a reasonable behavior.
+				break LOOPFORTABLE
+			}
+			filesReplica := rangeFiles
+			rc.fileImporter.WaitUntilUnblock()
+			rc.workerPool.ApplyOnErrorGroup(eg, func() (restoreErr error) {
+				fileStart := time.Now()
+				defer func() {
+					if restoreErr == nil {
+						log.Info("import files done", logutil.Files(filesReplica),
+							zap.Duration("take", time.Since(fileStart)))
+						updateCh.Inc()
+					}
+				}()
+				if importErr := rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion()); importErr != nil {
+					return errors.Trace(importErr)
+				}
+
+				// the data of this range has been imported
+				if rc.checkpointRunner != nil && len(filesReplica) > 0 {
+					rangeKey := getFileRangeKey(filesReplica[0].Name)
+					// The checkpoint range shows this range of kvs has been restored into
+					// the table corresponding to the table-id.
+					if err := checkpoint.AppendRangesForRestore(ectx, rc.checkpointRunner, tableID, rangeKey); err != nil {
+						return errors.Trace(err)
+					}
+				}
+				return nil
+			})
+		}
+	}
+
+	if err := eg.Wait(); err != nil {
+		summary.CollectFailureUnit("file", err)
+		log.Error(
+			"restore files failed",
+			zap.Error(err),
+		)
+		return errors.Trace(err)
+	}
+	// Once the parent context is canceled and there is no task running in the errgroup,
+	// we may break the for loop without error in the errgroup. (Will this happen?)
+	// At that time, return the error in the context here.
+	return ctx.Err()
+}
diff --git a/br/pkg/restore/snap_client/tikv_sender_test.go b/br/pkg/restore/snap_client/tikv_sender_test.go
new file mode 100644
index 0000000000000..d58c8f73439a2
--- /dev/null
+++ b/br/pkg/restore/snap_client/tikv_sender_test.go
@@ -0,0 +1,66 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
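As an illustration of the grouping used by the import loop above: a write-CF file and its default-CF sibling share everything in their names up to the final `_{cf}.sst` suffix, so grouping by that prefix yields one import batch per data range. Below is a minimal, standalone Go sketch of that grouping with hypothetical file names; it only mirrors getFileRangeKey/drainFilesByRange and is not the BR code itself.

package main

import (
	"fmt"
	"strings"
)

// fileRangeKey mirrors getFileRangeKey above: strip the trailing `_{cf}.sst`
// part so the write-CF and default-CF files of one data range share a key.
func fileRangeKey(name string) string {
	idx := strings.LastIndex(name, "_")
	if idx < 0 {
		panic(fmt.Sprintf("invalid backup data file name: '%s'", name))
	}
	return name[:idx]
}

// drainByRange returns the leading run of files that belong to the same
// range, plus the remaining files, mirroring drainFilesByRange above.
func drainByRange(files []string) (cur, left []string) {
	if len(files) == 0 {
		return nil, nil
	}
	idx := 1
	for idx < len(files) && fileRangeKey(files[idx-1]) == fileRangeKey(files[idx]) {
		idx++
	}
	return files[:idx], files[idx:]
}

func main() {
	// Hypothetical names following {store}_{region}_{epoch}_{key}_{ts}_{cf}.sst.
	files := []string{
		"1_2_7_a_100_write.sst",
		"1_2_7_a_100_default.sst",
		"1_3_7_b_100_write.sst",
	}
	for cur, left := drainByRange(files); len(cur) != 0; cur, left = drainByRange(left) {
		// the two CF files of the first range land in one batch
		fmt.Println("import together:", cur)
	}
}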
+ +package snapclient_test + +import ( + "testing" + + backuppb "github.com/pingcap/kvproto/pkg/brpb" + snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client" + restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" + "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/stretchr/testify/require" +) + +func TestMapTableToFiles(t *testing.T) { + filesOfTable1 := []*backuppb.File{ + { + Name: "table1-1.sst", + StartKey: tablecodec.EncodeTablePrefix(1), + EndKey: tablecodec.EncodeTablePrefix(1), + Cf: restoreutils.WriteCFName, + }, + { + Name: "table1-2.sst", + StartKey: tablecodec.EncodeTablePrefix(1), + EndKey: tablecodec.EncodeTablePrefix(1), + Cf: restoreutils.WriteCFName, + }, + { + Name: "table1-3.sst", + StartKey: tablecodec.EncodeTablePrefix(1), + EndKey: tablecodec.EncodeTablePrefix(1), + }, + } + filesOfTable2 := []*backuppb.File{ + { + Name: "table2-1.sst", + StartKey: tablecodec.EncodeTablePrefix(2), + EndKey: tablecodec.EncodeTablePrefix(2), + Cf: restoreutils.WriteCFName, + }, + { + Name: "table2-2.sst", + StartKey: tablecodec.EncodeTablePrefix(2), + EndKey: tablecodec.EncodeTablePrefix(2), + }, + } + + result, hintSplitKeyCount := snapclient.MapTableToFiles(append(filesOfTable2, filesOfTable1...)) + + require.Equal(t, filesOfTable1, result[1]) + require.Equal(t, filesOfTable2, result[2]) + require.Equal(t, 3, hintSplitKeyCount) +} diff --git a/br/pkg/restore/snap_client/zap.go b/br/pkg/restore/snap_client/zap.go deleted file mode 100644 index 453b3337d5e82..0000000000000 --- a/br/pkg/restore/snap_client/zap.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package snapclient - -import ( - "fmt" - - "github.com/pingcap/errors" - "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/utils" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -// ZapTables make zap field of table for debuging, including table names. 
-func zapTables(tables []CreatedTable) zapcore.Field { - return logutil.AbbreviatedArray("tables", tables, func(input any) []string { - tables := input.([]CreatedTable) - names := make([]string, 0, len(tables)) - for _, t := range tables { - names = append(names, fmt.Sprintf("%s.%s", - utils.EncloseName(t.OldTable.DB.Name.String()), - utils.EncloseName(t.OldTable.Info.Name.String()))) - } - return names - }) -} - -type zapTableIDWithFilesMarshaler []TableIDWithFiles - -func zapTableIDWithFiles(fs []TableIDWithFiles) zap.Field { - return zap.Object("files", zapTableIDWithFilesMarshaler(fs)) -} - -func (fs zapTableIDWithFilesMarshaler) MarshalLogObject(encoder zapcore.ObjectEncoder) error { - for _, f := range fs { - encoder.AddInt64("table-id", f.TableID) - if err := logutil.MarshalLogObjectForFiles(f.Files, encoder); err != nil { - return errors.Trace(err) - } - } - return nil -} diff --git a/br/pkg/restore/utils/merge.go b/br/pkg/restore/utils/merge.go index 837eacd9937c5..39b7e1a3f9691 100644 --- a/br/pkg/restore/utils/merge.go +++ b/br/pkg/restore/utils/merge.go @@ -3,12 +3,15 @@ package utils import ( + "bytes" "strings" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/rtree" + "go.uber.org/zap" ) // MergeRangesStat holds statistics for the MergeRanges. @@ -34,9 +37,9 @@ func MergeAndRewriteFileRanges( rewriteRules *RewriteRules, splitSizeBytes, splitKeyCount uint64, -) ([]rtree.Range, *MergeRangesStat, error) { +) ([]rtree.RangeStats, *MergeRangesStat, error) { if len(files) == 0 { - return []rtree.Range{}, &MergeRangesStat{}, nil + return []rtree.RangeStats{}, &MergeRangesStat{}, nil } totalBytes := uint64(0) totalKvs := uint64(0) @@ -48,6 +51,14 @@ func MergeAndRewriteFileRanges( for _, file := range files { filesMap[string(file.StartKey)] = append(filesMap[string(file.StartKey)], file) + // Assert that it has the same end key. + if !bytes.Equal(filesMap[string(file.StartKey)][0].EndKey, file.EndKey) { + log.Panic("there are two files having the same start key, but different end key", + zap.ByteString("start key", file.StartKey), + zap.ByteString("file 1 end key", file.EndKey), + zap.ByteString("file 2 end key", filesMap[string(file.StartKey)][0].EndKey), + ) + } // We skips all default cf files because we don't range overlap. if file.Cf == WriteCFName || strings.Contains(file.GetName(), WriteCFName) { writeCFFile++ @@ -58,7 +69,7 @@ func MergeAndRewriteFileRanges( totalKvs += file.TotalKvs } if writeCFFile == 0 && defaultCFFile == 0 { - return []rtree.Range{}, nil, errors.Annotatef(berrors.ErrRestoreInvalidBackup, + return []rtree.RangeStats{}, nil, errors.Annotatef(berrors.ErrRestoreInvalidBackup, "unknown backup data from neither Wrtie CF nor Default CF") } @@ -69,18 +80,19 @@ func MergeAndRewriteFileRanges( } // Check if files are overlapped - rangeTree := rtree.NewRangeTree() + rangeTree := rtree.NewRangeStatsTree() for key := range filesMap { files := filesMap[key] rangeSize := uint64(0) + rangeCount := uint64(0) for _, f := range filesMap[key] { - rangeSize += f.Size_ + rangeSize += f.TotalBytes + rangeCount += f.TotalKvs } rg := &rtree.Range{ StartKey: files[0].GetStartKey(), EndKey: files[0].GetEndKey(), Files: files, - Size: rangeSize, } // rewrite Range for split. // so that splitRanges no need to handle rewrite rules any more. 
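The hunk above feeds per-range TotalBytes and TotalKvs into the stats tree so that small adjacent ranges can be folded together before region splitting. The following is a simplified, standalone sketch of that size/key-count folding; it only mirrors the idea, while the real NeedsMerge/MergedRanges logic in rtree has additional checks and also concatenates the per-range file lists.

package main

import "fmt"

// rangeStat is a simplified stand-in for rtree.RangeStats: a key range plus
// the total bytes and kvs of the files it covers.
type rangeStat struct {
	start, end string
	size, keys uint64
}

// mergeSmallRanges folds each range into the previous one while the combined
// bytes and key count stay under the split thresholds, similar in spirit to
// RangeStatsTree.MergedRanges above.
func mergeSmallRanges(in []rangeStat, splitSizeBytes, splitKeyCount uint64) []rangeStat {
	out := make([]rangeStat, 0, len(in))
	for _, rg := range in {
		n := len(out)
		if n > 0 &&
			out[n-1].size+rg.size <= splitSizeBytes &&
			out[n-1].keys+rg.keys <= splitKeyCount {
			// small enough: extend the previous merged range
			out[n-1].end = rg.end
			out[n-1].size += rg.size
			out[n-1].keys += rg.keys
			continue
		}
		out = append(out, rg)
	}
	return out
}

func main() {
	ranges := []rangeStat{
		{"a", "b", 10, 100},
		{"b", "c", 20, 200},
		{"c", "d", 96, 900}, // too big to merge with the previous one
	}
	merged := mergeSmallRanges(ranges, 96, 960)
	fmt.Println(merged) // [{a c 30 300} {c d 96 900}]
}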
@@ -89,7 +101,7 @@ func MergeAndRewriteFileRanges(
 			return nil, nil, errors.Annotatef(berrors.ErrInvalidRange,
 				"unable to rewrite range files %+v", files)
 		}
-		if out := rangeTree.InsertRange(*tmpRng); out != nil {
+		if out := rangeTree.InsertRange(tmpRng, rangeSize, rangeCount); out != nil {
 			return nil, nil, errors.Annotatef(berrors.ErrInvalidRange,
 				"duplicate range %s files %+v", out, files)
 		}
diff --git a/br/pkg/rtree/merge_fuzz_test.go b/br/pkg/rtree/merge_fuzz_test.go
index df6e304cc1e52..637539af1deb9 100644
--- a/br/pkg/rtree/merge_fuzz_test.go
+++ b/br/pkg/rtree/merge_fuzz_test.go
@@ -15,8 +15,8 @@ func FuzzMerge(f *testing.F) {
 	baseKeyB := tablecodec.EncodeIndexSeekKey(42, 1, nil)
 	f.Add([]byte(baseKeyA), []byte(baseKeyB))
 	f.Fuzz(func(t *testing.T, a, b []byte) {
-		left := rtree.Range{StartKey: a, Files: []*backup.File{{TotalKvs: 1, TotalBytes: 1}}}
-		right := rtree.Range{StartKey: b, Files: []*backup.File{{TotalKvs: 1, TotalBytes: 1}}}
+		left := rtree.RangeStats{Range: rtree.Range{StartKey: a, Files: []*backup.File{{TotalKvs: 1, TotalBytes: 1}}}}
+		right := rtree.RangeStats{Range: rtree.Range{StartKey: b, Files: []*backup.File{{TotalKvs: 1, TotalBytes: 1}}}}
 		rtree.NeedsMerge(&left, &right, 42, 42)
 	})
 }
diff --git a/br/pkg/rtree/rtree.go b/br/pkg/rtree/rtree.go
index 0c7d0ed5ce460..4302a40fbb220 100644
--- a/br/pkg/rtree/rtree.go
+++ b/br/pkg/rtree/rtree.go
@@ -20,7 +20,6 @@ type Range struct {
 	StartKey []byte
 	EndKey   []byte
 	Files    []*backuppb.File
-	Size     uint64
 }
 
 // BytesAndKeys returns total bytes and keys in a range.
@@ -85,8 +84,64 @@ func (rg *Range) Less(than btree.Item) bool {
 	return bytes.Compare(rg.StartKey, ta.StartKey) < 0
 }
 
+// RangeStats represents a restore merge result.
+type RangeStats struct {
+	Range
+	Size  uint64
+	Count uint64
+}
+
+// Less is the less function used to order RangeStats in the btree.
+func (rg *RangeStats) Less(ta *RangeStats) bool {
+	// rg.StartKey < ta.StartKey
+	return bytes.Compare(rg.StartKey, ta.StartKey) < 0
+}
+
+type RangeStatsTree struct {
+	*btree.BTreeG[*RangeStats]
+}
+
+func NewRangeStatsTree() RangeStatsTree {
+	return RangeStatsTree{
+		BTreeG: btree.NewG[*RangeStats](32, (*RangeStats).Less),
+	}
+}
+
+// InsertRange inserts ranges into the range tree.
+// It returns a non-nil range if there are some overlapped ranges.
+func (rangeTree *RangeStatsTree) InsertRange(rg *Range, rangeSize, rangeCount uint64) *RangeStats {
+	out, _ := rangeTree.ReplaceOrInsert(&RangeStats{
+		Range: *rg,
+		Size:  rangeSize,
+		Count: rangeCount,
+	})
+	return out
+}
+
+// MergedRanges outputs the sorted ranges merged according to the given `splitSizeBytes` and `splitKeyCount`.
+func (rangeTree *RangeStatsTree) MergedRanges(splitSizeBytes, splitKeyCount uint64) []RangeStats {
+	var mergeTargetIndex int = -1
+	sortedRanges := make([]RangeStats, 0, rangeTree.Len())
+	rangeTree.Ascend(func(rg *RangeStats) bool {
+		if mergeTargetIndex < 0 || !NeedsMerge(&sortedRanges[mergeTargetIndex], rg, splitSizeBytes, splitKeyCount) {
+			// uninitialized, or sortedRanges[mergeTargetIndex] does not need to be merged
+			mergeTargetIndex += 1
+			sortedRanges = append(sortedRanges, *rg)
+		} else {
+			// need to merge rg into sortedRanges[mergeTargetIndex]
+			sortedRanges[mergeTargetIndex].EndKey = rg.EndKey
+			sortedRanges[mergeTargetIndex].Size += rg.Size
+			sortedRanges[mergeTargetIndex].Count += rg.Count
+			sortedRanges[mergeTargetIndex].Files = append(sortedRanges[mergeTargetIndex].Files, rg.Files...)
+		}
+
+		return true
+	})
+	return sortedRanges
+}
+
 // NeedsMerge checks whether two ranges needs to be merged.
-func NeedsMerge(left, right *Range, splitSizeBytes, splitKeyCount uint64) bool { +func NeedsMerge(left, right *RangeStats, splitSizeBytes, splitKeyCount uint64) bool { leftBytes, leftKeys := left.BytesAndKeys() rightBytes, rightKeys := right.BytesAndKeys() if rightBytes == 0 { @@ -217,41 +272,6 @@ func (rangeTree *RangeTree) InsertRange(rg Range) *Range { return out.(*Range) } -// MergedRanges output the sortedRanges having merged according to given `splitSizeBytes` and `splitKeyCount`. -func (rangeTree *RangeTree) MergedRanges(splitSizeBytes, splitKeyCount uint64) []Range { - var mergeTargetIndex int = -1 - sortedRanges := make([]Range, 0, rangeTree.Len()) - rangeTree.Ascend(func(item btree.Item) bool { - rg := item.(*Range) - if mergeTargetIndex < 0 || !NeedsMerge(&sortedRanges[mergeTargetIndex], rg, splitSizeBytes, splitKeyCount) { - // unintialized or the sortedRanges[mergeTargetIndex] does not need to merged - mergeTargetIndex += 1 - sortedRanges = append(sortedRanges, *rg) - } else { - // need to merge from rg to sortedRages[mergeTargetIndex] - sortedRanges[mergeTargetIndex].EndKey = rg.EndKey - sortedRanges[mergeTargetIndex].Size += rg.Size - sortedRanges[mergeTargetIndex].Files = append(sortedRanges[mergeTargetIndex].Files, rg.Files...) - } - - return true - }) - return sortedRanges -} - -// GetSortedRanges collects and returns sorted ranges. -func (rangeTree *RangeTree) GetSortedRanges() []Range { - sortedRanges := make([]Range, 0, rangeTree.Len()) - rangeTree.Ascend(func(rg btree.Item) bool { - if rg == nil { - return false - } - sortedRanges = append(sortedRanges, *rg.(*Range)) - return true - }) - return sortedRanges -} - // GetIncompleteRange returns missing range covered by startKey and endKey. func (rangeTree *RangeTree) GetIncompleteRange( startKey, endKey []byte, diff --git a/br/pkg/rtree/rtree_test.go b/br/pkg/rtree/rtree_test.go index f5b702e04072b..bf1235e2dd0db 100644 --- a/br/pkg/rtree/rtree_test.go +++ b/br/pkg/rtree/rtree_test.go @@ -189,10 +189,10 @@ func encodeTableRecord(prefix kv.Key, rowID uint64) []byte { } func TestRangeTreeMerge(t *testing.T) { - rangeTree := rtree.NewRangeTree() + rangeTree := rtree.NewRangeStatsTree() tablePrefix := tablecodec.GenTableRecordPrefix(1) for i := uint64(0); i < 10000; i += 1 { - item := rtree.Range{ + rangeTree.InsertRange(&rtree.Range{ StartKey: encodeTableRecord(tablePrefix, i), EndKey: encodeTableRecord(tablePrefix, i+1), Files: []*backuppb.File{ @@ -202,9 +202,7 @@ func TestRangeTreeMerge(t *testing.T) { TotalBytes: 1, }, }, - Size: i, - } - rangeTree.Update(item) + }, i, 0) } sortedRanges := rangeTree.MergedRanges(10, 10) require.Equal(t, 1000, len(sortedRanges)) diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 60f80c77a5f86..545409bd37629 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -58,13 +58,11 @@ go_library( "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", "//pkg/statistics/handle", - "//pkg/tablecodec", "//pkg/types", "//pkg/util", "//pkg/util/cdcutil", "//pkg/util/collate", "//pkg/util/engine", - "//pkg/util/mathutil", "//pkg/util/table-filter", "@com_github_docker_go_units//:go-units", "@com_github_fatih_color//:color", @@ -111,7 +109,7 @@ go_test( ], embed = [":task"], flaky = True, - shard_count = 34, + shard_count = 33, deps = [ "//br/pkg/backup", "//br/pkg/config", diff --git a/br/pkg/task/config_test.go b/br/pkg/task/config_test.go index 3090570046644..85532f019863d 100644 --- a/br/pkg/task/config_test.go +++ b/br/pkg/task/config_test.go @@ -286,40 +286,3 
@@ func mockBackupMeta(mockSchemas []*backuppb.Schema, mockFiles []*backuppb.File) Schemas: mockSchemas, } } - -func TestMapTableToFiles(t *testing.T) { - filesOfTable1 := []*backuppb.File{ - { - Name: "table1-1.sst", - StartKey: tablecodec.EncodeTablePrefix(1), - EndKey: tablecodec.EncodeTablePrefix(1), - }, - { - Name: "table1-2.sst", - StartKey: tablecodec.EncodeTablePrefix(1), - EndKey: tablecodec.EncodeTablePrefix(1), - }, - { - Name: "table1-3.sst", - StartKey: tablecodec.EncodeTablePrefix(1), - EndKey: tablecodec.EncodeTablePrefix(1), - }, - } - filesOfTable2 := []*backuppb.File{ - { - Name: "table2-1.sst", - StartKey: tablecodec.EncodeTablePrefix(2), - EndKey: tablecodec.EncodeTablePrefix(2), - }, - { - Name: "table2-2.sst", - StartKey: tablecodec.EncodeTablePrefix(2), - EndKey: tablecodec.EncodeTablePrefix(2), - }, - } - - result := MapTableToFiles(append(filesOfTable2, filesOfTable1...)) - - require.Equal(t, filesOfTable1, result[1]) - require.Equal(t, filesOfTable2, result[2]) -} diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index f1ff4391634af..e0447b79c0a02 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -7,7 +7,6 @@ import ( "context" "fmt" "slices" - "sort" "strings" "time" @@ -15,7 +14,6 @@ import ( "github.com/google/uuid" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/checkpoint" @@ -38,10 +36,8 @@ import ( "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/collate" "github.com/pingcap/tidb/pkg/util/engine" - "github.com/pingcap/tidb/pkg/util/mathutil" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/tikv/client-go/v2/tikv" @@ -161,7 +157,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) { flags.Uint64(FlagMergeRegionKeyCount, conn.DefaultMergeRegionKeyCount, "the threshold of merging small regions (Default 960_000, region split key count)") flags.Uint(FlagPDConcurrency, defaultPDConcurrency, - "concurrency pd-relative operations like split & scatter.") + "(deprecated) concurrency pd-relative operations like split & scatter.") flags.Uint(FlagStatsConcurrency, defaultStatsConcurrency, "concurrency to restore statistic") flags.Duration(FlagBatchFlushInterval, defaultBatchFlushInterval, @@ -1050,8 +1046,6 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } } } - // We make bigger errCh so we won't block on multi-part failed. 
- errCh := make(chan error, 32) createdTables, err := client.CreateTables(ctx, tables, newTS) if err != nil { @@ -1094,14 +1088,6 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } } - tableStream := afterTableCreatedCh(ctx, createdTables) - - tableFileMap := MapTableToFiles(files) - log.Debug("mapped table to files", zap.Any("result map", tableFileMap)) - - rangeStream := client.GoValidateFileRanges( - ctx, tableStream, tableFileMap, kvConfigs.MergeRegionSize.Value, kvConfigs.MergeRegionKeyCount.Value, errCh) - rangeSize := EstimateRangeSize(files) summary.CollectInt("restore ranges", rangeSize) log.Info("range and file prepared", zap.Int("file count", len(files)), zap.Int("range count", rangeSize)) @@ -1115,13 +1101,6 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } } - // Restore sst files in batch. - batchSize := mathutil.MaxInt - failpoint.Inject("small-batch-size", func(v failpoint.Value) { - log.Info("failpoint small batch size is on", zap.Int("size", v.(int))) - batchSize = v.(int) - }) - // Split/Scatter + Download/Ingest progressLen := int64(rangeSize + len(files)) if cfg.Checksum { @@ -1131,28 +1110,22 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf progressLen += int64(len(tables)) } // Redirect to log if there is no log file to avoid unreadable output. - updateCh := g.StartProgress( - ctx, - cmdName, - progressLen, - !cfg.LogProgress) + updateCh := g.StartProgress(ctx, cmdName, progressLen, !cfg.LogProgress) defer updateCh.Close() - sender, err := snapclient.NewTiKVSender(ctx, client, updateCh, cfg.PDConcurrency) + placementRuleManager, err := snapclient.NewPlacementRuleManager(ctx, mgr.GetPDClient(), mgr.GetPDHTTPClient(), mgr.GetTLSConfig(), cfg.Online) if err != nil { return errors.Trace(err) } - manager, err := snapclient.NewBRContextManager(ctx, mgr.GetPDClient(), mgr.GetPDHTTPClient(), mgr.GetTLSConfig(), cfg.Online) - if err != nil { + if err := client.RestoreTables(ctx, placementRuleManager, createdTables, files, checkpointSetWithTableID, + kvConfigs.MergeRegionSize.Value, kvConfigs.MergeRegionKeyCount.Value, + updateCh, + ); err != nil { return errors.Trace(err) } - batcher, afterTableRestoredCh := snapclient.NewBatcher(ctx, sender, manager, errCh, updateCh) - batcher.SetCheckpoint(checkpointSetWithTableID) - batcher.SetThreshold(batchSize) - batcher.EnableAutoCommit(ctx, cfg.BatchFlushInterval) - go restoreTableStream(ctx, rangeStream, batcher, errCh) - var finish <-chan struct{} - postHandleCh := afterTableRestoredCh + // We make bigger errCh so we won't block on multi-part failed. + errCh := make(chan error, 32) + postHandleCh := afterTableRestoredCh(ctx, createdTables) // pipeline checksum if cfg.Checksum { @@ -1168,7 +1141,7 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf postHandleCh = client.GoWaitTiFlashReady(ctx, postHandleCh, updateCh, errCh) } - finish = dropToBlackhole(ctx, postHandleCh, errCh) + finish := dropToBlackhole(ctx, postHandleCh, errCh) // Reset speed limit. ResetSpeedLimit must be called after client.InitBackupMeta has been called. defer func() { @@ -1399,30 +1372,6 @@ func EstimateRangeSize(files []*backuppb.File) int { return result } -// MapTableToFiles makes a map that mapping table ID to its backup files. -// aware that one file can and only can hold one table. 
-func MapTableToFiles(files []*backuppb.File) map[int64][]*backuppb.File { - result := map[int64][]*backuppb.File{} - for _, file := range files { - tableID := tablecodec.DecodeTableID(file.GetStartKey()) - tableEndID := tablecodec.DecodeTableID(file.GetEndKey()) - if tableID != tableEndID { - log.Panic("key range spread between many files.", - zap.String("file name", file.Name), - logutil.Key("startKey", file.StartKey), - logutil.Key("endKey", file.EndKey)) - } - if tableID == 0 { - log.Panic("invalid table key of file", - zap.String("file name", file.Name), - logutil.Key("startKey", file.StartKey), - logutil.Key("endKey", file.EndKey)) - } - result[tableID] = append(result[tableID], file) - } - return result -} - // dropToBlackhole drop all incoming tables into black hole, // i.e. don't execute checksum, just increase the process anyhow. func dropToBlackhole( @@ -1494,39 +1443,6 @@ func enableTiDBConfig() func() { return restoreConfig } -// restoreTableStream blocks current goroutine and restore a stream of tables, -// by send tables to batcher. -func restoreTableStream( - ctx context.Context, - inputCh <-chan snapclient.TableWithRange, - batcher *snapclient.Batcher, - errCh chan<- error, -) { - oldTableCount := 0 - defer func() { - // when things done, we must clean pending requests. - batcher.Close() - log.Info("doing postwork", - zap.Int("table count", oldTableCount), - ) - }() - - for { - select { - case <-ctx.Done(): - errCh <- ctx.Err() - return - case t, ok := <-inputCh: - if !ok { - return - } - oldTableCount += 1 - - batcher.Add(t) - } - } -} - func getTiFlashNodeCount(ctx context.Context, pdClient pd.Client) (uint64, error) { tiFlashStores, err := conn.GetAllTiKVStoresWithRetry(ctx, pdClient, connutil.TiFlashOnly) if err != nil { @@ -1742,22 +1658,18 @@ func checkIsInActions(action model.ActionType, actions map[model.ActionType]stru return ok } -func afterTableCreatedCh(ctx context.Context, createdTables []*snapclient.CreatedTable) <-chan snapclient.CreatedTable { - outCh := make(chan snapclient.CreatedTable) +func afterTableRestoredCh(ctx context.Context, createdTables []*snapclient.CreatedTable) <-chan *snapclient.CreatedTable { + outCh := make(chan *snapclient.CreatedTable) go func() { defer close(outCh) - sort.Slice(createdTables, func(a, b int) bool { - return createdTables[a].Table.ID < createdTables[b].Table.ID - }) - for _, createdTable := range createdTables { select { case <-ctx.Done(): return default: - outCh <- *createdTable + outCh <- createdTable } } }() diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index b53459a386a05..ea87cc7345b8a 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -16,6 +16,7 @@ import ( "github.com/pingcap/tidb/br/pkg/restore" snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" + "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/summary" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -145,7 +146,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR !cfg.LogProgress) // RawKV restore does not need to rewrite keys. 
-	err = client.SplitRanges(ctx, ranges, updateCh, true)
+	err = client.SplitPoints(ctx, getEndKeys(ranges), updateCh, true)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -169,3 +170,11 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
 	summary.SetSuccessStatus(true)
 	return nil
 }
+
+func getEndKeys(ranges []rtree.RangeStats) [][]byte {
+	endKeys := make([][]byte, 0, len(ranges))
+	for _, rg := range ranges {
+		endKeys = append(endKeys, rg.EndKey)
+	}
+	return endKeys
+}
diff --git a/br/pkg/task/restore_txn.go b/br/pkg/task/restore_txn.go
index 00039eb51370e..7b24a974490d4 100644
--- a/br/pkg/task/restore_txn.go
+++ b/br/pkg/task/restore_txn.go
@@ -88,7 +88,7 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config)
 		!cfg.LogProgress)
 
 	// RawKV restore does not need to rewrite keys.
-	err = client.SplitRanges(ctx, ranges, updateCh, false)
+	err = client.SplitPoints(ctx, getEndKeys(ranges), updateCh, false)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/br/tests/br_small_batch_size/run.sh b/br/tests/br_small_batch_size/run.sh
deleted file mode 100755
index 3fe09fca81063..0000000000000
--- a/br/tests/br_small_batch_size/run.sh
+++ /dev/null
@@ -1,81 +0,0 @@
-#!/bin/sh
-#
-# Copyright 2020 PingCAP, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
-
-random_values() {
-    length=$1
-    count=$2
-    python -c "
-import random
-import string
-for ignored in range($count):
-    print(''.join(random.choice(string.ascii_letters) for _ in range($length)))" |
-        awk '{print "(1" $1 "1)"}' |
-        tr "\n1" ",'" |
-        sed 's/,$//'
-}
-
-create_and_insert() {
-    table_name=$1
-    record_count=$2
-    run_sql "CREATE TABLE $DB.$table_name(k varchar(256) primary key)"
-    stmt="INSERT INTO $DB.$table_name VALUES `random_values 255 $record_count`"
-    echo $stmt | mysql -uroot -h127.0.0.1 -P4000
-}
-
-check_size() {
-    table_name=$1
-    record_count=$2
-
-    count=`run_sql 'select count(*) from $DB.$table_name' | awk '/count/{print $2}'`
-
-    if [ $count -ne $record_count ]; then
-        echo "check size failed: $count vs $record_count"
-    fi
-}
-
-set -eu
-DB="$TEST_NAME"
-TABLE="usertable"
-
-run_sql "CREATE DATABASE $DB;"
-
-record_counts=(10000 10010 10086)
-for i in $record_counts; do
-    create_and_insert "t$i" $i
-done
-go-ycsb load mysql -P $CUR/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB
-
-
-echo "backup start..."
-backup_dir="$TEST_DIR/${TEST_NAME}_backup"
-rm -rf $backup_dir
-run_br backup full -s "local://$backup_dir" --pd $PD_ADDR
-
-run_sql "drop database $DB"
-
-
-echo "restore start..."
-GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/small-batch-size=return(2)" \
-run_br restore full -s "local://$backup_dir" --pd $PD_ADDR --ratelimit 1024
-
-for i in $record_counts; do
-    check_size "t$i" $i
-done
-check_size $TABLE 10000
-
-run_sql "DROP DATABASE $DB"
diff --git a/br/tests/br_small_batch_size/workload b/br/tests/br_small_batch_size/workload
deleted file mode 100644
index caba5e1caabd0..0000000000000
--- a/br/tests/br_small_batch_size/workload
+++ /dev/null
@@ -1,12 +0,0 @@
-recordcount=30000
-operationcount=0
-workload=core
-
-readallfields=true
-
-readproportion=0
-updateproportion=0
-scanproportion=0
-insertproportion=0
-
-requestdistribution=uniform
\ No newline at end of file
diff --git a/br/tests/run_group_br_tests.sh b/br/tests/run_group_br_tests.sh
index 9fe7fd643a58b..ae9f17d7a462b 100755
--- a/br/tests/run_group_br_tests.sh
+++ b/br/tests/run_group_br_tests.sh
@@ -25,7 +25,7 @@ groups=(
 	["G02"]="br_full_cluster_restore br_full_index br_incremental_ddl br_pitr_failpoint"
 	["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index br_tidb_placement_policy br_tiflash br_tiflash_conflict'
 	["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
-	["G05"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter'
+	["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter'
 	["G06"]='br_tikv_outage br_tikv_outage3'
 	["G07"]='br_pitr'
 	["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom'