Skip to content

Commit

Permalink
executor: improve trace format='row' (#9029)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and zz-jason committed Jan 13, 2019
1 parent 1d00f75 commit 8ac79f3
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 94 deletions.
174 changes: 80 additions & 94 deletions executor/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,16 @@ package executor
import (
"context"
"encoding/json"
"sort"
"time"

"github.com/opentracing/basictracer-go"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/tracing"
"sourcegraph.com/sourcegraph/appdash"
traceImpl "sourcegraph.com/sourcegraph/appdash/opentracing"
)
Expand Down Expand Up @@ -55,103 +53,63 @@ func (e *TraceExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.exhausted {
return nil
}

if e.format == "json" {
if se, ok := e.ctx.(sqlexec.SQLExecutor); ok {
store := appdash.NewMemoryStore()
tracer := traceImpl.NewTracer(store)
span := tracer.StartSpan("trace")
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
recordSets, err := se.Execute(ctx, e.stmtNode.Text())
if err != nil {
return errors.Trace(err)
}

for _, rs := range recordSets {
_, err = drainRecordSet(ctx, e.ctx, rs)
if err != nil {
return errors.Trace(err)
}
if err = rs.Close(); err != nil {
return errors.Trace(err)
}
}

traces, err := store.Traces(appdash.TracesOpts{})
if err != nil {
return errors.Trace(err)
}
data, err := json.Marshal(traces)
if err != nil {
return errors.Trace(err)
}

// Split json data into rows to avoid the max packet size limitation.
const maxRowLen = 4096
for len(data) > maxRowLen {
chk.AppendString(0, string(data[:maxRowLen]))
data = data[maxRowLen:]
}
chk.AppendString(0, string(data))
}
se, ok := e.ctx.(sqlexec.SQLExecutor)
if !ok {
e.exhausted = true
return nil
}

// TODO: If the following code is never used, remove it later.
// record how much time was spent for optimizeing plan
optimizeSp := e.rootTrace.Tracer().StartSpan("plan_optimize", opentracing.FollowsFrom(e.rootTrace.Context()))
stmtPlan, err := planner.Optimize(e.builder.ctx, e.stmtNode, e.builder.is)
store := appdash.NewMemoryStore()
tracer := traceImpl.NewTracer(store)
span := tracer.StartSpan("trace")
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
recordSets, err := se.Execute(ctx, e.stmtNode.Text())
if err != nil {
return err
return errors.Trace(err)
}
optimizeSp.Finish()

pp, ok := stmtPlan.(plannercore.PhysicalPlan)
if !ok {
return errors.New("cannot cast logical plan to physical plan")
for _, rs := range recordSets {
_, err = drainRecordSet(ctx, e.ctx, rs)
if err != nil {
return errors.Trace(err)
}
if err = rs.Close(); err != nil {
return errors.Trace(err)
}
}

// append select executor to trace executor
stmtExec := e.builder.build(pp)

e.rootTrace = tracing.NewRecordedTrace("trace_exec", func(sp basictracer.RawSpan) {
e.CollectedSpans = append(e.CollectedSpans, sp)
})
err = stmtExec.Open(ctx)
traces, err := store.Traces(appdash.TracesOpts{})
if err != nil {
return errors.Trace(err)
}
stmtExecChk := stmtExec.newFirstChunk()

// store span into context
ctx = opentracing.ContextWithSpan(ctx, e.rootTrace)

for {
if err := stmtExec.Next(ctx, stmtExecChk); err != nil {
return errors.Trace(err)
}
if stmtExecChk.NumRows() == 0 {
break
// Row format.
if e.format != "json" {
if len(traces) < 1 {
e.exhausted = true
return nil
}
trace := traces[0]
sortTraceByStartTime(trace)
dfsTree(trace, "", false, chk)
e.exhausted = true
return nil
}

e.rootTrace.LogKV("event", "tracing completed")
e.rootTrace.Finish()
var rootSpan basictracer.RawSpan

treeSpans := make(map[uint64][]basictracer.RawSpan)
for _, sp := range e.CollectedSpans {
treeSpans[sp.ParentSpanID] = append(treeSpans[sp.ParentSpanID], sp)
// if a span's parentSpanID is 0, then it is root span
// this is by design
if sp.ParentSpanID == 0 {
rootSpan = sp
}
// Json format.
data, err := json.Marshal(traces)
if err != nil {
return errors.Trace(err)
}

dfsTree(rootSpan, treeSpans, "", false, chk)
// Split json data into rows to avoid the max packet size limitation.
const maxRowLen = 4096
for len(data) > maxRowLen {
chk.AppendString(0, string(data[:maxRowLen]))
data = data[maxRowLen:]
}
chk.AppendString(0, string(data))
e.exhausted = true
return nil
}
Expand All @@ -173,14 +131,34 @@ func drainRecordSet(ctx context.Context, sctx sessionctx.Context, rs sqlexec.Rec
}
}

func dfsTree(span basictracer.RawSpan, tree map[uint64][]basictracer.RawSpan, prefix string, isLast bool, chk *chunk.Chunk) {
suffix := ""
spans := tree[span.Context.SpanID]
var newPrefix string
if span.ParentSpanID == 0 {
newPrefix = prefix
type sortByStartTime []*appdash.Trace

func (t sortByStartTime) Len() int { return len(t) }
func (t sortByStartTime) Less(i, j int) bool {
return getStartTime(t[j]).After(getStartTime(t[i]))
}
func (t sortByStartTime) Swap(i, j int) { t[i], t[j] = t[j], t[i] }

func getStartTime(trace *appdash.Trace) (t time.Time) {
if e, err := trace.TimespanEvent(); err == nil {
t = e.Start()
}
return
}

func sortTraceByStartTime(trace *appdash.Trace) {
sort.Sort(sortByStartTime(trace.Sub))
for _, t := range trace.Sub {
sortTraceByStartTime(t)
}
}

func dfsTree(t *appdash.Trace, prefix string, isLast bool, chk *chunk.Chunk) {
var newPrefix, suffix string
if len(prefix) == 0 {
newPrefix = prefix + " "
} else {
if len(tree[span.ParentSpanID]) > 0 && !isLast {
if !isLast {
suffix = "├─"
newPrefix = prefix + "│ "
} else {
Expand All @@ -189,11 +167,19 @@ func dfsTree(span basictracer.RawSpan, tree map[uint64][]basictracer.RawSpan, pr
}
}

chk.AppendString(0, prefix+suffix+span.Operation)
chk.AppendString(1, span.Start.Format(time.StampNano))
chk.AppendString(2, span.Duration.String())
var start time.Time
var duration time.Duration
if e, err := t.TimespanEvent(); err == nil {
start = e.Start()
end := e.End()
duration = end.Sub(start)
}

chk.AppendString(0, prefix+suffix+t.Span.Name())
chk.AppendString(1, start.Format("15:04:05.000000"))
chk.AppendString(2, duration.String())

for i, sp := range spans {
dfsTree(sp, tree, newPrefix, i == (len(spans))-1 /*last element of array*/, chk)
for i, sp := range t.Sub {
dfsTree(sp, newPrefix, i == (len(t.Sub))-1 /*last element of array*/, chk)
}
}
15 changes: 15 additions & 0 deletions executor/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,19 @@ func (s *testSuite1) TestTraceExec(c *C) {
tk.MustExec("trace insert into trace (c1, c2, c3) values (1, 2, 3)")
rows := tk.MustQuery("trace select * from trace where id = 0;").Rows()
c.Assert(rows, HasLen, 1)

// +---------------------------+-----------------+------------+
// | operation | startTS | duration |
// +---------------------------+-----------------+------------+
// | session.getTxnFuture | 22:08:38.247834 | 78.909µs |
// | ├─session.Execute | 22:08:38.247829 | 1.478487ms |
// | ├─session.ParseSQL | 22:08:38.248457 | 71.159µs |
// | ├─executor.Compile | 22:08:38.248578 | 45.329µs |
// | ├─session.runStmt | 22:08:38.248661 | 75.13µs |
// | ├─session.CommitTxn | 22:08:38.248699 | 13.213µs |
// | └─recordSet.Next | 22:08:38.249340 | 155.317µs |
// +---------------------------+-----------------+------------+
rows = tk.MustQuery("trace format='row' select * from trace where id = 0;").Rows()

c.Assert(len(rows) > 1, IsTrue)
}

0 comments on commit 8ac79f3

Please sign in to comment.