From 3e34d96b204119c6e288d30ab7af6b6e81ccef40 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 22 Mar 2022 11:54:18 +0800 Subject: [PATCH 1/8] lightning: support inject external storage when as library --- br/pkg/lightning/checkpoints/checkpoints.go | 44 +++++++---- br/pkg/lightning/lightning.go | 56 +++++++++++--- br/pkg/lightning/lightning_serial_test.go | 8 +- br/pkg/lightning/restore/check_info.go | 2 +- br/pkg/lightning/restore/restore.go | 85 ++++++++++++++------- br/pkg/lightning/run_options.go | 52 +++++++++++++ 6 files changed, 191 insertions(+), 56 deletions(-) create mode 100644 br/pkg/lightning/run_options.go diff --git a/br/pkg/lightning/checkpoints/checkpoints.go b/br/pkg/lightning/checkpoints/checkpoints.go index f323c58e9b88c..76864cd986567 100644 --- a/br/pkg/lightning/checkpoints/checkpoints.go +++ b/br/pkg/lightning/checkpoints/checkpoints.go @@ -28,6 +28,9 @@ import ( "github.com/joho/sqltocsv" "github.com/pingcap/errors" + "go.uber.org/zap" + "modernc.org/mathutil" + "github.com/pingcap/tidb/br/pkg/lightning/checkpoints/checkpointspb" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" @@ -36,8 +39,6 @@ import ( verify "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/version/build" - "go.uber.org/zap" - "modernc.org/mathutil" ) type CheckpointStatus uint8 @@ -954,24 +955,23 @@ type FileCheckpointsDB struct { exStorage storage.ExternalStorage } -func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, error) { +func newFileCheckpointsDB( + ctx context.Context, + path string, + exStorage storage.ExternalStorage, + fileName string, +) (*FileCheckpointsDB, error) { cpdb := &FileCheckpointsDB{ - path: path, checkpoints: checkpointspb.CheckpointsModel{ TaskCheckpoint: &checkpointspb.TaskCheckpointModel{}, Checkpoints: map[string]*checkpointspb.TableCheckpointModel{}, }, + ctx: ctx, + path: path, + fileName: fileName, + exStorage: exStorage, } - // init ExternalStorage - s, fileName, err := createExstorageByCompletePath(ctx, path) - if err != nil { - return nil, errors.Trace(err) - } - cpdb.ctx = ctx - cpdb.fileName = fileName - cpdb.exStorage = s - if cpdb.fileName == "" { return nil, errors.Errorf("the checkpoint DSN '%s' must not be a directory", path) } @@ -1013,6 +1013,24 @@ func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, return cpdb, nil } +func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, error) { + // init ExternalStorage + s, fileName, err := createExstorageByCompletePath(ctx, path) + if err != nil { + return nil, errors.Trace(err) + } + return newFileCheckpointsDB(ctx, path, s, fileName) +} + +func NewFileCheckpointsDBWithExstorageFileName( + ctx context.Context, + path string, + s storage.ExternalStorage, + fileName string, +) (*FileCheckpointsDB, error) { + return newFileCheckpointsDB(ctx, path, s, fileName) +} + // createExstorageByCompletePath create ExternalStorage by completePath and return fileName. func createExstorageByCompletePath(ctx context.Context, completePath string) (storage.ExternalStorage, string, error) { if completePath == "" { diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 907b316ab8399..764fe782b13f0 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/importer" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" @@ -188,6 +189,7 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error { // use a default glue later. // - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and glue could be a // caller implemented glue. +// deprecated: use RunOnceWithOptions instead. func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glue glue.Glue) error { if err := taskCfg.Adjust(taskCtx); err != nil { return err @@ -198,7 +200,7 @@ func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glu taskCfg.TaskID = int64(val.(int)) }) - return l.run(taskCtx, taskCfg, glue) + return l.run(taskCtx, taskCfg, &options{glue: glue}) } func (l *Lightning) RunServer() error { @@ -223,12 +225,33 @@ func (l *Lightning) RunServer() error { } } +func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config.Config, opts ...Option) error { + if err := taskCfg.Adjust(taskCtx); err != nil { + return err + } + + taskCfg.TaskID = time.Now().UnixNano() + o := &options{} + for _, opt := range opts { + opt(o) + } + // pre-check about options + if o.externalStorage != nil && o.glue != nil { + return common.NormalizeError(errors.New("WithExternalStorage and WithGlue can't be both set")) + } + if o.cpNameInExtStorage != "" && o.glue != nil { + return common.NormalizeError(errors.New("WithCpNameInExtStorage and WithGlue can't be both set")) + } + + return l.run(taskCtx, taskCfg, o) +} + var ( taskRunNotifyKey = "taskRunNotifyKey" taskCfgRecorderKey = "taskCfgRecorderKey" ) -func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.Glue) (err error) { +func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *options) (err error) { build.LogInfo(build.Lightning) log.L().Info("cfg", zap.Stringer("cfg", taskCfg)) @@ -279,6 +302,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. // initiation of default glue should be after RegisterMySQL, which is ready to be called after taskCfg.Adjust // and also put it here could avoid injecting another two SkipRunTask failpoint to caller + g := o.glue if g == nil { db, err := restore.DBFromConfig(ctx, taskCfg.TiDB) if err != nil { @@ -287,13 +311,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. g = glue.NewExternalTiDBGlue(db, taskCfg.TiDB.SQLMode) } - u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil) - if err != nil { - return common.NormalizeError(err) - } - s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{}) - if err != nil { - return common.NormalizeError(err) + s := o.externalStorage + if s == nil { + u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil) + if err != nil { + return common.NormalizeError(err) + } + s, err = storage.New(ctx, u, &storage.ExternalStorageOptions{}) + if err != nil { + return common.NormalizeError(err) + } } // return expectedErr means at least meet one file @@ -333,7 +360,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. var procedure *restore.Controller - procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, &l.status, s, g) + param := &restore.ControllerParam{ + DBMetas: dbMetas, + Status: &l.status, + ExtStorage: s, + OwnExtStorage: o.externalStorage == nil, + Glue: g, + CpNameInExtStorage: o.cpNameInExtStorage, + } + + procedure, err = restore.NewRestoreController(ctx, taskCfg, param) if err != nil { log.L().Error("restore failed", log.ShortError(err)) return errors.Trace(err) diff --git a/br/pkg/lightning/lightning_serial_test.go b/br/pkg/lightning/lightning_serial_test.go index 3dd46d33c1f2f..3a5d8a7200144 100644 --- a/br/pkg/lightning/lightning_serial_test.go +++ b/br/pkg/lightning/lightning_serial_test.go @@ -22,11 +22,12 @@ import ( "github.com/docker/go-units" "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" + "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/glue" "github.com/pingcap/tidb/br/pkg/lightning/mydump" - "github.com/stretchr/testify/require" ) func TestInitEnv(t *testing.T) { @@ -61,6 +62,7 @@ func TestRun(t *testing.T) { path, _ := filepath.Abs(".") ctx := context.Background() invalidGlue := glue.NewExternalTiDBGlue(nil, 0) + o := &options{glue: invalidGlue} err = lightning.run(ctx, &config.Config{ Mydumper: config.MydumperRuntime{ SourceDir: "file://" + filepath.ToSlash(path), @@ -71,7 +73,7 @@ func TestRun(t *testing.T) { Enable: true, Driver: "invalid", }, - }, invalidGlue) + }, o) require.EqualError(t, err, "[Lightning:Checkpoint:ErrUnknownCheckpointDriver]unknown checkpoint driver 'invalid'") err = lightning.run(ctx, &config.Config{ @@ -84,7 +86,7 @@ func TestRun(t *testing.T) { Driver: "file", DSN: "any-file", }, - }, invalidGlue) + }, o) require.Error(t, err) } diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go index 1a56c741c3b2e..b6b4a24064435 100644 --- a/br/pkg/lightning/restore/check_info.go +++ b/br/pkg/lightning/restore/check_info.go @@ -35,6 +35,7 @@ import ( "modernc.org/mathutil" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" @@ -348,7 +349,6 @@ func (rc *Controller) checkClusterRegion(ctx context.Context) error { } // StoragePermission checks whether Lightning has enough permission to storage. -// this test cannot be skipped. func (rc *Controller) StoragePermission(ctx context.Context) error { passed := true message := "Lightning has the correct storage permission" diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 2c9f48e089497..4875effd0a36f 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -31,6 +31,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" sstpb "github.com/pingcap/kvproto/pkg/import_sstpb" + pd "github.com/tikv/pd/client" + "go.uber.org/atomic" + "go.uber.org/multierr" + "go.uber.org/zap" + "modernc.org/mathutil" + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/importer" @@ -57,11 +63,6 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/util/collate" - pd "github.com/tikv/pd/client" - "go.uber.org/atomic" - "go.uber.org/multierr" - "go.uber.org/zap" - "modernc.org/mathutil" ) const ( @@ -262,6 +263,7 @@ type Controller struct { closedEngineLimit *worker.Pool store storage.ExternalStorage + ownStore bool metaMgrBuilder metaMgrBuilder errorMgr *errormanager.ErrorManager taskMgr taskMetaMgr @@ -277,37 +279,59 @@ type LightningStatus struct { TotalFileSize atomic.Int64 } +// ControllerParam contains many parameters for creating a Controller. +type ControllerParam struct { + // databases that dumper created + DBMetas []*mydump.MDDatabaseMeta + // a pointer to status to report it to caller + Status *LightningStatus + // storage interface to read the dump data + ExtStorage storage.ExternalStorage + // if ExtStorage is created by lightning. In some cases where lightning is a library, the framework may pass an ExtStorage + OwnExtStorage bool + // used by lightning server mode to pause tasks + Pauser *common.Pauser + // lightning via SQL will implement its glue, to let lightning use host TiDB's environment + Glue glue.Glue + // when not OwnExtStorage, checkpoint can also be saved in framework-created ExtStorage by setting this field + CpNameInExtStorage string +} + func NewRestoreController( ctx context.Context, - dbMetas []*mydump.MDDatabaseMeta, cfg *config.Config, - status *LightningStatus, - s storage.ExternalStorage, - g glue.Glue, + param *ControllerParam, ) (*Controller, error) { - return NewRestoreControllerWithPauser(ctx, dbMetas, cfg, status, s, DeliverPauser, g) + param.Pauser = DeliverPauser + return NewRestoreControllerWithPauser(ctx, cfg, param) } func NewRestoreControllerWithPauser( ctx context.Context, - dbMetas []*mydump.MDDatabaseMeta, cfg *config.Config, - status *LightningStatus, - s storage.ExternalStorage, - pauser *common.Pauser, - g glue.Glue, + p *ControllerParam, ) (*Controller, error) { tls, err := cfg.ToTLS() if err != nil { return nil, err } - cpdb, err := g.OpenCheckpointsDB(ctx, cfg) - if err != nil { - if berrors.Is(err, common.ErrUnknownCheckpointDriver) { - return nil, err + var cpdb checkpoints.DB + // if cpNameInExtStorage is set, we should use given ExternalStorage to create checkpoints. + if p.CpNameInExtStorage != "" { + cpdb, err = checkpoints.NewFileCheckpointsDBWithExstorageFileName(ctx, p.ExtStorage.URI(), p.ExtStorage, p.CpNameInExtStorage) + if err != nil { + return nil, common.ErrOpenCheckpoint.Wrap(err).GenWithStackByArgs() + } + } else { + // use another function to create file checkpoint + cpdb, err = p.Glue.OpenCheckpointsDB(ctx, cfg) + if err != nil { + if berrors.Is(err, common.ErrUnknownCheckpointDriver) { + return nil, err + } + return nil, common.ErrOpenCheckpoint.Wrap(err).GenWithStackByArgs() } - return nil, common.ErrOpenCheckpoint.Wrap(err).GenWithStackByArgs() } taskCp, err := cpdb.TaskCheckpoint(ctx) @@ -323,7 +347,7 @@ func NewRestoreControllerWithPauser( } // TODO: support Lightning via SQL - db, err := g.GetDB() + db, err := p.Glue.GetDB() if err != nil { return nil, errors.Trace(err) } @@ -365,7 +389,7 @@ func NewRestoreControllerWithPauser( } } - backend, err = local.NewLocalBackend(ctx, tls, cfg, g, maxOpenFiles, errorMgr) + backend, err = local.NewLocalBackend(ctx, tls, cfg, p.Glue, maxOpenFiles, errorMgr) if err != nil { return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err) } @@ -395,15 +419,15 @@ func NewRestoreControllerWithPauser( rc := &Controller{ cfg: cfg, - dbMetas: dbMetas, + dbMetas: p.DBMetas, tableWorkers: nil, indexWorkers: nil, regionWorkers: worker.NewPool(ctx, cfg.App.RegionConcurrency, "region"), ioWorkers: worker.NewPool(ctx, cfg.App.IOConcurrency, "io"), checksumWorks: worker.NewPool(ctx, cfg.TiDB.ChecksumTableConcurrency, "checksum"), - pauser: pauser, + pauser: p.Pauser, backend: backend, - tidbGlue: g, + tidbGlue: p.Glue, sysVars: defaultImportantVariables, tls: tls, checkTemplate: NewSimpleTemplate(), @@ -413,11 +437,12 @@ func NewRestoreControllerWithPauser( saveCpCh: make(chan saveCp), closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"), - store: s, + store: p.ExtStorage, + ownStore: p.OwnExtStorage, metaMgrBuilder: metaBuilder, errorMgr: errorMgr, diskQuotaLock: newDiskQuotaLock(), - status: status, + status: p.Status, taskMgr: nil, } @@ -1863,8 +1888,10 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { if rc.cfg.App.CheckRequirements { rc.ClusterIsAvailable(ctx) - if err := rc.StoragePermission(ctx); err != nil { - return errors.Trace(err) + if rc.ownStore { + if err := rc.StoragePermission(ctx); err != nil { + return errors.Trace(err) + } } } diff --git a/br/pkg/lightning/run_options.go b/br/pkg/lightning/run_options.go new file mode 100644 index 0000000000000..7c630ad61eaae --- /dev/null +++ b/br/pkg/lightning/run_options.go @@ -0,0 +1,52 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lightning + +import ( + "github.com/pingcap/tidb/br/pkg/lightning/glue" + "github.com/pingcap/tidb/br/pkg/storage" +) + +type options struct { + glue glue.Glue + externalStorage storage.ExternalStorage + cpNameInExtStorage string +} + +type Option func(*options) + +// WithGlue sets the glue to a lightning task. +// Typically, the glue is set when lightning is integrated with a TiDB. +func WithGlue(g glue.Glue) Option { + return func(o *options) { + o.glue = g + } +} + +// WithExternalStorage sets the external storage to a lightning task. +// Typically, the external storage is set when lightning is integrated with dataflow engine by DM. +func WithExternalStorage(s storage.ExternalStorage) Option { + return func(o *options) { + o.externalStorage = s + } +} + +// WithCheckpointInExternalStorage sets the checkpoint name in external storage to a lightning task. +// Typically, the checkpoint name is set when lightning is integrated with dataflow engine by DM. +func WithCheckpointInExternalStorage(cpName string) Option { + return func(o *options) { + o.cpNameInExtStorage = cpName + } +} From 34b1c89821a231b8651ba8272c26710c5e08599f Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 22 Mar 2022 14:00:07 +0800 Subject: [PATCH 2/8] add test Signed-off-by: lance6716 --- br/cmd/tidb-lightning/main.go | 5 ++- br/pkg/lightning/lightning.go | 44 ++++++++++++++++--- br/pkg/lightning/lightning_serial_test.go | 2 +- .../lightning/lightning_server_serial_test.go | 5 ++- br/tests/README.md | 3 +- br/tests/lightning_restore/config.toml | 4 ++ br/tests/lightning_restore/run.sh | 25 +++++++++++ br/tests/run.sh | 2 +- 8 files changed, 78 insertions(+), 12 deletions(-) diff --git a/br/cmd/tidb-lightning/main.go b/br/cmd/tidb-lightning/main.go index 533c5eec5641b..30813598f1c81 100644 --- a/br/cmd/tidb-lightning/main.go +++ b/br/cmd/tidb-lightning/main.go @@ -22,12 +22,13 @@ import ( "runtime/debug" "syscall" + "go.uber.org/zap" + "github.com/pingcap/tidb/br/pkg/lightning" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/web" - "go.uber.org/zap" ) func main() { @@ -91,7 +92,7 @@ func main() { if err := cfg.LoadFromGlobal(globalCfg); err != nil { return err } - return app.RunOnce(context.Background(), cfg, nil) + return app.RunOnceWithOptions(context.Background(), cfg) }() finished := true diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 764fe782b13f0..acb0414d2c4fe 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -225,16 +225,39 @@ func (l *Lightning) RunServer() error { } } +// RunOnceWithOptions is used by binary lightning and host when using lightning as a library. +// - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its +// cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. No need to set Options +// - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and there Options may +// be used: +// - WithGlue: set a caller implemented glue. Otherwise, lightning will use a default glue later. +// - WithExternalStorage: caller has opened an external storage for lightning. Otherwise, lightning will open a +// storage by config +// - WithCheckpointInExternalStorage: caller has opened an external storage for lightning and want to save checkpoint +// in it. Otherwise, lightning will save checkpoint by the Checkpoint.DSN in config func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config.Config, opts ...Option) error { - if err := taskCfg.Adjust(taskCtx); err != nil { - return err - } - - taskCfg.TaskID = time.Now().UnixNano() o := &options{} for _, opt := range opts { opt(o) } + + failpoint.Inject("setExtStorage", func(val failpoint.Value) { + path := val.(string) + b, err := storage.ParseBackend(path, nil) + if err != nil { + panic(err) + } + s, err := storage.New(context.Background(), b, &storage.ExternalStorageOptions{}) + if err != nil { + panic(err) + } + o.externalStorage = s + }) + failpoint.Inject("setCpExtStorage", func(val failpoint.Value) { + file := val.(string) + o.cpNameInExtStorage = file + }) + // pre-check about options if o.externalStorage != nil && o.glue != nil { return common.NormalizeError(errors.New("WithExternalStorage and WithGlue can't be both set")) @@ -243,6 +266,17 @@ func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config. return common.NormalizeError(errors.New("WithCpNameInExtStorage and WithGlue can't be both set")) } + if o.externalStorage != nil { + // we don't use it, set a value to pass Adjust + taskCfg.Mydumper.SourceDir = "noop://" + } + + if err := taskCfg.Adjust(taskCtx); err != nil { + return err + } + + taskCfg.TaskID = time.Now().UnixNano() + return l.run(taskCtx, taskCfg, o) } diff --git a/br/pkg/lightning/lightning_serial_test.go b/br/pkg/lightning/lightning_serial_test.go index 3a5d8a7200144..bc77e047d93d2 100644 --- a/br/pkg/lightning/lightning_serial_test.go +++ b/br/pkg/lightning/lightning_serial_test.go @@ -55,7 +55,7 @@ func TestRun(t *testing.T) { cfg := config.NewConfig() err := cfg.LoadFromGlobal(globalConfig) require.NoError(t, err) - err = lightning.RunOnce(context.Background(), cfg, nil) + err = lightning.RunOnceWithOptions(context.Background(), cfg) require.Error(t, err) require.Regexp(t, "`mydumper.data-source-dir` does not exist$", err.Error()) diff --git a/br/pkg/lightning/lightning_server_serial_test.go b/br/pkg/lightning/lightning_server_serial_test.go index 8863bf648cc1e..a25e44847b5b7 100644 --- a/br/pkg/lightning/lightning_server_serial_test.go +++ b/br/pkg/lightning/lightning_server_serial_test.go @@ -29,9 +29,10 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" + "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/web" - "github.com/stretchr/testify/require" ) // initProgressOnce is used to ensure init progress once to avoid data race. @@ -317,7 +318,7 @@ func TestHTTPAPIOutsideServerMode(t *testing.T) { err := cfg.LoadFromGlobal(s.lightning.globalCfg) require.NoError(t, err) go func() { - errCh <- s.lightning.RunOnce(s.lightning.ctx, cfg, nil) + errCh <- s.lightning.RunOnceWithOptions(s.lightning.ctx, cfg) }() time.Sleep(600 * time.Millisecond) diff --git a/br/tests/README.md b/br/tests/README.md index ce2cfa63ebab5..53685c967ad24 100644 --- a/br/tests/README.md +++ b/br/tests/README.md @@ -67,7 +67,8 @@ If you have docker installed, you can skip step 1 and step 2 by running ## Running -Run `make br_integration_test` to execute the integration tests. This command will +Link `bin` directory by `cd br && ln -s ../bin bin` and run `make br_integration_test` to execute the integration tests. +This command will 1. Build `br` 2. Check that all 9 required executables and `br` executable exist diff --git a/br/tests/lightning_restore/config.toml b/br/tests/lightning_restore/config.toml index 93fb4f55ae45a..fba59357bc7fd 100644 --- a/br/tests/lightning_restore/config.toml +++ b/br/tests/lightning_restore/config.toml @@ -3,3 +3,7 @@ table-concurrency = 4 [tikv-importer] backend = 'local' + +[checkpoint] +enable = true +keep-after-success = "origin" diff --git a/br/tests/lightning_restore/run.sh b/br/tests/lightning_restore/run.sh index 41542acd45f8b..b831d1e353580 100755 --- a/br/tests/lightning_restore/run.sh +++ b/br/tests/lightning_restore/run.sh @@ -41,3 +41,28 @@ for i in $(seq "$TABLE_COUNT"); do run_sql "SELECT sum(i) FROM restore_tsr.tbl$i;" check_contains 'sum(i): 1' done + +# Reset and test setting external storage from outside +DBPATH2="$TEST_DIR/restore.ext_storage" +mkdir -p $DBPATH2 +echo 'CREATE DATABASE restore_tsr;' > "$DBPATH2/restore_tsr-schema-create.sql" +for i in $(seq "$TABLE_COUNT"); do + echo "CREATE TABLE tbl$i(i TINYINT);" > "$DBPATH2/restore_tsr.tbl$i-schema.sql" + echo "INSERT INTO tbl$i VALUES (1);" > "$DBPATH2/restore_tsr.tbl$i.sql" +done + +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/setExtStorage=return(\"$DBPATH2\")" +export GO_FAILPOINTS="$GO_FAILPOINTS;github.com/pingcap/tidb/br/pkg/lightning/setCpExtStorage=return(\"test_checkpoint.pb\")" + +run_sql 'DROP DATABASE IF EXISTS restore_tsr' +run_lightning +echo "Import finished" + +# Verify all data are imported +for i in $(seq "$TABLE_COUNT"); do + run_sql "SELECT sum(i) FROM restore_tsr.tbl$i;" + check_contains 'sum(i): 1' +done + +# Verify checkpoint file is also created in external storage +[ -f "$DBPATH2/test_checkpoint.pb" ] diff --git a/br/tests/run.sh b/br/tests/run.sh index bbf17deb3e715..ef3b23470e096 100755 --- a/br/tests/run.sh +++ b/br/tests/run.sh @@ -24,7 +24,7 @@ rm -rf $TEST_DIR && mkdir -p $TEST_DIR # Generate TLS certs tests/_utils/generate_certs &> /dev/null -SELECTED_TEST_NAME="${TEST_NAME-$(find tests -mindepth 2 -maxdepth 2 -name run.sh | cut -d/ -f2 | sort)}" +SELECTED_TEST_NAME="lightning_restore" source tests/_utils/run_services trap stop_services EXIT From a8ff40992ba61f7b71a24fa8a494770d14799f95 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 22 Mar 2022 14:02:56 +0800 Subject: [PATCH 3/8] remove debug message Signed-off-by: lance6716 --- br/pkg/lightning/restore/restore.go | 1 - br/tests/run.sh | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 4875effd0a36f..8d4518b966204 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -324,7 +324,6 @@ func NewRestoreControllerWithPauser( return nil, common.ErrOpenCheckpoint.Wrap(err).GenWithStackByArgs() } } else { - // use another function to create file checkpoint cpdb, err = p.Glue.OpenCheckpointsDB(ctx, cfg) if err != nil { if berrors.Is(err, common.ErrUnknownCheckpointDriver) { diff --git a/br/tests/run.sh b/br/tests/run.sh index ef3b23470e096..bbf17deb3e715 100755 --- a/br/tests/run.sh +++ b/br/tests/run.sh @@ -24,7 +24,7 @@ rm -rf $TEST_DIR && mkdir -p $TEST_DIR # Generate TLS certs tests/_utils/generate_certs &> /dev/null -SELECTED_TEST_NAME="lightning_restore" +SELECTED_TEST_NAME="${TEST_NAME-$(find tests -mindepth 2 -maxdepth 2 -name run.sh | cut -d/ -f2 | sort)}" source tests/_utils/run_services trap stop_services EXIT From 17328ab9945f118ca7ceaf94177ec7441197de66 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 22 Mar 2022 15:34:03 +0800 Subject: [PATCH 4/8] address comment Signed-off-by: lance6716 --- br/pkg/lightning/lightning.go | 10 +++++----- br/pkg/lightning/run_options.go | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index acb0414d2c4fe..2a4f656d42db6 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -255,15 +255,15 @@ func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config. }) failpoint.Inject("setCpExtStorage", func(val failpoint.Value) { file := val.(string) - o.cpNameInExtStorage = file + o.checkpointName = file }) // pre-check about options if o.externalStorage != nil && o.glue != nil { - return common.NormalizeError(errors.New("WithExternalStorage and WithGlue can't be both set")) + return common.ErrInvalidArgument.GenWithStack("WithExternalStorage and WithGlue can't be both set") } - if o.cpNameInExtStorage != "" && o.glue != nil { - return common.NormalizeError(errors.New("WithCpNameInExtStorage and WithGlue can't be both set")) + if o.checkpointName != "" && o.glue != nil { + return common.ErrInvalidArgument.GenWithStack("WithCpNameInExtStorage and WithGlue can't be both set") } if o.externalStorage != nil { @@ -400,7 +400,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti ExtStorage: s, OwnExtStorage: o.externalStorage == nil, Glue: g, - CpNameInExtStorage: o.cpNameInExtStorage, + CpNameInExtStorage: o.checkpointName, } procedure, err = restore.NewRestoreController(ctx, taskCfg, param) diff --git a/br/pkg/lightning/run_options.go b/br/pkg/lightning/run_options.go index 7c630ad61eaae..2395e1601566a 100644 --- a/br/pkg/lightning/run_options.go +++ b/br/pkg/lightning/run_options.go @@ -20,9 +20,9 @@ import ( ) type options struct { - glue glue.Glue - externalStorage storage.ExternalStorage - cpNameInExtStorage string + glue glue.Glue + externalStorage storage.ExternalStorage + checkpointName string } type Option func(*options) @@ -47,6 +47,6 @@ func WithExternalStorage(s storage.ExternalStorage) Option { // Typically, the checkpoint name is set when lightning is integrated with dataflow engine by DM. func WithCheckpointInExternalStorage(cpName string) Option { return func(o *options) { - o.cpNameInExtStorage = cpName + o.checkpointName = cpName } } From a7011e69d9ca5cafe0ca866b6629f865ddb8f043 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 22 Mar 2022 17:59:36 +0800 Subject: [PATCH 5/8] address comment Signed-off-by: lance6716 --- br/pkg/lightning/lightning.go | 33 +++++++++++++++-------------- br/pkg/lightning/restore/restore.go | 18 +++++++++------- br/pkg/lightning/run_options.go | 18 +++++++++------- br/tests/lightning_restore/run.sh | 2 +- 4 files changed, 38 insertions(+), 33 deletions(-) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 2a4f656d42db6..3b4cda1c7adc2 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -231,9 +231,9 @@ func (l *Lightning) RunServer() error { // - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and there Options may // be used: // - WithGlue: set a caller implemented glue. Otherwise, lightning will use a default glue later. -// - WithExternalStorage: caller has opened an external storage for lightning. Otherwise, lightning will open a +// - WithDumpFileStorage: caller has opened an external storage for lightning. Otherwise, lightning will open a // storage by config -// - WithCheckpointInExternalStorage: caller has opened an external storage for lightning and want to save checkpoint +// - WithCheckpointStorage: caller has opened an external storage for lightning and want to save checkpoint // in it. Otherwise, lightning will save checkpoint by the Checkpoint.DSN in config func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config.Config, opts ...Option) error { o := &options{} @@ -251,22 +251,23 @@ func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config. if err != nil { panic(err) } - o.externalStorage = s + o.dumpFileStorage = s + o.checkpointStorage = s }) - failpoint.Inject("setCpExtStorage", func(val failpoint.Value) { + failpoint.Inject("setCheckpointName", func(val failpoint.Value) { file := val.(string) o.checkpointName = file }) // pre-check about options - if o.externalStorage != nil && o.glue != nil { - return common.ErrInvalidArgument.GenWithStack("WithExternalStorage and WithGlue can't be both set") + if o.dumpFileStorage != nil && o.glue != nil { + return common.ErrInvalidArgument.GenWithStack("WithDumpFileStorage and WithGlue can't be both set") } - if o.checkpointName != "" && o.glue != nil { - return common.ErrInvalidArgument.GenWithStack("WithCpNameInExtStorage and WithGlue can't be both set") + if o.checkpointStorage != nil && o.glue != nil { + return common.ErrInvalidArgument.GenWithStack("WithCheckpointStorage and WithGlue can't be both set") } - if o.externalStorage != nil { + if o.dumpFileStorage != nil { // we don't use it, set a value to pass Adjust taskCfg.Mydumper.SourceDir = "noop://" } @@ -345,7 +346,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti g = glue.NewExternalTiDBGlue(db, taskCfg.TiDB.SQLMode) } - s := o.externalStorage + s := o.dumpFileStorage if s == nil { u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil) if err != nil { @@ -395,12 +396,12 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti var procedure *restore.Controller param := &restore.ControllerParam{ - DBMetas: dbMetas, - Status: &l.status, - ExtStorage: s, - OwnExtStorage: o.externalStorage == nil, - Glue: g, - CpNameInExtStorage: o.checkpointName, + DBMetas: dbMetas, + Status: &l.status, + DumpFileStorage: s, + OwnExtStorage: o.dumpFileStorage == nil, + Glue: g, + CheckpointName: o.checkpointName, } procedure, err = restore.NewRestoreController(ctx, taskCfg, param) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 8d4518b966204..d6460b7bb2533 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -286,15 +286,17 @@ type ControllerParam struct { // a pointer to status to report it to caller Status *LightningStatus // storage interface to read the dump data - ExtStorage storage.ExternalStorage - // if ExtStorage is created by lightning. In some cases where lightning is a library, the framework may pass an ExtStorage + DumpFileStorage storage.ExternalStorage + // true if DumpFileStorage is created by lightning. In some cases where lightning is a library, the framework may pass an DumpFileStorage OwnExtStorage bool // used by lightning server mode to pause tasks Pauser *common.Pauser // lightning via SQL will implement its glue, to let lightning use host TiDB's environment Glue glue.Glue - // when not OwnExtStorage, checkpoint can also be saved in framework-created ExtStorage by setting this field - CpNameInExtStorage string + // storage interface to write file checkpoints + CheckpointStorage storage.ExternalStorage + // when CheckpointStorage is not nil, save file checkpoint to it with this name + CheckpointName string } func NewRestoreController( @@ -317,9 +319,9 @@ func NewRestoreControllerWithPauser( } var cpdb checkpoints.DB - // if cpNameInExtStorage is set, we should use given ExternalStorage to create checkpoints. - if p.CpNameInExtStorage != "" { - cpdb, err = checkpoints.NewFileCheckpointsDBWithExstorageFileName(ctx, p.ExtStorage.URI(), p.ExtStorage, p.CpNameInExtStorage) + // if CheckpointStorage is set, we should use given ExternalStorage to create checkpoints. + if p.CheckpointStorage != nil { + cpdb, err = checkpoints.NewFileCheckpointsDBWithExstorageFileName(ctx, p.CheckpointStorage.URI(), p.CheckpointStorage, p.CheckpointName) if err != nil { return nil, common.ErrOpenCheckpoint.Wrap(err).GenWithStackByArgs() } @@ -436,7 +438,7 @@ func NewRestoreControllerWithPauser( saveCpCh: make(chan saveCp), closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"), - store: p.ExtStorage, + store: p.DumpFileStorage, ownStore: p.OwnExtStorage, metaMgrBuilder: metaBuilder, errorMgr: errorMgr, diff --git a/br/pkg/lightning/run_options.go b/br/pkg/lightning/run_options.go index 2395e1601566a..8182794df3eaa 100644 --- a/br/pkg/lightning/run_options.go +++ b/br/pkg/lightning/run_options.go @@ -20,9 +20,10 @@ import ( ) type options struct { - glue glue.Glue - externalStorage storage.ExternalStorage - checkpointName string + glue glue.Glue + dumpFileStorage storage.ExternalStorage + checkpointStorage storage.ExternalStorage + checkpointName string } type Option func(*options) @@ -35,18 +36,19 @@ func WithGlue(g glue.Glue) Option { } } -// WithExternalStorage sets the external storage to a lightning task. +// WithDumpFileStorage sets the external storage to a lightning task. // Typically, the external storage is set when lightning is integrated with dataflow engine by DM. -func WithExternalStorage(s storage.ExternalStorage) Option { +func WithDumpFileStorage(s storage.ExternalStorage) Option { return func(o *options) { - o.externalStorage = s + o.dumpFileStorage = s } } -// WithCheckpointInExternalStorage sets the checkpoint name in external storage to a lightning task. +// WithCheckpointStorage sets the checkpoint name in external storage to a lightning task. // Typically, the checkpoint name is set when lightning is integrated with dataflow engine by DM. -func WithCheckpointInExternalStorage(cpName string) Option { +func WithCheckpointStorage(s storage.ExternalStorage, cpName string) Option { return func(o *options) { + o.checkpointStorage = s o.checkpointName = cpName } } diff --git a/br/tests/lightning_restore/run.sh b/br/tests/lightning_restore/run.sh index b831d1e353580..a168bca4a513e 100755 --- a/br/tests/lightning_restore/run.sh +++ b/br/tests/lightning_restore/run.sh @@ -52,7 +52,7 @@ for i in $(seq "$TABLE_COUNT"); do done export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/setExtStorage=return(\"$DBPATH2\")" -export GO_FAILPOINTS="$GO_FAILPOINTS;github.com/pingcap/tidb/br/pkg/lightning/setCpExtStorage=return(\"test_checkpoint.pb\")" +export GO_FAILPOINTS="$GO_FAILPOINTS;github.com/pingcap/tidb/br/pkg/lightning/setCheckpointName=return(\"test_checkpoint.pb\")" run_sql 'DROP DATABASE IF EXISTS restore_tsr' run_lightning From 6e9073de5787dfafca17d196ca21745626931352 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 22 Mar 2022 18:33:26 +0800 Subject: [PATCH 6/8] address comment Signed-off-by: lance6716 --- br/pkg/lightning/lightning.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 3b4cda1c7adc2..961a1e7695c04 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -260,6 +260,8 @@ func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config. }) // pre-check about options + // glue should be set when lightning in TiDB, and storages should be set when lightning in DM/dataflow engine, + // so they should not both be set. if o.dumpFileStorage != nil && o.glue != nil { return common.ErrInvalidArgument.GenWithStack("WithDumpFileStorage and WithGlue can't be both set") } From 1a1196469ca9af6339f1685743ef4ea3d95c95f1 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 23 Mar 2022 11:17:55 +0800 Subject: [PATCH 7/8] address comment Signed-off-by: lance6716 --- br/pkg/lightning/lightning.go | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 961a1e7695c04..df4b004dbe289 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -259,16 +259,6 @@ func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config. o.checkpointName = file }) - // pre-check about options - // glue should be set when lightning in TiDB, and storages should be set when lightning in DM/dataflow engine, - // so they should not both be set. - if o.dumpFileStorage != nil && o.glue != nil { - return common.ErrInvalidArgument.GenWithStack("WithDumpFileStorage and WithGlue can't be both set") - } - if o.checkpointStorage != nil && o.glue != nil { - return common.ErrInvalidArgument.GenWithStack("WithCheckpointStorage and WithGlue can't be both set") - } - if o.dumpFileStorage != nil { // we don't use it, set a value to pass Adjust taskCfg.Mydumper.SourceDir = "noop://" @@ -398,12 +388,13 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti var procedure *restore.Controller param := &restore.ControllerParam{ - DBMetas: dbMetas, - Status: &l.status, - DumpFileStorage: s, - OwnExtStorage: o.dumpFileStorage == nil, - Glue: g, - CheckpointName: o.checkpointName, + DBMetas: dbMetas, + Status: &l.status, + DumpFileStorage: s, + OwnExtStorage: o.dumpFileStorage == nil, + Glue: g, + CheckpointStorage: o.checkpointStorage, + CheckpointName: o.checkpointName, } procedure, err = restore.NewRestoreController(ctx, taskCfg, param) From 5ac26f09ba5888a07b2345d12451172bc8a5c6b5 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 25 Mar 2022 15:44:30 +0800 Subject: [PATCH 8/8] address comment Signed-off-by: lance6716 --- br/cmd/tidb-lightning/main.go | 3 +-- br/pkg/lightning/checkpoints/checkpoints.go | 5 ++--- br/pkg/lightning/lightning.go | 4 +--- br/pkg/lightning/lightning_serial_test.go | 3 +-- br/pkg/lightning/lightning_server_serial_test.go | 3 +-- br/pkg/lightning/restore/check_info.go | 8 +++----- br/pkg/lightning/restore/restore.go | 11 +++++------ 7 files changed, 14 insertions(+), 23 deletions(-) diff --git a/br/cmd/tidb-lightning/main.go b/br/cmd/tidb-lightning/main.go index 30813598f1c81..079182ce0863e 100644 --- a/br/cmd/tidb-lightning/main.go +++ b/br/cmd/tidb-lightning/main.go @@ -22,13 +22,12 @@ import ( "runtime/debug" "syscall" - "go.uber.org/zap" - "github.com/pingcap/tidb/br/pkg/lightning" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/web" + "go.uber.org/zap" ) func main() { diff --git a/br/pkg/lightning/checkpoints/checkpoints.go b/br/pkg/lightning/checkpoints/checkpoints.go index 76864cd986567..d2d5320595999 100644 --- a/br/pkg/lightning/checkpoints/checkpoints.go +++ b/br/pkg/lightning/checkpoints/checkpoints.go @@ -28,9 +28,6 @@ import ( "github.com/joho/sqltocsv" "github.com/pingcap/errors" - "go.uber.org/zap" - "modernc.org/mathutil" - "github.com/pingcap/tidb/br/pkg/lightning/checkpoints/checkpointspb" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" @@ -39,6 +36,8 @@ import ( verify "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/version/build" + "go.uber.org/zap" + "modernc.org/mathutil" ) type CheckpointStatus uint8 diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index df4b004dbe289..f56740c185014 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -30,10 +30,10 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/import_sstpb" - "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/importer" "github.com/pingcap/tidb/br/pkg/lightning/backend/local" @@ -50,8 +50,6 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version/build" - - "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/shurcooL/httpgzip" "go.uber.org/zap" diff --git a/br/pkg/lightning/lightning_serial_test.go b/br/pkg/lightning/lightning_serial_test.go index bc77e047d93d2..1a9b0c9692495 100644 --- a/br/pkg/lightning/lightning_serial_test.go +++ b/br/pkg/lightning/lightning_serial_test.go @@ -22,12 +22,11 @@ import ( "github.com/docker/go-units" "github.com/pingcap/failpoint" - "github.com/stretchr/testify/require" - "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/glue" "github.com/pingcap/tidb/br/pkg/lightning/mydump" + "github.com/stretchr/testify/require" ) func TestInitEnv(t *testing.T) { diff --git a/br/pkg/lightning/lightning_server_serial_test.go b/br/pkg/lightning/lightning_server_serial_test.go index a25e44847b5b7..478dafdcac68a 100644 --- a/br/pkg/lightning/lightning_server_serial_test.go +++ b/br/pkg/lightning/lightning_server_serial_test.go @@ -29,10 +29,9 @@ import ( "time" "github.com/pingcap/failpoint" - "github.com/stretchr/testify/require" - "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/lightning/web" + "github.com/stretchr/testify/require" ) // initProgressOnce is used to ensure init progress once to avoid data race. diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go index b6b4a24064435..98c2c3b720fc0 100644 --- a/br/pkg/lightning/restore/check_info.go +++ b/br/pkg/lightning/restore/check_info.go @@ -30,12 +30,7 @@ import ( "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" - "modernc.org/mathutil" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" @@ -53,6 +48,9 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "modernc.org/mathutil" ) const ( diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index d6460b7bb2533..b71b97d312152 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -31,12 +31,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" sstpb "github.com/pingcap/kvproto/pkg/import_sstpb" - pd "github.com/tikv/pd/client" - "go.uber.org/atomic" - "go.uber.org/multierr" - "go.uber.org/zap" - "modernc.org/mathutil" - berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/importer" @@ -63,6 +57,11 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/util/collate" + pd "github.com/tikv/pd/client" + "go.uber.org/atomic" + "go.uber.org/multierr" + "go.uber.org/zap" + "modernc.org/mathutil" ) const (