diff --git a/br/cmd/tidb-lightning/main.go b/br/cmd/tidb-lightning/main.go index 533c5eec5641b..079182ce0863e 100644 --- a/br/cmd/tidb-lightning/main.go +++ b/br/cmd/tidb-lightning/main.go @@ -91,7 +91,7 @@ func main() { if err := cfg.LoadFromGlobal(globalCfg); err != nil { return err } - return app.RunOnce(context.Background(), cfg, nil) + return app.RunOnceWithOptions(context.Background(), cfg) }() finished := true diff --git a/br/pkg/lightning/checkpoints/checkpoints.go b/br/pkg/lightning/checkpoints/checkpoints.go index f323c58e9b88c..d2d5320595999 100644 --- a/br/pkg/lightning/checkpoints/checkpoints.go +++ b/br/pkg/lightning/checkpoints/checkpoints.go @@ -954,24 +954,23 @@ type FileCheckpointsDB struct { exStorage storage.ExternalStorage } -func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, error) { +func newFileCheckpointsDB( + ctx context.Context, + path string, + exStorage storage.ExternalStorage, + fileName string, +) (*FileCheckpointsDB, error) { cpdb := &FileCheckpointsDB{ - path: path, checkpoints: checkpointspb.CheckpointsModel{ TaskCheckpoint: &checkpointspb.TaskCheckpointModel{}, Checkpoints: map[string]*checkpointspb.TableCheckpointModel{}, }, + ctx: ctx, + path: path, + fileName: fileName, + exStorage: exStorage, } - // init ExternalStorage - s, fileName, err := createExstorageByCompletePath(ctx, path) - if err != nil { - return nil, errors.Trace(err) - } - cpdb.ctx = ctx - cpdb.fileName = fileName - cpdb.exStorage = s - if cpdb.fileName == "" { return nil, errors.Errorf("the checkpoint DSN '%s' must not be a directory", path) } @@ -1013,6 +1012,24 @@ func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, return cpdb, nil } +func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, error) { + // init ExternalStorage + s, fileName, err := createExstorageByCompletePath(ctx, path) + if err != nil { + return nil, errors.Trace(err) + } + return newFileCheckpointsDB(ctx, 
path, s, fileName) +} + +func NewFileCheckpointsDBWithExstorageFileName( + ctx context.Context, + path string, + s storage.ExternalStorage, + fileName string, +) (*FileCheckpointsDB, error) { + return newFileCheckpointsDB(ctx, path, s, fileName) +} + // createExstorageByCompletePath create ExternalStorage by completePath and return fileName. func createExstorageByCompletePath(ctx context.Context, completePath string) (storage.ExternalStorage, string, error) { if completePath == "" { diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 907b316ab8399..f56740c185014 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -30,6 +30,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/import_sstpb" @@ -49,8 +50,6 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version/build" - - "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/shurcooL/httpgzip" "go.uber.org/zap" @@ -188,6 +187,7 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error { // use a default glue later. // - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and glue could be a // caller implemented glue. +// Deprecated: use RunOnceWithOptions instead. 
func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glue glue.Glue) error { if err := taskCfg.Adjust(taskCtx); err != nil { return err @@ -198,7 +198,7 @@ func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glu taskCfg.TaskID = int64(val.(int)) }) - return l.run(taskCtx, taskCfg, glue) + return l.run(taskCtx, taskCfg, &options{glue: glue}) } func (l *Lightning) RunServer() error { @@ -223,12 +223,60 @@ func (l *Lightning) RunServer() error { } } +// RunOnceWithOptions is used by binary lightning and host when using lightning as a library. +// - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its +// cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. No need to set Options. +// - for lightning as a library, taskCtx could be a meaningful context that gets canceled outside, and these Options may +// be used: +// - WithGlue: set a caller implemented glue. Otherwise, lightning will use a default glue later. +// - WithDumpFileStorage: caller has opened an external storage for lightning. Otherwise, lightning will open a +// storage by config +// - WithCheckpointStorage: caller has opened an external storage for lightning and wants to save checkpoint +// in it. 
Otherwise, lightning will save checkpoint by the Checkpoint.DSN in config +func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config.Config, opts ...Option) error { + o := &options{} + for _, opt := range opts { + opt(o) + } + + failpoint.Inject("setExtStorage", func(val failpoint.Value) { + path := val.(string) + b, err := storage.ParseBackend(path, nil) + if err != nil { + panic(err) + } + s, err := storage.New(context.Background(), b, &storage.ExternalStorageOptions{}) + if err != nil { + panic(err) + } + o.dumpFileStorage = s + o.checkpointStorage = s + }) + failpoint.Inject("setCheckpointName", func(val failpoint.Value) { + file := val.(string) + o.checkpointName = file + }) + + if o.dumpFileStorage != nil { + // we don't use it, set a value to pass Adjust + taskCfg.Mydumper.SourceDir = "noop://" + } + + if err := taskCfg.Adjust(taskCtx); err != nil { + return err + } + + taskCfg.TaskID = time.Now().UnixNano() + + return l.run(taskCtx, taskCfg, o) +} + var ( taskRunNotifyKey = "taskRunNotifyKey" taskCfgRecorderKey = "taskCfgRecorderKey" ) -func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.Glue) (err error) { +func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *options) (err error) { build.LogInfo(build.Lightning) log.L().Info("cfg", zap.Stringer("cfg", taskCfg)) @@ -279,6 +327,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. // initiation of default glue should be after RegisterMySQL, which is ready to be called after taskCfg.Adjust // and also put it here could avoid injecting another two SkipRunTask failpoint to caller + g := o.glue if g == nil { db, err := restore.DBFromConfig(ctx, taskCfg.TiDB) if err != nil { @@ -287,13 +336,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. 
g = glue.NewExternalTiDBGlue(db, taskCfg.TiDB.SQLMode) } - u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil) - if err != nil { - return common.NormalizeError(err) - } - s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{}) - if err != nil { - return common.NormalizeError(err) + s := o.dumpFileStorage + if s == nil { + u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil) + if err != nil { + return common.NormalizeError(err) + } + s, err = storage.New(ctx, u, &storage.ExternalStorageOptions{}) + if err != nil { + return common.NormalizeError(err) + } } // return expectedErr means at least meet one file @@ -333,7 +385,17 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. var procedure *restore.Controller - procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, &l.status, s, g) + param := &restore.ControllerParam{ + DBMetas: dbMetas, + Status: &l.status, + DumpFileStorage: s, + OwnExtStorage: o.dumpFileStorage == nil, + Glue: g, + CheckpointStorage: o.checkpointStorage, + CheckpointName: o.checkpointName, + } + + procedure, err = restore.NewRestoreController(ctx, taskCfg, param) if err != nil { log.L().Error("restore failed", log.ShortError(err)) return errors.Trace(err) diff --git a/br/pkg/lightning/lightning_serial_test.go b/br/pkg/lightning/lightning_serial_test.go index 3dd46d33c1f2f..1a9b0c9692495 100644 --- a/br/pkg/lightning/lightning_serial_test.go +++ b/br/pkg/lightning/lightning_serial_test.go @@ -54,13 +54,14 @@ func TestRun(t *testing.T) { cfg := config.NewConfig() err := cfg.LoadFromGlobal(globalConfig) require.NoError(t, err) - err = lightning.RunOnce(context.Background(), cfg, nil) + err = lightning.RunOnceWithOptions(context.Background(), cfg) require.Error(t, err) require.Regexp(t, "`mydumper.data-source-dir` does not exist$", err.Error()) path, _ := filepath.Abs(".") ctx := context.Background() invalidGlue := glue.NewExternalTiDBGlue(nil, 0) + o := &options{glue: 
invalidGlue} err = lightning.run(ctx, &config.Config{ Mydumper: config.MydumperRuntime{ SourceDir: "file://" + filepath.ToSlash(path), @@ -71,7 +72,7 @@ func TestRun(t *testing.T) { Enable: true, Driver: "invalid", }, - }, invalidGlue) + }, o) require.EqualError(t, err, "[Lightning:Checkpoint:ErrUnknownCheckpointDriver]unknown checkpoint driver 'invalid'") err = lightning.run(ctx, &config.Config{ @@ -84,7 +85,7 @@ func TestRun(t *testing.T) { Driver: "file", DSN: "any-file", }, - }, invalidGlue) + }, o) require.Error(t, err) } diff --git a/br/pkg/lightning/lightning_server_serial_test.go b/br/pkg/lightning/lightning_server_serial_test.go index 8863bf648cc1e..478dafdcac68a 100644 --- a/br/pkg/lightning/lightning_server_serial_test.go +++ b/br/pkg/lightning/lightning_server_serial_test.go @@ -317,7 +317,7 @@ func TestHTTPAPIOutsideServerMode(t *testing.T) { err := cfg.LoadFromGlobal(s.lightning.globalCfg) require.NoError(t, err) go func() { - errCh <- s.lightning.RunOnce(s.lightning.ctx, cfg, nil) + errCh <- s.lightning.RunOnceWithOptions(s.lightning.ctx, cfg) }() time.Sleep(600 * time.Millisecond) diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go index 1a56c741c3b2e..98c2c3b720fc0 100644 --- a/br/pkg/lightning/restore/check_info.go +++ b/br/pkg/lightning/restore/check_info.go @@ -30,10 +30,6 @@ import ( "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" - "modernc.org/mathutil" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" @@ -52,6 +48,9 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "modernc.org/mathutil" ) const ( @@ -348,7 +347,6 @@ func (rc *Controller) checkClusterRegion(ctx context.Context) error { } // 
StoragePermission checks whether Lightning has enough permission to storage. -// this test cannot be skipped. func (rc *Controller) StoragePermission(ctx context.Context) error { passed := true message := "Lightning has the correct storage permission" diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 2c9f48e089497..b71b97d312152 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -262,6 +262,7 @@ type Controller struct { closedEngineLimit *worker.Pool store storage.ExternalStorage + ownStore bool metaMgrBuilder metaMgrBuilder errorMgr *errormanager.ErrorManager taskMgr taskMetaMgr @@ -277,37 +278,60 @@ type LightningStatus struct { TotalFileSize atomic.Int64 } +// ControllerParam contains many parameters for creating a Controller. +type ControllerParam struct { + // databases that dumper created + DBMetas []*mydump.MDDatabaseMeta + // a pointer to status to report it to caller + Status *LightningStatus + // storage interface to read the dump data + DumpFileStorage storage.ExternalStorage + // true if DumpFileStorage is created by lightning. 
In some cases where lightning is a library, the framework may pass a DumpFileStorage + OwnExtStorage bool + // used by lightning server mode to pause tasks + Pauser *common.Pauser + // lightning via SQL will implement its glue, to let lightning use host TiDB's environment + Glue glue.Glue + // storage interface to write file checkpoints + CheckpointStorage storage.ExternalStorage + // when CheckpointStorage is not nil, save file checkpoint to it with this name + CheckpointName string +} + func NewRestoreController( ctx context.Context, - dbMetas []*mydump.MDDatabaseMeta, cfg *config.Config, - status *LightningStatus, - s storage.ExternalStorage, - g glue.Glue, + param *ControllerParam, ) (*Controller, error) { - return NewRestoreControllerWithPauser(ctx, dbMetas, cfg, status, s, DeliverPauser, g) + param.Pauser = DeliverPauser + return NewRestoreControllerWithPauser(ctx, cfg, param) } func NewRestoreControllerWithPauser( ctx context.Context, - dbMetas []*mydump.MDDatabaseMeta, cfg *config.Config, - status *LightningStatus, - s storage.ExternalStorage, - pauser *common.Pauser, - g glue.Glue, + p *ControllerParam, ) (*Controller, error) { tls, err := cfg.ToTLS() if err != nil { return nil, err } - cpdb, err := g.OpenCheckpointsDB(ctx, cfg) - if err != nil { - if berrors.Is(err, common.ErrUnknownCheckpointDriver) { - return nil, err + var cpdb checkpoints.DB + // if CheckpointStorage is set, we should use the given ExternalStorage to create checkpoints. 
+ if p.CheckpointStorage != nil { + cpdb, err = checkpoints.NewFileCheckpointsDBWithExstorageFileName(ctx, p.CheckpointStorage.URI(), p.CheckpointStorage, p.CheckpointName) + if err != nil { + return nil, common.ErrOpenCheckpoint.Wrap(err).GenWithStackByArgs() + } + } else { + cpdb, err = p.Glue.OpenCheckpointsDB(ctx, cfg) + if err != nil { + if berrors.Is(err, common.ErrUnknownCheckpointDriver) { + return nil, err + } + return nil, common.ErrOpenCheckpoint.Wrap(err).GenWithStackByArgs() } - return nil, common.ErrOpenCheckpoint.Wrap(err).GenWithStackByArgs() } taskCp, err := cpdb.TaskCheckpoint(ctx) @@ -323,7 +347,7 @@ func NewRestoreControllerWithPauser( } // TODO: support Lightning via SQL - db, err := g.GetDB() + db, err := p.Glue.GetDB() if err != nil { return nil, errors.Trace(err) } @@ -365,7 +389,7 @@ func NewRestoreControllerWithPauser( } } - backend, err = local.NewLocalBackend(ctx, tls, cfg, g, maxOpenFiles, errorMgr) + backend, err = local.NewLocalBackend(ctx, tls, cfg, p.Glue, maxOpenFiles, errorMgr) if err != nil { return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err) } @@ -395,15 +419,15 @@ func NewRestoreControllerWithPauser( rc := &Controller{ cfg: cfg, - dbMetas: dbMetas, + dbMetas: p.DBMetas, tableWorkers: nil, indexWorkers: nil, regionWorkers: worker.NewPool(ctx, cfg.App.RegionConcurrency, "region"), ioWorkers: worker.NewPool(ctx, cfg.App.IOConcurrency, "io"), checksumWorks: worker.NewPool(ctx, cfg.TiDB.ChecksumTableConcurrency, "checksum"), - pauser: pauser, + pauser: p.Pauser, backend: backend, - tidbGlue: g, + tidbGlue: p.Glue, sysVars: defaultImportantVariables, tls: tls, checkTemplate: NewSimpleTemplate(), @@ -413,11 +437,12 @@ func NewRestoreControllerWithPauser( saveCpCh: make(chan saveCp), closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"), - store: s, + store: p.DumpFileStorage, + ownStore: p.OwnExtStorage, metaMgrBuilder: metaBuilder, errorMgr: errorMgr, diskQuotaLock: newDiskQuotaLock(), - 
status: status, + status: p.Status, taskMgr: nil, } @@ -1863,8 +1888,10 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { if rc.cfg.App.CheckRequirements { rc.ClusterIsAvailable(ctx) - if err := rc.StoragePermission(ctx); err != nil { - return errors.Trace(err) + if rc.ownStore { + if err := rc.StoragePermission(ctx); err != nil { + return errors.Trace(err) + } } } diff --git a/br/pkg/lightning/run_options.go b/br/pkg/lightning/run_options.go new file mode 100644 index 0000000000000..8182794df3eaa --- /dev/null +++ b/br/pkg/lightning/run_options.go @@ -0,0 +1,54 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lightning + +import ( + "github.com/pingcap/tidb/br/pkg/lightning/glue" + "github.com/pingcap/tidb/br/pkg/storage" +) + +type options struct { + glue glue.Glue + dumpFileStorage storage.ExternalStorage + checkpointStorage storage.ExternalStorage + checkpointName string +} + +type Option func(*options) + +// WithGlue sets the glue to a lightning task. +// Typically, the glue is set when lightning is integrated with a TiDB. +func WithGlue(g glue.Glue) Option { + return func(o *options) { + o.glue = g + } +} + +// WithDumpFileStorage sets the external storage to a lightning task. +// Typically, the external storage is set when lightning is integrated with dataflow engine by DM. 
+func WithDumpFileStorage(s storage.ExternalStorage) Option { + return func(o *options) { + o.dumpFileStorage = s + } +} + +// WithCheckpointStorage sets the external storage and the checkpoint file name used to save file checkpoints of a lightning task. +// Typically, they are set when lightning is integrated with dataflow engine by DM. +func WithCheckpointStorage(s storage.ExternalStorage, cpName string) Option { + return func(o *options) { + o.checkpointStorage = s + o.checkpointName = cpName + } +} diff --git a/br/tests/README.md b/br/tests/README.md index ce2cfa63ebab5..53685c967ad24 100644 --- a/br/tests/README.md +++ b/br/tests/README.md @@ -67,7 +67,8 @@ If you have docker installed, you can skip step 1 and step 2 by running ## Running -Run `make br_integration_test` to execute the integration tests. This command will +Link the `bin` directory with `cd br && ln -s ../bin bin`, then run `make br_integration_test` to execute the integration tests. +This command will 1. Build `br` 2. Check that all 9 required executables and `br` executable exist diff --git a/br/tests/lightning_restore/config.toml b/br/tests/lightning_restore/config.toml index 93fb4f55ae45a..fba59357bc7fd 100644 --- a/br/tests/lightning_restore/config.toml +++ b/br/tests/lightning_restore/config.toml @@ -3,3 +3,7 @@ table-concurrency = 4 [tikv-importer] backend = 'local' + +[checkpoint] +enable = true +keep-after-success = "origin" diff --git a/br/tests/lightning_restore/run.sh b/br/tests/lightning_restore/run.sh index 41542acd45f8b..a168bca4a513e 100755 --- a/br/tests/lightning_restore/run.sh +++ b/br/tests/lightning_restore/run.sh @@ -41,3 +41,28 @@ for i in $(seq "$TABLE_COUNT"); do run_sql "SELECT sum(i) FROM restore_tsr.tbl$i;" check_contains 'sum(i): 1' done + +# Reset and test setting external storage from outside +DBPATH2="$TEST_DIR/restore.ext_storage" +mkdir -p $DBPATH2 +echo 'CREATE DATABASE restore_tsr;' > "$DBPATH2/restore_tsr-schema-create.sql" +for i in $(seq "$TABLE_COUNT"); do + echo "CREATE TABLE tbl$i(i 
TINYINT);" > "$DBPATH2/restore_tsr.tbl$i-schema.sql" + echo "INSERT INTO tbl$i VALUES (1);" > "$DBPATH2/restore_tsr.tbl$i.sql" +done + +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/setExtStorage=return(\"$DBPATH2\")" +export GO_FAILPOINTS="$GO_FAILPOINTS;github.com/pingcap/tidb/br/pkg/lightning/setCheckpointName=return(\"test_checkpoint.pb\")" + +run_sql 'DROP DATABASE IF EXISTS restore_tsr' +run_lightning +echo "Import finished" + +# Verify all data are imported +for i in $(seq "$TABLE_COUNT"); do + run_sql "SELECT sum(i) FROM restore_tsr.tbl$i;" + check_contains 'sum(i): 1' +done + +# Verify checkpoint file is also created in external storage +[ -f "$DBPATH2/test_checkpoint.pb" ]