Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: add retry and early terminate checking empty table #31798

Merged
merged 24 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ae0071a
add retry and cancal for retry check table empty
glorv Jan 19, 2022
03e8d85
Merge branch 'master' into retry-check-empty
glorv Jan 19, 2022
5b424f9
adjust failpoint
glorv Jan 19, 2022
ae06afc
Merge branch 'retry-check-empty' of ssh://github.com/glorv/tidb into …
glorv Jan 19, 2022
a022017
remove a misleading log
glorv Jan 19, 2022
9d465b3
fmt code
glorv Jan 19, 2022
903f900
Merge branch 'master' into retry-check-empty
glorv Jan 19, 2022
f1085a0
Merge branch 'master' into retry-check-empty
glorv Jan 20, 2022
ffce621
Merge branch 'master' into retry-check-empty
glorv Jan 20, 2022
8de3cf5
Merge branch 'master' into retry-check-empty
glorv Jan 21, 2022
f528201
adjust post process check
glorv Jan 25, 2022
8f45d59
Merge branch 'retry-check-empty' of ssh://github.com/glorv/tidb into …
glorv Jan 25, 2022
5483e70
Merge branch 'master' into retry-check-empty
glorv Jan 25, 2022
3b7a996
remove useless code
glorv Jan 25, 2022
7c3e23b
fix comment
glorv Jan 25, 2022
ef0b21f
Merge branch 'master' into retry-check-empty
glorv Jan 25, 2022
e5bf00c
Merge branch 'master' of ssh://github.com/pingcap/tidb into retry-che…
glorv Feb 7, 2022
442ea63
Merge branch 'master' into retry-check-empty
glorv Feb 7, 2022
6fbc388
Merge branch 'master' into retry-check-empty
glorv Feb 21, 2022
e5cadb6
fix an error msg typo
glorv Feb 28, 2022
433c4ad
format error msg
glorv Feb 28, 2022
a667fcc
Merge branch 'master' of ssh://github.com/pingcap/tidb into retry-che…
glorv Mar 16, 2022
79971aa
Merge branch 'master' into retry-check-empty
ti-chi-bot Mar 16, 2022
f34c3f3
Merge branch 'master' into retry-check-empty
ti-chi-bot Mar 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,7 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
ch := make(chan string, concurrency)
eg, gCtx := errgroup.WithContext(ctx)

for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for tblName := range ch {
Expand Down Expand Up @@ -1125,17 +1126,23 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
return nil
})
}
loop:
for _, db := range rc.dbMetas {
for _, tbl := range db.Tables {
ch <- common.UniqueTable(tbl.DB, tbl.Name)
select {
case ch <- common.UniqueTable(tbl.DB, tbl.Name):
case <-gCtx.Done():
break loop
}

}
}
close(ch)
if err := eg.Wait(); err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return errors.Trace(err)
return errors.Annotate(err, "check table contains data failed")
}

if len(tableNames) > 0 {
Expand All @@ -1147,13 +1154,20 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
return nil
}

func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) {
func tableContainsData(ctx context.Context, db utils.DBExecutor, tableName string) (bool, error) {
failpoint.Inject("CheckTableEmptyFailed", func() {
failpoint.Return(false, errors.New("mock error"))
})
query := "select 1 from " + tableName + " limit 1"
exec := common.SQLWithRetry{
DB: db,
Logger: log.L(),
}
var dump int
err := db.QueryRowContext(ctx, query).Scan(&dump)
err := exec.QueryRow(ctx, "check table empty", query, &dump)

switch {
case err == sql.ErrNoRows:
case errors.ErrorEqual(err, sql.ErrNoRows):
return false, nil
case err != nil:
return false, errors.Trace(err)
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -475,6 +476,9 @@ func TestCheckTableEmpty(t *testing.T) {
require.NoError(t, err)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
// test auto retry retryable error
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnError(mysql.NewErr(errno.ErrPDServerTimeout))
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
Expand Down Expand Up @@ -543,6 +547,18 @@ func TestCheckTableEmpty(t *testing.T) {
err = rc.checkTableEmpty(ctx)
require.NoError(t, err)
require.NoError(t, mock.ExpectationsWereMet())

err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/restore/CheckTableEmptyFailed", `return`)
require.NoError(t, err)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/restore/CheckTableEmptyFailed")
}()

// restrict the concurrency to ensure there are more tables than workers
rc.cfg.App.RegionConcurrency = 1
// test check tables not stuck but return the right error
err = rc.checkTableEmpty(ctx)
require.Regexp(t, ".*check table contains data failed: mock error.*", err.Error())
}

func TestLocalResource(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
}

if status == metaStatusChecksuming {
return common.ErrAllocTableRowIDs.GenWithStack("target table is calculating checksum, please wait unit the checksum is finished and try again.")
return common.ErrAllocTableRowIDs.GenWithStack("Target table is calculating checksum. Please wait until the checksum is finished and try again.")
}

if metaTaskID == m.taskID {
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1666,12 +1666,10 @@ func (rc *Controller) doCompact(ctx context.Context, level int32) error {
}

func (rc *Controller) switchToImportMode(ctx context.Context) {
log.L().Info("switch to import mode")
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Import)
}

func (rc *Controller) switchToNormalMode(ctx context.Context) {
log.L().Info("switch to normal mode")
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Normal)
}

Expand All @@ -1681,6 +1679,8 @@ func (rc *Controller) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode)
return
}

log.L().Info("switch import mode", zap.Stringer("mode", mode))

// It is fine if we miss some stores which did not switch to Import mode,
// since we're running it periodically, so we exclude disconnected stores.
// But it is essential all stores be switched back to Normal mode to allow
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,8 @@ func (tr *TableRestore) postProcess(
}

// tidb backend don't need checksum & analyze
if !rc.backend.ShouldPostProcess() {
tr.logger.Debug("skip checksum & analyze, not supported by this backend")
if rc.cfg.PostRestore.Checksum == config.OpLevelOff && rc.cfg.PostRestore.Analyze == config.OpLevelOff {
tr.logger.Debug("skip checksum & analyze, either because not supported by this backend or manually disabled")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these (f528201) part of this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. As this is just a tiny change, maybe it not worth to file another PR(as while as an issue)😅。

err := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, nil, checkpoints.CheckpointStatusAnalyzeSkipped)
return false, errors.Trace(err)
}
Expand Down