Skip to content

Commit

Permalink
planner: support 'admin flush plan cache' (#30370)
Browse files Browse the repository at this point in the history
  • Loading branch information
Reminiscent authored Dec 27, 2021
1 parent 2b0400a commit abb6582
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 2 deletions.
19 changes: 19 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/domainutil"
Expand Down Expand Up @@ -89,6 +90,7 @@ type Domain struct {
cancel context.CancelFunc
indexUsageSyncLease time.Duration
planReplayer *planReplayer
expiredTimeStamp4PC types.Time

serverID uint64
serverIDSession *concurrency.Session
Expand Down Expand Up @@ -335,6 +337,22 @@ func (do *Domain) GetSnapshotMeta(startTS uint64) (*meta.Meta, error) {
return meta.NewSnapshotMeta(snapshot), nil
}

// ExpiredTimeStamp4PC gets expiredTimeStamp4PC from domain.
func (do *Domain) ExpiredTimeStamp4PC() types.Time {
do.m.Lock()
defer do.m.Unlock()

return do.expiredTimeStamp4PC
}

// SetExpiredTimeStamp4PC sets the expiredTimeStamp4PC from domain.
func (do *Domain) SetExpiredTimeStamp4PC(time types.Time) {
do.m.Lock()
defer do.m.Unlock()

do.expiredTimeStamp4PC = time
}

// DDL gets DDL from domain.
func (do *Domain) DDL() ddl.DDL {
return do.ddl
Expand Down Expand Up @@ -712,6 +730,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease},
onClose: onClose,
renewLeaseCh: make(chan func(), 10),
expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
}

do.SchemaValidator = NewSchemaValidator(ddlLease, do)
Expand Down
35 changes: 34 additions & 1 deletion executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
Expand Down Expand Up @@ -160,7 +161,7 @@ func (e *SimpleExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
case *ast.ShutdownStmt:
err = e.executeShutdown(x)
case *ast.AdminStmt:
err = e.executeAdminReloadStatistics(x)
err = e.executeAdmin(x)
}
e.done = true
return err
Expand Down Expand Up @@ -1659,6 +1660,16 @@ func asyncDelayShutdown(p *os.Process, delay time.Duration) {
}
}

func (e *SimpleExec) executeAdmin(s *ast.AdminStmt) error {
switch s.Tp {
case ast.AdminReloadStatistics:
return e.executeAdminReloadStatistics(s)
case ast.AdminFlushPlanCache:
return e.executeAdminFlushPlanCache(s)
}
return nil
}

func (e *SimpleExec) executeAdminReloadStatistics(s *ast.AdminStmt) error {
if s.Tp != ast.AdminReloadStatistics {
return errors.New("This AdminStmt is not ADMIN RELOAD STATS_EXTENDED")
Expand All @@ -1668,3 +1679,25 @@ func (e *SimpleExec) executeAdminReloadStatistics(s *ast.AdminStmt) error {
}
return domain.GetDomain(e.ctx).StatsHandle().ReloadExtendedStatistics()
}

func (e *SimpleExec) executeAdminFlushPlanCache(s *ast.AdminStmt) error {
if s.Tp != ast.AdminFlushPlanCache {
return errors.New("This AdminStmt is not ADMIN FLUSH PLAN_CACHE")
}
if s.StatementScope == ast.StatementScopeGlobal {
return errors.New("Do not support the 'admin flush global scope.'")
}
if !plannercore.PreparedPlanCacheEnabled() {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("The plan cache is disable. So there no need to flush the plan cache"))
return nil
}
now := types.NewTime(types.FromGoTime(time.Now().In(e.ctx.GetSessionVars().StmtCtx.TimeZone)), mysql.TypeTimestamp, 3)
e.ctx.GetSessionVars().LastUpdateTime4PC = now
e.ctx.PreparedPlanCache().DeleteAll()
if s.StatementScope == ast.StatementScopeInstance {
// Record the timestamp. When other sessions want to use the plan cache,
// it will check the timestamp first to decide whether the plan cache should be flushed.
domain.GetDomain(e.ctx).SetExpiredTimeStamp4PC(now)
}
return nil
}
10 changes: 10 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,16 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
}
prepared.SchemaVersion = is.SchemaMetaVersion()
}
// If the lastUpdateTime less than expiredTimeStamp4PC,
// it means other sessions have executed 'admin flush instance plan_cache'.
// So we need to clear the current session's plan cache.
// And update lastUpdateTime to the newest one.
expiredTimeStamp4PC := domain.GetDomain(sctx).ExpiredTimeStamp4PC()
if prepared.UseCache && expiredTimeStamp4PC.Compare(vars.LastUpdateTime4PC) > 0 {
sctx.PreparedPlanCache().DeleteAll()
prepared.CachedPlan = nil
vars.LastUpdateTime4PC = expiredTimeStamp4PC
}
err = e.getPhysicalPlan(ctx, sctx, is, preparedObj)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,8 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (Plan,
return &AdminResetTelemetryID{}, nil
case ast.AdminReloadStatistics:
return &Simple{Statement: as}, nil
case ast.AdminFlushPlanCache:
return &Simple{Statement: as}, nil
default:
return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as)
}
Expand Down
202 changes: 202 additions & 0 deletions planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,208 @@ type testPrepareSuite struct {
type testPrepareSerialSuite struct {
}

func (s *testPrepareSerialSuite) TestFlushPlanCache(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
tk2 := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
err = store.Close()
c.Assert(err, IsNil)
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)
tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1(id int, a int, b int, key(a))")
tk.MustExec("create table t2(id int, a int, b int, key(a))")
tk.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';")
tk.MustExec("execute stmt1;")
tk.MustExec("execute stmt1;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("prepare stmt2 from 'SELECT * from t1';")
tk.MustExec("execute stmt2;")
tk.MustExec("execute stmt2;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';")
tk.MustExec("execute stmt3;")
tk.MustExec("execute stmt3;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk2.MustExec("use test")
tk2.MustExec("drop table if exists t1")
tk2.MustExec("drop table if exists t2")
tk2.MustExec("create table t1(id int, a int, b int, key(a))")
tk2.MustExec("create table t2(id int, a int, b int, key(a))")
tk2.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';")
tk2.MustExec("execute stmt1;")
tk2.MustExec("execute stmt1;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk2.MustExec("prepare stmt2 from 'SELECT * from t1';")
tk2.MustExec("execute stmt2;")
tk2.MustExec("execute stmt2;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk2.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';")
tk2.MustExec("execute stmt3;")
tk2.MustExec("execute stmt3;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("admin flush session plan_cache;")
tk.MustExec("execute stmt1;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("execute stmt2;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("execute stmt3;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk2.MustExec("execute stmt1;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk2.MustExec("execute stmt2;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk2.MustExec("execute stmt3;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("execute stmt1;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk.MustExec("execute stmt2;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk.MustExec("execute stmt3;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk2.MustExec("admin flush instance plan_cache;")
tk2.MustExec("execute stmt1;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk2.MustExec("execute stmt2;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk2.MustExec("execute stmt3;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk.MustExec("execute stmt1;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("execute stmt2;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("execute stmt3;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

err = tk.ExecToErr("admin flush global plan_cache;")
c.Check(err.Error(), Equals, "Do not support the 'admin flush global scope.'")
}

func (s *testPrepareSerialSuite) TestFlushPlanCacheWithoutPCEnable(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
tk2 := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
err = store.Close()
c.Assert(err, IsNil)
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(false)
tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1(id int, a int, b int, key(a))")
tk.MustExec("create table t2(id int, a int, b int, key(a))")
tk.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';")
tk.MustExec("execute stmt1;")
tk.MustExec("execute stmt1;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk.MustExec("prepare stmt2 from 'SELECT * from t1';")
tk.MustExec("execute stmt2;")
tk.MustExec("execute stmt2;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';")
tk.MustExec("execute stmt3;")
tk.MustExec("execute stmt3;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk2.MustExec("use test")
tk2.MustExec("drop table if exists t1")
tk2.MustExec("drop table if exists t2")
tk2.MustExec("create table t1(id int, a int, b int, key(a))")
tk2.MustExec("create table t2(id int, a int, b int, key(a))")
tk2.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';")
tk2.MustExec("execute stmt1;")
tk2.MustExec("execute stmt1;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk2.MustExec("prepare stmt2 from 'SELECT * from t1';")
tk2.MustExec("execute stmt2;")
tk2.MustExec("execute stmt2;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk2.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';")
tk2.MustExec("execute stmt3;")
tk2.MustExec("execute stmt3;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk.MustExec("admin flush session plan_cache;")
tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1105 The plan cache is disable. So there no need to flush the plan cache"))
tk.MustExec("execute stmt1;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("execute stmt2;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("execute stmt3;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk2.MustExec("execute stmt1;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk2.MustExec("execute stmt2;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk2.MustExec("execute stmt3;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk.MustExec("execute stmt1;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("execute stmt2;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("execute stmt3;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk2.MustExec("admin flush instance plan_cache;")
tk2.MustQuery("show warnings;").Check(testkit.Rows("Warning 1105 The plan cache is disable. So there no need to flush the plan cache"))
tk2.MustExec("execute stmt1;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk2.MustExec("execute stmt2;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk2.MustExec("execute stmt3;")
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk.MustExec("execute stmt1;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("execute stmt2;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustExec("execute stmt3;")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

err = tk.ExecToErr("admin flush global plan_cache;")
c.Check(err.Error(), Equals, "Do not support the 'admin flush global scope.'")
}

func (s *testPrepareSerialSuite) TestPrepareCache(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,8 @@ type SessionVars struct {
// preparedStmtID is id of prepared statement.
preparedStmtID uint32
// PreparedParams params for prepared statements
PreparedParams PreparedParams
PreparedParams PreparedParams
LastUpdateTime4PC types.Time

// ActiveRoles stores active roles for current user
ActiveRoles []*auth.RoleIdentity
Expand Down

0 comments on commit abb6582

Please sign in to comment.