diff --git a/distsql/distsql.go b/distsql/distsql.go index 89a9de36ac4f1..7b6a733dc2359 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -160,11 +160,13 @@ func (r *selectResult) getSelectResp() error { if err := r.selectResp.Error; err != nil { return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg) } + sc := r.ctx.GetSessionVars().StmtCtx for _, warning := range r.selectResp.Warnings { - r.ctx.GetSessionVars().StmtCtx.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg)) + sc.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg)) } r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts) r.partialCount++ + sc.MergeExecDetails(re.result.GetExecDetails()) if len(r.selectResp.Chunks) == 0 { continue } diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 27c1fbbb411ac..e9747923cdddb 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -25,7 +25,8 @@ import ( "github.com/pingcap/tidb/util/charset" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" - tipb "github.com/pingcap/tipb/go-tipb" + "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tipb/go-tipb" "golang.org/x/net/context" ) @@ -203,8 +204,13 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) { // Used only for test. type mockResultSubset struct{ data []byte } -// GetData implements kv.Response interface. +// GetData implements kv.ResultSubset interface. func (r *mockResultSubset) GetData() []byte { return r.data } -// GetStartKey implements kv.Response interface. +// GetStartKey implements kv.ResultSubset interface. func (r *mockResultSubset) GetStartKey() kv.Key { return nil } + +// GetExecDetails implements kv.ResultSubset interface. +func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails { + return &execdetails.ExecDetails{} +} diff --git a/executor/adapter.go b/executor/adapter.go index 20f8566a724b2..1e728668f43a2 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -358,12 +358,12 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) { user := a.Ctx.GetSessionVars().User if costTime < threshold { logutil.SlowQueryLogger.Debugf( - "[QUERY] cost_time:%v succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v", - costTime, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql) + "[QUERY] cost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v", + costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql) } else { logutil.SlowQueryLogger.Warnf( - "[SLOW_QUERY] cost_time:%v succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v", - costTime, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql) + "[SLOW_QUERY] cost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v", + costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql) } } diff --git a/kv/kv.go b/kv/kv.go index 3dbdbc47c7765..6d4c4b36e1975 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -15,6 +15,7 @@ package kv import ( "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/util/execdetails" "golang.org/x/net/context" ) @@ -207,6 +208,8 @@ type ResultSubset interface { GetData() []byte // GetStartKey gets the start key. GetStartKey() Key + // GetExecDetails gets the detail information. + GetExecDetails() *execdetails.ExecDetails } // Response represents the response returned from KV layer. diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 58f1868291e55..5347e686f8c5d 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/tidb/mysql" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" ) @@ -60,6 +61,7 @@ type StatementContext struct { foundRows uint64 warnings []SQLWarn histogramsNotLoad bool + execDetails execdetails.ExecDetails } // Copied from SessionVars.TimeZone. @@ -199,3 +201,25 @@ func (sc *StatementContext) ResetForRetry() { sc.mu.warnings = nil sc.mu.Unlock() } + +// MergeExecDetails merges a single region execution details into self, used to print +// the information in slow query log. +func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails) { + sc.mu.Lock() + sc.mu.execDetails.ProcessTime += details.ProcessTime + sc.mu.execDetails.WaitTime += details.WaitTime + sc.mu.execDetails.BackoffTime += details.BackoffTime + sc.mu.execDetails.RequestCount++ + sc.mu.execDetails.TotalKeys += details.TotalKeys + sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys + sc.mu.Unlock() +} + +// GetExecDetails gets the execution details for the statement. +func (sc *StatementContext) GetExecDetails() execdetails.ExecDetails { + var details execdetails.ExecDetails + sc.mu.Lock() + details = sc.mu.execDetails + sc.mu.Unlock() + return details +} diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index fa70fa4989301..312c30b6f48d4 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/goroutine_pool" tipb "github.com/pingcap/tipb/go-tipb" log "github.com/sirupsen/logrus" @@ -127,7 +128,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request) kv.Response { it.concurrency = 1 } if !it.req.KeepOrder { - it.respChan = make(chan copResponse, it.concurrency) + it.respChan = make(chan *copResponse, it.concurrency) } it.open(ctx) return it @@ -138,7 +139,7 @@ type copTask struct { region RegionVerID ranges *copRanges - respChan chan copResponse + respChan chan *copResponse storeAddr string cmdType tikvrpc.CmdType } @@ -273,7 +274,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bo tasks = append(tasks, &copTask{ region: region, ranges: ranges, - respChan: make(chan copResponse, 1), + respChan: make(chan *copResponse, 1), cmdType: cmdType, }) } @@ -379,7 +380,7 @@ type copIterator struct { curr int // Otherwise, results are stored in respChan. - respChan chan copResponse + respChan chan *copResponse wg sync.WaitGroup } @@ -389,7 +390,7 @@ type copIteratorWorker struct { wg *sync.WaitGroup store *tikvStore req *kv.Request - respChan chan<- copResponse + respChan chan<- *copResponse finishCh <-chan struct{} } @@ -399,15 +400,30 @@ type copIteratorTaskSender struct { wg *sync.WaitGroup tasks []*copTask finishCh <-chan struct{} - respChan chan<- copResponse + respChan chan<- *copResponse } type copResponse struct { - *coprocessor.Response + pbResp *coprocessor.Response + execdetails.ExecDetails startKey kv.Key err error } +// GetData implements the kv.ResultSubset GetData interface. +func (rs *copResponse) GetData() []byte { + return rs.pbResp.Data +} + +// GetStartKey implements the kv.ResultSubset GetStartKey interface. +func (rs *copResponse) GetStartKey() kv.Key { + return rs.startKey +} + +func (rs *copResponse) GetExecDetails() *execdetails.ExecDetails { + return &rs.ExecDetails +} + const minLogCopTaskTime = 300 * time.Millisecond // run is a worker function that get a copTask from channel, handle it and @@ -482,7 +498,7 @@ func (sender *copIteratorTaskSender) run() { } } -func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan copResponse) (resp copResponse, ok bool, exit bool) { +func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copResponse) (resp *copResponse, ok bool, exit bool) { select { case resp, ok = <-respCh: case <-it.finishCh: @@ -506,7 +522,7 @@ func (sender *copIteratorTaskSender) sendToTaskCh(t *copTask) (exit bool) { return } -func (worker *copIteratorWorker) sendToRespCh(resp copResponse, respCh chan<- copResponse) (exit bool) { +func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse) (exit bool) { select { case respCh <- resp: case <-worker.finishCh: @@ -515,29 +531,13 @@ func (worker *copIteratorWorker) sendToRespCh(resp copResponse, respCh chan<- co return } -// copResultSubset implements the kv.ResultSubset interface. -type copResultSubset struct { - data []byte - startKey kv.Key -} - -// GetData implements the kv.ResultSubset GetData interface. -func (rs *copResultSubset) GetData() []byte { - return rs.data -} - -// GetStartKey implements the kv.ResultSubset GetStartKey interface. -func (rs *copResultSubset) GetStartKey() kv.Key { - return rs.startKey -} - // Next returns next coprocessor result. // NOTE: Use nil to indicate finish, so if the returned ResultSubset is not nil, reader should continue to call Next(). func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) { metrics.TiKVCoprocessorCounter.WithLabelValues("next").Inc() var ( - resp copResponse + resp *copResponse ok bool closed bool ) @@ -578,20 +578,16 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) { if err != nil { return nil, errors.Trace(err) } - - if resp.Data == nil { - return &copResultSubset{}, nil - } - return &copResultSubset{data: resp.Data, startKey: resp.startKey}, nil + return resp, nil } // handleTask handles single copTask, sends the result to channel, retry automatically on error. -func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- copResponse) { +func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- *copResponse) { remainTasks := []*copTask{task} for len(remainTasks) > 0 { tasks, err := worker.handleTaskOnce(bo, remainTasks[0], respCh) if err != nil { - resp := copResponse{err: errors.Trace(err)} + resp := &copResponse{err: errors.Trace(err)} worker.sendToRespCh(resp, respCh) return } @@ -605,7 +601,7 @@ func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh // handleTaskOnce handles single copTask, successful results are send to channel. // If error happened, returns error. If region split or meet lock, returns the remain tasks. -func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch chan<- copResponse) ([]*copTask, error) { +func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch chan<- *copResponse) ([]*copTask, error) { // gofail: var handleTaskOnceError bool // if handleTaskOnceError { @@ -646,7 +642,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch } // Handles the response for non-streaming copTask. - return worker.handleCopResponse(bo, resp.Cop, task, ch, nil) + return worker.handleCopResponse(bo, &copResponse{pbResp: resp.Cop}, task, ch, nil) } const ( @@ -697,7 +693,7 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan return logStr } -func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- copResponse) ([]*copTask, error) { +func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) { defer stream.Close() var resp *coprocessor.Response var lastRange *coprocessor.KeyRange @@ -707,7 +703,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *ti return nil, nil } for { - remainedTasks, err := worker.handleCopResponse(bo, resp, task, ch, lastRange) + remainedTasks, err := worker.handleCopResponse(bo, &copResponse{pbResp: resp}, task, ch, lastRange) if err != nil || len(remainedTasks) != 0 { return remainedTasks, errors.Trace(err) } @@ -733,8 +729,8 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *ti // returns more tasks when that happens, or handles the response if no error. // if we're handling streaming coprocessor response, lastRange is the range of last // successful response, otherwise it's nil. -func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan<- copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) { - if regionErr := resp.GetRegionError(); regionErr != nil { +func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) { + if regionErr := resp.pbResp.GetRegionError(); regionErr != nil { if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil { return nil, errors.Trace(err) } @@ -742,7 +738,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *coproces metrics.TiKVCoprocessorCounter.WithLabelValues("rebuild_task").Inc() return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req.Desc, worker.req.Streaming) } - if lockErr := resp.GetLocked(); lockErr != nil { + if lockErr := resp.pbResp.GetLocked(); lockErr != nil { log.Debugf("coprocessor encounters lock: %v", lockErr) ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)}) if err1 != nil { @@ -755,19 +751,31 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *coproces } return worker.buildCopTasksFromRemain(bo, lastRange, task) } - if otherErr := resp.GetOtherError(); otherErr != "" { + if otherErr := resp.pbResp.GetOtherError(); otherErr != "" { err := errors.Errorf("other error: %s", otherErr) log.Warnf("coprocessor err: %v", err) return nil, errors.Trace(err) } - var startKey kv.Key // When the request is using streaming API, the `Range` is not nil. - if resp.Range != nil { - startKey = resp.Range.Start + if resp.pbResp.Range != nil { + resp.startKey = resp.pbResp.Range.Start } else { - startKey = task.ranges.at(0).StartKey + resp.startKey = task.ranges.at(0).StartKey + } + resp.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond + if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil { + if handleTime := pbDetails.HandleTime; handleTime != nil { + resp.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond + resp.ProcessTime = time.Duration(handleTime.ProcessMs) * time.Millisecond + } + if scanDetail := pbDetails.ScanDetail; scanDetail != nil { + if scanDetail.Write != nil { + resp.TotalKeys += scanDetail.Write.Total + resp.ProcessedKeys += scanDetail.Write.Processed + } + } } - worker.sendToRespCh(copResponse{resp, startKey, nil}, ch) + worker.sendToRespCh(resp, ch) return nil, nil } diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go new file mode 100644 index 0000000000000..a73c5b2cd60b5 --- /dev/null +++ b/util/execdetails/execdetails.go @@ -0,0 +1,54 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package execdetails + +import ( + "fmt" + "strings" + "time" +) + +// ExecDetails contains execution detail information. +type ExecDetails struct { + ProcessTime time.Duration + WaitTime time.Duration + BackoffTime time.Duration + RequestCount int + TotalKeys int64 + ProcessedKeys int64 +} + +// String implements the fmt.Stringer interface. +func (d ExecDetails) String() string { + parts := make([]string, 0, 6) + if d.ProcessTime > 0 { + parts = append(parts, fmt.Sprintf("process_time:%v", d.ProcessTime)) + } + if d.WaitTime > 0 { + parts = append(parts, fmt.Sprintf("wait_time:%v", d.WaitTime)) + } + if d.BackoffTime > 0 { + parts = append(parts, fmt.Sprintf("backoff_time:%v", d.BackoffTime)) + } + if d.RequestCount > 0 { + parts = append(parts, fmt.Sprintf("request_count:%d", d.RequestCount)) + } + if d.TotalKeys > 0 { + parts = append(parts, fmt.Sprintf("total_keys:%d", d.TotalKeys)) + } + if d.ProcessedKeys > 0 { + parts = append(parts, fmt.Sprintf("processed_keys:%d", d.ProcessedKeys)) + } + return strings.Join(parts, " ") +} diff --git a/util/execdetails/execdetails_test.go b/util/execdetails/execdetails_test.go new file mode 100644 index 0000000000000..b69f2229c7668 --- /dev/null +++ b/util/execdetails/execdetails_test.go @@ -0,0 +1,38 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package execdetails + +import ( + "testing" + "time" +) + +func TestString(t *testing.T) { + detail := &ExecDetails{ + ProcessTime: time.Second, + WaitTime: time.Second, + BackoffTime: time.Second, + RequestCount: 1, + TotalKeys: 100, + ProcessedKeys: 10, + } + expected := "process_time:1s wait_time:1s backoff_time:1s request_count:1 total_keys:100 processed_keys:10" + if str := detail.String(); str != expected { + t.Errorf("got:\n%s\nexpected:\n%s", str, expected) + } + detail = &ExecDetails{} + if str := detail.String(); str != "" { + t.Errorf("got:\n%s\nexpected:\n", str) + } +}