Skip to content

Commit

Permalink
*: tidb tracing prototype (#7016)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhexuany authored Aug 30, 2018
1 parent 87c54b2 commit 341dc10
Show file tree
Hide file tree
Showing 16 changed files with 480 additions and 88 deletions.
12 changes: 12 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildDelete(v)
case *plan.Execute:
return b.buildExecute(v)
case *plan.Trace:
return b.buildTrace(v)
case *plan.Explain:
return b.buildExplain(v)
case *plan.PointGetPlan:
Expand Down Expand Up @@ -619,6 +621,16 @@ func (b *executorBuilder) buildDDL(v *plan.DDL) Executor {
return e
}

// buildTrace builds a TraceExec for future executing. This method will be called
// at build().
func (b *executorBuilder) buildTrace(v *plan.Trace) Executor {
return &TraceExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
stmtNode: v.StmtNode,
builder: b,
}
}

// buildExplain builds a explain executor. `e.rows` collects final result to `ExplainExec`.
func (b *executorBuilder) buildExplain(v *plan.Explain) Executor {
e := &ExplainExec{
Expand Down
19 changes: 0 additions & 19 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"unsafe"

"github.com/juju/errors"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -210,18 +209,6 @@ func splitRanges(ranges []*ranger.Range, keepOrder bool) ([]*ranger.Range, []*ra
return signedRanges, unsignedRanges
}

// startSpanFollowContext is similar to opentracing.StartSpanFromContext, but the span reference use FollowsFrom option.
func startSpanFollowsContext(ctx context.Context, operationName string) (opentracing.Span, context.Context) {
span := opentracing.SpanFromContext(ctx)
if span != nil {
span = opentracing.StartSpan(operationName, opentracing.FollowsFrom(span.Context()))
} else {
span = opentracing.StartSpan(operationName)
}

return span, opentracing.ContextWithSpan(ctx, span)
}

// rebuildIndexRanges will be called if there's correlated column in access conditions. We will rebuild the range
// by substitute correlated column with the constant.
func rebuildIndexRanges(ctx sessionctx.Context, is *plan.PhysicalIndexScan, idxCols []*expression.Column, colLens []int) (ranges []*ranger.Range, err error) {
Expand Down Expand Up @@ -298,9 +285,6 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
}

func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
span, ctx := startSpanFollowsContext(ctx, "executor.IndexReader.Open")
defer span.Finish()

var err error
if e.corColInFilter {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
Expand Down Expand Up @@ -403,9 +387,6 @@ func (e *IndexLookUpExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaIndexLookupReader)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

span, ctx := startSpanFollowsContext(ctx, "executor.IndexLookUp.Open")
defer span.Finish()

e.finished = make(chan struct{})
e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))

Expand Down
11 changes: 4 additions & 7 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ type TableReaderExecutor struct {

// Open initialzes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
span, ctx := startSpanFollowsContext(ctx, "executor.TableReader.Open")
defer span.Finish()

var err error
if e.corColInFilter {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
Expand Down Expand Up @@ -101,11 +98,11 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
func (e *TableReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
err := e.resultHandler.nextChunk(ctx, chk)
if err != nil {
if err := e.resultHandler.nextChunk(ctx, chk); err != nil {
e.feedback.Invalidate()
return err
}
return errors.Trace(err)
return errors.Trace(nil)
}

// Close implements the Executor Close interface.
Expand All @@ -115,7 +112,7 @@ func (e *TableReaderExecutor) Close() error {
return errors.Trace(err)
}

// buildResp first build request and send it to tikv using distsql.Select. It uses SelectResut returned by the callee
// buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResut returned by the callee
// to fetch all results.
func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
Expand Down
131 changes: 131 additions & 0 deletions executor/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"time"

"github.com/juju/errors"
"github.com/opentracing/basictracer-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/tracing"
"golang.org/x/net/context"
)

// TraceExec represents a root executor of trace query.
type TraceExec struct {
baseExecutor
// CollectedSpans collects all span during execution. Span is appended via
// callback method which passes into tracer implementation.
CollectedSpans []basictracer.RawSpan
// exhausted being true means there is no more result.
exhausted bool
// stmtNode is the real query ast tree and it is used for building real query's plan.
stmtNode ast.StmtNode
// rootTrace represents root span which is father of all other span.
rootTrace opentracing.Span

builder *executorBuilder
}

// Next executes real query and collects span later.
func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.exhausted {
return nil
}

// record how much time was spent for optimizeing plan
optimizeSp := e.rootTrace.Tracer().StartSpan("plan_optimize", opentracing.FollowsFrom(e.rootTrace.Context()))
stmtPlan, err := plan.Optimize(e.builder.ctx, e.stmtNode, e.builder.is)
if err != nil {
return err
}
optimizeSp.Finish()

pp, ok := stmtPlan.(plan.PhysicalPlan)
if !ok {
return errors.New("cannot cast logical plan to physical plan")
}

// append select executor to trace executor
stmtExec := e.builder.build(pp)

e.rootTrace = tracing.NewRecordedTrace("trace_exec", func(sp basictracer.RawSpan) {
e.CollectedSpans = append(e.CollectedSpans, sp)
})
err = stmtExec.Open(ctx)
if err != nil {
return errors.Trace(err)
}
stmtExecChk := stmtExec.newChunk()

// store span into context
ctx = opentracing.ContextWithSpan(ctx, e.rootTrace)

for {
if err := stmtExec.Next(ctx, stmtExecChk); err != nil {
return errors.Trace(err)
}
if stmtExecChk.NumRows() == 0 {
break
}
}

e.rootTrace.LogKV("event", "tracing completed")
e.rootTrace.Finish()
var rootSpan basictracer.RawSpan

treeSpans := make(map[uint64][]basictracer.RawSpan)
for _, sp := range e.CollectedSpans {
treeSpans[sp.ParentSpanID] = append(treeSpans[sp.ParentSpanID], sp)
// if a span's parentSpanID is 0, then it is root span
// this is by design
if sp.ParentSpanID == 0 {
rootSpan = sp
}
}

dfsTree(rootSpan, treeSpans, "", false, chk)
e.exhausted = true
return nil
}

func dfsTree(span basictracer.RawSpan, tree map[uint64][]basictracer.RawSpan, prefix string, isLast bool, chk *chunk.Chunk) {
suffix := ""
spans := tree[span.Context.SpanID]
var newPrefix string
if span.ParentSpanID == 0 {
newPrefix = prefix
} else {
if len(tree[span.ParentSpanID]) > 0 && !isLast {
suffix = "├─"
newPrefix = prefix + "│ "
} else {
suffix = "└─"
newPrefix = prefix + " "
}
}

chk.AppendString(0, prefix+suffix+span.Operation)
chk.AppendString(1, span.Start.Format(time.StampNano))
chk.AppendString(2, span.Duration.String())

for i, sp := range spans {
dfsTree(sp, tree, newPrefix, i == (len(spans))-1 /*last element of array*/, chk)
}
}
33 changes: 33 additions & 0 deletions executor/trace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testkit"
)

type testTraceExec struct{}

func (s *testTraceExec) SetupSuite(c *C) {
}

func (s *testSuite) TestTraceExec(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
testSQL := `create table trace (id int PRIMARY KEY AUTO_INCREMENT, c1 int, c2 int, c3 int default 1);`
tk.MustExec(testSQL)
// TODO: check result later in another PR.
tk.MustExec("trace select * from trace where id = 0;")
}
22 changes: 22 additions & 0 deletions plan/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func (b *planBuilder) build(node ast.Node) (Plan, error) {
return b.buildExecute(x)
case *ast.ExplainStmt:
return b.buildExplain(x)
case *ast.TraceStmt:
return b.buildTrace(x)
case *ast.InsertStmt:
return b.buildInsert(x)
case *ast.LoadDataStmt:
Expand Down Expand Up @@ -1359,6 +1361,26 @@ func (b *planBuilder) buildDDL(node ast.DDLNode) Plan {
return p
}

// buildTrace builds a trace plan. Inside this method, it first optimize the
// underlying query and then constructs a schema, which will be used to constructs
// rows result.
func (b *planBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) {
if _, ok := trace.Stmt.(*ast.SelectStmt); !ok {
return nil, errors.New("trace only supports select query")
}

p := &Trace{StmtNode: trace.Stmt}

retFields := []string{"operation", "duration", "spanID"}
schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...)
schema.Append(buildColumn("", "operation", mysql.TypeString, mysql.MaxBlobWidth))

schema.Append(buildColumn("", "startTS", mysql.TypeString, mysql.MaxBlobWidth))
schema.Append(buildColumn("", "duration", mysql.TypeString, mysql.MaxBlobWidth))
p.SetSchema(schema)
return p, nil
}

func (b *planBuilder) buildExplain(explain *ast.ExplainStmt) (Plan, error) {
if show, ok := explain.Stmt.(*ast.ShowStmt); ok {
return b.buildShow(show)
Expand Down
12 changes: 12 additions & 0 deletions plan/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package plan

import (
"github.com/pingcap/tidb/ast"
)

// Trace represents a trace plan.
type Trace struct {
baseSchemaProducer

StmtNode ast.StmtNode
}
26 changes: 0 additions & 26 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/juju/errors"
"github.com/ngaut/pools"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
Expand Down Expand Up @@ -375,11 +374,6 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
}

func (s *session) CommitTxn(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("session.CommitTxn", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

err := s.doCommitWithRetry(ctx)
label := metrics.LblOK
if err != nil {
Expand All @@ -390,11 +384,6 @@ func (s *session) CommitTxn(ctx context.Context) error {
}

func (s *session) RollbackTxn(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("session.RollbackTxn", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

var err error
if s.txn.Valid() {
terror.Log(s.txn.Rollback())
Expand Down Expand Up @@ -451,9 +440,6 @@ func (s *session) isRetryableError(err error) bool {
}

func (s *session) retry(ctx context.Context, maxCnt uint) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "retry")
defer span.Finish()

connID := s.sessionVars.ConnectionID
if s.sessionVars.TxnCtx.ForUpdate {
return errForUpdateCantRetry.GenByArgs(connID)
Expand Down Expand Up @@ -545,10 +531,7 @@ func (s *session) sysSessionPool() *pools.ResourcePool {
// Unlike normal Exec, it doesn't reset statement status, doesn't commit or rollback the current transaction
// and doesn't write binlog.
func (s *session) ExecRestrictedSQL(sctx sessionctx.Context, sql string) ([]chunk.Row, []*ast.ResultField, error) {
var span opentracing.Span
ctx := context.TODO()
span, ctx = opentracing.StartSpanFromContext(ctx, "session.ExecRestrictedSQL")
defer span.Finish()

// Use special session to execute the sql.
tmp, err := s.sysSessionPool().Get()
Expand Down Expand Up @@ -712,10 +695,6 @@ func (s *session) SetGlobalSysVar(name, value string) error {
}

func (s *session) ParseSQL(ctx context.Context, sql, charset, collation string) ([]ast.StmtNode, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span1 := opentracing.StartSpan("session.ParseSQL", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
s.parser.SetSQLMode(s.sessionVars.SQLMode)
return s.parser.Parse(sql, charset, collation)
}
Expand Down Expand Up @@ -770,11 +749,6 @@ func (s *session) Execute(ctx context.Context, sql string) (recordSets []ast.Rec
}

func (s *session) execute(ctx context.Context, sql string) (recordSets []ast.RecordSet, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span, ctx = opentracing.StartSpanFromContext(ctx, "session.Execute")
defer span.Finish()
}

s.PrepareTxnCtx(ctx)
connID := s.sessionVars.ConnectionID
err = s.loadCommonGlobalVariablesIfNeeded()
Expand Down
Loading

0 comments on commit 341dc10

Please sign in to comment.