Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: support inject external storage when as library #33303

Merged
merged 13 commits into from
Mar 25, 2022
2 changes: 1 addition & 1 deletion br/cmd/tidb-lightning/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func main() {
if err := cfg.LoadFromGlobal(globalCfg); err != nil {
return err
}
return app.RunOnce(context.Background(), cfg, nil)
return app.RunOnceWithOptions(context.Background(), cfg)
}()

finished := true
Expand Down
39 changes: 28 additions & 11 deletions br/pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,24 +954,23 @@ type FileCheckpointsDB struct {
exStorage storage.ExternalStorage
}

func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, error) {
func newFileCheckpointsDB(
ctx context.Context,
path string,
exStorage storage.ExternalStorage,
fileName string,
) (*FileCheckpointsDB, error) {
cpdb := &FileCheckpointsDB{
path: path,
checkpoints: checkpointspb.CheckpointsModel{
TaskCheckpoint: &checkpointspb.TaskCheckpointModel{},
Checkpoints: map[string]*checkpointspb.TableCheckpointModel{},
},
ctx: ctx,
path: path,
fileName: fileName,
exStorage: exStorage,
}

// init ExternalStorage
s, fileName, err := createExstorageByCompletePath(ctx, path)
if err != nil {
return nil, errors.Trace(err)
}
cpdb.ctx = ctx
cpdb.fileName = fileName
cpdb.exStorage = s

if cpdb.fileName == "" {
return nil, errors.Errorf("the checkpoint DSN '%s' must not be a directory", path)
}
Expand Down Expand Up @@ -1013,6 +1012,24 @@ func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB,
return cpdb, nil
}

func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, error) {
// init ExternalStorage
s, fileName, err := createExstorageByCompletePath(ctx, path)
if err != nil {
return nil, errors.Trace(err)
}
return newFileCheckpointsDB(ctx, path, s, fileName)
}

func NewFileCheckpointsDBWithExstorageFileName(
ctx context.Context,
path string,
s storage.ExternalStorage,
fileName string,
) (*FileCheckpointsDB, error) {
return newFileCheckpointsDB(ctx, path, s, fileName)
}

// createExstorageByCompletePath create ExternalStorage by completePath and return fileName.
func createExstorageByCompletePath(ctx context.Context, completePath string) (storage.ExternalStorage, string, error) {
if completePath == "" {
Expand Down
86 changes: 74 additions & 12 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/import_sstpb"
Expand All @@ -49,8 +50,6 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version/build"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/shurcooL/httpgzip"
"go.uber.org/zap"
Expand Down Expand Up @@ -188,6 +187,7 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
// use a default glue later.
// - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and glue could be a
// caller implemented glue.
// deprecated: use RunOnceWithOptions instead.
func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glue glue.Glue) error {
if err := taskCfg.Adjust(taskCtx); err != nil {
return err
Expand All @@ -198,7 +198,7 @@ func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glu
taskCfg.TaskID = int64(val.(int))
})

return l.run(taskCtx, taskCfg, glue)
return l.run(taskCtx, taskCfg, &options{glue: glue})
}

func (l *Lightning) RunServer() error {
Expand All @@ -223,12 +223,60 @@ func (l *Lightning) RunServer() error {
}
}

// RunOnceWithOptions is used by binary lightning and host when using lightning as a library.
// - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
// cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. No need to set Options
// - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and there Options may
// be used:
// - WithGlue: set a caller implemented glue. Otherwise, lightning will use a default glue later.
// - WithDumpFileStorage: caller has opened an external storage for lightning. Otherwise, lightning will open a
// storage by config
// - WithCheckpointStorage: caller has opened an external storage for lightning and want to save checkpoint
// in it. Otherwise, lightning will save checkpoint by the Checkpoint.DSN in config
func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config.Config, opts ...Option) error {
o := &options{}
for _, opt := range opts {
opt(o)
}

failpoint.Inject("setExtStorage", func(val failpoint.Value) {
path := val.(string)
b, err := storage.ParseBackend(path, nil)
if err != nil {
panic(err)
}
s, err := storage.New(context.Background(), b, &storage.ExternalStorageOptions{})
if err != nil {
panic(err)
}
o.dumpFileStorage = s
o.checkpointStorage = s
})
failpoint.Inject("setCheckpointName", func(val failpoint.Value) {
file := val.(string)
o.checkpointName = file
})

if o.dumpFileStorage != nil {
// we don't use it, set a value to pass Adjust
taskCfg.Mydumper.SourceDir = "noop://"
}

if err := taskCfg.Adjust(taskCtx); err != nil {
return err
}

taskCfg.TaskID = time.Now().UnixNano()

return l.run(taskCtx, taskCfg, o)
}

var (
taskRunNotifyKey = "taskRunNotifyKey"
taskCfgRecorderKey = "taskCfgRecorderKey"
)

func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.Glue) (err error) {
func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *options) (err error) {
build.LogInfo(build.Lightning)
log.L().Info("cfg", zap.Stringer("cfg", taskCfg))

Expand Down Expand Up @@ -279,6 +327,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.

// initiation of default glue should be after RegisterMySQL, which is ready to be called after taskCfg.Adjust
// and also put it here could avoid injecting another two SkipRunTask failpoint to caller
g := o.glue
if g == nil {
db, err := restore.DBFromConfig(ctx, taskCfg.TiDB)
if err != nil {
Expand All @@ -287,13 +336,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
g = glue.NewExternalTiDBGlue(db, taskCfg.TiDB.SQLMode)
}

u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil)
if err != nil {
return common.NormalizeError(err)
}
s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{})
if err != nil {
return common.NormalizeError(err)
s := o.dumpFileStorage
if s == nil {
u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil)
if err != nil {
return common.NormalizeError(err)
}
s, err = storage.New(ctx, u, &storage.ExternalStorageOptions{})
if err != nil {
return common.NormalizeError(err)
}
}

// return expectedErr means at least meet one file
Expand Down Expand Up @@ -333,7 +385,17 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.

var procedure *restore.Controller

procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, &l.status, s, g)
param := &restore.ControllerParam{
DBMetas: dbMetas,
Status: &l.status,
DumpFileStorage: s,
OwnExtStorage: o.dumpFileStorage == nil,
Glue: g,
CheckpointStorage: o.checkpointStorage,
CheckpointName: o.checkpointName,
}

procedure, err = restore.NewRestoreController(ctx, taskCfg, param)
if err != nil {
log.L().Error("restore failed", log.ShortError(err))
return errors.Trace(err)
Expand Down
7 changes: 4 additions & 3 deletions br/pkg/lightning/lightning_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ func TestRun(t *testing.T) {
cfg := config.NewConfig()
err := cfg.LoadFromGlobal(globalConfig)
require.NoError(t, err)
err = lightning.RunOnce(context.Background(), cfg, nil)
err = lightning.RunOnceWithOptions(context.Background(), cfg)
require.Error(t, err)
require.Regexp(t, "`mydumper.data-source-dir` does not exist$", err.Error())

path, _ := filepath.Abs(".")
ctx := context.Background()
invalidGlue := glue.NewExternalTiDBGlue(nil, 0)
o := &options{glue: invalidGlue}
err = lightning.run(ctx, &config.Config{
Mydumper: config.MydumperRuntime{
SourceDir: "file://" + filepath.ToSlash(path),
Expand All @@ -71,7 +72,7 @@ func TestRun(t *testing.T) {
Enable: true,
Driver: "invalid",
},
}, invalidGlue)
}, o)
require.EqualError(t, err, "[Lightning:Checkpoint:ErrUnknownCheckpointDriver]unknown checkpoint driver 'invalid'")

err = lightning.run(ctx, &config.Config{
Expand All @@ -84,7 +85,7 @@ func TestRun(t *testing.T) {
Driver: "file",
DSN: "any-file",
},
}, invalidGlue)
}, o)
require.Error(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/lightning_server_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestHTTPAPIOutsideServerMode(t *testing.T) {
err := cfg.LoadFromGlobal(s.lightning.globalCfg)
require.NoError(t, err)
go func() {
errCh <- s.lightning.RunOnce(s.lightning.ctx, cfg, nil)
errCh <- s.lightning.RunOnceWithOptions(s.lightning.ctx, cfg)
}()
time.Sleep(600 * time.Millisecond)

Expand Down
8 changes: 3 additions & 5 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ import (
"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
Expand All @@ -52,6 +48,9 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"
)

const (
Expand Down Expand Up @@ -348,7 +347,6 @@ func (rc *Controller) checkClusterRegion(ctx context.Context) error {
}

// StoragePermission checks whether Lightning has enough permission to storage.
// this test cannot be skipped.
func (rc *Controller) StoragePermission(ctx context.Context) error {
passed := true
message := "Lightning has the correct storage permission"
Expand Down
Loading