Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, expression: support Chunk in ProjectionExec #5178

Merged
merged 4 commits into from
Nov 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,10 +631,12 @@ func (b *executorBuilder) buildSelection(v *plan.Selection) Executor {
}

func (b *executorBuilder) buildProjection(v *plan.Projection) Executor {
return &ProjectionExec{
e := &ProjectionExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[0])),
exprs: v.Exprs,
}
e.baseExecutor.supportChk = true
return e
}

func (b *executorBuilder) buildTableDual(v *plan.TableDual) Executor {
Expand Down
29 changes: 25 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ const (
type Row = types.DatumRow

type baseExecutor struct {
children []Executor
ctx context.Context
schema *expression.Schema
supportChk bool
ctx context.Context
schema *expression.Schema
supportChk bool
children []Executor
childrenResults []*chunk.Chunk
}

// Open implements the Executor Open interface.
Expand All @@ -107,6 +108,10 @@ func (e *baseExecutor) Open() error {
return errors.Trace(err)
}
}
e.childrenResults = make([]*chunk.Chunk, 0, len(e.children))
for _, child := range e.children {
e.childrenResults = append(e.childrenResults, child.newChunk())
}
return nil
}

Expand All @@ -118,6 +123,7 @@ func (e *baseExecutor) Close() error {
return errors.Trace(err)
}
}
e.childrenResults = nil
return nil
}

Expand All @@ -129,6 +135,10 @@ func (e *baseExecutor) Schema() *expression.Schema {
return e.schema
}

func (e *baseExecutor) newChunk() *chunk.Chunk {
return chunk.NewChunk(e.Schema().GetTypes())
}

func (e *baseExecutor) supportChunk() bool {
if !e.supportChk {
return false
Expand Down Expand Up @@ -160,6 +170,7 @@ type Executor interface {
Open() error
Schema() *expression.Schema
supportChunk() bool
newChunk() *chunk.Chunk
NextChunk(chk *chunk.Chunk) error
}

Expand Down Expand Up @@ -433,6 +444,16 @@ func (e *ProjectionExec) Next() (retRow Row, err error) {
return row, nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *ProjectionExec) NextChunk(chk *chunk.Chunk) error {
chk.Reset()
if err := e.children[0].NextChunk(e.childrenResults[0]); err != nil {
return errors.Trace(err)
}
err := expression.EvalExprsToChunk(e.ctx, e.exprs, e.childrenResults[0], chk)
return errors.Trace(err)
}

// TableDualExec represents a dual table executor.
type TableDualExec struct {
baseExecutor
Expand Down
98 changes: 98 additions & 0 deletions expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package expression
import (
goJSON "encoding/json"
"fmt"
"strconv"

"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/chunk"
)

// EvalAstExpr evaluates ast expression directly.
Expand Down Expand Up @@ -85,6 +87,102 @@ type Expression interface {
ExplainInfo() string
}

// EvalExprsToChunk evaluates a list of expressions and append their results to "output" Chunk.
// TODO: Determine whether some of the expressions have internal depencies and implement a vectorized execution method if not.
func EvalExprsToChunk(ctx context.Context, exprs []Expression, input, output *chunk.Chunk) error {
sc := ctx.GetSessionVars().StmtCtx
for rowID, length := 0, input.NumRows(); rowID < length; rowID++ {
for colID, expr := range exprs {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be more cache friendly If we move exprs range to outer for loop.

err := evalOneCell(sc, expr, input, output, rowID, colID)
if err != nil {
return errors.Trace(err)
}
}
}
return nil
}

func evalOneCell(sc *stmtctx.StatementContext, expr Expression, input, output *chunk.Chunk, rowID, colID int) error {
switch fieldType, evalType := expr.GetType(), expr.GetType().EvalType(); evalType {
case types.ETInt:
res, isNull, err := expr.EvalInt(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Save this else.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I'll do it in the next pr

output.AppendNull(colID)
} else if fieldType.Tp == mysql.TypeBit {
output.AppendBytes(colID, strconv.AppendUint(make([]byte, 0, 8), uint64(res), 10))
} else if mysql.HasUnsignedFlag(fieldType.Flag) {
output.AppendUint64(colID, uint64(res))
} else {
output.AppendInt64(colID, res)
}
case types.ETReal:
res, isNull, err := expr.EvalReal(input.GetRow(rowID), sc)
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else if fieldType.Tp == mysql.TypeFloat {
output.AppendFloat32(colID, float32(res))
} else {
output.AppendFloat64(colID, res)
}
case types.ETDecimal:
res, isNull, err := expr.EvalDecimal(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else {
output.AppendMyDecimal(colID, res)
}
case types.ETDatetime, types.ETTimestamp:
res, isNull, err := expr.EvalTime(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else {
output.AppendTime(colID, res)
}
case types.ETDuration:
res, isNull, err := expr.EvalDuration(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else {
output.AppendDuration(colID, res)
}
case types.ETJson:
res, isNull, err := expr.EvalJSON(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else {
output.AppendJSON(colID, res)
}
case types.ETString:
res, isNull, err := expr.EvalString(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else if fieldType.Tp == mysql.TypeEnum {
val := types.Enum{Value: uint64(0), Name: res}
output.AppendEnum(colID, val)
} else if fieldType.Tp == mysql.TypeSet {
val := types.Set{Value: uint64(0), Name: res}
output.AppendSet(colID, val)
} else {
output.AppendString(colID, res)
}
}
return nil
}

// CNFExprs stands for a CNF expression.
type CNFExprs []Expression

Expand Down