From 6bfc5f8b44005f2c988d383c9cdf65cf98e7742d Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 20 May 2021 20:54:22 +0800 Subject: [PATCH] *: support AS OF TIMESTAMP read-only begin statement (#24740) --- executor/builder.go | 9 +-- executor/simple.go | 32 +++++++-- executor/stale_txn_test.go | 118 ++++++++++++++++++++++++-------- expression/builtin_time_test.go | 4 +- planner/core/common_plans.go | 3 + planner/core/planbuilder.go | 36 +++++++++- session/session.go | 54 +++++++-------- session/session_test.go | 4 +- sessionctx/context.go | 2 +- 9 files changed, 188 insertions(+), 74 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index b04dfe45f4bab..c8a128e505562 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -706,10 +706,11 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor { base := newBaseExecutor(b.ctx, v.Schema(), v.ID()) base.initCap = chunk.ZeroCapacity e := &SimpleExec{ - baseExecutor: base, - Statement: v.Statement, - IsFromRemote: v.IsFromRemote, - is: b.is, + baseExecutor: base, + Statement: v.Statement, + IsFromRemote: v.IsFromRemote, + is: b.is, + StalenessTxnOption: v.StalenessTxnOption, } return e } diff --git a/executor/simple.go b/executor/simple.go index 3c45ee05ada4e..193949eda3fdd 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -74,6 +74,9 @@ type SimpleExec struct { IsFromRemote bool done bool is infoschema.InfoSchema + + // StalenessTxnOption is used to execute the staleness txn during a read-only begin statement. + StalenessTxnOption *sessionctx.StalenessTxnOption } func (e *baseExecutor) getSysSession() (sessionctx.Context, error) { @@ -566,13 +569,16 @@ func (e *SimpleExec) executeUse(s *ast.UseStmt) error { } func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { - // If `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND` is the first statement in TxnCtx, we should + // If `START TRANSACTION READ ONLY` is the first statement in TxnCtx, we should // always create a new Txn instead of reusing it. if s.ReadOnly { enableNoopFuncs := e.ctx.GetSessionVars().EnableNoopFuncs - if !enableNoopFuncs && s.Bound == nil { + if !enableNoopFuncs && s.AsOf == nil && s.Bound == nil { return expression.ErrFunctionsNoopImpl.GenWithStackByArgs("READ ONLY") } + if s.AsOf != nil { + return e.executeStartTransactionReadOnlyWithBoundedStaleness(ctx, s) + } if s.Bound != nil { return e.executeStartTransactionReadOnlyWithTimestampBound(ctx, s) } @@ -614,6 +620,22 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { return nil } +func (e *SimpleExec) executeStartTransactionReadOnlyWithBoundedStaleness(ctx context.Context, s *ast.BeginStmt) error { + if e.StalenessTxnOption == nil { + return errors.New("Failed to get timestamp during start transaction read only as of timestamp") + } + if err := e.ctx.NewTxnWithStalenessOption(ctx, *e.StalenessTxnOption); err != nil { + return err + } + + // With START TRANSACTION, autocommit remains disabled until you end + // the transaction with COMMIT or ROLLBACK. The autocommit mode then + // reverts to its previous state. + e.ctx.GetSessionVars().SetInTxn(true) + return nil +} + +// TODO: deprecate this syntax and only keep `AS OF TIMESTAMP` statement. func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx context.Context, s *ast.BeginStmt) error { opt := sessionctx.StalenessTxnOption{} opt.Mode = s.Bound.Mode @@ -632,8 +654,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte if err != nil { return err } - startTS := oracle.GoTimeToTS(gt) - opt.StartTS = startTS + opt.StartTS = oracle.GoTimeToTS(gt) case ast.TimestampBoundExactStaleness: // TODO: support funcCallExpr in future v, ok := s.Bound.Timestamp.(*driver.ValueExpr) @@ -668,8 +689,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte if err != nil { return err } - startTS := oracle.GoTimeToTS(gt) - opt.StartTS = startTS + opt.StartTS = oracle.GoTimeToTS(gt) } err := e.ctx.NewTxnWithStalenessOption(ctx, opt) if err != nil { diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index db2b55a9a1637..dc5ddd0785208 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -73,13 +73,30 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) { zone: "sz", }, { - name: "begin", + name: "begin after TimestampBoundReadTimestamp", preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`, sql: "begin", IsStaleness: false, txnScope: kv.GlobalTxnScope, zone: "", }, + { + name: "AsOfTimestamp", + preSQL: "begin", + sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`, + IsStaleness: true, + expectPhysicalTS: 1599321600000, + txnScope: "local", + zone: "sh", + }, + { + name: "begin after AsOfTimestamp", + preSQL: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`, + sql: "begin", + IsStaleness: false, + txnScope: oracle.GlobalTxnScope, + zone: "", + }, } tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") @@ -106,8 +123,8 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) { } c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.IsStaleness) tk.MustExec("commit") - failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") } + failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") } func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { @@ -147,13 +164,17 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { failpoint.Enable("github.com/pingcap/tidb/config/injectTxnScope", fmt.Sprintf(`return("%v")`, testcase.zone)) failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStoreLabels", fmt.Sprintf(`return("%v_%v")`, placement.DCLabelKey, testcase.txnScope)) failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag", `return(true)`) + // Using NOW() will cause the loss of fsp precision, so we use NOW(3) to be accurate to the millisecond. + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`) + tk.MustQuery(testcase.sql) + tk.MustExec(`commit`) tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) tk.MustQuery(testcase.sql) tk.MustExec(`commit`) - failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") - failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels") - failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag") } + failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") + failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels") + failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag") } func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) { @@ -169,6 +190,17 @@ func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) { tk.MustExec(updateSafePoint) // set @@tidb_snapshot before staleness txn tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`) + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`) + // 1599321600000 == 2020-09-06 00:00:00 + c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000)) + tk.MustExec("commit") + // set @@tidb_snapshot during staleness txn + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`) + tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`) + c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000)) + tk.MustExec("commit") + // set @@tidb_snapshot before staleness txn + tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`) tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`) c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000)) tk.MustExec("commit") @@ -190,23 +222,20 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { name string sql string injectSafeTS uint64 - useSafeTS bool + // compareWithSafeTS will be 0 if StartTS==SafeTS, -1 if StartTS < SafeTS, and +1 if StartTS > SafeTS. + compareWithSafeTS int }{ { - name: "max 20 seconds ago, safeTS 10 secs ago", - sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`, - injectSafeTS: func() uint64 { - return oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)) - }(), - useSafeTS: true, + name: "max 20 seconds ago, safeTS 10 secs ago", + sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)), + compareWithSafeTS: 0, }, { - name: "max 10 seconds ago, safeTS 20 secs ago", - sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`, - injectSafeTS: func() uint64 { - return oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)) - }(), - useSafeTS: false, + name: "max 10 seconds ago, safeTS 20 secs ago", + sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)), + compareWithSafeTS: 1, }, { name: "max 20 seconds ago, safeTS 10 secs ago", @@ -214,10 +243,8 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`, time.Now().Add(-20*time.Second).Format("2006-01-02 15:04:05")) }(), - injectSafeTS: func() uint64 { - return oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)) - }(), - useSafeTS: true, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)), + compareWithSafeTS: 0, }, { name: "max 10 seconds ago, safeTS 20 secs ago", @@ -225,25 +252,46 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`, time.Now().Add(-10*time.Second).Format("2006-01-02 15:04:05")) }(), - injectSafeTS: func() uint64 { - return oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)) - }(), - useSafeTS: false, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)), + compareWithSafeTS: 1, + }, + { + name: "20 seconds ago to now, safeTS 10 secs ago", + sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 20 SECOND, NOW())`, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)), + compareWithSafeTS: 0, + }, + { + name: "10 seconds ago to now, safeTS 20 secs ago", + sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 10 SECOND, NOW())`, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)), + compareWithSafeTS: 1, + }, + { + name: "20 seconds ago to 10 seconds ago, safeTS 5 secs ago", + sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 20 SECOND, NOW() - INTERVAL 10 SECOND)`, + injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-5 * time.Second)), + compareWithSafeTS: -1, }, } for _, testcase := range testcases { c.Log(testcase.name) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/injectSafeTS", fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil) tk.MustExec(testcase.sql) - if testcase.useSafeTS { + if testcase.compareWithSafeTS == 1 { + c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Greater, testcase.injectSafeTS) + } else if testcase.compareWithSafeTS == 0 { c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.injectSafeTS) } else { - c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Greater, testcase.injectSafeTS) + c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Less, testcase.injectSafeTS) } tk.MustExec("commit") - failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS") } + failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS") + failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS") } func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) { @@ -263,4 +311,16 @@ func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) { schemaVer3 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() // got an old infoSchema c.Assert(schemaVer3, Equals, schemaVer1) + + schemaVer4 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + time.Sleep(time.Second) + tk.MustExec("create table t (id int primary key);") + schemaVer5 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + // confirm schema changed + c.Assert(schemaVer4, Less, schemaVer5) + + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW() - INTERVAL 1 SECOND`) + schemaVer6 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion() + // got an old infoSchema + c.Assert(schemaVer6, Equals, schemaVer4) } diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index e247e8756ae9a..4015794377486 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -2928,7 +2928,7 @@ func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) { // Test whether it's deterministic. safeTime1 := t2.Add(-1 * time.Second) - safeTS1 := oracle.ComposeTS(safeTime1.Unix()*1000, 0) + safeTS1 := oracle.GoTimeToTS(safeTime1) c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", safeTS1)), IsNil) f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)})) @@ -2941,7 +2941,7 @@ func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) { c.Assert(resultTime, Equals, safeTime1.Format(types.TimeFormat)) // SafeTS updated. safeTime2 := t2.Add(1 * time.Second) - safeTS2 := oracle.ComposeTS(safeTime2.Unix()*1000, 0) + safeTS2 := oracle.GoTimeToTS(safeTime2) c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", safeTS2)), IsNil) f, err = fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)})) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 3818486955646..b42a84d926eeb 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -723,6 +723,9 @@ type Simple struct { // and executing in co-processor. // Used for `global kill`. See https://github.com/pingcap/tidb/blob/master/docs/design/2020-06-01-global-kill.md. IsFromRemote bool + + // StalenessTxnOption is the transaction option that will be built when planner builder calls buildSimple. + StalenessTxnOption *sessionctx.StalenessTxnOption } // PhysicalSimpleWrapper is a wrapper of `Simple` to implement physical plan interface. diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index ccab0a28cc863..2ae66a61602a5 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" @@ -643,7 +644,7 @@ func (b *PlanBuilder) Build(ctx context.Context, node ast.Node) (Plan, error) { *ast.GrantStmt, *ast.DropUserStmt, *ast.AlterUserStmt, *ast.RevokeStmt, *ast.KillStmt, *ast.DropStatsStmt, *ast.GrantRoleStmt, *ast.RevokeRoleStmt, *ast.SetRoleStmt, *ast.SetDefaultRoleStmt, *ast.ShutdownStmt, *ast.RenameUserStmt: - return b.buildSimple(node.(ast.StmtNode)) + return b.buildSimple(ctx, node.(ast.StmtNode)) case ast.DDLNode: return b.buildDDL(ctx, x) case *ast.CreateBindingStmt: @@ -2259,7 +2260,7 @@ func (b *PlanBuilder) buildShow(ctx context.Context, show *ast.ShowStmt) (Plan, return np, nil } -func (b *PlanBuilder) buildSimple(node ast.StmtNode) (Plan, error) { +func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, error) { p := &Simple{Statement: node} switch raw := node.(type) { @@ -2325,10 +2326,41 @@ func (b *PlanBuilder) buildSimple(node ast.StmtNode) (Plan, error) { } case *ast.ShutdownStmt: b.visitInfo = appendVisitInfo(b.visitInfo, mysql.ShutdownPriv, "", "", "", nil) + case *ast.BeginStmt: + if raw.AsOf != nil { + startTS, err := b.calculateTsExpr(raw.AsOf) + if err != nil { + return nil, err + } + p.StalenessTxnOption = &sessionctx.StalenessTxnOption{ + Mode: ast.TimestampBoundReadTimestamp, + StartTS: startTS, + } + } } return p, nil } +// calculateTsExpr calculates the TsExpr of AsOfClause to get a StartTS. +func (b *PlanBuilder) calculateTsExpr(asOfClause *ast.AsOfClause) (uint64, error) { + tsVal, err := evalAstExpr(b.ctx, asOfClause.TsExpr) + if err != nil { + return 0, err + } + toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp) + // We need at least the millionsecond here, so set fsp to 3. + toTypeTimestamp.Decimal = 3 + tsTimestamp, err := tsVal.ConvertTo(b.ctx.GetSessionVars().StmtCtx, toTypeTimestamp) + if err != nil { + return 0, err + } + tsTime, err := tsTimestamp.GetMysqlTime().GoTime(b.ctx.GetSessionVars().TimeZone) + if err != nil { + return 0, err + } + return oracle.GoTimeToTS(tsTime), nil +} + func collectVisitInfoFromRevokeStmt(sctx sessionctx.Context, vi []visitInfo, stmt *ast.RevokeStmt) []visitInfo { // To use REVOKE, you must have the GRANT OPTION privilege, // and you must have the privileges that you are granting. diff --git a/session/session.go b/session/session.go index f116daf96dd04..2874330a02fd3 100644 --- a/session/session.go +++ b/session/session.go @@ -1960,20 +1960,9 @@ func (s *session) isTxnRetryable() bool { } func (s *session) NewTxn(ctx context.Context) error { - if s.txn.Valid() { - txnStartTS := s.txn.StartTS() - txnScope := s.GetSessionVars().TxnCtx.TxnScope - err := s.CommitTxn(ctx) - if err != nil { - return err - } - vars := s.GetSessionVars() - logutil.Logger(ctx).Info("NewTxn() inside a transaction auto commit", - zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()), - zap.Uint64("txnStartTS", txnStartTS), - zap.String("txnScope", txnScope)) + if err := s.checkBeforeNewTxn(ctx); err != nil { + return err } - txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) if err != nil { return err @@ -1995,6 +1984,23 @@ func (s *session) NewTxn(ctx context.Context) error { return nil } +func (s *session) checkBeforeNewTxn(ctx context.Context) error { + if s.txn.Valid() { + txnStartTS := s.txn.StartTS() + txnScope := s.GetSessionVars().TxnCtx.TxnScope + err := s.CommitTxn(ctx) + if err != nil { + return err + } + vars := s.GetSessionVars() + logutil.Logger(ctx).Info("Try to create a new txn inside a transaction auto commit", + zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()), + zap.Uint64("txnStartTS", txnStartTS), + zap.String("txnScope", txnScope)) + } + return nil +} + func (s *session) SetValue(key fmt.Stringer, value interface{}) { s.mu.Lock() s.mu.values[key] = value @@ -2782,22 +2788,14 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { // NewTxnWithStalenessOption create a transaction with Staleness option func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionctx.StalenessTxnOption) error { - if s.txn.Valid() { - txnID := s.txn.StartTS() - txnScope := s.txn.GetOption(kv.TxnScope).(string) - err := s.CommitTxn(ctx) - if err != nil { - return err - } - vars := s.GetSessionVars() - logutil.Logger(ctx).Info("InitTxnWithExactStaleness() inside a transaction auto commit", - zap.Int64("schemaVersion", vars.GetInfoSchema().SchemaMetaVersion()), - zap.Uint64("txnStartTS", txnID), - zap.String("txnScope", txnScope)) + err := s.checkBeforeNewTxn(ctx) + if err != nil { + return err } - var txn kv.Transaction - var err error - txnScope := s.GetSessionVars().CheckAndGetTxnScope() + var ( + txn kv.Transaction + txnScope = s.GetSessionVars().CheckAndGetTxnScope() + ) switch option.Mode { case ast.TimestampBoundReadTimestamp: txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) diff --git a/session/session_test.go b/session/session_test.go index a6c7908237bca..9d2d63cb02804 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4109,7 +4109,7 @@ func (s *testSessionSerialSuite) TestValidateReadOnlyInStalenessTransaction(c *C tk.MustExec(`set @@tidb_enable_noop_functions=1;`) for _, testcase := range testcases { c.Log(testcase.name) - tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`) if testcase.isValidate { _, err := tk.Exec(testcase.sql) c.Assert(err, IsNil) @@ -4169,7 +4169,7 @@ func (s *testSessionSerialSuite) TestSpecialSQLInStalenessTxn(c *C) { tk.MustExec("CREATE USER 'newuser' IDENTIFIED BY 'mypassword';") for _, testcase := range testcases { comment := Commentf(testcase.name) - tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`) + tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`) c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, true, comment) tk.MustExec(testcase.sql) c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.sameSession, comment) diff --git a/sessionctx/context.go b/sessionctx/context.go index 2aeda663a038d..59a917f86a9bc 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -73,7 +73,7 @@ type Context interface { // It should be called right before we builds an executor. InitTxnWithStartTS(startTS uint64) error - // NewTxnWithStalenessOption initializes a transaction with StalenessTxnOption + // NewTxnWithStalenessOption initializes a transaction with StalenessTxnOption. NewTxnWithStalenessOption(ctx context.Context, option StalenessTxnOption) error // GetStore returns the store of session.