From 51b4882beca6afb36ffe02574519c6b9d95ed451 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Mon, 19 Nov 2018 17:12:26 +0800 Subject: [PATCH] *: Make use of the upperBound of ticlient's kv_scan interface to ensure no overbound scan will happen (#8081) (#8310) --- ddl/column.go | 3 ++- ddl/delete_range.go | 8 ++------ ddl/index.go | 22 ++++++++++++++++++---- ddl/reorg.go | 2 +- store/tikv/scan.go | 6 ++++++ structure/hash.go | 2 +- table/tables/index.go | 8 +++++--- table/tables/tables.go | 6 +++--- util/admin/admin.go | 6 ++++-- util/prefix_helper.go | 4 ++-- 10 files changed, 44 insertions(+), 23 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index a4b6023598dd4..81a8f0bc400b4 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -14,6 +14,7 @@ package ddl import ( + "math" "time" "github.com/juju/errors" @@ -304,7 +305,7 @@ func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI for { startTime := time.Now() handles = handles[:0] - err = iterateSnapshotRows(d.store, t, version, seekHandle, + err = iterateSnapshotRows(d.store, t, version, seekHandle, math.MaxInt64, true, func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) { handles = append(handles, h) if len(handles) == defaultBatchCnt { diff --git a/ddl/delete_range.go b/ddl/delete_range.go index bd76b868fdb99..d716d4265a7d9 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -14,7 +14,6 @@ package ddl import ( - "bytes" "encoding/hex" "fmt" "math" @@ -155,7 +154,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { finish := true dr.keys = dr.keys[:0] err := kv.RunInNewTxn(dr.d.store, false, func(txn kv.Transaction) error { - iter, err := txn.Iter(oldStartKey, nil) + iter, err := txn.Iter(oldStartKey, r.EndKey) if err != nil { return errors.Trace(err) } @@ -165,10 +164,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { if !iter.Valid() { break } - finish = bytes.Compare(iter.Key(), r.EndKey) >= 0 - if finish { - break - } + finish = false dr.keys = append(dr.keys, iter.Key().Clone()) newStartKey = iter.Key().Next() diff --git a/ddl/index.go b/ddl/index.go index 934a384d278c0..b54c40342af70 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -550,7 +550,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde // taskDone means that the added handle is out of taskRange.endHandle. taskDone := false oprStartTime := time.Now() - err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle, + err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle, taskRange.endHandle, taskRange.endIncluded, func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) { oprEndTime := time.Now() w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0) @@ -1000,7 +1000,7 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 { // recordIterFunc is used for low-level record iteration. type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error) -func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error { +func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, startHandle int64, endHandle int64, endIncluded bool, fn recordIterFunc) error { ver := kv.Version{Ver: version} snap, err := store.GetSnapshot(ver) @@ -1008,8 +1008,22 @@ func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, seekHa if err != nil { return errors.Trace(err) } - firstKey := t.RecordKey(seekHandle) - it, err := snap.Iter(firstKey, nil) + firstKey := t.RecordKey(startHandle) + + // Calculate the exclusive upper bound + var upperBound kv.Key + if endIncluded { + if endHandle == math.MaxInt64 { + upperBound = t.RecordKey(endHandle).PrefixNext() + } else { + // PrefixNext is time costing. Try to avoid it if possible. + upperBound = t.RecordKey(endHandle + 1) + } + } else { + upperBound = t.RecordKey(endHandle) + } + + it, err := snap.Iter(firstKey, upperBound) if err != nil { return errors.Trace(err) } diff --git a/ddl/reorg.go b/ddl/reorg.go index 53721a847e3f0..2d39c51ad4eed 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -196,7 +196,7 @@ func (d *ddl) getReorgInfo(t *meta.Meta, job *model.Job, tbl table.Table) (*reor } // Get the first handle of this table. - err = iterateSnapshotRows(d.store, tbl, ver.Ver, math.MinInt64, + err = iterateSnapshotRows(d.store, tbl, ver.Ver, math.MinInt64, math.MaxInt64, true, func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) { info.Handle = h return false, nil diff --git a/store/tikv/scan.go b/store/tikv/scan.go index f2ed062b583fa..65af86247caa1 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -96,6 +96,12 @@ func (s *Scanner) Next() error { continue } } + current := s.cache[s.idx] + if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 { + s.eof = true + s.Close() + return nil + } if err := s.resolveCurrentLock(bo); err != nil { s.Close() return errors.Trace(err) diff --git a/structure/hash.go b/structure/hash.go index d99809f845980..9b4e67edc1a97 100644 --- a/structure/hash.go +++ b/structure/hash.go @@ -238,7 +238,7 @@ func (t *TxStructure) HClear(key []byte) error { func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) error { dataPrefix := t.hashDataKeyPrefix(key) - it, err := t.reader.Iter(dataPrefix, nil) + it, err := t.reader.Iter(dataPrefix, dataPrefix.PrefixNext()) if err != nil { return errors.Trace(err) } diff --git a/table/tables/index.go b/table/tables/index.go index f371ca80bdff2..eee1a8fc67671 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -232,7 +232,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues // Drop removes the KV index from store. func (c *index) Drop(rm kv.RetrieverMutator) error { - it, err := rm.Iter(c.prefix, nil) + it, err := rm.Iter(c.prefix, c.prefix.PrefixNext()) if err != nil { return errors.Trace(err) } @@ -261,7 +261,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues if err != nil { return nil, false, errors.Trace(err) } - it, err := r.Iter(key, nil) + upperBound := c.prefix.PrefixNext() + it, err := r.Iter(key, upperBound) if err != nil { return nil, false, errors.Trace(err) } @@ -275,7 +276,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues // SeekFirst returns an iterator which points to the first entry of the KV index. func (c *index) SeekFirst(r kv.Retriever) (iter table.IndexIterator, err error) { - it, err := r.Iter(c.prefix, nil) + upperBound := c.prefix.PrefixNext() + it, err := r.Iter(c.prefix, upperBound) if err != nil { return nil, errors.Trace(err) } diff --git a/table/tables/tables.go b/table/tables/tables.go index 7002e67468899..b9a17b97aa3fc 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -701,7 +701,8 @@ func (t *Table) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMutator, // IterRecords implements table.Table IterRecords interface. func (t *Table) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*table.Column, fn table.RecordIterFunc) error { - it, err := ctx.Txn().Iter(startKey, nil) + prefix := t.RecordPrefix() + it, err := ctx.Txn().Iter(startKey, prefix.PrefixNext()) if err != nil { return errors.Trace(err) } @@ -717,7 +718,6 @@ func (t *Table) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*tab for _, col := range cols { colMap[col.ID] = &col.FieldType } - prefix := t.RecordPrefix() defaultVals := make([]types.Datum, len(cols)) for it.Valid() && it.Key().HasPrefix(prefix) { // first kv pair is row lock information. @@ -831,7 +831,7 @@ func (t *Table) RebaseAutoID(ctx sessionctx.Context, newBase int64, isSetStep bo // Seek implements table.Table Seek interface. func (t *Table) Seek(ctx sessionctx.Context, h int64) (int64, bool, error) { seekKey := tablecodec.EncodeRowKeyWithHandle(t.ID, h) - iter, err := ctx.Txn().Iter(seekKey, nil) + iter, err := ctx.Txn().Iter(seekKey, t.RecordPrefix().PrefixNext()) if !iter.Valid() || !iter.Key().HasPrefix(t.RecordPrefix()) { // No more records in the table, skip to the end. return 0, false, nil diff --git a/util/admin/admin.go b/util/admin/admin.go index 4434a2698aea4..e9660c23866f5 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -541,7 +541,10 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, startKey kv.Key, cols []*table.Column, fn table.RecordIterFunc) error { - it, err := retriever.Iter(startKey, nil) + prefix := t.RecordPrefix() + keyUpperBound := prefix.PrefixNext() + + it, err := retriever.Iter(startKey, keyUpperBound) if err != nil { return errors.Trace(err) } @@ -557,7 +560,6 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab for _, col := range cols { colMap[col.ID] = &col.FieldType } - prefix := t.RecordPrefix() for it.Valid() && it.Key().HasPrefix(prefix) { // first kv pair is row lock information. // TODO: check valid lock diff --git a/util/prefix_helper.go b/util/prefix_helper.go index 015f60aa60760..63a17468c9c12 100644 --- a/util/prefix_helper.go +++ b/util/prefix_helper.go @@ -26,7 +26,7 @@ import ( // ScanMetaWithPrefix scans metadata with the prefix. func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Key, []byte) bool) error { - iter, err := retriever.Iter(prefix, nil) + iter, err := retriever.Iter(prefix, prefix.PrefixNext()) if err != nil { return errors.Trace(err) } @@ -56,7 +56,7 @@ func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Ke // DelKeyWithPrefix deletes keys with prefix. func DelKeyWithPrefix(rm kv.RetrieverMutator, prefix kv.Key) error { var keys []kv.Key - iter, err := rm.Iter(prefix, nil) + iter, err := rm.Iter(prefix, prefix.PrefixNext()) if err != nil { return errors.Trace(err) }