*: use int instead of fmt.Stringer as executor id #19207

Merged (21 commits, Aug 19, 2020)
3 changes: 1 addition & 2 deletions distsql/distsql.go
@@ -15,7 +15,6 @@ package distsql

import (
"context"
"fmt"
"unsafe"

"github.com/opentracing/opentracing-go"
@@ -92,7 +91,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
// The difference from Select is that SelectWithRuntimeStats will set copPlanIDs into selectResult,
// which can help selectResult to collect runtime stats.
func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer, rootPlanID fmt.Stringer) (SelectResult, error) {
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []int, rootPlanID int) (SelectResult, error) {
sr, err := Select(ctx, sctx, kvReq, fieldTypes, fb)
if err == nil {
if selectResult, ok := sr.(*selectResult); ok {
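For reference, a minimal sketch of a caller of the updated `SelectWithRuntimeStats`, with plan IDs passed as plain ints rather than `fmt.Stringer` labels. Only the signature follows the diff above; the function name `runSelect` and the concrete ID values are placeholders, not part of this PR.

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/distsql"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/statistics"
	"github.com/pingcap/tidb/types"
)

// runSelect is a hypothetical caller of the post-PR API. Plan IDs are now
// plain ints; before this change they were fmt.Stringer labels such as
// stringutil.StringerStr("root_0").
func runSelect(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
	fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) error {
	copPlanIDs := []int{1, 2, 3} // placeholder IDs, one per cop plan
	rootPlanID := 4              // placeholder root plan ID
	result, err := distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs, rootPlanID)
	if err != nil {
		return err
	}
	return result.Close()
}
```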
19 changes: 6 additions & 13 deletions distsql/distsql_test.go
@@ -15,7 +15,6 @@ package distsql

import (
"context"
"fmt"
"sync"
"testing"
"time"
@@ -32,17 +31,16 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
)

func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []string) (*selectResult, []*types.FieldType) {
func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []int) (*selectResult, []*types.FieldType) {
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"), -1)).
SetMemTracker(memory.NewTracker(-1, -1)).
Build()
c.Assert(err, IsNil)

@@ -66,12 +64,7 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str
if planIDs == nil {
response, err = Select(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
} else {
var planIDFuncs []fmt.Stringer
for i := range planIDs {
idx := i
planIDFuncs = append(planIDFuncs, stringutil.StringerStr(planIDs[idx]))
}
response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDFuncs, stringutil.StringerStr("root_0"))
response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDs, 1)
}

c.Assert(err, IsNil)
@@ -134,13 +127,13 @@ func (s *testSuite) TestSelectNormalChunkSize(c *C) {
}

func (s *testSuite) TestSelectWithRuntimeStats(c *C) {
planIDs := []string{"1", "2", "3"}
planIDs := []int{1, 2, 3}
response, colTypes := s.createSelectNormal(1, 2, c, planIDs)
if len(response.copPlanIDs) != len(planIDs) {
c.Fatal("invalid copPlanIDs")
}
for i := range planIDs {
if response.copPlanIDs[i].String() != planIDs[i] {
if response.copPlanIDs[i] != planIDs[i] {
c.Fatal("invalid copPlanIDs")
}
}
@@ -440,7 +433,7 @@ func createSelectNormal(batch, totalRows int, ctx sessionctx.Context) (*selectRe
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"), -1)).
SetMemTracker(memory.NewTracker(-1, -1)).
Build()

/// 4 int64 types.
5 changes: 2 additions & 3 deletions distsql/request_builder_test.go
@@ -34,7 +34,6 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tipb/go-tipb"
)
@@ -57,8 +56,8 @@ type testSuite struct {
func (s *testSuite) SetUpSuite(c *C) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = &stmtctx.StatementContext{
MemTracker: memory.NewTracker(stringutil.StringerStr("testSuite"), -1),
DiskTracker: disk.NewTracker(stringutil.StringerStr("testSuite"), -1),
MemTracker: memory.NewTracker(-1, -1),
DiskTracker: disk.NewTracker(-1, -1),
}
ctx.Store = &mock.Store{
Client: &mock.Client{
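A minimal sketch of the tracker setup used throughout these tests after the change: `memory.NewTracker` and `disk.NewTracker` now take an int label instead of a `fmt.Stringer`. Using -1 as an anonymous label is taken from the diff above; the helper name is hypothetical.

```go
package example

import (
	"github.com/pingcap/tidb/sessionctx/stmtctx"
	"github.com/pingcap/tidb/util/disk"
	"github.com/pingcap/tidb/util/memory"
	"github.com/pingcap/tidb/util/mock"
)

// newTestContext mirrors the post-PR test setup: tracker labels are ints
// (-1 for an unnamed tracker, as in the tests above) and the second argument
// is still the byte limit (-1 means unlimited).
func newTestContext() *mock.Context {
	ctx := mock.NewContext()
	ctx.GetSessionVars().StmtCtx = &stmtctx.StatementContext{
		MemTracker:  memory.NewTracker(-1, -1),
		DiskTracker: disk.NewTracker(-1, -1),
	}
	return ctx
}
```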
16 changes: 6 additions & 10 deletions distsql/select_result.go
@@ -82,8 +82,8 @@ type selectResult struct {

// copPlanIDs contains all copTasks' planIDs,
// which help to collect copTasks' runtime stats.
copPlanIDs []fmt.Stringer
rootPlanID fmt.Stringer
copPlanIDs []int
rootPlanID int

fetchDuration time.Duration
durationReported bool
@@ -248,7 +248,7 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro

func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv.CopRuntimeStats, respTime time.Duration) {
callee := copStats.CalleeAddress
if r.rootPlanID == nil || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
return
}
if len(r.selectResp.GetExecutionSummaries()) != len(r.copPlanIDs) {
@@ -260,7 +260,7 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv
}
if r.stats == nil {
stmtCtx := r.ctx.GetSessionVars().StmtCtx
id := r.rootPlanID.String()
id := r.rootPlanID
originRuntimeStats := stmtCtx.RuntimeStatsColl.GetRootStats(id)
r.stats = &selectResultRuntimeStats{
RuntimeStats: originRuntimeStats,
@@ -274,12 +274,8 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := ""
if detail.GetExecutorId() != "" {
planID = detail.GetExecutorId()
} else {
planID = r.copPlanIDs[i].String()
}
// Fixme: Use detail.GetExecutorId() if exist.
planID := r.copPlanIDs[i]
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID, callee, detail)
}
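One consequence visible in the hunk above: with `rootPlanID` changed from `fmt.Stringer` to `int`, "unset" can no longer be expressed as nil, so the guard now treats non-positive IDs as unset. A trivial sketch of that convention (the helper name is made up, not part of the PR):

```go
// hasRootPlanID mirrors the guard in updateCopRuntimeStats above: a
// non-positive plan ID stands in for the old nil fmt.Stringer.
func hasRootPlanID(rootPlanID int) bool {
	return rootPlanID > 0
}
```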
15 changes: 4 additions & 11 deletions distsql/select_result_test.go
@@ -15,7 +15,6 @@ package distsql

import (
"context"
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/sessionctx/stmtctx"
@@ -30,7 +29,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
ctx.GetSessionVars().StmtCtx = new(stmtctx.StatementContext)
sr := selectResult{ctx: ctx}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil)
sr.rootPlanID = copPlan{}
sr.rootPlanID = 1234
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
@@ -42,17 +41,11 @@
}
c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue)
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats("callee"), IsFalse)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234), IsFalse)

sr.copPlanIDs = []fmt.Stringer{copPlan{}}
sr.copPlanIDs = []int{sr.rootPlanID}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil)
c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs))
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats("callee").String(), Equals, "time:1ns, loops:1")
}

type copPlan struct{}

func (p copPlan) String() string {
return "callee"
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats(1234).String(), Equals, "time:1ns, loops:1")
}
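The runtime-stats collector is likewise keyed by int plan IDs now, as the updated assertions show. A small hypothetical lookup helper (plan ID 1234 is just the placeholder used in the test above):

```go
package example

import "github.com/pingcap/tidb/util/execdetails"

// copStatsString looks up cop-task stats by their int plan ID, matching the
// post-PR collector API exercised in the test above.
func copStatsString(coll *execdetails.RuntimeStatsColl, planID int) string {
	if !coll.ExistsCopStats(planID) {
		return ""
	}
	return coll.GetCopStats(planID).String()
}
```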
2 changes: 1 addition & 1 deletion executor/aggregate.go
@@ -248,7 +248,7 @@ func (e *HashAggExec) Close() error {
finalConcurrencyInfo := execdetails.NewConcurrencyInfo("FinalConcurrency", finalConcurrency)
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
return e.baseExecutor.Close()
}
3 changes: 1 addition & 2 deletions executor/apply_cache.go
@@ -19,7 +19,6 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
)

// applyCache is used in the apply executor. When we get the same value of the outer row.
@@ -47,7 +46,7 @@ func newApplyCache(ctx sessionctx.Context) (*applyCache, error) {
c := applyCache{
cache: cache,
memCapacity: ctx.GetSessionVars().NestedLoopJoinCacheCapacity,
memTracker: memory.NewTracker(stringutil.StringerStr("applyCache"), -1),
memTracker: memory.NewTracker(memory.LabelForApplyCache, -1),
}
return &c, nil
}
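Instead of ad-hoc string labels, trackers such as the apply-cache tracker now use predefined int label constants like `memory.LabelForApplyCache`. A sketch of consuming memory against such a tracker; the byte count is an arbitrary placeholder, and `Consume`/`BytesConsumed` are assumed to be unchanged by this PR:

```go
package example

import "github.com/pingcap/tidb/util/memory"

// trackApplyCache creates a tracker with the predefined apply-cache label and
// records a placeholder allocation against it.
func trackApplyCache() int64 {
	tracker := memory.NewTracker(memory.LabelForApplyCache, -1) // -1: no byte limit
	tracker.Consume(1024)                                       // pretend the cache grew by 1 KiB
	return tracker.BytesConsumed()
}
```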
2 changes: 1 addition & 1 deletion executor/batch_point_get.go
@@ -109,7 +109,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
SnapshotRuntimeStats: snapshotStats,
}
snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), e.stats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
30 changes: 15 additions & 15 deletions executor/benchmark_test.go
@@ -190,7 +190,7 @@ func (mds *mockDataSource) Next(ctx context.Context, req *chunk.Chunk) error {
}

func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
baseExec := newBaseExecutor(opt.ctx, opt.schema, nil)
baseExec := newBaseExecutor(opt.ctx, opt.schema, 0)
m := &mockDataSource{baseExec, opt, nil, nil, 0}
rTypes := retTypes(m)
colData := make([][]interface{}, len(rTypes))
@@ -742,8 +742,8 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(-1, -1)
ctx.GetSessionVars().SetIndexLookupJoinConcurrency(4)
tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}, rawData: wideString}
tc.cols = cols
@@ -785,7 +785,7 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
probeKeys = append(probeKeys, cols1[keyIdx])
}
e := &HashJoinExec{
baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, stringutil.StringerStr("HashJoin"), innerExec, outerExec),
baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec),
concurrency: uint(testCase.concurrency),
joinType: testCase.joinType, // 0 for InnerJoin, 1 for LeftOutersJoin, 2 for RightOuterJoin
isOuterJoin: false,
@@ -809,9 +809,9 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
if testCase.disk {
memLimit = 1
}
t := memory.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), memLimit)
t := memory.NewTracker(-1, memLimit)
t.SetActionOnExceed(nil)
t2 := disk.NewTracker(stringutil.StringerStr("root of prepare4HashJoin"), -1)
t2 := disk.NewTracker(-1, -1)
e.ctx.GetSessionVars().StmtCtx.MemTracker = t
e.ctx.GetSessionVars().StmtCtx.DiskTracker = t2
return e
@@ -1131,8 +1131,8 @@ func defaultIndexJoinTestCase() *indexJoinTestCase {
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().SnapshotTS = 1
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(-1, -1)
tc := &indexJoinTestCase{
outerRows: 100000,
innerRows: variable.DefMaxChunkSize * 100,
@@ -1185,7 +1185,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
keyOff2IdxOff[i] = i
}
e := &IndexLookUpJoin{
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, stringutil.StringerStr("IndexInnerHashJoin"), outerDS),
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, 1, outerDS),
outerCtx: outerCtx{
rowTypes: leftTypes,
keyCols: tc.outerJoinKeyIdx,
@@ -1247,7 +1247,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(nil, outerJoinKeys[i], outerJoinKeys[i]))
}
e := &IndexLookUpMergeJoin{
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, stringutil.StringerStr("IndexMergeJoin"), outerDS),
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, 2, outerDS),
outerMergeCtx: outerMergeCtx{
rowTypes: leftTypes,
keyCols: tc.outerJoinKeyIdx,
@@ -1406,7 +1406,7 @@ func prepare4MergeJoin(tc *mergeJoinTestCase, leftExec, rightExec *mockDataSourc
// only benchmark inner join
e := &MergeJoinExec{
stmtCtx: tc.ctx.GetSessionVars().StmtCtx,
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, stringutil.StringerStr("MergeJoin"), leftExec, rightExec),
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, 3, leftExec, rightExec),
compareFuncs: compareFuncs,
isOuterJoin: false,
}
@@ -1446,8 +1446,8 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().SnapshotTS = 1
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(-1, -1)

numInnerRows := numOuterRows*numInnerDup + numInnerRedundant
itc := &indexJoinTestCase{
@@ -1607,7 +1607,7 @@ func defaultSortTestCase() *sortCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)
tc := &sortCase{rows: 300000, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx}
return tc
}
@@ -1621,7 +1621,7 @@ func benchmarkSortExec(b *testing.B, cas *sortCase) {
}
dataSource := buildMockDataSource(opt)
exec := &SortExec{
baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, stringutil.StringerStr("sort"), dataSource),
baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, 4, dataSource),
ByItems: make([]*util.ByItems, 0, len(cas.orderByIdx)),
schema: dataSource.schema,
}
6 changes: 1 addition & 5 deletions executor/brie.go
@@ -15,7 +15,6 @@ package executor

import (
"context"
"fmt"
"net/url"
"strings"
"sync"
@@ -44,7 +43,6 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stringutil"
)

// brieTaskProgress tracks a task's current progress.
@@ -169,11 +167,9 @@ func (b *executorBuilder) parseTSString(ts string) (uint64, error) {
return variable.GoTimeToTS(t1), nil
}

var brieStmtLabel fmt.Stringer = stringutil.StringerStr("BRIEStmt")

func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) Executor {
e := &BRIEExec{
baseExecutor: newBaseExecutor(b.ctx, schema, brieStmtLabel),
baseExecutor: newBaseExecutor(b.ctx, schema, 0),
info: &brieTaskInfo{
kind: s.Kind,
},