Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix tidb_ddl_enable_fast_reorg default on ut failed problem #39350

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ func (b *backfillScheduler) adjustWorkerSize() error {
if b.copReqSenderPool != nil {
b.copReqSenderPool.adjustSize(len(b.workers))
}
return injectCheckBackfillWorkerNum(len(b.workers))
return injectCheckBackfillWorkerNum(len(b.workers), b.tp == typeAddIndexMergeTmpWorker)
}

func (b *backfillScheduler) initCopReqSenderPool() {
Expand Down Expand Up @@ -871,7 +871,10 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
return nil
}

func injectCheckBackfillWorkerNum(curWorkerSize int) error {
func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error {
if isMergeWorker {
return nil
}
failpoint.Inject("checkBackfillWorkerNum", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
Expand Down
2 changes: 0 additions & 2 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ func TestCancel(t *testing.T) {

// Prepare schema.
tk.MustExec("use test")
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("drop table if exists t_partition;")
tk.MustExec(`create table t_partition (
c1 int, c2 int, c3 int
Expand Down
2 changes: 0 additions & 2 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,8 +1719,6 @@ func TestCreateUniqueExpressionIndex(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table t(a int default 0, b int default 0)")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)")

Expand Down
5 changes: 0 additions & 5 deletions ddl/failtest/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,6 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) {
s := createFailDBSuite(t)
testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) {
tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1, c3))")
})
}
Expand All @@ -330,8 +328,6 @@ func TestRunDDLJobPanicEnableClusteredIndex(t *testing.T) {
func TestRunDDLJobPanicDisableClusteredIndex(t *testing.T) {
s := createFailDBSuite(t)
testAddIndexWorkerNum(t, s, func(tk *testkit.TestKit) {
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
})
}
Expand Down Expand Up @@ -424,7 +420,6 @@ func TestPartitionAddIndexGC(t *testing.T) {
s := createFailDBSuite(t)
tk := testkit.NewTestKit(t, s.store)
tk.MustExec("use test")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec(`create table partition_add_idx (
id int not null,
hired date not null
Expand Down
4 changes: 2 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,13 +802,13 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode)
if err != nil {
tryFallbackToTxnMerge(job, err)
return false, ver, errors.Trace(err)
return false, ver, nil
}
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
if err != nil {
ingest.LitBackCtxMgr.Unregister(job.ID)
tryFallbackToTxnMerge(job, err)
return false, ver, errors.Trace(err)
return false, ver, nil
Copy link
Contributor

@tangenta tangenta Nov 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about the line 823?

	} else {
		logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err))
line 823->	tryFallbackToTxnMerge(job, err) 
	}

}
if !done {
return false, ver, nil
Expand Down
43 changes: 35 additions & 8 deletions ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ func TestIndexChange(t *testing.T) {
ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table t (c1 int primary key, c2 int)")
tk.MustExec("insert t values (1, 1), (2, 2), (3, 3);")

Expand Down Expand Up @@ -188,7 +186,11 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT
return errors.Trace(err)
}
// old value index not exists.
err = checkIndexExists(ctx, writeOnlyTbl, 1, 4, false)
if ddl.IsEnableFastReorg() {
err = checkIndexExists(ctx, writeOnlyTbl, 1, 4, true)
} else {
err = checkIndexExists(ctx, writeOnlyTbl, 1, 4, false)
}
if err != nil {
return errors.Trace(err)
}
Expand All @@ -203,7 +205,11 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT
if err != nil {
return errors.Trace(err)
}
err = checkIndexExists(ctx, writeOnlyTbl, 3, 4, false)
if ddl.IsEnableFastReorg() {
err = checkIndexExists(ctx, writeOnlyTbl, 3, 4, true)
} else {
err = checkIndexExists(ctx, writeOnlyTbl, 3, 4, false)
}
if err != nil {
return errors.Trace(err)
}
Expand All @@ -213,14 +219,19 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT
if err != nil {
return errors.Trace(err)
}
err = checkIndexExists(ctx, writeOnlyTbl, 5, 5, false)
if ddl.IsEnableFastReorg() {
err = checkIndexExists(ctx, writeOnlyTbl, 5, 5, true)
} else {
err = checkIndexExists(ctx, writeOnlyTbl, 5, 5, false)
}
if err != nil {
return errors.Trace(err)
}
return nil
}

func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table.Table) error {
var err1 error
// WriteOnlyTable: insert t values (6, 6)
err := sessiontxn.NewTxn(context.Background(), ctx)
if err != nil {
Expand All @@ -231,7 +242,11 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
return errors.Trace(err)
}
err = checkIndexExists(ctx, publicTbl, 6, 6, true)
if err != nil {
if ddl.IsEnableFastReorg() {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, 6, 6, true)
}
if err != nil && err1 != nil {
return errors.Trace(err)
}
// PublicTable: insert t values (7, 7)
Expand All @@ -250,10 +265,18 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
return errors.Trace(err)
}
err = checkIndexExists(ctx, publicTbl, 5, 7, true)
if err != nil {
if ddl.IsEnableFastReorg() {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, 5, 7, true)
}
if err != nil && err1 != nil {
return errors.Trace(err)
}
err = checkIndexExists(ctx, publicTbl, 7, 7, false)
if ddl.IsEnableFastReorg() {
err = checkIndexExists(ctx, publicTbl, 7, 7, true)
} else {
err = checkIndexExists(ctx, publicTbl, 7, 7, false)
}
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -284,6 +307,10 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
handle := row[0].GetInt64()
err = checkIndexExists(ctx, publicTbl, idxVal, handle, true)
if err != nil {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, idxVal, handle, true)
}
if err != nil && err1 != nil {
return errors.Trace(err)
}
}
Expand Down
8 changes: 8 additions & 0 deletions ddl/ingest/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"path/filepath"
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -47,6 +48,13 @@ const maxMemoryQuota = 2 * size.GB
// InitGlobalLightningEnv initialize Lightning backfill environment.
func InitGlobalLightningEnv() {
log.SetAppLogger(logutil.BgLogger())
globalCfg := config.GetGlobalConfig()
if globalCfg.Store != "tikv" {
logutil.BgLogger().Warn(LitWarnEnvInitFail,
zap.Error(errors.New("only support TiKV storage")),
zap.String("current storage", globalCfg.Store),
zap.Bool("lightning is initialized", LitInitialized))
}
sPath, err := genLightningDataDir()
if err != nil {
logutil.BgLogger().Warn(LitWarnEnvInitFail, zap.Error(err),
Expand Down
2 changes: 0 additions & 2 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,6 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
originHook := dom.DDL().GetHook()

// Test cancel successfully.
Expand Down
2 changes: 0 additions & 2 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,6 @@ func TestCancelAddIndexPanic(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/errorMockPanic"))
}()
tk.MustExec("use test")
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int, c2 int)")

Expand Down
2 changes: 0 additions & 2 deletions ddl/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func TestDDLStatsInfo(t *testing.T) {
tblInfo, err := testTableInfo(store, "t", 2)
require.NoError(t, err)
testCreateTable(t, ctx, d, dbInfo, tblInfo)
// TODO: will check why tidb_ddl_enable_fast_reorg could not default be on in another pr.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
err = sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)

Expand Down
3 changes: 3 additions & 0 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
if !distinct {
continue
}
if v.Meta().BackfillState == model.BackfillStateRunning {
_, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key)
}
colValStr, err1 := formatDataForDupError(colVals)
if err1 != nil {
return nil, err1
Expand Down
6 changes: 6 additions & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package executor

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"github.com/pingcap/tidb/table/tables"
"runtime/trace"
"time"

Expand Down Expand Up @@ -264,6 +266,10 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}
return err
}
rowVal := val[:len(val)-1]
if bytes.Equal(rowVal, tables.DeleteMarkerUnique) {
continue
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
if err != nil {
return err
Expand Down
59 changes: 54 additions & 5 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package tables

import (
"bytes"
"context"
"sync"

Expand Down Expand Up @@ -127,7 +128,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
keyIsTempIdxKey bool
)
if !opt.FromBackFill {
key, tempKey, keyVer = genTempIdxKeyByState(c.idxInfo, key)
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer == TempIndexKeyTypeBackfill {
key, tempKey = tempKey, nil
keyIsTempIdxKey = true
Expand Down Expand Up @@ -230,7 +231,19 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
idxVal = append(idxVal, keyVer)
}
if lazyCheck {
flags := []kv.FlagsOp{kv.SetPresumeKeyNotExists}
var (
needPresumeKey bool
flags []kv.FlagsOp
)
if !opt.FromBackFill {
needPresumeKey, err = KeyExistInTempIndex(txn, key, h, c.tblInfo.IsCommonHandle)
if err != nil {
return nil, err
}
}
if !needPresumeKey {
flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists}
}
if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() &&
!vars.InRestrictedSQL && vars.ConnectionID > 0 {
flags = append(flags, kv.SetNeedConstraintCheckInPrewrite)
Expand Down Expand Up @@ -285,7 +298,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
return err
}

key, tempKey, tempKeyVer := genTempIdxKeyByState(c.idxInfo, key)
key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key)

if distinct {
if len(key) > 0 {
Expand Down Expand Up @@ -336,9 +349,9 @@ const (
TempIndexKeyTypeMerge byte = 'm'
)

// genTempIdxKeyByState is used to get the key version and the temporary key.
// GenTempIdxKeyByState is used to get the key version and the temporary key.
// The tempKeyVer means the temp index key/value version.
func genTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tempKey kv.Key, tempKeyVer byte) {
func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tempKey kv.Key, tempKeyVer byte) {
if indexInfo.State != model.StatePublic {
switch indexInfo.BackfillState {
case model.BackfillStateInapplicable:
Expand All @@ -364,6 +377,14 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV
return false, nil, err
}

var (
tempKey kv.Key
keyVer byte
)
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer == TempIndexKeyTypeBackfill {
key, tempKey = tempKey, nil
}
value, err := txn.Get(context.TODO(), key)
if kv.IsErrNotFound(err) {
return false, nil, nil
Expand Down Expand Up @@ -463,3 +484,31 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo *
}
return colInfo
}

// KeyExistInTempIndex is used to check if there is unique key is marked delete in temp index.
func KeyExistInTempIndex(txn kv.Transaction, key kv.Key, h kv.Handle, IsCommonHandle bool) (bool, error) {
value, err := txn.Get(context.TODO(), key)
if kv.IsErrNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}

length := len(value)
value = value[:length-1]
if bytes.Equal(value, DeleteMarkerUnique) {
return true, nil
}
// Check if handle equal?
var handle kv.Handle
handle, err = tablecodec.DecodeHandleInUniqueIndexValue(value, IsCommonHandle)
if err != nil {
return false, err
}
if !handle.Equal(h) {
return false, kv.ErrKeyExists
}

return true, nil
}