Skip to content

Commit

Permalink
*: Provide a util to "pause" session in uint test (#35529)
Browse files Browse the repository at this point in the history
close #35526
  • Loading branch information
lcwangchao committed Jun 24, 2022
1 parent 2c4d1df commit db02b4a
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 106 deletions.
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ go_library(
"//util",
"//util/admin",
"//util/bitmap",
"//util/breakpoint",
"//util/chunk",
"//util/codec",
"//util/collate",
Expand Down
11 changes: 3 additions & 8 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/breakpoint"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hint"
Expand Down Expand Up @@ -415,10 +416,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
// ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`.
ctx = a.observeStmtBeginForTopSQL(ctx)

failpoint.Inject("hookBeforeFirstRunExecutor", func() {
sessiontxn.ExecTestHook(a.Ctx, sessiontxn.HookBeforeFirstRunExecutorKey)
})

breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun)
if err = e.Open(ctx); err != nil {
terror.Call(e.Close)
return nil, err
Expand Down Expand Up @@ -795,10 +793,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
if err != nil {
return nil, err
}

failpoint.Inject("hookAfterOnStmtRetryWithLockError", func() {
sessiontxn.ExecTestHook(a.Ctx, sessiontxn.HookAfterOnStmtRetryWithLockErrorKey)
})
breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError)

e, err := a.buildExecutor()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions sessiontxn/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ var AssertTxnInfoSchemaKey stringutil.StringerStr = "assertTxnInfoSchemaKey"
// Only for test
var AssertTxnInfoSchemaAfterRetryKey stringutil.StringerStr = "assertTxnInfoSchemaAfterRetryKey"

// HookBeforeFirstRunExecutorKey is the hook key for the executor first run
// BreakPointBeforeExecutorFirstRun is the key for the stop point where session stops before executor's first run
// Only for test
var HookBeforeFirstRunExecutorKey stringutil.StringerStr = "testHookKeyBeforeFirstRunExecutor"
var BreakPointBeforeExecutorFirstRun = "beforeExecutorFirstRun"

// HookAfterOnStmtRetryWithLockErrorKey is the hook key for after OnStmtRetry with lock error
// BreakPointOnStmtRetryAfterLockError s the key for the stop point where session stops after OnStmtRetry when lock error happens
// Only for test
var HookAfterOnStmtRetryWithLockErrorKey stringutil.StringerStr = "testHookKeyAfterOnStmtRetryWithLockError"
var BreakPointOnStmtRetryAfterLockError = "lockErrorAndThenOnStmtRetryCalled"

// RecordAssert is used only for test
func RecordAssert(sctx sessionctx.Context, name string, value interface{}) {
Expand Down
147 changes: 53 additions & 94 deletions sessiontxn/txn_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain, func()) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/hookBeforeFirstRunExecutor", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/hookAfterOnStmtRetryWithLockError", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInPreparedStmtExec", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInCachedPlanExec", "return"))
Expand Down Expand Up @@ -85,8 +83,6 @@ func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain, func()) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/hookBeforeFirstRunExecutor"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/hookAfterOnStmtRetryWithLockError"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInPreparedStmtExec"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInCachedPlanExec"))
Expand Down Expand Up @@ -729,102 +725,65 @@ func TestStillWriteConflictAfterRetry(t *testing.T) {
"update t1 set v=v+1 where id in (1, 2, 3) and v>0",
}

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
for _, isolation := range []string{ast.RepeatableRead, ast.ReadCommitted} {
for _, query := range queries {
for _, autocommit := range []bool{true, false} {
t.Run(fmt.Sprintf("%s,%s,autocommit=%v", isolation, query, autocommit), func(t *testing.T) {
testStillWriteConflictAfterRetry(t, store, isolation, query, autocommit)
tk.MustExec("truncate table t1")
tk.MustExec("insert into t1 values(1, 10)")
tk2 := testkit.NewSteppedTestKit(t, store)
defer tk2.MustExec("rollback")

tk2.MustExec("use test")
tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'")
tk2.MustExec(fmt.Sprintf("set tx_isolation = '%s'", isolation))
if autocommit {
tk2.MustExec("set autocommit=1")
tk2.MustExec("begin")
} else {
tk2.MustExec("set autocommit=0")
}

tk2.SetBreakPoints(
sessiontxn.BreakPointBeforeExecutorFirstRun,
sessiontxn.BreakPointOnStmtRetryAfterLockError,
)

var isSelect, isUpdate bool
switch {
case strings.HasPrefix(query, "select"):
isSelect = true
tk2.SteppedMustQuery(query)
case strings.HasPrefix(query, "update"):
isUpdate = true
tk2.SteppedMustExec(query)
default:
require.FailNowf(t, "invalid query: ", query)
}

// Pause the session before the executor first run and then update the record in another session
tk2.ExpectStopOnBreakPoint(sessiontxn.BreakPointBeforeExecutorFirstRun)
tk.MustExec("update t1 set v=v+1")

// Session continues, it should get a lock error and retry, we pause the session before the executor's next run
// and then update the record in another session again.
tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointOnStmtRetryAfterLockError)
tk.MustExec("update t1 set v=v+1")

// Because the record is updated by another session again, when this session continues, it will get a lock error again.
tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointOnStmtRetryAfterLockError)
tk2.Continue().ExpectIdle()
switch {
case isSelect:
tk2.GetQueryResult().Check(testkit.Rows("1 12"))
case isUpdate:
tk2.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 13"))
}
})
}
}
}
}

func testStillWriteConflictAfterRetry(t *testing.T, store kv.Storage, isolation string, query string, autocommit bool) {
tk := testkit.NewTestKit(t, store)
defer tk.MustExec("rollback")

tk.MustExec("use test")
tk.MustExec(fmt.Sprintf("set tx_isolation = '%s'", isolation))
tk.MustExec("set autocommit=1")
tk.MustExec("set @@tidb_txn_mode = 'pessimistic'")
tk.MustExec("truncate table t1")
tk.MustExec("insert into t1 values(1, 10)")

se := tk.Session()
chanBeforeRunStmt := make(chan func(), 1)
chanAfterOnStmtRetry := make(chan func(), 1)
c2 := make(chan string, 1)
c3 := make(chan string, 1)
wait := func(ch chan string, expect string) {
select {
case got := <-ch:
if got != expect {
panic(fmt.Sprintf("expect '%s', got '%s'", expect, got))
}
case <-time.After(time.Second * 10):
panic("wait2 timeout")
}
}

if autocommit {
tk.MustExec("begin")
} else {
tk.MustExec("set @@autocommit=0")
}

se.SetValue(sessiontxn.HookBeforeFirstRunExecutorKey, chanBeforeRunStmt)
se.SetValue(sessiontxn.HookAfterOnStmtRetryWithLockErrorKey, chanAfterOnStmtRetry)
defer func() {
se.SetValue(sessiontxn.HookBeforeFirstRunExecutorKey, nil)
se.SetValue(sessiontxn.HookAfterOnStmtRetryWithLockErrorKey, nil)
}()

chanBeforeRunStmt <- func() {
c2 <- "now before session1 runStmt"
wait(c3, "session2 updated v=v+1 done")
}

chanAfterOnStmtRetry <- func() {
c2 <- "now after OnStmtRetry before rebuild executor"
wait(c3, "session2 updated v=v+1 again done")
}

go func() {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

// first conflict
wait(c2, "now before session1 runStmt")
tk2.MustExec("update t1 set v=v+1 where id=1")
c3 <- "session2 updated v=v+1 done"

// second conflict
wait(c2, "now after OnStmtRetry before rebuild executor")
tk2.MustExec("update t1 set v=v+1 where id=1")
c3 <- "session2 updated v=v+1 again done"
chanAfterOnStmtRetry <- func() {}
c3 <- "done"
}()

isSelect := false
if strings.HasPrefix(query, "update ") {
tk.MustExec(query)
} else if strings.HasPrefix(query, "select ") {
isSelect = true
tk.MustQuery(query).Check(testkit.Rows("1 12"))
} else {
require.FailNowf(t, "invalid query: %s", query)
}

wait(c3, "done")

se.SetValue(sessiontxn.HookBeforeFirstRunExecutorKey, nil)
se.SetValue(sessiontxn.HookAfterOnStmtRetryWithLockErrorKey, nil)
if isSelect {
tk.MustExec("update t1 set v=v+1")
}
tk.MustExec("commit")
tk.MustQuery("select * from t1").Check(testkit.Rows("1 13"))
tk.MustExec("rollback")
}
3 changes: 3 additions & 0 deletions testkit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"mocksessionmanager.go",
"mockstore.go",
"result.go",
"stepped.go",
"testkit.go",
],
importpath = "github.com/pingcap/tidb/testkit",
Expand All @@ -23,8 +24,10 @@ go_library(
"//store/mockstore",
"//types",
"//util",
"//util/breakpoint",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
Loading

0 comments on commit db02b4a

Please sign in to comment.