Skip to content

Commit

Permalink
*: support AS OF TIMESTAMP read-only begin statement (#24740)
Browse files Browse the repository at this point in the history
  • Loading branch information
JmPotato committed May 20, 2021
1 parent 40a8ae7 commit 6bfc5f8
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 74 deletions.
9 changes: 5 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,10 +706,11 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ID())
base.initCap = chunk.ZeroCapacity
e := &SimpleExec{
baseExecutor: base,
Statement: v.Statement,
IsFromRemote: v.IsFromRemote,
is: b.is,
baseExecutor: base,
Statement: v.Statement,
IsFromRemote: v.IsFromRemote,
is: b.is,
StalenessTxnOption: v.StalenessTxnOption,
}
return e
}
Expand Down
32 changes: 26 additions & 6 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type SimpleExec struct {
IsFromRemote bool
done bool
is infoschema.InfoSchema

// StalenessTxnOption is used to execute the staleness txn during a read-only begin statement.
StalenessTxnOption *sessionctx.StalenessTxnOption
}

func (e *baseExecutor) getSysSession() (sessionctx.Context, error) {
Expand Down Expand Up @@ -566,13 +569,16 @@ func (e *SimpleExec) executeUse(s *ast.UseStmt) error {
}

func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
// If `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND` is the first statement in TxnCtx, we should
// If `START TRANSACTION READ ONLY` is the first statement in TxnCtx, we should
// always create a new Txn instead of reusing it.
if s.ReadOnly {
enableNoopFuncs := e.ctx.GetSessionVars().EnableNoopFuncs
if !enableNoopFuncs && s.Bound == nil {
if !enableNoopFuncs && s.AsOf == nil && s.Bound == nil {
return expression.ErrFunctionsNoopImpl.GenWithStackByArgs("READ ONLY")
}
if s.AsOf != nil {
return e.executeStartTransactionReadOnlyWithBoundedStaleness(ctx, s)
}
if s.Bound != nil {
return e.executeStartTransactionReadOnlyWithTimestampBound(ctx, s)
}
Expand Down Expand Up @@ -614,6 +620,22 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
return nil
}

func (e *SimpleExec) executeStartTransactionReadOnlyWithBoundedStaleness(ctx context.Context, s *ast.BeginStmt) error {
if e.StalenessTxnOption == nil {
return errors.New("Failed to get timestamp during start transaction read only as of timestamp")
}
if err := e.ctx.NewTxnWithStalenessOption(ctx, *e.StalenessTxnOption); err != nil {
return err
}

// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetInTxn(true)
return nil
}

// TODO: deprecate this syntax and only keep `AS OF TIMESTAMP` statement.
func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx context.Context, s *ast.BeginStmt) error {
opt := sessionctx.StalenessTxnOption{}
opt.Mode = s.Bound.Mode
Expand All @@ -632,8 +654,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
if err != nil {
return err
}
startTS := oracle.GoTimeToTS(gt)
opt.StartTS = startTS
opt.StartTS = oracle.GoTimeToTS(gt)
case ast.TimestampBoundExactStaleness:
// TODO: support funcCallExpr in future
v, ok := s.Bound.Timestamp.(*driver.ValueExpr)
Expand Down Expand Up @@ -668,8 +689,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
if err != nil {
return err
}
startTS := oracle.GoTimeToTS(gt)
opt.StartTS = startTS
opt.StartTS = oracle.GoTimeToTS(gt)
}
err := e.ctx.NewTxnWithStalenessOption(ctx, opt)
if err != nil {
Expand Down
118 changes: 89 additions & 29 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,30 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) {
zone: "sz",
},
{
name: "begin",
name: "begin after TimestampBoundReadTimestamp",
preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`,
sql: "begin",
IsStaleness: false,
txnScope: kv.GlobalTxnScope,
zone: "",
},
{
name: "AsOfTimestamp",
preSQL: "begin",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`,
IsStaleness: true,
expectPhysicalTS: 1599321600000,
txnScope: "local",
zone: "sh",
},
{
name: "begin after AsOfTimestamp",
preSQL: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`,
sql: "begin",
IsStaleness: false,
txnScope: oracle.GlobalTxnScope,
zone: "",
},
}
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand All @@ -106,8 +123,8 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) {
}
c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.IsStaleness)
tk.MustExec("commit")
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
}
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
}

func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) {
Expand Down Expand Up @@ -147,13 +164,17 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) {
failpoint.Enable("github.com/pingcap/tidb/config/injectTxnScope", fmt.Sprintf(`return("%v")`, testcase.zone))
failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStoreLabels", fmt.Sprintf(`return("%v_%v")`, placement.DCLabelKey, testcase.txnScope))
failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag", `return(true)`)
// Using NOW() will cause the loss of fsp precision, so we use NOW(3) to be accurate to the millisecond.
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`)
tk.MustQuery(testcase.sql)
tk.MustExec(`commit`)
tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`)
tk.MustQuery(testcase.sql)
tk.MustExec(`commit`)
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag")
}
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag")
}

func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) {
Expand All @@ -169,6 +190,17 @@ func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) {
tk.MustExec(updateSafePoint)
// set @@tidb_snapshot before staleness txn
tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`)
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`)
// 1599321600000 == 2020-09-06 00:00:00
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000))
tk.MustExec("commit")
// set @@tidb_snapshot during staleness txn
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`)
tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`)
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000))
tk.MustExec("commit")
// set @@tidb_snapshot before staleness txn
tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`)
tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`)
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000))
tk.MustExec("commit")
Expand All @@ -190,60 +222,76 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
name string
sql string
injectSafeTS uint64
useSafeTS bool
// compareWithSafeTS will be 0 if StartTS==SafeTS, -1 if StartTS < SafeTS, and +1 if StartTS > SafeTS.
compareWithSafeTS int
}{
{
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`,
injectSafeTS: func() uint64 {
return oracle.GoTimeToTS(time.Now().Add(-10 * time.Second))
}(),
useSafeTS: true,
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)),
compareWithSafeTS: 0,
},
{
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`,
injectSafeTS: func() uint64 {
return oracle.GoTimeToTS(time.Now().Add(-20 * time.Second))
}(),
useSafeTS: false,
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)),
compareWithSafeTS: 1,
},
{
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: func() string {
return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`,
time.Now().Add(-20*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectSafeTS: func() uint64 {
return oracle.GoTimeToTS(time.Now().Add(-10 * time.Second))
}(),
useSafeTS: true,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)),
compareWithSafeTS: 0,
},
{
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: func() string {
return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`,
time.Now().Add(-10*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectSafeTS: func() uint64 {
return oracle.GoTimeToTS(time.Now().Add(-20 * time.Second))
}(),
useSafeTS: false,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)),
compareWithSafeTS: 1,
},
{
name: "20 seconds ago to now, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 20 SECOND, NOW())`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)),
compareWithSafeTS: 0,
},
{
name: "10 seconds ago to now, safeTS 20 secs ago",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 10 SECOND, NOW())`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)),
compareWithSafeTS: 1,
},
{
name: "20 seconds ago to 10 seconds ago, safeTS 5 secs ago",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 20 SECOND, NOW() - INTERVAL 10 SECOND)`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-5 * time.Second)),
compareWithSafeTS: -1,
},
}
for _, testcase := range testcases {
c.Log(testcase.name)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil)
tk.MustExec(testcase.sql)
if testcase.useSafeTS {
if testcase.compareWithSafeTS == 1 {
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Greater, testcase.injectSafeTS)
} else if testcase.compareWithSafeTS == 0 {
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.injectSafeTS)
} else {
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Greater, testcase.injectSafeTS)
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Less, testcase.injectSafeTS)
}
tk.MustExec("commit")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS")
}
failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS")
}

func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) {
Expand All @@ -263,4 +311,16 @@ func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) {
schemaVer3 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion()
// got an old infoSchema
c.Assert(schemaVer3, Equals, schemaVer1)

schemaVer4 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion()
time.Sleep(time.Second)
tk.MustExec("create table t (id int primary key);")
schemaVer5 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion()
// confirm schema changed
c.Assert(schemaVer4, Less, schemaVer5)

tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW() - INTERVAL 1 SECOND`)
schemaVer6 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion()
// got an old infoSchema
c.Assert(schemaVer6, Equals, schemaVer4)
}
4 changes: 2 additions & 2 deletions expression/builtin_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2928,7 +2928,7 @@ func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) {

// Test whether it's deterministic.
safeTime1 := t2.Add(-1 * time.Second)
safeTS1 := oracle.ComposeTS(safeTime1.Unix()*1000, 0)
safeTS1 := oracle.GoTimeToTS(safeTime1)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", safeTS1)), IsNil)
f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)}))
Expand All @@ -2941,7 +2941,7 @@ func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) {
c.Assert(resultTime, Equals, safeTime1.Format(types.TimeFormat))
// SafeTS updated.
safeTime2 := t2.Add(1 * time.Second)
safeTS2 := oracle.ComposeTS(safeTime2.Unix()*1000, 0)
safeTS2 := oracle.GoTimeToTS(safeTime2)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", safeTS2)), IsNil)
f, err = fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)}))
Expand Down
3 changes: 3 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,9 @@ type Simple struct {
// and executing in co-processor.
// Used for `global kill`. See https://github.com/pingcap/tidb/blob/master/docs/design/2020-06-01-global-kill.md.
IsFromRemote bool

// StalenessTxnOption is the transaction option that will be built when planner builder calls buildSimple.
StalenessTxnOption *sessionctx.StalenessTxnOption
}

// PhysicalSimpleWrapper is a wrapper of `Simple` to implement physical plan interface.
Expand Down
36 changes: 34 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
Expand Down Expand Up @@ -643,7 +644,7 @@ func (b *PlanBuilder) Build(ctx context.Context, node ast.Node) (Plan, error) {
*ast.GrantStmt, *ast.DropUserStmt, *ast.AlterUserStmt, *ast.RevokeStmt, *ast.KillStmt, *ast.DropStatsStmt,
*ast.GrantRoleStmt, *ast.RevokeRoleStmt, *ast.SetRoleStmt, *ast.SetDefaultRoleStmt, *ast.ShutdownStmt,
*ast.RenameUserStmt:
return b.buildSimple(node.(ast.StmtNode))
return b.buildSimple(ctx, node.(ast.StmtNode))
case ast.DDLNode:
return b.buildDDL(ctx, x)
case *ast.CreateBindingStmt:
Expand Down Expand Up @@ -2259,7 +2260,7 @@ func (b *PlanBuilder) buildShow(ctx context.Context, show *ast.ShowStmt) (Plan,
return np, nil
}

func (b *PlanBuilder) buildSimple(node ast.StmtNode) (Plan, error) {
func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, error) {
p := &Simple{Statement: node}

switch raw := node.(type) {
Expand Down Expand Up @@ -2325,10 +2326,41 @@ func (b *PlanBuilder) buildSimple(node ast.StmtNode) (Plan, error) {
}
case *ast.ShutdownStmt:
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.ShutdownPriv, "", "", "", nil)
case *ast.BeginStmt:
if raw.AsOf != nil {
startTS, err := b.calculateTsExpr(raw.AsOf)
if err != nil {
return nil, err
}
p.StalenessTxnOption = &sessionctx.StalenessTxnOption{
Mode: ast.TimestampBoundReadTimestamp,
StartTS: startTS,
}
}
}
return p, nil
}

// calculateTsExpr calculates the TsExpr of AsOfClause to get a StartTS.
func (b *PlanBuilder) calculateTsExpr(asOfClause *ast.AsOfClause) (uint64, error) {
tsVal, err := evalAstExpr(b.ctx, asOfClause.TsExpr)
if err != nil {
return 0, err
}
toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp)
// We need at least the millionsecond here, so set fsp to 3.
toTypeTimestamp.Decimal = 3
tsTimestamp, err := tsVal.ConvertTo(b.ctx.GetSessionVars().StmtCtx, toTypeTimestamp)
if err != nil {
return 0, err
}
tsTime, err := tsTimestamp.GetMysqlTime().GoTime(b.ctx.GetSessionVars().TimeZone)
if err != nil {
return 0, err
}
return oracle.GoTimeToTS(tsTime), nil
}

func collectVisitInfoFromRevokeStmt(sctx sessionctx.Context, vi []visitInfo, stmt *ast.RevokeStmt) []visitInfo {
// To use REVOKE, you must have the GRANT OPTION privilege,
// and you must have the privileges that you are granting.
Expand Down
Loading

0 comments on commit 6bfc5f8

Please sign in to comment.