Skip to content

Commit

Permalink
ddl: Support the operation of dropping multi-indexes in one statement (
Browse files Browse the repository at this point in the history
  • Loading branch information
ou-bing authored Sep 2, 2021
1 parent 72acdb4 commit b35c7bc
Show file tree
Hide file tree
Showing 11 changed files with 626 additions and 29 deletions.
242 changes: 242 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7300,6 +7300,248 @@ func (s *testDBSuite8) TestDdlMaxLimitOfIdentifier(c *C) {

}

func testDropIndexes(c *C, store kv.Storage, lease time.Duration, createSQL, dropIdxSQL string, idxNames []string) {
tk := testkit.NewTestKit(c, store)
tk.MustExec("use test_db")
tk.MustExec("drop table if exists test_drop_indexes")
tk.MustExec(createSQL)
done := make(chan error, 1)

num := 100
// add some rows
for i := 0; i < num; i++ {
tk.MustExec("insert into test_drop_indexes values (?, ?, ?)", i, i, i)
}
ctx := tk.Se.(sessionctx.Context)
t := testGetTableByName(c, ctx, "test_db", "test_drop_indexes")
var idxs []table.Index
for _, tidx := range t.Indices() {
for _, idxName := range idxNames {
if tidx.Meta().Name.L == idxName {
idxs = append(idxs, tidx)
break
}
}
}
c.Assert(idxs, NotNil)

testddlutil.SessionExecInGoroutine(c, store, dropIdxSQL, done)

ticker := time.NewTicker(lease / 2)
defer ticker.Stop()
LOOP:
for {
select {
case err := <-done:
if err == nil {
break LOOP
}
c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err)))
case <-ticker.C:
step := 5
// delete some rows, and add some data
for i := num; i < num+step; i++ {
n := rand.Intn(num)
tk.MustExec("update test_drop_indexes set c2 = 1 where c1 = ?", n)
tk.MustExec("insert into test_drop_indexes values (?, ?, ?)", i, i, i)
}
num += step
}
}

// Check in index, it must be no index in KV.
// Make sure there is no index with name c2_index、c3_index.
t = testGetTableByName(c, ctx, "test_db", "test_drop_indexes")
var nidxs []table.Index
for _, tidx := range t.Indices() {
for _, ids := range idxs {
if tidx.Meta().Name.L == ids.Meta().Name.L {
nidxs = append(nidxs, tidx)
}
}
}
c.Assert(nidxs, IsNil)

for _, idx := range idxs {
idx := tables.NewIndex(t.Meta().ID, t.Meta(), idx.Meta())
checkDelRangeDone(c, ctx, idx)
}
}

func testCancelDropIndexes(c *C, store kv.Storage, d ddl.DDL) {
indexesName := []string{"idx_c1", "idx_c2"}
addIdxesSQL := "alter table t add index idx_c1 (c1);alter table t add index idx_c2 (c2);"
dropIdxesSQL := "alter table t drop index idx_c1;alter table t drop index idx_c2;"

tk := testkit.NewTestKit(c, store)
tk.MustExec("use test_db")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int, c2 int)")
defer tk.MustExec("drop table t;")
for i := 0; i < 5; i++ {
tk.MustExec("insert into t values (?, ?)", i, i)
}
testCases := []struct {
needAddIndex bool
jobState model.JobState
JobSchemaState model.SchemaState
cancelSucc bool
}{
// model.JobStateNone means the jobs is canceled before the first run.
// if we cancel successfully, we need to set needAddIndex to false in the next test case. Otherwise, set needAddIndex to true.
{true, model.JobStateNone, model.StateNone, true},
{false, model.JobStateRunning, model.StateWriteOnly, false},
{true, model.JobStateRunning, model.StateDeleteOnly, false},
{true, model.JobStateRunning, model.StateDeleteReorganization, false},
}
var checkErr error
hook := &ddl.TestDDLCallback{}
var jobID int64
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if (job.Type == model.ActionDropIndex || job.Type == model.ActionDropPrimaryKey) &&
job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
jobID = job.ID
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = store
err := hookCtx.NewTxn(context.TODO())
if err != nil {
checkErr = errors.Trace(err)
return
}
txn, err := hookCtx.Txn(true)
if err != nil {
checkErr = errors.Trace(err)
return
}

errs, err := admin.CancelJobs(txn, jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
checkErr = txn.Commit(context.Background())
}
}
originalHook := d.GetHook()
d.(ddl.DDLForTest).SetHook(hook)
ctx := tk.Se.(sessionctx.Context)
for i := range testCases {
testCase = &testCases[i]
if testCase.needAddIndex {
tk.MustExec(addIdxesSQL)
}
rs, err := tk.Exec(dropIdxesSQL)
if rs != nil {
rs.Close()
}
t := testGetTableByName(c, ctx, "test_db", "t")

var indexInfos []*model.IndexInfo
for _, idxName := range indexesName {
indexInfo := t.Meta().FindIndexByName(idxName)
if indexInfo != nil {
indexInfos = append(indexInfos, indexInfo)
}
}

if testCase.cancelSucc {
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:8214]Cancelled DDL job")
c.Assert(indexInfos, NotNil)
c.Assert(indexInfos[0].State, Equals, model.StatePublic)
} else {
err1 := admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID)
c.Assert(err, IsNil)
c.Assert(checkErr, NotNil)
c.Assert(checkErr.Error(), Equals, err1.Error())
c.Assert(indexInfos, IsNil)
}
}
d.(ddl.DDLForTest).SetHook(originalHook)
tk.MustExec(addIdxesSQL)
tk.MustExec(dropIdxesSQL)
}

func testDropIndexesIfExists(c *C, store kv.Storage) {
tk := testkit.NewTestKitWithInit(c, store)
tk.MustExec("use test_db;")
tk.MustExec("drop table if exists test_drop_indexes_if_exists;")
tk.MustExec("create table test_drop_indexes_if_exists (id int, c1 int, c2 int, primary key(id), key i1(c1), key i2(c2));")

// Drop different indexes.
tk.MustGetErrMsg(
"alter table test_drop_indexes_if_exists drop index i1, drop index i3;",
"[ddl:1091]index i3 doesn't exist",
)
if _, err := tk.Exec("alter table test_drop_indexes_if_exists drop index i1, drop index if exists i3;"); true {
c.Assert(err, IsNil)
}
tk.MustQuery("show warnings;").Check(
testutil.RowsWithSep("|", "Warning|1091|index i3 doesn't exist"),
)

// Verify the impact of deletion order when dropping duplicate indexes.
tk.MustGetErrMsg(
"alter table test_drop_indexes_if_exists drop index i2, drop index i2;",
"[ddl:1091]index i2 doesn't exist",
)
tk.MustGetErrMsg(
"alter table test_drop_indexes_if_exists drop index if exists i2, drop index i2;",
"[ddl:1091]index i2 doesn't exist",
)
if _, err := tk.Exec("alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;"); true {
c.Assert(err, IsNil)
}
tk.MustQuery("show warnings;").Check(
testutil.RowsWithSep("|", "Warning|1091|index i2 doesn't exist"),
)
}

func testDropIndexesFromPartitionedTable(c *C, store kv.Storage) {
tk := testkit.NewTestKitWithInit(c, store)
tk.MustExec("use test_db;")
tk.MustExec("drop table if exists test_drop_indexes_from_partitioned_table;")
tk.MustExec(`
create table test_drop_indexes_from_partitioned_table (id int, c1 int, c2 int, primary key(id), key i1(c1), key i2(c2))
partition by range(id) (partition p0 values less than (6), partition p1 values less than maxvalue);
`)
for i := 0; i < 20; i++ {
tk.MustExec("insert into test_drop_indexes_from_partitioned_table values (?, ?, ?)", i, i, i)
}
if _, err := tk.Exec("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i2;"); true {
c.Assert(err, IsNil)
}
}

func (s *testDBSuite5) TestDropIndexes(c *C) {
// drop multiple indexes
createSQL := "create table test_drop_indexes (id int, c1 int, c2 int, primary key(id), key i1(c1), key i2(c2));"
dropIdxSQL := "alter table test_drop_indexes drop index i1, drop index i2;"
idxNames := []string{"i1", "i2"}
testDropIndexes(c, s.store, s.lease, createSQL, dropIdxSQL, idxNames)

createSQL = "create table test_drop_indexes (id int, c1 int, c2 int, primary key(id) nonclustered, unique key i1(c1), key i2(c2));"
dropIdxSQL = "alter table test_drop_indexes drop primary key, drop index i1;"
idxNames = []string{"primary", "i1"}
testDropIndexes(c, s.store, s.lease, createSQL, dropIdxSQL, idxNames)

createSQL = "create table test_drop_indexes (uuid varchar(32), c1 int, c2 int, primary key(uuid), unique key i1(c1), key i2(c2));"
dropIdxSQL = "alter table test_drop_indexes drop primary key, drop index i1, drop index i2;"
idxNames = []string{"primary", "i1", "i2"}
testDropIndexes(c, s.store, s.lease, createSQL, dropIdxSQL, idxNames)

testDropIndexesIfExists(c, s.store)
testDropIndexesFromPartitionedTable(c, s.store)
testCancelDropIndexes(c, s.store, s.dom.DDL())
}

// Close issue #24580.
// See https://github.com/pingcap/tidb/issues/24580
func (s *testDBSuite8) TestIssue24580(c *C) {
Expand Down
16 changes: 14 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,13 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
}
}
}

if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}

logutil.BgLogger().Info("[ddl] DDL job is finished", zap.Int64("jobID", jobID))
return nil
}
Expand All @@ -609,8 +616,13 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
logutil.BgLogger().Info("[ddl] DDL job is failed", zap.Int64("jobID", jobID))
return errors.Trace(historyJob.Error)
}
// Only for JobStateCancelled job which is adding columns or drop columns.
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns) {
// Only for JobStateCancelled job which is adding columns or drop columns or drop indexes.
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns || historyJob.Type == model.ActionDropIndexes) {
if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}
logutil.BgLogger().Info("[ddl] DDL job is cancelled", zap.Int64("jobID", jobID))
return nil
}
Expand Down
Loading

0 comments on commit b35c7bc

Please sign in to comment.