diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go
index 108ab61a658b5..1a56c741c3b2e 100644
--- a/br/pkg/lightning/restore/check_info.go
+++ b/br/pkg/lightning/restore/check_info.go
@@ -1097,6 +1097,7 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
 	concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
 	ch := make(chan string, concurrency)
 	eg, gCtx := errgroup.WithContext(ctx)
+
 	for i := 0; i < concurrency; i++ {
 		eg.Go(func() error {
 			for tblName := range ch {
@@ -1125,9 +1126,15 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
 			return nil
 		})
 	}
+loop:
 	for _, db := range rc.dbMetas {
 		for _, tbl := range db.Tables {
-			ch <- common.UniqueTable(tbl.DB, tbl.Name)
+			select {
+			case ch <- common.UniqueTable(tbl.DB, tbl.Name):
+			case <-gCtx.Done():
+				break loop
+			}
+
 		}
 	}
 	close(ch)
@@ -1135,7 +1142,7 @@
 		if common.IsContextCanceledError(err) {
 			return nil
 		}
-		return errors.Trace(err)
+		return errors.Annotate(err, "check table contains data failed")
 	}
 
 	if len(tableNames) > 0 {
@@ -1147,13 +1154,20 @@
 	return nil
 }
 
-func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) {
+func tableContainsData(ctx context.Context, db utils.DBExecutor, tableName string) (bool, error) {
+	failpoint.Inject("CheckTableEmptyFailed", func() {
+		failpoint.Return(false, errors.New("mock error"))
+	})
 	query := "select 1 from " + tableName + " limit 1"
+	exec := common.SQLWithRetry{
+		DB:     db,
+		Logger: log.L(),
+	}
 	var dump int
-	err := db.QueryRowContext(ctx, query).Scan(&dump)
+	err := exec.QueryRow(ctx, "check table empty", query, &dump)
 
 	switch {
-	case err == sql.ErrNoRows:
+	case errors.ErrorEqual(err, sql.ErrNoRows):
 		return false, nil
 	case err != nil:
 		return false, errors.Trace(err)
diff --git a/br/pkg/lightning/restore/check_info_test.go b/br/pkg/lightning/restore/check_info_test.go
index f7377066f56a6..cfd9a00adc53a 100644
--- a/br/pkg/lightning/restore/check_info_test.go
+++ b/br/pkg/lightning/restore/check_info_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/lightning/worker"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/ddl"
+	"github.com/pingcap/tidb/errno"
 	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/ast"
 	"github.com/pingcap/tidb/parser/model"
@@ -475,6 +476,9 @@ func TestCheckTableEmpty(t *testing.T) {
 	require.NoError(t, err)
 	rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
 	mock.MatchExpectationsInOrder(false)
+	// test auto retry retryable error
+	mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
+		WillReturnError(mysql.NewErr(errno.ErrPDServerTimeout))
 	mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
 		WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
 	mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
@@ -543,6 +547,18 @@ func TestCheckTableEmpty(t *testing.T) {
 	err = rc.checkTableEmpty(ctx)
 	require.NoError(t, err)
 	require.NoError(t, mock.ExpectationsWereMet())
+
+	err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/restore/CheckTableEmptyFailed", `return`)
+	require.NoError(t, err)
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/restore/CheckTableEmptyFailed")
+	}()
+
+	// restrict the concurrency to ensure there are more tables than workers
+	rc.cfg.App.RegionConcurrency = 1
+	// test check tables not stuck but return the right error
+	err = rc.checkTableEmpty(ctx)
+	require.Regexp(t, ".*check table contains data failed: mock error.*", err.Error())
 }
 
 func TestLocalResource(t *testing.T) {
diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go
index b74d352f3262d..6c009279e9e0f 100644
--- a/br/pkg/lightning/restore/meta_manager.go
+++ b/br/pkg/lightning/restore/meta_manager.go
@@ -209,7 +209,7 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
 		}
 
 		if status == metaStatusChecksuming {
-			return common.ErrAllocTableRowIDs.GenWithStack("target table is calculating checksum, please wait unit the checksum is finished and try again.")
+			return common.ErrAllocTableRowIDs.GenWithStack("Target table is calculating checksum. Please wait until the checksum is finished and try again.")
 		}
 
 		if metaTaskID == m.taskID {
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index 2ac8e0581fd4f..2c9f48e089497 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -1666,12 +1666,10 @@ func (rc *Controller) doCompact(ctx context.Context, level int32) error {
 }
 
 func (rc *Controller) switchToImportMode(ctx context.Context) {
-	log.L().Info("switch to import mode")
 	rc.switchTiKVMode(ctx, sstpb.SwitchMode_Import)
 }
 
 func (rc *Controller) switchToNormalMode(ctx context.Context) {
-	log.L().Info("switch to normal mode")
 	rc.switchTiKVMode(ctx, sstpb.SwitchMode_Normal)
 }
 
@@ -1681,6 +1679,8 @@ func (rc *Controller) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode)
 		return
 	}
 
+	log.L().Info("switch import mode", zap.Stringer("mode", mode))
+
 	// It is fine if we miss some stores which did not switch to Import mode,
 	// since we're running it periodically, so we exclude disconnected stores.
 	// But it is essential all stores be switched back to Normal mode to allow
diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go
index 147c97fc7db6b..9c3048d9518d6 100644
--- a/br/pkg/lightning/restore/table_restore.go
+++ b/br/pkg/lightning/restore/table_restore.go
@@ -697,8 +697,8 @@ func (tr *TableRestore) postProcess(
 	}
 
 	// tidb backend don't need checksum & analyze
-	if !rc.backend.ShouldPostProcess() {
-		tr.logger.Debug("skip checksum & analyze, not supported by this backend")
+	if rc.cfg.PostRestore.Checksum == config.OpLevelOff && rc.cfg.PostRestore.Analyze == config.OpLevelOff {
+		tr.logger.Debug("skip checksum & analyze, either because not supported by this backend or manually disabled")
 		err := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, nil, checkpoints.CheckpointStatusAnalyzeSkipped)
 		return false, errors.Trace(err)
 	}
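For reference, here is a minimal standalone sketch of the producer/worker pattern the first two check_info.go hunks rely on. It is illustrative only, not Lightning code: the function name `checkTables`, the `"bad"` table, and the simulated per-table check are made up. The point it demonstrates is that the producer selects on the errgroup context while feeding the channel, so it stops producing instead of blocking forever once a worker has already failed.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// checkTables mirrors the shape of checkTableEmpty after the patch: a bounded
// pool of workers drains the channel, and the producer watches gCtx so it can
// stop feeding work once any worker has returned an error.
func checkTables(ctx context.Context, tables []string, concurrency int) error {
	ch := make(chan string, concurrency)
	eg, gCtx := errgroup.WithContext(ctx)

	for i := 0; i < concurrency; i++ {
		eg.Go(func() error {
			for tbl := range ch {
				// Stand-in for the per-table "select 1 ... limit 1" probe.
				if tbl == "bad" {
					return errors.New("mock error")
				}
			}
			return nil
		})
	}

loop:
	for _, tbl := range tables {
		select {
		case ch <- tbl:
		case <-gCtx.Done():
			// A worker failed (or ctx was canceled): without this case the
			// producer would block on a full channel that nobody reads anymore.
			break loop
		}
	}
	close(ch)
	return eg.Wait()
}

func main() {
	// With concurrency 1 and more tables than channel capacity, a plain
	// `ch <- tbl` send could hang after the failure; the select version
	// returns the worker's error instead.
	fmt.Println(checkTables(context.Background(), []string{"a", "bad", "c", "d", "e"}, 1))
}
```

This is the same reason the test above pins `rc.cfg.App.RegionConcurrency = 1`: it forces more tables than workers so the old code path would get stuck, while the patched path surfaces the annotated error.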