cherry pick pingcap#27735 to release-5.1
Signed-off-by: ti-srebot <[email protected]>
MyonKeminta committed Sep 23, 2021
1 parent 53251a9 commit 9ea7bb7
Showing 7 changed files with 118 additions and 34 deletions.
19 changes: 16 additions & 3 deletions distsql/distsql.go
@@ -24,8 +24,11 @@ import (
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/trxevents"
 	"github.com/pingcap/tipb/go-tipb"
+	"go.uber.org/zap"
 )
 
 // DispatchMPPTasks dispatches all tasks and returns an iterator.
@@ -74,7 +77,17 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
 		kvReq.Streaming = false
 	}
 	enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction
-	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction)
+	originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL
+	eventCb := func(event trxevents.TransactionEvent) {
+		// Note: Do not assume this callback will be invoked within the same goroutine.
+		if copMeetLock := event.GetCopMeetLock(); copMeetLock != nil {
+			logutil.Logger(ctx).Debug("coprocessor encounters lock",
+				zap.Uint64("startTS", kvReq.StartTs),
+				zap.Stringer("lock", copMeetLock.LockInfo),
+				zap.String("stmt", originalSQL))
+		}
+	}
+	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, eventCb)
 	if resp == nil {
 		err := errors.New("client returns nil response")
 		return nil, err
@@ -136,7 +149,7 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq
 // Analyze does an analyze request.
 func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.Variables,
 	isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) {
-	resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false)
+	resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, nil)
 	if resp == nil {
 		return nil, errors.New("client returns nil response")
 	}
@@ -159,7 +172,7 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.
 func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.Variables) (SelectResult, error) {
 	// FIXME: As BR has a dependency on `Checksum` and TiDB also introduced BR as a dependency, we currently
 	// can't change the Checksum function signature. The two-way dependency should be removed in the future.
-	resp := client.Send(ctx, kvReq, vars, nil, false)
+	resp := client.Send(ctx, kvReq, vars, nil, false, nil)
 	if resp == nil {
 		return nil, errors.New("client returns nil response")
 	}
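Note: the callback wired into `Select` above is explicitly documented as possibly firing outside the calling goroutine, so any state it touches must be synchronized. A minimal caller-side sketch (not part of this commit; it only assumes the `trxevents` package added below) of a goroutine-safe handler:

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/pingcap/tidb/util/trxevents"
)

func main() {
	// Updated atomically: per the comment in Select, the callback must not
	// assume it runs on the caller's goroutine.
	var lockCount int64

	var cb trxevents.EventCallback = func(event trxevents.TransactionEvent) {
		if copMeetLock := event.GetCopMeetLock(); copMeetLock != nil {
			atomic.AddInt64(&lockCount, 1)
		}
	}

	// Simulate a coprocessor worker reporting a lock it met.
	cb(trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{LockInfo: nil}))
	fmt.Println("locks met:", atomic.LoadInt64(&lockCount))
}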
2 changes: 1 addition & 1 deletion executor/simple.go
@@ -1295,7 +1295,7 @@ func killRemoteConn(ctx context.Context, sctx sessionctx.Context, connID *util.G
 		return err
 	}
 
-	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, false)
+	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, false, nil)
 	if resp == nil {
 		err := errors.New("client returns nil response")
 		return err
3 changes: 2 additions & 1 deletion kv/kv.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/trxevents"
 )
 
 // Transaction options
@@ -329,7 +330,7 @@ type ReturnedValue struct {
 // Client is used to send request to KV layer.
 type Client interface {
 	// Send sends request to KV layer, returns a Response.
-	Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) Response
+	Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) Response
 
 	// IsRequestTypeSupported checks if reqType and subType are supported.
 	IsRequestTypeSupported(reqType, subType int64) bool
27 changes: 19 additions & 8 deletions store/copr/coprocessor.go
@@ -42,6 +42,7 @@ import (
 	"github.com/pingcap/tidb/store/tikv/util"
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/trxevents"
 	"github.com/pingcap/tipb/go-tipb"
 	"go.uber.org/zap"
 )
@@ -62,14 +63,14 @@ type CopClient struct {
 }
 
 // Send builds the request and gets the coprocessor iterator response.
-func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) kv.Response {
+func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response {
 	if req.StoreType == kv.TiFlash && req.BatchCop {
 		logutil.BgLogger().Debug("send batch requests")
 		return c.sendBatch(ctx, req, vars)
 	}
 	ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
 	bo := tikv.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
-	tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), tikv.NewKeyRanges(req.KeyRanges), req)
+	tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), tikv.NewKeyRanges(req.KeyRanges), req, eventCb)
 	if err != nil {
 		return copErrorResponse{err}
 	}
@@ -128,6 +129,8 @@ type copTask struct {
 	storeAddr string
 	cmdType   tikvrpc.CmdType
 	storeType kv.StoreType
+
+	eventCb trxevents.EventCallback
 }
 
 func (r *copTask) String() string {
@@ -138,7 +141,7 @@ func (r *copTask) String() string {
 // rangesPerTask limits the length of the ranges slice sent in one copTask.
 const rangesPerTask = 25000
 
-func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
+func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request, eventCb trxevents.EventCallback) ([]*copTask, error) {
 	start := time.Now()
 	cmdType := tikvrpc.CmdCop
 	if req.Streaming {
@@ -165,6 +168,7 @@ func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
 			respChan:  make(chan *copResponse, 2),
 			cmdType:   cmdType,
 			storeType: req.StoreType,
+			eventCb:   eventCb,
 		})
 		i = nextI
 	}
@@ -861,11 +865,18 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
 			return nil, errors.Trace(err)
 		}
 		// We may meet RegionError at the first packet, but not during visiting the stream.
-		return buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req)
+		return buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb)
 	}
 	if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
-		logutil.BgLogger().Debug("coprocessor encounters",
-			zap.Stringer("lock", lockErr))
+		// Be careful that we didn't redact the SQL statement because the log is DEBUG level.
+		if task.eventCb != nil {
+			task.eventCb(trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{
+				LockInfo: lockErr,
+			}))
+		} else {
+			logutil.Logger(bo.GetCtx()).Debug("coprocessor encounters lock",
+				zap.Stringer("lock", lockErr))
+		}
 		msBeforeExpired, err1 := worker.ResolveLocks(bo, worker.req.StartTs, []*tikv.Lock{tikv.NewLock(lockErr)})
 		if err1 != nil {
 			return nil, errors.Trace(err1)
@@ -879,7 +890,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
 	}
 	if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
 		err := errors.Errorf("other error: %s", otherErr)
-		logutil.BgLogger().Warn("other error",
+		logutil.Logger(bo.GetCtx()).Warn("other error",
 			zap.Uint64("txnStartTS", worker.req.StartTs),
 			zap.Uint64("regionID", task.region.GetID()),
 			zap.String("storeAddr", task.storeAddr),
@@ -1009,7 +1020,7 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *tikv.Backoffer, las
 	if worker.req.Streaming && lastRange != nil {
 		remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
 	}
-	return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req)
+	return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req, task.eventCb)
 }
 
 // calculateRemain splits the input ranges into two, and takes one of them according to the desc flag.
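Note: the lock-handling branch above prefers the session-provided callback and keeps a DEBUG log only as a fallback. The same dispatch rule, factored into a hypothetical helper (not in this commit; the name `reportCopMeetLock` is illustrative):

package copr

import (
	"context"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/util/logutil"
	"github.com/pingcap/tidb/util/trxevents"
	"go.uber.org/zap"
)

// reportCopMeetLock forwards a met lock to the registered callback when one
// is set, and otherwise falls back to the context-aware DEBUG log.
func reportCopMeetLock(ctx context.Context, eventCb trxevents.EventCallback, lockErr *kvrpcpb.LockInfo) {
	if eventCb != nil {
		eventCb(trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{LockInfo: lockErr}))
		return
	}
	logutil.Logger(ctx).Debug("coprocessor encounters lock", zap.Stringer("lock", lockErr))
}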
40 changes: 20 additions & 20 deletions store/copr/coprocessor_test.go
@@ -41,103 +41,103 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	req := &kv.Request{}
 	flashReq := &kv.Request{}
 	flashReq.StoreType = kv.TiFlash
-	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req)
+	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
 	s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
 	s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 4)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
 	s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
 	s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
 	s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 4)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
 	s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
 	s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
 	s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 	s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 	s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
 	s.taskEqual(c, tasks[1], regionIDs[2], "n", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
@@ -208,7 +208,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
 	bo := tikv.NewBackofferWithVars(context.Background(), 3000, nil)
 
 	req := &kv.Request{}
-	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
+	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "m")
@@ -222,7 +222,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
 	cache.InvalidateCachedRegion(tasks[1].region)
 
 	req.Desc = true
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 3)
 	s.taskEqual(c, tasks[2], regionIDs[0], "a", "m")
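Note: every call site in these tests now passes `nil` for the new parameter. A hypothetical extra assertion (not in this commit) could check that a non-nil callback is propagated into each generated copTask, sketched in the same gocheck style and reusing `bo`, `cache`, and `req` from TestBuildTasks:

	fired := false
	var cb trxevents.EventCallback = func(event trxevents.TransactionEvent) { fired = true }
	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, cb)
	c.Assert(err, IsNil)
	for _, task := range tasks {
		c.Assert(task.eventCb, NotNil)
		// Firing the stored callback should reach the handler defined above.
		task.eventCb(trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{LockInfo: nil}))
	}
	c.Assert(fired, Equals, true)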
3 changes: 2 additions & 1 deletion util/mock/client.go
@@ -18,6 +18,7 @@ import (
 
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/trxevents"
 )
 
 // Client implements the kv.Client interface, mocked from "CopClient" defined in
@@ -28,6 +29,6 @@ type Client struct {
 }
 
 // Send implements the kv.Client interface.
-func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool) kv.Response {
+func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool, eventCb trxevents.EventCallback) kv.Response {
 	return c.MockResponse
 }
58 changes: 58 additions & 0 deletions util/trxevents/trx_events.go
@@ -0,0 +1,58 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package trxevents
+
+import (
+	"github.com/pingcap/kvproto/pkg/kvrpcpb"
+)
+
+// EventType represents the type of a transaction event.
+type EventType = int
+
+const (
+	// EventTypeCopMeetLock stands for the CopMeetLock event type.
+	EventTypeCopMeetLock = iota
+)
+
+// CopMeetLock represents an event where a coprocessor read encounters a lock.
+type CopMeetLock struct {
+	LockInfo *kvrpcpb.LockInfo
+}
+
+// TransactionEvent represents a transaction event that may belong to any of the possible types.
+type TransactionEvent struct {
+	eventType EventType
+	inner     interface{}
+}
+
+// GetCopMeetLock tries to extract the inner CopMeetLock event from a TransactionEvent. Returns nil if it's not a
+// CopMeetLock event.
+func (e TransactionEvent) GetCopMeetLock() *CopMeetLock {
+	if e.eventType == EventTypeCopMeetLock {
+		return e.inner.(*CopMeetLock)
+	}
+	return nil
+}
+
+// WrapCopMeetLock wraps a CopMeetLock event into a TransactionEvent object.
+func WrapCopMeetLock(copMeetLock *CopMeetLock) TransactionEvent {
+	return TransactionEvent{
+		eventType: EventTypeCopMeetLock,
+		inner:     copMeetLock,
+	}
+}
+
+// EventCallback is the callback type that handles `TransactionEvent`s.
+type EventCallback = func(event TransactionEvent)
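Note: a small usage sketch (not part of the commit) of the wrap/extract pair — `GetCopMeetLock` returns nil for every other event type, so the nil check doubles as a type test:

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/util/trxevents"
)

func main() {
	// Producer side: wrap the concrete event into the generic TransactionEvent.
	event := trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{
		LockInfo: &kvrpcpb.LockInfo{PrimaryLock: []byte("k1"), LockVersion: 400},
	})

	// Consumer side: extract it back; a non-CopMeetLock event would yield nil.
	if copMeetLock := event.GetCopMeetLock(); copMeetLock != nil {
		fmt.Println("primary lock:", string(copMeetLock.LockInfo.PrimaryLock))
	}
}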
