*: collect execution details and output them in slow query log #7364

Merged (1 commit) on Aug 13, 2018
4 changes: 3 additions & 1 deletion distsql/distsql.go
@@ -160,11 +160,13 @@ func (r *selectResult) getSelectResp() error {
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
+ sc := r.ctx.GetSessionVars().StmtCtx
for _, warning := range r.selectResp.Warnings {
- r.ctx.GetSessionVars().StmtCtx.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg))
+ sc.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg))
}
r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
+ sc.MergeExecDetails(re.result.GetExecDetails())
if len(r.selectResp.Chunks) == 0 {
continue
}
12 changes: 9 additions & 3 deletions distsql/distsql_test.go
@@ -25,7 +25,8 @@ import (
"github.com/pingcap/tidb/util/charset"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
tipb "github.com/pingcap/tipb/go-tipb"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)

@@ -203,8 +204,13 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
// Used only for test.
type mockResultSubset struct{ data []byte }

- // GetData implements kv.Response interface.
+ // GetData implements kv.ResultSubset interface.
func (r *mockResultSubset) GetData() []byte { return r.data }

- // GetStartKey implements kv.Response interface.
+ // GetStartKey implements kv.ResultSubset interface.
func (r *mockResultSubset) GetStartKey() kv.Key { return nil }

+ // GetExecDetails implements kv.ResultSubset interface.
+ func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails {
+ return &execdetails.ExecDetails{}
+ }
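Because kv.ResultSubset gains a third method in this PR (see kv/kv.go below), any implementer that lacks GetExecDetails stops compiling. A hypothetical one-line guard of the kind often placed next to mocks like this one would surface that at build time rather than at the first call site:

var _ kv.ResultSubset = &mockResultSubset{} // compile-time interface check; not part of this PR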
8 changes: 4 additions & 4 deletions executor/adapter.go
@@ -358,12 +358,12 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
user := a.Ctx.GetSessionVars().User
if costTime < threshold {
logutil.SlowQueryLogger.Debugf(
"[QUERY] cost_time:%v succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
"[QUERY] cost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
} else {
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] cost_time:%v succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
"[SLOW_QUERY] cost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
}
}

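For reference, a slow-query entry under the new format carries the execution-details block between cost_time and succ. Its exact rendering comes from the String method of execdetails.ExecDetails, which is outside this diff, so the field names and all values below are illustrative only:

[SLOW_QUERY] cost_time:1.2s process_time:950ms wait_time:60ms backoff_time:10ms request_count:4 total_keys:250000 processed_keys:230000 succ:true con:5 user:root@127.0.0.1 txn_start_ts:400123456789123456 database:test table_ids:[45],index_ids:[1],sql:select * from t where b > 10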
3 changes: 3 additions & 0 deletions kv/kv.go
@@ -15,6 +15,7 @@ package kv

import (
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"golang.org/x/net/context"
)

@@ -207,6 +208,8 @@ type ResultSubset interface {
GetData() []byte
// GetStartKey gets the start key.
GetStartKey() Key
+ // GetExecDetails gets the detail information.
+ GetExecDetails() *execdetails.ExecDetails
}

// Response represents the response returned from KV layer.
24 changes: 24 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
@@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
)

@@ -60,6 +61,7 @@ type StatementContext struct {
foundRows uint64
warnings []SQLWarn
histogramsNotLoad bool
+ execDetails execdetails.ExecDetails
}

// Copied from SessionVars.TimeZone.
@@ -199,3 +201,25 @@ func (sc *StatementContext) ResetForRetry() {
sc.mu.warnings = nil
sc.mu.Unlock()
}

+ // MergeExecDetails merges a single region execution details into self, used to print
+ // the information in slow query log.
+ func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails) {
+ sc.mu.Lock()
+ sc.mu.execDetails.ProcessTime += details.ProcessTime
+ sc.mu.execDetails.WaitTime += details.WaitTime
+ sc.mu.execDetails.BackoffTime += details.BackoffTime
+ sc.mu.execDetails.RequestCount++
+ sc.mu.execDetails.TotalKeys += details.TotalKeys
+ sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys
+ sc.mu.Unlock()
+ }
+
+ // GetExecDetails gets the execution details for the statement.
+ func (sc *StatementContext) GetExecDetails() execdetails.ExecDetails {
+ var details execdetails.ExecDetails
+ sc.mu.Lock()
+ details = sc.mu.execDetails
+ sc.mu.Unlock()
+ return details
+ }
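Taken together, MergeExecDetails folds one ExecDetails into the statement totals per coprocessor response (bumping RequestCount each time), and GetExecDetails returns a copy under the lock so readers never race with in-flight merges. A minimal usage sketch, assuming the exported duration and counter fields referenced above and a TiDB checkout at this revision:

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/sessionctx/stmtctx"
	"github.com/pingcap/tidb/util/execdetails"
)

func main() {
	sc := &stmtctx.StatementContext{}
	// Two regions report their per-request details; each merge bumps RequestCount.
	sc.MergeExecDetails(&execdetails.ExecDetails{
		ProcessTime:   40 * time.Millisecond,
		TotalKeys:     1000,
		ProcessedKeys: 800,
	})
	sc.MergeExecDetails(&execdetails.ExecDetails{
		ProcessTime:   25 * time.Millisecond,
		WaitTime:      5 * time.Millisecond,
		TotalKeys:     500,
		ProcessedKeys: 500,
	})
	d := sc.GetExecDetails()
	fmt.Println(d.RequestCount, d.ProcessTime, d.TotalKeys, d.ProcessedKeys)
	// Expected output: 2 65ms 1500 1300
}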
100 changes: 54 additions & 46 deletions store/tikv/coprocessor.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/goroutine_pool"
tipb "github.com/pingcap/tipb/go-tipb"
log "github.com/sirupsen/logrus"
@@ -127,7 +128,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request) kv.Response {
it.concurrency = 1
}
if !it.req.KeepOrder {
- it.respChan = make(chan copResponse, it.concurrency)
+ it.respChan = make(chan *copResponse, it.concurrency)
}
it.open(ctx)
return it
@@ -138,7 +139,7 @@ type copTask struct {
region RegionVerID
ranges *copRanges

- respChan chan copResponse
+ respChan chan *copResponse
storeAddr string
cmdType tikvrpc.CmdType
}
@@ -273,7 +274,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bo
tasks = append(tasks, &copTask{
region: region,
ranges: ranges,
- respChan: make(chan copResponse, 1),
+ respChan: make(chan *copResponse, 1),
cmdType: cmdType,
})
}
@@ -379,7 +380,7 @@ type copIterator struct {
curr int

// Otherwise, results are stored in respChan.
- respChan chan copResponse
+ respChan chan *copResponse
wg sync.WaitGroup
}

@@ -389,7 +390,7 @@ type copIteratorWorker struct {
wg *sync.WaitGroup
store *tikvStore
req *kv.Request
- respChan chan<- copResponse
+ respChan chan<- *copResponse
finishCh <-chan struct{}
}

@@ -399,15 +400,30 @@ type copIteratorTaskSender struct {
wg *sync.WaitGroup
tasks []*copTask
finishCh <-chan struct{}
- respChan chan<- copResponse
+ respChan chan<- *copResponse
}

type copResponse struct {
- *coprocessor.Response
+ pbResp *coprocessor.Response
+ execdetails.ExecDetails
startKey kv.Key
err error
}

+ // GetData implements the kv.ResultSubset GetData interface.
+ func (rs *copResponse) GetData() []byte {
+ return rs.pbResp.Data
+ }
+
+ // GetStartKey implements the kv.ResultSubset GetStartKey interface.
+ func (rs *copResponse) GetStartKey() kv.Key {
+ return rs.startKey
+ }
+
+ func (rs *copResponse) GetExecDetails() *execdetails.ExecDetails {
+ return &rs.ExecDetails
+ }

const minLogCopTaskTime = 300 * time.Millisecond

// run is a worker function that get a copTask from channel, handle it and
@@ -482,7 +498,7 @@ func (sender *copIteratorTaskSender) run() {
}
}

- func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan copResponse) (resp copResponse, ok bool, exit bool) {
+ func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copResponse) (resp *copResponse, ok bool, exit bool) {
select {
case resp, ok = <-respCh:
case <-it.finishCh:
@@ -506,7 +522,7 @@ func (sender *copIteratorTaskSender) sendToTaskCh(t *copTask) (exit bool) {
return
}

- func (worker *copIteratorWorker) sendToRespCh(resp copResponse, respCh chan<- copResponse) (exit bool) {
+ func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse) (exit bool) {
select {
case respCh <- resp:
case <-worker.finishCh:
@@ -515,29 +531,13 @@ func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- co
return
}

- // copResultSubset implements the kv.ResultSubset interface.
- type copResultSubset struct {
- data []byte
- startKey kv.Key
- }
-
- // GetData implements the kv.ResultSubset GetData interface.
- func (rs *copResultSubset) GetData() []byte {
- return rs.data
- }
-
- // GetStartKey implements the kv.ResultSubset GetStartKey interface.
- func (rs *copResultSubset) GetStartKey() kv.Key {
- return rs.startKey
- }

// Next returns next coprocessor result.
// NOTE: Use nil to indicate finish, so if the returned ResultSubset is not nil, reader should continue to call Next().
func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
metrics.TiKVCoprocessorCounter.WithLabelValues("next").Inc()

var (
- resp copResponse
+ resp *copResponse
ok bool
closed bool
)
@@ -578,20 +578,16 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
if err != nil {
return nil, errors.Trace(err)
}

- if resp.Data == nil {
- return &copResultSubset{}, nil
- }
- return &copResultSubset{data: resp.Data, startKey: resp.startKey}, nil
+ return resp, nil
}

// handleTask handles single copTask, sends the result to channel, retry automatically on error.
- func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- copResponse) {
+ func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- *copResponse) {
remainTasks := []*copTask{task}
for len(remainTasks) > 0 {
tasks, err := worker.handleTaskOnce(bo, remainTasks[0], respCh)
if err != nil {
- resp := copResponse{err: errors.Trace(err)}
+ resp := &copResponse{err: errors.Trace(err)}
worker.sendToRespCh(resp, respCh)
return
}
@@ -605,7 +601,7 @@

// handleTaskOnce handles single copTask, successful results are send to channel.
// If error happened, returns error. If region split or meet lock, returns the remain tasks.
- func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch chan<- copResponse) ([]*copTask, error) {
+ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {

// gofail: var handleTaskOnceError bool
// if handleTaskOnceError {
@@ -646,7 +642,7 @@
}
}

// Handles the response for non-streaming copTask.
- return worker.handleCopResponse(bo, resp.Cop, task, ch, nil)
+ return worker.handleCopResponse(bo, &copResponse{pbResp: resp.Cop}, task, ch, nil)
}

const (
@@ -697,7 +693,7 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan
return logStr
}

- func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- copResponse) ([]*copTask, error) {
+ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
defer stream.Close()
var resp *coprocessor.Response
var lastRange *coprocessor.KeyRange
@@ -707,7 +703,7 @@
return nil, nil
}
for {
- remainedTasks, err := worker.handleCopResponse(bo, resp, task, ch, lastRange)
+ remainedTasks, err := worker.handleCopResponse(bo, &copResponse{pbResp: resp}, task, ch, lastRange)
if err != nil || len(remainedTasks) != 0 {
return remainedTasks, errors.Trace(err)
}
@@ -733,16 +729,16 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *ti
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
- func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan<- copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
- if regionErr := resp.GetRegionError(); regionErr != nil {
+ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
+ if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil {
return nil, errors.Trace(err)
}
// We may meet RegionError at the first packet, but not during visiting the stream.
metrics.TiKVCoprocessorCounter.WithLabelValues("rebuild_task").Inc()
return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req.Desc, worker.req.Streaming)
}
- if lockErr := resp.GetLocked(); lockErr != nil {
+ if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
log.Debugf("coprocessor encounters lock: %v", lockErr)
ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
if err1 != nil {
@@ -755,19 +751,31 @@
}
}
return worker.buildCopTasksFromRemain(bo, lastRange, task)
}
- if otherErr := resp.GetOtherError(); otherErr != "" {
+ if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
log.Warnf("coprocessor err: %v", err)
return nil, errors.Trace(err)
}
- var startKey kv.Key
// When the request is using streaming API, the `Range` is not nil.
- if resp.Range != nil {
- startKey = resp.Range.Start
+ if resp.pbResp.Range != nil {
+ resp.startKey = resp.pbResp.Range.Start
} else {
- startKey = task.ranges.at(0).StartKey
+ resp.startKey = task.ranges.at(0).StartKey
}
+ resp.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
+ if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil {
+ if handleTime := pbDetails.HandleTime; handleTime != nil {
+ resp.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond
+ resp.ProcessTime = time.Duration(handleTime.ProcessMs) * time.Millisecond
+ }
+ if scanDetail := pbDetails.ScanDetail; scanDetail != nil {
+ if scanDetail.Write != nil {
+ resp.TotalKeys += scanDetail.Write.Total
+ resp.ProcessedKeys += scanDetail.Write.Processed
+ }
+ }
+ }
- worker.sendToRespCh(copResponse{resp, startKey, nil}, ch)
+ worker.sendToRespCh(resp, ch)
return nil, nil
}

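The block added to handleCopResponse above is where the raw protobuf details become Go values: TiKV reports handle times in milliseconds and scan counters per column family, and the backoff cost comes from the Backoffer's accumulated sleep. A standalone restatement of that conversion, assuming the kvrpcpb message layout implied by the field accesses in the diff:

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/util/execdetails"
)

// toExecDetails mirrors handleCopResponse: millisecond integers from the
// protobuf become time.Durations, and write-CF scan counters become key counts.
func toExecDetails(pb *kvrpcpb.ExecDetails, backoffMs int) execdetails.ExecDetails {
	var d execdetails.ExecDetails
	d.BackoffTime = time.Duration(backoffMs) * time.Millisecond
	if pb == nil {
		return d
	}
	if ht := pb.HandleTime; ht != nil {
		d.WaitTime = time.Duration(ht.WaitMs) * time.Millisecond
		d.ProcessTime = time.Duration(ht.ProcessMs) * time.Millisecond
	}
	if sd := pb.ScanDetail; sd != nil && sd.Write != nil {
		d.TotalKeys += sd.Write.Total
		d.ProcessedKeys += sd.Write.Processed
	}
	return d
}

func main() {
	pb := &kvrpcpb.ExecDetails{
		HandleTime: &kvrpcpb.HandleTime{WaitMs: 60, ProcessMs: 950},
		ScanDetail: &kvrpcpb.ScanDetail{Write: &kvrpcpb.ScanInfo{Total: 1000, Processed: 800}},
	}
	fmt.Printf("%+v\n", toExecDetails(pb, 10))
}

One copResponse is produced per region request, so per-response details stay small; the aggregation across requests happens once, in StatementContext.MergeExecDetails, which is also where RequestCount is incremented.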