diff --git a/ast/misc.go b/ast/misc.go
index 98ae76ca53ac0..620a9abf34c12 100644
--- a/ast/misc.go
+++ b/ast/misc.go
@@ -118,8 +118,9 @@ func (n *TraceStmt) Accept(v Visitor) (Node, bool) {
 type ExplainStmt struct {
 	stmtNode
 
-	Stmt   StmtNode
-	Format string
+	Stmt    StmtNode
+	Format  string
+	Analyze bool
 }
 
 // Accept implements Node Accept interface.
diff --git a/executor/adapter.go b/executor/adapter.go
index e877dd6e126e7..4849b8012997f 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -365,12 +365,12 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
 	if sessVars.InRestrictedSQL {
 		internal = "[INTERNAL] "
 	}
+	execDetail := sessVars.StmtCtx.GetExecDetails()
 	if costTime < threshold {
 		logutil.SlowQueryLogger.Debugf(
 			"[QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
-			internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
+			internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
 	} else {
-		execDetail := sessVars.StmtCtx.GetExecDetails()
 		logutil.SlowQueryLogger.Warnf(
 			"[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
 			internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
diff --git a/executor/aggregate.go b/executor/aggregate.go
index d28d5a4c404f7..6c28ef688b90b 100644
--- a/executor/aggregate.go
+++ b/executor/aggregate.go
@@ -15,6 +15,7 @@ package executor
 
 import (
 	"sync"
+	"time"
 
 	"github.com/pingcap/tidb/executor/aggfuncs"
 	"github.com/pingcap/tidb/expression"
@@ -501,6 +502,10 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro
 
 // Next implements the Executor Next interface.
 func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.Reset()
 	if e.isUnparallelExec {
 		return errors.Trace(e.unparallelExec(ctx, chk))
@@ -756,8 +761,11 @@ func (e *StreamAggExec) Close() error {
 
 // Next implements the Executor Next interface.
 func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.Reset()
-
 	for !e.executed && chk.NumRows() < e.maxChunkSize {
 		err := e.consumeOneGroup(ctx, chk)
 		if err != nil {
diff --git a/executor/builder.go b/executor/builder.go
index e7f63af8b8371..1c62d4cbf5702 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -43,6 +43,7 @@ import (
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/admin"
 	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/ranger"
 	"github.com/pingcap/tidb/util/timeutil"
 	"github.com/pingcap/tipb/go-tipb"
@@ -448,13 +449,12 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
 func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
 	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
 	base.initCap = chunk.ZeroCapacity
-	e := &PrepareExec{
+	return &PrepareExec{
 		baseExecutor: base,
 		is:           b.is,
 		name:         v.Name,
 		sqlText:      v.SQLText,
 	}
-	return e
 }
 
 func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
@@ -659,14 +659,15 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) Executor {
 
 // buildExplain builds an explain executor. `e.rows` collects the final result for `ExplainExec`.
 func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
-	e := &ExplainExec{
+	explainExec := &ExplainExec{
 		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
+		explain:      v,
 	}
-	e.rows = make([][]string, 0, len(v.Rows))
-	for _, row := range v.Rows {
-		e.rows = append(e.rows, row)
+	if v.Analyze {
+		b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
+		explainExec.analyzeExec = b.build(v.ExecPlan)
 	}
-	return e
+	return explainExec
 }
 
 func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor {
diff --git a/executor/distsql.go b/executor/distsql.go
index d262d8b57bbb0..8f7beae3cc28e 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -19,6 +19,7 @@ import (
 	"sort"
 	"sync"
 	"sync/atomic"
+	"time"
 	"unsafe"
 
 	"github.com/pingcap/tidb/distsql"
@@ -243,6 +244,10 @@ func (e *IndexReaderExecutor) Close() error {
 
 // Next implements the Executor Next interface.
 func (e *IndexReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	err := e.result.Next(ctx, chk)
 	if err != nil {
 		e.feedback.Invalidate()
@@ -474,7 +479,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
 }
 
 func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []int64) (Executor, error) {
-	tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, &TableReaderExecutor{
+	tableReaderExec := &TableReaderExecutor{
 		baseExecutor:    newBaseExecutor(e.ctx, e.schema, e.id+"_tableReader"),
 		table:           e.table,
 		physicalTableID: e.physicalTableID,
@@ -483,7 +488,11 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in
 		feedback:        statistics.NewQueryFeedback(0, nil, 0, false),
 		corColInFilter:  e.corColInTblSide,
 		plans:           e.tblPlans,
-	}, handles)
+	}
+	// Set `runtimeStats` to nil to disable runtime stats collection for this inner table reader,
+	// because the TableWorker that drives it is not shown in the explain result for now.
+	tableReaderExec.runtimeStats = nil
+	tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles)
 	if err != nil {
 		log.Error(err)
 		return nil, errors.Trace(err)
@@ -512,6 +521,10 @@ func (e *IndexLookUpExecutor) Close() error {
 
 // Next implements the Executor Next interface.
 func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.Reset()
 	for {
 		resultTask, err := e.getResultTask()
diff --git a/executor/executor.go b/executor/executor.go
index b6e2580a0d659..fe543c4d7eebf 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -18,6 +18,7 @@ import (
 	"runtime"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/cznic/mathutil"
 	"github.com/pingcap/tidb/ast"
@@ -35,12 +36,14 @@ import (
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/admin"
 	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/net/context"
 )
 
 var (
+	_ Executor = &baseExecutor{}
 	_ Executor = &CheckTableExec{}
 	_ Executor = &HashAggExec{}
 	_ Executor = &LimitExec{}
@@ -71,6 +74,7 @@ type baseExecutor struct {
 	maxChunkSize  int
 	children      []Executor
 	retFieldTypes []*types.FieldType
+	runtimeStats  *execdetails.RuntimeStats
 }
 
 // Open initializes children recursively and "childrenResults" according to children's schemas.
@@ -127,6 +131,9 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
 		initCap:      ctx.GetSessionVars().MaxChunkSize,
 		maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
 	}
+	if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
+		e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.Get(e.id)
+	}
 	if schema != nil {
 		cols := schema.Columns
 		e.retFieldTypes = make([]*types.FieldType, len(cols))
@@ -168,6 +175,10 @@ type CancelDDLJobsExec struct {
 
 // Next implements the Executor Next interface.
 func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.GrowAndReset(e.maxChunkSize)
 	if e.cursor >= len(e.jobIDs) {
 		return nil
@@ -610,6 +621,10 @@ type LimitExec struct {
 
 // Next implements the Executor Next interface.
 func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.Reset()
 	if e.cursor >= e.end {
 		return nil
@@ -729,6 +744,10 @@ func (e *TableDualExec) Open(ctx context.Context) error {
 
 // Next implements the Executor Next interface.
 func (e *TableDualExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.Reset()
 	if e.numReturned >= e.numDualRows {
 		return nil
@@ -780,6 +799,10 @@ func (e *SelectionExec) Close() error {
 
 // Next implements the Executor Next interface.
 func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.GrowAndReset(e.maxChunkSize)
 
 	if !e.batched {
@@ -855,6 +878,10 @@ type TableScanExec struct {
 
 // Next implements the Executor Next interface.
 func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.GrowAndReset(e.maxChunkSize)
 	if e.isVirtualTable {
 		return errors.Trace(e.nextChunk4InfoSchema(ctx, chk))
@@ -955,6 +982,10 @@ func (e *MaxOneRowExec) Open(ctx context.Context) error {
 
 // Next implements the Executor Next interface.
 func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.Reset()
 	if e.evaluated {
 		return nil
@@ -1097,6 +1128,10 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) {
 
 // Next implements the Executor Next interface.
 func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.GrowAndReset(e.maxChunkSize)
 	if !e.initialized {
 		e.initialize(ctx)
diff --git a/executor/explain.go b/executor/explain.go
index afc3f871e0883..019e07a655744 100644
--- a/executor/explain.go
+++ b/executor/explain.go
@@ -15,6 +15,7 @@ package executor
 
 import (
 	"github.com/cznic/mathutil"
+	"github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/util/chunk"
 	"golang.org/x/net/context"
 )
@@ -23,18 +24,39 @@ import (
 type ExplainExec struct {
 	baseExecutor
 
-	rows   [][]string
-	cursor int
+	explain     *core.Explain
+	analyzeExec Executor
+	rows        [][]string
+	cursor      int
+}
+
+// Open implements the Executor Open interface.
+func (e *ExplainExec) Open(ctx context.Context) error {
+	if e.analyzeExec != nil {
+		return e.analyzeExec.Open(ctx)
+	}
+	return nil
 }
 
 // Close implements the Executor Close interface.
 func (e *ExplainExec) Close() error {
+	if e.analyzeExec != nil {
+		e.analyzeExec.Close()
+	}
 	e.rows = nil
 	return nil
 }
 
 // Next implements the Executor Next interface.
 func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.rows == nil {
+		var err error
+		e.rows, err = e.generateExplainInfo(ctx)
+		if err != nil {
+			return err
+		}
+	}
+
 	chk.GrowAndReset(e.maxChunkSize)
 	if e.cursor >= len(e.rows) {
 		return nil
@@ -49,3 +71,25 @@ func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 	e.cursor += numCurRows
 	return nil
 }
+
+func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) {
+	if e.analyzeExec != nil {
+		chk := e.analyzeExec.newFirstChunk()
+		for {
+			err := e.analyzeExec.Next(ctx, chk)
+			if err != nil {
+				return nil, err
+			}
+			if chk.NumRows() == 0 {
+				break
+			}
+		}
+	}
+	if err := e.explain.RenderResult(); err != nil {
+		return nil, err
+	}
+	if e.analyzeExec != nil {
+		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = nil
+	}
+	return e.explain.Rows, nil
+}
diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go
index a0bc75994bc92..bc05f56d971fd 100644
--- a/executor/index_lookup_join.go
+++ b/executor/index_lookup_join.go
@@ -18,6 +18,7 @@ import (
 	"runtime"
 	"sort"
 	"sync"
+	"time"
 	"unsafe"
 
 	"github.com/pingcap/tidb/expression"
@@ -189,6 +190,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
 
 // Next implements the Executor interface.
 func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.Reset()
 	e.joinResult.Reset()
 	for {
diff --git a/executor/join.go b/executor/join.go
index a979790fe2af7..205127e64ee55 100644
--- a/executor/join.go
+++ b/executor/join.go
@@ -16,6 +16,7 @@ package executor
 import (
 	"sync"
 	"sync/atomic"
+	"time"
 	"unsafe"
 
 	"github.com/pingcap/tidb/expression"
@@ -483,6 +484,10 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
 // step 1. fetch data from inner child and build a hash table;
 // step 2. fetch data from outer child in a background goroutine and probe the hash table in multiple join workers.
 func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	if !e.prepared {
 		e.innerFinished = make(chan error, 1)
 		go util.WithRecovery(func() { e.fetchInnerAndBuildHashTable(ctx) }, e.finishFetchInnerAndBuildHashTable)
@@ -696,6 +701,10 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {
 
 // Next implements the Executor interface.
 func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.Reset()
 	for {
 		if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
diff --git a/executor/merge_join.go b/executor/merge_join.go
index 56f2102e763e2..4bbfeaac26e0d 100644
--- a/executor/merge_join.go
+++ b/executor/merge_join.go
@@ -14,6 +14,8 @@
 package executor
 
 import (
+	"time"
+
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/util/chunk"
@@ -261,6 +263,10 @@ func (e *MergeJoinExec) prepare(ctx context.Context, chk *chunk.Chunk) error {
 
 // Next implements the Executor Next interface.
 func (e *MergeJoinExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.Reset()
 	if !e.prepared {
 		if err := e.prepare(ctx, chk); err != nil {
diff --git a/executor/projection.go b/executor/projection.go
index 168ce32f39914..dce2709f1271b 100644
--- a/executor/projection.go
+++ b/executor/projection.go
@@ -14,6 +14,8 @@
 package executor
 
 import (
+	"time"
+
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/util/chunk"
@@ -139,6 +141,10 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
 //  +------------------------------+   +----------------------+
 //
 func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.GrowAndReset(e.maxChunkSize)
 	if e.isUnparallelExec() {
 		return errors.Trace(e.unParallelExecute(ctx, chk))
diff --git a/executor/sort.go b/executor/sort.go
index 95b9ecac29d38..83fa22618798e 100644
--- a/executor/sort.go
+++ b/executor/sort.go
@@ -16,6 +16,7 @@ package executor
 import (
 	"container/heap"
 	"sort"
+	"time"
 
 	"github.com/pingcap/tidb/expression"
 	plannercore "github.com/pingcap/tidb/planner/core"
@@ -73,6 +74,10 @@ func (e *SortExec) Open(ctx context.Context) error {
 
 // Next implements the Executor Next interface.
 func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.Reset()
 	if !e.fetched {
 		err := e.fetchRowChunks(ctx)
@@ -296,6 +301,10 @@ func (e *TopNExec) Open(ctx context.Context) error {
 
 // Next implements the Executor Next interface.
 func (e *TopNExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.Reset()
 	if !e.fetched {
 		e.totalLimit = int(e.limit.Offset + e.limit.Count)
diff --git a/executor/table_reader.go b/executor/table_reader.go
index ea9de3ed2b6f9..a941e3706bee8 100644
--- a/executor/table_reader.go
+++ b/executor/table_reader.go
@@ -14,6 +14,8 @@
 package executor
 
 import (
+	"time"
+
 	"github.com/pingcap/tidb/distsql"
 	"github.com/pingcap/tidb/model"
 	plannercore "github.com/pingcap/tidb/planner/core"
@@ -98,6 +100,10 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
 // Next fills data into the chunk passed by its caller.
 // The task was actually done by tableReaderHandler.
 func (e *TableReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if e.runtimeStats != nil {
+		start := time.Now()
+		defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	if err := e.resultHandler.nextChunk(ctx, chk); err != nil {
 		e.feedback.Invalidate()
 		return err
diff --git a/executor/union_scan.go b/executor/union_scan.go
index 3c1682936d71d..1ac6689c1b58d 100644
--- a/executor/union_scan.go
+++ b/executor/union_scan.go
@@ -15,6 +15,7 @@ package executor
 
 import (
 	"sort"
+	"time"
 
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/model"
@@ -125,6 +126,10 @@ func (us *UnionScanExec) Open(ctx context.Context) error {
 
 // Next implements the Executor Next interface.
 func (us *UnionScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	if us.runtimeStats != nil {
+		start := time.Now()
+		defer func() { us.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
+	}
 	chk.GrowAndReset(us.maxChunkSize)
 	mutableRow := chunk.MutRowFromTypes(us.retTypes())
 	for i, batchSize := 0, chk.Capacity(); i < batchSize; i++ {
diff --git a/parser/parser.y b/parser/parser.y
index 786c4be6ae505..a7eb8136dee5c 100644
--- a/parser/parser.y
+++ b/parser/parser.y
@@ -2312,6 +2312,14 @@ ExplainStmt:
 			Format: $4,
 		}
 	}
+|	ExplainSym "ANALYZE" ExplainableStmt
+	{
+		$$ = &ast.ExplainStmt{
+			Stmt:    $3,
+			Format:  "row",
+			Analyze: true,
+		}
+	}
 
 LengthNum:
 	NUM
diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go
index b1ec657000890..9382b5aa500fa 100644
--- a/planner/core/cbo_test.go
+++ b/planner/core/cbo_test.go
@@ -15,6 +15,7 @@ package core_test
 
 import (
 	"fmt"
+	"strings"
 	"testing"
 
 	. "github.com/pingcap/check"
@@ -36,6 +37,35 @@ var _ = Suite(&testAnalyzeSuite{})
 type testAnalyzeSuite struct {
 }
 
+func (s *testAnalyzeSuite) TestExplainAnalyze(c *C) {
+	defer testleak.AfterTest(c)()
+	store, dom, err := newStoreWithBootstrap()
+	c.Assert(err, IsNil)
+	tk := testkit.NewTestKit(c, store)
+	defer func() {
+		dom.Close()
+		store.Close()
+	}()
+	tk.MustExec("use test")
+	tk.MustExec("create table t1(a int, b int, c int, key idx(a, b))")
+	tk.MustExec("create table t2(a int, b int)")
+	tk.MustExec("insert into t1 values (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)")
+	tk.MustExec("insert into t2 values (2, 22), (3, 33), (5, 55)")
+	tk.MustExec("analyze table t1, t2")
+	rs := tk.MustQuery("explain analyze select t1.a, t1.b, sum(t1.c) from t1 join t2 on t1.a = t2.b where t1.a > 1")
+	c.Assert(len(rs.Rows()), Equals, 10)
+	for _, row := range rs.Rows() {
+		c.Assert(len(row), Equals, 5)
+		taskType := row[2].(string)
+		if taskType != "cop" {
+			execInfo := row[4].(string)
+			c.Assert(strings.Contains(execInfo, "time"), Equals, true)
+			c.Assert(strings.Contains(execInfo, "loops"), Equals, true)
+			c.Assert(strings.Contains(execInfo, "rows"), Equals, true)
+		}
+	}
+}
+
 // TestCBOWithoutAnalyze tests the plan with stats that only have count info.
 func (s *testAnalyzeSuite) TestCBOWithoutAnalyze(c *C) {
 	defer testleak.AfterTest(c)()
diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go
index f06459ffbbf7b..d629d611cc70c 100644
--- a/planner/core/common_plans.go
+++ b/planner/core/common_plans.go
@@ -17,12 +17,14 @@ import (
 	"bytes"
 	"fmt"
 	"strconv"
+	"strings"
 
 	"github.com/pingcap/tidb/ast"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/model"
+	"github.com/pingcap/tidb/mysql"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/util/auth"
@@ -421,11 +423,55 @@ type Explain struct {
 	StmtPlan       Plan
 	Rows           [][]string
 	explainedPlans map[int]bool
+	Format         string
+	Analyze        bool
+	ExecStmt       ast.StmtNode
+	ExecPlan       Plan
+}
+
+// prepareSchema prepares explain's result schema.
+func (e *Explain) prepareSchema() error {
+	switch strings.ToLower(e.Format) {
+	case ast.ExplainFormatROW:
+		retFields := []string{"id", "count", "task", "operator info"}
+		if e.Analyze {
+			retFields = append(retFields, "execution info")
+		}
+		schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...)
+		for _, fieldName := range retFields {
+			schema.Append(buildColumn("", fieldName, mysql.TypeString, mysql.MaxBlobWidth))
+		}
+		e.SetSchema(schema)
+	case ast.ExplainFormatDOT:
+		retFields := []string{"dot contents"}
+		schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...)
+		for _, fieldName := range retFields {
+			schema.Append(buildColumn("", fieldName, mysql.TypeString, mysql.MaxBlobWidth))
+		}
+		e.SetSchema(schema)
+	default:
+		return errors.Errorf("explain format '%s' is not supported now", e.Format)
+	}
+	return nil
+}
+
+// RenderResult renders the explain result in the specified format.
+func (e *Explain) RenderResult() error {
+	switch strings.ToLower(e.Format) {
+	case ast.ExplainFormatROW:
+		e.explainedPlans = map[int]bool{}
+		e.explainPlanInRowFormat(e.StmtPlan.(PhysicalPlan), "root", "", true)
+	case ast.ExplainFormatDOT:
+		e.prepareDotInfo(e.StmtPlan.(PhysicalPlan))
+	default:
+		return errors.Errorf("explain format '%s' is not supported now", e.Format)
+	}
+	return nil
 }
 
 // explainPlanInRowFormat generates explain information for root-tasks.
-func (e *Explain) explainPlanInRowFormat(p PhysicalPlan, TaskType, indent string, isLastChild bool) {
-	e.prepareOperatorInfo(p, TaskType, indent, isLastChild)
+func (e *Explain) explainPlanInRowFormat(p PhysicalPlan, taskType, indent string, isLastChild bool) {
+	e.prepareOperatorInfo(p, taskType, indent, isLastChild)
 	e.explainedPlans[p.ID()] = true
 
 	// For every child we create a new sub-tree rooted by it.
@@ -434,7 +480,7 @@ func (e *Explain) explainPlanInRowFormat(p PhysicalPlan, TaskType, indent string
 		if e.explainedPlans[child.ID()] {
 			continue
 		}
-		e.explainPlanInRowFormat(child.(PhysicalPlan), TaskType, childIndent, i == len(p.Children())-1)
+		e.explainPlanInRowFormat(child.(PhysicalPlan), taskType, childIndent, i == len(p.Children())-1)
 	}
 
 	switch copPlan := p.(type) {
@@ -450,10 +496,18 @@ func (e *Explain) explainPlanInRowFormat(p PhysicalPlan, TaskType, indent string
 
 // prepareOperatorInfo generates the following information for every plan:
 // operator id, task type, operator info, and the estimated row count.
-func (e *Explain) prepareOperatorInfo(p PhysicalPlan, TaskType string, indent string, isLastChild bool) {
+func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent string, isLastChild bool) {
 	operatorInfo := p.ExplainInfo()
 	count := string(strconv.AppendFloat([]byte{}, p.statsInfo().RowCount, 'f', 2, 64))
-	row := []string{e.prettyIdentifier(p.ExplainID(), indent, isLastChild), count, TaskType, operatorInfo}
+	row := []string{e.prettyIdentifier(p.ExplainID(), indent, isLastChild), count, taskType, operatorInfo}
+	if e.Analyze {
+		runtimeStatsColl := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl
+		if taskType == "cop" {
+			row = append(row, "") // TODO: fill this in once cop task execution details are collected from TiKV.
+		} else {
+			row = append(row, runtimeStatsColl.Get(p.ExplainID()).String())
+		}
+	}
 	e.Rows = append(e.Rows, row)
 }
diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go
index 33c48c11abe8f..f218ecf944a7d 100644
--- a/planner/core/planbuilder.go
+++ b/planner/core/planbuilder.go
@@ -1445,27 +1445,11 @@ func (b *planBuilder) buildExplain(explain *ast.ExplainStmt) (Plan, error) {
 			return nil, ErrUnsupportedType.GenWithStackByArgs(targetPlan)
 		}
 	}
-	p := &Explain{StmtPlan: pp}
-	switch strings.ToLower(explain.Format) {
-	case ast.ExplainFormatROW:
-		retFields := []string{"id", "count", "task", "operator info"}
-		schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...)
-		for _, fieldName := range retFields {
-			schema.Append(buildColumn("", fieldName, mysql.TypeString, mysql.MaxBlobWidth))
-		}
-		p.SetSchema(schema)
-		p.explainedPlans = map[int]bool{}
-		p.explainPlanInRowFormat(p.StmtPlan.(PhysicalPlan), "root", "", true)
-	case ast.ExplainFormatDOT:
-		retFields := []string{"dot contents"}
-		schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...)
-		for _, fieldName := range retFields {
-			schema.Append(buildColumn("", fieldName, mysql.TypeString, mysql.MaxBlobWidth))
-		}
-		p.SetSchema(schema)
-		p.prepareDotInfo(p.StmtPlan.(PhysicalPlan))
-	default:
-		return nil, errors.Errorf("explain format '%s' is not supported now", explain.Format)
+	p := &Explain{StmtPlan: pp, Analyze: explain.Analyze, Format: explain.Format, ExecStmt: explain.Stmt, ExecPlan: targetPlan}
+	p.ctx = b.ctx
+	err = p.prepareSchema()
+	if err != nil {
+		return nil, err
 	}
 	return p, nil
 }
diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go
index 2fc9adfd49712..e59d2da7194cb 100644
--- a/sessionctx/stmtctx/stmtctx.go
+++ b/sessionctx/stmtctx/stmtctx.go
@@ -73,12 +73,13 @@ type StatementContext struct {
 	}
 
 	// Copied from SessionVars.TimeZone.
-	TimeZone     *time.Location
-	Priority     mysql.PriorityEnum
-	NotFillCache bool
-	MemTracker   *memory.Tracker
-	TableIDs     []int64
-	IndexIDs     []int64
+	TimeZone         *time.Location
+	Priority         mysql.PriorityEnum
+	NotFillCache     bool
+	MemTracker       *memory.Tracker
+	RuntimeStatsColl *execdetails.RuntimeStatsColl
+	TableIDs         []int64
+	IndexIDs         []int64
 }
 
 // AddAffectedRows adds affected rows.
diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go
index a73c5b2cd60b5..ad384c51e0a7b 100644
--- a/util/execdetails/execdetails.go
+++ b/util/execdetails/execdetails.go
@@ -16,6 +16,8 @@ package execdetails
 
 import (
 	"fmt"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"time"
 )
@@ -52,3 +54,51 @@ func (d ExecDetails) String() string {
 	}
 	return strings.Join(parts, " ")
 }
+
+// RuntimeStatsColl collects executors' execution info.
+type RuntimeStatsColl struct {
+	mu    sync.Mutex
+	stats map[string]*RuntimeStats
+}
+
+// RuntimeStats collects one executor's execution info.
+type RuntimeStats struct {
+	// loop is the number of times the executor's Next() has been called.
+	loop int32
+	// consume is the total time consumed by the executor, in nanoseconds.
+	consume int64
+	// rows is the number of rows the executor has returned.
+	rows int64
+}
+
+// NewRuntimeStatsColl creates a new collector for executor runtime stats.
+func NewRuntimeStatsColl() *RuntimeStatsColl {
+	return &RuntimeStatsColl{stats: make(map[string]*RuntimeStats)}
+}
+
+// Get gets the RuntimeStats of an executor, creating it on first access.
+func (e *RuntimeStatsColl) Get(planID string) *RuntimeStats {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	runtimeStats, exists := e.stats[planID]
+	if !exists {
+		runtimeStats = &RuntimeStats{}
+		e.stats[planID] = runtimeStats
+	}
+	return runtimeStats
+}
+
+// Record records the cost of one executor invocation.
+func (e *RuntimeStats) Record(d time.Duration, rowNum int) {
+	atomic.AddInt32(&e.loop, 1)
+	atomic.AddInt64(&e.consume, int64(d))
+	atomic.AddInt64(&e.rows, int64(rowNum))
+}
+
+// String implements the fmt.Stringer interface.
+func (e *RuntimeStats) String() string {
+	if e == nil {
+		return ""
+	}
+	return fmt.Sprintf("time:%v, loops:%d, rows:%d", time.Duration(e.consume), e.loop, e.rows)
+}
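
A note for reviewers on how the pieces fit together: buildExplain hangs a fresh RuntimeStatsColl off the statement context, newBaseExecutor looks up each executor's RuntimeStats entry by its ExplainID, every Next() call records one (duration, rows) sample, and prepareOperatorInfo renders the result into the new "execution info" column. A minimal standalone sketch of that collection pattern, assuming only the API added in this diff (the plan ID "TableReader_5", the sleep, and the row counts are invented for illustration):

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/util/execdetails"
)

func main() {
	// One collector per statement; buildExplain stores it in StmtCtx.
	coll := execdetails.NewRuntimeStatsColl()

	// Each executor fetches its stats entry keyed by plan ID, as
	// newBaseExecutor does. "TableReader_5" is a made-up ID here.
	stats := coll.Get("TableReader_5")

	// Simulate three Next() calls, each returning 32 rows.
	for i := 0; i < 3; i++ {
		start := time.Now()
		time.Sleep(time.Millisecond) // stand-in for filling a chunk
		stats.Record(time.Now().Sub(start), 32)
	}

	// prepareOperatorInfo renders this into the "execution info" column.
	fmt.Println(stats.String()) // e.g. time:3.2ms, loops:3, rows:96
}

Because Record uses atomic adds and Get is guarded by a mutex, concurrent executors (for example the parallel hash aggregate workers) can share one collector safely.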
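And a sketch of the intended output shape (a hypothetical session, not from a real run: operator IDs, counts, and timings are made up, and cop-task rows keep an empty execution info column until the TODO in prepareOperatorInfo is resolved):

mysql> explain analyze select * from t2;
+------------------+-------+------+--------------------------------+------------------------------+
| id               | count | task | operator info                  | execution info               |
+------------------+-------+------+--------------------------------+------------------------------+
| TableReader_5    | 3.00  | root | data:TableScan_4               | time:1.2ms, loops:2, rows:3  |
| └─TableScan_4    | 3.00  | cop  | table:t2, keep order:false     |                              |
+------------------+-------+------+--------------------------------+------------------------------+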