diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go
index 5a12c4ba34e4e..7fa273b827a1c 100644
--- a/br/pkg/restore/client.go
+++ b/br/pkg/restore/client.go
@@ -30,6 +30,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/metautil"
 	"github.com/pingcap/tidb/br/pkg/pdutil"
 	"github.com/pingcap/tidb/br/pkg/redact"
+	"github.com/pingcap/tidb/br/pkg/rtree"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/summary"
 	"github.com/pingcap/tidb/br/pkg/utils"
@@ -807,6 +808,15 @@ func drainFilesByRange(files []*backuppb.File, supportMulti bool) ([]*backuppb.F
 	return files[:idx], files[idx:]
 }
 
+// SplitRanges implements TiKVRestorer.
+func (rc *Client) SplitRanges(ctx context.Context,
+	ranges []rtree.Range,
+	rewriteRules *RewriteRules,
+	updateCh glue.Progress,
+	isRawKv bool) error {
+	return SplitRanges(ctx, rc, ranges, rewriteRules, updateCh, isRawKv)
+}
+
 // RestoreFiles tries to restore the files.
 func (rc *Client) RestoreFiles(
 	ctx context.Context,
diff --git a/br/pkg/restore/pipeline_items.go b/br/pkg/restore/pipeline_items.go
index 7d3ce107ff70b..26da3824b9a4b 100644
--- a/br/pkg/restore/pipeline_items.go
+++ b/br/pkg/restore/pipeline_items.go
@@ -8,8 +8,10 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/br/pkg/glue"
+	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/metautil"
 	"github.com/pingcap/tidb/br/pkg/rtree"
 	"github.com/pingcap/tidb/br/pkg/summary"
@@ -183,8 +185,27 @@ type BatchSender interface {
 	Close()
 }
 
+// TiKVRestorer is the minimal set of methods required for restoring.
+// It contains the primitive APIs extracted from `restore.Client`, so some of the arguments may seem redundant.
+// Maybe TODO: make a better abstraction?
+type TiKVRestorer interface {
+	// SplitRanges splits the regions implicated by the ranges and rewrite rules.
+	// After splitting, it also scatters the fresh regions.
+	SplitRanges(ctx context.Context,
+		ranges []rtree.Range,
+		rewriteRules *RewriteRules,
+		updateCh glue.Progress,
+		isRawKv bool) error
+	// RestoreFiles imports the files into TiKV.
+	RestoreFiles(ctx context.Context,
+		files []*backuppb.File,
+		rewriteRules *RewriteRules,
+		updateCh glue.Progress) error
+}
+
 type tikvSender struct {
-	client *Client
+	client TiKVRestorer
+
 	updateCh glue.Progress
 
 	sink TableSink
@@ -209,7 +230,7 @@ func (b *tikvSender) RestoreBatch(ranges DrainResult) {
 // NewTiKVSender make a sender that send restore requests to TiKV.
 func NewTiKVSender(
 	ctx context.Context,
-	cli *Client,
+	cli TiKVRestorer,
 	updateCh glue.Progress,
 	splitConcurrency uint,
 ) (BatchSender, error) {
@@ -252,9 +273,9 @@ func (b *tikvSender) splitWorker(ctx context.Context,
 		b.wg.Done()
 		if err := eg.Wait(); err != nil {
 			b.sink.EmitError(err)
-			return
 		}
 		close(next)
+		log.Info("TiKV Sender: split worker exits.")
 	}()
 
 	start := time.Now()
@@ -266,7 +287,7 @@ func (b *tikvSender) splitWorker(ctx context.Context,
 	pool := utils.NewWorkerPool(concurrency, "split")
 	for {
 		select {
-		case <-ctx.Done():
+		case <-ectx.Done():
 			return
 		case result, ok := <-ranges:
 			if !ok {
@@ -289,7 +310,7 @@ func (b *tikvSender) splitWorker(ctx context.Context,
 			// hence the checksum would fail.
 			done := b.registerTableIsRestoring(result.TablesToSend)
 			pool.ApplyOnErrorGroup(eg, func() error {
-				err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh, false)
+				err := b.client.SplitRanges(ectx, result.Ranges, result.RewriteRules, b.updateCh, false)
 				if err != nil {
 					log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
 					return err
@@ -338,17 +359,17 @@ func (b *tikvSender) waitTablesDone(ts []CreatedTable) {
 func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResultAndDone) {
 	eg, ectx := errgroup.WithContext(ctx)
 	defer func() {
-		log.Debug("restore worker closed")
+		log.Info("TiKV Sender: restore worker prepare to close.")
 		if err := eg.Wait(); err != nil {
 			b.sink.EmitError(err)
-			return
 		}
-		b.wg.Done()
 		b.sink.Close()
+		b.wg.Done()
+		log.Info("TiKV Sender: restore worker exits.")
 	}()
 	for {
 		select {
-		case <-ctx.Done():
+		case <-ectx.Done():
 			return
 		case r, ok := <-ranges:
 			if !ok {
@@ -360,6 +381,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResul
 			eg.Go(func() error {
 				e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh)
 				if e != nil {
+					log.Error("restore batch meet error", logutil.ShortError(e), logutil.Files(files))
 					r.done()
 					return e
 				}
diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go
index 5a4a8bbdad97b..89fc769062874 100644
--- a/br/pkg/restore/split_test.go
+++ b/br/pkg/restore/split_test.go
@@ -10,9 +10,14 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/kvproto/pkg/import_sstpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/pingcap/log"
+	berrors "github.com/pingcap/tidb/br/pkg/errors"
+	"github.com/pingcap/tidb/br/pkg/glue"
+	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/restore"
 	"github.com/pingcap/tidb/br/pkg/rtree"
 	"github.com/pingcap/tidb/br/pkg/utils"
@@ -560,3 +565,123 @@ func TestRegionConsistency(t *testing.T) {
 		require.Regexp(t, ca.err, err.Error())
 	}
 }
+
+type fakeRestorer struct {
+	errorInSplit  bool
+	splitRanges   []rtree.Range
+	restoredFiles []*backuppb.File
+}
+
+func (f *fakeRestorer) SplitRanges(ctx context.Context, ranges []rtree.Range, rewriteRules *restore.RewriteRules, updateCh glue.Progress, isRawKv bool) error {
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+	f.splitRanges = append(f.splitRanges, ranges...)
+	if f.errorInSplit {
+		err := errors.Annotatef(berrors.ErrRestoreSplitFailed,
+			"the key space takes many efforts and finally get together, how dare you split them again... :<")
+		log.Error("error happens :3", logutil.ShortError(err))
+		return err
+	}
+	return nil
+}
+
+func (f *fakeRestorer) RestoreFiles(ctx context.Context, files []*backuppb.File, rewriteRules *restore.RewriteRules, updateCh glue.Progress) error {
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+	f.restoredFiles = append(f.restoredFiles, files...)
+	err := errors.Annotatef(berrors.ErrRestoreWriteAndIngest, "the files to restore are taken by a hijacker, meow :3")
+	log.Error("error happens :3", logutil.ShortError(err))
+	return err
+}
+
+func fakeRanges(keys ...string) (r restore.DrainResult) {
+	for i := range keys {
+		if i+1 == len(keys) {
+			return
+		}
+		r.Ranges = append(r.Ranges, rtree.Range{
+			StartKey: []byte(keys[i]),
+			EndKey:   []byte(keys[i+1]),
+			Files:    []*backuppb.File{{Name: "fake.sst"}},
+		})
+	}
+	return
+}
+
+type errorInTimeSink struct {
+	ctx   context.Context
+	errCh chan error
+	t     *testing.T
+}
+
+func (e errorInTimeSink) EmitTables(tables ...restore.CreatedTable) {}
+
+func (e errorInTimeSink) EmitError(err error) {
+	e.errCh <- err
+}
+
+func (e errorInTimeSink) Close() {}
+
+func (e errorInTimeSink) Wait() {
+	select {
+	case <-e.ctx.Done():
+		e.t.Logf("The context is canceled but no error happen")
+		e.t.FailNow()
+	case <-e.errCh:
+	}
+}
+
+func assertErrorEmitInTime(ctx context.Context, t *testing.T) errorInTimeSink {
+	errCh := make(chan error, 1)
+	return errorInTimeSink{
+		ctx:   ctx,
+		errCh: errCh,
+		t:     t,
+	}
+}
+
+func TestRestoreFailed(t *testing.T) {
+	ranges := []restore.DrainResult{
+		fakeRanges("aax", "abx", "abz"),
+		fakeRanges("abz", "bbz", "bcy"),
+		fakeRanges("bcy", "cad", "xxy"),
+	}
+	r := &fakeRestorer{}
+	sender, err := restore.NewTiKVSender(context.TODO(), r, nil, 1)
+	require.NoError(t, err)
+	dctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	sink := assertErrorEmitInTime(dctx, t)
+	sender.PutSink(sink)
+	for _, r := range ranges {
+		sender.RestoreBatch(r)
+	}
+	sink.Wait()
+	sink.Close()
+	sender.Close()
+	require.GreaterOrEqual(t, len(r.restoredFiles), 1)
+}
+
+func TestSplitFailed(t *testing.T) {
+	ranges := []restore.DrainResult{
+		fakeRanges("aax", "abx", "abz"),
+		fakeRanges("abz", "bbz", "bcy"),
+		fakeRanges("bcy", "cad", "xxy"),
+	}
+	r := &fakeRestorer{errorInSplit: true}
+	sender, err := restore.NewTiKVSender(context.TODO(), r, nil, 1)
+	require.NoError(t, err)
+	dctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	sink := assertErrorEmitInTime(dctx, t)
+	sender.PutSink(sink)
+	for _, r := range ranges {
+		sender.RestoreBatch(r)
+	}
+	sink.Wait()
+	sender.Close()
+	require.GreaterOrEqual(t, len(r.splitRanges), 2)
+	require.Len(t, r.restoredFiles, 0)
+}
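
A minimal sketch of how the new abstraction is consumed, for illustration only: because NewTiKVSender now takes the TiKVRestorer interface instead of *restore.Client, any type implementing SplitRanges and RestoreFiles can drive the pipeline, which is exactly what fakeRestorer exploits in the tests above. The noopRestorer type, the package name, and buildSender below are hypothetical; the compile-time assertions merely restate what the patch establishes.

package restoreexample

import (
	"context"

	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/tidb/br/pkg/glue"
	"github.com/pingcap/tidb/br/pkg/restore"
	"github.com/pingcap/tidb/br/pkg/rtree"
)

// noopRestorer is a hypothetical TiKVRestorer that does nothing; it only
// shows that any implementation of the two methods can be handed to the sender.
type noopRestorer struct{}

func (noopRestorer) SplitRanges(ctx context.Context, ranges []rtree.Range,
	rewriteRules *restore.RewriteRules, updateCh glue.Progress, isRawKv bool) error {
	return nil
}

func (noopRestorer) RestoreFiles(ctx context.Context, files []*backuppb.File,
	rewriteRules *restore.RewriteRules, updateCh glue.Progress) error {
	return nil
}

// Compile-time checks: the real client (via the new Client.SplitRanges wrapper)
// and the stub both satisfy TiKVRestorer.
var (
	_ restore.TiKVRestorer = (*restore.Client)(nil)
	_ restore.TiKVRestorer = noopRestorer{}
)

func buildSender(ctx context.Context) (restore.BatchSender, error) {
	// NewTiKVSender accepts any TiKVRestorer, not only *restore.Client.
	// A caller would then attach a TableSink via PutSink and feed it with
	// RestoreBatch, as TestRestoreFailed and TestSplitFailed do above.
	return restore.NewTiKVSender(ctx, noopRestorer{}, nil, 1)
}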