Skip to content

Commit

Permalink
Merge branch 'master' into resource-tag
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu committed May 25, 2021
2 parents cf4aaad + 98f0d76 commit 0b35aba
Show file tree
Hide file tree
Showing 17 changed files with 108 additions and 508 deletions.
10 changes: 5 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,11 +706,11 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ID())
base.initCap = chunk.ZeroCapacity
e := &SimpleExec{
baseExecutor: base,
Statement: v.Statement,
IsFromRemote: v.IsFromRemote,
is: b.is,
StalenessTxnOption: v.StalenessTxnOption,
baseExecutor: base,
Statement: v.Statement,
IsFromRemote: v.IsFromRemote,
is: b.is,
staleTxnStartTS: v.StaleTxnStartTS,
}
return e
}
Expand Down
104 changes: 11 additions & 93 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
tikvutil "github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
Expand Down Expand Up @@ -75,8 +72,8 @@ type SimpleExec struct {
done bool
is infoschema.InfoSchema

// StalenessTxnOption is used to execute the staleness txn during a read-only begin statement.
StalenessTxnOption *sessionctx.StalenessTxnOption
// staleTxnStartTS is the StartTS that is used to execute the staleness txn during a read-only begin statement.
staleTxnStartTS uint64
}

func (e *baseExecutor) getSysSession() (sessionctx.Context, error) {
Expand Down Expand Up @@ -573,14 +570,18 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
// always create a new Txn instead of reusing it.
if s.ReadOnly {
enableNoopFuncs := e.ctx.GetSessionVars().EnableNoopFuncs
if !enableNoopFuncs && s.AsOf == nil && s.Bound == nil {
if !enableNoopFuncs && s.AsOf == nil {
return expression.ErrFunctionsNoopImpl.GenWithStackByArgs("READ ONLY")
}
if s.AsOf != nil {
return e.executeStartTransactionReadOnlyWithBoundedStaleness(ctx, s)
}
if s.Bound != nil {
return e.executeStartTransactionReadOnlyWithTimestampBound(ctx, s)
if err := e.ctx.NewTxnWithStartTS(ctx, e.staleTxnStartTS); err != nil {
return err
}
// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetInTxn(true)
return nil
}
}

Expand Down Expand Up @@ -620,89 +621,6 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
return nil
}

func (e *SimpleExec) executeStartTransactionReadOnlyWithBoundedStaleness(ctx context.Context, s *ast.BeginStmt) error {
if e.StalenessTxnOption == nil {
return errors.New("Failed to get timestamp during start transaction read only as of timestamp")
}
if err := e.ctx.NewTxnWithStalenessOption(ctx, *e.StalenessTxnOption); err != nil {
return err
}

// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetInTxn(true)
return nil
}

// TODO: deprecate this syntax and only keep `AS OF TIMESTAMP` statement.
func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx context.Context, s *ast.BeginStmt) error {
opt := sessionctx.StalenessTxnOption{}
opt.Mode = s.Bound.Mode
switch s.Bound.Mode {
case ast.TimestampBoundReadTimestamp:
// TODO: support funcCallExpr in future
v, ok := s.Bound.Timestamp.(*driver.ValueExpr)
if !ok {
return errors.New("Invalid value for Bound Timestamp")
}
t, err := types.ParseTime(e.ctx.GetSessionVars().StmtCtx, v.GetString(), v.GetType().Tp, types.GetFsp(v.GetString()))
if err != nil {
return err
}
gt, err := t.GoTime(e.ctx.GetSessionVars().TimeZone)
if err != nil {
return err
}
opt.StartTS = oracle.GoTimeToTS(gt)
case ast.TimestampBoundExactStaleness:
// TODO: support funcCallExpr in future
v, ok := s.Bound.Timestamp.(*driver.ValueExpr)
if !ok {
return errors.New("Invalid value for Bound Timestamp")
}
d, err := types.ParseDuration(e.ctx.GetSessionVars().StmtCtx, v.GetString(), types.GetFsp(v.GetString()))
if err != nil {
return err
}
opt.PrevSec = uint64(d.Seconds())
case ast.TimestampBoundMaxStaleness:
v, ok := s.Bound.Timestamp.(*driver.ValueExpr)
if !ok {
return errors.New("Invalid value for Bound Timestamp")
}
d, err := types.ParseDuration(e.ctx.GetSessionVars().StmtCtx, v.GetString(), types.GetFsp(v.GetString()))
if err != nil {
return err
}
opt.PrevSec = uint64(d.Seconds())
case ast.TimestampBoundMinReadTimestamp:
v, ok := s.Bound.Timestamp.(*driver.ValueExpr)
if !ok {
return errors.New("Invalid value for Bound Timestamp")
}
t, err := types.ParseTime(e.ctx.GetSessionVars().StmtCtx, v.GetString(), v.GetType().Tp, types.GetFsp(v.GetString()))
if err != nil {
return err
}
gt, err := t.GoTime(e.ctx.GetSessionVars().TimeZone)
if err != nil {
return err
}
opt.StartTS = oracle.GoTimeToTS(gt)
}
err := e.ctx.NewTxnWithStalenessOption(ctx, opt)
if err != nil {
return err
}

// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetInTxn(true)
return nil
}

func (e *SimpleExec) executeRevokeRole(s *ast.RevokeRoleStmt) error {
for _, role := range s.Roles {
exists, err := userExists(e.ctx, role.Username, role.Hostname)
Expand Down
136 changes: 30 additions & 106 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/testkit"
)
Expand All @@ -32,66 +31,38 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) {
sql string
IsStaleness bool
expectPhysicalTS int64
preSec int64
txnScope string
zone string
}{
{
name: "TimestampBoundExactStaleness",
preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`,
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`,
IsStaleness: true,
expectPhysicalTS: 1599321600000,
txnScope: "local",
zone: "sh",
},
{
name: "TimestampBoundReadTimestamp",
name: "AsOfTimestamp",
preSQL: "begin",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`,
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`,
IsStaleness: true,
expectPhysicalTS: 1599321600000,
txnScope: "local",
zone: "bj",
},
{
name: "TimestampBoundExactStaleness",
preSQL: "begin",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`,
IsStaleness: true,
preSec: 20,
txnScope: "local",
zone: "sh",
},
{
name: "TimestampBoundExactStaleness",
preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`,
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:20';`,
IsStaleness: true,
preSec: 20,
txnScope: "local",
zone: "sz",
zone: "sh",
},
{
name: "begin after TimestampBoundReadTimestamp",
preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`,
name: "begin after AsOfTimestamp",
preSQL: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`,
sql: "begin",
IsStaleness: false,
txnScope: kv.GlobalTxnScope,
txnScope: oracle.GlobalTxnScope,
zone: "",
},
{
name: "AsOfTimestamp",
name: "AsOfTimestamp with tidb_bounded_staleness",
preSQL: "begin",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`,
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness('2015-09-21 00:07:01', NOW());`,
IsStaleness: true,
expectPhysicalTS: 1599321600000,
expectPhysicalTS: 1442765221000,
txnScope: "local",
zone: "sh",
zone: "bj",
},
{
name: "begin after AsOfTimestamp",
preSQL: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`,
name: "begin after AsOfTimestamp with tidb_bounded_staleness",
preSQL: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness('2015-09-21 00:07:01', NOW());`,
sql: "begin",
IsStaleness: false,
txnScope: oracle.GlobalTxnScope,
Expand All @@ -107,21 +78,15 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) {
tk.MustExec(fmt.Sprintf("set @@txn_scope=%v", testcase.txnScope))
tk.MustExec(testcase.preSQL)
tk.MustExec(testcase.sql)
c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.IsStaleness)
if testcase.expectPhysicalTS > 0 {
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, testcase.expectPhysicalTS)
} else if testcase.preSec > 0 {
curSec := time.Now().Unix()
startTS := oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS)
// exact stale txn tolerate 2 seconds deviation for startTS
c.Assert(startTS, Greater, (curSec-testcase.preSec-2)*1000)
c.Assert(startTS, Less, (curSec-testcase.preSec+2)*1000)
} else if !testcase.IsStaleness {
curSec := time.Now().Unix()
curTS := oracle.ExtractPhysical(oracle.GoTimeToTS(time.Now()))
startTS := oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS)
c.Assert(curSec*1000-startTS, Less, time.Second/time.Millisecond)
c.Assert(startTS-curSec*1000, Less, time.Second/time.Millisecond)
c.Assert(curTS-startTS, Less, time.Second.Milliseconds())
c.Assert(startTS-curTS, Less, time.Second.Milliseconds())
}
c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.IsStaleness)
tk.MustExec("commit")
}
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
Expand Down Expand Up @@ -168,9 +133,6 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) {
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`)
tk.MustQuery(testcase.sql)
tk.MustExec(`commit`)
tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`)
tk.MustQuery(testcase.sql)
tk.MustExec(`commit`)
}
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels")
Expand Down Expand Up @@ -199,16 +161,6 @@ func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) {
tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`)
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000))
tk.MustExec("commit")
// set @@tidb_snapshot before staleness txn
tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`)
tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`)
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000))
tk.MustExec("commit")
// set @@tidb_snapshot during staleness txn
tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`)
tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`)
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000))
tk.MustExec("commit")
}

func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
Expand All @@ -217,44 +169,13 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key);")
defer tk.MustExec(`drop table if exists t`)
tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`)
testcases := []struct {
name string
sql string
injectSafeTS uint64
// compareWithSafeTS will be 0 if StartTS==SafeTS, -1 if StartTS < SafeTS, and +1 if StartTS > SafeTS.
compareWithSafeTS int
}{
{
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)),
compareWithSafeTS: 0,
},
{
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)),
compareWithSafeTS: 1,
},
{
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: func() string {
return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`,
time.Now().Add(-20*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)),
compareWithSafeTS: 0,
},
{
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: func() string {
return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`,
time.Now().Add(-10*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)),
compareWithSafeTS: 1,
},
{
name: "20 seconds ago to now, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 20 SECOND, NOW())`,
Expand All @@ -273,6 +194,18 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-5 * time.Second)),
compareWithSafeTS: -1,
},
{
name: "exact timestamp 5 seconds ago, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP NOW() - INTERVAL 5 SECOND`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)),
compareWithSafeTS: 1,
},
{
name: "exact timestamp 10 seconds ago, safeTS 5 secs ago",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP NOW() - INTERVAL 10 SECOND`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-5 * time.Second)),
compareWithSafeTS: -1,
},
}
for _, testcase := range testcases {
c.Log(testcase.name)
Expand Down Expand Up @@ -300,21 +233,12 @@ func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key);")

// test exact
// test as of
schemaVer1 := tk.Se.GetInfoSchema().SchemaMetaVersion()
time1 := time.Now()
tk.MustExec("drop table if exists t")
c.Assert(schemaVer1, Less, tk.Se.GetInfoSchema().SchemaMetaVersion())
tk.MustExec(fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '%s'`, time1.Format("2006-1-2 15:04:05.000")))
tk.MustExec(fmt.Sprintf(`START TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time1.Format("2006-1-2 15:04:05.000")))
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer1)
tk.MustExec("commit")

// test as of
schemaVer2 := tk.Se.GetInfoSchema().SchemaMetaVersion()
time2 := time.Now()
tk.MustExec("create table t (id int primary key);")
c.Assert(schemaVer2, Less, tk.Se.GetInfoSchema().SchemaMetaVersion())
tk.MustExec(fmt.Sprintf(`START TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time2.Format("2006-1-2 15:04:05.000")))
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer2)
tk.MustExec("commit")
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/opentracing/opentracing-go v1.1.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pingcap/badger v1.5.1-0.20200908111422-2e78ee155d19
github.com/pingcap/br v5.0.0-nightly.0.20210419090151-03762465b589+incompatible
github.com/pingcap/br v5.1.0-alpha.0.20210524083733-58e9e24fcb8e+incompatible
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
Expand Down
Loading

0 comments on commit 0b35aba

Please sign in to comment.