Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix mock mpp store outputting no data for count(empty-set) #50856

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/store/mockstore/unistore/cophandler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,25 @@ go_test(
],
embed = [":cophandler"],
flaky = True,
shard_count = 4,
shard_count = 5,
deps = [
"//pkg/expression",
"//pkg/expression/aggregation",
"//pkg/kv",
"//pkg/parser/ast",
"//pkg/parser/mysql",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/store/mockstore/unistore/lockstore",
"//pkg/store/mockstore/unistore/tikv/dbreader",
"//pkg/store/mockstore/unistore/tikv/mvcc",
"//pkg/tablecodec",
"//pkg/testkit/testsetup",
"//pkg/types",
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/collate",
"//pkg/util/mock",
"//pkg/util/rowcodec",
"//pkg/util/timeutil",
"@com_github_pingcap_badger//:badger",
Expand Down
65 changes: 65 additions & 0 deletions pkg/store/mockstore/unistore/cophandler/cop_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,21 @@ import (
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore/lockstore"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore/tikv/dbreader"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore/tikv/mvcc"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
Expand Down Expand Up @@ -261,6 +266,17 @@ func (dagBuilder *dagBuilder) addTableScan(colInfos []*tipb.ColumnInfo, tableID
return dagBuilder
}

// addAggregate appends an aggregation executor to the builder's executor
// chain, built from the given group-by and aggregate-function expressions.
// It returns the receiver so that calls can be chained fluently.
func (dagBuilder *dagBuilder) addAggregate(gbyExprs, aggExprs []*tipb.Expr) *dagBuilder {
	aggExec := &tipb.Executor{
		Tp: tipb.ExecType_TypeAggregation,
		Aggregation: &tipb.Aggregation{
			GroupBy: gbyExprs,
			AggFunc: aggExprs,
		},
	}
	dagBuilder.executors = append(dagBuilder.executors, aggExec)
	return dagBuilder
}

func (dagBuilder *dagBuilder) addSelection(expr *tipb.Expr) *dagBuilder {
dagBuilder.executors = append(dagBuilder.executors, &tipb.Executor{
Tp: tipb.ExecType_TypeSelection,
Expand Down Expand Up @@ -388,6 +404,49 @@ func TestClosureExecutor(t *testing.T) {
require.Equal(t, 0, rowCount)
}

// TestMppAggExec verifies that the mock MPP aggregation executor returns a
// default aggregated row for an empty input set: count(empty-set) must yield
// a single row containing 0, rather than no row at all.
func TestMppAggExec(t *testing.T) {
	data, err := prepareTestTableData(keyNumber, tableID)
	require.NoError(t, err)

	store, clean, err := newTestStore("cop_handler_test_db", "cop_handler_test_log")
	require.NoError(t, err)
	defer func() {
		err := clean()
		require.NoError(t, err)
	}()

	sctx := mock.NewContext()
	client := new(mock.Client)

	// No rows are inserted into the store: the empty set is the targeted case.
	dagRequest := newDagBuilder().
		setStartTs(dagRequestStartTs).
		addTableScan(data.colInfos, tableID). // empty-set source
		addAggregate(nil, buildCountAgg(sctx, client, []expression.Expression{expression.NewOne()})).
		build()

	dagCtx := newDagContext(t, store, []kv.KeyRange{getTestPointRange(tableID, 1)},
		dagRequest, dagRequestStartTs)
	_, chunks, _, _, _, err := buildAndRunMPPExecutor(dagCtx, dagRequest, 0)
	require.NoError(t, err)
	// testify takes (t, expected, actual); use Len for the slice-size check.
	require.Len(t, chunks, 1)

	// Decode the returned tipb chunk: one BIGINT column carrying the count.
	fieldTypes := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
	chk := chunk.NewChunkWithCapacity(fieldTypes, 32)
	rowsData := chunks[0].RowsData
	decoder := codec.NewDecoder(chk, sctx.GetSessionVars().Location())
	for !chk.IsFull() && len(rowsData) > 0 {
		for i := 0; i < len(fieldTypes); i++ {
			rowsData, err = decoder.DecodeOne(rowsData, i, fieldTypes[i])
			require.NoError(t, err)
		}
	}
	require.Equal(t, 1, chk.NumRows())
	d := chk.GetRow(0).GetDatum(0, fieldTypes[0])
	require.Equal(t, int64(0), d.GetInt64()) // count(empty-set) = 0
}

func TestMppExecutor(t *testing.T) {
data, err := prepareTestTableData(keyNumber, tableID)
require.NoError(t, err)
Expand Down Expand Up @@ -437,6 +496,12 @@ func buildNEIntExpr(colIdx, val int64) *tipb.Expr {
}
}

// buildCountAgg builds a tipb count() aggregate expression over the given
// arguments, encoded for the TiFlash (MPP) protocol. It is a test fixture
// helper: construction errors are not expected, so instead of silently
// discarding them (which would surface later as a confusing nil expression),
// any error panics to make a broken fixture fail loudly at its source.
func buildCountAgg(sctx sessionctx.Context, cli kv.Client, args []expression.Expression) []*tipb.Expr {
	aggFunc, err := aggregation.NewAggFuncDesc(sctx, ast.AggFuncCount, args, false)
	if err != nil {
		panic(err)
	}
	aggExpr, err := aggregation.AggFuncToPBExpr(sctx, cli, aggFunc, kv.TiFlash)
	if err != nil {
		panic(err)
	}
	return []*tipb.Expr{aggExpr}
}

func buildEQIntExpr(colIdx, val int64) *tipb.Expr {
return &tipb.Expr{
Tp: tipb.ExprType_ScalarFunc,
Expand Down
30 changes: 25 additions & 5 deletions pkg/store/mockstore/unistore/cophandler/mpp_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ func (e *aggExec) open() error {

func (e *aggExec) getGroupKey(row chunk.Row) (*chunk.MutRow, []byte, error) {
length := len(e.groupByExprs)
if length == 0 {
if length == 0 { // if no group items, nothing returned.
return nil, nil, nil
}
key := make([]byte, 0, DefaultBatchSize)
Expand Down Expand Up @@ -1075,24 +1075,44 @@ func (e *aggExec) processAllRows() (*chunk.Chunk, error) {

chk := chunk.NewChunkWithCapacity(e.fieldTypes, 0)

for i, gk := range e.groupKeys {
fetchNewRow := func(idx int, groupKey []byte) error {
// empty set, no data in agg OP.
// fill a mock Contexts here, to let agg get default aggregated result.
// like count(empty-set) = 0, sum(empty-set) = null
newRow := chunk.MutRowFromTypes(e.fieldTypes)
aggCtxs := e.getContexts(gk)
// empty context key
aggCtxs := e.getContexts(groupKey)
for i, agg := range e.aggExprs {
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
result := agg.GetResult(aggCtxs[i])
if e.fieldTypes[i].GetType() == mysql.TypeLonglong && result.Kind() == types.KindMysqlDecimal {
var err error
result, err = result.ConvertTo(e.sctx.GetSessionVars().StmtCtx.TypeCtx(), e.fieldTypes[i])
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
}
newRow.SetDatum(i, result)
}
if len(e.groupByRows) > 0 {
newRow.ShallowCopyPartialRow(len(e.aggExprs), e.groupByRows[i])
newRow.ShallowCopyPartialRow(len(e.aggExprs), e.groupByRows[idx])
}
chk.AppendRow(newRow.ToRow())
return nil
}

// where len(e.groupKeys) equals to 0, that means there is no data in the below child source.
// even without group by items, the whole data(not empty) will be seen as one group, and classified
// into that group with key built as "".
if len(e.groupKeys) == 0 {
if err := fetchNewRow(0, []byte{}); err != nil {
return nil, err
}
} else {
for i, gk := range e.groupKeys {
if err := fetchNewRow(i, gk); err != nil {
return nil, err
}
}
}
e.execSummary.updateOnlyRows(chk.NumRows())
return chk, nil
Expand Down