Skip to content

Commit

Permalink
executor: implement set transaction read only as of transaction (#24766)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer committed May 25, 2021
1 parent ab5cf85 commit 6e9d275
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 0 deletions.
15 changes: 15 additions & 0 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,10 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
return expression.ErrFunctionsNoopImpl.GenWithStackByArgs("READ ONLY")
}
if s.AsOf != nil {
// start transaction read only as of failed due to we set tx_read_ts before
if e.ctx.GetSessionVars().TxnReadTS > 0 {
return errors.New("start transaction read only as of is forbidden after set transaction read only as of")
}
if err := e.ctx.NewTxnWithStartTS(ctx, e.staleTxnStartTS); err != nil {
return err
}
Expand All @@ -584,6 +588,17 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
return nil
}
}
// When TxnReadTS is not 0, it indicates the transaction is staleness transaction
if e.ctx.GetSessionVars().TxnReadTS > 0 {
startTS := e.ctx.GetSessionVars().TxnReadTS
// clear TxnReadTS after we used it.
e.ctx.GetSessionVars().TxnReadTS = 0
if err := e.ctx.NewTxnWithStartTS(ctx, startTS); err != nil {
return err
}
e.ctx.GetSessionVars().SetInTxn(true)
return nil
}

// If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the
// need to call NewTxn, which commits the existing transaction and begins a new one.
Expand Down
48 changes: 48 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -242,3 +243,50 @@ func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) {
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1)
tk.MustExec("commit")
}

func (s *testStaleTxnSerialSuite) TestSetTransactionReadOnlyAsOf(c *C) {
t1, err := time.Parse(types.TimeFormat, "2016-09-21 09:53:04")
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, s.store)
testcases := []struct {
sql string
expectedTS uint64
injectSafeTS uint64
}{
{
sql: `SET TRANSACTION READ ONLY as of timestamp '2021-04-21 00:42:12'`,
expectedTS: 424394603102208000,
injectSafeTS: 0,
},
{
sql: `SET TRANSACTION READ ONLY as of timestamp tidb_bounded_staleness('2015-09-21 00:07:01', '2021-04-27 11:26:13')`,
expectedTS: oracle.GoTimeToTS(t1),
injectSafeTS: oracle.GoTimeToTS(t1),
},
}
for _, testcase := range testcases {
if testcase.injectSafeTS > 0 {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil)
}
tk.MustExec(testcase.sql)
c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, testcase.expectedTS)
tk.MustExec("begin")
c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, uint64(0))
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.expectedTS)
tk.MustExec("commit")
tk.MustExec("begin")
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Not(Equals), testcase.expectedTS)

failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")
}

err = tk.ExecToErr(`SET TRANSACTION READ ONLY as of timestamp tidb_bounded_staleness(invalid1, invalid2')`)
c.Assert(err, NotNil)
c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, uint64(0))

tk.MustExec(`SET TRANSACTION READ ONLY as of timestamp '2021-04-21 00:42:12'`)
err = tk.ExecToErr(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00'`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "start transaction read only as of is forbidden after set transaction read only as of")
}
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,9 @@ type SessionVars struct {
// SnapshotTS is used for reading history data. For simplicity, SnapshotTS only supports distsql request.
SnapshotTS uint64

// TxnReadTS is used for staleness transaction, it provides next staleness transaction startTS.
TxnReadTS uint64

// SnapshotInfoschema is used with SnapshotTS, when the schema version at snapshotTS less than current schema
// version, we load an old version schema for query.
SnapshotInfoschema interface{}
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,9 @@ var defaultSysVars = []*SysVar{
}, GetSession: func(s *SessionVars) (string, error) {
return s.TxnScope.GetVarValue(), nil
}},
{Scope: ScopeSession, Name: TiDBTxnReadTS, Value: "", SetSession: func(s *SessionVars, val string) error {
return setTxnReadTS(s, val)
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}, SetSession: func(s *SessionVars, val string) error {
s.allowMPPExecution = val
return nil
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ const (

// TiDBTxnScope indicates whether using global transactions or local transactions.
TiDBTxnScope = "txn_scope"

// TiDBTxnReadTS indicates the next transaction should be staleness transaction and provide the startTS
TiDBTxnReadTS = "tx_read_ts"
)

// TiDB system variable names that both in session and global scope.
Expand Down
17 changes: 17 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,23 @@ func setSnapshotTS(s *SessionVars, sVal string) error {
return err
}

func setTxnReadTS(s *SessionVars, sVal string) error {
if sVal == "" {
s.TxnReadTS = 0
return nil
}
t, err := types.ParseTime(s.StmtCtx, sVal, mysql.TypeTimestamp, types.MaxFsp)
if err != nil {
return err
}
t1, err := t.GoTime(s.TimeZone)
if err != nil {
return err
}
s.TxnReadTS = oracle.GoTimeToTS(t1)
return err
}

// serverGlobalVariable is used to handle variables that acts in server and global scope.
type serverGlobalVariable struct {
sync.Mutex
Expand Down

0 comments on commit 6e9d275

Please sign in to comment.