From d7fa751f042f3eff836560f3d984f2f44c551420 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 6 Aug 2024 13:39:38 +0800 Subject: [PATCH] This is an automated cherry-pick of #55196 Signed-off-by: ti-chi-bot --- pkg/distsql/request_builder.go | 49 ++++-- pkg/session/test/variable/variable_test.go | 5 + pkg/store/copr/coprocessor.go | 3 + .../integrationtest/r/executor/issues.result | 140 ++++++++++++++++++ tests/integrationtest/t/executor/issues.test | 109 ++++++++++++++ 5 files changed, 295 insertions(+), 11 deletions(-) diff --git a/pkg/distsql/request_builder.go b/pkg/distsql/request_builder.go index d325df7709934..11a951dd83fd8 100644 --- a/pkg/distsql/request_builder.go +++ b/pkg/distsql/request_builder.go @@ -44,6 +44,9 @@ type RequestBuilder struct { kv.Request is infoschema.InfoSchema err error + + // When SetDAGRequest is called, builder will also set this field. + dag *tipb.DAGRequest } // Build builds a "kv.Request". @@ -72,6 +75,29 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) { if builder.Request.KeyRanges == nil { builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(nil) } + + if dag := builder.dag; dag != nil { + if execCnt := len(dag.Executors); execCnt == 1 { + oldConcurrency := builder.Request.Concurrency + // select * from t order by id + if builder.Request.KeepOrder { + // When the DAG is just simple scan and keep order, set concurrency to 2. + // If a lot of data is returned to client, mysql protocol is the bottleneck so concurrency 2 is enough. + // If very few data are returned to client, the speed is not optimal but good enough. 
+ switch dag.Executors[0].Tp { + case tipb.ExecType_TypeTableScan, tipb.ExecType_TypeIndexScan, tipb.ExecType_TypePartitionTableScan: + builder.Request.Concurrency = 2 + failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) { + if val.(bool) { + // When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition + builder.Request.Concurrency = oldConcurrency + } + }) + } + } + } + } + return &builder.Request, builder.err } @@ -151,17 +177,18 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild builder.Request.Tp = kv.ReqTypeDAG builder.Request.Cacheable = true builder.Request.Data, builder.err = dag.Marshal() - } - if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil { - limit := dag.Executors[execCnt-1].GetLimit() - builder.Request.LimitSize = limit.GetLimit() - // When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient. - if execCnt == 2 { - if limit.Limit < estimatedRegionRowCount { - if kr := builder.Request.KeyRanges; kr != nil { - builder.Request.Concurrency = kr.PartitionNum() - } else { - builder.Request.Concurrency = 1 + builder.dag = dag + if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil { + limit := dag.Executors[execCnt-1].GetLimit() + builder.Request.LimitSize = limit.GetLimit() + // When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient. 
+ if execCnt == 2 { + if limit.Limit < estimatedRegionRowCount { + if kr := builder.Request.KeyRanges; kr != nil { + builder.Request.Concurrency = kr.PartitionNum() + } else { + builder.Request.Concurrency = 1 + } } } } diff --git a/pkg/session/test/variable/variable_test.go b/pkg/session/test/variable/variable_test.go index a882f536108a5..791c989ef537d 100644 --- a/pkg/session/test/variable/variable_test.go +++ b/pkg/session/test/variable/variable_test.go @@ -93,6 +93,11 @@ func TestCoprocessorOOMAction(t *testing.T) { sql: "select id from t5", }, } + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/distsql/testRateLimitActionMockConsumeAndAssert", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/distsql/testRateLimitActionMockConsumeAndAssert")) + }() require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/testRateLimitActionMockConsumeAndAssert", `return(true)`)) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/testRateLimitActionMockConsumeAndAssert")) diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index 67df921495696..f813190f16222 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -207,6 +207,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars } if it.req.KeepOrder { +<<<<<<< HEAD // Don't set high concurrency for the keep order case. It wastes a lot of memory and gains nothing. 
// TL;DR // Because for a keep order coprocessor request, the cop tasks are handled one by one, if we set a @@ -232,6 +233,8 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars } }) } +======= +>>>>>>> 9fee330077e (store/copr: partly revert pr/35975, do it correctly this time (#55196)) if it.smallTaskConcurrency > 20 { it.smallTaskConcurrency = 20 } diff --git a/tests/integrationtest/r/executor/issues.result b/tests/integrationtest/r/executor/issues.result index 6a181e6404348..2034f0f5b859d 100644 --- a/tests/integrationtest/r/executor/issues.result +++ b/tests/integrationtest/r/executor/issues.result @@ -780,3 +780,143 @@ SELECT count(`t`.`c`) FROM (`s`) JOIN `t` GROUP BY `t`.`c`; count(`t`.`c`) 170 set @@tidb_max_chunk_size = default; +<<<<<<< HEAD +======= +select tan(9021874879467600608071521900001091070693729763119983979); +tan(9021874879467600608071521900001091070693729763119983979) +8.068627196084492 +drop table if exists t; +create table t (id int auto_increment, c char(120), primary key(id)); +create table pt (id int primary key auto_increment, val int) partition by range (id) +(PARTITION p1 VALUES LESS THAN (100), +PARTITION p2 VALUES LESS THAN (200), +PARTITION p3 VALUES LESS THAN (300), +PARTITION p4 VALUES LESS THAN (400), +PARTITION p5 VALUES LESS THAN (500), +PARTITION p6 VALUES LESS THAN (600), +PARTITION p7 VALUES LESS THAN (700)); +insert into t (c) values ('abc'), ('def'), ('ghi'), ('jkl'); +insert into t (c) select (c) from t; +insert into t (c) select (c) from t; +insert into t (c) select (c) from t; +insert into t (c) select (c) from t; +insert into t (c) select (c) from t; +insert into t (c) select (c) from t; +split table t between (0) and (40960) regions 30; +TOTAL_SPLIT_REGION SCATTER_FINISH_RATIO +29 1 +analyze table t; +insert into pt (val) values (123),(456),(789),(1112); +insert into pt (val) select (val) from pt; +insert into pt (val) select (val) from pt; +insert into pt (val) select (val) from pt; 
+insert into pt (val) select (val) from pt; +insert into pt (val) select (val) from pt; +insert into pt (val) select (val) from pt; +split table pt between (0) and (40960) regions 30; +TOTAL_SPLIT_REGION SCATTER_FINISH_RATIO +203 1 +analyze table pt; +set @@tidb_distsql_scan_concurrency = default; +explain analyze select * from t order by id; # expected distsql concurrency 2 +id estRows actRows task access object execution info operator info memory disk +TableReader_11 256.00 root NULL max_distsql_concurrency: 2 NULL +└─TableFullScan_10 256.00 cop[tikv] table:t NULL keep order:true +explain analyze select * from t limit 100; # expected distsql concurrency 1 +id estRows actRows task access object execution info operator info memory disk +Limit_7 100.00 root NULL NULL offset:0, count:100 +└─TableReader_11 100.00 root NULL max_distsql_concurrency: 1 NULL + └─Limit_10 100.00 cop[tikv] NULL NULL offset:0, count:100 + └─TableFullScan_9 100.00 cop[tikv] table:t NULL keep order:false +explain analyze select * from t limit 100000; # expected distsql concurrency 15 +id estRows actRows task access object execution info operator info memory disk +Limit_7 256.00 root NULL NULL offset:0, count:100000 +└─TableReader_11 256.00 root NULL max_distsql_concurrency: 15 NULL + └─Limit_10 256.00 cop[tikv] NULL NULL offset:0, count:100000 + └─TableFullScan_9 256.00 cop[tikv] table:t NULL keep order:false +explain analyze select * from t where c = 'abc' limit 100; # expected distsql concurrency 15 +id estRows actRows task access object execution info operator info memory disk +Limit_8 0.26 root NULL NULL offset:0, count:100 +└─TableReader_13 0.26 root NULL max_distsql_concurrency: 15 NULL + └─Limit_12 0.26 cop[tikv] NULL NULL offset:0, count:100 + └─Selection_11 0.26 cop[tikv] NULL NULL eq(executor__issues.t.c, "abc") + └─TableFullScan_10 256.00 cop[tikv] table:t NULL keep order:false, stats:partial[c:unInitialized] +explain analyze select * from t where c = 'abc' limit 100000; # expected 
distsql concurrency 15 +id estRows actRows task access object execution info operator info memory disk +Limit_8 0.26 root NULL NULL offset:0, count:100000 +└─TableReader_13 0.26 root NULL max_distsql_concurrency: 15 NULL + └─Limit_12 0.26 cop[tikv] NULL NULL offset:0, count:100000 + └─Selection_11 0.26 cop[tikv] NULL NULL eq(executor__issues.t.c, "abc") + └─TableFullScan_10 256.00 cop[tikv] table:t NULL keep order:false, stats:partial[c:unInitialized] +explain analyze select * from t order by id limit 100; # expected distsql concurrency 1 +id estRows actRows task access object execution info operator info memory disk +Limit_10 100.00 root NULL NULL offset:0, count:100 +└─TableReader_17 100.00 root NULL max_distsql_concurrency: 1 NULL + └─Limit_16 100.00 cop[tikv] NULL NULL offset:0, count:100 + └─TableFullScan_15 100.00 cop[tikv] table:t NULL keep order:true +explain analyze select * from t order by id limit 100000; # expected distsql concurrency 15 +id estRows actRows task access object execution info operator info memory disk +Limit_11 256.00 root NULL NULL offset:0, count:100000 +└─TableReader_21 256.00 root NULL max_distsql_concurrency: 15 NULL + └─Limit_20 256.00 cop[tikv] NULL NULL offset:0, count:100000 + └─TableFullScan_19 256.00 cop[tikv] table:t NULL keep order:true +explain analyze select * from t where c = 'abd' order by id limit 100; +id estRows actRows task access object execution info operator info memory disk +Limit_11 0.26 root NULL NULL offset:0, count:100 +└─TableReader_20 0.26 root NULL max_distsql_concurrency: 15 NULL + └─Limit_19 0.26 cop[tikv] NULL NULL offset:0, count:100 + └─Selection_18 0.26 cop[tikv] NULL NULL eq(executor__issues.t.c, "abd") + └─TableFullScan_17 256.00 cop[tikv] table:t NULL keep order:true, stats:partial[c:unInitialized] +select @@tidb_partition_prune_mode; +@@tidb_partition_prune_mode +dynamic +explain analyze select * from pt order by id; # expected distsql concurrency 2 +id estRows actRows task access object execution 
info operator info memory disk +TableReader_11 256.00 root partition:all max_distsql_concurrency: 2 NULL +└─TableFullScan_10 256.00 cop[tikv] table:pt NULL keep order:true +explain analyze select * from pt limit 100; # expected distsql concurrency 7 +id estRows actRows task access object execution info operator info memory disk +Limit_7 100.00 root NULL NULL offset:0, count:100 +└─TableReader_11 100.00 root partition:all max_distsql_concurrency: 7 NULL + └─Limit_10 100.00 cop[tikv] NULL NULL offset:0, count:100 + └─TableFullScan_9 100.00 cop[tikv] table:pt NULL keep order:false +explain analyze select * from pt limit 100000; # expected distsql concurrency 15 +id estRows actRows task access object execution info operator info memory disk +Limit_7 256.00 root NULL NULL offset:0, count:100000 +└─TableReader_11 256.00 root partition:all max_distsql_concurrency: 15 NULL + └─Limit_10 256.00 cop[tikv] NULL NULL offset:0, count:100000 + └─TableFullScan_9 256.00 cop[tikv] table:pt NULL keep order:false +explain analyze select * from pt where val = 125 limit 100; # expected distsql concurrency 15 +id estRows actRows task access object execution info operator info memory disk +Limit_8 100.00 root NULL NULL offset:0, count:100 +└─TableReader_13 100.00 root partition:all max_distsql_concurrency: 15 NULL + └─Limit_12 100.00 cop[tikv] NULL NULL offset:0, count:100 + └─Selection_11 100.00 cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) + └─TableFullScan_10 125.00 cop[tikv] table:pt NULL keep order:false, stats:partial[val:missing] +explain analyze select * from pt where val = 125 limit 100000; # expected distsql concurrency 15 +id estRows actRows task access object execution info operator info memory disk +Limit_8 204.80 root NULL NULL offset:0, count:100000 +└─TableReader_13 204.80 root partition:all max_distsql_concurrency: 15 NULL + └─Limit_12 204.80 cop[tikv] NULL NULL offset:0, count:100000 + └─Selection_11 204.80 cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) + 
└─TableFullScan_10 256.00 cop[tikv] table:pt NULL keep order:false, stats:partial[val:missing] +explain analyze select * from pt order by id limit 100; # expected distsql concurrency 7, but currently get 1, see issue #55190 +id estRows actRows task access object execution info operator info memory disk +Limit_10 100.00 root NULL NULL offset:0, count:100 +└─TableReader_17 100.00 root partition:all max_distsql_concurrency: 1 NULL + └─Limit_16 100.00 cop[tikv] NULL NULL offset:0, count:100 + └─TableFullScan_15 100.00 cop[tikv] table:pt NULL keep order:true +explain analyze select * from pt order by id limit 100000; # expected distsql concurrency 15 +id estRows actRows task access object execution info operator info memory disk +Limit_11 256.00 root NULL NULL offset:0, count:100000 +└─TableReader_21 256.00 root partition:all max_distsql_concurrency: 15 NULL + └─Limit_20 256.00 cop[tikv] NULL NULL offset:0, count:100000 + └─TableFullScan_19 256.00 cop[tikv] table:pt NULL keep order:true +explain analyze select * from pt where val = 126 order by id limit 100; # expected distsql concurrency 15 +id estRows actRows task access object execution info operator info memory disk +Limit_11 100.00 root NULL NULL offset:0, count:100 +└─TableReader_20 100.00 root partition:all max_distsql_concurrency: 15 NULL + └─Limit_19 100.00 cop[tikv] NULL NULL offset:0, count:100 + └─Selection_18 100.00 cop[tikv] NULL NULL eq(executor__issues.pt.val, 126) + └─TableFullScan_17 125.00 cop[tikv] table:pt NULL keep order:true, stats:partial[val:missing] +>>>>>>> 9fee330077e (store/copr: partly revert pr/35975, do it correctly this time (#55196)) diff --git a/tests/integrationtest/t/executor/issues.test b/tests/integrationtest/t/executor/issues.test index 5fd2e9f07ec36..556b266d0037a 100644 --- a/tests/integrationtest/t/executor/issues.test +++ b/tests/integrationtest/t/executor/issues.test @@ -582,3 +582,112 @@ insert into s values(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1), SELECT 
/*+ stream_agg()*/ count(`t`.`c`) FROM (`s`) JOIN `t` GROUP BY `t`.`c`; SELECT count(`t`.`c`) FROM (`s`) JOIN `t` GROUP BY `t`.`c`; set @@tidb_max_chunk_size = default; +<<<<<<< HEAD +======= + +# TestIssue52672 +select tan(9021874879467600608071521900001091070693729763119983979); + + +# TestIssue54969 +# For order by query, the distsql concurrency should be @@tidb_distsql_scan_concurrency. +# For simple limit N query and N < 10000, the distsql concurrency should be 1 (normal table) +# For this case on partition table, concurrency should be partition number. +drop table if exists t; +create table t (id int auto_increment, c char(120), primary key(id)); +create table pt (id int primary key auto_increment, val int) partition by range (id) +(PARTITION p1 VALUES LESS THAN (100), + PARTITION p2 VALUES LESS THAN (200), + PARTITION p3 VALUES LESS THAN (300), + PARTITION p4 VALUES LESS THAN (400), + PARTITION p5 VALUES LESS THAN (500), + PARTITION p6 VALUES LESS THAN (600), + PARTITION p7 VALUES LESS THAN (700)); +insert into t (c) values ('abc'), ('def'), ('ghi'), ('jkl'); +insert into t (c) select (c) from t; +insert into t (c) select (c) from t; +insert into t (c) select (c) from t; +insert into t (c) select (c) from t; +insert into t (c) select (c) from t; +insert into t (c) select (c) from t; +split table t between (0) and (40960) regions 30; +analyze table t; + +insert into pt (val) values (123),(456),(789),(1112); +insert into pt (val) select (val) from pt; +insert into pt (val) select (val) from pt; +insert into pt (val) select (val) from pt; +insert into pt (val) select (val) from pt; +insert into pt (val) select (val) from pt; +insert into pt (val) select (val) from pt; +split table pt between (0) and (40960) regions 30; +analyze table pt; + +set @@tidb_distsql_scan_concurrency = default; +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select 
* from t order by id; # expected distsql concurrency 2 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from t limit 100; # expected distsql concurrency 1 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from t limit 100000; # expected distsql concurrency 15 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from t where c = 'abc' limit 100; # expected distsql concurrency 15 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from t where c = 'abc' limit 100000; # expected distsql concurrency 15 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from t order by id limit 100; # expected distsql concurrency 1 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from t order by id limit 100000; # expected distsql concurrency 15 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from t where c = 'abd' order by id limit 100; +select @@tidb_partition_prune_mode; + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from pt order by id; # expected 
distsql concurrency 2 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from pt limit 100; # expected distsql concurrency 7 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from pt limit 100000; # expected distsql concurrency 15 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from pt where val = 125 limit 100; # expected distsql concurrency 15 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from pt where val = 125 limit 100000; # expected distsql concurrency 15 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from pt order by id limit 100; # expected distsql concurrency 7, but currently get 1, see issue #55190 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from pt order by id limit 100000; # expected distsql concurrency 15 + +-- replace_column 8 9 3 +-- replace_regex /.*max_distsql_concurrency: (?P<num>[0-9]+).*/max_distsql_concurrency: $num/ /tikv_task:.*// /time:.*// /data:.*// +explain analyze select * from pt where val = 126 order by id limit 100; # expected distsql concurrency 15 + + +>>>>>>> 9fee330077e (store/copr: partly revert pr/35975, do it correctly this time (#55196))