Skip to content

Commit

Permalink
store/copr: partly revert pr/35975, do it correctly this time (#55196)
Browse files Browse the repository at this point in the history
close #54969
  • Loading branch information
tiancaiamao authored Aug 6, 2024
1 parent fa554f8 commit 9fee330
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 22 deletions.
49 changes: 38 additions & 11 deletions pkg/distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type RequestBuilder struct {
kv.Request
is infoschema.MetaOnlyInfoSchema
err error

// When SetDAGRequest is called, builder will also this field.
dag *tipb.DAGRequest
}

// Build builds a "kv.Request".
Expand Down Expand Up @@ -75,6 +78,29 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
if builder.Request.KeyRanges == nil {
builder.Request.KeyRanges = kv.NewNonPartitionedKeyRanges(nil)
}

if dag := builder.dag; dag != nil {
if execCnt := len(dag.Executors); execCnt == 1 {
oldConcurrency := builder.Request.Concurrency
// select * from t order by id
if builder.Request.KeepOrder {
// When the DAG is just simple scan and keep order, set concurrency to 2.
// If a lot data are returned to client, mysql protocol is the bottleneck so concurrency 2 is enough.
// If very few data are returned to client, the speed is not optimal but good enough.
switch dag.Executors[0].Tp {
case tipb.ExecType_TypeTableScan, tipb.ExecType_TypeIndexScan, tipb.ExecType_TypePartitionTableScan:
builder.Request.Concurrency = 2
failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) {
if val.(bool) {
// When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition
builder.Request.Concurrency = oldConcurrency
}
})
}
}
}
}

return &builder.Request, builder.err
}

Expand Down Expand Up @@ -154,17 +180,18 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild
builder.Request.Tp = kv.ReqTypeDAG
builder.Request.Cacheable = true
builder.Request.Data, builder.err = dag.Marshal()
}
if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil {
limit := dag.Executors[execCnt-1].GetLimit()
builder.Request.LimitSize = limit.GetLimit()
// When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient.
if execCnt == 2 {
if limit.Limit < estimatedRegionRowCount {
if kr := builder.Request.KeyRanges; kr != nil {
builder.Request.Concurrency = kr.PartitionNum()
} else {
builder.Request.Concurrency = 1
builder.dag = dag
if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil {
limit := dag.Executors[execCnt-1].GetLimit()
builder.Request.LimitSize = limit.GetLimit()
// When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient.
if execCnt == 2 {
if limit.Limit < estimatedRegionRowCount {
if kr := builder.Request.KeyRanges; kr != nil {
builder.Request.Concurrency = kr.PartitionNum()
} else {
builder.Request.Concurrency = 1
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/session/test/variable/variable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func TestCoprocessorOOMAction(t *testing.T) {
sql: "select id from t5",
},
}

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/distsql/testRateLimitActionMockConsumeAndAssert", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/distsql/testRateLimitActionMockConsumeAndAssert"))
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/testRateLimitActionMockConsumeAndAssert", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/testRateLimitActionMockConsumeAndAssert"))
Expand Down
11 changes: 0 additions & 11 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,17 +220,6 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
}

if it.req.KeepOrder {
oldConcurrency := it.concurrency
partitionNum := req.KeyRanges.PartitionNum()
if partitionNum > 0 && partitionNum < it.concurrency {
it.concurrency = partitionNum
}
failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) {
if val.(bool) {
// When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition
it.concurrency = oldConcurrency
}
})
if it.smallTaskConcurrency > 20 {
it.smallTaskConcurrency = 20
}
Expand Down
134 changes: 134 additions & 0 deletions tests/integrationtest/r/executor/issues.result
Original file line number Diff line number Diff line change
Expand Up @@ -868,3 +868,137 @@ set @@tidb_max_chunk_size = default;
select tan(9021874879467600608071521900001091070693729763119983979);
tan(9021874879467600608071521900001091070693729763119983979)
8.068627196084492
drop table if exists t;
create table t (id int auto_increment, c char(120), primary key(id));
create table pt (id int primary key auto_increment, val int) partition by range (id)
(PARTITION p1 VALUES LESS THAN (100),
PARTITION p2 VALUES LESS THAN (200),
PARTITION p3 VALUES LESS THAN (300),
PARTITION p4 VALUES LESS THAN (400),
PARTITION p5 VALUES LESS THAN (500),
PARTITION p6 VALUES LESS THAN (600),
PARTITION p7 VALUES LESS THAN (700));
insert into t (c) values ('abc'), ('def'), ('ghi'), ('jkl');
insert into t (c) select (c) from t;
insert into t (c) select (c) from t;
insert into t (c) select (c) from t;
insert into t (c) select (c) from t;
insert into t (c) select (c) from t;
insert into t (c) select (c) from t;
split table t between (0) and (40960) regions 30;
TOTAL_SPLIT_REGION SCATTER_FINISH_RATIO
29 1
analyze table t;
insert into pt (val) values (123),(456),(789),(1112);
insert into pt (val) select (val) from pt;
insert into pt (val) select (val) from pt;
insert into pt (val) select (val) from pt;
insert into pt (val) select (val) from pt;
insert into pt (val) select (val) from pt;
insert into pt (val) select (val) from pt;
split table pt between (0) and (40960) regions 30;
TOTAL_SPLIT_REGION SCATTER_FINISH_RATIO
203 1
analyze table pt;
set @@tidb_distsql_scan_concurrency = default;
explain analyze select * from t order by id; # expected distsql concurrency 2
id estRows actRows task access object execution info operator info memory disk
TableReader_11 256.00 <actRows> root NULL max_distsql_concurrency: 2 NULL <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:t NULL keep order:true <memory> <disk>
explain analyze select * from t limit 100; # expected distsql concurrency 1
id estRows actRows task access object execution info operator info memory disk
Limit_7 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_11 100.00 <actRows> root NULL max_distsql_concurrency: 1 NULL <memory> <disk>
└─Limit_10 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─TableFullScan_9 100.00 <actRows> cop[tikv] table:t NULL keep order:false <memory> <disk>
explain analyze select * from t limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_7 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_11 256.00 <actRows> root NULL max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_10 256.00 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_9 256.00 <actRows> cop[tikv] table:t NULL keep order:false <memory> <disk>
explain analyze select * from t where c = 'abc' limit 100; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_8 0.26 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_13 0.26 <actRows> root NULL max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 0.26 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_11 0.26 <actRows> cop[tikv] NULL NULL eq(executor__issues.t.c, "abc") <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:t NULL keep order:false, stats:partial[c:unInitialized] <memory> <disk>
explain analyze select * from t where c = 'abc' limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_8 0.26 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_13 0.26 <actRows> root NULL max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 0.26 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─Selection_11 0.26 <actRows> cop[tikv] NULL NULL eq(executor__issues.t.c, "abc") <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:t NULL keep order:false, stats:partial[c:unInitialized] <memory> <disk>
explain analyze select * from t order by id limit 100; # expected distsql concurrency 1
id estRows actRows task access object execution info operator info memory disk
Limit_10 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_17 100.00 <actRows> root NULL max_distsql_concurrency: 1 NULL <memory> <disk>
└─Limit_16 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─TableFullScan_15 100.00 <actRows> cop[tikv] table:t NULL keep order:true <memory> <disk>
explain analyze select * from t order by id limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_11 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_21 256.00 <actRows> root NULL max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_20 256.00 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_19 256.00 <actRows> cop[tikv] table:t NULL keep order:true <memory> <disk>
explain analyze select * from t where c = 'abd' order by id limit 100;
id estRows actRows task access object execution info operator info memory disk
Limit_11 0.26 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_20 0.26 <actRows> root NULL max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_19 0.26 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_18 0.26 <actRows> cop[tikv] NULL NULL eq(executor__issues.t.c, "abd") <memory> <disk>
└─TableFullScan_17 256.00 <actRows> cop[tikv] table:t NULL keep order:true, stats:partial[c:unInitialized] <memory> <disk>
select @@tidb_partition_prune_mode;
@@tidb_partition_prune_mode
dynamic
explain analyze select * from pt order by id; # expected distsql concurrency 2
id estRows actRows task access object execution info operator info memory disk
TableReader_11 256.00 <actRows> root partition:all max_distsql_concurrency: 2 NULL <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:true <memory> <disk>
explain analyze select * from pt limit 100; # expected distsql concurrency 7
id estRows actRows task access object execution info operator info memory disk
Limit_7 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_11 100.00 <actRows> root partition:all max_distsql_concurrency: 7 NULL <memory> <disk>
└─Limit_10 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─TableFullScan_9 100.00 <actRows> cop[tikv] table:pt NULL keep order:false <memory> <disk>
explain analyze select * from pt limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_7 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_11 256.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_10 256.00 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_9 256.00 <actRows> cop[tikv] table:pt NULL keep order:false <memory> <disk>
explain analyze select * from pt where val = 125 limit 100; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_8 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_13 100.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_11 100.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
└─TableFullScan_10 125.00 <actRows> cop[tikv] table:pt NULL keep order:false, stats:partial[val:missing] <memory> <disk>
explain analyze select * from pt where val = 125 limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_8 204.80 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_13 204.80 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_12 204.80 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─Selection_11 204.80 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 125) <memory> <disk>
└─TableFullScan_10 256.00 <actRows> cop[tikv] table:pt NULL keep order:false, stats:partial[val:missing] <memory> <disk>
explain analyze select * from pt order by id limit 100; # expected distsql concurrency 7, but currently get 1, see issue #55190
id estRows actRows task access object execution info operator info memory disk
Limit_10 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_17 100.00 <actRows> root partition:all max_distsql_concurrency: 1 NULL <memory> <disk>
└─Limit_16 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─TableFullScan_15 100.00 <actRows> cop[tikv] table:pt NULL keep order:true <memory> <disk>
explain analyze select * from pt order by id limit 100000; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_11 256.00 <actRows> root NULL NULL offset:0, count:100000 <memory> <disk>
└─TableReader_21 256.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_20 256.00 <actRows> cop[tikv] NULL NULL offset:0, count:100000 <memory> <disk>
└─TableFullScan_19 256.00 <actRows> cop[tikv] table:pt NULL keep order:true <memory> <disk>
explain analyze select * from pt where val = 126 order by id limit 100; # expected distsql concurrency 15
id estRows actRows task access object execution info operator info memory disk
Limit_11 100.00 <actRows> root NULL NULL offset:0, count:100 <memory> <disk>
└─TableReader_20 100.00 <actRows> root partition:all max_distsql_concurrency: 15 NULL <memory> <disk>
└─Limit_19 100.00 <actRows> cop[tikv] NULL NULL offset:0, count:100 <memory> <disk>
└─Selection_18 100.00 <actRows> cop[tikv] NULL NULL eq(executor__issues.pt.val, 126) <memory> <disk>
└─TableFullScan_17 125.00 <actRows> cop[tikv] table:pt NULL keep order:true, stats:partial[val:missing] <memory> <disk>
Loading

0 comments on commit 9fee330

Please sign in to comment.