Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: implement set transaction read only as of transaction #24766

Merged
19 changes: 17 additions & 2 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,10 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
return expression.ErrFunctionsNoopImpl.GenWithStackByArgs("READ ONLY")
}
if s.AsOf != nil {
// start transaction read only as of failed due to we set tx_read_ts before
if e.ctx.GetSessionVars().TxnReadTS > 0 {
return errors.New("start transaction read only as of is forbidden after set transaction read only as of")
}
if err := e.ctx.NewTxnWithStartTS(ctx, e.staleTxnStartTS); err != nil {
return err
}
Expand All @@ -584,18 +588,29 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
return nil
}
}
// When TxnReadTS is not 0, it indicates the transaction is staleness transaction
if e.ctx.GetSessionVars().TxnReadTS > 0 {
startTS := e.ctx.GetSessionVars().TxnReadTS
// clear TxnReadTS after we used it.
e.ctx.GetSessionVars().TxnReadTS = 0
if err := e.ctx.NewTxnWithStartTS(ctx, startTS); err != nil {
return err
}
e.ctx.GetSessionVars().SetInTxn(true)
return nil
}

// If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the
// need to call NewTxn, which commits the existing transaction and begins a new one.
// If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should
// always create a new transaction.
txnCtx := e.ctx.GetSessionVars().TxnCtx
if txnCtx.History != nil || txnCtx.IsStaleness {
err := e.ctx.NewTxn(ctx)
if err != nil {
return err
}
}
} // always create a new transaction.
Yisaer marked this conversation as resolved.
Show resolved Hide resolved

// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
Expand Down
48 changes: 48 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -242,3 +243,50 @@ func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) {
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1)
tk.MustExec("commit")
}

func (s *testStaleTxnSerialSuite) TestSetTransactionReadOnlyAsOf(c *C) {
t1, err := time.Parse(types.TimeFormat, "2016-09-21 09:53:04")
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, s.store)
testcases := []struct {
sql string
expectedTS uint64
injectSafeTS uint64
}{
{
sql: `SET TRANSACTION READ ONLY as of timestamp '2021-04-21 00:42:12'`,
expectedTS: 424394603102208000,
injectSafeTS: 0,
},
{
sql: `SET TRANSACTION READ ONLY as of timestamp tidb_bounded_staleness('2015-09-21 00:07:01', '2021-04-27 11:26:13')`,
expectedTS: oracle.GoTimeToTS(t1),
injectSafeTS: oracle.GoTimeToTS(t1),
},
}
for _, testcase := range testcases {
if testcase.injectSafeTS > 0 {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil)
}
tk.MustExec(testcase.sql)
c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, testcase.expectedTS)
tk.MustExec("begin")
c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, uint64(0))
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.expectedTS)
tk.MustExec("commit")
tk.MustExec("begin")
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Not(Equals), testcase.expectedTS)

failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")
}

err = tk.ExecToErr(`SET TRANSACTION READ ONLY as of timestamp tidb_bounded_staleness(invalid1, invalid2')`)
c.Assert(err, NotNil)
c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, uint64(0))

tk.MustExec(`SET TRANSACTION READ ONLY as of timestamp '2021-04-21 00:42:12'`)
err = tk.ExecToErr(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00'`)
c.Assert(err, NotNil)
c.Assert(err, Equals, "start transaction read only as of is forbidden after set transaction read only as of")
}
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,9 @@ type SessionVars struct {
// SnapshotTS is used for reading history data. For simplicity, SnapshotTS only supports distsql request.
SnapshotTS uint64

// TxnReadTS is used for staleness transaction, it provides next staleness transaction startTS.
TxnReadTS uint64

// SnapshotInfoschema is used with SnapshotTS, when the schema version at snapshotTS less than current schema
// version, we load an old version schema for query.
SnapshotInfoschema interface{}
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,9 @@ var defaultSysVars = []*SysVar{
}, GetSession: func(s *SessionVars) (string, error) {
return s.TxnScope.GetVarValue(), nil
}},
{Scope: ScopeSession, Name: TiDBTxnReadTS, Value: "", SetSession: func(s *SessionVars, val string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this sysvar exposed to users?

Copy link
Contributor Author

@Yisaer Yisaer May 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is ok to exposed it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is, users may also use this variable to set read ts.

return setTxnReadTS(s, val)
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}, SetSession: func(s *SessionVars, val string) error {
s.allowMPPExecution = val
return nil
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ const (

// TiDBTxnScope indicates whether using global transactions or local transactions.
TiDBTxnScope = "txn_scope"

// TiDBTxnReadTS indicates the next transaction should be staleness transaction and provide the startTS
TiDBTxnReadTS = "tx_read_ts"
)

// TiDB system variable names that both in session and global scope.
Expand Down
17 changes: 17 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,23 @@ func setSnapshotTS(s *SessionVars, sVal string) error {
return err
}

func setTxnReadTS(s *SessionVars, sVal string) error {
if sVal == "" {
s.TxnReadTS = 0
return nil
}
t, err := types.ParseTime(s.StmtCtx, sVal, mysql.TypeTimestamp, types.MaxFsp)
if err != nil {
return err
}
t1, err := t.GoTime(s.TimeZone)
if err != nil {
return err
}
s.TxnReadTS = oracle.GoTimeToTS(t1)
return err
}

// serverGlobalVariable is used to handle variables that acts in server and global scope.
type serverGlobalVariable struct {
sync.Mutex
Expand Down