diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 2d515c4f8853d..789b5f76aa1c6 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -45,6 +45,8 @@ type RequestBuilder struct { kv.Request is infoschema.InfoSchema err error + // When SetDAGRequest is called, builder will also set this field. + dag *tipb.DAGRequest } // Build builds a "kv.Request". @@ -73,6 +75,29 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) { if builder.Request.KeyRanges == nil { builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(nil) } + + if dag := builder.dag; dag != nil { + if execCnt := len(dag.Executors); execCnt == 1 { + oldConcurrency := builder.Request.Concurrency + // select * from t order by id + if builder.Request.KeepOrder { + // When the DAG is just simple scan and keep order, set concurrency to 2. + // If a lot data are returned to client, mysql protocol is the bottleneck so concurrency 2 is enough. + // If very few data are returned to client, the speed is not optimal but good enough. + switch dag.Executors[0].Tp { + case tipb.ExecType_TypeTableScan, tipb.ExecType_TypeIndexScan, tipb.ExecType_TypePartitionTableScan: + builder.Request.Concurrency = 2 + failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) { + if val.(bool) { + // When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition + builder.Request.Concurrency = oldConcurrency + } + }) + } + } + } + } + return &builder.Request, builder.err } @@ -152,17 +177,19 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild builder.Request.Tp = kv.ReqTypeDAG builder.Request.Cacheable = true builder.Request.Data, builder.err = dag.Marshal() - } - if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil { - limit := dag.Executors[execCnt-1].GetLimit() - builder.Request.LimitSize = limit.GetLimit() - // When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient. - if execCnt == 2 { - if limit.Limit < estimatedRegionRowCount { - if kr := builder.Request.KeyRanges; kr != nil { - builder.Request.Concurrency = kr.PartitionNum() - } else { - builder.Request.Concurrency = 1 + + builder.dag = dag + if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil { + limit := dag.Executors[execCnt-1].GetLimit() + builder.Request.LimitSize = limit.GetLimit() + // When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient. + if execCnt == 2 { + if limit.Limit < estimatedRegionRowCount { + if kr := builder.Request.KeyRanges; kr != nil { + builder.Request.Concurrency = kr.PartitionNum() + } else { + builder.Request.Concurrency = 1 + } } } } diff --git a/session/sessiontest/session_test.go b/session/sessiontest/session_test.go index 79f05e0abc2dc..04d1984678952 100644 --- a/session/sessiontest/session_test.go +++ b/session/sessiontest/session_test.go @@ -1211,6 +1211,10 @@ func TestCoprocessorOOMAction(t *testing.T) { sql: "select id from t5", }, } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/distsql/testRateLimitActionMockConsumeAndAssert", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/distsql/testRateLimitActionMockConsumeAndAssert")) + }() require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/testRateLimitActionMockConsumeAndAssert", `return(true)`)) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/testRateLimitActionMockConsumeAndAssert")) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 6ff0f3dfec3ce..2c9bfedd94486 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -201,17 +201,6 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars } if it.req.KeepOrder { - oldConcurrency := it.concurrency - partitionNum := req.KeyRanges.PartitionNum() - if partitionNum > 0 && partitionNum < it.concurrency { - it.concurrency = partitionNum - } - failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) { - if val.(bool) { - // When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition - it.concurrency = oldConcurrency - } - }) if it.smallTaskConcurrency > 20 { it.smallTaskConcurrency = 20 }