Skip to content

Commit

Permalink
store/copr: partly revert pr/35975, do it correctly this time pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Aug 6, 2024
1 parent c1456c5 commit 6cb8d0d
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 22 deletions.
49 changes: 38 additions & 11 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type RequestBuilder struct {
kv.Request
is infoschema.InfoSchema
err error
// When SetDAGRequest is called, builder will also this field.
dag *tipb.DAGRequest
}

// Build builds a "kv.Request".
Expand Down Expand Up @@ -73,6 +75,29 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
if builder.Request.KeyRanges == nil {
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(nil)
}

if dag := builder.dag; dag != nil {
if execCnt := len(dag.Executors); execCnt == 1 {
oldConcurrency := builder.Request.Concurrency
// select * from t order by id
if builder.Request.KeepOrder {
// When the DAG is just simple scan and keep order, set concurrency to 2.
// If a lot data are returned to client, mysql protocol is the bottleneck so concurrency 2 is enough.
// If very few data are returned to client, the speed is not optimal but good enough.
switch dag.Executors[0].Tp {
case tipb.ExecType_TypeTableScan, tipb.ExecType_TypeIndexScan, tipb.ExecType_TypePartitionTableScan:
builder.Request.Concurrency = 2
failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) {
if val.(bool) {
// When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition
builder.Request.Concurrency = oldConcurrency
}
})
}
}
}
}

return &builder.Request, builder.err
}

Expand Down Expand Up @@ -152,17 +177,19 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild
builder.Request.Tp = kv.ReqTypeDAG
builder.Request.Cacheable = true
builder.Request.Data, builder.err = dag.Marshal()
}
if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil {
limit := dag.Executors[execCnt-1].GetLimit()
builder.Request.LimitSize = limit.GetLimit()
// When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient.
if execCnt == 2 {
if limit.Limit < estimatedRegionRowCount {
if kr := builder.Request.KeyRanges; kr != nil {
builder.Request.Concurrency = kr.PartitionNum()
} else {
builder.Request.Concurrency = 1

builder.dag = dag
if execCnt := len(dag.Executors); execCnt != 0 && dag.Executors[execCnt-1].GetLimit() != nil {
limit := dag.Executors[execCnt-1].GetLimit()
builder.Request.LimitSize = limit.GetLimit()
// When the DAG is just simple scan and small limit, set concurrency to 1 would be sufficient.
if execCnt == 2 {
if limit.Limit < estimatedRegionRowCount {
if kr := builder.Request.KeyRanges; kr != nil {
builder.Request.Concurrency = kr.PartitionNum()
} else {
builder.Request.Concurrency = 1
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions session/sessiontest/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,10 @@ func TestCoprocessorOOMAction(t *testing.T) {
sql: "select id from t5",
},
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/distsql/testRateLimitActionMockConsumeAndAssert", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/distsql/testRateLimitActionMockConsumeAndAssert"))
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/testRateLimitActionMockConsumeAndAssert", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/testRateLimitActionMockConsumeAndAssert"))
Expand Down
11 changes: 0 additions & 11 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,17 +201,6 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
}

if it.req.KeepOrder {
oldConcurrency := it.concurrency
partitionNum := req.KeyRanges.PartitionNum()
if partitionNum > 0 && partitionNum < it.concurrency {
it.concurrency = partitionNum
}
failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) {
if val.(bool) {
// When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition
it.concurrency = oldConcurrency
}
})
if it.smallTaskConcurrency > 20 {
it.smallTaskConcurrency = 20
}
Expand Down

0 comments on commit 6cb8d0d

Please sign in to comment.