Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: skip the error when fallback from ingest to txn method #39360

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ func (b *backfillScheduler) adjustWorkerSize() error {
if b.copReqSenderPool != nil {
b.copReqSenderPool.adjustSize(len(b.workers))
}
return injectCheckBackfillWorkerNum(len(b.workers))
return injectCheckBackfillWorkerNum(len(b.workers), b.tp == typeAddIndexMergeTmpWorker)
}

func (b *backfillScheduler) initCopReqSenderPool() {
Expand Down Expand Up @@ -871,7 +871,10 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
return nil
}

func injectCheckBackfillWorkerNum(curWorkerSize int) error {
func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error {
if isMergeWorker {
return nil
}
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
failpoint.Inject("checkBackfillWorkerNum", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
Expand Down
2 changes: 0 additions & 2 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ func TestCancel(t *testing.T) {

// Prepare schema.
tk.MustExec("use test")
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("drop table if exists t_partition;")
tk.MustExec(`create table t_partition (
c1 int, c2 int, c3 int
Expand Down
2 changes: 1 addition & 1 deletion ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,7 +1719,7 @@ func TestCreateUniqueExpressionIndex(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.pr.
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table t(a int default 0, b int default 0)")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)")
Expand Down
5 changes: 0 additions & 5 deletions ddl/failtest/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,6 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) {
s := createFailDBSuite(t)
testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) {
tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1, c3))")
})
}
Expand All @@ -330,8 +328,6 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) {
func TestRunDDLJobPanicDisableClusteredIndex(t *testing.T) {
s := createFailDBSuite(t)
testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) {
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
})
}
Expand Down Expand Up @@ -424,7 +420,6 @@ func TestPartitionAddIndexGC(t *testing.T) {
s := createFailDBSuite(t)
tk := testkit.NewTestKit(t, s.store)
tk.MustExec("use test")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec(`create table partition_add_idx (
id int not null,
hired date not null
Expand Down
11 changes: 6 additions & 5 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,12 +753,13 @@ func IngestJobsNotExisted(ctx sessionctx.Context) bool {
}

// tryFallbackToTxnMerge changes the reorg type to txn-merge if the lightning backfill meets something wrong.
func tryFallbackToTxnMerge(job *model.Job, err error) {
func tryFallbackToTxnMerge(job *model.Job, err *error) {
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
if job.State != model.JobStateRollingback {
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(err))
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(*err))
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
job.SnapshotVer = 0
job.RowCount = 0
*err = nil
}
}

Expand Down Expand Up @@ -801,13 +802,13 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
}
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode)
if err != nil {
tryFallbackToTxnMerge(job, err)
tryFallbackToTxnMerge(job, &err)
return false, ver, errors.Trace(err)
}
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
if err != nil {
ingest.LitBackCtxMgr.Unregister(job.ID)
tryFallbackToTxnMerge(job, err)
tryFallbackToTxnMerge(job, &err)
return false, ver, errors.Trace(err)
}
if !done {
Expand All @@ -820,7 +821,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
} else {
logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err))
tryFallbackToTxnMerge(job, err)
tryFallbackToTxnMerge(job, &err)
}
ingest.LitBackCtxMgr.Unregister(job.ID)
return false, ver, errors.Trace(err)
Expand Down
7 changes: 7 additions & 0 deletions ddl/ingest/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ const maxMemoryQuota = 2 * size.GB
// InitGlobalLightningEnv initialize Lightning backfill environment.
func InitGlobalLightningEnv() {
log.SetAppLogger(logutil.BgLogger())
globalCfg := config.GetGlobalConfig()
if globalCfg.Store != "tikv" {
logutil.BgLogger().Warn(LitWarnEnvInitFail,
zap.String("storage limitation", "only support TiKV storage"),
zap.String("current storage", globalCfg.Store),
zap.Bool("lightning is initialized", LitInitialized))
}
sPath, err := genLightningDataDir()
if err != nil {
logutil.BgLogger().Warn(LitWarnEnvInitFail, zap.Error(err),
Expand Down
2 changes: 0 additions & 2 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,6 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
originHook := dom.DDL().GetHook()

// Test cancel successfully.
Expand Down
2 changes: 0 additions & 2 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,6 @@ func TestCancelAddIndexPanic(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/errorMockPanic"))
}()
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int, c2 int)")

Expand Down
2 changes: 0 additions & 2 deletions ddl/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func TestDDLStatsInfo(t *testing.T) {
tblInfo, err := testTableInfo(store, "t", 2)
require.NoError(t, err)
testCreateTable(t, ctx, d, dbInfo, tblInfo)
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
err = sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)

Expand Down