Skip to content

Commit

Permalink
Merge branch 'release-5.0' into release-5.0-8fca6a21a675
Browse files Browse the repository at this point in the history
  • Loading branch information
wjhuang2016 authored Apr 15, 2021
2 parents a62e2e3 + 4ea7e4e commit b7dc176
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 25 deletions.
3 changes: 3 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (s *testSuite1) TestAnalyzeTooLongColumns(c *C) {
}

func (s *testSuite1) TestAnalyzeIndexExtractTopN(c *C) {
c.Skip("unstable")
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
defer func() {
Expand Down Expand Up @@ -509,6 +510,7 @@ func (s *testFastAnalyze) TestFastAnalyze(c *C) {
}

func (s *testSerialSuite2) TestFastAnalyze4GlobalStats(c *C) {
c.Skip("unstable")
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
Expand Down Expand Up @@ -743,6 +745,7 @@ func (s *testSuite10) TestFailedAnalyzeRequest(c *C) {
}

func (s *testSuite1) TestExtractTopN(c *C) {
c.Skip("unstable")
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand Down
2 changes: 2 additions & 0 deletions executor/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ func (s *testFlushSuite) TestFlushPrivilegesPanic(c *C) {
}

func (s *testSuite3) TestDropPartitionStats(c *C) {
c.Skip("unstable")
// Use the testSerialSuite to fix the unstable test
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec(`create table t (
Expand Down
51 changes: 45 additions & 6 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func getLockWaitTime(ctx sessionctx.Context, lockInfo *ast.SelectLockInfo) (lock
func newBatchPointGetPlan(
ctx sessionctx.Context, patternInExpr *ast.PatternInExpr,
handleCol *model.ColumnInfo, tbl *model.TableInfo, schema *expression.Schema,
names []*types.FieldName, whereColNames []string,
names []*types.FieldName, whereColNames []string, indexHints []*ast.IndexHint,
) *BatchPointGetPlan {
statsInfo := &property.StatsInfo{RowCount: float64(len(patternInExpr.List))}
var partitionColName *ast.ColumnName
Expand Down Expand Up @@ -563,7 +563,8 @@ func newBatchPointGetPlan(
}
}
for _, idxInfo := range tbl.Indices {
if !idxInfo.Unique || idxInfo.State != model.StatePublic || idxInfo.Invisible {
if !idxInfo.Unique || idxInfo.State != model.StatePublic || idxInfo.Invisible ||
!indexIsAvailableByHints(idxInfo, indexHints) {
continue
}
if len(idxInfo.Columns) != len(whereColNames) || idxInfo.HasPrefixIndex() {
Expand Down Expand Up @@ -742,7 +743,7 @@ func tryWhereIn2BatchPointGet(ctx sessionctx.Context, selStmt *ast.SelectStmt) *
return nil
}

p := newBatchPointGetPlan(ctx, in, handleCol, tbl, schema, names, whereColNames)
p := newBatchPointGetPlan(ctx, in, handleCol, tbl, schema, names, whereColNames, tblName.IndexHints)
if p == nil {
return nil
}
Expand Down Expand Up @@ -824,7 +825,7 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt, check bool
}

handlePair, fieldType := findPKHandle(tbl, pairs)
if handlePair.value.Kind() != types.KindNull && len(pairs) == 1 {
if handlePair.value.Kind() != types.KindNull && len(pairs) == 1 && indexIsAvailableByHints(nil, tblName.IndexHints) {
if isTableDual {
p := newPointGetPlan(ctx, tblName.Schema.O, schema, tbl, names)
p.IsTableDual = true
Expand All @@ -846,7 +847,8 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt, check bool
var err error

for _, idxInfo := range tbl.Indices {
if !idxInfo.Unique || idxInfo.State != model.StatePublic || idxInfo.Invisible {
if !idxInfo.Unique || idxInfo.State != model.StatePublic || idxInfo.Invisible ||
!indexIsAvailableByHints(idxInfo, tblName.IndexHints) {
continue
}
if isTableDual {
Expand All @@ -866,7 +868,6 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt, check bool
p.IsTableDual = true
return p
}

idxValues, idxValueParams := getIndexValues(idxInfo, pairs)
if idxValues == nil {
continue
Expand Down Expand Up @@ -896,6 +897,44 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt, check bool
return nil
}

// indexIsAvailableByHints checks whether this index is filtered by these specified index hints.
// idxInfo is PK if it's nil
func indexIsAvailableByHints(idxInfo *model.IndexInfo, idxHints []*ast.IndexHint) bool {
if len(idxHints) == 0 {
return true
}
match := func(name model.CIStr) bool {
if idxInfo == nil {
return name.L == "primary"
}
return idxInfo.Name.L == name.L
}
// NOTICE: it's supposed that ignore hints and use/force hints will not be applied together since the effect of
// the former will be eliminated by the latter.
isIgnore := false
for _, hint := range idxHints {
if hint.HintScope != ast.HintForScan {
continue
}
if hint.HintType == ast.HintIgnore && hint.IndexNames != nil {
isIgnore = true
for _, name := range hint.IndexNames {
if match(name) {
return false
}
}
}
if (hint.HintType == ast.HintForce || hint.HintType == ast.HintUse) && hint.IndexNames != nil {
for _, name := range hint.IndexNames {
if match(name) {
return true
}
}
}
}
return isIgnore
}

func partitionNameInSet(name model.CIStr, pnames []model.CIStr) bool {
for _, pname := range pnames {
// Case insensitive, create table partition p0, query using P0 is OK.
Expand Down
46 changes: 46 additions & 0 deletions planner/core/point_get_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,3 +599,49 @@ func (s *testPointGetSuite) TestCBOShouldNotUsePointGet(c *C) {
res.Check(testkit.Rows(output[i].Res...))
}
}

func (s *testPointGetSuite) TestPointGetWithIndexHints(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
// point get
tk.MustExec("create table t(a int, b int, unique index ab(a, b), unique index ba(b, a))")
tk.MustQuery("explain format='brief' select a, b from t where a=1 and b=1").Check(testkit.Rows("Point_Get 1.00 root table:t, index:ab(a, b) "))
tk.MustQuery("explain format='brief' select a, b from t use index(ba) where a=1 and b=1").Check(testkit.Rows("Point_Get 1.00 root table:t, index:ba(b, a) "))
tk.MustQuery("explain format='brief' select a, b from t ignore index(ab, ba) where a=1 and b=1").Check(testkit.Rows(
"TableReader 1.00 root data:Selection",
"└─Selection 1.00 cop[tikv] eq(test.t.a, 1), eq(test.t.b, 1)",
" └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"))

// batch get
tk.MustQuery("explain format='brief' select a, b from t where (a=1 and b=1) or (a=2 and b=2)").Check(testkit.Rows("Batch_Point_Get 2.00 root table:t, index:ab(a, b) keep order:false, desc:false"))
tk.MustQuery("explain format='brief' select a, b from t use index(ba) where (a=1 and b=1) or (a=2 and b=2)").Check(testkit.Rows("Batch_Point_Get 2.00 root table:t, index:ba(b, a) keep order:false, desc:false"))
tk.MustQuery("explain format='brief' select a, b from t ignore index(ab, ba) where (a=1 and b=1) or (a=2 and b=2)").Check(testkit.Rows(
"TableReader 2.00 root data:Selection",
"└─Selection 2.00 cop[tikv] or(and(eq(test.t.a, 1), eq(test.t.b, 1)), and(eq(test.t.a, 2), eq(test.t.b, 2)))",
" └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"))
tk.MustQuery("explain format='brief' select a, b from t where (a, b) in ((1, 1), (2, 2))").Check(testkit.Rows("Batch_Point_Get 2.00 root table:t, index:ab(a, b) keep order:false, desc:false"))
tk.MustQuery("explain format='brief' select a, b from t use index(ba) where (a, b) in ((1, 1), (2, 2))").Check(testkit.Rows("Batch_Point_Get 2.00 root table:t, index:ba(b, a) keep order:false, desc:false"))
tk.MustQuery("explain format='brief' select a, b from t ignore index(ab, ba) where (a, b) in ((1, 1), (2, 2))").Check(testkit.Rows(
"TableReader 2.00 root data:Selection",
"└─Selection 2.00 cop[tikv] or(and(eq(test.t.a, 1), eq(test.t.b, 1)), and(eq(test.t.a, 2), eq(test.t.b, 2)))",
" └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"))

// primary key
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int primary key, b int, unique index ab(a, b))")
tk.MustQuery("explain format='brief' select a from t1 where a=1").Check(testkit.Rows("Point_Get 1.00 root table:t1 handle:1"))
tk.MustQuery("explain format='brief' select a from t1 use index(ab) where a=1").Check(testkit.Rows(
"IndexReader 10.00 root index:IndexRangeScan",
"└─IndexRangeScan 10.00 cop[tikv] table:t1, index:ab(a, b) range:[1,1], keep order:false, stats:pseudo"))

// other cases
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t2 (a int, b int, unique index aa(a), unique index bb(b))")
tk.MustQuery("explain format='brief' select a from t2 ignore index(bb) where a=1").Check(testkit.Rows("Point_Get 1.00 root table:t2, index:aa(a) "))
tk.MustQuery("explain format='brief' select a from t2 use index(bb) where a=1").Check(testkit.Rows(
"IndexLookUp 1.00 root ",
"├─IndexFullScan(Build) 10000.00 cop[tikv] table:t2, index:bb(b) keep order:false, stats:pseudo",
"└─Selection(Probe) 1.00 cop[tikv] eq(test.t2.a, 1)",
" └─TableRowIDScan 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo"))
}
1 change: 1 addition & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2118,6 +2118,7 @@ func (s *testPessimisticSuite) TestAsyncCommitWithSchemaChange(c *C) {
}

func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) {
c.Skip("unstable")
// TODO: implement commit_ts calculation in unistore
if !*withTiKV {
return
Expand Down
4 changes: 3 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2088,7 +2088,9 @@ func CreateSessionWithOpt(store kv.Storage, opt *Opt) (Session, error) {
// which periodically updates stats using the collected data.
if do.StatsHandle() != nil && do.StatsUpdating() {
s.statsCollector = do.StatsHandle().NewSessionStatsCollector()
s.idxUsageCollector = do.StatsHandle().NewSessionIndexUsageCollector()
if GetIndexUsageSyncLease() > 0 {
s.idxUsageCollector = do.StatsHandle().NewSessionIndexUsageCollector()
}
}

return s, nil
Expand Down
7 changes: 6 additions & 1 deletion session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {

ddlLease := time.Duration(atomic.LoadInt64(&schemaLease))
statisticLease := time.Duration(atomic.LoadInt64(&statsLease))
idxUsageSyncLease := time.Duration(atomic.LoadInt64(&indexUsageSyncLease))
idxUsageSyncLease := GetIndexUsageSyncLease()
err = util.RunWithRetry(util.DefaultMaxRetries, util.RetryInterval, func() (retry bool, err1 error) {
logutil.BgLogger().Info("new domain",
zap.String("store", store.UUID()),
Expand Down Expand Up @@ -161,6 +161,11 @@ func SetIndexUsageSyncLease(lease time.Duration) {
atomic.StoreInt64(&indexUsageSyncLease, int64(lease))
}

// GetIndexUsageSyncLease returns the index usage sync lease time.
func GetIndexUsageSyncLease() time.Duration {
return time.Duration(atomic.LoadInt64(&indexUsageSyncLease))
}

// DisableStats4Test disables the stats for tests.
func DisableStats4Test() {
SetStatsLease(-1)
Expand Down
25 changes: 25 additions & 0 deletions session/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,28 @@ func (s *testMainSuite) TestKeysNeedLock(c *C) {
c.Assert(flag.HasPresumeKeyNotExists(), IsTrue)
c.Assert(keyNeedToLock(indexKey, deleteVal, flag), IsTrue)
}

func (s *testMainSuite) TestIndexUsageSyncLease(c *C) {
store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
do, err := BootstrapSession(store)
c.Assert(err, IsNil)
do.SetStatsUpdating(true)
st, err := CreateSessionWithOpt(store, nil)
c.Assert(err, IsNil)
se, ok := st.(*session)
c.Assert(ok, IsTrue)
c.Assert(se.idxUsageCollector, IsNil)

SetIndexUsageSyncLease(1)
defer SetIndexUsageSyncLease(0)
st, err = CreateSessionWithOpt(store, nil)
c.Assert(err, IsNil)
se, ok = st.(*session)
c.Assert(ok, IsTrue)
c.Assert(se.idxUsageCollector, NotNil)

do.Close()
err = store.Close()
c.Assert(err, IsNil)
}
4 changes: 4 additions & 0 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,8 @@ type statsSerialSuite struct {

func (s *statsSerialSuite) TestIndexUsageInformation(c *C) {
defer cleanEnv(c, s.store, s.do)
session.SetIndexUsageSyncLease(1)
defer session.SetIndexUsageSyncLease(0)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t_idx(a int, b int)")
Expand Down Expand Up @@ -1962,6 +1964,8 @@ func (s *statsSerialSuite) TestIndexUsageInformation(c *C) {

func (s *statsSerialSuite) TestGCIndexUsageInformation(c *C) {
defer cleanEnv(c, s.store, s.do)
session.SetIndexUsageSyncLease(1)
defer session.SetIndexUsageSyncLease(0)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t_idx(a int, b int)")
Expand Down
20 changes: 9 additions & 11 deletions store/tikv/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,25 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
// checked there.

if c.sessionID > 0 {
failpoint.Inject("beforePrewrite", func() {})

failpoint.Inject("prewritePrimaryFail", func() {
if batch.isPrimary {
if batch.isPrimary {
failpoint.Inject("prewritePrimaryFail", func() {
// Delay to avoid cancelling other normally ongoing prewrite requests.
time.Sleep(time.Millisecond * 50)
logutil.Logger(bo.ctx).Info("[failpoint] injected error on prewriting primary batch",
zap.Uint64("txnStartTS", c.startTS))
failpoint.Return(errors.New("injected error on prewriting primary batch"))
}
})

failpoint.Inject("prewriteSecondaryFail", func() {
if !batch.isPrimary {
})
failpoint.Inject("prewritePrimary", nil) // for other failures like sleep or pause
} else {
failpoint.Inject("prewriteSecondaryFail", func() {
// Delay to avoid cancelling other normally ongoing prewrite requests.
time.Sleep(time.Millisecond * 50)
logutil.Logger(bo.ctx).Info("[failpoint] injected error on prewriting secondary batch",
zap.Uint64("txnStartTS", c.startTS))
failpoint.Return(errors.New("injected error on prewriting secondary batch"))
}
})
})
failpoint.Inject("prewriteSecondary", nil) // for other failures like sleep or pause
}
}

txnSize := uint64(c.regionTxnSize[batch.region.id])
Expand Down
19 changes: 13 additions & 6 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tables
import (
"context"
"io"
"sync"
"time"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -81,11 +82,15 @@ func (c *indexIter) Next() (indexData []types.Datum, h kv.Handle, err error) {

// index is the data structure for index data in the KV store.
type index struct {
idxInfo *model.IndexInfo
tblInfo *model.TableInfo
prefix kv.Key
needRestoredData bool
phyTblID int64
idxInfo *model.IndexInfo
tblInfo *model.TableInfo
prefix kv.Key
phyTblID int64
// initNeedRestoreData is used to initialize `needRestoredData` in `index.Create()`.
// This routine cannot be done in `NewIndex()` because `needRestoreData` relies on `NewCollationEnabled()` and
// the collation global variable is initialized *after* `NewIndex()`.
initNeedRestoreData sync.Once
needRestoredData bool
}

// NeedRestoredData checks whether the index columns needs restored data.
Expand Down Expand Up @@ -120,7 +125,6 @@ func NewIndex(physicalID int64, tblInfo *model.TableInfo, indexInfo *model.Index
prefix: prefix,
phyTblID: physicalID,
}
index.needRestoredData = NeedRestoredData(indexInfo.Columns, tblInfo.Columns)
return index
}

Expand Down Expand Up @@ -175,6 +179,9 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue

// save the key buffer to reuse.
writeBufs.IndexKeyBuf = key
c.initNeedRestoreData.Do(func() {
c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns)
})
idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx, c.tblInfo, c.idxInfo, c.needRestoredData, distinct, opt.Untouched, indexedValues, h, c.phyTblID, handleRestoreData)
if err != nil {
return nil, err
Expand Down

0 comments on commit b7dc176

Please sign in to comment.