Skip to content

Commit

Permalink
Merge pull request #4 from spongedu/complete_stream_agg
Browse files Browse the repository at this point in the history
Complete stream agg
  • Loading branch information
qiuyesuifeng authored Dec 1, 2018
2 parents edabda9 + 3fd0062 commit d0deb5a
Show file tree
Hide file tree
Showing 14 changed files with 314 additions and 21 deletions.
8 changes: 8 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -51,6 +52,13 @@ func onCreateStream(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
return ver, errors.Trace(err)
}

// Fill in TableInfo.StreamWinCol with the name of the stream's TIMESTAMP column (if there are several, the last one wins).
for _, c := range tbInfo.Columns {
if c.Tp == mysql.TypeTimestamp {
tbInfo.StreamWinCol = c.Name.L
}
}

switch tbInfo.State {
case model.StateNone:
// none -> public
Expand Down
119 changes: 105 additions & 14 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,20 +940,59 @@ type StreamWindowHashAggExec struct {
defaultVal *chunk.Chunk

childResult *chunk.Chunk

lastIter *chunk.Iterator4Chunk

winCol string
winColIdx int
winSize uint64

windowStart types.Time
windowEnd types.Time
needSetWindow bool
init bool
}

// Open implements the Executor Open interface.
//
// After opening the base executor it locates the window column (e.winCol) in
// the child's schema, clears the aggregation state, allocates the chunk used to
// read from the child, and flags that a new window has to be started.
// It fails if the child schema has no column with the configured name.
func (e *StreamWindowHashAggExec) Open(ctx context.Context) error {
	if err := e.baseExecutor.Open(ctx); err != nil {
		return errors.Trace(err)
	}

	// Resolve the window column by name (ColName.L); -1 means "not found".
	idx := -1
	for pos, col := range e.children[0].Schema().Columns {
		if col.ColName.L == e.winCol {
			idx = pos
			break
		}
	}
	if idx < 0 {
		return errors.New("Fail to find window col")
	}
	e.winColIdx = idx

	e.reset()
	e.childResult = e.children[0].newFirstChunk()

	// No row has been consumed yet: there is no iterator to resume from, the
	// first row read must open a window, and Next has not run.
	e.lastIter = nil
	e.needSetWindow = true
	e.init = false
	return nil
}

// reset clears the per-window aggregation state (group keys, group set,
// partial results and scratch buffers) so that the next window starts from
// scratch.
//
// e.childResult is deliberately left untouched: execute may still hold an
// iterator (e.lastIter) positioned inside it, and shouldStop inspects its row
// count, so replacing it here would drop the rows that belong to the next
// window. Open allocates e.childResult itself.
func (e *StreamWindowHashAggExec) reset() {
	e.prepared = false
	e.cursor4GroupKey = 0
	e.groupKeys = nil
	e.groupSet = set.NewStringSet()
	e.partialResultMap = make(aggPartialResultMapper, 0)
	e.groupKeyBuffer = make([]byte, 0, 8)
	e.groupValDatums = make([]types.Datum, 0, len(e.groupKeyBuffer))
}

// shouldReset reports whether the aggregation state has to be cleared before
// more rows are aggregated, i.e. whether a new window is about to be started
// (needSetWindow is set).
func (e *StreamWindowHashAggExec) shouldReset() bool {
	startNewWindow := e.needSetWindow
	return startNewWindow
}

// shouldStop reports whether the chunk most recently read from the child is
// empty, which the executor treats as "no more input".
func (e *StreamWindowHashAggExec) shouldStop() bool {
	pending := e.childResult.NumRows()
	return pending == 0
}

// Next implements the Executor Next interface.
Expand All @@ -963,6 +1002,14 @@ func (e *StreamWindowHashAggExec) Next(ctx context.Context, chk *chunk.Chunk) er
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
//TODO: Deal with error
if e.shouldStop() && (e.init) {
return nil
}
if e.shouldReset() {
e.reset()
}
e.init = true
return errors.Trace(e.next(ctx, chk))
}

Expand All @@ -974,11 +1021,12 @@ func (e *StreamWindowHashAggExec) Close() error {
return nil
}

// next runs the hash aggregation for the current window in a single thread.
func (e *StreamWindowHashAggExec) next(ctx context.Context, chk *chunk.Chunk) error {
// In this stage we consider all data from src as a single group.
//fmt.Println("XXXXXXXXXXXXXXXXX")
if !e.prepared {
err := e.execute(ctx)
//fmt.Println("55555")
if err != nil {
return errors.Trace(err)
}
Expand All @@ -993,18 +1041,23 @@ func (e *StreamWindowHashAggExec) next(ctx context.Context, chk *chunk.Chunk) er
e.prepared = true
}
chk.Reset()
//fmt.Println("666666")

// Since we return e.maxChunkSize rows every time, so we should not traverse
// `groupSet` because of its randomness.
var i = 0
for ; e.cursor4GroupKey < len(e.groupKeys); e.cursor4GroupKey++ {
i++
partialResults := e.getPartialResults(e.groupKeys[e.cursor4GroupKey])
if len(e.PartialAggFuncs) == 0 {
chk.SetNumVirtualRows(chk.NumRows() + 1)
}
for i, af := range e.PartialAggFuncs {
af.AppendFinalResult2Chunk(e.ctx, partialResults[i], chk)
}
if chk.NumRows() == e.maxChunkSize {
chk.AppendTime(len(e.schema.Columns) - 2, e.windowStart)
chk.AppendTime(len(e.schema.Columns) - 1, e.windowEnd)
if chk.NumRows() == e.maxChunkSize { //|| e.s {
e.cursor4GroupKey++
return nil
}
Expand All @@ -1014,33 +1067,71 @@ func (e *StreamWindowHashAggExec) next(ctx context.Context, chk *chunk.Chunk) er

// execute fetches Chunks from src and update each aggregate function for each row in Chunk.
func (e *StreamWindowHashAggExec) execute(ctx context.Context) (err error) {
inputIter := chunk.NewIterator4Chunk(e.childResult)
//fmt.Println("=== StreamWindowHashAggExec.execute === ")
var inputIter *chunk.Iterator4Chunk
var row chunk.Row
for {
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}
// no more data.
if e.childResult.NumRows() == 0 {
return nil
}
for row := inputIter.Begin(); row != inputIter.End(); row = inputIter.Next() {
//fmt.Println("e.lastIter == nil = %+v", e.lastIter == nil)
if e.lastIter == nil {
inputIter = chunk.NewIterator4Chunk(e.childResult)
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}
if e.childResult.NumRows() == 0 {
return nil
}
row = inputIter.Begin()
} else {
inputIter = e.lastIter
row = inputIter.Current()
}
for ;row != inputIter.End(); row = inputIter.Next() {
//fmt.Println("999999999")
tm := row.GetTime(e.winColIdx)
if e.needSetWindow {
e.windowStart = tm
e.windowEnd, err = e.windowStart.Add(e.ctx.GetSessionVars().StmtCtx, types.Duration{Duration: time.Duration(int(e.winSize)) * time.Second})
e.needSetWindow = false
}
//fmt.Printf("win_start=%s, win_end=%s\n",e.windowStart.String(), e.windowEnd.String())
//fmt.Printf("tm=%s\n",tm)
//fmt.Printf("FFFFFF\n")
if tm.Compare(e.windowEnd) == 1 {
e.needSetWindow = true
}
//fmt.Printf("e.needSetWindow=%v\n", e.needSetWindow)
//fmt.Printf("e.shouldStop() =%v\n", e.shouldStop())
//fmt.Printf("row != inputIter.End() =%v\n", row == inputIter.End())
if e.needSetWindow || e.shouldStop(){
if row == inputIter.End() {
e.lastIter = nil
} else {
e.lastIter = inputIter
}
return nil
}
groupKey, err := e.getGroupKey(row)
//fmt.Println("111")
if err != nil {
return errors.Trace(err)
}
//fmt.Println("222")
if !e.groupSet.Exist(groupKey) {
e.groupSet.Insert(groupKey)
e.groupKeys = append(e.groupKeys, groupKey)
}
//fmt.Println("333")
partialResults := e.getPartialResults(groupKey)
//fmt.Println("444")
for i, af := range e.PartialAggFuncs {
err = af.UpdatePartialResult(e.ctx, []chunk.Row{row}, partialResults[i])
if err != nil {
return errors.Trace(err)
}
}
}
e.lastIter = nil
}
}

Expand Down
7 changes: 7 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,11 @@ func (b *executorBuilder) buildProjBelowAgg(aggFuncs []*aggregation.AggFuncDesc,
groupByItems[i] = newArg
cursor++
}
for i, r := range src.Schema().Columns {
projSchemaCols = append(projSchemaCols, r.Clone().(*expression.Column))
projExprs = append(projExprs, expression.Column2Exprs([]*expression.Column{r})...)
projExprs[len(projExprs)-1].(*expression.Column).Index = i
}

return &ProjectionExec{
baseExecutor: newBaseExecutor(b.ctx, expression.NewSchema(projSchemaCols...), projFromID, src),
Expand Down Expand Up @@ -1120,6 +1125,8 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
defaultVal: e.defaultVal,

childResult: e.childResult,
winCol: v.StreamWindow.WinColName,
winSize: v.StreamWindow.Size,
}
}

Expand Down
80 changes: 79 additions & 1 deletion executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func (e *ShowExec) fetchAll() error {
return e.fetchShowColumns()
case ast.ShowCreateTable:
return e.fetchShowCreateTable()
case ast.ShowCreateStream:
return e.fetchShowCreateStream()
case ast.ShowCreateDatabase:
return e.fetchShowCreateDatabase()
case ast.ShowDatabases:
Expand Down Expand Up @@ -536,7 +538,9 @@ func (e *ShowExec) fetchShowCreateTable() error {
if err != nil {
return errors.Trace(err)
}

if tb.Meta().IsStream {
return errors.Trace(errors.New(fmt.Sprintf("table %s is a stream table. use 'show create stream %s' instead", tb.Meta().Name.L, tb.Meta().Name.L)))
}
sqlMode := e.ctx.GetSessionVars().SQLMode

// TODO: let the result more like MySQL.
Expand Down Expand Up @@ -722,6 +726,80 @@ func (e *ShowExec) fetchShowCreateTable() error {
return nil
}

// fetchShowCreateStream composes the result of SHOW CREATE STREAM: a single
// row holding the stream's name and a reconstructed CREATE STREAM statement
// (column definitions followed by the stream properties in a WITH clause).
//
// It returns an error if the table cannot be resolved or if the table is not a
// stream table.
func (e *ShowExec) fetchShowCreateStream() error {
	tb, err := e.getTable()
	if err != nil {
		return errors.Trace(err)
	}
	meta := tb.Meta()
	if !meta.IsStream {
		return errors.Errorf("table %s is not a stream table. use 'show create table %s' instead", meta.Name.L, meta.Name.L)
	}

	sqlMode := e.ctx.GetSessionVars().SQLMode

	// TODO: let the result more like MySQL.
	var buf bytes.Buffer
	fmt.Fprintf(&buf, "CREATE STREAM %s (\n", escape(meta.Name, sqlMode))

	cols := tb.Cols()
	lastCol := len(cols) - 1
	for i, col := range cols {
		fmt.Fprintf(&buf, " %s %s", escape(col.Name, sqlMode), col.GetTypeDesc())

		if col.IsGenerated() {
			// It's a generated column.
			fmt.Fprintf(&buf, " GENERATED ALWAYS AS (%s)", col.GeneratedExprString)
			if col.GeneratedStored {
				buf.WriteString(" STORED")
			} else {
				buf.WriteString(" VIRTUAL")
			}
		}

		if mysql.HasAutoIncrementFlag(col.Flag) {
			buf.WriteString(" NOT NULL AUTO_INCREMENT")
		} else {
			if mysql.HasNotNullFlag(col.Flag) {
				buf.WriteString(" NOT NULL")
			}
			if !mysql.HasNoDefaultValueFlag(col.Flag) {
				defaultValue := col.GetDefaultValue()
				switch defaultValue {
				case nil:
					if !mysql.HasNotNullFlag(col.Flag) {
						// A nullable TIMESTAMP column is printed with an
						// explicit NULL attribute before its default.
						if col.Tp == mysql.TypeTimestamp {
							buf.WriteString(" NULL")
						}
						buf.WriteString(" DEFAULT NULL")
					}
				case "CURRENT_TIMESTAMP":
					buf.WriteString(" DEFAULT CURRENT_TIMESTAMP")
				default:
					defaultValStr := fmt.Sprintf("%v", defaultValue)
					if col.Tp == mysql.TypeBit {
						defaultValBinaryLiteral := types.BinaryLiteral(defaultValStr)
						fmt.Fprintf(&buf, " DEFAULT %s", defaultValBinaryLiteral.ToBitLiteralString(true))
					} else {
						fmt.Fprintf(&buf, " DEFAULT '%s'", format.OutputFormat(defaultValStr))
					}
				}
			}
			if mysql.HasOnUpdateNowFlag(col.Flag) {
				buf.WriteString(" ON UPDATE CURRENT_TIMESTAMP")
			}
		}

		if len(col.Comment) > 0 {
			fmt.Fprintf(&buf, " COMMENT '%s'", format.OutputFormat(col.Comment))
		}
		if i != lastCol {
			buf.WriteString(",\n")
		}
	}

	buf.WriteString(") WITH (\n")
	// NOTE(review): StreamProperties looks like a map, so the order of the
	// emitted properties is not stable between calls; sort the keys if a
	// deterministic statement is required.
	for k, v := range meta.StreamProperties {
		fmt.Fprintf(&buf, "\t\t'%s'='%s'\n", k, v)
	}
	buf.WriteString("\t\t);")

	e.appendRow([]interface{}{meta.Name.O, buf.String()})
	return nil
}

// fetchShowCreateDatabase composes show create database result.
func (e *ShowExec) fetchShowCreateDatabase() error {
db, ok := e.is.SchemaByName(e.DBName)
Expand Down
1 change: 1 addition & 0 deletions expression/aggregation/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

// AggWindowDesc describes the stream window attached to an aggregation.
// TODO: Complete here
type AggWindowDesc struct {
	// WinColName is the name of the column that defines the window; the
	// stream window hash-agg executor looks it up in its child's schema.
	WinColName string
	// Size is the length of the window; the stream window hash-agg executor
	// interprets it as a number of seconds.
	Size uint64
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

replace github.com/pingcap/parser => github.com/spongedu/parser v0.0.0-20181102150703-36c270493d6d
replace github.com/pingcap/parser => github.com/spongedu/parser v0.0.0-20181102150703-28da3dcd8827
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ github.com/spongedu/parser v0.0.0-20181102150703-0294367261e2 h1:FfNM0blvy5go/ey
github.com/spongedu/parser v0.0.0-20181102150703-0294367261e2/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
github.com/spongedu/parser v0.0.0-20181102150703-05d67ec39b67 h1:Yb88YfrMzIZWbXiuCvjCNc7lcV2HcWd+1KuNPspaOpQ=
github.com/spongedu/parser v0.0.0-20181102150703-05d67ec39b67/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
github.com/spongedu/parser v0.0.0-20181102150703-263a8d3a093d h1:ympimjDjKqSoc4Y1XUYmc1ZbyYfC99jNdOJIeySLLHI=
github.com/spongedu/parser v0.0.0-20181102150703-263a8d3a093d/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
github.com/spongedu/parser v0.0.0-20181102150703-28da3dcd8827 h1:0PQpmdl+tqYB+TlfTSgMjXHaLlQ6PrLEifBuPykWJys=
github.com/spongedu/parser v0.0.0-20181102150703-28da3dcd8827/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
github.com/spongedu/parser v0.0.0-20181102150703-2ae733bc2c81 h1:rs+sXF/OicOiX//QfVIVDfBvsB5MzcE0coyHDWTHvmw=
github.com/spongedu/parser v0.0.0-20181102150703-2ae733bc2c81/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
github.com/spongedu/parser v0.0.0-20181102150703-36c270493d6d h1:qEeHZ/BVNH8zriuiZKsbq9GzaXdx6Vyw7EWIgLKTcuk=
Expand All @@ -289,6 +293,8 @@ github.com/spongedu/parser v0.0.0-20181102150703-4acd198f5092 h1:k7LQeI7t82s95X2
github.com/spongedu/parser v0.0.0-20181102150703-4acd198f5092/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
github.com/spongedu/parser v0.0.0-20181102150703-58bff2a4ea0b h1:UXFKQHwVjopn2VNpUTzMRgq2Fm3QtLLnHZ3HliGW2b0=
github.com/spongedu/parser v0.0.0-20181102150703-58bff2a4ea0b/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
github.com/spongedu/parser v0.0.0-20181102150703-69c772388fc2 h1:FTtmJ9E6kPBsGRFElKiW9mN80/9uHWOnE1YcP9bJyXo=
github.com/spongedu/parser v0.0.0-20181102150703-69c772388fc2/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
github.com/spongedu/parser v0.0.0-20181102150703-6b6427a2156a h1:nUp9zaAAEq8Q+Jqw/Y3cVaxfwOz6zTipjBZprYOBuNk=
github.com/spongedu/parser v0.0.0-20181102150703-6b6427a2156a/go.mod h1:pt5ToBPRXvy7eNveA9VEwpNj5AQiVGE4GaPxBeAmeUo=
github.com/spongedu/parser v0.0.0-20181102150703-7bb1bc7c942b h1:w7FL9Lo4EvZinolU2rKdzdWIcY4H8WrnaNBfogafzoQ=
Expand Down Expand Up @@ -425,9 +431,11 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20170420181420-c06e80d9300e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52 h1:JG/0uqcGdTNgq7FdU+61l5Pdmb8putNZlXb65bJBROs=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181026183834-f60e5f99f081/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181201035826-d0ca3933b724 h1:eV9myT/I6o1p8salzgZ0f1pz54PEgUf2NkCxEf6t+xs=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.2.0 h1:S0iUepdCWODXRvtE+gcRDd15L+k+k1AiHlMiMjefH24=
google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down
Loading

0 comments on commit d0deb5a

Please sign in to comment.