From d26516ae678062abdbf33a11087b250ee9c04cf1 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 23 Nov 2022 11:18:44 +0800 Subject: [PATCH 01/10] executor: add index_merge_intersection cases Signed-off-by: guo-shaoge --- executor/index_merge_reader_test.go | 136 +++++++++++++++++++++++++--- 1 file changed, 125 insertions(+), 11 deletions(-) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 58dfa71814f28..2e4740f85334f 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -86,7 +86,7 @@ func TestIndexMergeReaderIssue25045(t *testing.T) { tk.MustExec("create table t1(a int primary key, b int, c int, key(b), key(c));") tk.MustExec("INSERT INTO t1 VALUES (10, 10, 10), (11, 11, 11)") tk.MustQuery("explain format='brief' select /*+ use_index_merge(t1) */ * from t1 where c=10 or (b=10 and a=10);").Check(testkit.Rows( - "IndexMerge 0.01 root ", + "IndexMerge 0.01 root Union", "├─IndexRangeScan(Build) 10.00 cop[tikv] table:t1, index:c(c) range:[10,10], keep order:false, stats:pseudo", "├─TableRangeScan(Build) 1.00 cop[tikv] table:t1 range:[10,10], keep order:false, stats:pseudo", "└─Selection(Probe) 0.01 cop[tikv] or(eq(test.t1.c, 10), and(eq(test.t1.b, 10), eq(test.t1.a, 10)))", @@ -230,44 +230,71 @@ func TestIndexMergeInTransaction(t *testing.T) { tk.MustExec("begin;") // Expect two IndexScan(c1, c2). tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows( - "IndexMerge_9 1841.86 root ", + "IndexMerge_9 1841.86 root Union", "├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", "└─Selection_8(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)", " └─TableRowIDScan_7 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo")) // Expect one IndexScan(c2) and one TableScan(pk). tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows( - "IndexMerge_9 1106.67 root ", + "IndexMerge_9 1106.67 root Union", "├─TableRangeScan_5(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo", "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", "└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)", " └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo")) + tk.MustQuery("explain select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where c1 < 10 and c2 < 10 and c3 < 10;").Check(testkit.Rows( + "IndexMerge_9 367.05 root Intersection", + "├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", + "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", + "├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo", + "└─TableRowIDScan_8(Probe) 367.05 cop[tikv] table:t1 keep order:false, stats:pseudo")) // Test with normal key. 
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < -1) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < -1 and c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("insert into t1 values(1, 1, 1, 1);") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1")) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1")) + tk.MustExec("delete from t1;") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) // Test with primary key, so the partialPlan is TableScan. 
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < -1 and c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < -1) and c3 < 10;").Check(testkit.Rows()) tk.MustExec("insert into t1 values(1, 1, 1, 1);") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) tk.MustExec("update t1 set c3 = 100 where c3 = 1;") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1")) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1")) tk.MustExec("delete from t1;") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) tk.MustExec("commit;") if i == 1 { @@ -281,14 +308,14 @@ func TestIndexMergeInTransaction(t *testing.T) { tk.MustExec("begin;") tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows( "SelectLock_6 1841.86 root for update 0", - "└─IndexMerge_11 1841.86 root ", + "└─IndexMerge_11 1841.86 root Union", " ├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", " └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)", " └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo")) tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows( "SelectLock_6 1106.67 root for update 0", - "└─IndexMerge_11 1106.67 root ", + "└─IndexMerge_11 1106.67 root Union", " ├─TableRangeScan_7(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo", " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", " 
└─Selection_10(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)", @@ -403,7 +430,7 @@ func TestIndexMergeReaderInTransIssue30685(t *testing.T) { tk.MustExec("insert into t1 values(1, 1, 1, 1);") tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c3 < 10) and c4 < 10;").Check(testkit.Rows( "UnionScan_6 1841.86 root lt(test.t1.c4, 10), or(lt(test.t1.c1, -1), lt(test.t1.c3, 10))", - "└─IndexMerge_11 1841.86 root ", + "└─IndexMerge_11 1841.86 root Union", " ├─TableRangeScan_7(Build) 3323.33 cop[tikv] table:t1 range:[-inf,-1), keep order:false, stats:pseudo", " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo", " └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c4, 10)", @@ -422,7 +449,7 @@ func TestIndexMergeReaderInTransIssue30685(t *testing.T) { tk.MustExec("insert into t1 values('b', 1, 1, 1);") tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 'a' or c3 < 10) and c4 < 10;").Check(testkit.Rows( "UnionScan_6 1841.86 root lt(test.t1.c4, 10), or(lt(test.t1.c1, \"a\"), lt(test.t1.c3, 10))", - "└─IndexMerge_11 1841.86 root ", + "└─IndexMerge_11 1841.86 root Union", " ├─TableRangeScan_7(Build) 3323.33 cop[tikv] table:t1 range:[-inf,\"a\"), keep order:false, stats:pseudo", " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo", " └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c4, 10)", @@ -524,19 +551,19 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) { " ├─IndexReader(Build) 3.00 root index:IndexFullScan", " │ └─IndexFullScan 3.00 cop[tikv] table:t2, index:c_datetime(c_datetime) keep order:false", " └─PartitionUnion(Probe) 5545.21 root ", - " ├─IndexMerge 5542.21 root ", + " ├─IndexMerge 5542.21 root Union", " │ ├─IndexRangeScan(Build) 3323.33 cop[tikv] table:t1, partition:p0, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", " │ ├─IndexRangeScan(Build) 3323.33 cop[tikv] table:t1, partition:p0, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", " │ └─TableRowIDScan(Probe) 5542.21 cop[tikv] table:t1, partition:p0 keep order:false, stats:pseudo", - " ├─IndexMerge 1.00 root ", + " ├─IndexMerge 1.00 root Union", " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p1, index:c1(c1) range:[-inf,10), keep order:false", " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p1, index:c2(c2) range:[-inf,10), keep order:false", " │ └─TableRowIDScan(Probe) 1.00 cop[tikv] table:t1, partition:p1 keep order:false", - " ├─IndexMerge 1.00 root ", + " ├─IndexMerge 1.00 root Union", " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p2, index:c1(c1) range:[-inf,10), keep order:false", " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p2, index:c2(c2) range:[-inf,10), keep order:false", " │ └─TableRowIDScan(Probe) 1.00 cop[tikv] table:t1, partition:p2 keep order:false", - " └─IndexMerge 1.00 root ", + " └─IndexMerge 1.00 root Union", " ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p3, index:c1(c1) range:[-inf,10), keep order:false", " ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p3, index:c2(c2) range:[-inf,10), keep order:false", " └─TableRowIDScan(Probe) 1.00 cop[tikv] table:t1, partition:p3 keep order:false", @@ -566,3 +593,90 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) { // TODO: add support for index merge reader in dynamic tidb_partition_prune_mode } + +func TestIntersectionHugePartition(t 
*testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + var execCon []int + tblSchemas := []string{ + // partition table + "create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;", + // non-partition table + "create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));", + } + + for tblIdx, tblSchema := range tblSchemas { + if tblIdx == 0 { + // Test different intersectionProcessWorker with partition table(10 partitions). + execCon = []int{1, 3, 10, 11, 20} + } else { + // Default concurrency. + execCon = []int{5} + } + tk.MustExec("use test") + tk.MustExec("drop table if exists t1;") + tk.MustExec(tblSchema) + + const queryCnt int = 10 + const rowCnt int = 1000 + curRowCnt := 0 + insertStr := "insert into t1 values" + for i := 0; i < rowCnt; i++ { + if i != 0 { + insertStr += ", " + } + insertStr += fmt.Sprintf("(%d, %d, %d)", i, rand.Int(), rand.Int()) + curRowCnt++ + } + tk.MustExec(insertStr) + tk.MustExec("analyze table t1") + + for _, concurrency := range execCon { + tk.MustExec(fmt.Sprintf("set tidb_executor_concurrency = %d", concurrency)) + for i := 0; i < 2; i++ { + if i == 0 { + // Dynamic mode. + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + fmt.Printf("gjt debug %d, %d, %d\n", tblIdx, concurrency, i) + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") + require.Contains(t, res.Rows()[1][0], "IndexMerge") + } else { + tk.MustExec("set tidb_partition_prune_mode = 'static'") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") + if tblIdx == 0 { + // partition table + require.Contains(t, res.Rows()[1][0], "PartitionUnion") + require.Contains(t, res.Rows()[2][0], "IndexMerge") + } else { + require.Contains(t, res.Rows()[1][0], "IndexMerge") + } + } + for i := 0; i < queryCnt; i++ { + c3 := rand.Intn(1024) + res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows() + tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res) + } + + // In tranaction + for i := 0; i < queryCnt; i++ { + tk.MustExec("begin;") + r := rand.Intn(3) + if r == 0 { + tk.MustExec(fmt.Sprintf("update t1 set c3 = %d where c1 = %d", rand.Int(), rand.Intn(rowCnt))) + } else if r == 1 { + tk.MustExec(fmt.Sprintf("delete from t1 where c1 = %d", rand.Intn(rowCnt))) + } else if r == 2 { + tk.MustExec(fmt.Sprintf("insert into t1 values(%d, %d, %d)", curRowCnt, rand.Int(), rand.Int())) + curRowCnt++ + } + c3 := rand.Intn(1024) + res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows() + tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res) + tk.MustExec("commit;") + } + } + } + tk.MustExec("drop table t1") + } +} From bfb16b06f84399bd772006e29af30ef1f852895f Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 23 Nov 2022 15:25:53 +0800 Subject: [PATCH 02/10] update explain Signed-off-by: guo-shaoge --- executor/index_merge_reader_test.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 2e4740f85334f..a4faee758ff34 100644 
--- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -86,7 +86,7 @@ func TestIndexMergeReaderIssue25045(t *testing.T) { tk.MustExec("create table t1(a int primary key, b int, c int, key(b), key(c));") tk.MustExec("INSERT INTO t1 VALUES (10, 10, 10), (11, 11, 11)") tk.MustQuery("explain format='brief' select /*+ use_index_merge(t1) */ * from t1 where c=10 or (b=10 and a=10);").Check(testkit.Rows( - "IndexMerge 0.01 root Union", + "IndexMerge 0.01 root type: union", "├─IndexRangeScan(Build) 10.00 cop[tikv] table:t1, index:c(c) range:[10,10], keep order:false, stats:pseudo", "├─TableRangeScan(Build) 1.00 cop[tikv] table:t1 range:[10,10], keep order:false, stats:pseudo", "└─Selection(Probe) 0.01 cop[tikv] or(eq(test.t1.c, 10), and(eq(test.t1.b, 10), eq(test.t1.a, 10)))", @@ -230,20 +230,20 @@ func TestIndexMergeInTransaction(t *testing.T) { tk.MustExec("begin;") // Expect two IndexScan(c1, c2). tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows( - "IndexMerge_9 1841.86 root Union", + "IndexMerge_9 1841.86 root type: union", "├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", "└─Selection_8(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)", " └─TableRowIDScan_7 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo")) // Expect one IndexScan(c2) and one TableScan(pk). tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < 10) and c3 < 10;").Check(testkit.Rows( - "IndexMerge_9 1106.67 root Union", + "IndexMerge_9 1106.67 root type: union", "├─TableRangeScan_5(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo", "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", "└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)", " └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo")) tk.MustQuery("explain select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where c1 < 10 and c2 < 10 and c3 < 10;").Check(testkit.Rows( - "IndexMerge_9 367.05 root Intersection", + "IndexMerge_9 367.05 root type: intersection", "├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", "├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo", @@ -308,14 +308,14 @@ func TestIndexMergeInTransaction(t *testing.T) { tk.MustExec("begin;") tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows( "SelectLock_6 1841.86 root for update 0", - "└─IndexMerge_11 1841.86 root Union", + "└─IndexMerge_11 1841.86 root type: union", " ├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", " └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c3, 10)", " └─TableRowIDScan_9 5542.21 cop[tikv] table:t1 keep order:false, stats:pseudo")) tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (pk < 
10 or c2 < 10) and c3 < 10 for update;").Check(testkit.Rows( "SelectLock_6 1106.67 root for update 0", - "└─IndexMerge_11 1106.67 root Union", + "└─IndexMerge_11 1106.67 root type: union", " ├─TableRangeScan_7(Build) 3333.33 cop[tikv] table:t1 range:[-inf,10), keep order:false, stats:pseudo", " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", " └─Selection_10(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)", @@ -430,7 +430,7 @@ func TestIndexMergeReaderInTransIssue30685(t *testing.T) { tk.MustExec("insert into t1 values(1, 1, 1, 1);") tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c3 < 10) and c4 < 10;").Check(testkit.Rows( "UnionScan_6 1841.86 root lt(test.t1.c4, 10), or(lt(test.t1.c1, -1), lt(test.t1.c3, 10))", - "└─IndexMerge_11 1841.86 root Union", + "└─IndexMerge_11 1841.86 root type: union", " ├─TableRangeScan_7(Build) 3323.33 cop[tikv] table:t1 range:[-inf,-1), keep order:false, stats:pseudo", " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo", " └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c4, 10)", @@ -449,7 +449,7 @@ func TestIndexMergeReaderInTransIssue30685(t *testing.T) { tk.MustExec("insert into t1 values('b', 1, 1, 1);") tk.MustQuery("explain select /*+ use_index_merge(t1) */ * from t1 where (c1 < 'a' or c3 < 10) and c4 < 10;").Check(testkit.Rows( "UnionScan_6 1841.86 root lt(test.t1.c4, 10), or(lt(test.t1.c1, \"a\"), lt(test.t1.c3, 10))", - "└─IndexMerge_11 1841.86 root Union", + "└─IndexMerge_11 1841.86 root type: union", " ├─TableRangeScan_7(Build) 3323.33 cop[tikv] table:t1 range:[-inf,\"a\"), keep order:false, stats:pseudo", " ├─IndexRangeScan_8(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo", " └─Selection_10(Probe) 1841.86 cop[tikv] lt(test.t1.c4, 10)", @@ -551,19 +551,19 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) { " ├─IndexReader(Build) 3.00 root index:IndexFullScan", " │ └─IndexFullScan 3.00 cop[tikv] table:t2, index:c_datetime(c_datetime) keep order:false", " └─PartitionUnion(Probe) 5545.21 root ", - " ├─IndexMerge 5542.21 root Union", + " ├─IndexMerge 5542.21 root type: union", " │ ├─IndexRangeScan(Build) 3323.33 cop[tikv] table:t1, partition:p0, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", " │ ├─IndexRangeScan(Build) 3323.33 cop[tikv] table:t1, partition:p0, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", " │ └─TableRowIDScan(Probe) 5542.21 cop[tikv] table:t1, partition:p0 keep order:false, stats:pseudo", - " ├─IndexMerge 1.00 root Union", + " ├─IndexMerge 1.00 root type: union", " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p1, index:c1(c1) range:[-inf,10), keep order:false", " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p1, index:c2(c2) range:[-inf,10), keep order:false", " │ └─TableRowIDScan(Probe) 1.00 cop[tikv] table:t1, partition:p1 keep order:false", - " ├─IndexMerge 1.00 root Union", + " ├─IndexMerge 1.00 root type: union", " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p2, index:c1(c1) range:[-inf,10), keep order:false", " │ ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p2, index:c2(c2) range:[-inf,10), keep order:false", " │ └─TableRowIDScan(Probe) 1.00 cop[tikv] table:t1, partition:p2 keep order:false", - " └─IndexMerge 1.00 root Union", + " └─IndexMerge 1.00 root type: union", " ├─IndexRangeScan(Build) 1.00 
cop[tikv] table:t1, partition:p3, index:c1(c1) range:[-inf,10), keep order:false", " ├─IndexRangeScan(Build) 1.00 cop[tikv] table:t1, partition:p3, index:c2(c2) range:[-inf,10), keep order:false", " └─TableRowIDScan(Probe) 1.00 cop[tikv] table:t1, partition:p3 keep order:false", @@ -594,7 +594,7 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) { // TODO: add support for index merge reader in dynamic tidb_partition_prune_mode } -func TestIntersectionHugePartition(t *testing.T) { +func TestIntersectionWithDifferentConcurrency(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -638,7 +638,6 @@ func TestIntersectionHugePartition(t *testing.T) { if i == 0 { // Dynamic mode. tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") - fmt.Printf("gjt debug %d, %d, %d\n", tblIdx, concurrency, i) res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") require.Contains(t, res.Rows()[1][0], "IndexMerge") } else { From 8bb5fc1de8a2d7ea9ee2c4bbb7cf11459ae6776c Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 25 Nov 2022 13:21:30 +0800 Subject: [PATCH 03/10] add concurrency case Signed-off-by: guo-shaoge --- executor/index_merge_reader_test.go | 59 +++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index a4faee758ff34..b62eaa4c36633 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" @@ -594,6 +595,64 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) { // TODO: add support for index merge reader in dynamic tidb_partition_prune_mode } +func TestIndexMergeIntersectionConcurrency(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;") + tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)") + tk.MustExec("analyze table t1;") + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows() + require.Contains(t, res[1][0], "IndexMerge") + + // Default is tidb_executor_concurrency. 
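+	// The failpoint below asserts how many intersectionProcessWorker goroutines get started:
+	// min(partition count, intersection concurrency). With 10 hash partitions this should equal the
+	// @@tidb_executor_concurrency value read next, since tidb_index_merge_intersection_concurrency
+	// is not set here and is expected to fall back to the executor concurrency.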
+ res = tk.MustQuery("select @@tidb_executor_concurrency;").Sort().Rows() + defExecCon := res[0][0].(string) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", fmt.Sprintf("return(%s)", defExecCon))) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency")) + }() + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + tk.MustExec("set tidb_executor_concurrency = 10") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + // workerCnt = min(part_num, concurrency) + tk.MustExec("set tidb_executor_concurrency = 20") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + tk.MustExec("set tidb_executor_concurrency = 2") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(2)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + tk.MustExec("set tidb_index_merge_intersection_concurrency = 9") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(9)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + tk.MustExec("set tidb_index_merge_intersection_concurrency = 21") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + tk.MustExec("set tidb_index_merge_intersection_concurrency = 3") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(3)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + // Concurrency only works for dynamic pruning partition table. + tk.MustExec("set tidb_partition_prune_mode = 'static'") + tk.MustExec("set tidb_index_merge_intersection_concurrency = 9") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + // Concurrency only works for dynamic pruning partition table. 
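+	// A non-partition table behaves like a single partition, so only one intersection worker is
+	// expected regardless of the concurrency settings (hence return(1) in the failpoint below).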
+ tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));") + tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)") + tk.MustExec("set tidb_index_merge_intersection_concurrency = 9") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) +} + func TestIntersectionWithDifferentConcurrency(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) From ac0e8ce05c5d228ea162d6705068b5eaf27fd44b Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 29 Nov 2022 13:18:19 +0800 Subject: [PATCH 04/10] add panic case Signed-off-by: guo-shaoge --- executor/index_merge_reader_test.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index b62eaa4c36633..3587094d5a01f 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -738,3 +738,24 @@ func TestIntersectionWithDifferentConcurrency(t *testing.T) { tk.MustExec("drop table t1") } } + +func TestIntersectionWorkerPanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;") + tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)") + tk.MustExec("analyze table t1;") + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows() + require.Contains(t, res[1][0], "IndexMerge") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic", "panic")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic")) + }() + err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") + require.Contains(t, err.Error(), "IndexMergeReaderExecutor") +} From 3ff00e3bcc7804e732a50aa087a0db77ccaa050d Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 29 Nov 2022 19:47:15 +0800 Subject: [PATCH 05/10] add out of mem failpoint Signed-off-by: guo-shaoge --- executor/index_merge_reader.go | 341 ++++++++++++++++++++++------ executor/index_merge_reader_test.go | 32 ++- testkit/testutil/require.go | 12 + 3 files changed, 313 insertions(+), 72 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 0e7eb394710fd..a54c25aa36b25 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -94,8 +94,8 @@ type IndexMergeReaderExecutor struct { workerStarted bool keyRanges [][]kv.KeyRange - resultCh chan *lookupTableTask - resultCurr *lookupTableTask + resultCh chan *indexMergeTableTask + resultCurr *indexMergeTableTask feedbacks []*statistics.QueryFeedback // memTracker is used to track the memory usage of this executor. @@ -118,6 +118,16 @@ type IndexMergeReaderExecutor struct { isCorColInPartialFilters []bool isCorColInTableFilter bool isCorColInPartialAccess []bool + + // Whether it's intersection or union. 
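+	// When true, the process worker runs fetchLoopIntersection and keeps only handles returned by
+	// every partial path; otherwise fetchLoopUnion merges the partial results with union semantics.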
+ isIntersection bool +} + +type indexMergeTableTask struct { + lookupTableTask + + // parTblIdx are only used in indexMergeProcessWorker.fetchLoopIntersection. + parTblIdx int } // Table implements the dataSourceExecutor interface. @@ -150,7 +160,7 @@ func (e *IndexMergeReaderExecutor) Open(ctx context.Context) (err error) { } } e.finished = make(chan struct{}) - e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize)) + e.resultCh = make(chan *indexMergeTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize)) e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) return nil @@ -209,8 +219,8 @@ func (e *IndexMergeReaderExecutor) buildKeyRangesForTable(tbl table.Table) (rang func (e *IndexMergeReaderExecutor) startWorkers(ctx context.Context) error { exitCh := make(chan struct{}) - workCh := make(chan *lookupTableTask, 1) - fetchCh := make(chan *lookupTableTask, len(e.keyRanges)) + workCh := make(chan *indexMergeTableTask, 1) + fetchCh := make(chan *indexMergeTableTask, len(e.keyRanges)) e.startIndexMergeProcessWorker(ctx, workCh, fetchCh) @@ -237,12 +247,12 @@ func (e *IndexMergeReaderExecutor) startWorkers(ctx context.Context) error { return nil } -func (e *IndexMergeReaderExecutor) waitPartialWorkersAndCloseFetchChan(fetchCh chan *lookupTableTask) { +func (e *IndexMergeReaderExecutor) waitPartialWorkersAndCloseFetchChan(fetchCh chan *indexMergeTableTask) { e.idxWorkerWg.Wait() close(fetchCh) } -func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Context, workCh chan<- *lookupTableTask, fetch <-chan *lookupTableTask) { +func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Context, workCh chan<- *indexMergeTableTask, fetch <-chan *indexMergeTableTask) { idxMergeProcessWorker := &indexMergeProcessWorker{ indexMerge: e, stats: e.stats, @@ -252,15 +262,19 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont defer trace.StartRegion(ctx, "IndexMergeProcessWorker").End() util.WithRecovery( func() { - idxMergeProcessWorker.fetchLoop(ctx, fetch, workCh, e.resultCh, e.finished) + if e.isIntersection { + idxMergeProcessWorker.fetchLoopIntersection(ctx, fetch, workCh, e.resultCh, e.finished) + } else { + idxMergeProcessWorker.fetchLoopUnion(ctx, fetch, workCh, e.resultCh, e.finished) + } }, - idxMergeProcessWorker.handleLoopFetcherPanic(ctx, e.resultCh), + idxMergeProcessWorker.handleLoopFetcherPanic(ctx, e.resultCh, "IndexMergeProcessWorker", nil), ) e.processWokerWg.Done() }() } -func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) error { +func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, workID int) error { if e.runtimeStats != nil { collExec := true e.dagPBs[workID].CollectExecutionSummaries = &collExec @@ -297,7 +311,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, // We got correlated column, so need to refresh Selection operator. 
var err error if e.dagPBs[workID].Executors, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { - worker.syncErr(e.resultCh, err) + syncErr(e.resultCh, err) return } } @@ -331,12 +345,12 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, }) kvReq, err := builder.SetKeyRanges(keyRange).Build() if err != nil { - worker.syncErr(e.resultCh, err) + syncErr(e.resultCh, err) return } result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.handleCols.GetFieldsTypes(), e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.getPartitalPlanID(workID)) if err != nil { - worker.syncErr(e.resultCh, err) + syncErr(e.resultCh, err) return } worker.batchSize = e.maxChunkSize @@ -349,7 +363,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, // fetch all data from this partition ctx1, cancel := context.WithCancel(ctx) - _, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols) + _, fetchErr := worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols, parTblIdx) if fetchErr != nil { // this error is synced in fetchHandles(), don't sync it again e.feedbacks[workID].Invalidate() } @@ -370,7 +384,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, return nil } -func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) error { +func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, workID int) error { ts := e.partialPlans[workID][0].(*plannercore.PhysicalTableScan) tbls := make([]table.Table, 0, 1) @@ -412,13 +426,13 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, if e.isCorColInPartialFilters[workID] { if e.dagPBs[workID].Executors, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { - worker.syncErr(e.resultCh, err) + syncErr(e.resultCh, err) return } partialTableReader.dagPB = e.dagPBs[workID] } - for _, tbl := range tbls { + for parTblIdx, tbl := range tbls { // check if this executor is closed select { case <-e.finished: @@ -430,7 +444,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, partialTableReader.table = tbl if err = partialTableReader.Open(ctx); err != nil { logutil.Logger(ctx).Error("open Select result failed:", zap.Error(err)) - worker.syncErr(e.resultCh, err) + syncErr(e.resultCh, err) break } worker.batchSize = e.maxChunkSize @@ -443,7 +457,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, // fetch all handles from this table ctx1, cancel := context.WithCancel(ctx) - _, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols) + _, fetchErr := worker.fetchHandles(ctx1, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols, parTblIdx) if fetchErr != nil { // this error is synced in fetchHandles, so don't sync it again e.feedbacks[workID].Invalidate() } @@ -498,16 +512,8 @@ type partialTableWorker struct { partition table.PhysicalTable // it indicates if this worker is accessing a particular partition table } -func (w *partialTableWorker) syncErr(resultCh chan<- *lookupTableTask, err error) { - doneCh := make(chan error, 1) - doneCh <- err - resultCh <- &lookupTableTask{ - doneCh: doneCh, - } -} - -func (w *partialTableWorker) fetchHandles(ctx 
context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, - finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { +func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, + finished <-chan struct{}, handleCols plannercore.HandleCols, parTblIdx int) (count int64, err error) { chk := w.sc.GetSessionVars().GetNewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize, w.maxChunkSize, w.tableReader.base().AllocPool) var basic *execdetails.BasicRuntimeStats if be := w.tableReader.base(); be != nil && be.runtimeStats != nil { @@ -517,14 +523,14 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str start := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleCols) if err != nil { - w.syncErr(resultCh, err) + syncErr(resultCh, err) return count, err } if len(handles) == 0 { return count, nil } count += int64(len(handles)) - task := w.buildTableTask(handles, retChunk) + task := w.buildTableTask(handles, retChunk, parTblIdx) if w.stats != nil { atomic.AddInt64(&w.stats.FetchIdxTime, int64(time.Since(start))) } @@ -570,19 +576,22 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk. return handles, retChk, nil } -func (w *partialTableWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk) *lookupTableTask { - task := &lookupTableTask{ - handles: handles, - idxRows: retChk, +func (w *partialTableWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk, parTblIdx int) *indexMergeTableTask { + task := &indexMergeTableTask{ + lookupTableTask: lookupTableTask{ + handles: handles, + idxRows: retChk, - partitionTable: w.partition, + partitionTable: w.partition, + }, + parTblIdx: parTblIdx, } task.doneCh = make(chan error, 1) return task } -func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Context, workCh <-chan *lookupTableTask) { +func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Context, workCh <-chan *indexMergeTableTask) { lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency() e.tblWorkerWg.Add(lookupConcurrencyLimit) for i := 0; i < lookupConcurrencyLimit; i++ { @@ -597,7 +606,7 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co ctx1, cancel := context.WithCancel(ctx) go func() { defer trace.StartRegion(ctx, "IndexMergeTableScanWorker").End() - var task *lookupTableTask + var task *indexMergeTableTask util.WithRecovery( func() { task = worker.pickAndExecTask(ctx1) }, worker.handlePickAndExecTaskPanic(ctx1, task), @@ -666,7 +675,7 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e } } -func (e *IndexMergeReaderExecutor) getResultTask() (*lookupTableTask, error) { +func (e *IndexMergeReaderExecutor) getResultTask() (*indexMergeTableTask, error) { if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) { return e.resultCurr, nil } @@ -686,7 +695,7 @@ func (e *IndexMergeReaderExecutor) getResultTask() (*lookupTableTask, error) { return e.resultCurr, nil } -func (e *IndexMergeReaderExecutor) handleHandlesFetcherPanic(ctx context.Context, resultCh chan<- *lookupTableTask, worker string) func(r interface{}) { +func (e *IndexMergeReaderExecutor) handleHandlesFetcherPanic(ctx context.Context, resultCh chan<- *indexMergeTableTask, worker string) func(r interface{}) { return 
func(r interface{}) { if r == nil { return @@ -696,8 +705,10 @@ func (e *IndexMergeReaderExecutor) handleHandlesFetcherPanic(ctx context.Context logutil.Logger(ctx).Error(err4Panic.Error()) doneCh := make(chan error, 1) doneCh <- err4Panic - resultCh <- &lookupTableTask{ - doneCh: doneCh, + resultCh <- &indexMergeTableTask{ + lookupTableTask: lookupTableTask{ + doneCh: doneCh, + }, } } } @@ -722,8 +733,8 @@ type indexMergeProcessWorker struct { stats *IndexMergeRuntimeStat } -func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan *lookupTableTask, - workCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, finished <-chan struct{}) { +func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-chan *indexMergeTableTask, + workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) { defer func() { close(workCh) close(resultCh) @@ -755,11 +766,13 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan if len(fhs) == 0 { continue } - task := &lookupTableTask{ - handles: fhs, - doneCh: make(chan error, 1), + task := &indexMergeTableTask{ + lookupTableTask: lookupTableTask{ + handles: fhs, + doneCh: make(chan error, 1), - partitionTable: task.partitionTable, + partitionTable: task.partitionTable, + }, } if w.stats != nil { w.stats.IndexMergeProcess += time.Since(start) @@ -775,18 +788,202 @@ func (w *indexMergeProcessWorker) fetchLoop(ctx context.Context, fetchCh <-chan } } -func (w *indexMergeProcessWorker) handleLoopFetcherPanic(ctx context.Context, resultCh chan<- *lookupTableTask) func(r interface{}) { +type intersectionProcessWorker struct { + // key: parTblIdx, val: HandleMap + // Value of MemAwareHandleMap is *int to avoid extra Get(). + handleMapsPerWorker map[int]*kv.MemAwareHandleMap[*int] + workerID int + workerCh chan *indexMergeTableTask + indexMerge *IndexMergeReaderExecutor + memTracker *memory.Tracker + batchSize int + + // When rowDelta == memConsumeBatchSize, Consume(memUsage) + rowDelta int64 + mapUsageDelta int64 +} + +func (w *intersectionProcessWorker) consumeMemDelta() { + w.memTracker.Consume(w.mapUsageDelta + w.rowDelta*int64(unsafe.Sizeof(int(0)))) + w.mapUsageDelta = 0 + w.rowDelta = 0 +} + +func (w *intersectionProcessWorker) doIntersectionPerPartition(ctx context.Context, workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) { + defer w.memTracker.Detach() + + for task := range w.workerCh { + var ok bool + var hMap *kv.MemAwareHandleMap[*int] + if hMap, ok = w.handleMapsPerWorker[task.parTblIdx]; !ok { + hMap = kv.NewMemAwareHandleMap[*int]() + w.handleMapsPerWorker[task.parTblIdx] = hMap + } + var mapDelta int64 + var rowDelta int64 + for _, h := range task.handles { + // Use *int to avoid Get() again. 
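+			// Counting rule: each partial path that returns handle h bumps its counter; after all
+			// tasks are drained, h is in the intersection iff its counter equals len(partialPlans).
+			// Illustrative sketch only (not executor code):
+			//   counts[h]++                          // one increment per partial-path occurrence
+			//   keep h if counts[h] == numPartialPaths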
+ if cntPtr, ok := hMap.Get(h); ok { + (*cntPtr)++ + } else { + cnt := 1 + mapDelta += hMap.Set(h, &cnt) + int64(h.ExtraMemSize()) + rowDelta += 1 + } + } + + logutil.BgLogger().Debug("intersectionProcessWorker handle tasks", zap.Int("workerID", w.workerID), + zap.Int("task.handles", len(task.handles)), zap.Int64("rowDelta", rowDelta)) + + w.mapUsageDelta += mapDelta + w.rowDelta += rowDelta + if w.rowDelta >= int64(w.batchSize) { + w.consumeMemDelta() + } + failpoint.Inject("testIndexMergeIntersectionWorkerPanic", nil) + failpoint.Inject("testIndexMergeIntersectionWorkerOutOfMem", func(val failpoint.Value) { + w.rowDelta = int64(val.(int)) + }) + } + if w.rowDelta > 0 { + w.consumeMemDelta() + } + + // We assume the result of intersection is small, so no need to track memory. + intersectedMap := make(map[int][]kv.Handle, len(w.handleMapsPerWorker)) + for parTblIdx, hMap := range w.handleMapsPerWorker { + hMap.Range(func(h kv.Handle, val interface{}) bool { + if *(val.(*int)) == len(w.indexMerge.partialPlans) { + // Means all partial paths have this handle. + intersectedMap[parTblIdx] = append(intersectedMap[parTblIdx], h) + } + return true + }) + } + + tasks := make([]*indexMergeTableTask, 0, len(w.handleMapsPerWorker)) + for parTblIdx, intersected := range intersectedMap { + // Split intersected[parTblIdx] to avoid task is too large. + for len(intersected) > 0 { + length := w.batchSize + if length > len(intersected) { + length = len(intersected) + } + task := &indexMergeTableTask{ + lookupTableTask: lookupTableTask{ + handles: intersected[:length], + doneCh: make(chan error, 1), + }, + } + intersected = intersected[length:] + if w.indexMerge.partitionTableMode { + task.partitionTable = w.indexMerge.prunedPartitions[parTblIdx] + } + tasks = append(tasks, task) + logutil.BgLogger().Debug("intersectionProcessWorker build tasks", + zap.Int("parTblIdx", parTblIdx), zap.Int("task.handles", len(task.handles))) + } + } + for _, task := range tasks { + select { + case <-ctx.Done(): + return + case <-finished: + return + case workCh <- task: + resultCh <- task + } + } +} + +// For each partition(dynamic mode), a map is used to do intersection. Key of the map is handle, and value is the number of times it occurs. +// If the value of handle equals the number of partial paths, it should be sent to final_table_scan_worker. +// To avoid too many goroutines, each intersectionProcessWorker can handle multiple partitions. +func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fetchCh <-chan *indexMergeTableTask, + workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) { + defer func() { + close(workCh) + close(resultCh) + }() + + if w.stats != nil { + start := time.Now() + defer func() { + w.stats.IndexMergeProcess += time.Since(start) + }() + } + + // One goroutine may handle one or multiple partitions. + // Max number of partition number is 8192, we use ExecutorConcurrency to avoid too many goroutines. 
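+	// workerCnt = min(number of pruned partitions, IndexMergeIntersectionConcurrency()). Tasks from
+	// fetchCh are routed to a fixed worker via task.parTblIdx % workerCnt, so all handles of one
+	// partition are always counted by the same worker.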
+ maxWorkerCnt := w.indexMerge.ctx.GetSessionVars().IndexMergeIntersectionConcurrency() + maxChannelSize := atomic.LoadInt32(&LookupTableTaskChannelSize) + batchSize := w.indexMerge.ctx.GetSessionVars().IndexLookupSize + + partCnt := 1 + if w.indexMerge.partitionTableMode { + partCnt = len(w.indexMerge.prunedPartitions) + } + workerCnt := partCnt + if workerCnt > maxWorkerCnt { + workerCnt = maxWorkerCnt + } + failpoint.Inject("testIndexMergeIntersectionConcurrency", func(val failpoint.Value) { + con := val.(int) + if con != workerCnt { + panic(fmt.Sprintf("unexpected workerCnt, expect %d, got %d", con, workerCnt)) + } + }) + + workers := make([]*intersectionProcessWorker, 0, workerCnt) + wg := util.WaitGroupWrapper{} + errCh := make(chan bool, workerCnt) + for i := 0; i < workerCnt; i++ { + tracker := memory.NewTracker(w.indexMerge.id, -1) + tracker.AttachTo(w.indexMerge.memTracker) + worker := &intersectionProcessWorker{ + workerID: i, + handleMapsPerWorker: make(map[int]*kv.MemAwareHandleMap[*int]), + workerCh: make(chan *indexMergeTableTask, maxChannelSize), + indexMerge: w.indexMerge, + memTracker: tracker, + batchSize: batchSize, + } + wg.RunWithRecover(func() { + defer trace.StartRegion(ctx, "IndexMergeIntersectionProcessWorker").End() + worker.doIntersectionPerPartition(ctx, workCh, resultCh, finished) + }, w.handleLoopFetcherPanic(ctx, resultCh, "IndexMergeIntersectionProcessWorker", errCh)) + workers = append(workers, worker) + } + for task := range fetchCh { + select { + case workers[task.parTblIdx%workerCnt].workerCh <- task: + case <-errCh: + break + } + } + for _, processWorker := range workers { + close(processWorker.workerCh) + } + wg.Wait() +} + +func (w *indexMergeProcessWorker) handleLoopFetcherPanic(ctx context.Context, resultCh chan<- *indexMergeTableTask, worker string, extraCh chan bool) func(r interface{}) { return func(r interface{}) { if r == nil { return } + if extraCh != nil { + extraCh <- true + } - err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor indexMergeTableWorker: %v", r) + err4Panic := errors.Errorf("panic in IndexMergeReaderExecutor %s: %v", worker, r) logutil.Logger(ctx).Error(err4Panic.Error()) doneCh := make(chan error, 1) doneCh <- err4Panic - resultCh <- &lookupTableTask{ - doneCh: doneCh, + resultCh <- &indexMergeTableTask{ + lookupTableTask: lookupTableTask{ + doneCh: doneCh, + }, } } } @@ -801,11 +998,13 @@ type partialIndexWorker struct { partition table.PhysicalTable // it indicates if this worker is accessing a particular partition table } -func (w *partialIndexWorker) syncErr(resultCh chan<- *lookupTableTask, err error) { +func syncErr(resultCh chan<- *indexMergeTableTask, err error) { doneCh := make(chan error, 1) doneCh <- err - resultCh <- &lookupTableTask{ - doneCh: doneCh, + resultCh <- &indexMergeTableTask{ + lookupTableTask: lookupTableTask{ + doneCh: doneCh, + }, } } @@ -813,10 +1012,11 @@ func (w *partialIndexWorker) fetchHandles( ctx context.Context, result distsql.SelectResult, exitCh <-chan struct{}, - fetchCh chan<- *lookupTableTask, - resultCh chan<- *lookupTableTask, + fetchCh chan<- *indexMergeTableTask, + resultCh chan<- *indexMergeTableTask, finished <-chan struct{}, - handleCols plannercore.HandleCols) (count int64, err error) { + handleCols plannercore.HandleCols, + parTblIdx int) (count int64, err error) { chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypes(), w.maxChunkSize) var basicStats *execdetails.BasicRuntimeStats if w.stats != nil { @@ -829,7 +1029,7 @@ func (w *partialIndexWorker) 
fetchHandles( start := time.Now() handles, retChunk, err := w.extractTaskHandles(ctx, chk, result, handleCols) if err != nil { - w.syncErr(resultCh, err) + syncErr(resultCh, err) return count, err } if len(handles) == 0 { @@ -839,7 +1039,7 @@ func (w *partialIndexWorker) fetchHandles( return count, nil } count += int64(len(handles)) - task := w.buildTableTask(handles, retChunk) + task := w.buildTableTask(handles, retChunk, parTblIdx) if w.stats != nil { atomic.AddInt64(&w.stats.FetchIdxTime, int64(time.Since(start))) } @@ -885,12 +1085,15 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk. return handles, retChk, nil } -func (w *partialIndexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk) *lookupTableTask { - task := &lookupTableTask{ - handles: handles, - idxRows: retChk, +func (w *partialIndexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk, parTblIdx int) *indexMergeTableTask { + task := &indexMergeTableTask{ + lookupTableTask: lookupTableTask{ + handles: handles, + idxRows: retChk, - partitionTable: w.partition, + partitionTable: w.partition, + }, + parTblIdx: parTblIdx, } task.doneCh = make(chan error, 1) @@ -899,7 +1102,7 @@ func (w *partialIndexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.C type indexMergeTableScanWorker struct { stats *IndexMergeRuntimeStat - workCh <-chan *lookupTableTask + workCh <-chan *indexMergeTableTask finished <-chan struct{} indexMergeExec *IndexMergeReaderExecutor tblPlans []plannercore.PhysicalPlan @@ -908,7 +1111,7 @@ type indexMergeTableScanWorker struct { memTracker *memory.Tracker } -func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task *lookupTableTask) { +func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task *indexMergeTableTask) { var ok bool for { waitStart := time.Now() @@ -931,7 +1134,7 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context) (task * } } -func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task *lookupTableTask) func(r interface{}) { +func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Context, task *indexMergeTableTask) func(r interface{}) { return func(r interface{}) { if r == nil { return @@ -943,7 +1146,7 @@ func (w *indexMergeTableScanWorker) handlePickAndExecTaskPanic(ctx context.Conte } } -func (w *indexMergeTableScanWorker) executeTask(ctx context.Context, task *lookupTableTask) error { +func (w *indexMergeTableScanWorker) executeTask(ctx context.Context, task *indexMergeTableTask) error { tbl := w.indexMergeExec.table if w.indexMergeExec.partitionTableMode { tbl = task.partitionTable diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 3587094d5a01f..b5276588a47b3 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/testkit/testutil" "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" ) @@ -752,10 +753,35 @@ func TestIntersectionWorkerPanic(t *testing.T) { res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows() require.Contains(t, res[1][0], "IndexMerge") + // Test panic in intersection. 
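+	// The failpoint makes doIntersectionPerPartition panic; the recover handler turns it into an error
+	// whose message contains "IndexMergeReaderExecutor", which is what the assertion below checks.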
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic", "panic")) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic")) - }() err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") require.Contains(t, err.Error(), "IndexMergeReaderExecutor") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic")) +} + +func TestIntersectionMemQuota(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(pk varchar(100) primary key, c1 int, c2 int, index idx1(c1), index idx2(c2))") + + insertStr := "insert into t1 values" + for i := 0; i < 20; i++ { + if i != 0 { + insertStr += ", " + } + insertStr += fmt.Sprintf("('%s', %d, %d)", testutil.RandStringRunes(100), 1, 1) + } + tk.MustExec(insertStr) + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024").Rows() + require.Contains(t, res[1][0], "IndexMerge") + + tk.MustExec("set global tidb_mem_oom_action='CANCEL'") + defer tk.MustExec("set global tidb_mem_oom_action = DEFAULT") + tk.MustExec("set @@tidb_mem_quota_query = 4000") + err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024") + require.Contains(t, err.Error(), "Out Of Memory Quota!") } diff --git a/testkit/testutil/require.go b/testkit/testutil/require.go index 90b157fcb7591..07bbee52ea979 100644 --- a/testkit/testutil/require.go +++ b/testkit/testutil/require.go @@ -18,6 +18,7 @@ package testutil import ( "testing" + "math/rand" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -75,3 +76,14 @@ func CompareUnorderedStringSlice(a []string, b []string) bool { } return len(m) == 0 } + +var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func RandStringRunes(n int) string { + b := make([]rune, n) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return string(b) +} + From ff981b005f133f89f185091e951636238277dc95 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 30 Nov 2022 11:37:54 +0800 Subject: [PATCH 06/10] fix conflict Signed-off-by: guo-shaoge --- executor/index_merge_reader.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index fb968ed56562a..d7475f71374b8 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -922,14 +922,7 @@ func (w *indexMergeProcessWorker) fetchLoopIntersection(ctx context.Context, fet if w.indexMerge.partitionTableMode { partCnt = len(w.indexMerge.prunedPartitions) } -<<<<<<< HEAD - workerCnt := partCnt - if workerCnt > maxWorkerCnt { - workerCnt = maxWorkerCnt - } -======= workerCnt := mathutil.Min(partCnt, maxWorkerCnt) ->>>>>>> 179fcef405c73497af86813192535569d8480dc6 failpoint.Inject("testIndexMergeIntersectionConcurrency", func(val failpoint.Value) { con := val.(int) if con != workerCnt { From 86333f5dd54466c1bcb1a9feb90e1a1aece06d8d Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 30 Nov 2022 11:42:34 +0800 Subject: [PATCH 07/10] fix fmt Signed-off-by: guo-shaoge --- testkit/testutil/require.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff 
--git a/testkit/testutil/require.go b/testkit/testutil/require.go
index 07bbee52ea979..a4dd335b387b3 100644
--- a/testkit/testutil/require.go
+++ b/testkit/testutil/require.go
@@ -17,8 +17,8 @@ package testutil
 import (
- "testing"
 "math/rand"
+ "testing"

 "github.com/pingcap/tidb/kv"
 "github.com/pingcap/tidb/sessionctx/stmtctx"
@@ -80,10 +80,9 @@ func CompareUnorderedStringSlice(a []string, b []string) bool {
 var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

 func RandStringRunes(n int) string {
- b := make([]rune, n)
- for i := range b {
- b[i] = letterRunes[rand.Intn(len(letterRunes))]
- }
- return string(b)
+ b := make([]rune, n)
+ for i := range b {
+ b[i] = letterRunes[rand.Intn(len(letterRunes))]
+ }
+ return string(b)
 }
-

From 1b3b13c32a3ad58242fcd05771e0daf75c25fbf0 Mon Sep 17 00:00:00 2001
From: guo-shaoge
Date: Wed, 30 Nov 2022 11:55:32 +0800
Subject: [PATCH 08/10] fix lint

Signed-off-by: guo-shaoge
---
 testkit/testutil/require.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/testkit/testutil/require.go b/testkit/testutil/require.go
index a4dd335b387b3..09e8e871312ae 100644
--- a/testkit/testutil/require.go
+++ b/testkit/testutil/require.go
@@ -79,6 +79,7 @@ func CompareUnorderedStringSlice(a []string, b []string) bool {

 var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

+// RandStringRunes generates a random string of length n.
 func RandStringRunes(n int) string {
 b := make([]rune, n)
 for i := range b {

From 685f86d4be0de44de4c905421e16fe87c17c6c3e Mon Sep 17 00:00:00 2001
From: guo-shaoge
Date: Wed, 30 Nov 2022 13:07:29 +0800
Subject: [PATCH 09/10] fix comment

Signed-off-by: guo-shaoge
---
 executor/index_merge_reader_test.go | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go
index b5276588a47b3..89e8b520e7832 100644
--- a/executor/index_merge_reader_test.go
+++ b/executor/index_merge_reader_test.go
@@ -269,13 +269,11 @@ func TestIndexMergeInTransaction(t *testing.T) {
 tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
 tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
 tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1"))
- tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1"))

 tk.MustExec("delete from t1;")
 tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows())
 tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows())
 tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows())
- tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows())

 // Test with primary key, so the partialPlan is TableScan.
tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) @@ -286,17 +284,14 @@ func TestIndexMergeInTransaction(t *testing.T) { tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) - tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) tk.MustExec("update t1 set c3 = 100 where c3 = 1;") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1")) - tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1")) tk.MustExec("delete from t1;") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) - tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) tk.MustExec("commit;") if i == 1 { From 5f44fd1b063e0fc9d6a9f5b0043e7ca3cdd9ac91 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 30 Nov 2022 13:11:29 +0800 Subject: [PATCH 10/10] fix dup Signed-off-by: guo-shaoge --- executor/index_merge_reader_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 89e8b520e7832..79d2d8b895a81 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -261,8 +261,6 @@ func TestIndexMergeInTransaction(t *testing.T) { tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) - tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) - tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) tk.MustExec("update t1 set c3 = 100 where c3 = 1;") @@ -634,13 +632,13 @@ func TestIndexMergeIntersectionConcurrency(t *testing.T) { require.NoError(t, 
failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(3)"))
 tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

- // Concurrency only works for dynamic pruning partition table.
+ // Concurrency only works for dynamic pruning partition table, so real concurrency is 1.
 tk.MustExec("set tidb_partition_prune_mode = 'static'")
 tk.MustExec("set tidb_index_merge_intersection_concurrency = 9")
 require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)"))
 tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1"))

- // Concurrency only works for dynamic pruning partition table.
+ // Concurrency only works for dynamic pruning partition table, so real concurrency is 1.
 tk.MustExec("drop table if exists t1")
 tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));")
 tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)")