Skip to content

Commit

Permalink
store/copr: partly revert pr/35975, do it correctly this time (#55196) (
Browse files Browse the repository at this point in the history
#55211)

close #54969
  • Loading branch information
ti-chi-bot committed Sep 23, 2024
1 parent 4b40e9b commit cdd7546
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 36 deletions.
49 changes: 38 additions & 11 deletions pkg/distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type RequestBuilder struct {
kv.Request
is infoschema.InfoSchema
err error

// When SetDAGRequest is called, builder will also this field.
dag *tipb.DAGRequest
}

// Build builds a "kv.Request".
Expand Down Expand Up @@ -72,6 +75,29 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
if builder.Request.KeyRanges == nil {
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(nil)
}

if dag := builder.dag; dag != nil {
if execCnt := len(dag.Executors); execCnt == 1 {
oldConcurrency := builder.Request.Concurrency
// select * from t order by id
if builder.Request.KeepOrder {
// When the DAG is just simple scan and keep order, set concurrency to 2.
// If a lot data are returned to client, mysql protocol is the bottleneck so concurrency 2 is enough.
// If very few data are returned to client, the speed is not optimal but good enough.
switch dag.Executors[0].Tp {
case tipb.ExecType_TypeTableScan, tipb.ExecType_TypeIndexScan, tipb.ExecType_TypePartitionTableScan:
builder.Request.Concurrency = 2
failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) {
if val.(bool) {
// When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition
builder.Request.Concurrency = oldConcurrency
}
})
}
}
}
}

return &builder.Request, builder.err
}

Expand Down Expand Up @@ -151,17 +177,18 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild
builder.Request.Tp = kv.ReqTypeDAG
builder.Request.Cacheable = true
builder.Request.Data, builder.err = dag.Marshal()
}
if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil {
limit := dag.Executors[execCnt-1].GetLimit()
builder.Request.LimitSize = limit.GetLimit()
// When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient.
if execCnt == 2 {
if limit.Limit < estimatedRegionRowCount {
if kr := builder.Request.KeyRanges; kr != nil {
builder.Request.Concurrency = kr.PartitionNum()
} else {
builder.Request.Concurrency = 1
builder.dag = dag
if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil {
limit := dag.Executors[execCnt-1].GetLimit()
builder.Request.LimitSize = limit.GetLimit()
// When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient.
if execCnt == 2 {
if limit.Limit < estimatedRegionRowCount {
if kr := builder.Request.KeyRanges; kr != nil {
builder.Request.Concurrency = kr.PartitionNum()
} else {
builder.Request.Concurrency = 1
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/session/test/variable/variable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func TestCoprocessorOOMAction(t *testing.T) {
sql: "select id from t5",
},
}

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/distsql/testRateLimitActionMockConsumeAndAssert", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/distsql/testRateLimitActionMockConsumeAndAssert"))
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/testRateLimitActionMockConsumeAndAssert", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/testRateLimitActionMockConsumeAndAssert"))
Expand Down
25 changes: 0 additions & 25 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,31 +207,6 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
}

if it.req.KeepOrder {
// Don't set high concurrency for the keep order case. It wastes a lot of memory and gains nothing.
// TL;DR
// Because for a keep order coprocessor request, the cop tasks are handled one by one, if we set a
// higher concurrency, the data is just cached and not consumed for a while, this increase the memory usage.
// Set concurrency to 2 can reduce the memory usage and I've tested that it does not necessarily
// decrease the performance.
// For ReqTypeAnalyze, we keep its concurrency to avoid slow analyze(see https://github.com/pingcap/tidb/issues/40162 for details).
if it.concurrency > 2 && it.req.Tp != kv.ReqTypeAnalyze {
oldConcurrency := it.concurrency
partitionNum := req.KeyRanges.PartitionNum()
if partitionNum > it.concurrency {
partitionNum = it.concurrency
}
it.concurrency = 2
if it.concurrency < partitionNum {
it.concurrency = partitionNum
}

failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) {
if val.(bool) {
// When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition
it.concurrency = oldConcurrency
}
})
}
if it.smallTaskConcurrency > 20 {
it.smallTaskConcurrency = 20
}
Expand Down

0 comments on commit cdd7546

Please sign in to comment.