Skip to content

Commit

Permalink
executor, expression: support Chunk in ProjectionExec (#5178)
Browse files Browse the repository at this point in the history
  • Loading branch information
zz-jason authored Nov 22, 2017
1 parent 1b70a6d commit 55b8f9f
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 5 deletions.
4 changes: 3 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,10 +631,12 @@ func (b *executorBuilder) buildSelection(v *plan.Selection) Executor {
}

func (b *executorBuilder) buildProjection(v *plan.Projection) Executor {
return &ProjectionExec{
e := &ProjectionExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[0])),
exprs: v.Exprs,
}
e.baseExecutor.supportChk = true
return e
}

func (b *executorBuilder) buildTableDual(v *plan.TableDual) Executor {
Expand Down
29 changes: 25 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ const (
type Row = types.DatumRow

type baseExecutor struct {
children []Executor
ctx context.Context
schema *expression.Schema
supportChk bool
ctx context.Context
schema *expression.Schema
supportChk bool
children []Executor
childrenResults []*chunk.Chunk
}

// Open implements the Executor Open interface.
Expand All @@ -107,6 +108,10 @@ func (e *baseExecutor) Open() error {
return errors.Trace(err)
}
}
e.childrenResults = make([]*chunk.Chunk, 0, len(e.children))
for _, child := range e.children {
e.childrenResults = append(e.childrenResults, child.newChunk())
}
return nil
}

Expand All @@ -118,6 +123,7 @@ func (e *baseExecutor) Close() error {
return errors.Trace(err)
}
}
e.childrenResults = nil
return nil
}

Expand All @@ -129,6 +135,10 @@ func (e *baseExecutor) Schema() *expression.Schema {
return e.schema
}

func (e *baseExecutor) newChunk() *chunk.Chunk {
return chunk.NewChunk(e.Schema().GetTypes())
}

func (e *baseExecutor) supportChunk() bool {
if !e.supportChk {
return false
Expand Down Expand Up @@ -160,6 +170,7 @@ type Executor interface {
Open() error
Schema() *expression.Schema
supportChunk() bool
newChunk() *chunk.Chunk
NextChunk(chk *chunk.Chunk) error
}

Expand Down Expand Up @@ -433,6 +444,16 @@ func (e *ProjectionExec) Next() (retRow Row, err error) {
return row, nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *ProjectionExec) NextChunk(chk *chunk.Chunk) error {
chk.Reset()
if err := e.children[0].NextChunk(e.childrenResults[0]); err != nil {
return errors.Trace(err)
}
err := expression.EvalExprsToChunk(e.ctx, e.exprs, e.childrenResults[0], chk)
return errors.Trace(err)
}

// TableDualExec represents a dual table executor.
type TableDualExec struct {
baseExecutor
Expand Down
98 changes: 98 additions & 0 deletions expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package expression
import (
goJSON "encoding/json"
"fmt"
"strconv"

"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/chunk"
)

// EvalAstExpr evaluates ast expression directly.
Expand Down Expand Up @@ -85,6 +87,102 @@ type Expression interface {
ExplainInfo() string
}

// EvalExprsToChunk evaluates a list of expressions and append their results to "output" Chunk.
// TODO: Determine whether some of the expressions have internal depencies and implement a vectorized execution method if not.
func EvalExprsToChunk(ctx context.Context, exprs []Expression, input, output *chunk.Chunk) error {
sc := ctx.GetSessionVars().StmtCtx
for rowID, length := 0, input.NumRows(); rowID < length; rowID++ {
for colID, expr := range exprs {
err := evalOneCell(sc, expr, input, output, rowID, colID)
if err != nil {
return errors.Trace(err)
}
}
}
return nil
}

func evalOneCell(sc *stmtctx.StatementContext, expr Expression, input, output *chunk.Chunk, rowID, colID int) error {
switch fieldType, evalType := expr.GetType(), expr.GetType().EvalType(); evalType {
case types.ETInt:
res, isNull, err := expr.EvalInt(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else if fieldType.Tp == mysql.TypeBit {
output.AppendBytes(colID, strconv.AppendUint(make([]byte, 0, 8), uint64(res), 10))
} else if mysql.HasUnsignedFlag(fieldType.Flag) {
output.AppendUint64(colID, uint64(res))
} else {
output.AppendInt64(colID, res)
}
case types.ETReal:
res, isNull, err := expr.EvalReal(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else if fieldType.Tp == mysql.TypeFloat {
output.AppendFloat32(colID, float32(res))
} else {
output.AppendFloat64(colID, res)
}
case types.ETDecimal:
res, isNull, err := expr.EvalDecimal(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else {
output.AppendMyDecimal(colID, res)
}
case types.ETDatetime, types.ETTimestamp:
res, isNull, err := expr.EvalTime(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else {
output.AppendTime(colID, res)
}
case types.ETDuration:
res, isNull, err := expr.EvalDuration(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else {
output.AppendDuration(colID, res)
}
case types.ETJson:
res, isNull, err := expr.EvalJSON(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else {
output.AppendJSON(colID, res)
}
case types.ETString:
res, isNull, err := expr.EvalString(input.GetRow(rowID), sc)
if err != nil {
return errors.Trace(err)
} else if isNull {
output.AppendNull(colID)
} else if fieldType.Tp == mysql.TypeEnum {
val := types.Enum{Value: uint64(0), Name: res}
output.AppendEnum(colID, val)
} else if fieldType.Tp == mysql.TypeSet {
val := types.Set{Value: uint64(0), Name: res}
output.AppendSet(colID, val)
} else {
output.AppendString(colID, res)
}
}
return nil
}

// CNFExprs stands for a CNF expression.
type CNFExprs []Expression

Expand Down

0 comments on commit 55b8f9f

Please sign in to comment.