Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: skip the error when fallback from ingest to txn method #39360

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ func (b *backfillScheduler) adjustWorkerSize() error {
if b.copReqSenderPool != nil {
b.copReqSenderPool.adjustSize(len(b.workers))
}
return injectCheckBackfillWorkerNum(len(b.workers))
return injectCheckBackfillWorkerNum(len(b.workers), b.tp == typeAddIndexMergeTmpWorker)
}

func (b *backfillScheduler) initCopReqSenderPool() {
Expand Down Expand Up @@ -871,7 +871,10 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
return nil
}

func injectCheckBackfillWorkerNum(curWorkerSize int) error {
func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error {
if isMergeWorker {
return nil
}
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
failpoint.Inject("checkBackfillWorkerNum", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
Expand Down
2 changes: 0 additions & 2 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ func TestCancel(t *testing.T) {

// Prepare schema.
tk.MustExec("use test")
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("drop table if exists t_partition;")
tk.MustExec(`create table t_partition (
c1 int, c2 int, c3 int
Expand Down
2 changes: 1 addition & 1 deletion ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,7 +1719,7 @@ func TestCreateUniqueExpressionIndex(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.pr.
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table t(a int default 0, b int default 0)")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)")
Expand Down
5 changes: 0 additions & 5 deletions ddl/failtest/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,6 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) {
s := createFailDBSuite(t)
testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) {
tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1, c3))")
})
}
Expand All @@ -330,8 +328,6 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) {
func TestRunDDLJobPanicDisableClusteredIndex(t *testing.T) {
s := createFailDBSuite(t)
testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) {
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
})
}
Expand Down Expand Up @@ -424,7 +420,6 @@ func TestPartitionAddIndexGC(t *testing.T) {
s := createFailDBSuite(t)
tk := testkit.NewTestKit(t, s.store)
tk.MustExec("use test")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec(`create table partition_add_idx (
id int not null,
hired date not null
Expand Down
6 changes: 3 additions & 3 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,13 +802,13 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode)
if err != nil {
tryFallbackToTxnMerge(job, err)
return false, ver, errors.Trace(err)
return false, ver, nil
}
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
if err != nil {
ingest.LitBackCtxMgr.Unregister(job.ID)
tryFallbackToTxnMerge(job, err)
return false, ver, errors.Trace(err)
return false, ver, nil
}
if !done {
return false, ver, nil
Expand All @@ -823,7 +823,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
tryFallbackToTxnMerge(job, err)
}
ingest.LitBackCtxMgr.Unregister(job.ID)
return false, ver, errors.Trace(err)
return false, ver, nil
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
}
bc.SetDone()
case model.ReorgTypeTxnMerge:
Expand Down
7 changes: 7 additions & 0 deletions ddl/ingest/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ const maxMemoryQuota = 2 * size.GB
// InitGlobalLightningEnv initialize Lightning backfill environment.
func InitGlobalLightningEnv() {
log.SetAppLogger(logutil.BgLogger())
globalCfg := config.GetGlobalConfig()
if globalCfg.Store != "tikv" {
logutil.BgLogger().Warn(LitWarnEnvInitFail,
zap.String("storage limitation", "only support TiKV storage"),
zap.String("current storage", globalCfg.Store),
zap.Bool("lightning is initialized", LitInitialized))
}
sPath, err := genLightningDataDir()
if err != nil {
logutil.BgLogger().Warn(LitWarnEnvInitFail, zap.Error(err),
Expand Down
2 changes: 0 additions & 2 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,6 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
originHook := dom.DDL().GetHook()

// Test cancel successfully.
Expand Down
2 changes: 0 additions & 2 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,6 @@ func TestCancelAddIndexPanic(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/errorMockPanic"))
}()
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int, c2 int)")

Expand Down
2 changes: 0 additions & 2 deletions ddl/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func TestDDLStatsInfo(t *testing.T) {
tblInfo, err := testTableInfo(store, "t", 2)
require.NoError(t, err)
testCreateTable(t, ctx, d, dbInfo, tblInfo)
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
err = sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)

Expand Down