diff --git a/executor/simple.go b/executor/simple.go index 2e9f8c89ea4be..0eadfd5f85dd2 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -574,6 +574,10 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { return expression.ErrFunctionsNoopImpl.GenWithStackByArgs("READ ONLY") } if s.AsOf != nil { + // start transaction read only as of failed due to we set tx_read_ts before + if e.ctx.GetSessionVars().TxnReadTS > 0 { + return errors.New("start transaction read only as of is forbidden after set transaction read only as of") + } if err := e.ctx.NewTxnWithStartTS(ctx, e.staleTxnStartTS); err != nil { return err } @@ -584,6 +588,17 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { return nil } } + // When TxnReadTS is not 0, it indicates the transaction is staleness transaction + if e.ctx.GetSessionVars().TxnReadTS > 0 { + startTS := e.ctx.GetSessionVars().TxnReadTS + // clear TxnReadTS after we used it. + e.ctx.GetSessionVars().TxnReadTS = 0 + if err := e.ctx.NewTxnWithStartTS(ctx, startTS); err != nil { + return err + } + e.ctx.GetSessionVars().SetInTxn(true) + return nil + } // If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the // need to call NewTxn, which commits the existing transaction and begins a new one. diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 1f4f24a7f9f37..db4705f428d14 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/testkit" ) @@ -242,3 +243,50 @@ func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) { c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1) tk.MustExec("commit") } + +func (s *testStaleTxnSerialSuite) TestSetTransactionReadOnlyAsOf(c *C) { + t1, err := time.Parse(types.TimeFormat, "2016-09-21 09:53:04") + c.Assert(err, IsNil) + tk := testkit.NewTestKit(c, s.store) + testcases := []struct { + sql string + expectedTS uint64 + injectSafeTS uint64 + }{ + { + sql: `SET TRANSACTION READ ONLY as of timestamp '2021-04-21 00:42:12'`, + expectedTS: 424394603102208000, + injectSafeTS: 0, + }, + { + sql: `SET TRANSACTION READ ONLY as of timestamp tidb_bounded_staleness('2015-09-21 00:07:01', '2021-04-27 11:26:13')`, + expectedTS: oracle.GoTimeToTS(t1), + injectSafeTS: oracle.GoTimeToTS(t1), + }, + } + for _, testcase := range testcases { + if testcase.injectSafeTS > 0 { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil) + } + tk.MustExec(testcase.sql) + c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, testcase.expectedTS) + tk.MustExec("begin") + c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, uint64(0)) + c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.expectedTS) + tk.MustExec("commit") + tk.MustExec("begin") + c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Not(Equals), testcase.expectedTS) + + failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS") + } + + err = tk.ExecToErr(`SET TRANSACTION READ ONLY as of timestamp tidb_bounded_staleness(invalid1, invalid2')`) + c.Assert(err, NotNil) + c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, uint64(0)) + + tk.MustExec(`SET TRANSACTION READ ONLY as of timestamp '2021-04-21 00:42:12'`) + err = tk.ExecToErr(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00'`) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "start transaction read only as of is forbidden after set transaction read only as of") +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index f2536e4edcb16..8bd2d6940306e 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -462,6 +462,9 @@ type SessionVars struct { // SnapshotTS is used for reading history data. For simplicity, SnapshotTS only supports distsql request. SnapshotTS uint64 + // TxnReadTS is used for staleness transaction, it provides next staleness transaction startTS. + TxnReadTS uint64 + // SnapshotInfoschema is used with SnapshotTS, when the schema version at snapshotTS less than current schema // version, we load an old version schema for query. SnapshotInfoschema interface{} diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index ee633e0d1afdf..31ee28de2b5a8 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -825,6 +825,9 @@ var defaultSysVars = []*SysVar{ }, GetSession: func(s *SessionVars) (string, error) { return s.TxnScope.GetVarValue(), nil }}, + {Scope: ScopeSession, Name: TiDBTxnReadTS, Value: "", SetSession: func(s *SessionVars, val string) error { + return setTxnReadTS(s, val) + }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}, SetSession: func(s *SessionVars, val string) error { s.allowMPPExecution = val return nil diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 3fb99204ee2ea..5d7897ff9273a 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -203,6 +203,9 @@ const ( // TiDBTxnScope indicates whether using global transactions or local transactions. TiDBTxnScope = "txn_scope" + + // TiDBTxnReadTS indicates the next transaction should be staleness transaction and provide the startTS + TiDBTxnReadTS = "tx_read_ts" ) // TiDB system variable names that both in session and global scope. diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index d42128e73c612..653d6e4bdd8ca 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -378,6 +378,23 @@ func setSnapshotTS(s *SessionVars, sVal string) error { return err } +func setTxnReadTS(s *SessionVars, sVal string) error { + if sVal == "" { + s.TxnReadTS = 0 + return nil + } + t, err := types.ParseTime(s.StmtCtx, sVal, mysql.TypeTimestamp, types.MaxFsp) + if err != nil { + return err + } + t1, err := t.GoTime(s.TimeZone) + if err != nil { + return err + } + s.TxnReadTS = oracle.GoTimeToTS(t1) + return err +} + // serverGlobalVariable is used to handle variables that acts in server and global scope. type serverGlobalVariable struct { sync.Mutex