Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#55196
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
tiancaiamao authored and ti-chi-bot committed Aug 6, 2024
1 parent 70bfd90 commit d7fa751
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 11 deletions.
49 changes: 38 additions & 11 deletions pkg/distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type RequestBuilder struct {
kv.Request
is infoschema.InfoSchema
err error

// When SetDAGRequest is called, builder will also this field.
dag *tipb.DAGRequest
}

// Build builds a "kv.Request".
Expand Down Expand Up @@ -72,6 +75,29 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
if builder.Request.KeyRanges == nil {
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(nil)
}

if dag := builder.dag; dag != nil {
if execCnt := len(dag.Executors); execCnt == 1 {
oldConcurrency := builder.Request.Concurrency
// select * from t order by id
if builder.Request.KeepOrder {
// When the DAG is just simple scan and keep order, set concurrency to 2.
// If a lot data are returned to client, mysql protocol is the bottleneck so concurrency 2 is enough.
// If very few data are returned to client, the speed is not optimal but good enough.
switch dag.Executors[0].Tp {
case tipb.ExecType_TypeTableScan, tipb.ExecType_TypeIndexScan, tipb.ExecType_TypePartitionTableScan:
builder.Request.Concurrency = 2
failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) {
if val.(bool) {
// When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition
builder.Request.Concurrency = oldConcurrency
}
})
}
}
}
}

return &builder.Request, builder.err
}

Expand Down Expand Up @@ -151,17 +177,18 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild
builder.Request.Tp = kv.ReqTypeDAG
builder.Request.Cacheable = true
builder.Request.Data, builder.err = dag.Marshal()
}
if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil {
limit := dag.Executors[execCnt-1].GetLimit()
builder.Request.LimitSize = limit.GetLimit()
// When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient.
if execCnt == 2 {
if limit.Limit < estimatedRegionRowCount {
if kr := builder.Request.KeyRanges; kr != nil {
builder.Request.Concurrency = kr.PartitionNum()
} else {
builder.Request.Concurrency = 1
builder.dag = dag
if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil {
limit := dag.Executors[execCnt-1].GetLimit()
builder.Request.LimitSize = limit.GetLimit()
// When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient.
if execCnt == 2 {
if limit.Limit < estimatedRegionRowCount {
if kr := builder.Request.KeyRanges; kr != nil {
builder.Request.Concurrency = kr.PartitionNum()
} else {
builder.Request.Concurrency = 1
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/session/test/variable/variable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func TestCoprocessorOOMAction(t *testing.T) {
sql: "select id from t5",
},
}

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/distsql/testRateLimitActionMockConsumeAndAssert", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/distsql/testRateLimitActionMockConsumeAndAssert"))
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/testRateLimitActionMockConsumeAndAssert", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/testRateLimitActionMockConsumeAndAssert"))
Expand Down
3 changes: 3 additions & 0 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
}

if it.req.KeepOrder {
<<<<<<< HEAD
// Don't set high concurrency for the keep order case. It wastes a lot of memory and gains nothing.
// TL;DR
// Because for a keep order coprocessor request, the cop tasks are handled one by one, if we set a
Expand All @@ -232,6 +233,8 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
}
})
}
=======
>>>>>>> 9fee330077e (store/copr: partly revert pr/35975, do it correctly this time (#55196))
if it.smallTaskConcurrency > 20 {
it.smallTaskConcurrency = 20
}
Expand Down
140 changes: 140 additions & 0 deletions tests/integrationtest/r/executor/issues.result
Original file line number Diff line number Diff line change
Expand Up @@ -780,3 +780,143 @@ SELECT count(`t`.`c`) FROM (`s`) JOIN `t` GROUP BY `t`.`c`;
count(`t`.`c`)
170
set @@tidb_max_chunk_size = default;
<<<<<<< HEAD
=======
select tan(9021874879467600608071521900001091070693729763119983979);
tan(9021874879467600608071521900001091070693729763119983979)
8.068627196084492
drop table if exists t;
create table t (id int auto_increment, c char(120), primary key(id));
create table pt (id int primary key auto_increment, val int) partition by range (id)
(PARTITION p1 VALUES LESS THAN (100),
PARTITION p2 VALUES LESS THAN (200),
PARTITION p3 VALUES LESS THAN (300),
PARTITION p4 VALUES LESS THAN (400),
PARTITION p5 VALUES LESS THAN (500),
PARTITION p6 VALUES LESS THAN (600),
PARTITION p7 VALUES LESS THAN (700));
insert into t (c) values ('abc'), ('def'), ('ghi'), ('jkl');
insert into t (c) select (c) from t;
insert into t (c) select (c) from t;
insert into t (c) select (c) from t;
insert into t (c) select (c) from t;
insert into t (c) select (c) from t;
insert into t (c) select (c) from t;
split table t between (0) and (40960) regions 30;
TOTAL_SPLIT_REGION SCATTER_FINISH_RATIO
29 1
analyze table t;
insert into pt (val) values (123),(456),(789),(1112);
insert into pt (val) select (val) from pt;
insert into pt (val) select (val) from pt;
insert into pt (val) select (val) from pt;
insert into pt (val) select (val) from pt;
insert into pt (val) select (val) from pt;
insert into pt (val) select (val) from pt;
split table pt between (0) and (40960) regions 30;
TOTAL_SPLIT_REGION SCATTER_FINISH_RATIO
203 1
analyze table pt;
set @@tidb_distsql_scan_concurrency = default;
explain analyze select * from t order by id; # expected distsql concurrency 2
id estRows actRows task access object execution info operator info memory disk
TableReader_11 256.00 <actRows> root NULL max_distsql_concurrency: 2 NULL <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:t NULL keep order:true <memory> <disk>
explain analyze select * from t limit 100; # expected distsql concurrency 1
id estRows actRows task access object execution info operator info memory disk
Limit_7 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_11 100.00 <actRows> root NULL max_distsql_concurrency: 1 NULL <memory> <disk>
└─Limit_10 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─TableFullScan_9 100.00 <actRows> cop[tikv] table:t NULL keep order:false <memory> <disk>
explain analyze select * from t limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_7 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_11 256.00 <actRows> root NULL max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_10 256.00 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_9 256.00 <actRows> cop[tikv] table:t NULL keep order:false <memory> <disk>
explain analyze select * from t where c = 'abc' limit 100; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_8 0.26 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_13 0.26 <actRows> root NULL max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 0.26 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_11 0.26 <actRows> cop[tikv] NULL NULL eq(executor__issues.t.c, "abc") <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:t NULL keep order:false, stats:partial[c:unInitialized] <memory> <disk>
explain analyze select * from t where c = 'abc' limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_8 0.26 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_13 0.26 <actRows> root NULL max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 0.26 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─Selection_11 0.26 <actRows> cop[tikv] NULL NULL eq(executor__issues.t.c, "abc") <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:t NULL keep order:false, stats:partial[c:unInitialized] <memory> <disk>
explain analyze select * from t order by id limit 100; # expected distsql concurrency 1
id estRows actRows task access object execution info operator info memory disk
Limit_10 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_17 100.00 <actRows> root NULL max_distsql_concurrency: 1 NULL <memory> <disk>
└─Limit_16 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─TableFullScan_15 100.00 <actRows> cop[tikv] table:t NULL keep order:true <memory> <disk>
explain analyze select * from t order by id limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_11 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_21 256.00 <actRows> root NULL max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_20 256.00 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_19 256.00 <actRows> cop[tikv] table:t NULL keep order:true <memory> <disk>
explain analyze select * from t where c = 'abd' order by id limit 100;
id estRows actRows task access object execution info operator info memory disk
Limit_11 0.26 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_20 0.26 <actRows> root NULL max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_19 0.26 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_18 0.26 <actRows> cop[tikv] NULL NULL eq(executor__issues.t.c, "abd") <memory> <disk>
└─TableFullScan_17 256.00 <actRows> cop[tikv] table:t NULL keep order:true, stats:partial[c:unInitialized] <memory> <disk>
select @@tidb_partition_prune_mode;
@@tidb_partition_prune_mode
dynamic
explain analyze select * from pt order by id; # expected distsql concurrency 2
id estRows actRows task access object execution info operator info memory disk
TableReader_11 256.00 <actRows> root partition:all max_distsql_concurrency: 2 NULL <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:true <memory> <disk>
explain analyze select * from pt limit 100; # expected distsql concurrency 7
id estRows actRows task access object execution info operator info memory disk
Limit_7 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_11 100.00 <actRows> root partition:all max_distsql_concurrency: 7 NULL <memory> <disk>
└─Limit_10 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─TableFullScan_9 100.00 <actRows> cop[tikv] table:pt NULL keep order:false <memory> <disk>
explain analyze select * from pt limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_7 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_11 256.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_10 256.00 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_9 256.00 <actRows> cop[tikv] table:pt NULL keep order:false <memory> <disk>
explain analyze select * from pt where val = 125 limit 100; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_8 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_13 100.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_11 100.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
└─TableFullScan_10 125.00 <actRows> cop[tikv] table:pt NULL keep order:false, stats:partial[val:missing] <memory> <disk>
explain analyze select * from pt where val = 125 limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_8 204.80 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_13 204.80 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 204.80 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─Selection_11 204.80 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:false, stats:partial[val:missing] <memory> <disk>
explain analyze select * from pt order by id limit 100; # expected distsql concurrency 7, but currently get 1, see issue #55190
id estRows actRows task access object execution info operator info memory disk
Limit_10 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_17 100.00 <actRows> root partition:all max_distsql_concurrency: 1 NULL <memory> <disk>
└─Limit_16 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─TableFullScan_15 100.00 <actRows> cop[tikv] table:pt NULL keep order:true <memory> <disk>
explain analyze select * from pt order by id limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_11 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_21 256.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_20 256.00 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_19 256.00 <actRows> cop[tikv] table:pt NULL keep order:true <memory> <disk>
explain analyze select * from pt where val = 126 order by id limit 100; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_11 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_20 100.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_19 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_18 100.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 126) <memory> <disk>
└─TableFullScan_17 125.00 <actRows> cop[tikv] table:pt NULL keep order:true, stats:partial[val:missing] <memory> <disk>
>>>>>>> 9fee330077e (store/copr: partly revert pr/35975, do it correctly this time (#55196))
Loading

0 comments on commit d7fa751

Please sign in to comment.