diff --git a/ddl/backfilling.go b/ddl/backfilling.go index fac47c05f857e..45ba7a7f6597f 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -873,6 +873,7 @@ func (b *backfillScheduler) adjustWorkerSize() error { return err } runner = newBackfillWorker(jc.ddlJobCtx, i, partWorker) + worker = partWorker default: return errors.New("unknown backfill type") } diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index a33eec98e58ae..39b6a3454f409 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -4311,3 +4311,80 @@ func TestRegexpFunctionsGeneratedColumn(t *testing.T) { tk.MustExec("drop table if exists reg_like") } + +func TestReorgPartitionRangeFailure(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(`create schema reorgfail`) + tk.MustExec("use reorgfail") + + tk.MustExec("CREATE TABLE t (id int, d varchar(255)) partition by range (id) (partition p0 values less than (1000000), partition p1 values less than (2000000), partition p2 values less than (3000000))") + tk.MustContainErrMsg(`ALTER TABLE t REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (1000000))`, "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions") + tk.MustContainErrMsg(`ALTER TABLE t REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (4000000))`, "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions") +} + +func TestReorgPartitionDocs(t *testing.T) { + // To test what is added as partition management in the docs + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(`create schema reorgdocs`) + tk.MustExec("use reorgdocs") + tk.MustExec(`CREATE TABLE members ( + id int, + fname varchar(255), + lname varchar(255), + dob date, + data json +) +PARTITION BY RANGE (YEAR(dob)) ( + PARTITION pBefore1950 VALUES LESS THAN (1950), + PARTITION p1950 VALUES LESS THAN (1960), + PARTITION p1960 VALUES LESS THAN (1970), + PARTITION p1970 VALUES LESS THAN (1980), + PARTITION p1980 VALUES LESS THAN (1990), + PARTITION p1990 VALUES LESS THAN (2000))`) + tk.MustExec(`CREATE TABLE member_level ( + id int, + level int, + achievements json +) +PARTITION BY LIST (level) ( + PARTITION l1 VALUES IN (1), + PARTITION l2 VALUES IN (2), + PARTITION l3 VALUES IN (3), + PARTITION l4 VALUES IN (4), + PARTITION l5 VALUES IN (5));`) + tk.MustExec(`ALTER TABLE members DROP PARTITION p1990`) + tk.MustExec(`ALTER TABLE member_level DROP PARTITION l5`) + tk.MustExec(`ALTER TABLE members TRUNCATE PARTITION p1980`) + tk.MustExec(`ALTER TABLE member_level TRUNCATE PARTITION l4`) + tk.MustExec("ALTER TABLE members ADD PARTITION (PARTITION `p1990to2010` VALUES LESS THAN (2010))") + tk.MustExec(`ALTER TABLE member_level ADD PARTITION (PARTITION l5_6 VALUES IN (5,6))`) + tk.MustContainErrMsg(`ALTER TABLE members ADD PARTITION (PARTITION p1990 VALUES LESS THAN (2000))`, "[ddl:1493]VALUES LESS THAN value must be strictly increasing for each partition") + tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION p1990to2010 INTO +(PARTITION p1990 VALUES LESS THAN (2000), + PARTITION p2000 VALUES LESS THAN (2010), + PARTITION p2010 VALUES LESS THAN (2020), + PARTITION p2020 VALUES LESS THAN (2030), + PARTITION pMax VALUES LESS THAN (MAXVALUE))`) + tk.MustExec(`ALTER TABLE member_level REORGANIZE PARTITION l5_6 INTO +(PARTITION l5 VALUES IN (5), + PARTITION l6 VALUES IN (6))`) + tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION pBefore1950,p1950 INTO (PARTITION 
pBefore1960 VALUES LESS THAN (1960))`) + tk.MustExec(`ALTER TABLE member_level REORGANIZE PARTITION l1,l2 INTO (PARTITION l1_2 VALUES IN (1,2))`) + tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION pBefore1960,p1960,p1970,p1980,p1990,p2000,p2010,p2020,pMax INTO +(PARTITION p1800 VALUES LESS THAN (1900), + PARTITION p1900 VALUES LESS THAN (2000), + PARTITION p2000 VALUES LESS THAN (2100))`) + tk.MustExec(`ALTER TABLE member_level REORGANIZE PARTITION l1_2,l3,l4,l5,l6 INTO +(PARTITION lOdd VALUES IN (1,3,5), + PARTITION lEven VALUES IN (2,4,6))`) + tk.MustContainErrMsg(`ALTER TABLE members REORGANIZE PARTITION p1800,p2000 INTO (PARTITION p2000 VALUES LESS THAN (2100))`, "[ddl:8200]Unsupported REORGANIZE PARTITION of RANGE; not adjacent partitions") + tk.MustExec(`INSERT INTO members VALUES (313, "John", "Doe", "2022-11-22", NULL)`) + tk.MustExec(`ALTER TABLE members REORGANIZE PARTITION p2000 INTO (PARTITION p2000 VALUES LESS THAN (2050))`) + // TODO: uncomment this: + //tk.MustContainErrMsg(`ALTER TABLE members REORGANIZE PARTITION p2000 INTO (PARTITION p2000 VALUES LESS THAN (2020))`, "[table:1526]Table has no partition for value 2022") + tk.MustExec(`INSERT INTO member_level (id, level) values (313, 6)`) + // TODO: uncomment this: + //tk.MustContainErrMsg(`ALTER TABLE member_level REORGANIZE PARTITION lEven INTO (PARTITION lEven VALUES IN (2,4))`, "[table:1526]Table has no partition for value 6") +} diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index e2831e33abaa6..ad46b2cb96a4a 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -53,6 +53,56 @@ import ( "go.uber.org/zap" ) +type allTableData struct { + keys [][]byte + vals [][]byte + tp []string +} + +func getAllDataForPhysicalTable(t *testing.T, ctx sessionctx.Context, physTable table.PhysicalTable) allTableData { + require.NoError(t, sessiontxn.NewTxn(context.Background(), ctx)) + txn, err := ctx.Txn(true) + require.NoError(t, err) + defer func() { + err := txn.Rollback() + require.NoError(t, err) + }() + + all := allTableData{ + keys: make([][]byte, 0), + vals: make([][]byte, 0), + tp: make([]string, 0), + } + pid := physTable.GetPhysicalID() + prefix := tablecodec.EncodeTablePrefix(pid) + it, err := txn.Iter(prefix, nil) + require.NoError(t, err) + for it.Valid() { + if !it.Key().HasPrefix(prefix) { + break + } + all.keys = append(all.keys, it.Key()) + all.vals = append(all.vals, it.Value()) + if tablecodec.IsRecordKey(it.Key()) { + all.tp = append(all.tp, "Record") + tblID, kv, _ := tablecodec.DecodeRecordKey(it.Key()) + require.Equal(t, pid, tblID) + vals, _ := tablecodec.DecodeValuesBytesToStrings(it.Value()) + logutil.BgLogger().Info("Record", + zap.Int64("pid", tblID), + zap.Stringer("key", kv), + zap.Strings("values", vals)) + } else if tablecodec.IsIndexKey(it.Key()) { + all.tp = append(all.tp, "Index") + } else { + all.tp = append(all.tp, "Other") + } + err = it.Next() + require.NoError(t, err) + } + return all +} + func checkGlobalIndexCleanUpDone(t *testing.T, ctx sessionctx.Context, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, pid int64) int { require.NoError(t, sessiontxn.NewTxn(context.Background(), ctx)) txn, err := ctx.Txn(true) @@ -4525,6 +4575,218 @@ func TestPartitionTableWithAnsiQuotes(t *testing.T) { ` PARTITION "pMax" VALUES LESS THAN (MAXVALUE,MAXVALUE))`)) } +func TestReorganizeRangePartition(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("create database ReorgPartition") + tk.MustExec("use 
ReorgPartition") + tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b)) partition by range (a) ` + + `(partition p0 values less than (10),` + + ` partition p1 values less than (20),` + + ` partition pMax values less than (MAXVALUE))`) + tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`) + tk.MustQuery(`select * from t where c < 40`).Sort().Check(testkit.Rows(""+ + "1 1 1", + "12 12 21", + "23 23 32")) + tk.MustExec(`alter table t reorganize partition pMax into (partition p2 values less than (30), partition pMax values less than (MAXVALUE))`) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `p2` VALUES LESS THAN (30),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "1 1 1", + "12 12 21", + "23 23 32", + "34 34 43", + "45 45 54", + "56 56 65")) + tk.MustQuery(`select * from t partition (p0)`).Sort().Check(testkit.Rows("" + + "1 1 1")) + tk.MustQuery(`select * from t partition (p1)`).Sort().Check(testkit.Rows("" + + "12 12 21")) + tk.MustQuery(`select * from t partition (p2)`).Sort().Check(testkit.Rows("" + + "23 23 32")) + tk.MustQuery(`select * from t partition (pMax)`).Sort().Check(testkit.Rows(""+ + "34 34 43", + "45 45 54", + "56 56 65")) + tk.MustQuery(`select * from t where b > "1"`).Sort().Check(testkit.Rows(""+ + "12 12 21", + "23 23 32", + "34 34 43", + "45 45 54", + "56 56 65")) + tk.MustQuery(`select * from t where c < 40`).Sort().Check(testkit.Rows(""+ + "1 1 1", + "12 12 21", + "23 23 32")) + tk.MustExec(`alter table t reorganize partition p2,pMax into (partition p2 values less than (35),partition p3 values less than (47), partition pMax values less than (MAXVALUE))`) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "1 1 1", + "12 12 21", + "23 23 32", + "34 34 43", + "45 45 54", + "56 56 65")) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `p2` VALUES LESS THAN (35),\n" + + " PARTITION `p3` VALUES LESS THAN (47),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + tk.MustQuery(`select * from t partition (p0)`).Sort().Check(testkit.Rows("" + + "1 1 1")) + tk.MustQuery(`select * from t partition (p1)`).Sort().Check(testkit.Rows("" + + "12 12 21")) + tk.MustQuery(`select * from t partition (p2)`).Sort().Check(testkit.Rows(""+ + "23 23 32", + "34 34 43")) + tk.MustQuery(`select * from t partition (p3)`).Sort().Check(testkit.Rows("" + + "45 45 54")) + tk.MustQuery(`select * from t partition (pMax)`).Sort().Check(testkit.Rows("" + + 
"56 56 65")) + tk.MustExec(`alter table t reorganize partition p0,p1 into (partition p1 values less than (20))`) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `p2` VALUES LESS THAN (35),\n" + + " PARTITION `p3` VALUES LESS THAN (47),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "1 1 1", + "12 12 21", + "23 23 32", + "34 34 43", + "45 45 54", + "56 56 65")) + tk.MustExec(`alter table t drop index b`) + tk.MustExec(`alter table t drop index c`) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `p2` VALUES LESS THAN (35),\n" + + " PARTITION `p3` VALUES LESS THAN (47),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + tk.MustExec(`create table t2 (a int unsigned not null, b varchar(255), c int, key (b), key (c,b)) partition by range (a) ` + + "(PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `p2` VALUES LESS THAN (35),\n" + + " PARTITION `p3` VALUES LESS THAN (47),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))") + tk.MustExec(`insert into t2 select * from t`) + // Not allowed to change the start range! + tk.MustGetErrCode(`alter table t2 reorganize partition p2 into (partition p2a values less than (20), partition p2b values less than (36))`, + mysql.ErrRangeNotIncreasing) + // Not allowed to change the end range! + tk.MustGetErrCode(`alter table t2 reorganize partition p2 into (partition p2a values less than (30), partition p2b values less than (36))`, mysql.ErrRangeNotIncreasing) + tk.MustGetErrCode(`alter table t2 reorganize partition p2 into (partition p2a values less than (30), partition p2b values less than (34))`, mysql.ErrRangeNotIncreasing) + // Also not allowed to change from MAXVALUE to something else IF there are values in the removed range! + // TODO: uncomment this + //tk.MustContainErrMsg(`alter table t2 reorganize partition pMax into (partition p2b values less than (50))`, "[table:1526]Table has no partition for value 56") + tk.MustQuery(`show create table t2`).Check(testkit.Rows("" + + "t2 CREATE TABLE `t2` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `p2` VALUES LESS THAN (35),\n" + + " PARTITION `p3` VALUES LESS THAN (47),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + // But allowed to change from MAXVALUE if no existing values is outside the new range! 
+ tk.MustExec(`alter table t2 reorganize partition pMax into (partition p4 values less than (90))`) + tk.MustQuery(`show create table t2`).Check(testkit.Rows("" + + "t2 CREATE TABLE `t2` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `p2` VALUES LESS THAN (35),\n" + + " PARTITION `p3` VALUES LESS THAN (47),\n" + + " PARTITION `p4` VALUES LESS THAN (90))")) +} + +func TestReorganizeListPartition(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("create database ReorgListPartition") + tk.MustExec("use ReorgListPartition") + tk.MustExec(`create table t (a int, b varchar(55), c int) partition by list (a)` + + ` (partition p1 values in (12,23,51,14), partition p2 values in (24,63), partition p3 values in (45))`) + tk.MustExec(`insert into t values (12,"12",21), (24,"24",42),(51,"51",15),(23,"23",32),(63,"63",36),(45,"45",54)`) + tk.MustExec(`alter table t reorganize partition p1 into (partition p0 values in (12,51,13), partition p1 values in (23))`) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) DEFAULT NULL,\n" + + " `b` varchar(55) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY LIST (`a`)\n" + + "(PARTITION `p0` VALUES IN (12,51,13),\n" + + " PARTITION `p1` VALUES IN (23),\n" + + " PARTITION `p2` VALUES IN (24,63),\n" + + " PARTITION `p3` VALUES IN (45))")) + tk.MustExec(`alter table t add primary key (a), add key (b), add key (c,b)`) + + // Note: MySQL cannot reorganize two non-consecutive list partitions :) + // ERROR 1519 (HY000): When reorganizing a set of partitions they must be in consecutive order + tk.MustExec(`alter table t reorganize partition p1, p3 into (partition pa values in (45,23,15))`) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(55) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] NONCLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY LIST (`a`)\n" + + "(PARTITION `p0` VALUES IN (12,51,13),\n" + + " PARTITION `pa` VALUES IN (45,23,15),\n" + + " PARTITION `p2` VALUES IN (24,63))")) + tk.MustGetErrCode(`alter table t modify a varchar(20)`, errno.ErrUnsupportedDDLOperation) +} + func TestAlterModifyPartitionColTruncateWarning(t *testing.T) { t.Skip("waiting for supporting Modify Partition Column again") store := testkit.CreateMockStore(t) @@ -4598,19 +4860,341 @@ func TestDropPartitionKeyColumn(t *testing.T) { tk.MustExec("alter table t4 drop column b") } +type TestReorgDDLCallback struct { + *ddl.TestDDLCallback + syncChan chan bool +} + +func (tc *TestReorgDDLCallback) OnChanged(err error) error { + err = tc.TestDDLCallback.OnChanged(err) + <-tc.syncChan + // We want to wait here + <-tc.syncChan + return err +} + func TestReorgPartitionConcurrent(t *testing.T) { - t.Skip("Needs PR 38460 as well") store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) schemaName := "ReorgPartConcurrent" tk.MustExec("create database " + schemaName) 
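+ // The hook below pauses the background REORGANIZE PARTITION in each schema state
+ // (DeleteOnly, WriteOnly, WriteReorganization, DeleteReorganization) so that reads and
+ // writes can be verified against both the old and the new partition sets.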
tk.MustExec("use " + schemaName) + tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` + + ` partition by range (a) ` + + `(partition p0 values less than (10),` + + ` partition p1 values less than (20),` + + ` partition pMax values less than (MAXVALUE))`) + tk.MustExec(`insert into t values (1,"1",1), (10,"10",10),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`) + dom := domain.GetDomain(tk.Session()) + originHook := dom.DDL().GetHook() + defer dom.DDL().SetHook(originHook) + syncOnChanged := make(chan bool) + defer close(syncOnChanged) + hook := &TestReorgDDLCallback{TestDDLCallback: &ddl.TestDDLCallback{Do: dom}, syncChan: syncOnChanged} + dom.DDL().SetHook(hook) + + wait := make(chan bool) + defer close(wait) + + currState := model.StateNone + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionReorganizePartition && + (job.SchemaState == model.StateDeleteOnly || + job.SchemaState == model.StateWriteOnly || + job.SchemaState == model.StateWriteReorganization || + job.SchemaState == model.StateDeleteReorganization) && + currState != job.SchemaState { + currState = job.SchemaState + <-wait + <-wait + } + } + alterErr := make(chan error, 1) + go backgroundExec(store, schemaName, "alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))", alterErr) + + wait <- true + // StateDeleteOnly + deleteOnlyInfoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() + wait <- true + + // StateWriteOnly + wait <- true + tk.MustExec(`insert into t values (11, "11", 11),(12,"12",21)`) + writeOnlyInfoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() + require.Equal(t, int64(1), writeOnlyInfoSchema.SchemaMetaVersion()-deleteOnlyInfoSchema.SchemaMetaVersion()) + deleteOnlyTbl, err := deleteOnlyInfoSchema.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t")) + require.NoError(t, err) + writeOnlyTbl, err := writeOnlyInfoSchema.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t")) + require.NoError(t, err) + writeOnlyParts := writeOnlyTbl.Meta().Partition + writeOnlyTbl.Meta().Partition = deleteOnlyTbl.Meta().Partition + // If not DeleteOnly is working, then this would show up when reorg is done + tk.MustExec(`delete from t where a = 11`) + tk.MustExec(`update t set b = "12b", c = 12 where a = 12`) + writeOnlyTbl.Meta().Partition = writeOnlyParts + wait <- true + + // StateWriteReorganization + wait <- true + tk.MustExec(`insert into t values (14, "14", 14),(15, "15",15)`) + writeReorgInfoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + wait <- true + + // StateDeleteReorganization + wait <- true + tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+ + "10 10 10", + "12 12b 12", + "14 14 14", + "15 15 15")) + deleteReorgInfoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() + require.Equal(t, 
int64(1), deleteReorgInfoSchema.SchemaMetaVersion()-writeReorgInfoSchema.SchemaMetaVersion()) + tk.MustExec(`insert into t values (16, "16", 16)`) + oldTbl, err := writeReorgInfoSchema.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t")) + require.NoError(t, err) + partDef := oldTbl.Meta().Partition.Definitions[1] + require.Equal(t, "p1", partDef.Name.O) + rows := getNumRowsFromPartitionDefs(t, tk, oldTbl, oldTbl.Meta().Partition.Definitions[1:2]) + require.Equal(t, 5, rows) + currTbl, err := deleteReorgInfoSchema.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t")) + require.NoError(t, err) + currPart := currTbl.Meta().Partition + currTbl.Meta().Partition = oldTbl.Meta().Partition + tk.MustQuery(`select * from t where b = "16"`).Sort().Check(testkit.Rows("16 16 16")) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + tk.MustQuery(`select * from t partition (p1)`).Sort().Check(testkit.Rows(""+ + "10 10 10", + "12 12b 12", + "14 14 14", + "15 15 15", + "16 16 16")) + currTbl.Meta().Partition = currPart + wait <- true + syncOnChanged <- true + // This reads the new schema (Schema update completed) + tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+ + "10 10 10", + "12 12b 12", + "14 14 14", + "15 15 15", + "16 16 16")) + newInfoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() + require.Equal(t, int64(1), newInfoSchema.SchemaMetaVersion()-deleteReorgInfoSchema.SchemaMetaVersion()) + oldTbl, err = deleteReorgInfoSchema.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t")) + require.NoError(t, err) + partDef = oldTbl.Meta().Partition.Definitions[1] + require.Equal(t, "p1a", partDef.Name.O) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1a` VALUES LESS THAN (15),\n" + + " PARTITION `p1b` VALUES LESS THAN (20),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + newTbl, err := deleteReorgInfoSchema.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t")) + require.NoError(t, err) + newPart := newTbl.Meta().Partition + newTbl.Meta().Partition = oldTbl.Meta().Partition + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1a` VALUES LESS THAN 
(15),\n" + + " PARTITION `p1b` VALUES LESS THAN (20),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + newTbl.Meta().Partition = newPart + syncOnChanged <- true + require.NoError(t, <-alterErr) +} + +func TestReorgPartitionFailConcurrent(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + schemaName := "ReorgPartFailConcurrent" + tk.MustExec("create database " + schemaName) + tk.MustExec("use " + schemaName) + tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` + + ` partition by range (a) ` + + `(partition p0 values less than (10),` + + ` partition p1 values less than (20),` + + ` partition pMax values less than (MAXVALUE))`) + tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`) + dom := domain.GetDomain(tk.Session()) + originHook := dom.DDL().GetHook() + defer dom.DDL().SetHook(originHook) + hook := &ddl.TestDDLCallback{Do: dom} + dom.DDL().SetHook(hook) + + wait := make(chan bool) + defer close(wait) + + // Test insert of duplicate key during copy phase + injected := false + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateWriteReorganization && !injected { + injected = true + <-wait + <-wait + } + } + alterErr := make(chan error, 1) + go backgroundExec(store, schemaName, "alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))", alterErr) + wait <- true + tk.MustExec(`insert into t values (14, "14", 14),(15, "15",15)`) + tk.MustGetErrCode(`insert into t values (11, "11", 11),(12,"duplicate PK 💥", 13)`, mysql.ErrDupEntry) + wait <- true + require.NoError(t, <-alterErr) + tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+ + "12 12 21", + "14 14 14", + "15 15 15")) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`),\n" + + " KEY `c` (`c`,`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1a` VALUES LESS THAN (15),\n" + + " PARTITION `p1b` VALUES LESS THAN (20),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + + // Test reorg of duplicate key + prevState := model.StateNone + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionReorganizePartition && + job.SchemaState == model.StateWriteReorganization && + job.SnapshotVer == 0 && + prevState != job.SchemaState { + prevState = job.SchemaState + <-wait + <-wait + } + if job.Type == model.ActionReorganizePartition && + job.SchemaState == model.StateDeleteReorganization && + prevState != job.SchemaState { + prevState = job.SchemaState + <-wait + <-wait + } + } + go backgroundExec(store, schemaName, "alter table t reorganize partition p1a,p1b into (partition p1a values less than (14), partition p1b values less than (17), partition p1c values less than (20))", alterErr) + wait <- true + infoSchema := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() + tbl, err := infoSchema.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, 0, 
getNumRowsFromPartitionDefs(t, tk, tbl, tbl.Meta().Partition.AddingDefinitions)) + tk.MustExec(`delete from t where a = 14`) + tk.MustExec(`insert into t values (13, "13", 31),(14,"14b",14),(16, "16",16)`) + wait <- true + wait <- true + tbl, err = infoSchema.TableByName(model.NewCIStr(schemaName), model.NewCIStr("t")) + require.NoError(t, err) + require.Equal(t, 5, getNumRowsFromPartitionDefs(t, tk, tbl, tbl.Meta().Partition.AddingDefinitions)) + tk.MustExec(`delete from t where a = 15`) + tk.MustExec(`insert into t values (11, "11", 11),(15,"15b",15),(17, "17",17)`) + wait <- true + require.NoError(t, <-alterErr) + + tk.MustQuery(`select * from t where a between 10 and 22`).Sort().Check(testkit.Rows(""+ + "11 11 11", + "12 12 21", + "13 13 31", + "14 14b 14", + "15 15b 15", + "16 16 16", + "17 17 17")) + tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+ + "11 11 11", + "12 12 21", + "14 14b 14", + "15 15b 15", + "16 16 16", + "17 17 17")) + tk.MustQuery(`select * from t where b between "10" and "22"`).Sort().Check(testkit.Rows(""+ + "11 11 11", + "12 12 21", + "13 13 31", + "14 14b 14", + "15 15b 15", + "16 16 16", + "17 17 17")) +} + +func getNumRowsFromPartitionDefs(t *testing.T, tk *testkit.TestKit, tbl table.Table, defs []model.PartitionDefinition) int { + ctx := tk.Session() + pt := tbl.GetPartitionedTable() + require.NotNil(t, pt) + cnt := 0 + for _, def := range defs { + data := getAllDataForPhysicalTable(t, ctx, pt.GetPartition(def.ID)) + require.True(t, len(data.keys) == len(data.vals)) + require.True(t, len(data.keys) == len(data.tp)) + for _, s := range data.tp { + if s == "Record" { + cnt++ + } + } + } + return cnt +} + +func TestReorgPartitionFailInject(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + schemaName := "ReorgPartFailInjectConcurrent" + tk.MustExec("create database " + schemaName) + tk.MustExec("use " + schemaName) tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` + ` partition by range (a) ` + `(partition p0 values less than (10),` + ` partition p1 values less than (20),` + ` partition pMax values less than (MAXVALUE))`) tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`) + dom := domain.GetDomain(tk.Session()) originHook := dom.DDL().GetHook() defer dom.DDL().SetHook(originHook) @@ -4632,6 +5216,7 @@ func TestReorgPartitionConcurrent(t *testing.T) { go backgroundExec(store, schemaName, "alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))", alterErr) wait <- true tk.MustExec(`insert into t values (14, "14", 14),(15, "15",15)`) + tk.MustGetErrCode(`insert into t values (11, "11", 11),(12,"duplicate PK 💥", 13)`, mysql.ErrDupEntry) wait <- true require.NoError(t, <-alterErr) tk.MustQuery(`select * from t where c between 10 and 22`).Sort().Check(testkit.Rows(""+ @@ -4653,3 +5238,32 @@ func TestReorgPartitionConcurrent(t *testing.T) { " PARTITION `p1b` VALUES LESS THAN (20),\n" + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) } + +func TestReorgPartitionData(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + schemaName := "ReorgPartData" + tk.MustExec("create database " + schemaName) + tk.MustExec("use " + schemaName) + tk.MustExec(`SET @@session.sql_mode = default`) + tk.MustExec(`create table t (a int PRIMARY KEY AUTO_INCREMENT, b varchar(255), c int, d datetime, 
key (b), key (c,b)) partition by range (a) (partition p1 values less than (0), partition p1M values less than (1000000))`) + tk.MustContainErrMsg(`insert into t values (0, "Zero value!", 0, '2022-02-30')`, "[table:1292]Incorrect datetime value: '2022-02-30' for column 'd' at row 1") + tk.MustExec(`SET @@session.sql_mode = 'ALLOW_INVALID_DATES,NO_AUTO_VALUE_ON_ZERO'`) + tk.MustExec(`insert into t values (0, "Zero value!", 0, '2022-02-30')`) + tk.MustQuery(`show warnings`).Check(testkit.Rows()) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows("0 Zero value! 0 2022-02-30 00:00:00")) + tk.MustExec(`SET @@session.sql_mode = default`) + tk.MustExec(`alter table t reorganize partition p1M into (partition p0 values less than (1), partition p2M values less than (2000000))`) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows("0 Zero value! 0 2022-02-30 00:00:00")) +} + +// TODO Test with/without PK, indexes, UK, virtual, virtual stored columns + +// How to test rollback? +// Create new table +// insert some data +// start reorganize partition +// pause and get the AddingPartition IDs for later use +// continue reorganize partition and fail or crash in points of interests +// check if there are any data to be read from the AddingPartition IDs +// check if the table structure is correct. diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 2e58510c1b4fd..0b8babf12a984 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -3321,7 +3321,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast case ast.AlterTableCoalescePartitions: err = d.CoalescePartitions(sctx, ident, spec) case ast.AlterTableReorganizePartition: - err = errors.Trace(dbterror.ErrUnsupportedReorganizePartition) + err = d.ReorganizePartitions(sctx, ident, spec) case ast.AlterTableReorganizeFirstPartition: err = dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("MERGE FIRST PARTITION") case ast.AlterTableReorganizeLastPartition: @@ -3897,6 +3897,181 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec * return errors.Trace(err) } +// getReorganizedDefinitions return the definitions as they would look like after the REORGANIZE PARTITION is done. +func getReorganizedDefinitions(pi *model.PartitionInfo, firstPartIdx, lastPartIdx int, idMap map[int]struct{}) []model.PartitionDefinition { + tmpDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions)+len(pi.AddingDefinitions)-len(idMap)) + if pi.Type == model.PartitionTypeList { + replaced := false + for i := range pi.Definitions { + if _, ok := idMap[i]; ok { + if !replaced { + tmpDefs = append(tmpDefs, pi.AddingDefinitions...) + replaced = true + } + continue + } + tmpDefs = append(tmpDefs, pi.Definitions[i]) + } + if !replaced { + // For safety, for future non-partitioned table -> partitioned + tmpDefs = append(tmpDefs, pi.AddingDefinitions...) + } + return tmpDefs + } + // Range + tmpDefs = append(tmpDefs, pi.Definitions[:firstPartIdx]...) + tmpDefs = append(tmpDefs, pi.AddingDefinitions...) + if len(pi.Definitions) > (lastPartIdx + 1) { + tmpDefs = append(tmpDefs, pi.Definitions[lastPartIdx+1:]...) 
+ } + return tmpDefs +} + +func getReplacedPartitionIDs(names []model.CIStr, pi *model.PartitionInfo) (int, int, map[int]struct{}, error) { + idMap := make(map[int]struct{}) + var firstPartIdx, lastPartIdx = -1, -1 + for _, name := range names { + partIdx := pi.FindPartitionDefinitionByName(name.L) + if partIdx == -1 { + return 0, 0, nil, errors.Trace(dbterror.ErrWrongPartitionName) + } + if _, ok := idMap[partIdx]; ok { + return 0, 0, nil, errors.Trace(dbterror.ErrSameNamePartition) + } + idMap[partIdx] = struct{}{} + if firstPartIdx == -1 { + firstPartIdx = partIdx + } else { + firstPartIdx = mathutil.Min[int](firstPartIdx, partIdx) + } + if lastPartIdx == -1 { + lastPartIdx = partIdx + } else { + lastPartIdx = mathutil.Max[int](lastPartIdx, partIdx) + } + } + if pi.Type == model.PartitionTypeRange { + if len(idMap) != (lastPartIdx - firstPartIdx + 1) { + return 0, 0, nil, errors.Trace(dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs( + "REORGANIZE PARTITION of RANGE; not adjacent partitions")) + } + } + + return firstPartIdx, lastPartIdx, idMap, nil +} + +// ReorganizePartitions reorganize one set of partitions to a new set of partitions. +func (d *ddl) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { + schema, t, err := d.getSchemaAndTableByIdent(ctx, ident) + if err != nil { + return errors.Trace(infoschema.ErrTableNotExists.FastGenByArgs(ident.Schema, ident.Name)) + } + + meta := t.Meta() + pi := meta.GetPartitionInfo() + if pi == nil { + return dbterror.ErrPartitionMgmtOnNonpartitioned + } + switch pi.Type { + case model.PartitionTypeRange, model.PartitionTypeList: + default: + return errors.Trace(dbterror.ErrUnsupportedReorganizePartition) + } + firstPartIdx, lastPartIdx, idMap, err := getReplacedPartitionIDs(spec.PartitionNames, pi) + if err != nil { + return errors.Trace(err) + } + partInfo, err := BuildAddedPartitionInfo(ctx, meta, spec) + if err != nil { + return errors.Trace(err) + } + if err = d.assignPartitionIDs(partInfo.Definitions); err != nil { + return errors.Trace(err) + } + if err = checkReorgPartitionDefs(ctx, meta, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil { + return errors.Trace(err) + } + if err = handlePartitionPlacement(ctx, partInfo); err != nil { + return errors.Trace(err) + } + + tzName, tzOffset := ddlutil.GetTimeZone(ctx) + job := &model.Job{ + SchemaID: schema.ID, + TableID: meta.ID, + SchemaName: schema.Name.L, + TableName: t.Meta().Name.L, + Type: model.ActionReorganizePartition, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{spec.PartitionNames, partInfo}, + ReorgMeta: &model.DDLReorgMeta{ + SQLMode: ctx.GetSessionVars().SQLMode, + Warnings: make(map[errors.ErrorID]*terror.Error), + WarningsCount: make(map[errors.ErrorID]int64), + Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset}, + }, + } + + // No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead. + err = d.DoDDLJob(ctx, job) + err = d.callHookOnChanged(job, err) + return errors.Trace(err) +} + +func checkReorgPartitionDefs(ctx sessionctx.Context, tblInfo *model.TableInfo, partInfo *model.PartitionInfo, firstPartIdx, lastPartIdx int, idMap map[int]struct{}) error { + // partInfo contains only the new added partition, we have to combine it with the + // old partitions to check all partitions is strictly increasing. 
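+ // The combined definitions are checked on a clone of the table meta, so the live TableInfo is not modified here.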
+ pi := tblInfo.Partition + clonedMeta := tblInfo.Clone() + clonedMeta.Partition.AddingDefinitions = partInfo.Definitions + clonedMeta.Partition.Definitions = getReorganizedDefinitions(clonedMeta.Partition, firstPartIdx, lastPartIdx, idMap) + if err := checkPartitionDefinitionConstraints(ctx, clonedMeta); err != nil { + return errors.Trace(err) + } + if pi.Type == model.PartitionTypeRange { + if lastPartIdx == len(pi.Definitions)-1 { + // Last partition dropped, OK to change the end range + // Also includes MAXVALUE + return nil + } + // Check if the replaced end range is the same as before + lastAddingPartition := partInfo.Definitions[len(partInfo.Definitions)-1] + lastOldPartition := pi.Definitions[lastPartIdx] + if len(pi.Columns) > 0 { + newGtOld, err := checkTwoRangeColumns(ctx, &lastAddingPartition, &lastOldPartition, pi, tblInfo) + if err != nil { + return errors.Trace(err) + } + if newGtOld { + return errors.Trace(dbterror.ErrRangeNotIncreasing) + } + oldGtNew, err := checkTwoRangeColumns(ctx, &lastOldPartition, &lastAddingPartition, pi, tblInfo) + if err != nil { + return errors.Trace(err) + } + if oldGtNew { + return errors.Trace(dbterror.ErrRangeNotIncreasing) + } + return nil + } + + isUnsigned := isPartExprUnsigned(tblInfo) + currentRangeValue, _, err := getRangeValue(ctx, pi.Definitions[lastPartIdx].LessThan[0], isUnsigned) + if err != nil { + return errors.Trace(err) + } + newRangeValue, _, err := getRangeValue(ctx, partInfo.Definitions[len(partInfo.Definitions)-1].LessThan[0], isUnsigned) + if err != nil { + return errors.Trace(err) + } + + if currentRangeValue != newRangeValue { + return errors.Trace(dbterror.ErrRangeNotIncreasing) + } + } + return nil +} + // CoalescePartitions coalesce partitions can be used with a table that is partitioned by hash or key to reduce the number of partitions by number. func (d *ddl) CoalescePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error { is := d.infoCache.GetLatest() diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 5c700f6273a3b..c8659fcb263b0 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -668,9 +668,10 @@ func (w *worker) unlockSeqNum(err error) { // DDLBackfillers contains the DDL need backfill step. 
var DDLBackfillers = map[model.ActionType]string{ - model.ActionAddIndex: "add_index", - model.ActionModifyColumn: "modify_column", - model.ActionDropIndex: "drop_index", + model.ActionAddIndex: "add_index", + model.ActionModifyColumn: "modify_column", + model.ActionDropIndex: "drop_index", + model.ActionReorganizePartition: "reorganize_partition", } func getDDLRequestSource(job *model.Job) string { @@ -1083,6 +1084,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = w.onFlashbackCluster(d, t, job) case model.ActionMultiSchemaChange: ver, err = onMultiSchemaChange(w, d, t, job) + case model.ActionReorganizePartition: + ver, err = w.onReorganizePartition(d, t, job) case model.ActionAlterTTLInfo: ver, err = onTTLInfoChange(d, t, job) case model.ActionAlterTTLRemove: diff --git a/ddl/partition.go b/ddl/partition.go index d2fd5f22ffbbc..b33bec550ba2a 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -166,7 +166,7 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v for _, p := range tblInfo.Partition.AddingDefinitions { ids = append(ids, p.ID) } - if err := alterTableLabelRule(job.SchemaName, tblInfo, ids); err != nil { + if _, err := alterTableLabelRule(job.SchemaName, tblInfo, ids); err != nil { job.State = model.JobStateCancelled return ver, err } @@ -231,14 +231,16 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v return ver, errors.Trace(err) } -func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64) error { +// alterTableLabelRule updates Label Rules if they exists +// returns true if changed. +func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64) (bool, error) { tableRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, schemaName, meta.Name.L) oldRule, err := infosync.GetLabelRules(context.TODO(), []string{tableRuleID}) if err != nil { - return errors.Trace(err) + return false, errors.Trace(err) } if len(oldRule) == 0 { - return nil + return false, nil } r, ok := oldRule[tableRuleID] @@ -246,10 +248,10 @@ func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64) rule := r.Reset(schemaName, meta.Name.L, "", ids...) 
err = infosync.PutLabelRule(context.TODO(), rule) if err != nil { - return errors.Wrapf(err, "failed to notify PD label rule") + return false, errors.Wrapf(err, "failed to notify PD label rule") } } - return nil + return true, nil } func alterTablePartitionBundles(t *meta.Meta, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) { @@ -1312,6 +1314,28 @@ func checkAddPartitionNameUnique(tbInfo *model.TableInfo, pi *model.PartitionInf return nil } +func checkReorgPartitionNames(p *model.PartitionInfo, droppedNames []model.CIStr, pi *model.PartitionInfo) error { + partNames := make(map[string]struct{}) + oldDefs := p.Definitions + for _, oldDef := range oldDefs { + partNames[oldDef.Name.L] = struct{}{} + } + for _, delName := range droppedNames { + if _, ok := partNames[delName.L]; !ok { + return dbterror.ErrSameNamePartition.GenWithStackByArgs(delName) + } + delete(partNames, delName.L) + } + newDefs := pi.Definitions + for _, newDef := range newDefs { + if _, ok := partNames[newDef.Name.L]; ok { + return dbterror.ErrSameNamePartition.GenWithStackByArgs(newDef.Name) + } + partNames[newDef.Name.L] = struct{}{} + } + return nil +} + func checkAndOverridePartitionID(newTableInfo, oldTableInfo *model.TableInfo) error { // If any old partitionInfo has lost, that means the partition ID lost too, so did the data, repair failed. if newTableInfo.Partition == nil { @@ -1695,7 +1719,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( return ver, errors.Wrapf(err, "failed to notify PD the label rules") } - if err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil { + if _, err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil { job.State = model.JobStateCancelled return ver, err } @@ -1728,7 +1752,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( return ver, errors.Wrapf(err, "failed to notify PD the label rules") } - if err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil { + if _, err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil { job.State = model.JobStateCancelled return ver, err } @@ -2164,6 +2188,315 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, nil } +func checkReorgPartition(t *meta.Meta, job *model.Job) (*model.TableInfo, []model.CIStr, *model.PartitionInfo, []model.PartitionDefinition, []model.PartitionDefinition, error) { + schemaID := job.SchemaID + tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID) + if err != nil { + return nil, nil, nil, nil, nil, errors.Trace(err) + } + partInfo := &model.PartitionInfo{} + var partNames []model.CIStr + err = job.DecodeArgs(&partNames, &partInfo) + if err != nil { + job.State = model.JobStateCancelled + return nil, nil, nil, nil, nil, errors.Trace(err) + } + addingDefs := tblInfo.Partition.AddingDefinitions + droppingDefs := tblInfo.Partition.DroppingDefinitions + if len(addingDefs) == 0 { + addingDefs = []model.PartitionDefinition{} + } + if len(droppingDefs) == 0 { + droppingDefs = []model.PartitionDefinition{} + } + return tblInfo, partNames, partInfo, droppingDefs, addingDefs, nil +} + +func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { + // Handle the rolling back job + if job.IsRollingback() { + ver, err := 
w.onDropTablePartition(d, t, job) + if err != nil { + return ver, errors.Trace(err) + } + return ver, nil + } + + tblInfo, partNamesCIStr, partInfo, _, addingDefinitions, err := checkReorgPartition(t, job) + if err != nil { + return ver, err + } + partNames := make([]string, len(partNamesCIStr)) + for i := range partNamesCIStr { + partNames[i] = partNamesCIStr[i].L + } + + // In order to skip maintaining the state check in partitionDefinition, TiDB use dropping/addingDefinition instead of state field. + // So here using `job.SchemaState` to judge what the stage of this job is. + originalState := job.SchemaState + switch job.SchemaState { + case model.StateNone: + // job.SchemaState == model.StateNone means the job is in the initial state of reorg partition. + // Here should use partInfo from job directly and do some check action. + // In case there was a race for queueing different schema changes on the same + // table and the checks was not done on the current schema version. + // The partInfo may have been checked against an older schema version for example. + // If the check is done here, it does not need to be repeated, since no other + // DDL on the same table can be run concurrently. + err = checkAddPartitionTooManyPartitions(uint64(len(tblInfo.Partition.Definitions) + + len(partInfo.Definitions) - + len(partNames))) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + err = checkReorgPartitionNames(tblInfo.Partition, partNamesCIStr, partInfo) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + // Re-check that the dropped/added partitions are compatible with current definition + firstPartIdx, lastPartIdx, idMap, err := getReplacedPartitionIDs(partNamesCIStr, tblInfo.Partition) + if err != nil { + job.State = model.JobStateCancelled + return ver, err + } + sctx := w.sess.Context + if err = checkReorgPartitionDefs(sctx, tblInfo, partInfo, firstPartIdx, lastPartIdx, idMap); err != nil { + job.State = model.JobStateCancelled + return ver, err + } + + // move the adding definition into tableInfo. + updateAddingPartitionInfo(partInfo, tblInfo) + orgDefs := tblInfo.Partition.Definitions + _ = updateDroppingPartitionInfo(tblInfo, partNames) + // Reset original partitions, and keep DroppedDefinitions + tblInfo.Partition.Definitions = orgDefs + + // modify placement settings + for _, def := range tblInfo.Partition.AddingDefinitions { + if _, err = checkPlacementPolicyRefValidAndCanNonValidJob(t, job, def.PlacementPolicyRef); err != nil { + // job.State = model.JobStateCancelled may be set depending on error in function above. + return ver, errors.Trace(err) + } + } + + // From now on we cannot just cancel the DDL, we must roll back if changesMade! + changesMade := false + if tblInfo.TiFlashReplica != nil { + // Must set placement rule, and make sure it succeeds. 
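+ // If this call fails, nothing has been changed yet and the job can simply be cancelled;
+ // once it succeeds, any later error must be handled by rolling back (changesMade).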
+ if err := infosync.ConfigureTiFlashPDForPartitions(true, &tblInfo.Partition.AddingDefinitions, tblInfo.TiFlashReplica.Count, &tblInfo.TiFlashReplica.LocationLabels, tblInfo.ID); err != nil { + logutil.BgLogger().Error("ConfigureTiFlashPDForPartitions fails", zap.Error(err)) + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + changesMade = true + // In the next step, StateDeleteOnly, wait to verify the TiFlash replicas are OK + } + + bundles, err := alterTablePartitionBundles(t, tblInfo, tblInfo.Partition.AddingDefinitions) + if err != nil { + if !changesMade { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + return convertAddTablePartitionJob2RollbackJob(d, t, job, err, tblInfo) + } + + if len(bundles) > 0 { + if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil { + if !changesMade { + job.State = model.JobStateCancelled + return ver, errors.Wrapf(err, "failed to notify PD the placement rules") + } + return convertAddTablePartitionJob2RollbackJob(d, t, job, err, tblInfo) + } + changesMade = true + } + + ids := getIDs([]*model.TableInfo{tblInfo}) + for _, p := range tblInfo.Partition.AddingDefinitions { + ids = append(ids, p.ID) + } + changed, err := alterTableLabelRule(job.SchemaName, tblInfo, ids) + changesMade = changesMade || changed + if err != nil { + if !changesMade { + job.State = model.JobStateCancelled + return ver, err + } + return convertAddTablePartitionJob2RollbackJob(d, t, job, err, tblInfo) + } + + // Doing the preSplitAndScatter here, since all checks are completed, + // and we will soon start writing to the new partitions. + if s, ok := d.store.(kv.SplittableStore); ok && s != nil { + // partInfo only contains the AddingPartitions + splitPartitionTableRegion(w.sess.Context, s, tblInfo, partInfo, true) + } + + // TODO: test... + // Assume we cannot have more than MaxUint64 rows, set the progress to 1/10 of that. + metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64)) + job.SchemaState = model.StateDeleteOnly + tblInfo.Partition.DDLState = model.StateDeleteOnly + ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, true) + if err != nil { + return ver, errors.Trace(err) + } + + // Is really both StateDeleteOnly AND StateWriteOnly needed? + // If transaction A in WriteOnly inserts row 1 (into both new and old partition set) + // and then transaction B in DeleteOnly deletes that row (in both new and old) + // does really transaction B need to do the delete in the new partition? + // Yes, otherwise it would still be there when the WriteReorg happens, + // and WriteReorg would only copy existing rows to the new table, so unless it is + // deleted it would result in a ghost row! + // What about update then? + // Updates also need to be handled for new partitions in DeleteOnly, + // since it would not be overwritten during Reorganize phase. + // BUT if the update results in adding in one partition and deleting in another, + // THEN only the delete must happen in the new partition set, not the insert! + case model.StateDeleteOnly: + // This state is to confirm all servers can not see the new partitions when reorg is running, + // so that all deletes will be done in both old and new partitions when in either DeleteOnly + // or WriteOnly state. 
+ // Also using the state for checking that the optional TiFlash replica is available, making it + // in a state without (much) data and easy to retry without side effects. + + // Reason for having it here, is to make it easy for retry, and better to make sure it is in-sync + // as early as possible, to avoid a long wait after the data copying. + if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available { + // For available state, the new added partition should wait its replica to + // be finished, otherwise the query to this partition will be blocked. + count := tblInfo.TiFlashReplica.Count + needRetry, err := checkPartitionReplica(count, addingDefinitions, d) + if err != nil { + // need to rollback, since we tried to register the new + // partitions before! + return convertAddTablePartitionJob2RollbackJob(d, t, job, err, tblInfo) + } + if needRetry { + // The new added partition hasn't been replicated. + // Do nothing to the job this time, wait next worker round. + time.Sleep(tiflashCheckTiDBHTTPAPIHalfInterval) + // Set the error here which will lead this job exit when it's retry times beyond the limitation. + return ver, errors.Errorf("[ddl] add partition wait for tiflash replica to complete") + } + + // When TiFlash Replica is ready, we must move them into `AvailablePartitionIDs`. + // Since onUpdateFlashReplicaStatus cannot see the partitions yet (not public) + for _, d := range addingDefinitions { + tblInfo.TiFlashReplica.AvailablePartitionIDs = append(tblInfo.TiFlashReplica.AvailablePartitionIDs, d.ID) + } + } + + job.SchemaState = model.StateWriteOnly + tblInfo.Partition.DDLState = model.StateWriteOnly + metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.2 / float64(math.MaxUint64)) + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState) + case model.StateWriteOnly: + // Insert this state to confirm all servers can see the new partitions when reorg is running, + // so that new data will be updated in both old and new partitions when reorganizing. + job.SnapshotVer = 0 + job.SchemaState = model.StateWriteReorganization + tblInfo.Partition.DDLState = model.StateWriteReorganization + metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.3 / float64(math.MaxUint64)) + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState) + case model.StateWriteReorganization: + physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions) + tbl, err2 := getTable(d.store, job.SchemaID, tblInfo) + if err2 != nil { + return ver, errors.Trace(err2) + } + // TODO: If table has global indexes, we need reorg to clean up them. + // and then add the new partition ids back... 
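+ // Until then, reorganizing a partitioned table that has a global index is rejected and the job is rolled back.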
+ if _, ok := tbl.(table.PartitionedTable); ok && hasGlobalIndex(tblInfo) { + err = errors.Trace(dbterror.ErrCancelledDDLJob.GenWithStack("global indexes is not supported yet for reorganize partition")) + return convertAddTablePartitionJob2RollbackJob(d, t, job, err, tblInfo) + } + var done bool + done, ver, err = doPartitionReorgWork(w, d, t, job, tbl, physicalTableIDs) + + if !done { + return ver, err + } + + firstPartIdx, lastPartIdx, idMap, err2 := getReplacedPartitionIDs(partNamesCIStr, tblInfo.Partition) + failpoint.Inject("reorgPartWriteReorgReplacedPartIDsFail", func(val failpoint.Value) { + if val.(bool) { + err2 = errors.New("Injected error by reorgPartWriteReorgReplacedPartIDsFail") + } + }) + if err2 != nil { + return ver, err2 + } + newDefs := getReorganizedDefinitions(tblInfo.Partition, firstPartIdx, lastPartIdx, idMap) + + // From now on, use the new definitions, but keep the Adding and Dropping for double write + tblInfo.Partition.Definitions = newDefs + tblInfo.Partition.Num = uint64(len(newDefs)) + + // TODO: How do we handle the table schema change for Adding and Dropping Definitions? + + // Now all the data copying is done, but we cannot simply remove the droppingDefinitions + // since they are a part of the normal Definitions that other nodes with + // the current schema version. So we need to double write for one more schema version + job.SchemaState = model.StateDeleteReorganization + tblInfo.Partition.DDLState = model.StateDeleteReorganization + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != job.SchemaState) + + case model.StateDeleteReorganization: + // Drop the droppingDefinitions and finish the DDL + // This state is needed for the case where client A sees the schema + // with version of StateWriteReorg and would not see updates of + // client B that writes to the new partitions, previously + // addingDefinitions, since it would not double write to + // the droppingDefinitions during this time + // By adding StateDeleteReorg state, client B will write to both + // the new (previously addingDefinitions) AND droppingDefinitions + // TODO: Make sure the dropLabelRules are done both if successful (droppingDefinitions) or if rollback (addingDefinitions) + // TODO: Make sure stats is handled (eventually dropped for old partitions, and added for new?) + // Hmm, maybe we should actually update the stats here as well? + // Can we collect the stats while doing the reorg? + + // Register the droppingDefinitions ids for rangeDelete + // and the addingDefinitions for handling in the updateSchemaVersion + physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions) + newIDs := getPartitionIDsFromDefinitions(partInfo.Definitions) + job.CtxVars = []interface{}{physicalTableIDs, newIDs} + definitionsToDrop := tblInfo.Partition.DroppingDefinitions + tblInfo.Partition.DroppingDefinitions = nil + tblInfo.Partition.AddingDefinitions = nil + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true) + failpoint.Inject("reorgPartWriteReorgSchemaVersionUpdateFail", func(val failpoint.Value) { + if val.(bool) { + err = errors.New("Injected error by reorgPartWriteReorgSchemaVersionUpdateFail") + } + }) + if err != nil { + return ver, errors.Trace(err) + } + job.SchemaState = model.StateNone + tblInfo.Partition.DDLState = model.StateNone + job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) + // How to handle this? + // Seems to only trigger asynchronous update of statistics. 
+ // Should it actually be synchronous? + asyncNotifyEvent(d, &util.Event{Tp: model.ActionReorganizePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: definitionsToDrop}}) + // A background job will be created to delete old partition data. + job.Args = []interface{}{physicalTableIDs} + + default: + err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("partition", job.SchemaState) + } + + return ver, errors.Trace(err) +} + func doPartitionReorgWork(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, physTblIDs []int64) (done bool, ver int64, err error) { job.ReorgMeta.ReorgTp = model.ReorgTypeTxn sctx, err1 := w.sessPool.get() @@ -2203,7 +2536,7 @@ func doPartitionReorgWork(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tb zap.String("job", job.String()), zap.Error(err1)) } logutil.BgLogger().Warn("[ddl] reorg partition job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) - job.State = model.JobStateRollingback + ver, err = convertAddTablePartitionJob2RollbackJob(d, t, job, err, tbl.Meta()) return false, ver, errors.Trace(err) } return true, ver, err @@ -2434,6 +2767,13 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) } } + failpoint.Inject("reorgPartitionAfterDataCopy", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) { + panic("panic test in reorgPartitionAfterDataCopy") + } + }) + // Rewrite this to do all indexes at once in addTableIndex // instead of calling it once per index (meaning reading the table multiple times) // But for now, try to understand how it works... @@ -2495,6 +2835,7 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) zap.Int64("jobID", reorgInfo.Job.ID), zap.ByteString("elementType", reorgInfo.currElement.TypeKey), zap.Int64("elementID", reorgInfo.currElement.ID), + zap.Int64("partitionTableId", physTbl.GetPhysicalID()), zap.String("startHandle", hex.EncodeToString(reorgInfo.StartKey)), zap.String("endHandle", hex.EncodeToString(reorgInfo.EndKey))) if err != nil { @@ -2506,6 +2847,12 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) } reorgInfo.PhysicalTableID = firstNewPartitionID } + failpoint.Inject("reorgPartitionAfterIndex", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) { + panic("panic test in reorgPartitionAfterIndex") + } + }) return nil } diff --git a/ddl/split_region.go b/ddl/split_region.go index b201bf65538f3..ffbcb7439292d 100644 --- a/ddl/split_region.go +++ b/ddl/split_region.go @@ -31,7 +31,7 @@ import ( ) func splitPartitionTableRegion(ctx sessionctx.Context, store kv.SplittableStore, tbInfo *model.TableInfo, pi *model.PartitionInfo, scatter bool) { - // Max partition count is 4096, should we sample and just choose some of the partition to split? + // Max partition count is 8192, should we sample and just choose some partitions to split? 
regionIDs := make([]uint64, 0, len(pi.Definitions)) ctxWithTimeout, cancel := context.WithTimeout(context.Background(), ctx.GetSessionVars().GetSplitRegionTimeout()) defer cancel() diff --git a/parser/model/model.go b/parser/model/model.go index 70dff656796a6..bceb529bdbe6b 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -758,6 +758,9 @@ func (t *TableInfo) Clone() *TableInfo { nt.ForeignKeys[i] = t.ForeignKeys[i].Clone() } + if t.Partition != nil { + nt.Partition = t.Partition.Clone() + } if t.TTLInfo != nil { nt.TTLInfo = t.TTLInfo.Clone() } @@ -1194,6 +1197,8 @@ type PartitionInfo struct { DroppingDefinitions []PartitionDefinition `json:"dropping_definitions"` States []PartitionState `json:"states"` Num uint64 `json:"num"` + // Only used during ReorganizePartition so far + DDLState SchemaState `json:"ddl_state"` } // Clone clones itself. @@ -1328,15 +1333,15 @@ func (ci *PartitionDefinition) MemoryUsage() (sum int64) { } // FindPartitionDefinitionByName finds PartitionDefinition by name. -func (t *TableInfo) FindPartitionDefinitionByName(partitionDefinitionName string) *PartitionDefinition { +func (pi *PartitionInfo) FindPartitionDefinitionByName(partitionDefinitionName string) int { lowConstrName := strings.ToLower(partitionDefinitionName) - definitions := t.Partition.Definitions + definitions := pi.Definitions for i := range definitions { if definitions[i].Name.L == lowConstrName { - return &t.Partition.Definitions[i] + return i } } - return nil + return -1 } // IndexColumn provides index column info. diff --git a/table/tables/partition.go b/table/tables/partition.go index 2c8112053cf9e..bce84db247440 100644 --- a/table/tables/partition.go +++ b/table/tables/partition.go @@ -76,11 +76,16 @@ func (p *partition) GetPhysicalID() int64 { return p.physicalTableID } -// GetPartitionedTable implements table.Table GetPhysicalID interface. +// GetPartitionedTable implements table.Table GetPartitionedTable interface. func (p *partition) GetPartitionedTable() table.PartitionedTable { return p.table } +// GetPartitionedTable implements table.Table GetPartitionedTable interface. +func (t *partitionedTable) GetPartitionedTable() table.PartitionedTable { + return t +} + // partitionedTable implements the table.PartitionedTable interface. // partitionedTable is a table, it contains many Partitions. 
type partitionedTable struct { @@ -89,11 +94,17 @@ type partitionedTable struct { partitions map[int64]*partition evalBufferTypes []*types.FieldType evalBufferPool sync.Pool + // Only used during Reorganize partition - reorgPartitions map[int64]interface{} - reorgPartitionExpr *PartitionExpr + // reorganizePartitions is the currently used partitions that are reorganized + reorganizePartitions map[int64]interface{} + // doubleWriteParittions are the partitions not visible, but we should double write to + doubleWritePartitions map[int64]interface{} + reorgPartitionExpr *PartitionExpr } +// TODO: Check which data structures that can be shared between all partitions and which +// needs to be copies func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.PartitionedTable, error) { pi := tblInfo.GetPartitionInfo() if pi == nil || len(pi.Definitions) == 0 { @@ -125,19 +136,98 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Part partitions[p.ID] = &t } ret.partitions = partitions - if len(pi.DroppingDefinitions) > 0 && len(pi.AddingDefinitions) > 0 { - ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.AddingDefinitions) + // In StateWriteReorganization we are using the 'old' partition definitions + // and if any new change happens in DroppingDefinitions, it needs to be done + // also in AddingDefinitions (with new evaluation of the new expression) + // In StateDeleteReorganization we are using the 'new' partition definitions + // and if any new change happens in AddingDefinitions, it needs to be done + // also in DroppingDefinitions (since session running on schema version -1) + // should also see the changes + if pi.DDLState == model.StateDeleteReorganization { + origIdx := setIndexesState(ret, pi.DDLState) + defer unsetIndexesState(ret, origIdx) + ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.DroppingDefinitions) if err != nil { return nil, errors.Trace(err) } - ret.reorgPartitions = make(map[int64]interface{}, len(pi.DroppingDefinitions)) + ret.reorganizePartitions = make(map[int64]interface{}, len(pi.AddingDefinitions)) + for _, def := range pi.AddingDefinitions { + ret.reorganizePartitions[def.ID] = nil + } + // TODO: Test decreasing end range and concurrently insert in the gap + // TODO: Test increasing end range and concurrently insert into the gap + ret.doubleWritePartitions = make(map[int64]interface{}, len(pi.DroppingDefinitions)) for _, def := range pi.DroppingDefinitions { - ret.reorgPartitions[def.ID] = nil + p, err := initPartition(ret, def) + if err != nil { + return nil, err + } + partitions[def.ID] = p + ret.doubleWritePartitions[def.ID] = nil + } + } else { + if len(pi.AddingDefinitions) > 0 { + origIdx := setIndexesState(ret, pi.DDLState) + defer unsetIndexesState(ret, origIdx) + ret.reorgPartitionExpr, err = newPartitionExpr(tblInfo, pi.AddingDefinitions) + if err != nil { + return nil, errors.Trace(err) + } + ret.doubleWritePartitions = make(map[int64]interface{}, len(pi.AddingDefinitions)) + for _, def := range pi.AddingDefinitions { + ret.doubleWritePartitions[def.ID] = nil + p, err := initPartition(ret, def) + if err != nil { + return nil, err + } + partitions[def.ID] = p + } + } + if len(pi.DroppingDefinitions) > 0 { + ret.reorganizePartitions = make(map[int64]interface{}, len(pi.DroppingDefinitions)) + for _, def := range pi.DroppingDefinitions { + ret.reorganizePartitions[def.ID] = nil + } } } return ret, nil } +func setIndexesState(t *partitionedTable, state model.SchemaState) []*model.IndexInfo { 
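+ // Work on a cloned index slice and downgrade the state of public indexes to match the
+ // given partition DDL state, so that index maintenance on the not-yet-public partitions
+ // behaves like a normal index state change. The original slice is returned so the caller
+ // can restore it with unsetIndexesState.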
+ orig := t.meta.Indices + t.meta.Indices = make([]*model.IndexInfo, 0, len(orig)) + for i := range orig { + t.meta.Indices = append(t.meta.Indices, orig[i].Clone()) + if t.meta.Indices[i].State == model.StatePublic { + switch state { + case model.StateDeleteOnly, model.StateNone: + t.meta.Indices[i].State = model.StateDeleteOnly + case model.StatePublic: + // Keep as is + default: + // use the 'StateWriteReorganization' here, since StateDeleteReorganization + // would skip index writes. + t.meta.Indices[i].State = model.StateWriteReorganization + } + } + } + return orig +} + +func unsetIndexesState(t *partitionedTable, orig []*model.IndexInfo) { + t.meta.Indices = orig +} + +func initPartition(t *partitionedTable, def model.PartitionDefinition) (*partition, error) { + var newPart partition + err := initTableCommonWithIndices(&newPart.TableCommon, t.meta, def.ID, t.Columns, t.allocs) + if err != nil { + return nil, err + } + newPart.table = t + return &newPart, nil +} + func newPartitionExpr(tblInfo *model.TableInfo, defs []model.PartitionDefinition) (*PartitionExpr, error) { // a partitioned table cannot rely on session context/sql modes, so use a default one! ctx := mock.NewContext() @@ -1067,6 +1157,9 @@ func (t *partitionedTable) locateReorgPartition(ctx sessionctx.Context, r []type if err != nil { return 0, errors.Trace(err) } + if pi.DDLState == model.StateDeleteReorganization { + return pi.DroppingDefinitions[idx].ID, nil + } return pi.AddingDefinitions[idx].ID, nil } @@ -1214,33 +1307,15 @@ func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, pi *model // GetPartition returns a Table, which is actually a partition. func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable { - // Attention, can't simply use `return t.partitions[pid]` here. + // Attention, can't simply use `return p.partitions[pid]` here. // Because A nil of type *partition is a kind of `table.PhysicalTable` - p, ok := t.partitions[pid] + part, ok := t.partitions[pid] if !ok { - // We might want an old or new partition. 
- pi := t.meta.Partition - for _, defs := range [][]model.PartitionDefinition{ - pi.AddingDefinitions, - pi.DroppingDefinitions, - } { - for _, def := range defs { - if pid != def.ID { - continue - } - var newPart partition - err := initTableCommonWithIndices(&newPart.TableCommon, t.meta, def.ID, t.Columns, t.allocs) - if err != nil { - return nil - } - newPart.table = t - t.partitions[pid] = &newPart - return &newPart - } - } - return nil + // TODO: remove and just keep return nil + panic("MJONSS: How did we get here?") + //return nil } - return p + return part } // GetReorganizedPartitionedTable returns the same table @@ -1258,7 +1333,7 @@ func GetReorganizedPartitionedTable(t table.Table) (table.PartitionedTable, erro tblInfo.Partition.AddingDefinitions = nil tblInfo.Partition.DroppingDefinitions = nil var tc TableCommon - tc.meta = tblInfo + initTableCommon(&tc, tblInfo, tblInfo.ID, t.Cols(), t.Allocators(nil)) // and rebuild the partitioning structure @@ -1307,7 +1382,7 @@ func partitionedTableAddRecord(ctx sessionctx.Context, t *partitionedTable, r [] if err != nil { return } - if _, ok := t.reorgPartitions[pid]; ok { + if _, ok := t.reorganizePartitions[pid]; ok { // Double write to the ongoing reorganized partition pid, err = t.locateReorgPartition(ctx, r) if err != nil { @@ -1367,7 +1442,7 @@ func (t *partitionedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r [ return errors.Trace(err) } - if _, ok := t.reorgPartitions[pid]; ok { + if _, ok := t.reorganizePartitions[pid]; ok { pid, err = t.locateReorgPartition(ctx, r) if err != nil { return errors.Trace(err) @@ -1384,6 +1459,9 @@ func (t *partitionedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r [ func (t *partitionedTable) GetAllPartitionIDs() []int64 { ptIDs := make([]int64, 0, len(t.partitions)) for id := range t.partitions { + if _, ok := t.doubleWritePartitions[id]; ok { + continue + } ptIDs = append(ptIDs, id) } return ptIDs @@ -1446,33 +1524,34 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context, // TODO: Test if the update is in different partitions before reorg, // but is now in the same during the reorg? And vice-versa... // What if the change is in the same reorged partition?!? - newTo, newFrom := int64(-1), int64(-1) - if _, ok := t.reorgPartitions[to]; ok { + newTo, newFrom := int64(0), int64(0) + if _, ok := t.reorganizePartitions[to]; ok { newTo, err = t.locateReorgPartition(ctx, newData) // There might be valid cases when errors should be accepted? if err != nil { return errors.Trace(err) } } - if _, ok := t.reorgPartitions[from]; ok { + if _, ok := t.reorganizePartitions[from]; ok { newFrom, err = t.locateReorgPartition(ctx, currData) // There might be valid cases when errors should be accepted? 
if err != nil { return errors.Trace(err) } } - if newTo == newFrom && newTo != -1 { + if newTo == newFrom && newTo != 0 { + // Update needs to be done in StateDeleteOnly as well tbl := t.GetPartition(newTo) return tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) } - if newTo != -1 { + if newTo != 0 && t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly { tbl := t.GetPartition(newTo) _, err = tbl.AddRecord(ctx, newData) if err != nil { return errors.Trace(err) } } - if newFrom != -1 { + if newFrom != 0 { tbl := t.GetPartition(newFrom) err = tbl.RemoveRecord(ctx, h, currData) // How to handle error, which can happen when the data is not yet backfilled @@ -1488,7 +1567,7 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context, if err != nil { return errors.Trace(err) } - if _, ok := t.reorgPartitions[to]; ok { + if _, ok := t.reorganizePartitions[to]; ok { // Even if to == from, in the reorganized partitions they may differ // like in case of a split newTo, err := t.locateReorgPartition(ctx, newData) @@ -1500,6 +1579,7 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context, return errors.Trace(err) } if newTo == newFrom { + // Update needs to be done in StateDeleteOnly as well tbl = t.GetPartition(newTo) err = tbl.UpdateRecord(gctx, ctx, h, currData, newData, touched) if err != nil { @@ -1507,11 +1587,13 @@ func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context, } return nil } - tbl = t.GetPartition(newTo) - _, err = tbl.AddRecord(ctx, newData) - // TODO: Could there be a case where a duplicate unique key could happen here? - if err != nil { - return errors.Trace(err) + if t.Meta().GetPartitionInfo().DDLState != model.StateDeleteOnly { + tbl = t.GetPartition(newTo) + _, err = tbl.AddRecord(ctx, newData) + // TODO: Could there be a case where a duplicate unique key could happen here? + if err != nil { + return errors.Trace(err) + } } tbl = t.GetPartition(newFrom) err = tbl.RemoveRecord(ctx, h, currData) diff --git a/table/tables/tables.go b/table/tables/tables.go index e32328ce8fae9..b478478d2c087 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -57,6 +57,7 @@ import ( // TableCommon is shared by both Table and partition. type TableCommon struct { + // TODO: Why do we need tableID, when it is already in meta.ID ? tableID int64 // physicalTableID is a unique int64 to identify a physical table. physicalTableID int64 @@ -1309,9 +1310,32 @@ func (t *TableCommon) removeRowData(ctx sessionctx.Context, h kv.Handle) error { } } }) - err = txn.SetAssertion(key, kv.SetAssertExist) - if err != nil { - return err + doAssert := true + p := t.Meta().Partition + if p != nil { + // This disables asserting during Reorganize Partition. + switch ctx.GetSessionVars().AssertionLevel { + case variable.AssertionLevelFast: + // Fast option, just skip assertion for all partitions. + if p.DDLState != model.StateNone && p.DDLState != model.StatePublic { + doAssert = false + } + case variable.AssertionLevelStrict: + // Strict, only disable assertion for intermediate partitions. + // If there were an easy way to get from a TableCommon back to the partitioned table... + for i := range p.AddingDefinitions { + if t.physicalTableID == p.AddingDefinitions[i].ID { + doAssert = false + break + } + } + } + } + if doAssert { + err = txn.SetAssertion(key, kv.SetAssertExist) + if err != nil { + return err + } } return txn.Delete(key) }
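Since FindPartitionDefinitionByName moves from TableInfo to PartitionInfo and now returns an index instead of a pointer, callers that previously dereferenced the returned definition need a small adjustment. A minimal sketch of the new call pattern; the findPartition helper below is illustrative only and not part of this change:

import "github.com/pingcap/tidb/parser/model"

// findPartition shows the updated lookup: ask the PartitionInfo for the index and take the
// address of the definition yourself. A negative index means "not found" (where the old
// TableInfo method returned nil).
func findPartition(tblInfo *model.TableInfo, name string) *model.PartitionDefinition {
	pi := tblInfo.GetPartitionInfo()
	if pi == nil {
		return nil
	}
	idx := pi.FindPartitionDefinitionByName(name)
	if idx < 0 {
		return nil
	}
	return &pi.Definitions[idx]
}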