diff --git a/pkg/distsql/request_builder.go b/pkg/distsql/request_builder.go index da6a6eda7b8d4..29d192c913a3b 100644 --- a/pkg/distsql/request_builder.go +++ b/pkg/distsql/request_builder.go @@ -44,6 +44,9 @@ type RequestBuilder struct { kv.Request is infoschema.InfoSchema err error + + // When SetDAGRequest is called, builder will also this field. + dag *tipb.DAGRequest } // Build builds a "kv.Request". @@ -72,6 +75,29 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) { if builder.Request.KeyRanges == nil { builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(nil) } + + if dag := builder.dag; dag != nil { + if execCnt := len(dag.Executors); execCnt == 1 { + oldConcurrency := builder.Request.Concurrency + // select * from t order by id + if builder.Request.KeepOrder { + // When the DAG is just simple scan and keep order, set concurrency to 2. + // If a lot data are returned to client, mysql protocol is the bottleneck so concurrency 2 is enough. + // If very few data are returned to client, the speed is not optimal but good enough. + switch dag.Executors[0].Tp { + case tipb.ExecType_TypeTableScan, tipb.ExecType_TypeIndexScan, tipb.ExecType_TypePartitionTableScan: + builder.Request.Concurrency = 2 + failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) { + if val.(bool) { + // When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition + builder.Request.Concurrency = oldConcurrency + } + }) + } + } + } + } + return &builder.Request, builder.err } @@ -151,17 +177,18 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild builder.Request.Tp = kv.ReqTypeDAG builder.Request.Cacheable = true builder.Request.Data, builder.err = dag.Marshal() - } - if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil { - limit := dag.Executors[execCnt-1].GetLimit() - builder.Request.LimitSize = limit.GetLimit() - // When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient. - if execCnt == 2 { - if limit.Limit < estimatedRegionRowCount { - if kr := builder.Request.KeyRanges; kr != nil { - builder.Request.Concurrency = kr.PartitionNum() - } else { - builder.Request.Concurrency = 1 + builder.dag = dag + if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil { + limit := dag.Executors[execCnt-1].GetLimit() + builder.Request.LimitSize = limit.GetLimit() + // When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient. + if execCnt == 2 { + if limit.Limit < estimatedRegionRowCount { + if kr := builder.Request.KeyRanges; kr != nil { + builder.Request.Concurrency = kr.PartitionNum() + } else { + builder.Request.Concurrency = 1 + } } } } diff --git a/pkg/session/test/variable/variable_test.go b/pkg/session/test/variable/variable_test.go index a882f536108a5..791c989ef537d 100644 --- a/pkg/session/test/variable/variable_test.go +++ b/pkg/session/test/variable/variable_test.go @@ -93,6 +93,11 @@ func TestCoprocessorOOMAction(t *testing.T) { sql: "select id from t5", }, } + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/distsql/testRateLimitActionMockConsumeAndAssert", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/distsql/testRateLimitActionMockConsumeAndAssert")) + }() require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/testRateLimitActionMockConsumeAndAssert", `return(true)`)) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/testRateLimitActionMockConsumeAndAssert")) diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index 93b80a138a6fe..810b6910b2512 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -207,31 +207,6 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars } if it.req.KeepOrder { - // Don't set high concurrency for the keep order case. It wastes a lot of memory and gains nothing. - // TL;DR - // Because for a keep order coprocessor request, the cop tasks are handled one by one, if we set a - // higher concurrency, the data is just cached and not consumed for a while, this increase the memory usage. - // Set concurrency to 2 can reduce the memory usage and I've tested that it does not necessarily - // decrease the performance. - // For ReqTypeAnalyze, we keep its concurrency to avoid slow analyze(see https://github.com/pingcap/tidb/issues/40162 for details). - if it.concurrency > 2 && it.req.Tp != kv.ReqTypeAnalyze { - oldConcurrency := it.concurrency - partitionNum := req.KeyRanges.PartitionNum() - if partitionNum > it.concurrency { - partitionNum = it.concurrency - } - it.concurrency = 2 - if it.concurrency < partitionNum { - it.concurrency = partitionNum - } - - failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) { - if val.(bool) { - // When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition - it.concurrency = oldConcurrency - } - }) - } if it.smallTaskConcurrency > 20 { it.smallTaskConcurrency = 20 }