From 66e2f733b34af53687062d8d945e50006533db77 Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Thu, 24 Nov 2022 00:06:10 +0800 Subject: [PATCH 01/11] part1:fix tidb_ddl_enable_fast_reorg default on ut failed problem --- ddl/backfilling.go | 7 +++++-- ddl/cancel_test.go | 2 -- ddl/db_change_test.go | 2 +- ddl/failtest/fail_db_test.go | 5 ----- ddl/index.go | 6 +++--- ddl/ingest/env.go | 7 +++++++ ddl/multi_schema_change_test.go | 2 -- ddl/serial_test.go | 2 -- ddl/stat_test.go | 2 -- 9 files changed, 16 insertions(+), 19 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 35abc16cd1a6f..81c9f0cb31fc5 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -725,7 +725,7 @@ func (b *backfillScheduler) adjustWorkerSize() error { if b.copReqSenderPool != nil { b.copReqSenderPool.adjustSize(len(b.workers)) } - return injectCheckBackfillWorkerNum(len(b.workers)) + return injectCheckBackfillWorkerNum(len(b.workers), b.tp == typeAddIndexMergeTmpWorker) } func (b *backfillScheduler) initCopReqSenderPool() { @@ -871,7 +871,10 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic return nil } -func injectCheckBackfillWorkerNum(curWorkerSize int) error { +func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error { + if isMergeWorker { + return nil + } failpoint.Inject("checkBackfillWorkerNum", func(val failpoint.Value) { //nolint:forcetypeassert if val.(bool) { diff --git a/ddl/cancel_test.go b/ddl/cancel_test.go index 431b97357b2db..cd446f3441baf 100644 --- a/ddl/cancel_test.go +++ b/ddl/cancel_test.go @@ -232,8 +232,6 @@ func TestCancel(t *testing.T) { // Prepare schema. tk.MustExec("use test") - // TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("drop table if exists t_partition;") tk.MustExec(`create table t_partition ( c1 int, c2 int, c3 int diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index ee0634215c465..6de1376e2b75b 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1719,7 +1719,7 @@ func TestCreateUniqueExpressionIndex(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.pr. + // TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR. tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("create table t(a int default 0, b int default 0)") tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)") diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go index ece5b804ae336..d12c2182f9730 100644 --- a/ddl/failtest/fail_db_test.go +++ b/ddl/failtest/fail_db_test.go @@ -320,8 +320,6 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) { s := createFailDBSuite(t) testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) { tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1, c3))") }) } @@ -330,8 +328,6 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) { func TestRunDDLJobPanicDisableClusteredIndex(t *testing.T) { s := createFailDBSuite(t) testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) { - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))") }) } @@ -424,7 +420,6 @@ func TestPartitionAddIndexGC(t *testing.T) { s := createFailDBSuite(t) tk := testkit.NewTestKit(t, s.store) tk.MustExec("use test") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec(`create table partition_add_idx ( id int not null, hired date not null diff --git a/ddl/index.go b/ddl/index.go index c3c3ffd37c926..b27411e5721a8 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -802,13 +802,13 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode) if err != nil { tryFallbackToTxnMerge(job, err) - return false, ver, errors.Trace(err) + return false, ver, nil } done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) if err != nil { ingest.LitBackCtxMgr.Unregister(job.ID) tryFallbackToTxnMerge(job, err) - return false, ver, errors.Trace(err) + return false, ver, nil } if !done { return false, ver, nil @@ -823,7 +823,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo tryFallbackToTxnMerge(job, err) } ingest.LitBackCtxMgr.Unregister(job.ID) - return false, ver, errors.Trace(err) + return false, ver, nil } bc.SetDone() case model.ReorgTypeTxnMerge: diff --git a/ddl/ingest/env.go b/ddl/ingest/env.go index 185f873b820a4..6e482523ad84a 100644 --- a/ddl/ingest/env.go +++ b/ddl/ingest/env.go @@ -47,6 +47,13 @@ const maxMemoryQuota = 2 * size.GB // InitGlobalLightningEnv initialize Lightning backfill environment. func InitGlobalLightningEnv() { log.SetAppLogger(logutil.BgLogger()) + globalCfg := config.GetGlobalConfig() + if globalCfg.Store != "tikv" { + logutil.BgLogger().Warn(LitWarnEnvInitFail, + zap.String("storage limitation", "only support TiKV storage"), + zap.String("current storage", globalCfg.Store), + zap.Bool("lightning is initialized", LitInitialized)) + } sPath, err := genLightningDataDir() if err != nil { logutil.BgLogger().Warn(LitWarnEnvInitFail, zap.Error(err), diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index d48aa6752f8ee..f302737d60c3b 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -568,8 +568,6 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") originHook := dom.DDL().GetHook() // Test cancel successfully. diff --git a/ddl/serial_test.go b/ddl/serial_test.go index d0f2ada6521cb..d99036ed4dc51 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -432,8 +432,6 @@ func TestCancelAddIndexPanic(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/errorMockPanic")) }() tk.MustExec("use test") - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("drop table if exists t") tk.MustExec("create table t(c1 int, c2 int)") diff --git a/ddl/stat_test.go b/ddl/stat_test.go index 260a98ab25f6d..556b9eb5dadc7 100644 --- a/ddl/stat_test.go +++ b/ddl/stat_test.go @@ -52,8 +52,6 @@ func TestDDLStatsInfo(t *testing.T) { tblInfo, err := testTableInfo(store, "t", 2) require.NoError(t, err) testCreateTable(t, ctx, d, dbInfo, tblInfo) - // TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") err = sessiontxn.NewTxn(context.Background(), ctx) require.NoError(t, err) From 3b6295209713bf4171108e273b74bbb8706e126a Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Thu, 24 Nov 2022 15:31:25 +0800 Subject: [PATCH 02/11] Update ddl/index.go --- ddl/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/index.go b/ddl/index.go index b27411e5721a8..2dbbdb0f30e7c 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -823,7 +823,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo tryFallbackToTxnMerge(job, err) } ingest.LitBackCtxMgr.Unregister(job.ID) - return false, ver, nil + return false, ver, errors.Trace(err) } bc.SetDone() case model.ReorgTypeTxnMerge: From 7ca601d4fb6531a42b01297f7cdd51aa2c817086 Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Thu, 24 Nov 2022 15:56:03 +0800 Subject: [PATCH 03/11] set err to nil before return to uplayer --- ddl/index.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ddl/index.go b/ddl/index.go index 2dbbdb0f30e7c..4a215cc134f2e 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -821,6 +821,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } else { logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err)) tryFallbackToTxnMerge(job, err) + err = nil } ingest.LitBackCtxMgr.Unregister(job.ID) return false, ver, errors.Trace(err) From 9346fdeda11ec5d482e9d4589362dd8a9b9ad097 Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Thu, 24 Nov 2022 18:51:20 +0800 Subject: [PATCH 04/11] should let rollbaack err return and stored. --- ddl/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/index.go b/ddl/index.go index 4a215cc134f2e..317e2eb650877 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -808,7 +808,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo if err != nil { ingest.LitBackCtxMgr.Unregister(job.ID) tryFallbackToTxnMerge(job, err) - return false, ver, nil + return false, ver, errors.Trace(err) } if !done { return false, ver, nil From 0d8d2b7458407e1d34fb6d0f8f0d348c05fd4933 Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Thu, 24 Nov 2022 22:09:42 +0800 Subject: [PATCH 05/11] only retrun rollback kind err back to uplayer. --- ddl/index.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 317e2eb650877..0c2d9aafbd8b6 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -806,9 +806,13 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) if err != nil { - ingest.LitBackCtxMgr.Unregister(job.ID) - tryFallbackToTxnMerge(job, err) - return false, ver, errors.Trace(err) + if job.State == model.JobStateRollingback { + return false, ver, errors.Trace(err) + } else { + ingest.LitBackCtxMgr.Unregister(job.ID) + tryFallbackToTxnMerge(job, err) + return false, ver, nil + } } if !done { return false, ver, nil From 9177c17a1526b63714b6da8d008971ee9d2e2b6c Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Thu, 24 Nov 2022 22:14:44 +0800 Subject: [PATCH 06/11] ident fix --- ddl/index.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 0c2d9aafbd8b6..dcddc7de3b007 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -806,13 +806,12 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) if err != nil { - if job.State == model.JobStateRollingback { - return false, ver, errors.Trace(err) - } else { + if job.State != model.JobStateRollingback { ingest.LitBackCtxMgr.Unregister(job.ID) tryFallbackToTxnMerge(job, err) return false, ver, nil } + return false, ver, errors.Trace(err) } if !done { return false, ver, nil From 14ac8fabb5975e0f257b5d44dcec535d0720d58e Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Fri, 25 Nov 2022 11:51:46 +0800 Subject: [PATCH 07/11] clean lightning env if fallback to txn merge way --- ddl/index.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index dcddc7de3b007..bf6e190429443 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -806,9 +806,9 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) if err != nil { + ingest.LitBackCtxMgr.Unregister(job.ID) + tryFallbackToTxnMerge(job, err) if job.State != model.JobStateRollingback { - ingest.LitBackCtxMgr.Unregister(job.ID) - tryFallbackToTxnMerge(job, err) return false, ver, nil } return false, ver, errors.Trace(err) From 1ba9a3b10fab2a5caa963748a95644235983a319 Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Fri, 25 Nov 2022 11:56:32 +0800 Subject: [PATCH 08/11] refactor --- ddl/index.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index bf6e190429443..f1f1ebe8cfe1e 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -753,12 +753,13 @@ func IngestJobsNotExisted(ctx sessionctx.Context) bool { } // tryFallbackToTxnMerge changes the reorg type to txn-merge if the lightning backfill meets something wrong. -func tryFallbackToTxnMerge(job *model.Job, err error) { +func tryFallbackToTxnMerge(job *model.Job, err *error) { if job.State != model.JobStateRollingback { - logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(err)) + logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(*err)) job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge job.SnapshotVer = 0 job.RowCount = 0 + *err = nil } } @@ -801,16 +802,13 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode) if err != nil { - tryFallbackToTxnMerge(job, err) - return false, ver, nil + tryFallbackToTxnMerge(job, &err) + return false, ver, err } done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) if err != nil { ingest.LitBackCtxMgr.Unregister(job.ID) - tryFallbackToTxnMerge(job, err) - if job.State != model.JobStateRollingback { - return false, ver, nil - } + tryFallbackToTxnMerge(job, &err) return false, ver, errors.Trace(err) } if !done { @@ -823,8 +821,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err) } else { logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err)) - tryFallbackToTxnMerge(job, err) - err = nil + tryFallbackToTxnMerge(job, &err) } ingest.LitBackCtxMgr.Unregister(job.ID) return false, ver, errors.Trace(err) From cdff86e4f9f76dec5821c584b383a2c38ee20823 Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Fri, 25 Nov 2022 11:57:57 +0800 Subject: [PATCH 09/11] refactor code --- ddl/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/index.go b/ddl/index.go index f1f1ebe8cfe1e..f7f8670643d3b 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -803,7 +803,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode) if err != nil { tryFallbackToTxnMerge(job, &err) - return false, ver, err + return false, ver, errors.Trace(err) } done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) if err != nil { From 6c47fd64320ded1d4033fbfe13d43119b340d823 Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Fri, 25 Nov 2022 13:43:14 +0800 Subject: [PATCH 10/11] handle err in tryFallbackToTxnMerge and return err according --- ddl/index.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index f7f8670643d3b..ab86cfc444d16 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -753,14 +753,15 @@ func IngestJobsNotExisted(ctx sessionctx.Context) bool { } // tryFallbackToTxnMerge changes the reorg type to txn-merge if the lightning backfill meets something wrong. -func tryFallbackToTxnMerge(job *model.Job, err *error) { +func tryFallbackToTxnMerge(job *model.Job, err error) error { if job.State != model.JobStateRollingback { logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(*err)) job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge job.SnapshotVer = 0 job.RowCount = 0 - *err = nil + return nil } + return err } func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, @@ -802,13 +803,13 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode) if err != nil { - tryFallbackToTxnMerge(job, &err) + err = tryFallbackToTxnMerge(job, err) return false, ver, errors.Trace(err) } done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) if err != nil { ingest.LitBackCtxMgr.Unregister(job.ID) - tryFallbackToTxnMerge(job, &err) + err = tryFallbackToTxnMerge(job, err) return false, ver, errors.Trace(err) } if !done { @@ -821,7 +822,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err) } else { logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err)) - tryFallbackToTxnMerge(job, &err) + err = tryFallbackToTxnMerge(job, err) } ingest.LitBackCtxMgr.Unregister(job.ID) return false, ver, errors.Trace(err) From ebc9852e5861d20a007adecaea3a325863ebbc92 Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Fri, 25 Nov 2022 13:44:51 +0800 Subject: [PATCH 11/11] fix error --- ddl/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/index.go b/ddl/index.go index ab86cfc444d16..818dc1eac7738 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -755,7 +755,7 @@ func IngestJobsNotExisted(ctx sessionctx.Context) bool { // tryFallbackToTxnMerge changes the reorg type to txn-merge if the lightning backfill meets something wrong. func tryFallbackToTxnMerge(job *model.Job, err error) error { if job.State != model.JobStateRollingback { - logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(*err)) + logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(err)) job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge job.SnapshotVer = 0 job.RowCount = 0