diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 295b9e07c1f3d..e9b7c23298645 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -152,6 +152,7 @@ go_library( "//util", "//util/admin", "//util/bitmap", + "//util/breakpoint", "//util/chunk", "//util/codec", "//util/collate", diff --git a/executor/adapter.go b/executor/adapter.go index adb6c34865aa6..faf894816c702 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -48,6 +48,7 @@ import ( "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/breakpoint" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/hint" @@ -415,10 +416,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { // ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`. ctx = a.observeStmtBeginForTopSQL(ctx) - failpoint.Inject("hookBeforeFirstRunExecutor", func() { - sessiontxn.ExecTestHook(a.Ctx, sessiontxn.HookBeforeFirstRunExecutorKey) - }) - + breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun) if err = e.Open(ctx); err != nil { terror.Call(e.Close) return nil, err @@ -795,10 +793,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error if err != nil { return nil, err } - - failpoint.Inject("hookAfterOnStmtRetryWithLockError", func() { - sessiontxn.ExecTestHook(a.Ctx, sessiontxn.HookAfterOnStmtRetryWithLockErrorKey) - }) + breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError) e, err := a.buildExecutor() if err != nil { diff --git a/sessiontxn/failpoint.go b/sessiontxn/failpoint.go index 76ff4ea60e52a..d33984649b371 100644 --- a/sessiontxn/failpoint.go +++ b/sessiontxn/failpoint.go @@ -35,13 +35,13 @@ var AssertTxnInfoSchemaKey stringutil.StringerStr = "assertTxnInfoSchemaKey" // Only for test var 
AssertTxnInfoSchemaAfterRetryKey stringutil.StringerStr = "assertTxnInfoSchemaAfterRetryKey" -// HookBeforeFirstRunExecutorKey is the hook key for the executor first run +// BreakPointBeforeExecutorFirstRun is the key for the stop point where session stops before executor's first run // Only for test -var HookBeforeFirstRunExecutorKey stringutil.StringerStr = "testHookKeyBeforeFirstRunExecutor" +var BreakPointBeforeExecutorFirstRun = "beforeExecutorFirstRun" -// HookAfterOnStmtRetryWithLockErrorKey is the hook key for after OnStmtRetry with lock error +// BreakPointOnStmtRetryAfterLockError is the key for the stop point where session stops after OnStmtRetry when lock error happens // Only for test -var HookAfterOnStmtRetryWithLockErrorKey stringutil.StringerStr = "testHookKeyAfterOnStmtRetryWithLockError" +var BreakPointOnStmtRetryAfterLockError = "lockErrorAndThenOnStmtRetryCalled" // RecordAssert is used only for test func RecordAssert(sctx sessionctx.Context, name string, value interface{}) { diff --git a/sessiontxn/txn_context_test.go b/sessiontxn/txn_context_test.go index 8ac6f0aaa7322..d3e1f32124575 100644 --- a/sessiontxn/txn_context_test.go +++ b/sessiontxn/txn_context_test.go @@ -52,8 +52,6 @@ func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain, func()) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan", "return")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder", "return")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS", "return")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/hookBeforeFirstRunExecutor", "return")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/hookAfterOnStmtRetryWithLockError", "return")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt", 
"return")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInPreparedStmtExec", "return")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInCachedPlanExec", "return")) @@ -85,8 +83,6 @@ func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain, func()) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/hookBeforeFirstRunExecutor")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/hookAfterOnStmtRetryWithLockError")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInPreparedStmtExec")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInCachedPlanExec")) @@ -729,102 +725,65 @@ func TestStillWriteConflictAfterRetry(t *testing.T) { "update t1 set v=v+1 where id in (1, 2, 3) and v>0", } + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") for _, isolation := range []string{ast.RepeatableRead, ast.ReadCommitted} { for _, query := range queries { for _, autocommit := range []bool{true, false} { t.Run(fmt.Sprintf("%s,%s,autocommit=%v", isolation, query, autocommit), func(t *testing.T) { - testStillWriteConflictAfterRetry(t, store, isolation, query, autocommit) + tk.MustExec("truncate table t1") + tk.MustExec("insert into t1 values(1, 10)") + tk2 := testkit.NewSteppedTestKit(t, store) + defer tk2.MustExec("rollback") + + tk2.MustExec("use test") + tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'") + 
tk2.MustExec(fmt.Sprintf("set tx_isolation = '%s'", isolation)) + if autocommit { + tk2.MustExec("set autocommit=1") + tk2.MustExec("begin") + } else { + tk2.MustExec("set autocommit=0") + } + + tk2.SetBreakPoints( + sessiontxn.BreakPointBeforeExecutorFirstRun, + sessiontxn.BreakPointOnStmtRetryAfterLockError, + ) + + var isSelect, isUpdate bool + switch { + case strings.HasPrefix(query, "select"): + isSelect = true + tk2.SteppedMustQuery(query) + case strings.HasPrefix(query, "update"): + isUpdate = true + tk2.SteppedMustExec(query) + default: + require.FailNowf(t, "invalid query: ", query) + } + + // Pause the session before the executor first run and then update the record in another session + tk2.ExpectStopOnBreakPoint(sessiontxn.BreakPointBeforeExecutorFirstRun) + tk.MustExec("update t1 set v=v+1") + + // Session continues, it should get a lock error and retry, we pause the session before the executor's next run + // and then update the record in another session again. + tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointOnStmtRetryAfterLockError) + tk.MustExec("update t1 set v=v+1") + + // Because the record is updated by another session again, when this session continues, it will get a lock error again. 
+ tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointOnStmtRetryAfterLockError) + tk2.Continue().ExpectIdle() + switch { + case isSelect: + tk2.GetQueryResult().Check(testkit.Rows("1 12")) + case isUpdate: + tk2.MustExec("commit") + tk2.MustQuery("select * from t1").Check(testkit.Rows("1 13")) + } }) } } } } - -func testStillWriteConflictAfterRetry(t *testing.T, store kv.Storage, isolation string, query string, autocommit bool) { - tk := testkit.NewTestKit(t, store) - defer tk.MustExec("rollback") - - tk.MustExec("use test") - tk.MustExec(fmt.Sprintf("set tx_isolation = '%s'", isolation)) - tk.MustExec("set autocommit=1") - tk.MustExec("set @@tidb_txn_mode = 'pessimistic'") - tk.MustExec("truncate table t1") - tk.MustExec("insert into t1 values(1, 10)") - - se := tk.Session() - chanBeforeRunStmt := make(chan func(), 1) - chanAfterOnStmtRetry := make(chan func(), 1) - c2 := make(chan string, 1) - c3 := make(chan string, 1) - wait := func(ch chan string, expect string) { - select { - case got := <-ch: - if got != expect { - panic(fmt.Sprintf("expect '%s', got '%s'", expect, got)) - } - case <-time.After(time.Second * 10): - panic("wait2 timeout") - } - } - - if autocommit { - tk.MustExec("begin") - } else { - tk.MustExec("set @@autocommit=0") - } - - se.SetValue(sessiontxn.HookBeforeFirstRunExecutorKey, chanBeforeRunStmt) - se.SetValue(sessiontxn.HookAfterOnStmtRetryWithLockErrorKey, chanAfterOnStmtRetry) - defer func() { - se.SetValue(sessiontxn.HookBeforeFirstRunExecutorKey, nil) - se.SetValue(sessiontxn.HookAfterOnStmtRetryWithLockErrorKey, nil) - }() - - chanBeforeRunStmt <- func() { - c2 <- "now before session1 runStmt" - wait(c3, "session2 updated v=v+1 done") - } - - chanAfterOnStmtRetry <- func() { - c2 <- "now after OnStmtRetry before rebuild executor" - wait(c3, "session2 updated v=v+1 again done") - } - - go func() { - tk2 := testkit.NewTestKit(t, store) - tk2.MustExec("use test") - - // first conflict - wait(c2, "now before session1 runStmt") - 
tk2.MustExec("update t1 set v=v+1 where id=1") - c3 <- "session2 updated v=v+1 done" - - // second conflict - wait(c2, "now after OnStmtRetry before rebuild executor") - tk2.MustExec("update t1 set v=v+1 where id=1") - c3 <- "session2 updated v=v+1 again done" - chanAfterOnStmtRetry <- func() {} - c3 <- "done" - }() - - isSelect := false - if strings.HasPrefix(query, "update ") { - tk.MustExec(query) - } else if strings.HasPrefix(query, "select ") { - isSelect = true - tk.MustQuery(query).Check(testkit.Rows("1 12")) - } else { - require.FailNowf(t, "invalid query: %s", query) - } - - wait(c3, "done") - - se.SetValue(sessiontxn.HookBeforeFirstRunExecutorKey, nil) - se.SetValue(sessiontxn.HookAfterOnStmtRetryWithLockErrorKey, nil) - if isSelect { - tk.MustExec("update t1 set v=v+1") - } - tk.MustExec("commit") - tk.MustQuery("select * from t1").Check(testkit.Rows("1 13")) - tk.MustExec("rollback") -} diff --git a/testkit/BUILD.bazel b/testkit/BUILD.bazel index c4c43cf90cfdc..17034f2e778f5 100644 --- a/testkit/BUILD.bazel +++ b/testkit/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "mocksessionmanager.go", "mockstore.go", "result.go", + "stepped.go", "testkit.go", ], importpath = "github.com/pingcap/tidb/testkit", @@ -23,8 +24,10 @@ go_library( "//store/mockstore", "//types", "//util", + "//util/breakpoint", "//util/sqlexec", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", diff --git a/testkit/stepped.go b/testkit/stepped.go new file mode 100644 index 0000000000000..d63a2a1efc7b8 --- /dev/null +++ b/testkit/stepped.go @@ -0,0 +1,256 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testkit + +import ( + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/util/breakpoint" + "github.com/stretchr/testify/require" +) + +var errCommandRunFailed = errors.New("command run failed") + +var defaultChanTimeout = time.Second * 10 + +type steppedTestkitMsgType int + +const ( + msgTpCmdContinue steppedTestkitMsgType = iota + msgTpCmdStopOnBreakPoint + msgTpCmdDone +) + +type steppedTestKitMsg struct { + tp steppedTestkitMsgType + val any +} + +type steppedTestKitMsgChan chan *steppedTestKitMsg + +func (ch steppedTestKitMsgChan) sendMsg(tp steppedTestkitMsgType, val any) error { + select { + case ch <- &steppedTestKitMsg{tp: tp, val: val}: + return nil + case <-time.After(defaultChanTimeout): + return errors.New("send msg timeout") + } +} + +func (ch steppedTestKitMsgChan) sendMsgWithType(tp steppedTestkitMsgType) error { + return ch.sendMsg(tp, nil) +} + +func (ch steppedTestKitMsgChan) recvMsg() (*steppedTestKitMsg, error) { + select { + case msg := <-ch: + return msg, nil + case <-time.After(defaultChanTimeout): + return nil, errors.New("send msg timeout") + } +} + +func (ch steppedTestKitMsgChan) recvMsgWithCheck(tp steppedTestkitMsgType) (*steppedTestKitMsg, error) { + msg, err := ch.recvMsg() + if err != nil { + return nil, err + } + + if msg.tp != tp { + return nil, errors.Errorf("unexpected msg type: %v, expect: %v", msg.tp, tp) + } + + return msg, nil +} + +type steppedTestKitCommandContext struct { + t *testing.T + tk *TestKit + 
notifyBreakPointAndWait func(string) +} + +type steppedTestKitCommand func(ctx *steppedTestKitCommandContext) any + +// SteppedTestKit is the testkit that can run stepped command +type SteppedTestKit struct { + t *testing.T + tk *TestKit + + // ch1 is used to send msg from foreground to background + ch1 steppedTestKitMsgChan + // ch2 is used to send msg from background to foreground + ch2 steppedTestKitMsgChan + // breakPoints is the break points we want to stop at + breakPoints []string + // cmdStopAt is the current break point it stopped at + cmdStopAt string + // the result of the current command + cmdResult any +} + +// NewSteppedTestKit creates a new SteppedTestKit +func NewSteppedTestKit(t *testing.T, store kv.Storage) *SteppedTestKit { + tk := &SteppedTestKit{ + t: t, + tk: NewTestKit(t, store), + ch1: make(steppedTestKitMsgChan), + ch2: make(steppedTestKitMsgChan), + } + return tk +} + +// ExpectIdle checks no command is running +func (tk *SteppedTestKit) ExpectIdle() { + require.Equal(tk.t, "", tk.cmdStopAt) +} + +// ExpectStopOnBreakPoint checks stopped on the specified break point +func (tk *SteppedTestKit) ExpectStopOnBreakPoint(breakPoint string) { + require.Equal(tk.t, breakPoint, tk.cmdStopAt) +} + +// ExpectStopOnAnyBreakPoint checks stopped on any break point +func (tk *SteppedTestKit) ExpectStopOnAnyBreakPoint() { + require.NotEqual(tk.t, "", tk.cmdStopAt) +} + +// SetBreakPoints sets the break points we want to stop at +func (tk *SteppedTestKit) SetBreakPoints(breakPoints ...string) { + tk.breakPoints = breakPoints +} + +func (tk *SteppedTestKit) handleCommandMsg() { + msg, err := tk.ch2.recvMsg() + require.NoError(tk.t, err) + switch msg.tp { + case msgTpCmdDone: + tk.cmdStopAt = "" + if msg.val == errCommandRunFailed { + require.FailNow(tk.t, "internal command failed") + } else { + tk.cmdResult = msg.val + } + case msgTpCmdStopOnBreakPoint: + require.IsType(tk.t, "", msg.val) + require.NotEqual(tk.t, "", msg.val) + tk.cmdStopAt = 
msg.val.(string) + default: + require.FailNow(tk.t, "invalid msg type", "tp %v", msg.tp) + } +} + +func (tk *SteppedTestKit) beforeCommand() { + tk.ExpectIdle() + tk.cmdResult = nil +} + +func (tk *SteppedTestKit) steppedCommand(cmd steppedTestKitCommand) *SteppedTestKit { + tk.beforeCommand() + go func() { + var success bool + var result any + var breakPointPaths []string + defer func() { + if !success { + result = errCommandRunFailed + } + + tk.tk.Session().SetValue(breakpoint.NotifyBreakPointFuncKey, nil) + for _, path := range breakPointPaths { + require.NoError(tk.t, failpoint.Disable(path)) + } + + require.NoError(tk.t, tk.ch2.sendMsg(msgTpCmdDone, result)) + }() + + ctx := &steppedTestKitCommandContext{ + t: tk.t, + tk: tk.tk, + notifyBreakPointAndWait: func(breakPoint string) { + require.NoError(tk.t, tk.ch2.sendMsg(msgTpCmdStopOnBreakPoint, breakPoint)) + _, err := tk.ch1.recvMsgWithCheck(msgTpCmdContinue) + require.NoError(tk.t, err) + }, + } + + tk.tk.Session().SetValue(breakpoint.NotifyBreakPointFuncKey, ctx.notifyBreakPointAndWait) + for _, breakPoint := range tk.breakPoints { + path := "github.com/pingcap/tidb/util/breakpoint/" + breakPoint + require.NoError(tk.t, failpoint.Enable(path, "return")) + breakPointPaths = append(breakPointPaths, path) + } + + result = cmd(ctx) + success = true + }() + + tk.handleCommandMsg() + return tk +} + +// Continue continues current command +func (tk *SteppedTestKit) Continue() *SteppedTestKit { + tk.ExpectStopOnAnyBreakPoint() + require.NoError(tk.t, tk.ch1.sendMsgWithType(msgTpCmdContinue)) + tk.handleCommandMsg() + return tk +} + +// SteppedMustExec creates a new stepped task for MustExec +func (tk *SteppedTestKit) SteppedMustExec(sql string, args ...interface{}) *SteppedTestKit { + return tk.steppedCommand(func(_ *steppedTestKitCommandContext) any { + tk.MustExec(sql, args...) 
+ return nil + }) +} + +// SteppedMustQuery creates a new stepped task for MustQuery +func (tk *SteppedTestKit) SteppedMustQuery(sql string, args ...interface{}) *SteppedTestKit { + return tk.steppedCommand(func(_ *steppedTestKitCommandContext) any { + return tk.MustQuery(sql, args...) + }) +} + +// MustExec executes a sql statement and asserts nil error. +func (tk *SteppedTestKit) MustExec(sql string, args ...interface{}) { + tk.beforeCommand() + tk.tk.MustExec(sql, args...) +} + +// MustQuery queries the statements and returns the result rows. +// If expected result is set it asserts the query result equals expected result. +func (tk *SteppedTestKit) MustQuery(sql string, args ...interface{}) *Result { + tk.beforeCommand() + result := tk.tk.MustQuery(sql, args...) + tk.cmdResult = result + return result +} + +// GetResult returns the result of the latest command +func (tk *SteppedTestKit) GetResult() any { + tk.ExpectIdle() + return tk.cmdResult +} + +// GetQueryResult returns the query result of the latest command +func (tk *SteppedTestKit) GetQueryResult() *Result { + tk.ExpectIdle() + require.IsType(tk.t, &Result{}, tk.cmdResult) + return tk.cmdResult.(*Result) +} diff --git a/util/breakpoint/BUILD.bazel b/util/breakpoint/BUILD.bazel new file mode 100644 index 0000000000000..ecc1f6e7f6e03 --- /dev/null +++ b/util/breakpoint/BUILD.bazel @@ -0,0 +1,13 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "breakpoint", + srcs = ["breakpoint.go"], + importpath = "github.com/pingcap/tidb/util/breakpoint", + visibility = ["//visibility:public"], + deps = [ + "//sessionctx", + "//util/stringutil", + "@com_github_pingcap_failpoint//:failpoint", + ], +) diff --git a/util/breakpoint/breakpoint.go b/util/breakpoint/breakpoint.go new file mode 100644 index 0000000000000..b94a90be02954 --- /dev/null +++ b/util/breakpoint/breakpoint.go @@ -0,0 +1,34 @@ +// Copyright 2022 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package breakpoint + +import ( + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/stringutil" +) + +// NotifyBreakPointFuncKey is the key where break point notify function located +const NotifyBreakPointFuncKey = stringutil.StringerStr("breakPointNotifyFunc") + +// Inject injects a break point to a session +func Inject(sctx sessionctx.Context, name string) { + failpoint.Inject(name, func(_ failpoint.Value) { + val := sctx.Value(NotifyBreakPointFuncKey) + if breakPointNotifyAndWaitContinue, ok := val.(func(string)); ok { + breakPointNotifyAndWaitContinue(name) + } + }) +}