From 86c9536dd586ffbff674fda0fc894059ab0838b9 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 2 Aug 2024 10:22:19 +0800 Subject: [PATCH] store/copr: revert pr/35975, do not reduce concurrency for keep order coprocessor request (#55141) ref pingcap/tidb#54969 --- store/copr/coprocessor.go | 34 ++++++++++------------------------ 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 074081c9c687d..6ff0f3dfec3ce 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -201,31 +201,17 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars } if it.req.KeepOrder { - // Don't set high concurrency for the keep order case. It wastes a lot of memory and gains nothing. - // TL;DR - // Because for a keep order coprocessor request, the cop tasks are handled one by one, if we set a - // higher concurrency, the data is just cached and not consumed for a while, this increase the memory usage. - // Set concurrency to 2 can reduce the memory usage and I've tested that it does not necessarily - // decrease the performance. - // For ReqTypeAnalyze, we keep its concurrency to avoid slow analyze(see https://github.com/pingcap/tidb/issues/40162 for details). - if it.concurrency > 2 && it.req.Tp != kv.ReqTypeAnalyze { - oldConcurrency := it.concurrency - partitionNum := req.KeyRanges.PartitionNum() - if partitionNum > it.concurrency { - partitionNum = it.concurrency - } - it.concurrency = 2 - if it.concurrency < partitionNum { - it.concurrency = partitionNum - } - - failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) { - if val.(bool) { - // When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition - it.concurrency = oldConcurrency - } - }) + oldConcurrency := it.concurrency + partitionNum := req.KeyRanges.PartitionNum() + if partitionNum > 0 && partitionNum < it.concurrency { + it.concurrency = partitionNum } + failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) { + if val.(bool) { + // When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition + it.concurrency = oldConcurrency + } + }) if it.smallTaskConcurrency > 20 { it.smallTaskConcurrency = 20 }