Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Provide a util to "pause" session in uint test #35529

Merged
merged 32 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
322468e
txn: fix bug that some times lock ts and read ts are not the same
lcwangchao Jun 17, 2022
8675cdd
Merge branch 'master' into fix_35459
lcwangchao Jun 17, 2022
240140b
update
lcwangchao Jun 17, 2022
92c87e4
Add session stop
lcwangchao Jun 17, 2022
fe924f8
Merge branch 'master' into sessionstop
lcwangchao Jun 17, 2022
9ecee16
update
lcwangchao Jun 20, 2022
7f042b0
update
lcwangchao Jun 20, 2022
bac48a1
update
lcwangchao Jun 20, 2022
7544019
update
lcwangchao Jun 20, 2022
1bbef30
update
lcwangchao Jun 20, 2022
0258494
update
lcwangchao Jun 20, 2022
a293a69
Merge branch 'master' into sessionstop
lcwangchao Jun 20, 2022
3a0bc58
comments
lcwangchao Jun 20, 2022
3cebd5f
Merge branch 'sessionstop' of github.com:lcwangchao/tidb into session…
lcwangchao Jun 20, 2022
a3c3742
Merge branch 'master' into sessionstop
lcwangchao Jun 20, 2022
72ad391
add SteppedCommandTask
lcwangchao Jun 21, 2022
f1414e6
Merge branch 'master' into sessionstop
lcwangchao Jun 21, 2022
d4b9ed7
refactor
lcwangchao Jun 22, 2022
9146d0c
update
lcwangchao Jun 22, 2022
1814743
update
lcwangchao Jun 22, 2022
5f9942e
rename
lcwangchao Jun 22, 2022
25c1db3
rename
lcwangchao Jun 22, 2022
c6af734
lint
lcwangchao Jun 22, 2022
4304e97
update
lcwangchao Jun 23, 2022
1eef15c
Merge branch 'master' into sessionstop
lcwangchao Jun 23, 2022
4491124
update bazel
lcwangchao Jun 23, 2022
d6b4150
update SetBreakPoints
lcwangchao Jun 23, 2022
4dd804e
consts
lcwangchao Jun 23, 2022
09d4bbd
Merge branch 'master' into sessionstop
lcwangchao Jun 24, 2022
3a245ce
Merge branch 'master' into sessionstop
ti-chi-bot Jun 24, 2022
a36c20a
Merge branch 'master' into sessionstop
ti-chi-bot Jun 24, 2022
2f11a05
Merge branch 'master' into sessionstop
ti-chi-bot Jun 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ go_library(
"//util",
"//util/admin",
"//util/bitmap",
"//util/breakpoint",
"//util/chunk",
"//util/codec",
"//util/collate",
Expand Down
11 changes: 3 additions & 8 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/breakpoint"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hint"
Expand Down Expand Up @@ -415,10 +416,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
// ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`.
ctx = a.observeStmtBeginForTopSQL(ctx)

failpoint.Inject("hookBeforeFirstRunExecutor", func() {
sessiontxn.ExecTestHook(a.Ctx, sessiontxn.HookBeforeFirstRunExecutorKey)
})

breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun)
if err = e.Open(ctx); err != nil {
terror.Call(e.Close)
return nil, err
Expand Down Expand Up @@ -795,10 +793,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
if err != nil {
return nil, err
}

failpoint.Inject("hookAfterOnStmtRetryWithLockError", func() {
sessiontxn.ExecTestHook(a.Ctx, sessiontxn.HookAfterOnStmtRetryWithLockErrorKey)
})
breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError)

e, err := a.buildExecutor()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions sessiontxn/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ var AssertTxnInfoSchemaKey stringutil.StringerStr = "assertTxnInfoSchemaKey"
// Only for test
var AssertTxnInfoSchemaAfterRetryKey stringutil.StringerStr = "assertTxnInfoSchemaAfterRetryKey"

// HookBeforeFirstRunExecutorKey is the hook key for the executor first run
// BreakPointBeforeExecutorFirstRun is the key for the stop point where session stops before executor's first run
// Only for test
var HookBeforeFirstRunExecutorKey stringutil.StringerStr = "testHookKeyBeforeFirstRunExecutor"
var BreakPointBeforeExecutorFirstRun = "beforeExecutorFirstRun"

// HookAfterOnStmtRetryWithLockErrorKey is the hook key for after OnStmtRetry with lock error
// BreakPointOnStmtRetryAfterLockError s the key for the stop point where session stops after OnStmtRetry when lock error happens
// Only for test
var HookAfterOnStmtRetryWithLockErrorKey stringutil.StringerStr = "testHookKeyAfterOnStmtRetryWithLockError"
var BreakPointOnStmtRetryAfterLockError = "lockErrorAndThenOnStmtRetryCalled"

// RecordAssert is used only for test
func RecordAssert(sctx sessionctx.Context, name string, value interface{}) {
Expand Down
147 changes: 53 additions & 94 deletions sessiontxn/txn_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain, func()) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/hookBeforeFirstRunExecutor", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/hookAfterOnStmtRetryWithLockError", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInPreparedStmtExec", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInCachedPlanExec", "return"))
Expand Down Expand Up @@ -85,8 +83,6 @@ func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain, func()) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/hookBeforeFirstRunExecutor"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/hookAfterOnStmtRetryWithLockError"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInPreparedStmtExec"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInCachedPlanExec"))
Expand Down Expand Up @@ -729,102 +725,65 @@ func TestStillWriteConflictAfterRetry(t *testing.T) {
"update t1 set v=v+1 where id in (1, 2, 3) and v>0",
}

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
for _, isolation := range []string{ast.RepeatableRead, ast.ReadCommitted} {
for _, query := range queries {
for _, autocommit := range []bool{true, false} {
t.Run(fmt.Sprintf("%s,%s,autocommit=%v", isolation, query, autocommit), func(t *testing.T) {
testStillWriteConflictAfterRetry(t, store, isolation, query, autocommit)
tk.MustExec("truncate table t1")
tk.MustExec("insert into t1 values(1, 10)")
tk2 := testkit.NewSteppedTestKit(t, store)
defer tk2.MustExec("rollback")

tk2.MustExec("use test")
tk2.MustExec("set @@tidb_txn_mode = 'pessimistic'")
tk2.MustExec(fmt.Sprintf("set tx_isolation = '%s'", isolation))
if autocommit {
tk2.MustExec("set autocommit=1")
tk2.MustExec("begin")
} else {
tk2.MustExec("set autocommit=0")
}

tk2.SetBreakPoints(
sessiontxn.BreakPointBeforeExecutorFirstRun,
sessiontxn.BreakPointOnStmtRetryAfterLockError,
)

var isSelect, isUpdate bool
switch {
case strings.HasPrefix(query, "select"):
isSelect = true
tk2.SteppedMustQuery(query)
case strings.HasPrefix(query, "update"):
isUpdate = true
tk2.SteppedMustExec(query)
default:
require.FailNowf(t, "invalid query: ", query)
}

// Pause the session before the executor first run and then update the record in another session
tk2.ExpectStopOnBreakPoint(sessiontxn.BreakPointBeforeExecutorFirstRun)
tk.MustExec("update t1 set v=v+1")

// Session continues, it should get a lock error and retry, we pause the session before the executor's next run
// and then update the record in another session again.
tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointOnStmtRetryAfterLockError)
tk.MustExec("update t1 set v=v+1")

// Because the record is updated by another session again, when this session continues, it will get a lock error again.
tk2.Continue().ExpectStopOnBreakPoint(sessiontxn.BreakPointOnStmtRetryAfterLockError)
tk2.Continue().ExpectIdle()
switch {
case isSelect:
tk2.GetQueryResult().Check(testkit.Rows("1 12"))
case isUpdate:
tk2.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 13"))
}
})
}
}
}
}

func testStillWriteConflictAfterRetry(t *testing.T, store kv.Storage, isolation string, query string, autocommit bool) {
tk := testkit.NewTestKit(t, store)
defer tk.MustExec("rollback")

tk.MustExec("use test")
tk.MustExec(fmt.Sprintf("set tx_isolation = '%s'", isolation))
tk.MustExec("set autocommit=1")
tk.MustExec("set @@tidb_txn_mode = 'pessimistic'")
tk.MustExec("truncate table t1")
tk.MustExec("insert into t1 values(1, 10)")

se := tk.Session()
chanBeforeRunStmt := make(chan func(), 1)
chanAfterOnStmtRetry := make(chan func(), 1)
c2 := make(chan string, 1)
c3 := make(chan string, 1)
wait := func(ch chan string, expect string) {
select {
case got := <-ch:
if got != expect {
panic(fmt.Sprintf("expect '%s', got '%s'", expect, got))
}
case <-time.After(time.Second * 10):
panic("wait2 timeout")
}
}

if autocommit {
tk.MustExec("begin")
} else {
tk.MustExec("set @@autocommit=0")
}

se.SetValue(sessiontxn.HookBeforeFirstRunExecutorKey, chanBeforeRunStmt)
se.SetValue(sessiontxn.HookAfterOnStmtRetryWithLockErrorKey, chanAfterOnStmtRetry)
defer func() {
se.SetValue(sessiontxn.HookBeforeFirstRunExecutorKey, nil)
se.SetValue(sessiontxn.HookAfterOnStmtRetryWithLockErrorKey, nil)
}()

chanBeforeRunStmt <- func() {
c2 <- "now before session1 runStmt"
wait(c3, "session2 updated v=v+1 done")
}

chanAfterOnStmtRetry <- func() {
c2 <- "now after OnStmtRetry before rebuild executor"
wait(c3, "session2 updated v=v+1 again done")
}

go func() {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

// first conflict
wait(c2, "now before session1 runStmt")
tk2.MustExec("update t1 set v=v+1 where id=1")
c3 <- "session2 updated v=v+1 done"

// second conflict
wait(c2, "now after OnStmtRetry before rebuild executor")
tk2.MustExec("update t1 set v=v+1 where id=1")
c3 <- "session2 updated v=v+1 again done"
chanAfterOnStmtRetry <- func() {}
c3 <- "done"
}()

isSelect := false
if strings.HasPrefix(query, "update ") {
tk.MustExec(query)
} else if strings.HasPrefix(query, "select ") {
isSelect = true
tk.MustQuery(query).Check(testkit.Rows("1 12"))
} else {
require.FailNowf(t, "invalid query: %s", query)
}

wait(c3, "done")

se.SetValue(sessiontxn.HookBeforeFirstRunExecutorKey, nil)
se.SetValue(sessiontxn.HookAfterOnStmtRetryWithLockErrorKey, nil)
if isSelect {
tk.MustExec("update t1 set v=v+1")
}
tk.MustExec("commit")
tk.MustQuery("select * from t1").Check(testkit.Rows("1 13"))
tk.MustExec("rollback")
}
3 changes: 3 additions & 0 deletions testkit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"mocksessionmanager.go",
"mockstore.go",
"result.go",
"stepped.go",
"testkit.go",
],
importpath = "github.com/pingcap/tidb/testkit",
Expand All @@ -23,8 +24,10 @@ go_library(
"//store/mockstore",
"//types",
"//util",
"//util/breakpoint",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
Loading