diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go
index 3b73297fff7c0..bfa8c2c3c4abc 100644
--- a/ddl/db_partition_test.go
+++ b/ddl/db_partition_test.go
@@ -19,6 +19,7 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"sort"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -4315,6 +4316,92 @@ func TestAddPartitionReplicaBiggerThanTiFlashStores(t *testing.T) {
 	require.Equal(t, "[ddl:-1]DDL job rollback, error msg: [ddl] add partition wait for tiflash replica to complete", err.Error())
 }
 
+func TestReorgPartitionTiFlash(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	schemaName := "ReorgPartTiFlash"
+	tk.MustExec("create database " + schemaName)
+	tk.MustExec("use " + schemaName)
+	tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` +
+		` partition by list columns (a) ` +
+		`(partition p0 values in (10,11,45),` +
+		` partition p1 values in (20,1,23,56),` +
+		` partition p2 values in (12,34,9))`)
+	tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`)
+
+	require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount", `return(true)`))
+	defer func() {
+		err := failpoint.Disable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount")
+		require.NoError(t, err)
+	}()
+
+	tk.MustExec(`alter table t set tiflash replica 1`)
+	tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
+		"t CREATE TABLE `t` (\n" +
+		"  `a` int(10) unsigned NOT NULL,\n" +
+		"  `b` varchar(255) DEFAULT NULL,\n" +
+		"  `c` int(11) DEFAULT NULL,\n" +
+		"  PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
+		"  KEY `b` (`b`),\n" +
+		"  KEY `c` (`c`,`b`)\n" +
+		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
+		"PARTITION BY LIST COLUMNS(`a`)\n" +
+		"(PARTITION `p0` VALUES IN (10,11,45),\n" +
+		" PARTITION `p1` VALUES IN (20,1,23,56),\n" +
+		" PARTITION `p2` VALUES IN (12,34,9))"))
+
+	tbl := external.GetTableByName(t, tk, schemaName, "t")
+	p := tbl.GetPartitionedTable()
+	for _, pid := range p.GetAllPartitionIDs() {
+		require.NoError(t, domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), pid, true))
+	}
+	// Reload
+	tbl = external.GetTableByName(t, tk, schemaName, "t")
+	p = tbl.GetPartitionedTable()
+	require.NotNil(t, tbl.Meta().TiFlashReplica)
+	require.True(t, tbl.Meta().TiFlashReplica.Available)
+	pids := p.GetAllPartitionIDs()
+	sort.Slice(pids, func(i, j int) bool { return pids[i] < pids[j] })
+	availablePids := tbl.Meta().TiFlashReplica.AvailablePartitionIDs
+	sort.Slice(availablePids, func(i, j int) bool { return availablePids[i] < availablePids[j] })
+	require.Equal(t, pids, availablePids)
+	require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockWaitTiFlashReplicaOK", `return(true)`))
+	defer func() {
+		err := failpoint.Disable("github.com/pingcap/tidb/ddl/mockWaitTiFlashReplicaOK")
+		require.NoError(t, err)
+	}()
+	tk.MustExec(`alter table t reorganize partition p1, p2 into (partition p1 values in (34,2,23), partition p2 values in (12,56,9),partition p3 values in (1,8,19))`)
+	tk.MustExec(`admin check table t`)
+	tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
+		"t CREATE TABLE `t` (\n" +
+		"  `a` int(10) unsigned NOT NULL,\n" +
+		"  `b` varchar(255) DEFAULT NULL,\n" +
+		"  `c` int(11) DEFAULT NULL,\n" +
+		"  PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
+		"  KEY `b` (`b`),\n" +
+		"  KEY `c` (`c`,`b`)\n" +
+		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
+		"PARTITION BY LIST COLUMNS(`a`)\n" +
+		"(PARTITION `p0` VALUES IN (10,11,45),\n" +
+		" PARTITION `p1` VALUES IN (34,2,23),\n" +
+		" PARTITION `p2` VALUES IN (12,56,9),\n" +
+		" PARTITION `p3` VALUES IN (1,8,19))"))
+
+	// TODO: Check how to properly test TiFlash, since this will just change the actual configuration
+	tbl = external.GetTableByName(t, tk, schemaName, "t")
+	p = tbl.GetPartitionedTable()
+	for _, pid := range p.GetAllPartitionIDs() {
+		require.NoError(t, domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), pid, true))
+	}
+	tbl = external.GetTableByName(t, tk, schemaName, "t")
+	p = tbl.GetPartitionedTable()
+	require.NotNil(t, tbl.Meta().TiFlashReplica)
+	require.True(t, tbl.Meta().TiFlashReplica.Available)
+	for _, pid := range p.GetAllPartitionIDs() {
+		require.True(t, tbl.Meta().TiFlashReplica.IsPartitionAvailable(pid))
+	}
+}
+
 func TestDuplicatePartitionNames(t *testing.T) {
 	store := testkit.CreateMockStore(t)
 	tk := testkit.NewTestKit(t, store)
diff --git a/ddl/partition.go b/ddl/partition.go
index b6cca85cc0899..52a12a443d4ea 100644
--- a/ddl/partition.go
+++ b/ddl/partition.go
@@ -358,6 +358,11 @@ func checkPartitionReplica(replicaCount uint64, addingDefinitions []model.Partit
 			failpoint.Return(true, nil)
 		}
 	})
+	failpoint.Inject("mockWaitTiFlashReplicaOK", func(val failpoint.Value) {
+		if val.(bool) {
+			failpoint.Return(false, nil)
+		}
+	})
 
 	ctx := context.Background()
 	pdCli := d.store.(tikv.Storage).GetRegionCache().PDClient()
@@ -1876,6 +1881,9 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 				return ver, errors.Trace(err)
 			}
 		}
+		if tblInfo.TiFlashReplica != nil {
+			removeTiFlashAvailablePartitionIDs(tblInfo, physicalTableIDs)
+		}
 		tblInfo.Partition.DroppingDefinitions = nil
 		// used by ApplyDiff in updateSchemaVersion
 		job.CtxVars = []interface{}{physicalTableIDs}
@@ -1894,6 +1902,23 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 	return ver, errors.Trace(err)
 }
 
+func removeTiFlashAvailablePartitionIDs(tblInfo *model.TableInfo, pids []int64) {
+	// Remove the given partition IDs from the list of available partition IDs
+	ids := tblInfo.TiFlashReplica.AvailablePartitionIDs
+	// Rarely called, so a simple nested scan is fine and keeps the code easy to follow
+	for _, id := range pids {
+		for i, avail := range ids {
+			if id == avail {
+				tmp := ids[:i]
+				tmp = append(tmp, ids[i+1:]...)
+				ids = tmp
+				break
+			}
+		}
+	}
+	tblInfo.TiFlashReplica.AvailablePartitionIDs = ids
+}
+
 // onTruncateTablePartition truncates old partition meta.
 func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) {
 	var ver int64
@@ -1945,16 +1970,7 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
 		}
 		tblInfo.TiFlashReplica.Available = false
 		// Set partition replica become unavailable.
-		for _, oldID := range oldIDs {
-			for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs {
-				if id == oldID {
-					newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i]
-					newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...)
-					tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs
-					break
-				}
-			}
-		}
+		removeTiFlashAvailablePartitionIDs(tblInfo, oldIDs)
 	}
 
 	bundles, err := placement.NewPartitionListBundles(t, newPartitions)
diff --git a/ddl/tiflashtest/BUILD.bazel b/ddl/tiflashtest/BUILD.bazel
index b1bae2dd615d8..d6975ff86d773 100644
--- a/ddl/tiflashtest/BUILD.bazel
+++ b/ddl/tiflashtest/BUILD.bazel
@@ -8,7 +8,7 @@ go_test(
         "main_test.go",
     ],
     flaky = True,
-    shard_count = 32,
+    shard_count = 33,
     deps = [
         "//config",
         "//ddl",
@@ -23,7 +23,9 @@ go_test(
         "//store/mockstore",
         "//store/mockstore/unistore",
         "//table",
+        "//tablecodec",
         "//testkit",
+        "//testkit/external",
         "//testkit/testsetup",
         "//util",
         "//util/logutil",
@@ -32,6 +34,7 @@ go_test(
         "@com_github_stretchr_testify//require",
         "@com_github_tikv_client_go_v2//oracle",
         "@com_github_tikv_client_go_v2//testutils",
+        "@com_github_tikv_client_go_v2//tikv",
         "@org_uber_go_goleak//:goleak",
         "@org_uber_go_zap//:zap",
     ],
diff --git a/ddl/tiflashtest/ddl_tiflash_test.go b/ddl/tiflashtest/ddl_tiflash_test.go
index 77359c5f1c62e..0234b307b06ba 100644
--- a/ddl/tiflashtest/ddl_tiflash_test.go
+++ b/ddl/tiflashtest/ddl_tiflash_test.go
@@ -41,12 +41,15 @@ import (
 	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/store/mockstore/unistore"
 	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/testkit"
+	"github.com/pingcap/tidb/testkit/external"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/testutils"
+	"github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/zap"
 )
 
@@ -1351,3 +1354,109 @@ func TestTiFlashAvailableAfterDownOneStore(t *testing.T) {
 	time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
 	CheckTableAvailable(s.dom, t, 1, []string{})
 }
+
+// TestDDLCallback is copied from ddl.TestDDLCallback, but smaller
+type TestDDLCallback struct {
+	*ddl.BaseCallback
+	// We recommend passing the domain parameter to the test DDL callback; it ensures the
+	// domain reloads the schema before the DDL steps into the next state change.
+	Do ddl.DomainReloader
+
+	// Only need this for now
+	OnJobRunBeforeExported func(*model.Job)
+}
+
+// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
+func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
+	logutil.BgLogger().Info("on job run before", zap.String("job", job.String()))
+	if tc.OnJobRunBeforeExported != nil {
+		tc.OnJobRunBeforeExported(job)
+		return
+	}
+
+	tc.BaseCallback.OnJobRunBefore(job)
+}
+
+func TestTiFlashReorgPartition(t *testing.T) {
+	s, teardown := createTiFlashContext(t)
+	defer teardown()
+	fCancel := TempDisableEmulatorGC()
+	defer fCancel()
+	tk := testkit.NewTestKit(t, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists ddltiflash")
+
+	tk.MustExec(`create table ddltiflash (id int, vc varchar(255), i int, key (vc), key(i,vc))` +
+		` partition by range (id)` +
+		` (partition p0 values less than (1000000), partition p1 values less than (2000000))`)
+	tk.MustExec(`alter table ddltiflash set tiflash replica 1`)
+	time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
+	CheckTableAvailable(s.dom, t, 1, []string{})
+	tb := external.GetTableByName(t, tk, "test", "ddltiflash")
+	firstPartitionID := tb.Meta().Partition.Definitions[0].ID
+	ruleName := fmt.Sprintf("table-%v-r", firstPartitionID)
+	_, ok := s.tiflash.GetPlacementRule(ruleName)
+	require.True(t, ok)
+
+	// Note that the mock TiFlash does not have any data or regions, so the wait for regions to become available will fail
+	dom := domain.GetDomain(tk.Session())
+	originHook := dom.DDL().GetHook()
+	defer dom.DDL().SetHook(originHook)
+	hook := &TestDDLCallback{Do: dom}
+	dom.DDL().SetHook(hook)
+	done := false
+
+	hook.OnJobRunBeforeExported = func(job *model.Job) {
+		if !done && job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateDeleteOnly {
+			// Let it fail once (to check that code path), then increase the error count to skip further retries
+			if job.ErrorCount > 0 {
+				job.ErrorCount = 1000
+				done = true
+			}
+		}
+	}
+	tk.MustContainErrMsg(`alter table ddltiflash reorganize partition p0 into (partition p0 values less than (500000), partition p500k values less than (1000000))`, "[ddl] add partition wait for tiflash replica to complete")
+
+	done = false
+	hook.OnJobRunBeforeExported = func(job *model.Job) {
+		if !done && job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateDeleteOnly {
+			// Let it fail once (to check that code path), then mock regions into the new partitions
+			if job.ErrorCount > 0 {
+				// Add the TiFlash stores as peers for the new regions, to fulfill the check
+				// in checkPartitionReplica
+				pdCli := s.store.(tikv.Storage).GetRegionCache().PDClient()
+				var dummy []model.CIStr
+				partInfo := &model.PartitionInfo{}
+				_ = job.DecodeArgs(&dummy, &partInfo)
+				ctx := context.Background()
+				stores, _ := pdCli.GetAllStores(ctx)
+				for _, pd := range partInfo.Definitions {
+					startKey, endKey := tablecodec.GetTableHandleKeyRange(pd.ID)
+					regions, _ := pdCli.ScanRegions(ctx, startKey, endKey, -1)
+					for i := range regions {
+						// similar to storeHasEngineTiFlashLabel
+						for _, store := range stores {
+							for _, label := range store.Labels {
+								if label.Key == placement.EngineLabelKey && label.Value == placement.EngineLabelTiFlash {
+									s.cluster.MockRegionManager.AddPeer(regions[i].Meta.Id, store.Id, 1)
+									break
+								}
+							}
+						}
+					}
+				}
+				done = true
+			}
+		}
+	}
+	tk.MustExec(`alter table ddltiflash reorganize partition p0 into (partition p0 values less than (500000), partition p500k values less than (1000000))`)
+	tk.MustExec(`admin check table ddltiflash`)
+	_, ok = s.tiflash.GetPlacementRule(ruleName)
+	require.True(t, ok)
+	gcWorker, err := gcworker.NewMockGCWorker(s.store)
+	require.NoError(t, err)
+	require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64))
+	_, ok = s.tiflash.GetPlacementRule(ruleName)
+	require.False(t, ok)
+	tk.MustExec(`drop table ddltiflash`)
+}
diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go
index b34051d3d3c9a..169c5ee07c3f7 100644
--- a/store/mockstore/unistore/rpc.go
+++ b/store/mockstore/unistore/rpc.go
@@ -298,6 +298,12 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
 	case tikvrpc.CmdStoreSafeTS:
 		resp.Resp, err = c.usSvr.GetStoreSafeTS(ctx, req.StoreSafeTS())
 		return resp, err
+	case tikvrpc.CmdUnsafeDestroyRange:
+		// Pretend it was done. Unistore does not have "destroy", and the
+		// keys have already been removed one-by-one through:
+		// (dr *delRange) startEmulator()
+		resp.Resp = &kvrpcpb.UnsafeDestroyRangeResponse{}
+		return resp, nil
 	default:
 		err = errors.Errorf("not support this request type %v", req.Type)
 	}
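Note (editorial, not part of the patch): the sketch below is a minimal, standalone illustration of the slice-removal pattern that the new removeTiFlashAvailablePartitionIDs helper factors out of onDropTablePartition and onTruncateTablePartition. It uses plain int64 slices instead of model.TableInfo, and the removeIDs name is invented for this example only.

package main

import "fmt"

// removeIDs mirrors the helper's nested-loop approach: for each dropped
// partition ID, delete its entry (if present) from the list of available IDs.
func removeIDs(available, dropped []int64) []int64 {
	for _, id := range dropped {
		for i, avail := range available {
			if id == avail {
				available = append(available[:i], available[i+1:]...)
				break
			}
		}
	}
	return available
}

func main() {
	available := []int64{101, 102, 103, 104}
	dropped := []int64{102, 104, 999} // 999 is not in the list and is simply skipped
	fmt.Println(removeIDs(available, dropped)) // [101 103]
}

The quadratic scan matches the patch's stated trade-off: the helper only runs on rarely executed DDL paths, so simplicity is preferred over a map-based lookup.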