Skip to content

Commit

Permalink
planner: plan cache always check scheme valid in RC isolation level (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot committed Sep 16, 2022
1 parent b9ee9f8 commit 7ef14d0
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 4 deletions.
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ type pessimisticTxn interface {
KeysNeedToLock() ([]kv.Key, error)
}

// buildExecutor build a executor from plan, prepared statement may need additional procedure.
// buildExecutor build an executor from plan, prepared statement may need additional procedure.
func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx := a.Ctx
stmtCtx := ctx.GetSessionVars().StmtCtx
Expand Down
4 changes: 3 additions & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,7 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i

optimizerUseInvisibleIndexes := ctx.GetSessionVars().OptimizerUseInvisibleIndexes

check = check || ctx.GetSessionVars().IsIsolation(ast.ReadCommitted)
check = check && ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error
Expand Down Expand Up @@ -1576,7 +1577,8 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam
indexInfos := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
indexLookUpReaders := make([]Plan, 0, len(tblInfo.Indices))

check := b.isForUpdateRead && b.ctx.GetSessionVars().ConnectionID > 0
check := b.isForUpdateRead || b.ctx.GetSessionVars().IsIsolation(ast.ReadCommitted)
check = check && b.ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error

Expand Down
1 change: 1 addition & 0 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt, check bool
return nil
}

check = check || ctx.GetSessionVars().IsIsolation(ast.ReadCommitted)
check = check && ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error
Expand Down
50 changes: 50 additions & 0 deletions planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/kvcache"
Expand Down Expand Up @@ -2694,3 +2695,52 @@ func (s *testPlanSerialSuite) TestPartitionWithVariedDatasources(c *C) {
}
}
}

func (s *testPlanSerialSuite) TestPlanCacheWithRCWhenInfoSchemaChange(c *C) {
store, dom, err := newStoreWithBootstrap()
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
err = store.Close()
c.Assert(err, IsNil)
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)

ctx := context.Background()

tk1 := testkit.NewTestKit(c, store)
tk2 := testkit.NewTestKit(c, store)
tk1.MustExec("use test")
tk2.MustExec("use test")
tk1.MustExec("drop table if exists t1")
tk1.MustExec("create table t1(id int primary key, c int, index ic (c))")
// prepare text protocol
tk1.MustExec("prepare s from 'select /*+use_index(t1, ic)*/ * from t1 where 1'")
// prepare binary protocol
stmtID, _, _, err := tk2.Se.PrepareStmt("select /*+use_index(t1, ic)*/ * from t1 where 1")
c.Assert(err, IsNil)
tk1.MustExec("set tx_isolation='READ-COMMITTED'")
tk1.MustExec("begin pessimistic")
tk2.MustExec("set tx_isolation='READ-COMMITTED'")
tk2.MustExec("begin pessimistic")
tk1.MustQuery("execute s").Check(testkit.Rows())
rs, err := tk2.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
c.Assert(err, IsNil)
tk2.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows())

tk3 := testkit.NewTestKit(c, store)
tk3.MustExec("use test")
tk3.MustExec("alter table t1 drop index ic")
tk3.MustExec("insert into t1 values(1, 0)")

// The execution after schema changed should not hit plan cache.
// execute text protocol
tk1.MustQuery("execute s").Check(testkit.Rows("1 0"))
tk1.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// execute binary protocol
rs, err = tk2.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
c.Assert(err, IsNil)
tk2.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("1 0"))
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
}
4 changes: 3 additions & 1 deletion planner/core/rule_build_key_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,10 @@ func (ds *DataSource) BuildKeyInfo(selfSchema *expression.Schema, childSchema []
var latestIndexes map[int64]*model.IndexInfo
var changed bool
var err error
check := ds.ctx.GetSessionVars().IsIsolation(ast.ReadCommitted) || ds.isForUpdateRead
check = check && ds.ctx.GetSessionVars().ConnectionID > 0
// we should check index valid while forUpdateRead, see detail in https://github.com/pingcap/tidb/pull/22152
if ds.isForUpdateRead {
if check {
latestIndexes, changed, err = getLatestIndexInfo(ds.ctx, ds.table.Meta().ID, 0)
if err != nil {
return
Expand Down
7 changes: 6 additions & 1 deletion planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ func GetExecuteForUpdateReadIS(node ast.Node, sctx sessionctx.Context) infoschem
execID = vars.PreparedStmtNameToID[execStmt.Name]
}
if preparedPointer, ok := vars.PreparedStmts[execID]; ok {
if preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt); ok && preparedObj.ForUpdateRead {
checkSchema := vars.IsIsolation(ast.ReadCommitted)
if !checkSchema {
preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt)
checkSchema = ok && preparedObj.ForUpdateRead
}
if checkSchema {
is := domain.GetDomain(sctx).InfoSchema()
return temptable.AttachLocalTemporaryTableInfoSchema(sctx, is)
}
Expand Down
3 changes: 3 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2140,6 +2140,9 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
if err != nil {
return nil, errors.Trace(err)
}
} else if s.sessionVars.IsIsolation(ast.ReadCommitted) || preparedStmt.ForUpdateRead {
is = domain.GetDomain(s).InfoSchema()
is = temptable.AttachLocalTemporaryTableInfoSchema(s, is)
} else {
is = s.GetInfoSchema().(infoschema.InfoSchema)
}
Expand Down

0 comments on commit 7ef14d0

Please sign in to comment.