diff --git a/ddl/delete_range.go b/ddl/delete_range.go index d0e242662fd48..2eeb9b7321ae3 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -14,7 +14,6 @@ package ddl import ( - "bytes" "encoding/hex" "fmt" "math" @@ -154,7 +153,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { finish := true dr.keys = dr.keys[:0] err := kv.RunInNewTxn(dr.store, false, func(txn kv.Transaction) error { - iter, err := txn.Iter(oldStartKey, nil) + iter, err := txn.Iter(oldStartKey, r.EndKey) if err != nil { return errors.Trace(err) } @@ -164,10 +163,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { if !iter.Valid() { break } - finish = bytes.Compare(iter.Key(), r.EndKey) >= 0 - if finish { - break - } + finish = false dr.keys = append(dr.keys, iter.Key().Clone()) newStartKey = iter.Key().Next() diff --git a/ddl/index.go b/ddl/index.go index bcd690383490c..a8fa89e39e36e 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -622,7 +622,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde // taskDone means that the added handle is out of taskRange.endHandle. taskDone := false oprStartTime := startTime - err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startHandle, + err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startHandle, taskRange.endHandle, taskRange.endIncluded, func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) { oprEndTime := time.Now() w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0) @@ -1207,7 +1207,7 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 { // recordIterFunc is used for low-level record iteration. type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error) -func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error { +func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, startHandle int64, endHandle int64, endIncluded bool, fn recordIterFunc) error { ver := kv.Version{Ver: version} snap, err := store.GetSnapshot(ver) @@ -1215,8 +1215,22 @@ func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version if err != nil { return errors.Trace(err) } - firstKey := t.RecordKey(seekHandle) - it, err := snap.Iter(firstKey, nil) + firstKey := t.RecordKey(startHandle) + + // Calculate the exclusive upper bound + var upperBound kv.Key + if endIncluded { + if endHandle == math.MaxInt64 { + upperBound = t.RecordKey(endHandle).PrefixNext() + } else { + // PrefixNext is time costing. Try to avoid it if possible. + upperBound = t.RecordKey(endHandle + 1) + } + } else { + upperBound = t.RecordKey(endHandle) + } + + it, err := snap.Iter(firstKey, upperBound) if err != nil { return errors.Trace(err) } diff --git a/ddl/reorg.go b/ddl/reorg.go index a793e0c402c71..d69bb257277b4 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -304,7 +304,7 @@ func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, prior startHandle = math.MinInt64 endHandle = math.MaxInt64 // Get the start handle of this partition. - err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64, + err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64, math.MaxInt64, true, func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) { startHandle = h return false, nil diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 52ce4ca23df2a..6bf1813bbcf41 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -98,6 +98,11 @@ func (s *Scanner) Next() error { } current := s.cache[s.idx] + if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 { + s.eof = true + s.Close() + return nil + } // Try to resolve the lock if current.GetError() != nil { // 'current' would be modified if the lock being resolved diff --git a/structure/hash.go b/structure/hash.go index 8921bbe0db0af..609c4df398b68 100644 --- a/structure/hash.go +++ b/structure/hash.go @@ -238,7 +238,7 @@ func (t *TxStructure) HClear(key []byte) error { func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) error { dataPrefix := t.hashDataKeyPrefix(key) - it, err := t.reader.Iter(dataPrefix, nil) + it, err := t.reader.Iter(dataPrefix, dataPrefix.PrefixNext()) if err != nil { return errors.Trace(err) } diff --git a/table/tables/index.go b/table/tables/index.go index ec87c3727089a..d86c45c6c1d72 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -240,7 +240,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues // Drop removes the KV index from store. func (c *index) Drop(rm kv.RetrieverMutator) error { - it, err := rm.Iter(c.prefix, nil) + it, err := rm.Iter(c.prefix, c.prefix.PrefixNext()) if err != nil { return errors.Trace(err) } @@ -270,7 +270,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues return nil, false, errors.Trace(err) } - it, err := r.Iter(key, nil) + upperBound := c.prefix.PrefixNext() + it, err := r.Iter(key, upperBound) if err != nil { return nil, false, errors.Trace(err) } @@ -284,7 +285,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues // SeekFirst returns an iterator which points to the first entry of the KV index. func (c *index) SeekFirst(r kv.Retriever) (iter table.IndexIterator, err error) { - it, err := r.Iter(c.prefix, nil) + upperBound := c.prefix.PrefixNext() + it, err := r.Iter(c.prefix, upperBound) if err != nil { return nil, errors.Trace(err) } diff --git a/table/tables/tables.go b/table/tables/tables.go index 9d8c01022fd9b..d1126be735582 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -782,7 +782,8 @@ func (t *tableCommon) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMu // IterRecords implements table.Table IterRecords interface. func (t *tableCommon) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*table.Column, fn table.RecordIterFunc) error { - it, err := ctx.Txn().Iter(startKey, nil) + prefix := t.RecordPrefix() + it, err := ctx.Txn().Iter(startKey, prefix.PrefixNext()) if err != nil { return errors.Trace(err) } @@ -798,7 +799,6 @@ func (t *tableCommon) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols for _, col := range cols { colMap[col.ID] = &col.FieldType } - prefix := t.RecordPrefix() defaultVals := make([]types.Datum, len(cols)) for it.Valid() && it.Key().HasPrefix(prefix) { // first kv pair is row lock information. @@ -912,7 +912,7 @@ func (t *tableCommon) RebaseAutoID(ctx sessionctx.Context, newBase int64, isSetS // Seek implements table.Table Seek interface. func (t *tableCommon) Seek(ctx sessionctx.Context, h int64) (int64, bool, error) { seekKey := tablecodec.EncodeRowKeyWithHandle(t.physicalTableID, h) - iter, err := ctx.Txn().Iter(seekKey, nil) + iter, err := ctx.Txn().Iter(seekKey, t.RecordPrefix().PrefixNext()) if !iter.Valid() || !iter.Key().HasPrefix(t.RecordPrefix()) { // No more records in the table, skip to the end. return 0, false, nil diff --git a/util/admin/admin.go b/util/admin/admin.go index 9aa38113ba13c..ab324eb51f3d6 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -639,7 +639,10 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h // genExprs use to calculate generated column value. func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, startKey kv.Key, cols []*table.Column, fn table.RecordIterFunc, genExprs map[model.TableColumnID]expression.Expression) error { - it, err := retriever.Iter(startKey, nil) + prefix := t.RecordPrefix() + keyUpperBound := prefix.PrefixNext() + + it, err := retriever.Iter(startKey, keyUpperBound) if err != nil { return errors.Trace(err) } @@ -651,7 +654,6 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab log.Debugf("startKey:%q, key:%q, value:%q", startKey, it.Key(), it.Value()) rowDecoder := makeRowDecoder(t, cols, genExprs) - prefix := t.RecordPrefix() for it.Valid() && it.Key().HasPrefix(prefix) { // first kv pair is row lock information. // TODO: check valid lock diff --git a/util/prefix_helper.go b/util/prefix_helper.go index 214414a9cef64..cb01e3b57b102 100644 --- a/util/prefix_helper.go +++ b/util/prefix_helper.go @@ -26,7 +26,7 @@ import ( // ScanMetaWithPrefix scans metadata with the prefix. func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Key, []byte) bool) error { - iter, err := retriever.Iter(prefix, nil) + iter, err := retriever.Iter(prefix, prefix.PrefixNext()) if err != nil { return errors.Trace(err) } @@ -56,7 +56,7 @@ func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Ke // DelKeyWithPrefix deletes keys with prefix. func DelKeyWithPrefix(rm kv.RetrieverMutator, prefix kv.Key) error { var keys []kv.Key - iter, err := rm.Iter(prefix, nil) + iter, err := rm.Iter(prefix, prefix.PrefixNext()) if err != nil { return errors.Trace(err) }