Skip to content

Commit

Permalink
session: forbid plan cache in stale read (#33273) (#33301)
Browse files Browse the repository at this point in the history
close #31550
  • Loading branch information
ti-srebot authored Mar 25, 2022
1 parent ca0e9a0 commit eda2f72
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 25 deletions.
30 changes: 30 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package executor_test

import (
"context"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -1282,3 +1283,32 @@ func TestStaleReadNoExtraTSORequest(t *testing.T) {
tk.MustQuery("select * from t")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTSONotRequest"))
}

func TestPlanCacheWithStaleReadByBinaryProto(t *testing.T) {
store, _, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int primary key, v int)")
tk.MustExec("insert into t1 values(1, 10)")
se := tk.Session()
tk.MustExec("set @a=now(6)")
time.Sleep(time.Millisecond * 5)
tk.MustExec("update t1 set v=100 where id=1")

stmtID1, _, _, err := se.PrepareStmt("select * from t1 as of timestamp @a where id=1")
require.NoError(t, err)

rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))

rs, err = se.ExecutePreparedStmt(context.TODO(), stmtID1, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))

rs, err = se.ExecutePreparedStmt(context.TODO(), stmtID1, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))
}
16 changes: 5 additions & 11 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2246,23 +2246,17 @@ func (s *session) cachedPlanExec(ctx context.Context,
// IsPointGetWithPKOrUniqueKeyByAutoCommit
func (s *session) IsCachedExecOk(ctx context.Context, preparedStmt *plannercore.CachedPrepareStmt) (bool, error) {
prepared := preparedStmt.PreparedAst
if prepared.CachedPlan == nil {
if prepared.CachedPlan == nil || preparedStmt.SnapshotTSEvaluator != nil {
return false, nil
}
// check auto commit
if !plannercore.IsAutoCommitTxn(s) {
return false, nil
}
// SnapshotTSEvaluator != nil, it is stale read
// stale read expect a stale infoschema
// so skip infoschema check
if preparedStmt.SnapshotTSEvaluator == nil {
// check schema version
is := s.GetInfoSchema().(infoschema.InfoSchema)
if prepared.SchemaVersion != is.SchemaMetaVersion() {
prepared.CachedPlan = nil
return false, nil
}
is := s.GetInfoSchema().(infoschema.InfoSchema)
if prepared.SchemaVersion != is.SchemaMetaVersion() {
prepared.CachedPlan = nil
return false, nil
}
// maybe we'd better check cached plan type here, current
// only point select/update will be cached, see "getPhysicalPlan" func
Expand Down
14 changes: 0 additions & 14 deletions sessiontxn/txn_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,13 +639,6 @@ func TestTxnContextForStaleReadInPrepare(t *testing.T) {
tk.MustExec("execute s2")
})

// plan cache for stmtID2
doWithCheckPath(t, se, []string{"assertTxnManagerInCachedPlanExec", "assertTxnManagerInShortPointGetPlan"}, func() {
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID2, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))
})

// tx_read_ts in prepare
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1)
doWithCheckPath(t, se, path, func() {
Expand All @@ -656,13 +649,6 @@ func TestTxnContextForStaleReadInPrepare(t *testing.T) {
doWithCheckPath(t, se, normalPathRecords, func() {
tk.MustExec("execute s3")
})

// plan cache for stmtID3
doWithCheckPath(t, se, []string{"assertTxnManagerInCachedPlanExec", "assertTxnManagerInShortPointGetPlan"}, func() {
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID3, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))
})
}

func TestTxnContextPreparedStmtWithForUpdate(t *testing.T) {
Expand Down

0 comments on commit eda2f72

Please sign in to comment.