Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add index_merge_intersection cases #39323

Merged
merged 15 commits into from
Nov 30, 2022
Merged
212 changes: 212 additions & 0 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/testutil"
"github.com/pingcap/tidb/util"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -242,32 +244,52 @@ func TestIndexMergeInTransaction(t *testing.T) {
"├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
"└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)",
" └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo"))
tk.MustQuery("explain select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where c1 < 10 and c2 < 10 and c3 < 10;").Check(testkit.Rows(
"IndexMerge_9 367.05 root type: intersection",
"├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo",
"├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo",
"├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo",
"└─TableRowIDScan_8(Probe) 367.05 cop[tikv] table:t1 keep order:false, stats:pseudo"))

// Test with normal key.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < -1 and c2 < 10) and c3 < 10;").Check(testkit.Rows())

tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows())

tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1"))

tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows())

// Test with primary key, so the partialPlan is TableScan.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < -1 and c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustExec("insert into t1 values(1, 1, 1, 1);")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1"))
tk.MustExec("update t1 set c3 = 100 where c3 = 1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1"))
tk.MustExec("delete from t1;")
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows())

tk.MustExec("commit;")
if i == 1 {
Expand Down Expand Up @@ -566,3 +588,193 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) {

// TODO: add support for index merge reader in dynamic tidb_partition_prune_mode
}

func TestIndexMergeIntersectionConcurrency(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;")
tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)")
tk.MustExec("analyze table t1;")
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows()
require.Contains(t, res[1][0], "IndexMerge")

// Default is tidb_executor_concurrency.
res = tk.MustQuery("select @@tidb_executor_concurrency;").Sort().Rows()
defExecCon := res[0][0].(string)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", fmt.Sprintf("return(%s)", defExecCon)))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency"))
}()
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

tk.MustExec("set tidb_executor_concurrency = 10")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
// workerCnt = min(part_num, concurrency)
tk.MustExec("set tidb_executor_concurrency = 20")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
tk.MustExec("set tidb_executor_concurrency = 2")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(2)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

tk.MustExec("set tidb_index_merge_intersection_concurrency = 9")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(9)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
tk.MustExec("set tidb_index_merge_intersection_concurrency = 21")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
tk.MustExec("set tidb_index_merge_intersection_concurrency = 3")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(3)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

// Concurrency only works for dynamic pruning partition table, so real concurrency is 1.
tk.MustExec("set tidb_partition_prune_mode = 'static'")
tk.MustExec("set tidb_index_merge_intersection_concurrency = 9")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

// Concurrency only works for dynamic pruning partition table. so real concurrency is 1.
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)")
tk.MustExec("set tidb_index_merge_intersection_concurrency = 9")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)"))
tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))
}

func TestIntersectionWithDifferentConcurrency(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

var execCon []int
tblSchemas := []string{
// partition table
"create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;",
// non-partition table
"create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));",
}

for tblIdx, tblSchema := range tblSchemas {
if tblIdx == 0 {
// Test different intersectionProcessWorker with partition table(10 partitions).
execCon = []int{1, 3, 10, 11, 20}
} else {
// Default concurrency.
execCon = []int{5}
}
tk.MustExec("use test")
tk.MustExec("drop table if exists t1;")
tk.MustExec(tblSchema)

const queryCnt int = 10
const rowCnt int = 1000
curRowCnt := 0
insertStr := "insert into t1 values"
for i := 0; i < rowCnt; i++ {
if i != 0 {
insertStr += ", "
}
insertStr += fmt.Sprintf("(%d, %d, %d)", i, rand.Int(), rand.Int())
curRowCnt++
}
tk.MustExec(insertStr)
tk.MustExec("analyze table t1")

for _, concurrency := range execCon {
tk.MustExec(fmt.Sprintf("set tidb_executor_concurrency = %d", concurrency))
for i := 0; i < 2; i++ {
if i == 0 {
// Dynamic mode.
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024")
require.Contains(t, res.Rows()[1][0], "IndexMerge")
} else {
tk.MustExec("set tidb_partition_prune_mode = 'static'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024")
if tblIdx == 0 {
// partition table
require.Contains(t, res.Rows()[1][0], "PartitionUnion")
require.Contains(t, res.Rows()[2][0], "IndexMerge")
} else {
require.Contains(t, res.Rows()[1][0], "IndexMerge")
}
}
for i := 0; i < queryCnt; i++ {
c3 := rand.Intn(1024)
res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows()
tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res)
}

// In tranaction
for i := 0; i < queryCnt; i++ {
tk.MustExec("begin;")
r := rand.Intn(3)
if r == 0 {
tk.MustExec(fmt.Sprintf("update t1 set c3 = %d where c1 = %d", rand.Int(), rand.Intn(rowCnt)))
} else if r == 1 {
tk.MustExec(fmt.Sprintf("delete from t1 where c1 = %d", rand.Intn(rowCnt)))
} else if r == 2 {
tk.MustExec(fmt.Sprintf("insert into t1 values(%d, %d, %d)", curRowCnt, rand.Int(), rand.Int()))
curRowCnt++
}
c3 := rand.Intn(1024)
res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows()
tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res)
tk.MustExec("commit;")
}
}
}
tk.MustExec("drop table t1")
}
}

func TestIntersectionWorkerPanic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;")
tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)")
tk.MustExec("analyze table t1;")
tk.MustExec("set tidb_partition_prune_mode = 'dynamic'")
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows()
require.Contains(t, res[1][0], "IndexMerge")

// Test panic in intersection.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic", "panic"))
err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024")
require.Contains(t, err.Error(), "IndexMergeReaderExecutor")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic"))
}

func TestIntersectionMemQuota(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(pk varchar(100) primary key, c1 int, c2 int, index idx1(c1), index idx2(c2))")

insertStr := "insert into t1 values"
for i := 0; i < 20; i++ {
if i != 0 {
insertStr += ", "
}
insertStr += fmt.Sprintf("('%s', %d, %d)", testutil.RandStringRunes(100), 1, 1)
}
tk.MustExec(insertStr)
res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024").Rows()
require.Contains(t, res[1][0], "IndexMerge")

tk.MustExec("set global tidb_mem_oom_action='CANCEL'")
defer tk.MustExec("set global tidb_mem_oom_action = DEFAULT")
tk.MustExec("set @@tidb_mem_quota_query = 4000")
err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024")
require.Contains(t, err.Error(), "Out Of Memory Quota!")
}
12 changes: 12 additions & 0 deletions testkit/testutil/require.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package testutil

import (
"math/rand"
"testing"

"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -75,3 +76,14 @@ func CompareUnorderedStringSlice(a []string, b []string) bool {
}
return len(m) == 0
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// RandStringRunes generate random string of length n.
func RandStringRunes(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}