diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 0c06fa0fab551..76e1fc2aaf5fd 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -725,7 +725,7 @@ func (b *backfillScheduler) adjustWorkerSize() error { if b.copReqSenderPool != nil { b.copReqSenderPool.adjustSize(len(b.workers)) } - return injectCheckBackfillWorkerNum(len(b.workers)) + return injectCheckBackfillWorkerNum(len(b.workers), b.tp == typeAddIndexMergeTmpWorker) } func (b *backfillScheduler) initCopReqSenderPool() { @@ -871,7 +871,10 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic return nil } -func injectCheckBackfillWorkerNum(curWorkerSize int) error { +func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error { + if isMergeWorker { + return nil + } failpoint.Inject("checkBackfillWorkerNum", func(val failpoint.Value) { //nolint:forcetypeassert if val.(bool) { diff --git a/ddl/cancel_test.go b/ddl/cancel_test.go index 431b97357b2db..cd446f3441baf 100644 --- a/ddl/cancel_test.go +++ b/ddl/cancel_test.go @@ -232,8 +232,6 @@ func TestCancel(t *testing.T) { // Prepare schema. tk.MustExec("use test") - // TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("drop table if exists t_partition;") tk.MustExec(`create table t_partition ( c1 int, c2 int, c3 int diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index ee0634215c465..6de1376e2b75b 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1719,7 +1719,7 @@ func TestCreateUniqueExpressionIndex(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.pr. + // TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR. tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("create table t(a int default 0, b int default 0)") tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)") diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go index ece5b804ae336..d12c2182f9730 100644 --- a/ddl/failtest/fail_db_test.go +++ b/ddl/failtest/fail_db_test.go @@ -320,8 +320,6 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) { s := createFailDBSuite(t) testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) { tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1, c3))") }) } @@ -330,8 +328,6 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) { func TestRunDDLJobPanicDisableClusteredIndex(t *testing.T) { s := createFailDBSuite(t) testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) { - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))") }) } @@ -424,7 +420,6 @@ func TestPartitionAddIndexGC(t *testing.T) { s := createFailDBSuite(t) tk := testkit.NewTestKit(t, s.store) tk.MustExec("use test") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec(`create table partition_add_idx ( id int not null, hired date not null diff --git a/ddl/index.go b/ddl/index.go index c3c3ffd37c926..818dc1eac7738 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -753,13 +753,15 @@ func IngestJobsNotExisted(ctx sessionctx.Context) bool { } // tryFallbackToTxnMerge changes the reorg type to txn-merge if the lightning backfill meets something wrong. -func tryFallbackToTxnMerge(job *model.Job, err error) { +func tryFallbackToTxnMerge(job *model.Job, err error) error { if job.State != model.JobStateRollingback { logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(err)) job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge job.SnapshotVer = 0 job.RowCount = 0 + return nil } + return err } func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, @@ -801,13 +803,13 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode) if err != nil { - tryFallbackToTxnMerge(job, err) + err = tryFallbackToTxnMerge(job, err) return false, ver, errors.Trace(err) } done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) if err != nil { ingest.LitBackCtxMgr.Unregister(job.ID) - tryFallbackToTxnMerge(job, err) + err = tryFallbackToTxnMerge(job, err) return false, ver, errors.Trace(err) } if !done { @@ -820,7 +822,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err) } else { logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err)) - tryFallbackToTxnMerge(job, err) + err = tryFallbackToTxnMerge(job, err) } ingest.LitBackCtxMgr.Unregister(job.ID) return false, ver, errors.Trace(err) diff --git a/ddl/ingest/env.go b/ddl/ingest/env.go index 185f873b820a4..6e482523ad84a 100644 --- a/ddl/ingest/env.go +++ b/ddl/ingest/env.go @@ -47,6 +47,13 @@ const maxMemoryQuota = 2 * size.GB // InitGlobalLightningEnv initialize Lightning backfill environment. func InitGlobalLightningEnv() { log.SetAppLogger(logutil.BgLogger()) + globalCfg := config.GetGlobalConfig() + if globalCfg.Store != "tikv" { + logutil.BgLogger().Warn(LitWarnEnvInitFail, + zap.String("storage limitation", "only support TiKV storage"), + zap.String("current storage", globalCfg.Store), + zap.Bool("lightning is initialized", LitInitialized)) + } sPath, err := genLightningDataDir() if err != nil { logutil.BgLogger().Warn(LitWarnEnvInitFail, zap.Error(err), diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index d48aa6752f8ee..f302737d60c3b 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -568,8 +568,6 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") originHook := dom.DDL().GetHook() // Test cancel successfully. diff --git a/ddl/serial_test.go b/ddl/serial_test.go index d0f2ada6521cb..d99036ed4dc51 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -432,8 +432,6 @@ func TestCancelAddIndexPanic(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/errorMockPanic")) }() tk.MustExec("use test") - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("drop table if exists t") tk.MustExec("create table t(c1 int, c2 int)") diff --git a/ddl/stat_test.go b/ddl/stat_test.go index 260a98ab25f6d..556b9eb5dadc7 100644 --- a/ddl/stat_test.go +++ b/ddl/stat_test.go @@ -52,8 +52,6 @@ func TestDDLStatsInfo(t *testing.T) { tblInfo, err := testTableInfo(store, "t", 2) require.NoError(t, err) testCreateTable(t, ctx, d, dbInfo, tblInfo) - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") err = sessiontxn.NewTxn(context.Background(), ctx) require.NoError(t, err)