diff --git a/executor/adapter.go b/executor/adapter.go index 64b0ac1f1e5b8..df6e62751f90a 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -26,6 +26,7 @@ import ( "github.com/cznic/mathutil" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" @@ -43,6 +44,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" tikverr "github.com/pingcap/tidb/store/tikv/error" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -177,6 +179,9 @@ func (a *recordSet) OnFetchReturned() { type ExecStmt struct { // GoCtx stores parent go context.Context for a stmt. GoCtx context.Context + // SnapshotTS stores the timestamp for stale read. + // It is not equivalent to session variables's snapshot ts, it only use to build the executor. + SnapshotTS uint64 // InfoSchema stores a reference to the schema information. InfoSchema infoschema.InfoSchema // Plan stores a reference to the final physical plan. @@ -268,18 +273,19 @@ func (a *ExecStmt) IsReadOnly(vars *variable.SessionVars) bool { // RebuildPlan rebuilds current execute statement plan. // It returns the current information schema version that 'a' is using. func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) { - is := a.Ctx.GetInfoSchema().(infoschema.InfoSchema) - a.InfoSchema = is - if err := plannercore.Preprocess(a.Ctx, a.StmtNode, is, plannercore.InTxnRetry); err != nil { + ret := &plannercore.PreprocessorReturn{} + if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.WithPreprocessorReturn(ret)); err != nil { return 0, err } - p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, is) + a.InfoSchema = ret.InfoSchema + a.SnapshotTS = ret.SnapshotTS + p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema) if err != nil { return 0, err } a.OutputNames = names a.Plan = p - return is.SchemaMetaVersion(), nil + return a.InfoSchema.SchemaMetaVersion(), nil } // Exec builds an Executor from a plan. If the Executor doesn't return result, @@ -305,6 +311,25 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", a.GetTextToLog()), zap.Stack("stack")) }() + failpoint.Inject("assertStaleTSO", func(val failpoint.Value) { + if n, ok := val.(int); ok { + startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000 + if n != int(startTS) { + panic("different tso") + } + failpoint.Return() + } + }) + failpoint.Inject("assertStaleTSOWithTolerance", func(val failpoint.Value) { + if n, ok := val.(int); ok { + // Convert to seconds + startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000 + if int(startTS) <= n-1 || n+1 <= int(startTS) { + panic("tso violate tolerance") + } + failpoint.Return() + } + }) sctx := a.Ctx ctx = util.SetSessionID(ctx, sctx.GetSessionVars().ConnectionID) if _, ok := a.Plan.(*plannercore.Analyze); ok && sctx.GetSessionVars().InRestrictedSQL { @@ -747,6 +772,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { } b := newExecutorBuilder(ctx, a.InfoSchema) + b.snapshotTS = a.SnapshotTS e := b.build(a.Plan) if b.err != nil { return nil, errors.Trace(b.err) diff --git a/executor/builder.go b/executor/builder.go index 2f644a6eed2c4..67a0c33cb1fcb 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -640,7 +640,6 @@ func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor { base.initCap = chunk.ZeroCapacity return &PrepareExec{ baseExecutor: base, - is: b.is, name: v.Name, sqlText: v.SQLText, } diff --git a/executor/compiler.go b/executor/compiler.go index 5a658878ca747..673f17f24f40e 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/parser/ast" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/planner" plannercore "github.com/pingcap/tidb/planner/core" @@ -53,13 +52,13 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm ctx = opentracing.ContextWithSpan(ctx, span1) } - infoSchema := c.Ctx.GetInfoSchema().(infoschema.InfoSchema) - if err := plannercore.Preprocess(c.Ctx, stmtNode, infoSchema); err != nil { + ret := &plannercore.PreprocessorReturn{} + if err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret)); err != nil { return nil, err } stmtNode = plannercore.TryAddExtraLimit(c.Ctx, stmtNode) - finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, infoSchema) + finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, ret.InfoSchema) if err != nil { return nil, err } @@ -71,7 +70,8 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm } return &ExecStmt{ GoCtx: ctx, - InfoSchema: infoSchema, + SnapshotTS: ret.SnapshotTS, + InfoSchema: ret.InfoSchema, Plan: finalPlan, LowerPriority: lowerPriority, Text: stmtNode.Text(), diff --git a/executor/executor_test.go b/executor/executor_test.go index 47f070641e8d6..0e7eab38ac9e8 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2349,14 +2349,14 @@ func (s *testSuiteP2) TestIsPointGet(c *C) { "select * from help_topic where help_topic_id=1": true, "select * from help_topic where help_category_id=1": false, } - infoSchema := ctx.GetInfoSchema().(infoschema.InfoSchema) for sqlStr, result := range tests { stmtNode, err := s.ParseOneStmt(sqlStr, "", "") c.Check(err, IsNil) - err = plannercore.Preprocess(ctx, stmtNode, infoSchema) + preprocessorReturn := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(ctx, stmtNode, plannercore.WithPreprocessorReturn(preprocessorReturn)) c.Check(err, IsNil) - p, _, err := planner.Optimize(context.TODO(), ctx, stmtNode, infoSchema) + p, _, err := planner.Optimize(context.TODO(), ctx, stmtNode, preprocessorReturn.InfoSchema) c.Check(err, IsNil) ret, err := plannercore.IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, p) c.Assert(err, IsNil) @@ -2381,13 +2381,13 @@ func (s *testSuiteP2) TestClusteredIndexIsPointGet(c *C) { "select * from t where a='x' and c='x'": true, "select * from t where a='x' and c='x' and b=1": false, } - infoSchema := ctx.GetInfoSchema().(infoschema.InfoSchema) for sqlStr, result := range tests { stmtNode, err := s.ParseOneStmt(sqlStr, "", "") c.Check(err, IsNil) - err = plannercore.Preprocess(ctx, stmtNode, infoSchema) + preprocessorReturn := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(ctx, stmtNode, plannercore.WithPreprocessorReturn(preprocessorReturn)) c.Check(err, IsNil) - p, _, err := planner.Optimize(context.TODO(), ctx, stmtNode, infoSchema) + p, _, err := planner.Optimize(context.TODO(), ctx, stmtNode, preprocessorReturn.InfoSchema) c.Check(err, IsNil) ret, err := plannercore.IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, p) c.Assert(err, IsNil) diff --git a/executor/metrics_reader_test.go b/executor/metrics_reader_test.go index 51bdc560ed6d0..8d75ac41fd96e 100644 --- a/executor/metrics_reader_test.go +++ b/executor/metrics_reader_test.go @@ -20,10 +20,8 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/parser" "github.com/pingcap/tidb/executor" - "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/planner" plannercore "github.com/pingcap/tidb/planner/core" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/testkit" ) @@ -62,10 +60,11 @@ func (s *testSuite7) TestStmtLabel(c *C) { for _, tt := range tests { stmtNode, err := parser.New().ParseOneStmt(tt.sql, "", "") c.Check(err, IsNil) - is := tk.Se.GetInfoSchema().(infoschema.InfoSchema) - err = plannercore.Preprocess(tk.Se.(sessionctx.Context), stmtNode, is) + preprocessorReturn := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(tk.Se, stmtNode, plannercore.WithPreprocessorReturn(preprocessorReturn)) + c.Check(err, IsNil) c.Assert(err, IsNil) - _, _, err = planner.Optimize(context.TODO(), tk.Se, stmtNode, is) + _, _, err = planner.Optimize(context.TODO(), tk.Se, stmtNode, preprocessorReturn.InfoSchema) c.Assert(err, IsNil) c.Assert(executor.GetStmtLabel(stmtNode), Equals, tt.label) } diff --git a/executor/prepared.go b/executor/prepared.go index a3d950ec0afc9..f494ebac3dc9d 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -79,7 +79,6 @@ func (e *paramMarkerExtractor) Leave(in ast.Node) (ast.Node, bool) { type PrepareExec struct { baseExecutor - is infoschema.InfoSchema name string sqlText string @@ -89,12 +88,11 @@ type PrepareExec struct { } // NewPrepareExec creates a new PrepareExec. -func NewPrepareExec(ctx sessionctx.Context, is infoschema.InfoSchema, sqlTxt string) *PrepareExec { +func NewPrepareExec(ctx sessionctx.Context, sqlTxt string) *PrepareExec { base := newBaseExecutor(ctx, nil, 0) base.initCap = chunk.ZeroCapacity return &PrepareExec{ baseExecutor: base, - is: is, sqlText: sqlTxt, } } @@ -159,7 +157,8 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error { return ErrPsManyParam } - err = plannercore.Preprocess(e.ctx, stmt, e.is, plannercore.InPrepare) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(e.ctx, stmt, plannercore.InPrepare, plannercore.WithPreprocessorReturn(ret)) if err != nil { return err } @@ -177,14 +176,14 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error { Stmt: stmt, StmtType: GetStmtLabel(stmt), Params: sorter.markers, - SchemaVersion: e.is.SchemaMetaVersion(), + SchemaVersion: ret.InfoSchema.SchemaMetaVersion(), } if !plannercore.PreparedPlanCacheEnabled() { prepared.UseCache = false } else { if !e.ctx.GetSessionVars().UseDynamicPartitionPrune() { - prepared.UseCache = plannercore.Cacheable(stmt, e.is) + prepared.UseCache = plannercore.Cacheable(stmt, ret.InfoSchema) } else { prepared.UseCache = plannercore.Cacheable(stmt, nil) } @@ -199,7 +198,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error { var p plannercore.Plan e.ctx.GetSessionVars().PlanID = 0 e.ctx.GetSessionVars().PlanColumnID = 0 - destBuilder, _ := plannercore.NewPlanBuilder(e.ctx, e.is, &hint.BlockHintProcessor{}) + destBuilder, _ := plannercore.NewPlanBuilder(e.ctx, ret.InfoSchema, &hint.BlockHintProcessor{}) p, err = destBuilder.Build(ctx, stmt) if err != nil { return err diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index a36dd6073654e..f39cec6f9c220 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -18,6 +18,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/store/tikv/oracle" @@ -93,6 +94,118 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) { failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope") } +func (s *testStaleTxnSerialSuite) TestSelectAsOf(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec(`drop table if exists b`) + tk.MustExec("create table t (id int primary key);") + tk.MustExec("create table b (pid int primary key);") + defer func() { + tk.MustExec(`drop table if exists b`) + tk.MustExec(`drop table if exists t`) + }() + time.Sleep(2 * time.Second) + now := time.Now() + time.Sleep(2 * time.Second) + + testcases := []struct { + name string + sql string + expectPhysicalTS int64 + preSec int64 + // IsStaleness is auto cleanup in select stmt. + errorStr string + }{ + { + name: "TimestampExactRead1", + sql: fmt.Sprintf("select * from t as of timestamp '%s';", now.Format("2006-1-2 15:04:05")), + expectPhysicalTS: now.Unix(), + }, + { + name: "NomalRead", + sql: `select * from b;`, + preSec: 0, + }, + { + name: "TimestampExactRead2", + sql: fmt.Sprintf("select * from t as of timestamp TIMESTAMP('%s');", now.Format("2006-1-2 15:04:05")), + expectPhysicalTS: now.Unix(), + }, + { + name: "TimestampExactRead3", + sql: `select * from t as of timestamp NOW() - INTERVAL 2 SECOND;`, + preSec: 2, + }, + { + name: "TimestampExactRead4", + sql: `select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 2 SECOND);`, + preSec: 2, + }, + { + name: "TimestampExactRead5", + sql: `select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND), b as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND);`, + preSec: 1, + }, + { + name: "TimestampExactRead6", + sql: `select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND), b as of timestamp TIMESTAMP('2020-09-06 00:00:00');`, + errorStr: ".*can not set different time in the as of.*", + }, + { + name: "TimestampExactRead7", + sql: `select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND), b;`, + errorStr: ".*can not set different time in the as of.*", + }, + { + name: "TimestampExactRead8", + sql: `select * from t, b as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND);`, + errorStr: ".*can not set different time in the as of.*", + }, + { + name: "NomalRead", + sql: `select * from t, b;`, + preSec: 0, + }, + { + name: "TimestampExactRead9", + sql: `select * from (select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND), b as of timestamp TIMESTAMP(NOW() - INTERVAL 1 SECOND)) as c, b;`, + errorStr: ".*can not set different time in the as of.*", + }, + { + name: "TimestampExactRead10", + sql: `select * from (select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 2 SECOND), b as of timestamp TIMESTAMP(NOW() - INTERVAL 2 SECOND)) as c;`, + preSec: 2, + }, + // Cannot be supported the SubSelect + { + name: "TimestampExactRead11", + sql: `select * from (select * from t as of timestamp TIMESTAMP(NOW() - INTERVAL 20 SECOND), b as of timestamp TIMESTAMP(NOW() - INTERVAL 20 SECOND)) as c as of timestamp Now();`, + errorStr: ".*You have an error in your SQL syntax.*", + }, + } + + for _, testcase := range testcases { + c.Log(testcase.name) + if testcase.expectPhysicalTS > 0 { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleTSO", fmt.Sprintf(`return(%d)`, testcase.expectPhysicalTS)), IsNil) + } else if testcase.preSec > 0 { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleTSOWithTolerance", fmt.Sprintf(`return(%d)`, time.Now().Unix()-testcase.preSec)), IsNil) + } + _, err := tk.Exec(testcase.sql) + if len(testcase.errorStr) != 0 { + c.Assert(err, ErrorMatches, testcase.errorStr) + continue + } + c.Assert(err, IsNil, Commentf("sql:%s, error stack %v", testcase.sql, errors.ErrorStack(err))) + if testcase.expectPhysicalTS > 0 { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleTSO"), IsNil) + } else if testcase.preSec > 0 { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleTSOWithTolerance"), IsNil) + } + } +} + func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/expression/integration_test.go b/expression/integration_test.go index a4706629660a0..1d883964d996a 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -4850,10 +4850,10 @@ func (s *testIntegrationSuite) TestFilterExtractFromDNF(c *C) { stmts, err := session.Parse(sctx, sql) c.Assert(err, IsNil, Commentf("error %v, for expr %s", err, tt.exprStr)) c.Assert(stmts, HasLen, 1) - is := domain.GetDomain(sctx).InfoSchema() - err = plannercore.Preprocess(sctx, stmts[0], is) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(sctx, stmts[0], plannercore.WithPreprocessorReturn(ret)) c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr)) - p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], is) + p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], ret.InfoSchema) c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr)) selection := p.(plannercore.LogicalPlan).Children()[0].(*plannercore.LogicalSelection) conds := make([]expression.Expression, len(selection.Conditions)) diff --git a/expression/typeinfer_test.go b/expression/typeinfer_test.go index bce929c933852..853873db7d749 100644 --- a/expression/typeinfer_test.go +++ b/expression/typeinfer_test.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/charset" "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/domain" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" @@ -139,10 +138,10 @@ func (s *testInferTypeSuite) TestInferType(c *C) { err = se.NewTxn(context.Background()) c.Assert(err, IsNil) - is := domain.GetDomain(sctx).InfoSchema() - err = plannercore.Preprocess(sctx, stmt, is) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(sctx, stmt, plannercore.WithPreprocessorReturn(ret)) c.Assert(err, IsNil, comment) - p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmt, is) + p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmt, ret.InfoSchema) c.Assert(err, IsNil, comment) tp := p.Schema().Columns[0].RetType diff --git a/go.mod b/go.mod index 0879d5f157bf4..01cc11c222859 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b go.uber.org/atomic v1.7.0 go.uber.org/automaxprocs v1.2.0 - go.uber.org/multierr v1.6.0 // indirect + go.uber.org/multierr v1.7.0 // indirect go.uber.org/zap v1.16.0 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect golang.org/x/mod v0.4.2 // indirect @@ -79,7 +79,6 @@ require ( gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect honnef.co/go/tools v0.1.4 // indirect modernc.org/mathutil v1.2.2 // indirect sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 diff --git a/go.sum b/go.sum index 664a52bdca20d..0ae5bdaf51b9e 100644 --- a/go.sum +++ b/go.sum @@ -538,8 +538,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14/go.mod h1:gxQT6pBGRuIGunNf/+tSOB5OHvguWi8Tbt82WOkf35E= github.com/swaggo/gin-swagger v1.2.0/go.mod h1:qlH2+W7zXGZkczuL+r2nEBR2JTT+/lX05Nn6vPhc7OI= github.com/swaggo/http-swagger v0.0.0-20200308142732-58ac5e232fba/go.mod h1:O1lAbCgAAX/KZ80LM/OXwtWFI/5TvZlwxSg8Cq08PV0= @@ -623,8 +624,8 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/ go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= -go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= -go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= +go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.8.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -919,8 +920,8 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= -gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index a46a48c7191ff..ffa0b7ccde75e 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -295,10 +295,10 @@ func (s *testAnalyzeSuite) TestIndexRead(c *C) { c.Assert(err, IsNil) c.Assert(stmts, HasLen, 1) stmt := stmts[0] - is := domain.GetDomain(ctx).InfoSchema() - err = core.Preprocess(ctx, stmt, is) + ret := &core.PreprocessorReturn{} + err = core.Preprocess(ctx, stmt, core.WithPreprocessorReturn(ret)) c.Assert(err, IsNil) - p, _, err := planner.Optimize(context.TODO(), ctx, stmt, is) + p, _, err := planner.Optimize(context.TODO(), ctx, stmt, ret.InfoSchema) c.Assert(err, IsNil) planString := core.ToString(p) s.testData.OnRecord(func() { @@ -330,10 +330,10 @@ func (s *testAnalyzeSuite) TestEmptyTable(c *C) { c.Assert(err, IsNil) c.Assert(stmts, HasLen, 1) stmt := stmts[0] - is := domain.GetDomain(ctx).InfoSchema() - err = core.Preprocess(ctx, stmt, is) + ret := &core.PreprocessorReturn{} + err = core.Preprocess(ctx, stmt, core.WithPreprocessorReturn(ret)) c.Assert(err, IsNil) - p, _, err := planner.Optimize(context.TODO(), ctx, stmt, is) + p, _, err := planner.Optimize(context.TODO(), ctx, stmt, ret.InfoSchema) c.Assert(err, IsNil) planString := core.ToString(p) s.testData.OnRecord(func() { @@ -402,10 +402,10 @@ func (s *testAnalyzeSuite) TestAnalyze(c *C) { stmt := stmts[0] err = executor.ResetContextOfStmt(ctx, stmt) c.Assert(err, IsNil) - is := domain.GetDomain(ctx).InfoSchema() - err = core.Preprocess(ctx, stmt, is) + ret := &core.PreprocessorReturn{} + err = core.Preprocess(ctx, stmt, core.WithPreprocessorReturn(ret)) c.Assert(err, IsNil) - p, _, err := planner.Optimize(context.TODO(), ctx, stmt, is) + p, _, err := planner.Optimize(context.TODO(), ctx, stmt, ret.InfoSchema) c.Assert(err, IsNil) planString := core.ToString(p) s.testData.OnRecord(func() { @@ -491,10 +491,10 @@ func (s *testAnalyzeSuite) TestPreparedNullParam(c *C) { c.Assert(err, IsNil) stmt := stmts[0] - is := domain.GetDomain(ctx).InfoSchema() - err = core.Preprocess(ctx, stmt, is, core.InPrepare) + ret := &core.PreprocessorReturn{} + err = core.Preprocess(ctx, stmt, core.InPrepare, core.WithPreprocessorReturn(ret)) c.Assert(err, IsNil) - p, _, err := planner.Optimize(context.TODO(), ctx, stmt, is) + p, _, err := planner.Optimize(context.TODO(), ctx, stmt, ret.InfoSchema) c.Assert(err, IsNil) c.Assert(core.ToString(p), Equals, best, Commentf("for %s", sql)) @@ -726,14 +726,14 @@ func BenchmarkOptimize(b *testing.B) { c.Assert(err, IsNil) c.Assert(stmts, HasLen, 1) stmt := stmts[0] - is := domain.GetDomain(ctx).InfoSchema() - err = core.Preprocess(ctx, stmt, is) + ret := &core.PreprocessorReturn{} + err = core.Preprocess(ctx, stmt, core.WithPreprocessorReturn(ret)) c.Assert(err, IsNil) b.Run(tt.sql, func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, _, err := planner.Optimize(context.TODO(), ctx, stmt, is) + _, _, err := planner.Optimize(context.TODO(), ctx, stmt, ret.InfoSchema) c.Assert(err, IsNil) } b.ReportAllocs() diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index ad68fd111d617..a20e2bf12a359 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -265,7 +265,9 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont preparedObj.Executor = nil // If the schema version has changed we need to preprocess it again, // if this time it failed, the real reason for the error is schema changed. - err := Preprocess(sctx, prepared.Stmt, is, InPrepare) + // FIXME: compatible with prepare https://github.com/pingcap/tidb/issues/24932 + ret := &PreprocessorReturn{InfoSchema: is} + err := Preprocess(sctx, prepared.Stmt, InPrepare, WithPreprocessorReturn(ret)) if err != nil { return ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error()) } diff --git a/planner/core/errors.go b/planner/core/errors.go index 860a3444b5538..c1bb613cda005 100644 --- a/planner/core/errors.go +++ b/planner/core/errors.go @@ -97,5 +97,6 @@ var ( ErrAccessDenied = dbterror.ClassOptimizer.NewStdErr(mysql.ErrAccessDenied, mysql.MySQLErrName[mysql.ErrAccessDeniedNoPassword]) ErrBadNull = dbterror.ClassOptimizer.NewStd(mysql.ErrBadNull) ErrNotSupportedWithSem = dbterror.ClassOptimizer.NewStd(mysql.ErrNotSupportedWithSem) + ErrDifferentAsOf = dbterror.ClassOptimizer.NewStd(mysql.ErrUnknown) ErrOptOnTemporaryTable = dbterror.ClassOptimizer.NewStd(mysql.ErrOptOnTemporaryTable) ) diff --git a/planner/core/indexmerge_test.go b/planner/core/indexmerge_test.go index a78b91b54f889..f1ebf4ee8144b 100644 --- a/planner/core/indexmerge_test.go +++ b/planner/core/indexmerge_test.go @@ -90,7 +90,7 @@ func (s *testIndexMergeSuite) TestIndexMergePathGeneration(c *C) { comment := Commentf("case:%v sql:%s", i, tc) stmt, err := s.ParseOneStmt(tc, "", "") c.Assert(err, IsNil, comment) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{}) p, err := builder.Build(ctx, stmt) diff --git a/planner/core/logical_plan_test.go b/planner/core/logical_plan_test.go index 56652983ff8f8..379a4c486c5dc 100644 --- a/planner/core/logical_plan_test.go +++ b/planner/core/logical_plan_test.go @@ -457,7 +457,7 @@ func (s *testPlanSuite) TestSubquery(c *C) { stmt, err := s.ParseOneStmt(ca, "", "") c.Assert(err, IsNil, comment) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) p, _, err := BuildLogicalPlan(ctx, s.ctx, stmt, s.is) c.Assert(err, IsNil) @@ -483,7 +483,7 @@ func (s *testPlanSuite) TestPlanBuilder(c *C) { c.Assert(err, IsNil, comment) s.ctx.GetSessionVars().SetHashJoinConcurrency(1) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) p, _, err := BuildLogicalPlan(ctx, s.ctx, stmt, s.is) c.Assert(err, IsNil) @@ -848,7 +848,7 @@ func (s *testPlanSuite) TestValidate(c *C) { comment := Commentf("for %s", sql) stmt, err := s.ParseOneStmt(sql, "", "") c.Assert(err, IsNil, comment) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) _, _, err = BuildLogicalPlan(ctx, s.ctx, stmt, s.is) if tt.err == nil { @@ -1309,7 +1309,7 @@ func (s *testPlanSuite) TestVisitInfo(c *C) { stmt, err := s.ParseOneStmt(tt.sql, "", "") c.Assert(err, IsNil, comment) // to fix, Table 'test.ttt' doesn't exist - _ = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{}) builder.ctx.GetSessionVars().SetHashJoinConcurrency(1) _, err = builder.Build(context.TODO(), stmt) @@ -1389,7 +1389,7 @@ func (s *testPlanSuite) TestUnion(c *C) { comment := Commentf("case:%v sql:%s", i, tt) stmt, err := s.ParseOneStmt(tt, "", "") c.Assert(err, IsNil, comment) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{}) plan, err := builder.Build(ctx, stmt) @@ -1422,7 +1422,7 @@ func (s *testPlanSuite) TestTopNPushDown(c *C) { comment := Commentf("case:%v sql:%s", i, tt) stmt, err := s.ParseOneStmt(tt, "", "") c.Assert(err, IsNil, comment) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{}) p, err := builder.Build(ctx, stmt) @@ -1496,7 +1496,7 @@ func (s *testPlanSuite) TestOuterJoinEliminator(c *C) { comment := Commentf("case:%v sql:%s", i, tt) stmt, err := s.ParseOneStmt(tt, "", "") c.Assert(err, IsNil, comment) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{}) p, err := builder.Build(ctx, stmt) @@ -1533,7 +1533,7 @@ func (s *testPlanSuite) TestSelectView(c *C) { comment := Commentf("case:%v sql:%s", i, tt.sql) stmt, err := s.ParseOneStmt(tt.sql, "", "") c.Assert(err, IsNil, comment) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{}) p, err := builder.Build(ctx, stmt) @@ -1605,7 +1605,7 @@ func (s *testPlanSuite) optimize(ctx context.Context, sql string) (PhysicalPlan, if err != nil { return nil, nil, err } - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) if err != nil { return nil, nil, err } @@ -1697,7 +1697,7 @@ func (s *testPlanSuite) TestSkylinePruning(c *C) { comment := Commentf("case:%v sql:%s", i, tt.sql) stmt, err := s.ParseOneStmt(tt.sql, "", "") c.Assert(err, IsNil, comment) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{}) p, err := builder.Build(ctx, stmt) @@ -1768,7 +1768,7 @@ func (s *testPlanSuite) TestFastPlanContextTables(c *C) { for _, tt := range tests { stmt, err := s.ParseOneStmt(tt.sql, "", "") c.Assert(err, IsNil) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) s.ctx.GetSessionVars().StmtCtx.Tables = nil p := TryFastPlan(s.ctx, stmt) @@ -1800,7 +1800,7 @@ func (s *testPlanSuite) TestUpdateEQCond(c *C) { comment := Commentf("case:%v sql:%s", i, tt.sql) stmt, err := s.ParseOneStmt(tt.sql, "", "") c.Assert(err, IsNil, comment) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{}) p, err := builder.Build(ctx, stmt) @@ -1817,7 +1817,7 @@ func (s *testPlanSuite) TestConflictedJoinTypeHints(c *C) { ctx := context.TODO() stmt, err := s.ParseOneStmt(sql, "", "") c.Assert(err, IsNil) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{}) p, err := builder.Build(ctx, stmt) @@ -1838,7 +1838,7 @@ func (s *testPlanSuite) TestSimplyOuterJoinWithOnlyOuterExpr(c *C) { ctx := context.TODO() stmt, err := s.ParseOneStmt(sql, "", "") c.Assert(err, IsNil) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) builder, _ := NewPlanBuilder(MockContext(), s.is, &hint.BlockHintProcessor{}) p, err := builder.Build(ctx, stmt) @@ -1890,7 +1890,7 @@ func (s *testPlanSuite) TestResolvingCorrelatedAggregate(c *C) { comment := Commentf("case:%v sql:%s", i, tt.sql) stmt, err := s.ParseOneStmt(tt.sql, "", "") c.Assert(err, IsNil, comment) - err = Preprocess(s.ctx, stmt, s.is) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil, comment) p, _, err := BuildLogicalPlan(ctx, s.ctx, stmt, s.is) c.Assert(err, IsNil, comment) @@ -1932,7 +1932,8 @@ func (s *testPlanSuite) TestFastPathInvalidBatchPointGet(c *C) { comment := Commentf("case:%v sql:%s", i, tc.sql) stmt, err := s.ParseOneStmt(tc.sql, "", "") c.Assert(err, IsNil, comment) - c.Assert(Preprocess(s.ctx, stmt, s.is), IsNil, comment) + err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is})) + c.Assert(err, IsNil, comment) plan := TryFastPlan(s.ctx, stmt) if tc.fastPlan { c.Assert(plan, NotNil) diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 6a8993286e36a..6bfda74d6ab1d 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -249,7 +249,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderBasePhysicalPlan(c *C) { stmt, err := s.ParseOneStmt(tt, "", "") c.Assert(err, IsNil, comment) - err = core.Preprocess(se, stmt, s.is) + err = core.Preprocess(se, stmt, core.WithPreprocessorReturn(&core.PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) p, _, err := planner.Optimize(context.TODO(), se, stmt, s.is) c.Assert(err, IsNil) @@ -1427,7 +1427,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSplitAvg(c *C) { stmt, err := s.ParseOneStmt(tt.sql, "", "") c.Assert(err, IsNil, comment) - err = core.Preprocess(se, stmt, s.is) + err = core.Preprocess(se, stmt, core.WithPreprocessorReturn(&core.PreprocessorReturn{InfoSchema: s.is})) c.Assert(err, IsNil) p, _, err := planner.Optimize(context.TODO(), se, stmt, s.is) c.Assert(err, IsNil, comment) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 931221e576ddc..178779cabdf25 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -2344,7 +2344,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, b.visitInfo = appendVisitInfo(b.visitInfo, mysql.ShutdownPriv, "", "", "", nil) case *ast.BeginStmt: if raw.AsOf != nil { - startTS, err := b.calculateTsExpr(raw.AsOf) + startTS, err := calculateTsExpr(b.ctx, raw.AsOf) if err != nil { return nil, err } @@ -2355,19 +2355,19 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, } // calculateTsExpr calculates the TsExpr of AsOfClause to get a StartTS. -func (b *PlanBuilder) calculateTsExpr(asOfClause *ast.AsOfClause) (uint64, error) { - tsVal, err := evalAstExpr(b.ctx, asOfClause.TsExpr) +func calculateTsExpr(sctx sessionctx.Context, asOfClause *ast.AsOfClause) (uint64, error) { + tsVal, err := evalAstExpr(sctx, asOfClause.TsExpr) if err != nil { return 0, err } toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp) // We need at least the millionsecond here, so set fsp to 3. toTypeTimestamp.Decimal = 3 - tsTimestamp, err := tsVal.ConvertTo(b.ctx.GetSessionVars().StmtCtx, toTypeTimestamp) + tsTimestamp, err := tsVal.ConvertTo(sctx.GetSessionVars().StmtCtx, toTypeTimestamp) if err != nil { return 0, err } - tsTime, err := tsTimestamp.GetMysqlTime().GoTime(b.ctx.GetSessionVars().TimeZone) + tsTime, err := tsTimestamp.GetMysqlTime().GoTime(sctx.GetSessionVars().TimeZone) if err != nil { return 0, err } diff --git a/planner/core/point_get_plan_test.go b/planner/core/point_get_plan_test.go index dfece394a5390..1306da4bed42c 100644 --- a/planner/core/point_get_plan_test.go +++ b/planner/core/point_get_plan_test.go @@ -316,10 +316,10 @@ func (s *testPointGetSuite) TestPointGetId(c *C) { c.Assert(err, IsNil) c.Assert(stmts, HasLen, 1) stmt := stmts[0] - is := domain.GetDomain(ctx).InfoSchema() - err = core.Preprocess(ctx, stmt, is) + ret := &core.PreprocessorReturn{} + err = core.Preprocess(ctx, stmt, core.WithPreprocessorReturn(ret)) c.Assert(err, IsNil) - p, _, err := planner.Optimize(context.TODO(), ctx, stmt, is) + p, _, err := planner.Optimize(context.TODO(), ctx, stmt, ret.InfoSchema) c.Assert(err, IsNil) // Test explain format = 'brief' result is useless, plan id will be reset when running `explain`. c.Assert(p.ID(), Equals, 1) diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index ca436189c0d41..bfc596d20f975 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/meta/autoid" @@ -51,6 +52,13 @@ func InTxnRetry(p *preprocessor) { p.flag |= inTxnRetry } +// WithPreprocessorReturn returns a PreprocessOpt to initialize the PreprocesorReturn. +func WithPreprocessorReturn(ret *PreprocessorReturn) PreprocessOpt { + return func(p *preprocessor) { + p.PreprocessorReturn = ret + } +} + // TryAddExtraLimit trys to add an extra limit for SELECT or UNION statement when sql_select_limit is set. func TryAddExtraLimit(ctx sessionctx.Context, node ast.StmtNode) ast.StmtNode { if ctx.GetSessionVars().SelectLimit == math.MaxUint64 || ctx.GetSessionVars().InRestrictedSQL { @@ -82,12 +90,21 @@ func TryAddExtraLimit(ctx sessionctx.Context, node ast.StmtNode) ast.StmtNode { } // Preprocess resolves table names of the node, and checks some statements validation. -func Preprocess(ctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema, preprocessOpt ...PreprocessOpt) error { - v := preprocessor{is: is, ctx: ctx, tableAliasInJoin: make([]map[string]interface{}, 0)} +// prepreocssReturn used to extract the infoschema for the tableName and the timestamp from the asof clause. +func Preprocess(ctx sessionctx.Context, node ast.Node, preprocessOpt ...PreprocessOpt) error { + v := preprocessor{ctx: ctx, tableAliasInJoin: make([]map[string]interface{}, 0)} for _, optFn := range preprocessOpt { optFn(&v) } + // PreprocessorReturn must be non-nil before preprocessing + if v.PreprocessorReturn == nil { + v.PreprocessorReturn = &PreprocessorReturn{} + } node.Accept(&v) + // InfoSchema must be non-nil after preprocessing + if v.InfoSchema == nil { + v.ensureInfoSchema() + } return errors.Trace(v.err) } @@ -109,18 +126,26 @@ const ( inSequenceFunction ) +// PreprocessorReturn is used to retain information obtained in the preprocessor. +type PreprocessorReturn struct { + SnapshotTS uint64 + InfoSchema infoschema.InfoSchema +} + // preprocessor is an ast.Visitor that preprocess // ast Nodes parsed from parser. type preprocessor struct { - is infoschema.InfoSchema ctx sessionctx.Context - err error flag preprocessorFlag stmtTp byte // tableAliasInJoin is a stack that keeps the table alias names for joins. // len(tableAliasInJoin) may bigger than 1 because the left/right child of join may be subquery that contains `JOIN` tableAliasInJoin []map[string]interface{} + + // values that may be returned + *PreprocessorReturn + err error } func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) { @@ -571,7 +596,7 @@ func (p *preprocessor) checkAdminCheckTableGrammar(stmt *ast.AdminStmt) { } sName := model.NewCIStr(currentDB) tName := table.Name - tableInfo, err := p.is.TableByName(sName, tName) + tableInfo, err := p.ensureInfoSchema().TableByName(sName, tName) if err != nil { p.err = err return @@ -590,7 +615,8 @@ func (p *preprocessor) checkCreateTableGrammar(stmt *ast.CreateTableStmt) { if stmt.ReferTable.Schema.String() != "" { schema = stmt.ReferTable.Schema } - tableInfo, err := p.is.TableByName(schema, stmt.ReferTable.Name) + // get the infoschema from the context. + tableInfo, err := p.ensureInfoSchema().TableByName(schema, stmt.ReferTable.Name) if err != nil { p.err = err return @@ -1161,6 +1187,7 @@ func (p *preprocessor) handleTableName(tn *ast.TableName) { } tn.Schema = model.NewCIStr(currentDB) } + if p.flag&inCreateOrDropTable > 0 { // The table may not exist in create table or drop table statement. if p.flag&inRepairTable > 0 { @@ -1180,7 +1207,12 @@ func (p *preprocessor) handleTableName(tn *ast.TableName) { return } - table, err := p.is.TableByName(tn.Schema, tn.Name) + p.handleAsOf(tn.AsOf) + if p.err != nil { + return + } + + table, err := p.ensureInfoSchema().TableByName(tn.Schema, tn.Name) if err != nil { // We should never leak that the table doesn't exist (i.e. attach ErrTableNotExists) // unless we know that the user has permissions to it, should it exist. @@ -1202,7 +1234,7 @@ func (p *preprocessor) handleTableName(tn *ast.TableName) { return } tableInfo := table.Meta() - dbInfo, _ := p.is.SchemaByName(tn.Schema) + dbInfo, _ := p.ensureInfoSchema().SchemaByName(tn.Schema) // tableName should be checked as sequence object. if p.flag&inSequenceFunction > 0 { if !tableInfo.IsSequence() { @@ -1327,3 +1359,40 @@ func (p *preprocessor) checkFuncCastExpr(node *ast.FuncCastExpr) { } } } + +// handleAsOf tries to validate the timestamp. +// If it is not nil, timestamp is used to get the history infoschema from the infocache. +func (p *preprocessor) handleAsOf(node *ast.AsOfClause) { + dom := domain.GetDomain(p.ctx) + ts := uint64(0) + if node != nil { + ts, p.err = calculateTsExpr(p.ctx, node) + if p.err != nil { + return + } + } + if ts != 0 && p.InfoSchema == nil { + is, err := dom.GetSnapshotInfoSchema(ts) + if err != nil { + p.err = err + return + } + p.SnapshotTS = ts + p.InfoSchema = is + } + if p.SnapshotTS != ts { + p.err = ErrDifferentAsOf.GenWithStack("can not set different time in the as of") + } +} + +// ensureInfoSchema get the infoschema from the preprecessor. +// there some situations: +// - the stmt specifies the schema version. +// - session variable +// - transcation context +func (p *preprocessor) ensureInfoSchema() infoschema.InfoSchema { + if p.InfoSchema == nil { + p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema) + } + return p.InfoSchema +} diff --git a/planner/core/preprocess_test.go b/planner/core/preprocess_test.go index 14b006c836ca9..d9f053f509e92 100644 --- a/planner/core/preprocess_test.go +++ b/planner/core/preprocess_test.go @@ -67,7 +67,7 @@ func (s *testValidatorSuite) runSQL(c *C, sql string, inPrepare bool, terr error if inPrepare { opts = append(opts, core.InPrepare) } - err := core.Preprocess(s.ctx, stmt, s.is, opts...) + err := core.Preprocess(s.ctx, stmt, append(opts, core.WithPreprocessorReturn(&core.PreprocessorReturn{InfoSchema: s.is}))...) c.Assert(terror.ErrorEqual(err, terr), IsTrue, Commentf("sql: %s, err:%v", sql, err)) } diff --git a/planner/core/stats_test.go b/planner/core/stats_test.go index d74f1ba1df1d8..6767a9b910b7d 100644 --- a/planner/core/stats_test.go +++ b/planner/core/stats_test.go @@ -69,15 +69,15 @@ func (s *testStatsSuite) TestGroupNDVs(c *C) { AggInput string JoinInput string } - is := dom.InfoSchema() s.testData.GetTestCases(c, &input, &output) for i, tt := range input { comment := Commentf("case:%v sql: %s", i, tt) stmt, err := s.ParseOneStmt(tt, "", "") c.Assert(err, IsNil, comment) - err = core.Preprocess(tk.Se, stmt, is) + ret := &core.PreprocessorReturn{} + err = core.Preprocess(tk.Se, stmt, core.WithPreprocessorReturn(ret)) c.Assert(err, IsNil) - builder, _ := core.NewPlanBuilder(tk.Se, is, &hint.BlockHintProcessor{}) + builder, _ := core.NewPlanBuilder(tk.Se, ret.InfoSchema, &hint.BlockHintProcessor{}) p, err := builder.Build(ctx, stmt) c.Assert(err, IsNil, comment) p, err = core.LogicalOptimize(ctx, builder.GetOptFlag(), p.(core.LogicalPlan)) diff --git a/server/conn.go b/server/conn.go index 78cdd1a46c12d..46d0a7c023a27 100644 --- a/server/conn.go +++ b/server/conn.go @@ -63,7 +63,6 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/infoschema" @@ -1621,11 +1620,11 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm pointPlans := make([]plannercore.Plan, len(stmts)) var idxKeys []kv.Key var rowKeys []kv.Key - is := domain.GetDomain(cc.ctx).InfoSchema() sc := vars.StmtCtx for i, stmt := range stmts { // TODO: the preprocess is run twice, we should find some way to avoid do it again. - if err = plannercore.Preprocess(cc.ctx, stmt, is); err != nil { + // TODO: handle the PreprocessorReturn. + if err = plannercore.Preprocess(cc.ctx, stmt); err != nil { return nil, err } p := plannercore.TryFastPlan(cc.ctx.Session, stmt) diff --git a/session/session.go b/session/session.go index 03860c6320b2e..46d3a09033837 100644 --- a/session/session.go +++ b/session/session.go @@ -1708,7 +1708,7 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields // So we have to call PrepareTxnCtx here. s.PrepareTxnCtx(ctx) s.PrepareTSFuture(ctx) - prepareExec := executor.NewPrepareExec(s, s.GetInfoSchema().(infoschema.InfoSchema), sql) + prepareExec := executor.NewPrepareExec(s, sql) err = prepareExec.Next(ctx, nil) if err != nil { return diff --git a/statistics/selectivity_test.go b/statistics/selectivity_test.go index 546ca0b20a4ea..359e1d2db9585 100644 --- a/statistics/selectivity_test.go +++ b/statistics/selectivity_test.go @@ -237,7 +237,6 @@ func (s *testStatsSuite) TestSelectivity(c *C) { defer cleanEnv(c, s.store, s.do) testKit := testkit.NewTestKit(c, s.store) statsTbl := s.prepareSelectivity(testKit, c) - is := s.do.InfoSchema() longExpr := "0 < a and a = 1 " for i := 1; i < 64; i++ { @@ -294,9 +293,10 @@ func (s *testStatsSuite) TestSelectivity(c *C) { c.Assert(err, IsNil, Commentf("error %v, for expr %s", err, tt.exprs)) c.Assert(stmts, HasLen, 1) - err = plannercore.Preprocess(sctx, stmts[0], is) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(sctx, stmts[0], plannercore.WithPreprocessorReturn(ret)) c.Assert(err, IsNil, comment) - p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], is) + p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], ret.InfoSchema) c.Assert(err, IsNil, Commentf("error %v, for building plan, expr %s", err, tt.exprs)) sel := p.(plannercore.LogicalPlan).Children()[0].(*plannercore.LogicalSelection) @@ -542,7 +542,6 @@ func BenchmarkSelectivity(b *testing.B) { testKit := testkit.NewTestKit(c, s.store) statsTbl := s.prepareSelectivity(testKit, c) - is := s.do.InfoSchema() exprs := "a > 1 and b < 2 and c > 3 and d < 4 and e > 5" sql := "select * from t where " + exprs comment := Commentf("for %s", exprs) @@ -550,9 +549,10 @@ func BenchmarkSelectivity(b *testing.B) { stmts, err := session.Parse(sctx, sql) c.Assert(err, IsNil, Commentf("error %v, for expr %s", err, exprs)) c.Assert(stmts, HasLen, 1) - err = plannercore.Preprocess(sctx, stmts[0], is) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(sctx, stmts[0], plannercore.WithPreprocessorReturn(ret)) c.Assert(err, IsNil, comment) - p, _, err := plannercore.BuildLogicalPlan(context.Background(), sctx, stmts[0], is) + p, _, err := plannercore.BuildLogicalPlan(context.Background(), sctx, stmts[0], ret.InfoSchema) c.Assert(err, IsNil, Commentf("error %v, for building plan, expr %s", err, exprs)) file, err := os.Create("cpu.profile") @@ -792,9 +792,8 @@ func (s *testStatsSuite) TestDNFCondSelectivity(c *C) { testKit.MustExec(`analyze table t`) ctx := context.Background() - is := s.do.InfoSchema() h := s.do.StatsHandle() - tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + tb, err := s.do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) c.Assert(err, IsNil) tblInfo := tb.Meta() statsTbl := h.GetTableStats(tblInfo) @@ -813,9 +812,10 @@ func (s *testStatsSuite) TestDNFCondSelectivity(c *C) { c.Assert(err, IsNil, Commentf("error %v, for sql %s", err, tt)) c.Assert(stmts, HasLen, 1) - err = plannercore.Preprocess(sctx, stmts[0], is) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(sctx, stmts[0], plannercore.WithPreprocessorReturn(ret)) c.Assert(err, IsNil, Commentf("error %v, for sql %s", err, tt)) - p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], is) + p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], ret.InfoSchema) c.Assert(err, IsNil, Commentf("error %v, for building plan, sql %s", err, tt)) sel := p.(plannercore.LogicalPlan).Children()[0].(*plannercore.LogicalSelection) diff --git a/util/ranger/ranger_test.go b/util/ranger/ranger_test.go index b0b66e8a469c1..3f4e59848b044 100644 --- a/util/ranger/ranger_test.go +++ b/util/ranger/ranger_test.go @@ -304,10 +304,10 @@ func (s *testRangerSuite) TestTableRange(c *C) { stmts, err := session.Parse(sctx, sql) c.Assert(err, IsNil, Commentf("error %v, for expr %s", err, tt.exprStr)) c.Assert(stmts, HasLen, 1) - is := domain.GetDomain(sctx).InfoSchema() - err = plannercore.Preprocess(sctx, stmts[0], is) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(sctx, stmts[0], plannercore.WithPreprocessorReturn(ret)) c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr)) - p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], is) + p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], ret.InfoSchema) c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr)) selection := p.(plannercore.LogicalPlan).Children()[0].(*plannercore.LogicalSelection) conds := make([]expression.Expression, len(selection.Conditions)) @@ -648,10 +648,10 @@ create table t( stmts, err := session.Parse(sctx, sql) c.Assert(err, IsNil, Commentf("error %v, for expr %s", err, tt.exprStr)) c.Assert(stmts, HasLen, 1) - is := domain.GetDomain(sctx).InfoSchema() - err = plannercore.Preprocess(sctx, stmts[0], is) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(sctx, stmts[0], plannercore.WithPreprocessorReturn(ret)) c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr)) - p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], is) + p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], ret.InfoSchema) c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr)) selection := p.(plannercore.LogicalPlan).Children()[0].(*plannercore.LogicalSelection) tbl := selection.Children()[0].(*plannercore.DataSource).TableInfo() @@ -839,10 +839,10 @@ create table t( stmts, err := session.Parse(sctx, sql) c.Assert(err, IsNil, Commentf("error %v, for expr %s", err, tt.exprStr)) c.Assert(stmts, HasLen, 1) - is := domain.GetDomain(sctx).InfoSchema() - err = plannercore.Preprocess(sctx, stmts[0], is) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(sctx, stmts[0], plannercore.WithPreprocessorReturn(ret)) c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr)) - p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], is) + p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], ret.InfoSchema) c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr)) selection := p.(plannercore.LogicalPlan).Children()[0].(*plannercore.LogicalSelection) tbl := selection.Children()[0].(*plannercore.DataSource).TableInfo() @@ -1203,10 +1203,10 @@ func (s *testRangerSuite) TestColumnRange(c *C) { stmts, err := session.Parse(sctx, sql) c.Assert(err, IsNil, Commentf("error %v, for expr %s", err, tt.exprStr)) c.Assert(stmts, HasLen, 1) - is := domain.GetDomain(sctx).InfoSchema() - err = plannercore.Preprocess(sctx, stmts[0], is) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(sctx, stmts[0], plannercore.WithPreprocessorReturn(ret)) c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr)) - p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], is) + p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], ret.InfoSchema) c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr)) sel := p.(plannercore.LogicalPlan).Children()[0].(*plannercore.LogicalSelection) ds, ok := sel.Children()[0].(*plannercore.DataSource) @@ -1627,10 +1627,10 @@ func (s *testRangerSuite) TestIndexRangeForYear(c *C) { stmts, err := session.Parse(sctx, sql) c.Assert(err, IsNil, Commentf("error %v, for expr %s", err, tt.exprStr)) c.Assert(stmts, HasLen, 1) - is := domain.GetDomain(sctx).InfoSchema() - err = plannercore.Preprocess(sctx, stmts[0], is) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(sctx, stmts[0], plannercore.WithPreprocessorReturn(ret)) c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr)) - p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], is) + p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], ret.InfoSchema) c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr)) selection := p.(plannercore.LogicalPlan).Children()[0].(*plannercore.LogicalSelection) tbl := selection.Children()[0].(*plannercore.DataSource).TableInfo() @@ -1698,10 +1698,10 @@ func (s *testRangerSuite) TestPrefixIndexRangeScan(c *C) { stmts, err := session.Parse(sctx, sql) c.Assert(err, IsNil, Commentf("error %v, for expr %s", err, tt.exprStr)) c.Assert(stmts, HasLen, 1) - is := domain.GetDomain(sctx).InfoSchema() - err = plannercore.Preprocess(sctx, stmts[0], is) + ret := &plannercore.PreprocessorReturn{} + err = plannercore.Preprocess(sctx, stmts[0], plannercore.WithPreprocessorReturn(ret)) c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr)) - p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], is) + p, _, err := plannercore.BuildLogicalPlan(ctx, sctx, stmts[0], ret.InfoSchema) c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr)) selection := p.(plannercore.LogicalPlan).Children()[0].(*plannercore.LogicalSelection) tbl := selection.Children()[0].(*plannercore.DataSource).TableInfo()