Skip to content

Commit

Permalink
planner: plan cache always check scheme valid in RC isolation level (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
you06 committed May 13, 2022
1 parent cd297b9 commit 0703a64
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 6 deletions.
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ type pessimisticTxn interface {
KeysNeedToLock() ([]kv.Key, error)
}

// buildExecutor build a executor from plan, prepared statement may need additional procedure.
// buildExecutor build an executor from plan, prepared statement may need additional procedure.
func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx := a.Ctx
stmtCtx := ctx.GetSessionVars().StmtCtx
Expand Down
4 changes: 3 additions & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4488,7 +4488,9 @@ func (ds *DataSource) ExtractFD() *fd.FDSet {
changed bool
err error
)
if ds.isForUpdateRead {
check := ds.ctx.GetSessionVars().IsIsolation(ast.ReadCommitted) || ds.isForUpdateRead
check = check && ds.ctx.GetSessionVars().ConnectionID > 0
if check {
latestIndexes, changed, err = getLatestIndexInfo(ds.ctx, ds.table.Meta().ID, 0)
if err != nil {
ds.fdSet = fds
Expand Down
4 changes: 3 additions & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,7 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i

optimizerUseInvisibleIndexes := ctx.GetSessionVars().OptimizerUseInvisibleIndexes

check = check || ctx.GetSessionVars().IsIsolation(ast.ReadCommitted)
check = check && ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error
Expand Down Expand Up @@ -1594,7 +1595,8 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam
indexInfos := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
indexLookUpReaders := make([]Plan, 0, len(tblInfo.Indices))

check := b.isForUpdateRead && b.ctx.GetSessionVars().ConnectionID > 0
check := b.isForUpdateRead || b.ctx.GetSessionVars().IsIsolation(ast.ReadCommitted)
check = check && b.ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error

Expand Down
1 change: 1 addition & 0 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt, check bool
return nil
}

check = check || ctx.GetSessionVars().IsIsolation(ast.ReadCommitted)
check = check && ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error
Expand Down
47 changes: 47 additions & 0 deletions planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2805,3 +2805,50 @@ func TestCachedTable(t *testing.T) {
require.True(t, lastReadFromCache(tk))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
}

func TestPlanCacheWithRCWhenInfoSchemaChange(t *testing.T) {
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)

ctx := context.Background()
store, clean := testkit.CreateMockStore(t)
defer clean()

tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")
tk1.MustExec("drop table if exists t1")
tk1.MustExec("create table t1(id int primary key, c int, index ic (c))")
// prepare text protocol
tk1.MustExec("prepare s from 'select /*+use_index(t1, ic)*/ * from t1 where 1'")
// prepare binary protocol
stmtID, _, _, err := tk2.Session().PrepareStmt("select /*+use_index(t1, ic)*/ * from t1 where 1")
require.Nil(t, err)
tk1.MustExec("set tx_isolation='READ-COMMITTED'")
tk1.MustExec("begin pessimistic")
tk2.MustExec("set tx_isolation='READ-COMMITTED'")
tk2.MustExec("begin pessimistic")
tk1.MustQuery("execute s").Check(testkit.Rows())
rs, err := tk2.Session().ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
require.Nil(t, err)
tk2.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows())

tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("use test")
tk3.MustExec("alter table t1 drop index ic")
tk3.MustExec("insert into t1 values(1, 0)")

// The execution after schema changed should not hit plan cache.
// execute text protocol
tk1.MustQuery("execute s").Check(testkit.Rows("1 0"))
tk1.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// execute binary protocol
rs, err = tk2.Session().ExecutePreparedStmt(ctx, stmtID, []types.Datum{})
require.Nil(t, err)
tk2.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 0"))
tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
}
4 changes: 3 additions & 1 deletion planner/core/rule_build_key_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,10 @@ func (ds *DataSource) BuildKeyInfo(selfSchema *expression.Schema, childSchema []
var latestIndexes map[int64]*model.IndexInfo
var changed bool
var err error
check := ds.ctx.GetSessionVars().IsIsolation(ast.ReadCommitted) || ds.isForUpdateRead
check = check && ds.ctx.GetSessionVars().ConnectionID > 0
// we should check index valid while forUpdateRead, see detail in https://github.com/pingcap/tidb/pull/22152
if ds.isForUpdateRead {
if check {
latestIndexes, changed, err = getLatestIndexInfo(ds.ctx, ds.table.Meta().ID, 0)
if err != nil {
return
Expand Down
7 changes: 6 additions & 1 deletion planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ func GetExecuteForUpdateReadIS(node ast.Node, sctx sessionctx.Context) infoschem
execID = vars.PreparedStmtNameToID[execStmt.Name]
}
if preparedPointer, ok := vars.PreparedStmts[execID]; ok {
if preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt); ok && preparedObj.ForUpdateRead {
checkSchema := vars.IsIsolation(ast.ReadCommitted)
if !checkSchema {
preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt)
checkSchema = ok && preparedObj.ForUpdateRead
}
if checkSchema {
is := domain.GetDomain(sctx).InfoSchema()
return temptable.AttachLocalTemporaryTableInfoSchema(sctx, is)
}
Expand Down
3 changes: 2 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2373,8 +2373,9 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
if err != nil {
return nil, err
}
} else if preparedStmt.ForUpdateRead {
} else if s.sessionVars.IsIsolation(ast.ReadCommitted) || preparedStmt.ForUpdateRead {
is = domain.GetDomain(s).InfoSchema()
is = temptable.AttachLocalTemporaryTableInfoSchema(s, is)
} else {
is = s.GetInfoSchema().(infoschema.InfoSchema)
}
Expand Down

0 comments on commit 0703a64

Please sign in to comment.