Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix expression index with insert causes losing index data (#26248) #26479

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1769,3 +1769,96 @@ func (s *serialTestStateChangeSuite) TestModifyColumnTypeArgs(c *C) {
c.Assert(changingCol, IsNil)
c.Assert(changingIdxs, IsNil)
}

func (s *serialTestStateChangeSuite) TestCreateExpressionIndex(c *C) {
originalVal := config.GetGlobalConfig().Experimental.AllowsExpressionIndex
config.GetGlobalConfig().Experimental.AllowsExpressionIndex = true
defer func() {
config.GetGlobalConfig().Experimental.AllowsExpressionIndex = originalVal
}()

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test_db_state")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int default 0, b int default 0)")
defer func() {
tk.MustExec("drop table t")
}()
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)")

tk1 := testkit.NewTestKit(c, s.store)
tk1.MustExec("use test_db_state")

var checkErr error
d := s.dom.DDL()
originalCallback := d.GetHook()
callback := &ddl.TestDDLCallback{}
callback.OnJobUpdatedExported = func(job *model.Job) {
if checkErr != nil {
return
}
err := originalCallback.OnChanged(nil)
c.Assert(err, IsNil)
switch job.SchemaState {
case model.StateDeleteOnly:
_, checkErr = tk1.Exec("insert into t values (5, 5)")
if checkErr != nil {
return
}
_, checkErr = tk1.Exec("insert into t set b = 6")
if checkErr != nil {
return
}
_, checkErr = tk1.Exec("update t set b = 7 where a = 1")
if checkErr != nil {
return
}
_, checkErr = tk1.Exec("delete from t where b = 4")
if checkErr != nil {
return
}
// (1, 7), (2, 2), (3, 3), (5, 5), (0, 6)
case model.StateWriteOnly:
_, checkErr = tk1.Exec("insert into t values (8, 8)")
if checkErr != nil {
return
}
_, checkErr = tk1.Exec("insert into t set b = 9")
if checkErr != nil {
return
}
_, checkErr = tk1.Exec("update t set b = 7 where a = 2")
if checkErr != nil {
return
}
_, checkErr = tk1.Exec("delete from t where b = 3")
if checkErr != nil {
return
}
// (1, 7), (2, 7), (5, 5), (0, 6), (8, 8), (0, 9)
case model.StateWriteReorganization:
_, checkErr = tk1.Exec("insert into t values (10, 10)")
if checkErr != nil {
return
}
_, checkErr = tk1.Exec("insert into t set b = 11")
if checkErr != nil {
return
}
_, checkErr = tk1.Exec("update t set b = 7 where a = 5")
if checkErr != nil {
return
}
_, checkErr = tk1.Exec("delete from t where b = 6")
if checkErr != nil {
return
}
// (1, 7), (2, 7), (5, 7), (8, 8), (0, 9), (10, 10), (10, 10), (0, 11), (0, 11)
}
}

d.(ddl.DDLForTest).SetHook(callback)
tk.MustExec("alter table t add index idx((b+1))")
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "0 11", "0 11", "1 7", "2 7", "5 7", "8 8", "10 10", "10 10"))
}
39 changes: 30 additions & 9 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,22 +331,43 @@ func (s *testSerialDBSuite) TestAddExpressionIndexRollback(c *C) {
ctx.Store = s.store
times := 0
hook.OnJobUpdatedExported = func(job *model.Job) {
if job.SchemaState == model.StateDeleteOnly {
if checkErr != nil {
return
}
_, checkErr = tk1.Exec("delete from t1 where c1 = 40;")
if checkErr != nil {
return
}
if checkErr == nil && job.SchemaState == model.StateWriteReorganization && times == 0 {
currJob = job
times++
switch job.SchemaState {
case model.StateDeleteOnly:
_, checkErr = tk1.Exec("insert into t1 values (6, 3, 3) on duplicate key update c1 = 10")
if checkErr == nil {
_, checkErr = tk1.Exec("update t1 set c1 = 7 where c2=6;")
}
if checkErr == nil {
_, checkErr = tk1.Exec("delete from t1 where c1 = 40;")
}
case model.StateWriteOnly:
_, checkErr = tk1.Exec("insert into t1 values (2, 2, 2)")
if checkErr == nil {
_, checkErr = tk1.Exec("update t1 set c1 = 3 where c2 = 80")
}
case model.StateWriteReorganization:
if checkErr == nil && job.SchemaState == model.StateWriteReorganization && times == 0 {
_, checkErr = tk1.Exec("insert into t1 values (4, 4, 4)")
if checkErr != nil {
return
}
_, checkErr = tk1.Exec("update t1 set c1 = 5 where c2 = 80")
if checkErr != nil {
return
}
currJob = job
times++
}
}
}
d.(ddl.DDLForTest).SetHook(hook)

tk.MustGetErrMsg("alter table t1 add index expr_idx ((pow(c1, c2)));", "[ddl:8202]Cannot decode index value, because [types:1690]DOUBLE value is out of range in 'pow(160, 160)'")
c.Assert(checkErr, IsNil)
tk.MustQuery("select * from t1;").Check(testkit.Rows("20 20 20", "80 80 80", "160 160 160"))
tk.MustQuery("select * from t1 order by c1;").Check(testkit.Rows("2 2 2", "4 4 4", "5 80 80", "10 3 3", "20 20 20", "160 160 160"))

// Check whether the reorg information is cleaned up.
err := ctx.NewTxn(context.Background())
Expand Down
43 changes: 20 additions & 23 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,13 +420,16 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
}
return ver, err
}
for _, hiddenCol := range hiddenCols {
columnInfo := model.FindColumnInfo(tblInfo.Columns, hiddenCol.Name.L)
if columnInfo != nil && columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
job.State = model.JobStateCancelled
// TODO: refine the error message
return ver, infoschema.ErrColumnExists.GenWithStackByArgs(hiddenCol.Name)

if indexInfo == nil {
for _, hiddenCol := range hiddenCols {
columnInfo := model.FindColumnInfo(tblInfo.Columns, hiddenCol.Name.L)
if columnInfo != nil && columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
job.State = model.JobStateCancelled
// TODO: refine the error message
return ver, infoschema.ErrColumnExists.GenWithStackByArgs(hiddenCol.Name)
}
}
}

Expand Down Expand Up @@ -495,7 +498,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
case model.StateNone:
// none -> delete only
indexInfo.State = model.StateDeleteOnly
updateHiddenColumns(tblInfo, indexInfo, model.StateDeleteOnly)
updateHiddenColumns(tblInfo, indexInfo, model.StatePublic)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, err
Expand All @@ -505,7 +508,6 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
case model.StateDeleteOnly:
// delete only -> write only
indexInfo.State = model.StateWriteOnly
updateHiddenColumns(tblInfo, indexInfo, model.StateWriteOnly)
_, err = checkPrimaryKeyNotNull(w, sqlMode, t, job, tblInfo, indexInfo)
if err != nil {
break
Expand All @@ -518,7 +520,6 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
case model.StateWriteOnly:
// write only -> reorganization
indexInfo.State = model.StateWriteReorganization
updateHiddenColumns(tblInfo, indexInfo, model.StateWriteReorganization)
_, err = checkPrimaryKeyNotNull(w, sqlMode, t, job, tblInfo, indexInfo)
if err != nil {
break
Expand All @@ -532,7 +533,6 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
job.SchemaState = model.StateWriteReorganization
case model.StateWriteReorganization:
// reorganization -> public
updateHiddenColumns(tblInfo, indexInfo, model.StatePublic)
tbl, err := getTable(d.store, schemaID, tblInfo)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -611,14 +611,6 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
case model.StatePublic:
// public -> write only
indexInfo.State = model.StateWriteOnly
if len(dependentHiddenCols) > 0 {
firstHiddenOffset := dependentHiddenCols[0].Offset
for i := 0; i < len(dependentHiddenCols); i++ {
tblInfo.Columns[firstHiddenOffset].State = model.StateWriteOnly
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, firstHiddenOffset)
}
}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -627,7 +619,6 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
case model.StateWriteOnly:
// write only -> delete only
indexInfo.State = model.StateDeleteOnly
updateHiddenColumns(tblInfo, indexInfo, model.StateDeleteOnly)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -636,14 +627,21 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
case model.StateDeleteOnly:
// delete only -> reorganization
indexInfo.State = model.StateDeleteReorganization
updateHiddenColumns(tblInfo, indexInfo, model.StateDeleteReorganization)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
if len(dependentHiddenCols) > 0 {
firstHiddenOffset := dependentHiddenCols[0].Offset
for i := 0; i < len(dependentHiddenCols); i++ {
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, firstHiddenOffset)
}
}

newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if idx.Name.L != indexInfo.Name.L {
Expand All @@ -654,14 +652,13 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Set column index flag.
dropIndexColumnFlag(tblInfo, indexInfo)

tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(dependentHiddenCols)]
failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
if val.(bool) {
panic("panic test in cancelling add index")
}
})

tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(dependentHiddenCols)]

ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != model.StateNone)
if err != nil {
return ver, errors.Trace(err)
Expand Down
2 changes: 0 additions & 2 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ func convertAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.T
// So the next state is delete only state.
originalState := indexInfo.State
indexInfo.State = model.StateDeleteOnly
// Change dependent hidden columns if necessary.
updateHiddenColumns(tblInfo, indexInfo, model.StateDeleteOnly)
job.SchemaState = model.StateDeleteOnly
ver, err1 := updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err1 != nil {
Expand Down