Skip to content

Commit

Permalink
executor/join : use shallow copy for semi join. (#7433)
Browse files: browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Aug 29, 2018
1 parent c625c27 commit 360567b
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 22 deletions.
47 changes: 25 additions & 22 deletions executor/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func newJoiner(ctx sessionctx.Context, joinType plan.JoinType,
colTypes := make([]*types.FieldType, 0, len(lhsColTypes)+len(rhsColTypes))
colTypes = append(colTypes, lhsColTypes...)
colTypes = append(colTypes, rhsColTypes...)
base.chk = chunk.NewChunkWithCapacity(colTypes, ctx.GetSessionVars().MaxChunkSize)
base.selected = make([]bool, 0, chunk.InitialCapacity)
if joinType == plan.LeftOuterJoin || joinType == plan.RightOuterJoin {
innerColTypes := lhsColTypes
Expand All @@ -102,18 +101,25 @@ func newJoiner(ctx sessionctx.Context, joinType plan.JoinType,
}
switch joinType {
case plan.SemiJoin:
base.shallowRow = chunk.MutRowFromTypes(colTypes)
return &semiJoiner{base}
case plan.AntiSemiJoin:
base.shallowRow = chunk.MutRowFromTypes(colTypes)
return &antiSemiJoiner{base}
case plan.LeftOuterSemiJoin:
base.shallowRow = chunk.MutRowFromTypes(colTypes)
return &leftOuterSemiJoiner{base}
case plan.AntiLeftOuterSemiJoin:
base.shallowRow = chunk.MutRowFromTypes(colTypes)
return &antiLeftOuterSemiJoiner{base}
case plan.LeftOuterJoin:
base.chk = chunk.NewChunkWithCapacity(colTypes, ctx.GetSessionVars().MaxChunkSize)
return &leftOuterJoiner{base}
case plan.RightOuterJoin:
base.chk = chunk.NewChunkWithCapacity(colTypes, ctx.GetSessionVars().MaxChunkSize)
return &rightOuterJoiner{base}
case plan.InnerJoin:
base.chk = chunk.NewChunkWithCapacity(colTypes, ctx.GetSessionVars().MaxChunkSize)
return &innerJoiner{base}
}
panic("unsupported join type in func newJoiner()")
Expand All @@ -125,6 +131,7 @@ type baseJoiner struct {
defaultInner chunk.Row
outerIsRight bool
chk *chunk.Chunk
shallowRow chunk.MutRow
selected []bool
maxChunkSize int
}
Expand All @@ -142,6 +149,15 @@ func (j *baseJoiner) makeJoinRowToChunk(chk *chunk.Chunk, lhs, rhs chunk.Row) {
chk.AppendPartialRow(lhs.Len(), rhs)
}

// makeShallowJoinRow fills `j.shallowRow` with a shallow copy of the joined
// row. When `isRightJoin` is true, `inner` occupies the leading columns and
// `outer` follows; otherwise `outer` occupies the leading columns and `inner`
// follows.
func (j *baseJoiner) makeShallowJoinRow(isRightJoin bool, inner, outer chunk.Row) {
	// lhs is the row placed at column 0; rhs is placed right after lhs.
	lhs, rhs := outer, inner
	if isRightJoin {
		lhs, rhs = inner, outer
	}
	j.shallowRow.ShallowCopyPartialRow(0, lhs)
	j.shallowRow.ShallowCopyPartialRow(lhs.Len(), rhs)
}

func (j *baseJoiner) filter(input, output *chunk.Chunk) (matched bool, err error) {
j.selected, err = expression.VectorizedFilter(j.ctx, j.conditions, chunk.NewIterator4Chunk(input), j.selected)
if err != nil {
Expand Down Expand Up @@ -173,14 +189,9 @@ func (j *semiJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *chu
}

for inner := inners.Current(); inner != inners.End(); inner = inners.Next() {
j.chk.Reset()
if j.outerIsRight {
j.makeJoinRowToChunk(j.chk, inner, outer)
} else {
j.makeJoinRowToChunk(j.chk, outer, inner)
}
j.makeShallowJoinRow(j.outerIsRight, inner, outer)

matched, err = expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0))
matched, err = expression.EvalBool(j.ctx, j.conditions, j.shallowRow.ToRow())
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -212,14 +223,9 @@ func (j *antiSemiJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk
}

for inner := inners.Current(); inner != inners.End(); inner = inners.Next() {
j.chk.Reset()
if j.outerIsRight {
j.makeJoinRowToChunk(j.chk, inner, outer)
} else {
j.makeJoinRowToChunk(j.chk, outer, inner)
}
j.makeShallowJoinRow(j.outerIsRight, inner, outer)

matched, err = expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0))
matched, err = expression.EvalBool(j.ctx, j.conditions, j.shallowRow.ToRow())
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -252,10 +258,9 @@ func (j *leftOuterSemiJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator,
}

for inner := inners.Current(); inner != inners.End(); inner = inners.Next() {
j.chk.Reset()
j.makeJoinRowToChunk(j.chk, outer, inner)
j.makeShallowJoinRow(false, inner, outer)

matched, err = expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0))
matched, err = expression.EvalBool(j.ctx, j.conditions, j.shallowRow.ToRow())
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -295,10 +300,9 @@ func (j *antiLeftOuterSemiJoiner) tryToMatch(outer chunk.Row, inners chunk.Itera
}

for inner := inners.Current(); inner != inners.End(); inner = inners.Next() {
j.chk.Reset()
j.makeJoinRowToChunk(j.chk, outer, inner)
matched, err := expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0))
j.makeShallowJoinRow(false, inner, outer)

matched, err := expression.EvalBool(j.ctx, j.conditions, j.shallowRow.ToRow())
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -330,7 +334,6 @@ func (j *leftOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk
if inners.Len() == 0 {
return false, nil
}

j.chk.Reset()
chkForJoin := j.chk
if len(j.conditions) == 0 {
Expand Down
12 changes: 12 additions & 0 deletions util/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ func newChunk(elemLen ...int) *Chunk {
return chk
}

// newChunkWithInitCap builds a Chunk with one column per entry of elemLen.
// A positive entry produces a fixed-length column created by
// newFixedLenColumn(entry, cap); any other entry produces a variable-length
// column created by newVarLenColumn(cap, nil).
func newChunkWithInitCap(cap int, elemLen ...int) *Chunk {
	chk := new(Chunk)
	for _, width := range elemLen {
		if width <= 0 {
			chk.columns = append(chk.columns, newVarLenColumn(cap, nil))
			continue
		}
		chk.columns = append(chk.columns, newFixedLenColumn(width, cap))
	}
	return chk
}

var allTypes = []*types.FieldType{
types.NewFieldType(mysql.TypeTiny),
types.NewFieldType(mysql.TypeShort),
Expand Down
23 changes: 23 additions & 0 deletions util/chunk/mutrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,26 @@ func setMutRowJSON(col *column, j json.BinaryJSON) {
copy(col.data[1:], j.Value)
col.offsets[1] = int32(dataLen)
}

// ShallowCopyPartialRow shallow copies the data of `row` to MutRow, writing
// the columns of `row` into the MutRow's columns starting at `colIdx`.
// Each destination column's data slice is re-pointed at the source column's
// storage for that row; no element bytes are copied.
func (mr MutRow) ShallowCopyPartialRow(colIdx int, row Row) {
	for i, srcCol := range row.c.columns {
		dstCol := mr.c.columns[colIdx+i]

		// Read the source null flag before touching the destination bitmap.
		// MutRow only contains one row, so the whole first byte can be set.
		srcIsNull := srcCol.isNull(row.idx)
		if srcIsNull {
			dstCol.nullBitmap[0] = 0
		} else {
			dstCol.nullBitmap[0] = 1
		}

		if !srcCol.isFixed() {
			// Variable-length column: the row's bytes are delimited by offsets.
			begin, finish := srcCol.offsets[row.idx], srcCol.offsets[row.idx+1]
			dstCol.data = srcCol.data[begin:finish]
			dstCol.offsets[1] = int32(len(dstCol.data))
			continue
		}

		// Fixed-length column: the row's bytes sit at idx * element width.
		width := len(srcCol.elemBuf)
		begin := row.idx * width
		dstCol.data = srcCol.data[begin : begin+width]
	}
}
62 changes: 62 additions & 0 deletions util/chunk/mutrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package chunk

import (
"testing"
"time"

"github.com/pingcap/check"
"github.com/pingcap/tidb/mysql"
Expand Down Expand Up @@ -134,3 +135,64 @@ func BenchmarkMutRowFromValues(b *testing.B) {
MutRowFromValues(values)
}
}

// TestMutRowShallowCopyPartialRow checks that a MutRow filled through
// ShallowCopyPartialRow reads back the same values as the source row, both
// right after the copy and after the source chunk has been reset and
// refilled with new values.
func (s *testChunkSuite) TestMutRowShallowCopyPartialRow(c *check.C) {
	colTypes := make([]*types.FieldType, 0, 3)
	colTypes = append(colTypes,
		&types.FieldType{Tp: mysql.TypeVarString},
		&types.FieldType{Tp: mysql.TypeLonglong},
		&types.FieldType{Tp: mysql.TypeTimestamp},
	)

	mutRow := MutRowFromTypes(colTypes)
	row := MutRowFromValues("abc", 123, types.ZeroTimestamp).ToRow()

	// assertMirrors verifies every column of mutRow matches the source row.
	assertMirrors := func() {
		c.Assert(row.GetString(0), check.Equals, mutRow.ToRow().GetString(0))
		c.Assert(row.GetInt64(1), check.Equals, mutRow.ToRow().GetInt64(1))
		c.Assert(row.GetTime(2), check.DeepEquals, mutRow.ToRow().GetTime(2))
	}

	mutRow.ShallowCopyPartialRow(0, row)
	assertMirrors()

	// Overwrite the source chunk in place without copying again.
	row.c.Reset()
	strDatum := types.NewStringDatum("dfg")
	row.c.AppendDatum(0, &strDatum)
	intDatum := types.NewIntDatum(567)
	row.c.AppendDatum(1, &intDatum)
	timeDatum := types.NewTimeDatum(types.Time{Time: types.FromGoTime(time.Now()), Fsp: 6, Type: mysql.TypeTimestamp})
	row.c.AppendDatum(2, &timeDatum)

	c.Assert(timeDatum.GetMysqlTime(), check.DeepEquals, mutRow.ToRow().GetTime(2))
	assertMirrors()
}

// rowsNum is the number of rows handled per outer iteration by the benchmarks
// in this file; it also sizes the initial column capacity of the chunk used by
// BenchmarkChunkAppendPartialRow.
var rowsNum = 1024

// BenchmarkMutRowShallowCopyPartialRow measures ShallowCopyPartialRow by
// copying the same five-column row into a MutRow rowsNum times per iteration.
func BenchmarkMutRowShallowCopyPartialRow(b *testing.B) {
	b.ReportAllocs()

	colTypes := make([]*types.FieldType, 0, 8)
	colTypes = append(colTypes,
		&types.FieldType{Tp: mysql.TypeVarString},
		&types.FieldType{Tp: mysql.TypeVarString},
		&types.FieldType{Tp: mysql.TypeLonglong},
		&types.FieldType{Tp: mysql.TypeLonglong},
		&types.FieldType{Tp: mysql.TypeDatetime},
	)

	mutRow := MutRowFromTypes(colTypes)
	row := MutRowFromValues("abc", "abcdefg", 123, 456, types.ZeroDatetime).ToRow()

	// Exclude the setup above from the measurement.
	b.ResetTimer()
	for iter := b.N; iter > 0; iter-- {
		for remaining := rowsNum; remaining > 0; remaining-- {
			mutRow.ShallowCopyPartialRow(0, row)
		}
	}
}

// BenchmarkChunkAppendPartialRow measures Chunk.AppendPartialRow by resetting
// the chunk and appending the same five-column row rowsNum times per
// iteration.
func BenchmarkChunkAppendPartialRow(b *testing.B) {
	b.ReportAllocs()

	chk := newChunkWithInitCap(rowsNum, 0, 0, 8, 8, 16)
	row := MutRowFromValues("abc", "abcdefg", 123, 456, types.ZeroDatetime).ToRow()

	// Exclude the setup above from the measurement.
	b.ResetTimer()
	for iter := b.N; iter > 0; iter-- {
		chk.Reset()
		for remaining := rowsNum; remaining > 0; remaining-- {
			chk.AppendPartialRow(0, row)
		}
	}
}

0 comments on commit 360567b

Please sign in to comment.