From 9ea7bb79cfeb17b2594185524b532e14808f16af Mon Sep 17 00:00:00 2001
From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com>
Date: Wed, 8 Sep 2021 20:21:03 +0800
Subject: [PATCH] cherry pick #27735 to release-5.1

Signed-off-by: ti-srebot
---
 distsql/distsql.go             | 19 +++++++++--
 executor/simple.go             |  2 +-
 kv/kv.go                       |  3 +-
 store/copr/coprocessor.go      | 27 +++++++++++-----
 store/copr/coprocessor_test.go | 40 +++++++++++------------
 util/mock/client.go            |  3 +-
 util/trxevents/trx_events.go   | 58 ++++++++++++++++++++++++++++++++++
 7 files changed, 118 insertions(+), 34 deletions(-)
 create mode 100644 util/trxevents/trx_events.go

diff --git a/distsql/distsql.go b/distsql/distsql.go
index 83236ce4dc5f4..7aaad10701ff5 100644
--- a/distsql/distsql.go
+++ b/distsql/distsql.go
@@ -24,8 +24,11 @@ import (
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/trxevents"
 	"github.com/pingcap/tipb/go-tipb"
+	"go.uber.org/zap"
 )
 
 // DispatchMPPTasks dispathes all tasks and returns an iterator.
@@ -74,7 +77,17 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
 		kvReq.Streaming = false
 	}
 	enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction
-	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction)
+	originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL
+	eventCb := func(event trxevents.TransactionEvent) {
+		// Note: Do not assume this callback will be invoked within the same goroutine.
+		if copMeetLock := event.GetCopMeetLock(); copMeetLock != nil {
+			logutil.Logger(ctx).Debug("coprocessor encounters lock",
+				zap.Uint64("startTS", kvReq.StartTs),
+				zap.Stringer("lock", copMeetLock.LockInfo),
+				zap.String("stmt", originalSQL))
+		}
+	}
+	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, eventCb)
 	if resp == nil {
 		err := errors.New("client returns nil response")
 		return nil, err
@@ -136,7 +149,7 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq
 
 // Analyze do a analyze request.
 func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.Variables, isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) {
-	resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false)
+	resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, nil)
 	if resp == nil {
 		return nil, errors.New("client returns nil response")
 	}
@@ -159,7 +172,7 @@ func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.
 	// FIXME: As BR have dependency of `Checksum` and TiDB also introduced BR as dependency, Currently we can't edit
 	// Checksum function signature. The two-way dependence should be removed in future.
-	resp := client.Send(ctx, kvReq, vars, nil, false)
+	resp := client.Send(ctx, kvReq, vars, nil, false, nil)
 	if resp == nil {
 		return nil, errors.New("client returns nil response")
 	}
diff --git a/executor/simple.go b/executor/simple.go
index ccc5f5b2ce242..dedd0107fa906 100644
--- a/executor/simple.go
+++ b/executor/simple.go
@@ -1295,7 +1295,7 @@ func killRemoteConn(ctx context.Context, sctx sessionctx.Context, connID *util.G
 		return err
 	}
 
-	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, false)
+	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, false, nil)
 	if resp == nil {
 		err := errors.New("client returns nil response")
 		return err
diff --git a/kv/kv.go b/kv/kv.go
index bcebff808ca1f..32556dab577a8 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/trxevents"
 )
 
 // Transaction options
@@ -329,7 +330,7 @@ type ReturnedValue struct {
 // Client is used to send request to KV layer.
 type Client interface {
 	// Send sends request to KV layer, returns a Response.
-	Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) Response
+	Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) Response
 
 	// IsRequestTypeSupported checks if reqType and subType is supported.
 	IsRequestTypeSupported(reqType, subType int64) bool
diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go
index f30235cc1699e..ea38a4eb33004 100644
--- a/store/copr/coprocessor.go
+++ b/store/copr/coprocessor.go
@@ -42,6 +42,7 @@ import (
 	"github.com/pingcap/tidb/store/tikv/util"
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/trxevents"
 	"github.com/pingcap/tipb/go-tipb"
 	"go.uber.org/zap"
 )
@@ -62,14 +63,14 @@ type CopClient struct {
 }
 
 // Send builds the request and gets the coprocessor iterator response.
-func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) kv.Response {
+func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response {
 	if req.StoreType == kv.TiFlash && req.BatchCop {
 		logutil.BgLogger().Debug("send batch requests")
 		return c.sendBatch(ctx, req, vars)
 	}
 	ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
 	bo := tikv.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
-	tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), tikv.NewKeyRanges(req.KeyRanges), req)
+	tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), tikv.NewKeyRanges(req.KeyRanges), req, eventCb)
 	if err != nil {
 		return copErrorResponse{err}
 	}
@@ -128,6 +129,8 @@ type copTask struct {
 	storeAddr string
 	cmdType   tikvrpc.CmdType
 	storeType kv.StoreType
+
+	eventCb trxevents.EventCallback
 }
 
 func (r *copTask) String() string {
@@ -138,7 +141,7 @@ func (r *copTask) String() string {
 // rangesPerTask limits the length of the ranges slice sent in one copTask.
 const rangesPerTask = 25000
 
-func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
+func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request, eventCb trxevents.EventCallback) ([]*copTask, error) {
 	start := time.Now()
 	cmdType := tikvrpc.CmdCop
 	if req.Streaming {
@@ -165,6 +168,7 @@ func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
 			respChan:  make(chan *copResponse, 2),
 			cmdType:   cmdType,
 			storeType: req.StoreType,
+			eventCb:   eventCb,
 		})
 		i = nextI
 	}
@@ -861,11 +865,18 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
 			return nil, errors.Trace(err)
 		}
 		// We may meet RegionError at the first packet, but not during visiting the stream.
-		return buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req)
+		return buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.eventCb)
 	}
 	if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
-		logutil.BgLogger().Debug("coprocessor encounters",
-			zap.Stringer("lock", lockErr))
+		// Note that we don't redact the SQL statement here, because this log is at DEBUG level.
+		if task.eventCb != nil {
+			task.eventCb(trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{
+				LockInfo: lockErr,
+			}))
+		} else {
+			logutil.Logger(bo.GetCtx()).Debug("coprocessor encounters lock",
+				zap.Stringer("lock", lockErr))
+		}
 		msBeforeExpired, err1 := worker.ResolveLocks(bo, worker.req.StartTs, []*tikv.Lock{tikv.NewLock(lockErr)})
 		if err1 != nil {
 			return nil, errors.Trace(err1)
@@ -879,7 +890,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
 	}
 	if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
 		err := errors.Errorf("other error: %s", otherErr)
-		logutil.BgLogger().Warn("other error",
+		logutil.Logger(bo.GetCtx()).Warn("other error",
 			zap.Uint64("txnStartTS", worker.req.StartTs),
 			zap.Uint64("regionID", task.region.GetID()),
 			zap.String("storeAddr", task.storeAddr),
@@ -1009,7 +1020,7 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *tikv.Backoffer, las
 	if worker.req.Streaming && lastRange != nil {
 		remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
 	}
-	return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req)
+	return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req, task.eventCb)
 }
 
 // calculateRemain splits the input ranges into two, and take one of them according to desc flag.
diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go
index 4a3a020191230..953997ea4e7dd 100644
--- a/store/copr/coprocessor_test.go
+++ b/store/copr/coprocessor_test.go
@@ -41,49 +41,49 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	req := &kv.Request{}
 	flashReq := &kv.Request{}
 	flashReq.StoreType = kv.TiFlash
-	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req)
+	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
 	s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
 	s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 4)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
@@ -91,7 +91,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
 	s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 4)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
@@ -99,45 +99,45 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
 	s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 	s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 	s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
 	s.taskEqual(c, tasks[1], regionIDs[2], "n", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
@@ -208,7 +208,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
 
 	bo := tikv.NewBackofferWithVars(context.Background(), 3000, nil)
 	req := &kv.Request{}
-	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
+	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "m")
@@ -222,7 +222,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
 
 	cache.InvalidateCachedRegion(tasks[1].region)
 	req.Desc = true
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 3)
 	s.taskEqual(c, tasks[2], regionIDs[0], "a", "m")
diff --git a/util/mock/client.go b/util/mock/client.go
index 56ec53336d2e4..e88ce57ff77e2 100644
--- a/util/mock/client.go
+++ b/util/mock/client.go
@@ -18,6 +18,7 @@ import (
 
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/trxevents"
 )
 
 // Client implement kv.Client interface, mocked from "CopClient" defined in
@@ -28,6 +29,6 @@ type Client struct {
 }
 
 // Send implement kv.Client interface.
-func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool) kv.Response {
+func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool, eventCb trxevents.EventCallback) kv.Response {
 	return c.MockResponse
 }
diff --git a/util/trxevents/trx_events.go b/util/trxevents/trx_events.go
new file mode 100644
index 0000000000000..3b9b0e2ee6a97
--- /dev/null
+++ b/util/trxevents/trx_events.go
@@ -0,0 +1,58 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package trxevents
+
+import (
+	"github.com/pingcap/kvproto/pkg/kvrpcpb"
+)
+
+// EventType represents the type of a transaction event.
+type EventType = int
+
+const (
+	// EventTypeCopMeetLock stands for the CopMeetLock event type.
+	EventTypeCopMeetLock = iota
+)
+
+// CopMeetLock represents an event where a coprocessor read encounters a lock.
+type CopMeetLock struct {
+	LockInfo *kvrpcpb.LockInfo
+}
+
+// TransactionEvent represents a transaction event that may belong to any of the possible types.
+type TransactionEvent struct {
+	eventType EventType
+	inner     interface{}
+}
+
+// GetCopMeetLock tries to extract the inner CopMeetLock event from a TransactionEvent. Returns nil if it's not a
+// CopMeetLock event.
+func (e TransactionEvent) GetCopMeetLock() *CopMeetLock {
+	if e.eventType == EventTypeCopMeetLock {
+		return e.inner.(*CopMeetLock)
+	}
+	return nil
+}
+
+// WrapCopMeetLock wraps a CopMeetLock event into a TransactionEvent object.
+func WrapCopMeetLock(copMeetLock *CopMeetLock) TransactionEvent {
+	return TransactionEvent{
+		eventType: EventTypeCopMeetLock,
+		inner:     copMeetLock,
+	}
+}
+
+// EventCallback is the callback type that handles `TransactionEvent`s.
+type EventCallback = func(event TransactionEvent)
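
Reviewer note (not part of the patch): the standalone sketch below shows how the two ends of the new hook line up. The consumer side mirrors the callback built in distsql.Select; the producer side mirrors copIteratorWorker.handleCopResponse in store/copr/coprocessor.go. The main function and the sample LockInfo values are illustrative only; the trxevents API is the one introduced by this patch.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/util/trxevents"
)

func main() {
	// Consumer side: a callback that reacts only to CopMeetLock events.
	// It must not assume it runs in any particular goroutine, since
	// coprocessor worker goroutines may invoke it concurrently.
	eventCb := func(event trxevents.TransactionEvent) {
		if copMeetLock := event.GetCopMeetLock(); copMeetLock != nil {
			fmt.Printf("coprocessor met lock: primary=%q lockVersion=%d\n",
				copMeetLock.LockInfo.GetPrimaryLock(),
				copMeetLock.LockInfo.GetLockVersion())
		}
	}

	// Producer side: wrap the lock info reported by TiKV into a
	// TransactionEvent and hand it to the callback when one is registered.
	lockErr := &kvrpcpb.LockInfo{ // illustrative values
		PrimaryLock: []byte("k1"),
		LockVersion: 410000000000000001,
	}
	if eventCb != nil {
		eventCb(trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{LockInfo: lockErr}))
	}
}

Keeping TransactionEvent as a tagged wrapper (rather than passing *kvrpcpb.LockInfo directly) lets future transaction events reuse the same kv.Client.Send parameter without further signature changes.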