Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: implement set transaction read only as of transaction #24766

Merged
Merged
23 changes: 22 additions & 1 deletion executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,13 @@ func (e *SimpleExec) executeUse(s *ast.UseStmt) error {
}

func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
// When TxnReadTS is not 0, it indicates the transaction is staleness transaction
if e.ctx.GetSessionVars().TxnReadTS > 0 {
startTS := e.ctx.GetSessionVars().TxnReadTS
// clear TxnReadTS after we used it.
e.ctx.GetSessionVars().TxnReadTS = 0
return e.executeBeginWithReadTS(ctx, startTS)
}
// If `START TRANSACTION READ ONLY` is the first statement in TxnCtx, we should
// always create a new Txn instead of reusing it.
if s.ReadOnly {
Expand Down Expand Up @@ -620,14 +627,28 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
return nil
}

func (e *SimpleExec) executeBeginWithReadTS(ctx context.Context, startTS uint64) error {
opt := sessionctx.StalenessTxnOption{}
opt.Mode = ast.TimestampBoundReadTimestamp
opt.StartTS = startTS
err := e.ctx.NewTxnWithStalenessOption(ctx, opt)
if err != nil {
return err
}
// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetInTxn(true)
return nil
}

func (e *SimpleExec) executeStartTransactionReadOnlyWithBoundedStaleness(ctx context.Context, s *ast.BeginStmt) error {
if e.StalenessTxnOption == nil {
return errors.New("Failed to get timestamp during start transaction read only as of timestamp")
}
if err := e.ctx.NewTxnWithStalenessOption(ctx, *e.StalenessTxnOption); err != nil {
return err
}

// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
Expand Down
43 changes: 43 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -324,3 +325,45 @@ func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) {
// got an old infoSchema
c.Assert(schemaVer6, Equals, schemaVer4)
}

func (s *testStaleTxnSerialSuite) TestSetTransactionReadOnlyAsOf(c *C) {
t1, err := time.Parse(types.TimeFormat, "2016-09-21 09:53:04")
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, s.store)
testcases := []struct {
sql string
expectedTS uint64
injectSafeTS uint64
}{
{
sql: `SET TRANSACTION READ ONLY as of timestamp '2021-04-21 00:42:12'`,
expectedTS: 424394603102208000,
injectSafeTS: 0,
},
{
sql: `SET TRANSACTION READ ONLY as of timestamp tidb_bounded_staleness('2015-09-21 00:07:01', '2021-04-27 11:26:13')`,
expectedTS: oracle.GoTimeToTS(t1),
injectSafeTS: oracle.GoTimeToTS(t1),
},
}
for _, testcase := range testcases {
if testcase.injectSafeTS > 0 {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil)
}
tk.MustExec(testcase.sql)
c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, testcase.expectedTS)
tk.MustExec("begin")
c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, uint64(0))
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.expectedTS)
tk.MustExec("commit")
tk.MustExec("begin")
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Not(Equals), testcase.expectedTS)

failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")
}

err = tk.ExecToErr(`SET TRANSACTION READ ONLY as of timestamp tidb_bounded_staleness(invalid1, invalid2')`)
c.Assert(err, NotNil)
c.Assert(tk.Se.GetSessionVars().TxnReadTS, Equals, uint64(0))
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6
github.com/pingcap/parser v0.0.0-20210519103238-c4cd8114d53f
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,8 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIf
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6 h1:wsH3psMH5ksDowsN9VUE9ZqSrX6oF4AYQQfOunkvSfU=
github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw=
github.com/pingcap/parser v0.0.0-20210519103238-c4cd8114d53f h1:N1g4fuvLA7rQar3Q+T9967kPCwuQnXtBMNr+qSnCb/M=
github.com/pingcap/parser v0.0.0-20210519103238-c4cd8114d53f/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw=
github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8=
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8=
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,9 @@ type SessionVars struct {
// SnapshotTS is used for reading history data. For simplicity, SnapshotTS only supports distsql request.
SnapshotTS uint64

// TxnReadTS is used for staleness transaction, it provides next staleness transaction startTS.
TxnReadTS uint64

// SnapshotInfoschema is used with SnapshotTS, when the schema version at snapshotTS less than current schema
// version, we load an old version schema for query.
SnapshotInfoschema interface{}
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,13 @@ var defaultSysVars = []*SysVar{
}
return nil
}},
{Scope: ScopeSession, Name: TiDBTxnReadTS, Value: "", SetSession: func(s *SessionVars, val string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this sysvar exposed to users?

Copy link
Contributor Author

@Yisaer Yisaer May 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is ok to exposed it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is, users may also use this variable to set read ts.

err := setTxnReadTS(s, val)
if err != nil {
return err
}
return nil
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}, SetSession: func(s *SessionVars, val string) error {
s.allowMPPExecution = val
return nil
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ const (

// TiDBTxnScope indicates whether using global transactions or local transactions.
TiDBTxnScope = "txn_scope"

// TiDBTxnReadTS indicates the next transaction should be staleness transaction and provide the startTS
TiDBTxnReadTS = "tx_read_ts"
)

// TiDB system variable names that both in session and global scope.
Expand Down
17 changes: 17 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,23 @@ func setSnapshotTS(s *SessionVars, sVal string) error {
return err
}

func setTxnReadTS(s *SessionVars, sVal string) error {
if sVal == "" {
s.TxnReadTS = 0
return nil
}
t, err := types.ParseTime(s.StmtCtx, sVal, mysql.TypeTimestamp, types.MaxFsp)
if err != nil {
return err
}
t1, err := t.GoTime(s.TimeZone)
if err != nil {
return err
}
s.TxnReadTS = GoTimeToTS(t1)
return err
}

// GoTimeToTS converts a Go time to uint64 timestamp.
func GoTimeToTS(t time.Time) uint64 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function is duplicated with oracle.GoTimeToTS, maybe we should remove it and use the latter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

ts := (t.UnixNano() / int64(time.Millisecond)) << epochShiftBits
Expand Down