Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Make use of the upperBound of ticlient's kv_scan interface to ensure no overbound scan will happen (#8081) #8257

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package ddl

import (
"bytes"
"encoding/hex"
"fmt"
"math"
Expand Down Expand Up @@ -154,7 +153,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
finish := true
dr.keys = dr.keys[:0]
err := kv.RunInNewTxn(dr.store, false, func(txn kv.Transaction) error {
iter, err := txn.Iter(oldStartKey, nil)
iter, err := txn.Iter(oldStartKey, r.EndKey)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -164,10 +163,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
if !iter.Valid() {
break
}
finish = bytes.Compare(iter.Key(), r.EndKey) >= 0
if finish {
break
}
finish = false
dr.keys = append(dr.keys, iter.Key().Clone())
newStartKey = iter.Key().Next()

Expand Down
22 changes: 18 additions & 4 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde
// taskDone means that the added handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := startTime
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startHandle,
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startHandle, taskRange.endHandle, taskRange.endIncluded,
func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0)
Expand Down Expand Up @@ -1183,16 +1183,30 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 {
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error)

func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error {
func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, startHandle int64, endHandle int64, endIncluded bool, fn recordIterFunc) error {
ver := kv.Version{Ver: version}

snap, err := store.GetSnapshot(ver)
snap.SetPriority(priority)
if err != nil {
return errors.Trace(err)
}
firstKey := t.RecordKey(seekHandle)
it, err := snap.Iter(firstKey, nil)
firstKey := t.RecordKey(startHandle)

// Calculate the exclusive upper bound
var upperBound kv.Key
if endIncluded {
if endHandle == math.MaxInt64 {
upperBound = t.RecordKey(endHandle).PrefixNext()
} else {
// PrefixNext is time costing. Try to avoid it if possible.
upperBound = t.RecordKey(endHandle + 1)
}
} else {
upperBound = t.RecordKey(endHandle)
}

it, err := snap.Iter(firstKey, upperBound)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, prior
startHandle = math.MinInt64
endHandle = math.MaxInt64
// Get the start handle of this partition.
err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64,
err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64, math.MaxInt64, true,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
startHandle = h
return false, nil
Expand Down
5 changes: 5 additions & 0 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (s *Scanner) Next() error {
}

current := s.cache[s.idx]
if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 {
s.eof = true
s.Close()
return nil
}
// Try to resolve the lock
if current.GetError() != nil {
// 'current' would be modified if the lock being resolved
Expand Down
2 changes: 1 addition & 1 deletion structure/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (t *TxStructure) HClear(key []byte) error {

func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) error {
dataPrefix := t.hashDataKeyPrefix(key)
it, err := t.reader.Iter(dataPrefix, nil)
it, err := t.reader.Iter(dataPrefix, dataPrefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 5 additions & 3 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues

// Drop removes the KV index from store.
func (c *index) Drop(rm kv.RetrieverMutator) error {
it, err := rm.Iter(c.prefix, nil)
it, err := rm.Iter(c.prefix, c.prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -270,7 +270,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues
return nil, false, errors.Trace(err)
}

it, err := r.Iter(key, nil)
upperBound := c.prefix.PrefixNext()
it, err := r.Iter(key, upperBound)
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -284,7 +285,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues

// SeekFirst returns an iterator which points to the first entry of the KV index.
func (c *index) SeekFirst(r kv.Retriever) (iter table.IndexIterator, err error) {
it, err := r.Iter(c.prefix, nil)
upperBound := c.prefix.PrefixNext()
it, err := r.Iter(c.prefix, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,8 @@ func (t *tableCommon) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMu
// IterRecords implements table.Table IterRecords interface.
func (t *tableCommon) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc) error {
it, err := ctx.Txn().Iter(startKey, nil)
prefix := t.RecordPrefix()
it, err := ctx.Txn().Iter(startKey, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand All @@ -782,7 +783,6 @@ func (t *tableCommon) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols
for _, col := range cols {
colMap[col.ID] = &col.FieldType
}
prefix := t.RecordPrefix()
defaultVals := make([]types.Datum, len(cols))
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
Expand Down Expand Up @@ -896,7 +896,7 @@ func (t *tableCommon) RebaseAutoID(ctx sessionctx.Context, newBase int64, isSetS
// Seek implements table.Table Seek interface.
func (t *tableCommon) Seek(ctx sessionctx.Context, h int64) (int64, bool, error) {
seekKey := tablecodec.EncodeRowKeyWithHandle(t.physicalTableID, h)
iter, err := ctx.Txn().Iter(seekKey, nil)
iter, err := ctx.Txn().Iter(seekKey, t.RecordPrefix().PrefixNext())
if !iter.Valid() || !iter.Key().HasPrefix(t.RecordPrefix()) {
// No more records in the table, skip to the end.
return 0, false, nil
Expand Down
6 changes: 4 additions & 2 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,10 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h
// genExprs use to calculate generated column value.
func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc, genExprs map[string]expression.Expression) error {
it, err := retriever.Iter(startKey, nil)
prefix := t.RecordPrefix()
keyUpperBound := prefix.PrefixNext()

it, err := retriever.Iter(startKey, keyUpperBound)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -663,7 +666,6 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab
}
}

prefix := t.RecordPrefix()
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
// TODO: check valid lock
Expand Down
4 changes: 2 additions & 2 deletions util/prefix_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

// ScanMetaWithPrefix scans metadata with the prefix.
func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Key, []byte) bool) error {
iter, err := retriever.Iter(prefix, nil)
iter, err := retriever.Iter(prefix, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -56,7 +56,7 @@ func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Ke
// DelKeyWithPrefix deletes keys with prefix.
func DelKeyWithPrefix(rm kv.RetrieverMutator, prefix kv.Key) error {
var keys []kv.Key
iter, err := rm.Iter(prefix, nil)
iter, err := rm.Iter(prefix, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down