-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
lightning: support inject external storage when as library #33303
Merged
Merged
Changes from 4 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
3e34d96
lightning: support inject external storage when as library
lance6716 34b1c89
add test
lance6716 a8ff409
remove debug message
lance6716 17328ab
address comment
lance6716 a8676cc
Merge branch 'master' of github.com:pingcap/tidb into lightning-extst…
lance6716 a7011e6
address comment
lance6716 6e9073d
address comment
lance6716 1a11964
address comment
lance6716 d5e2a6a
Merge branch 'master' into lightning-extstorage
ti-chi-bot 0c4917d
Merge branch 'master' into lightning-extstorage
ti-chi-bot 8d215c6
Merge branch 'master' into lightning-extstorage
ti-chi-bot b926161
Merge branch 'master' into lightning-extstorage
ti-chi-bot 5ac26f0
address comment
lance6716 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,7 @@ import ( | |
"github.com/pingcap/errors" | ||
"github.com/pingcap/failpoint" | ||
"github.com/pingcap/kvproto/pkg/import_sstpb" | ||
|
||
"github.com/pingcap/tidb/br/pkg/lightning/backend" | ||
"github.com/pingcap/tidb/br/pkg/lightning/backend/importer" | ||
"github.com/pingcap/tidb/br/pkg/lightning/backend/local" | ||
|
@@ -188,6 +189,7 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error { | |
// use a default glue later. | ||
// - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and glue could be a | ||
// caller implemented glue. | ||
// deprecated: use RunOnceWithOptions instead. | ||
func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glue glue.Glue) error { | ||
if err := taskCfg.Adjust(taskCtx); err != nil { | ||
return err | ||
|
@@ -198,7 +200,7 @@ func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glu | |
taskCfg.TaskID = int64(val.(int)) | ||
}) | ||
|
||
return l.run(taskCtx, taskCfg, glue) | ||
return l.run(taskCtx, taskCfg, &options{glue: glue}) | ||
} | ||
|
||
func (l *Lightning) RunServer() error { | ||
|
@@ -223,12 +225,67 @@ func (l *Lightning) RunServer() error { | |
} | ||
} | ||
|
||
// RunOnceWithOptions is used by binary lightning and host when using lightning as a library. | ||
// - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its | ||
// cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. No need to set Options | ||
// - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and there Options may | ||
// be used: | ||
// - WithGlue: set a caller implemented glue. Otherwise, lightning will use a default glue later. | ||
// - WithExternalStorage: caller has opened an external storage for lightning. Otherwise, lightning will open a | ||
// storage by config | ||
// - WithCheckpointInExternalStorage: caller has opened an external storage for lightning and want to save checkpoint | ||
// in it. Otherwise, lightning will save checkpoint by the Checkpoint.DSN in config | ||
func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config.Config, opts ...Option) error { | ||
o := &options{} | ||
for _, opt := range opts { | ||
opt(o) | ||
} | ||
|
||
failpoint.Inject("setExtStorage", func(val failpoint.Value) { | ||
path := val.(string) | ||
b, err := storage.ParseBackend(path, nil) | ||
if err != nil { | ||
panic(err) | ||
} | ||
s, err := storage.New(context.Background(), b, &storage.ExternalStorageOptions{}) | ||
if err != nil { | ||
panic(err) | ||
} | ||
o.externalStorage = s | ||
}) | ||
failpoint.Inject("setCpExtStorage", func(val failpoint.Value) { | ||
file := val.(string) | ||
o.checkpointName = file | ||
}) | ||
|
||
// pre-check about options | ||
if o.externalStorage != nil && o.glue != nil { | ||
return common.ErrInvalidArgument.GenWithStack("WithExternalStorage and WithGlue can't be both set") | ||
} | ||
if o.checkpointName != "" && o.glue != nil { | ||
return common.ErrInvalidArgument.GenWithStack("WithCpNameInExtStorage and WithGlue can't be both set") | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you please explain why storage and glue can't be both set? |
||
|
||
if o.externalStorage != nil { | ||
// we don't use it, set a value to pass Adjust | ||
taskCfg.Mydumper.SourceDir = "noop://" | ||
} | ||
|
||
if err := taskCfg.Adjust(taskCtx); err != nil { | ||
return err | ||
} | ||
|
||
taskCfg.TaskID = time.Now().UnixNano() | ||
|
||
return l.run(taskCtx, taskCfg, o) | ||
} | ||
|
||
var ( | ||
taskRunNotifyKey = "taskRunNotifyKey" | ||
taskCfgRecorderKey = "taskCfgRecorderKey" | ||
) | ||
|
||
func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.Glue) (err error) { | ||
func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *options) (err error) { | ||
build.LogInfo(build.Lightning) | ||
log.L().Info("cfg", zap.Stringer("cfg", taskCfg)) | ||
|
||
|
@@ -279,6 +336,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. | |
|
||
// initiation of default glue should be after RegisterMySQL, which is ready to be called after taskCfg.Adjust | ||
// and also put it here could avoid injecting another two SkipRunTask failpoint to caller | ||
g := o.glue | ||
if g == nil { | ||
db, err := restore.DBFromConfig(ctx, taskCfg.TiDB) | ||
if err != nil { | ||
|
@@ -287,13 +345,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. | |
g = glue.NewExternalTiDBGlue(db, taskCfg.TiDB.SQLMode) | ||
} | ||
|
||
u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil) | ||
if err != nil { | ||
return common.NormalizeError(err) | ||
} | ||
s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{}) | ||
if err != nil { | ||
return common.NormalizeError(err) | ||
s := o.externalStorage | ||
if s == nil { | ||
u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil) | ||
if err != nil { | ||
return common.NormalizeError(err) | ||
} | ||
s, err = storage.New(ctx, u, &storage.ExternalStorageOptions{}) | ||
if err != nil { | ||
return common.NormalizeError(err) | ||
} | ||
} | ||
|
||
// return expectedErr means at least meet one file | ||
|
@@ -333,7 +394,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. | |
|
||
var procedure *restore.Controller | ||
|
||
procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, &l.status, s, g) | ||
param := &restore.ControllerParam{ | ||
DBMetas: dbMetas, | ||
Status: &l.status, | ||
ExtStorage: s, | ||
OwnExtStorage: o.externalStorage == nil, | ||
Glue: g, | ||
CpNameInExtStorage: o.checkpointName, | ||
} | ||
|
||
procedure, err = restore.NewRestoreController(ctx, taskCfg, param) | ||
if err != nil { | ||
log.L().Error("restore failed", log.ShortError(err)) | ||
return errors.Trace(err) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this empty line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
5ac26f0