Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: improve UnionScanRead performance #32668

Merged
merged 11 commits into from
Apr 1, 2022
31 changes: 19 additions & 12 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ type memTableReader struct {
buffer allocBuf
pkColIDs []int64
cacheTable kv.MemBuffer
offsets []int
}

type allocBuf struct {
Expand Down Expand Up @@ -240,18 +241,25 @@ func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error
opentracing.ContextWithSpan(ctx, span1)
}
mutableRow := chunk.MutRowFromTypes(m.retFieldTypes)
resultRows := make([]types.Datum, len(m.columns))
m.offsets = make([]int, len(m.columns))
for i, col := range m.columns {
m.offsets[i] = m.colIDs[col.ID]
}
err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error {
row, err := m.decodeRecordKeyValue(key, value)
var err error
resultRows, err = m.decodeRecordKeyValue(key, value, &resultRows)
if err != nil {
return err
}

mutableRow.SetDatums(row...)
mutableRow.SetDatums(resultRows...)
matched, _, err := expression.EvalBool(m.ctx, m.conditions, mutableRow.ToRow())
if err != nil || !matched {
return err
}
m.addedRows = append(m.addedRows, row)
m.addedRows = append(m.addedRows, resultRows)
resultRows = make([]types.Datum, len(m.columns))
return nil
})
if err != nil {
Expand All @@ -265,30 +273,29 @@ func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error
return m.addedRows, nil
}

func (m *memTableReader) decodeRecordKeyValue(key, value []byte) ([]types.Datum, error) {
func (m *memTableReader) decodeRecordKeyValue(key, value []byte, resultRows *[]types.Datum) ([]types.Datum, error) {
handle, err := tablecodec.DecodeRowKey(key)
if err != nil {
return nil, errors.Trace(err)
}
return m.decodeRowData(handle, value)
return m.decodeRowData(handle, value, resultRows)
}

// decodeRowData uses to decode row data value.
func (m *memTableReader) decodeRowData(handle kv.Handle, value []byte) ([]types.Datum, error) {
func (m *memTableReader) decodeRowData(handle kv.Handle, value []byte, resultRows *[]types.Datum) ([]types.Datum, error) {
values, err := m.getRowData(handle, value)
if err != nil {
return nil, err
}
ds := make([]types.Datum, 0, len(m.columns))
for _, col := range m.columns {
offset := m.colIDs[col.ID]
d, err := tablecodec.DecodeColumnValue(values[offset], &col.FieldType, m.ctx.GetSessionVars().Location())
for i, col := range m.columns {
var datum types.Datum
err := tablecodec.DecodeColumnValueWithDatum(values[m.offsets[i]], &col.FieldType, m.ctx.GetSessionVars().Location(), &datum)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
ds = append(ds, d)
(*resultRows)[i] = datum
}
return ds, nil
return *resultRows, nil
}

// getRowData decodes raw byte slice to row data.
Expand Down
16 changes: 15 additions & 1 deletion tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,20 @@ func DecodeColumnValue(data []byte, ft *types.FieldType, loc *time.Location) (ty
return colDatum, nil
}

// DecodeColumnValueWithDatum decodes data to an existing Datum according to the column info.
func DecodeColumnValueWithDatum(data []byte, ft *types.FieldType, loc *time.Location, result *types.Datum) error {
var err error
_, *result, err = codec.DecodeOne(data)
if err != nil {
return errors.Trace(err)
}
*result, err = Unflatten(*result, ft, loc)
if err != nil {
return errors.Trace(err)
}
return nil
}

// DecodeRowWithMapNew decode a row to datum map.
func DecodeRowWithMapNew(b []byte, cols map[int64]*types.FieldType,
loc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) {
Expand All @@ -401,7 +415,7 @@ func DecodeRowWithMapNew(b []byte, cols map[int64]*types.FieldType,
return rd.DecodeToDatumMap(b, row)
}

// DecodeRowWithMap decodes a byte slice into datums with a existing row map.
// DecodeRowWithMap decodes a byte slice into datums with an existing row map.
// Row layout: colID1, value1, colID2, value2, .....
func DecodeRowWithMap(b []byte, cols map[int64]*types.FieldType, loc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) {
if row == nil {
Expand Down
5 changes: 3 additions & 2 deletions util/rowcodec/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package rowcodec

import (
"encoding/binary"
"fmt"
"time"

Expand Down Expand Up @@ -452,7 +453,7 @@ func (decoder *BytesDecoder) tryDecodeHandle(values [][]byte, offset int, col *C
return false
}

// DecodeToBytesNoHandle decodes raw byte slice to row dat without handle.
// DecodeToBytesNoHandle decodes raw byte slice to row data without handle.
func (decoder *BytesDecoder) DecodeToBytesNoHandle(outputOffset map[int64]int, value []byte) ([][]byte, error) {
return decoder.decodeToBytesInternal(outputOffset, nil, value, nil)
}
Expand All @@ -463,7 +464,7 @@ func (decoder *BytesDecoder) DecodeToBytes(outputOffset map[int64]int, handle kv
}

func (decoder *BytesDecoder) encodeOldDatum(tp byte, val []byte) []byte {
var buf []byte
buf := make([]byte, 0, 1+binary.MaxVarintLen64+len(val))
switch tp {
case BytesFlag:
buf = append(buf, CompactBytesFlag)
Expand Down