Skip to content

Commit

Permalink
ddl: Added tests for Reorganize Partition with TiFlash (#42082)
Browse files Browse the repository at this point in the history
ref #38535
  • Loading branch information
mjonss authored May 15, 2023
1 parent 6043234 commit 9019f78
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 11 deletions.
87 changes: 87 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -4348,6 +4349,92 @@ func TestAddPartitionReplicaBiggerThanTiFlashStores(t *testing.T) {
require.Equal(t, "[ddl:-1]DDL job rollback, error msg: [ddl] add partition wait for tiflash replica to complete", err.Error())
}

func TestReorgPartitionTiFlash(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
schemaName := "ReorgPartTiFlash"
tk.MustExec("create database " + schemaName)
tk.MustExec("use " + schemaName)
tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` +
` partition by list columns (a) ` +
`(partition p0 values in (10,11,45),` +
` partition p1 values in (20,1,23,56),` +
` partition p2 values in (12,34,9))`)
tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`)

require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount", `return(true)`))
defer func() {
err := failpoint.Disable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount")
require.NoError(t, err)
}()

tk.MustExec(`alter table t set tiflash replica 1`)
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
" `b` varchar(255) DEFAULT NULL,\n" +
" `c` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
" KEY `b` (`b`),\n" +
" KEY `c` (`c`,`b`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY LIST COLUMNS(`a`)\n" +
"(PARTITION `p0` VALUES IN (10,11,45),\n" +
" PARTITION `p1` VALUES IN (20,1,23,56),\n" +
" PARTITION `p2` VALUES IN (12,34,9))"))

tbl := external.GetTableByName(t, tk, schemaName, "t")
p := tbl.GetPartitionedTable()
for _, pid := range p.GetAllPartitionIDs() {
require.NoError(t, domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), pid, true))
}
// Reload
tbl = external.GetTableByName(t, tk, schemaName, "t")
p = tbl.GetPartitionedTable()
require.NotNil(t, tbl.Meta().TiFlashReplica)
require.True(t, tbl.Meta().TiFlashReplica.Available)
pids := p.GetAllPartitionIDs()
sort.Slice(pids, func(i, j int) bool { return pids[i] < pids[j] })
availablePids := tbl.Meta().TiFlashReplica.AvailablePartitionIDs
sort.Slice(availablePids, func(i, j int) bool { return availablePids[i] < availablePids[j] })
require.Equal(t, pids, availablePids)
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockWaitTiFlashReplicaOK", `return(true)`))
defer func() {
err := failpoint.Disable("github.com/pingcap/tidb/ddl/mockWaitTiFlashReplicaOK")
require.NoError(t, err)
}()
tk.MustExec(`alter table t reorganize partition p1, p2 into (partition p1 values in (34,2,23), partition p2 values in (12,56,9),partition p3 values in (1,8,19))`)
tk.MustExec(`admin check table t`)
tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
"t CREATE TABLE `t` (\n" +
" `a` int(10) unsigned NOT NULL,\n" +
" `b` varchar(255) DEFAULT NULL,\n" +
" `c` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
" KEY `b` (`b`),\n" +
" KEY `c` (`c`,`b`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
"PARTITION BY LIST COLUMNS(`a`)\n" +
"(PARTITION `p0` VALUES IN (10,11,45),\n" +
" PARTITION `p1` VALUES IN (34,2,23),\n" +
" PARTITION `p2` VALUES IN (12,56,9),\n" +
" PARTITION `p3` VALUES IN (1,8,19))"))

// TODO: Check how to properly test TiFlash, since this will just change the actual configuration
tbl = external.GetTableByName(t, tk, schemaName, "t")
p = tbl.GetPartitionedTable()
for _, pid := range p.GetAllPartitionIDs() {
require.NoError(t, domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), pid, true))
}
tbl = external.GetTableByName(t, tk, schemaName, "t")
p = tbl.GetPartitionedTable()
require.NotNil(t, tbl.Meta().TiFlashReplica)
require.True(t, tbl.Meta().TiFlashReplica.Available)
for _, pid := range p.GetAllPartitionIDs() {
require.True(t, tbl.Meta().TiFlashReplica.IsPartitionAvailable(pid))
}
}

func TestDuplicatePartitionNames(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
36 changes: 26 additions & 10 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,11 @@ func checkPartitionReplica(replicaCount uint64, addingDefinitions []model.Partit
failpoint.Return(true, nil)
}
})
failpoint.Inject("mockWaitTiFlashReplicaOK", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(false, nil)
}
})

ctx := context.Background()
pdCli := d.store.(tikv.Storage).GetRegionCache().PDClient()
Expand Down Expand Up @@ -1880,6 +1885,9 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
return ver, errors.Trace(err)
}
}
if tblInfo.TiFlashReplica != nil {
removeTiFlashAvailablePartitionIDs(tblInfo, physicalTableIDs)
}
tblInfo.Partition.DroppingDefinitions = nil
// used by ApplyDiff in updateSchemaVersion
job.CtxVars = []interface{}{physicalTableIDs}
Expand All @@ -1898,6 +1906,23 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
return ver, errors.Trace(err)
}

func removeTiFlashAvailablePartitionIDs(tblInfo *model.TableInfo, pids []int64) {
// Remove the partitions
ids := tblInfo.TiFlashReplica.AvailablePartitionIDs
// Rarely called, so OK to take some time, to make it easy
for _, id := range pids {
for i, avail := range ids {
if id == avail {
tmp := ids[:i]
tmp = append(tmp, ids[i+1:]...)
ids = tmp
break
}
}
}
tblInfo.TiFlashReplica.AvailablePartitionIDs = ids
}

// onTruncateTablePartition truncates old partition meta.
func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) {
var ver int64
Expand Down Expand Up @@ -1949,16 +1974,7 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
}
tblInfo.TiFlashReplica.Available = false
// Set partition replica become unavailable.
for _, oldID := range oldIDs {
for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs {
if id == oldID {
newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i]
newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...)
tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs
break
}
}
}
removeTiFlashAvailablePartitionIDs(tblInfo, oldIDs)
}

bundles, err := placement.NewPartitionListBundles(t, newPartitions)
Expand Down
5 changes: 4 additions & 1 deletion ddl/tiflashtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 32,
shard_count = 33,
deps = [
"//config",
"//ddl",
Expand All @@ -23,7 +23,9 @@ go_test(
"//store/mockstore",
"//store/mockstore/unistore",
"//table",
"//tablecodec",
"//testkit",
"//testkit/external",
"//testkit/testsetup",
"//util",
"//util/logutil",
Expand All @@ -32,6 +34,7 @@ go_test(
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
],
Expand Down
109 changes: 109 additions & 0 deletions ddl/tiflashtest/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ import (
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/external"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1351,3 +1354,109 @@ func TestTiFlashAvailableAfterDownOneStore(t *testing.T) {
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
CheckTableAvailable(s.dom, t, 1, []string{})
}

// TestDLLCallback copied from ddl.TestDDLCallback, but smaller
type TestDDLCallback struct {
*ddl.BaseCallback
// We recommended to pass the domain parameter to the test ddl callback, it will ensure
// domain to reload schema before your ddl stepping into the next state change.
Do ddl.DomainReloader

// Only need this for now
OnJobRunBeforeExported func(*model.Job)
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
logutil.BgLogger().Info("on job run before", zap.String("job", job.String()))
if tc.OnJobRunBeforeExported != nil {
tc.OnJobRunBeforeExported(job)
return
}

tc.BaseCallback.OnJobRunBefore(job)
}

func TestTiFlashReorgPartition(t *testing.T) {
s, teardown := createTiFlashContext(t)
defer teardown()
fCancel := TempDisableEmulatorGC()
defer fCancel()
tk := testkit.NewTestKit(t, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists ddltiflash")

tk.MustExec(`create table ddltiflash (id int, vc varchar(255), i int, key (vc), key(i,vc))` +
` partition by range (id)` +
` (partition p0 values less than (1000000), partition p1 values less than (2000000))`)
tk.MustExec(`alter table ddltiflash set tiflash replica 1`)
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
CheckTableAvailable(s.dom, t, 1, []string{})
tb := external.GetTableByName(t, tk, "test", "ddltiflash")
firstPartitionID := tb.Meta().Partition.Definitions[0].ID
ruleName := fmt.Sprintf("table-%v-r", firstPartitionID)
_, ok := s.tiflash.GetPlacementRule(ruleName)
require.True(t, ok)

// Note that the mock TiFlash does not have any data or regions, so the wait for regions being available will fail
dom := domain.GetDomain(tk.Session())
originHook := dom.DDL().GetHook()
defer dom.DDL().SetHook(originHook)
hook := &TestDDLCallback{Do: dom}
dom.DDL().SetHook(hook)
done := false

hook.OnJobRunBeforeExported = func(job *model.Job) {
if !done && job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateDeleteOnly {
// Let it fail once (to check that code path) then increase the count to skip retry
if job.ErrorCount > 0 {
job.ErrorCount = 1000
done = true
}
}
}
tk.MustContainErrMsg(`alter table ddltiflash reorganize partition p0 into (partition p0 values less than (500000), partition p500k values less than (1000000))`, "[ddl] add partition wait for tiflash replica to complete")

done = false
hook.OnJobRunBeforeExported = func(job *model.Job) {
if !done && job.Type == model.ActionReorganizePartition && job.SchemaState == model.StateDeleteOnly {
// Let it fail once (to check that code path) then mock the regions into the partitions
if job.ErrorCount > 0 {
// Add the tiflash stores as peers for the new regions, to fullfil the check
// in checkPartitionReplica
pdCli := s.store.(tikv.Storage).GetRegionCache().PDClient()
var dummy []model.CIStr
partInfo := &model.PartitionInfo{}
_ = job.DecodeArgs(&dummy, &partInfo)
ctx := context.Background()
stores, _ := pdCli.GetAllStores(ctx)
for _, pd := range partInfo.Definitions {
startKey, endKey := tablecodec.GetTableHandleKeyRange(pd.ID)
regions, _ := pdCli.ScanRegions(ctx, startKey, endKey, -1)
for i := range regions {
// similar as storeHasEngineTiFlashLabel
for _, store := range stores {
for _, label := range store.Labels {
if label.Key == placement.EngineLabelKey && label.Value == placement.EngineLabelTiFlash {
s.cluster.MockRegionManager.AddPeer(regions[i].Meta.Id, store.Id, 1)
break
}
}
}
}
}
done = true
}
}
}
tk.MustExec(`alter table ddltiflash reorganize partition p0 into (partition p0 values less than (500000), partition p500k values less than (1000000))`)
tk.MustExec(`admin check table ddltiflash`)
_, ok = s.tiflash.GetPlacementRule(ruleName)
require.True(t, ok)
gcWorker, err := gcworker.NewMockGCWorker(s.store)
require.NoError(t, err)
require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64))
_, ok = s.tiflash.GetPlacementRule(ruleName)
require.False(t, ok)
tk.MustExec(`drop table ddltiflash`)
}
6 changes: 6 additions & 0 deletions store/mockstore/unistore/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,12 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
case tikvrpc.CmdStoreSafeTS:
resp.Resp, err = c.usSvr.GetStoreSafeTS(ctx, req.StoreSafeTS())
return resp, err
case tikvrpc.CmdUnsafeDestroyRange:
// Pretend it was done. Unistore does not have "destroy", and the
// keys has already been removed one-by-one before through:
// (dr *delRange) startEmulator()
resp.Resp = &kvrpcpb.UnsafeDestroyRangeResponse{}
return resp, nil
default:
err = errors.Errorf("not support this request type %v", req.Type)
}
Expand Down

0 comments on commit 9019f78

Please sign in to comment.