Skip to content

Commit

Permalink
Merge efa3995 into b49b9cc
Browse files Browse the repository at this point in the history
  • Loading branch information
Darych authored Aug 30, 2024
2 parents b49b9cc + efa3995 commit d726858
Show file tree
Hide file tree
Showing 26 changed files with 1,991 additions and 70 deletions.
17 changes: 14 additions & 3 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,8 @@ class TxPlanSerializer {
operatorId = Visit(maybeCondense.Cast(), planNode);
} else if (auto maybeCombiner = TMaybeNode<TCoCombineCore>(node)) {
operatorId = Visit(maybeCombiner.Cast(), planNode);
} else if (auto maybeCombinerWithSpilling = TMaybeNode<TCoCombineCoreWithSpilling>(node)) {
operatorId = Visit(maybeCombinerWithSpilling.Cast(), planNode);
} else if (auto maybeSort = TMaybeNode<TCoSort>(node)) {
operatorId = Visit(maybeSort.Cast(), planNode);
} else if (auto maybeTop = TMaybeNode<TCoTop>(node)) {
Expand Down Expand Up @@ -1163,6 +1165,15 @@ class TxPlanSerializer {
return AddOperator(planNode, "Aggregate", std::move(op));
}

std::variant<ui32, TArgContext> Visit(const TCoCombineCoreWithSpilling& combiner, TQueryPlanNode& planNode) {
TOperator op;
op.Properties["Name"] = "Aggregate";
op.Properties["GroupBy"] = NPlanUtils::PrettyExprStr(combiner.KeyExtractor());
op.Properties["Aggregation"] = NPlanUtils::PrettyExprStr(combiner.UpdateHandler());

return AddOperator(planNode, "Aggregate", std::move(op));
}

std::variant<ui32, TArgContext> Visit(const TCoSort& sort, TQueryPlanNode& planNode) {
TOperator op;
op.Properties["Name"] = "Sort";
Expand Down Expand Up @@ -2033,9 +2044,9 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,

newPlans.AppendValue(ReconstructQueryPlanRec(
plan.GetMapSafe().at("Plans").GetArraySafe()[0],
0,
planIndex,
precomputes,
0,
planIndex,
precomputes,
nodeCounter));

newPlans.AppendValue(lookupPlan);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
TMaybeNode<TExprBase> RewriteAggregate(TExprBase node, TExprContext& ctx) {
TMaybeNode<TExprBase> output;
auto aggregate = node.Cast<TCoAggregateBase>();
auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
if (hopSetting) {
auto input = aggregate.Input().Maybe<TDqConnection>();
if (!input) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ class TKqpPeepholeFinalTransformer : public TOptimizeTransformerBase {
{
#define HNDL(name) "KqpPeepholeFinal-"#name, Hndl(&TKqpPeepholeFinalTransformer::name)
AddHandler(0, &TCoWideCombiner::Match, HNDL(SetCombinerMemoryLimit));
AddHandler(0, &TCoWideCombinerWithSpilling::Match, HNDL(SetCombinerMemoryLimit));
#undef HNDL
}
private:
Expand Down
20 changes: 20 additions & 0 deletions ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildPureFlatmapStage));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>));
AddHandler(0, &TCoCombineByKeyWithSpilling::Match, HNDL(PushCombineWithSpillingToStage<false>));
AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<false>));
AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<false>));
AddHandler(0, &TCoFinalizeByKeyWithSpilling::Match, HNDL(BuildFinalizeByKeyWithSpillingStage<false>));
AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<false>));
AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<false>));
AddHandler(0, &TCoTop::Match, HNDL(BuildTopStage<false>));
Expand Down Expand Up @@ -99,8 +101,10 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>));
AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>));
AddHandler(1, &TCoCombineByKey::Match, HNDL(PushCombineToStage<true>));
AddHandler(1, &TCoCombineByKeyWithSpilling::Match, HNDL(PushCombineWithSpillingToStage<true>));
AddHandler(1, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<true>));
AddHandler(1, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<true>));
AddHandler(1, &TCoFinalizeByKeyWithSpilling::Match, HNDL(BuildFinalizeByKeyWithSpillingStage<true>));
AddHandler(1, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<true>));
AddHandler(1, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<true>));
AddHandler(1, &TCoTop::Match, HNDL(BuildTopStage<true>));
Expand Down Expand Up @@ -306,6 +310,15 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> PushCombineWithSpillingToStage(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
{
TExprBase output = DqPushCombineWithSpillingToStage(node, ctx, optCtx, *getParents(), IsGlobal);
DumpAppliedRule("PushCombineWithSpillingToStage", node.Ptr(), output.Ptr(), ctx);
return output;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> BuildShuffleStage(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
Expand All @@ -322,6 +335,13 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> BuildFinalizeByKeyWithSpillingStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
TExprBase output = DqBuildFinalizeByKeyWithSpillingStage(node, ctx, *getParents(), IsGlobal);
DumpAppliedRule("BuildFinalizeByKeyWithSpillingStage", node.Ptr(), output.Ptr(), ctx);
return output;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> BuildPartitionsStage(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/query_data/kqp_predictor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ void TStagePredictor::Scan(const NYql::TExprNode::TPtr& stageNode) {
} else {
HasStateCombinerFlag = true;
}
} else if (node.Maybe<NYql::NNodes::TCoWideCombinerWithSpilling>()) {
auto wCombiner = node.Cast<NYql::NNodes::TCoWideCombinerWithSpilling>();
GroupByKeysCount = wCombiner.KeyExtractor().Ptr()->ChildrenSize() - 1;
if (wCombiner.MemLimit() != "") {
HasFinalCombinerFlag = true;
} else {
HasStateCombinerFlag = true;
}
} else if (node.Maybe<NYql::NNodes::TCoMapJoinCore>()) {
HasMapJoinFlag = true;
} else if (node.Maybe<NYql::NNodes::TCoUdf>()) {
Expand Down
27 changes: 25 additions & 2 deletions ydb/library/yql/core/common_opt/yql_co_flow1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,25 @@ TExprNode::TPtr FuseFlatMapOverByKey(const TExprNode& node, TExprContext& ctx) {
return ctx.ChangeChild(node.Head(), node.Head().ChildrenSize() - 1U, std::move(lambda));
}

TExprNode::TPtr FuseFlatMapOverByKeyWithSpilling(const TExprNode& node, TExprContext& ctx) {
YQL_CLOG(DEBUG, Core) << "Fuse " << node.Content() << " over " << node.Head().Content();
auto lambda =
ctx.Builder(node.Pos())
.Lambda()
.Param("key")
.Param("state")
.Callable(node.Content())
.Apply(0, *node.Head().Child(5U))
.With(0, "key")
.With(1, "state")
.Seal()
.Add(1, node.TailPtr())
.Seal()
.Seal().Build();

return ctx.ChangeChild(node.Head(), 5U, std::move(lambda));
}

TExprNode::TPtr ExtractOneItemStructFromFold(const TExprNode& node, TExprContext& ctx) {
YQL_CLOG(DEBUG, Core) << "Extract single item struct from " << node.Content();
const auto structType = node.Child(1)->GetTypeAnn()->Cast<TStructExprType>();
Expand Down Expand Up @@ -1324,6 +1343,10 @@ TExprNode::TPtr OptimizeFlatMap(const TExprNode::TPtr& node, TExprContext& ctx,
return FuseFlatMapOverByKey<false>(*node, ctx);
}

if (node->Head().IsCallable("CombineByKeyWithSpilling")) {
return FuseFlatMapOverByKeyWithSpilling(*node, ctx);
}

if (node->Head().IsCallable({"PartitionByKey", "PartitionsByKeys", "ShuffleByKeys"})) {
return FuseFlatMapOverByKey<true>(*node, ctx);
}
Expand Down Expand Up @@ -1392,7 +1415,7 @@ TExprNode::TPtr OptimizeFlatMap(const TExprNode::TPtr& node, TExprContext& ctx,
{
auto canPush = [&](const auto& child) {
// we push FlatMap over Extend only if it can later be fused with child
return child->IsCallable({Ordered ? "OrderedFlatMap" : "FlatMap", "GroupByKey", "CombineByKey", "PartitionByKey", "PartitionsByKeys", "ShuffleByKeys",
return child->IsCallable({Ordered ? "OrderedFlatMap" : "FlatMap", "GroupByKey", "CombineByKey", "CombineByKeyWithSpilling", "PartitionByKey", "PartitionsByKeys", "ShuffleByKeys",
"ListIf", "FlatListIf", "AsList", "ToList"}) && optCtx.IsSingleUsage(*child);
};
if (AllOf(node->Head().ChildrenList(), canPush)) {
Expand Down Expand Up @@ -1702,7 +1725,7 @@ void RegisterCoFlowCallables1(TCallableOptimizerMap& map) {
return node;
};

map["FinalizeByKey"] = map["CombineByKey"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
map["FinalizeByKey"] = map["CombineByKey"] = map["FinalizeByKeyWithSpilling"] = map["CombineByKeyWithSpilling"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
if (!optCtx.IsSingleUsage(node->Head())) {
return node;
}
Expand Down
69 changes: 69 additions & 0 deletions ydb/library/yql/core/common_opt/yql_co_flow2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,75 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
.Ptr();
};

map["CombineByKeyWithSpilling"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
TCoCombineByKeyWithSpilling self(node);
if (!AllowSubsetFieldsForNode(self.Input().Ref(), optCtx)) {
return node;
}

auto itemArg = self.PreMapLambda().Args().Arg(0);
auto itemType = itemArg.Ref().GetTypeAnn();
if (itemType->GetKind() != ETypeAnnotationKind::Struct) {
return node;
}

auto itemStructType = itemType->Cast<TStructExprType>();
if (itemStructType->GetSize() == 0) {
return node;
}

TSet<TStringBuf> usedFields;
if (!HaveFieldsSubset(self.PreMapLambda().Body().Ptr(), itemArg.Ref(), usedFields, *optCtx.ParentsMap)) {
return node;
}

TExprNode::TPtr newInput;
if (self.Input().Ref().IsCallable("Take") || self.Input().Ref().IsCallable("Skip") || self.Input().Maybe<TCoExtendBase>()) {
TExprNode::TListType filteredInputs;
filteredInputs.reserve(self.Input().Ref().ChildrenSize());
for (ui32 index = 0; index < self.Input().Ref().ChildrenSize(); ++index) {
auto x = self.Input().Ref().ChildPtr(index);
if (!self.Input().Maybe<TCoExtendBase>() && index > 0) {
filteredInputs.push_back(x);
continue;
}

filteredInputs.push_back(FilterByFields(node->Pos(), x, usedFields, ctx, false));
}

YQL_CLOG(DEBUG, Core) << "FieldsSubset in " << node->Content() << " over " << self.Input().Ref().Content();
newInput = ctx.ChangeChildren(self.Input().Ref(), std::move(filteredInputs));
}
else {
TExprNode::TListType fieldNodes;
for (auto& item : itemStructType->GetItems()) {
if (usedFields.contains(item->GetName())) {
fieldNodes.push_back(ctx.NewAtom(self.Pos(), item->GetName()));
}
}

YQL_CLOG(DEBUG, Core) << node->Content() << "SubsetFields";
newInput = Build<TCoExtractMembers>(ctx, self.Input().Pos())
.Input(self.Input())
.Members()
.Add(fieldNodes)
.Build()
.Done()
.Ptr();
}

return Build<TCoCombineByKeyWithSpilling>(ctx, self.Pos())
.Input(newInput)
.PreMapLambda(ctx.DeepCopyLambda(self.PreMapLambda().Ref()))
.KeySelectorLambda(ctx.DeepCopyLambda(self.KeySelectorLambda().Ref()))
.InitHandlerLambda(ctx.DeepCopyLambda(self.InitHandlerLambda().Ref()))
.UpdateHandlerLambda(ctx.DeepCopyLambda(self.UpdateHandlerLambda().Ref()))
.FinishHandlerLambda(ctx.DeepCopyLambda(self.FinishHandlerLambda().Ref()))
.LoadHandlerLambda(ctx.DeepCopyLambda(self.LoadHandlerLambda().Ref()))
.Done()
.Ptr();
};

map["EquiJoin"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
ui32 inputsCount = node->ChildrenSize() - 2;
for (ui32 i = 0; i < inputsCount; ++i) {
Expand Down
51 changes: 51 additions & 0 deletions ydb/library/yql/core/common_opt/yql_co_simple1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4415,6 +4415,56 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
return node;
};

map["CombineCoreWithSpilling"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& /*optCtx*/) {
if (node->Head().IsCallable("FromFlow")) {
YQL_CLOG(DEBUG, Core) << "Swap " << node->Content() << " with " << node->Head().Content();
return ctx.SwapWithHead(*node);
}

if (const auto selector = node->Child(1); selector != selector->Tail().GetDependencyScope()->second) {
YQL_CLOG(DEBUG, Core) << node->Content() << " by constant key.";
return ctx.Builder(node->Pos())
.Callable("FlatMap")
.Callable(0, "Condense1")
.Add(0, node->HeadPtr())
.Lambda(1)
.Param("item")
.Apply(*node->Child(2))
.With(0, selector->TailPtr())
.With(1, "item")
.Seal()
.Seal()
.Lambda(2)
.Param("item")
.Param("state")
.Callable("Bool")
.Atom(0, "false", TNodeFlags::Default)
.Seal()
.Seal()
.Lambda(3)
.Param("item")
.Param("state")
.Apply(*node->Child(3))
.With(0, selector->TailPtr())
.With(1, "item")
.With(2, "state")
.Seal()
.Seal()
.Seal()
.Lambda(1)
.Param("state")
.Apply(*node->Child(4))
.With(0, selector->TailPtr())
.With(1, "state")
.Seal()
.Seal()
.Seal()
.Build();
}

return node;
};

map["Length"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& /*optCtx*/) {
const auto& nodeToCheck = SkipCallables(node->Head(), SkippableCallables);
if (nodeToCheck.IsCallable("AsList")) {
Expand Down Expand Up @@ -4906,6 +4956,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {

map["GroupByKey"] = std::bind(&DropReorder<false>, _1, _2);
map["CombineByKey"] = std::bind(&DropReorder<true>, _1, _2);
map["CombineByKeyWithSpilling"] = std::bind(&DropReorder<true>, _1, _2);

map["ToList"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& /*optCtx*/) {
if (node->Head().IsCallable("Nothing")) {
Expand Down
48 changes: 48 additions & 0 deletions ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,19 @@
{"Index": 5, "Name": "FinishHandlerLambda", "Type": "TCoLambda"}
]
},
{
"Name": "TCoCombineByKeyWithSpilling",
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "CombineByKeyWithSpilling"},
"Children": [
{"Index": 1, "Name": "PreMapLambda", "Type": "TCoLambda"},
{"Index": 2, "Name": "KeySelectorLambda", "Type": "TCoLambda"},
{"Index": 3, "Name": "InitHandlerLambda", "Type": "TCoLambda"},
{"Index": 4, "Name": "UpdateHandlerLambda", "Type": "TCoLambda"},
{"Index": 5, "Name": "FinishHandlerLambda", "Type": "TCoLambda"},
{"Index": 6, "Name": "LoadHandlerLambda", "Type": "TCoLambda"}
]
},
{
"Name": "TCoFinalizeByKey",
"Base": "TCoInputBase",
Expand All @@ -450,6 +463,19 @@
{"Index": 5, "Name": "FinishHandlerLambda", "Type": "TCoLambda"}
]
},
{
"Name": "TCoFinalizeByKeyWithSpilling",
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "FinalizeByKeyWithSpilling"},
"Children": [
{"Index": 1, "Name": "PreMapLambda", "Type": "TCoLambda"},
{"Index": 2, "Name": "KeySelectorLambda", "Type": "TCoLambda"},
{"Index": 3, "Name": "InitHandlerLambda", "Type": "TCoLambda"},
{"Index": 4, "Name": "UpdateHandlerLambda", "Type": "TCoLambda"},
{"Index": 5, "Name": "FinishHandlerLambda", "Type": "TCoLambda"},
{"Index": 6, "Name": "SaveHandlerLambda", "Type": "TCoLambda"}
]
},
{
"Name": "TCoAggrCountInit",
"Base": "TCallable",
Expand Down Expand Up @@ -1985,6 +2011,19 @@
{"Index": 5, "Name": "MemLimit", "Type": "TCoAtom", "Optional": true}
]
},
{
"Name": "TCoCombineCoreWithSpilling",
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "CombineCoreWithSpilling"},
"Children": [
{"Index": 1, "Name": "KeyExtractor", "Type": "TCoLambda"},
{"Index": 2, "Name": "InitHandler", "Type": "TCoLambda"},
{"Index": 3, "Name": "UpdateHandler", "Type": "TCoLambda"},
{"Index": 4, "Name": "FinishHandler", "Type": "TCoLambda"},
{"Index": 5, "Name": "LoadHandler", "Type": "TCoLambda"},
{"Index": 6, "Name": "MemLimit", "Type": "TCoAtom", "Optional": true}
]
},
{
"Name": "TCoWideCombiner",
"Base": "TCoInputBase",
Expand All @@ -1997,6 +2036,15 @@
{"Index": 5, "Name": "FinishHandler", "Type": "TCoLambda"}
]
},
{
"Name": "TCoWideCombinerWithSpilling",
"Base": "TCoWideCombiner",
"Match": {"Type": "Callable", "Name": "WideCombinerWithSpilling"},
"Children": [
{"Index": 6, "Name": "SerializeHandler", "Type": "TCoLambda"},
{"Index": 7, "Name": "DeserializeHandler", "Type": "TCoLambda"}
]
},
{
"Name": "TCoGroupingCore",
"Base": "TCoInputBase",
Expand Down
Loading

0 comments on commit d726858

Please sign in to comment.