Skip to content

Commit

Permalink
executor: support checking schemaVer before staleness transaction beg…
Browse files Browse the repository at this point in the history
…ins (#22679)
  • Loading branch information
Yisaer authored Feb 4, 2021
1 parent feb9da8 commit 93d3d04
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
49 changes: 49 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7487,6 +7487,8 @@ func (s *testSuite) TestIssue15563(c *C) {
}

func (s *testSuite) TestStalenessTransaction(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil)
defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer")
testcases := []struct {
name string
preSQL string
Expand Down Expand Up @@ -7562,6 +7564,8 @@ func (s *testSuite) TestStalenessTransaction(c *C) {
}

func (s *testSuite) TestStalenessAndHistoryRead(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil)
defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer")
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
Expand Down Expand Up @@ -7607,3 +7611,48 @@ func (s *testSuite) TestIssue22201(c *C) {
tk.MustQuery("SELECT HEX(WEIGHT_STRING('ab' AS char(1000000000000000000)));").Check(testkit.Rows("<nil>"))
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1301 Result of weight_string() was larger than max_allowed_packet (67108864) - truncated"))
}

func (s *testSerialSuite) TestStalenessTransactionSchemaVer(c *C) {
testcases := []struct {
name string
sql string
expectErr error
}{
{
name: "ddl change before stale txn",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:03'`,
expectErr: errors.New("schema version changed after the staleness startTS"),
},
{
name: "ddl change before stale txn",
sql: fmt.Sprintf("START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '%v'",
time.Now().Truncate(3*time.Second).Format("2006-01-02 15:04:05")),
expectErr: errors.New(".*schema version changed after the staleness startTS.*"),
},
{
name: "ddl change before stale txn",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:03'`,
expectErr: nil,
},
}
tk := testkit.NewTestKitWithInit(c, s.store)
for _, testcase := range testcases {
check := func() {
if testcase.expectErr != nil {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(true)"), IsNil)
defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer")
} else {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil)
defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer")
}
_, err := tk.Exec(testcase.sql)
if testcase.expectErr != nil {
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, testcase.expectErr.Error())
} else {
c.Assert(err, IsNil)
}
}
check()
}
}
22 changes: 22 additions & 0 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -629,6 +630,27 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
if err != nil {
return err
}
dom := domain.GetDomain(e.ctx)
m, err := dom.GetSnapshotMeta(e.ctx.GetSessionVars().TxnCtx.StartTS)
if err != nil {
return err
}
staleVer, err := m.GetSchemaVersion()
if err != nil {
return err
}
failpoint.Inject("mockStalenessTxnSchemaVer", func(val failpoint.Value) {
if val.(bool) {
staleVer = e.ctx.GetSessionVars().TxnCtx.SchemaVersion - 1
} else {
staleVer = e.ctx.GetSessionVars().TxnCtx.SchemaVersion
}
})
// TODO: currently we directly check the schema version. In future, we can cache the stale infoschema instead.
if e.ctx.GetSessionVars().TxnCtx.SchemaVersion > staleVer {
return errors.New("schema version changed after the staleness startTS")
}

// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
Expand Down
4 changes: 4 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3871,6 +3871,8 @@ func (s *testSessionSerialSuite) TestIssue21943(c *C) {
}

func (s *testSessionSuite) TestValidateReadOnlyInStalenessTransaction(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil)
defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer")
testcases := []struct {
name string
sql string
Expand Down Expand Up @@ -4015,6 +4017,8 @@ func (s *testSessionSuite) TestValidateReadOnlyInStalenessTransaction(c *C) {
}

func (s *testSessionSerialSuite) TestSpecialSQLInStalenessTxn(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer", "return(false)"), IsNil)
defer failpoint.Disable("github.com/pingcap/tidb/executor/mockStalenessTxnSchemaVer")
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
testcases := []struct {
Expand Down

0 comments on commit 93d3d04

Please sign in to comment.