Skip to content

Commit

Permalink
Merge 07ac518 into 169c92d
Browse files Browse the repository at this point in the history
  • Loading branch information
Darych authored Aug 12, 2024
2 parents 169c92d + 07ac518 commit bf069a7
Show file tree
Hide file tree
Showing 27 changed files with 1,666 additions and 101 deletions.
6 changes: 4 additions & 2 deletions ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
TMaybeNode<TExprBase> RewriteAggregate(TExprBase node, TExprContext& ctx) {
TMaybeNode<TExprBase> output;
auto aggregate = node.Cast<TCoAggregateBase>();
auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
if (hopSetting) {
auto input = aggregate.Input().Maybe<TDqConnection>();
if (!input) {
Expand All @@ -118,7 +118,9 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
true, // defaultWatermarksMode
true); // syncActor
} else {
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey());
output = DqRewriteAggregate(node, ctx, TypesCtx, false,
KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(),
KqpCtx.Config->HasOptUseFinalizeByKey(), false /* allowSpilling */);
}
if (output) {
DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Cast().Ptr(), ctx);
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>));
AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<false>));
AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<false>));
AddHandler(0, &TCoFinalizeByKeyWithSpilling::Match, HNDL(BuildFinalizeByKeyWithSpillingStage<false>));
AddHandler(0, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<false>));
AddHandler(0, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<false>));
AddHandler(0, &TCoTop::Match, HNDL(BuildTopStage<false>));
Expand Down Expand Up @@ -101,6 +102,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(1, &TCoCombineByKey::Match, HNDL(PushCombineToStage<true>));
AddHandler(1, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<true>));
AddHandler(1, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<true>));
AddHandler(1, &TCoFinalizeByKeyWithSpilling::Match, HNDL(BuildFinalizeByKeyWithSpillingStage<true>));
AddHandler(1, &TCoShuffleByKeys::Match, HNDL(BuildShuffleStage<true>));
AddHandler(1, &TCoPartitionByKey::Match, HNDL(BuildPartitionStage<true>));
AddHandler(1, &TCoTop::Match, HNDL(BuildTopStage<true>));
Expand Down Expand Up @@ -321,6 +323,13 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> BuildFinalizeByKeyWithSpillingStage(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
TExprBase output = DqBuildFinalizeByKeyWithSpillingStage(node, ctx, *getParents(), IsGlobal);
DumpAppliedRule("BuildFinalizeByKeyWithSpillingStage", node.Ptr(), output.Ptr(), ctx);
return output;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> BuildPartitionsStage(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ui64 ParseEnableSpillingNodes(const TString &v) {
if (s.empty()) {
throw yexception() << "Empty value item";
}
auto value = FromString<NYql::TDqSettings::EEnabledSpillingNodes>(s);
auto value = FromString<NYql::NDq::EEnabledSpillingNodes>(s);
res |= ui64(value);
}
return res;
Expand Down
57 changes: 55 additions & 2 deletions ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,19 @@
{"Index": 5, "Name": "FinishHandlerLambda", "Type": "TCoLambda"}
]
},
{
"Name": "TCoCombineByKeyWithSpilling",
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "CombineByKeyWithSpilling"},
"Children": [
{"Index": 1, "Name": "PreMapLambda", "Type": "TCoLambda"},
{"Index": 2, "Name": "KeySelectorLambda", "Type": "TCoLambda"},
{"Index": 3, "Name": "InitHandlerLambda", "Type": "TCoLambda"},
{"Index": 4, "Name": "UpdateHandlerLambda", "Type": "TCoLambda"},
{"Index": 5, "Name": "FinishHandlerLambda", "Type": "TCoLambda"},
{"Index": 6, "Name": "LoadHandlerLambda", "Type": "TCoLambda"}
]
},
{
"Name": "TCoFinalizeByKey",
"Base": "TCoInputBase",
Expand All @@ -450,6 +463,19 @@
{"Index": 5, "Name": "FinishHandlerLambda", "Type": "TCoLambda"}
]
},
{
"Name": "TCoFinalizeByKeyWithSpilling",
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "FinalizeByKeyWithSpilling"},
"Children": [
{"Index": 1, "Name": "PreMapLambda", "Type": "TCoLambda"},
{"Index": 2, "Name": "KeySelectorLambda", "Type": "TCoLambda"},
{"Index": 3, "Name": "InitHandlerLambda", "Type": "TCoLambda"},
{"Index": 4, "Name": "UpdateHandlerLambda", "Type": "TCoLambda"},
{"Index": 5, "Name": "FinishHandlerLambda", "Type": "TCoLambda"},
{"Index": 6, "Name": "SaveHandlerLambda", "Type": "TCoLambda"}
]
},
{
"Name": "TCoAggrCountInit",
"Base": "TCallable",
Expand Down Expand Up @@ -1625,7 +1651,7 @@
{"Index": 5, "Name": "LeftRenames", "Type": "TCoAtomList"},
{"Index": 6, "Name": "RightRenames", "Type": "TCoAtomList"},
{"Index": 7, "Name": "LeftKeysColumnNames", "Type": "TCoAtomList"},
{"Index": 8, "Name": "RightKeysColumnNames", "Type": "TCoAtomList"},
{"Index": 8, "Name": "RightKeysColumnNames", "Type": "TCoAtomList"},
{"Index": 9, "Name": "Flags", "Type": "TCoAtomList"}
]
},
Expand All @@ -1641,7 +1667,7 @@
{"Index": 4, "Name": "LeftRenames", "Type": "TCoAtomList"},
{"Index": 5, "Name": "RightRenames", "Type": "TCoAtomList"},
{"Index": 6, "Name": "LeftKeysColumnNames", "Type": "TCoAtomList"},
{"Index": 7, "Name": "RightKeysColumnNames", "Type": "TCoAtomList"},
{"Index": 7, "Name": "RightKeysColumnNames", "Type": "TCoAtomList"},
{"Index": 8, "Name": "Flags", "Type": "TCoAtomList"}
]
},
Expand Down Expand Up @@ -1977,6 +2003,19 @@
{"Index": 5, "Name": "MemLimit", "Type": "TCoAtom", "Optional": true}
]
},
{
"Name": "TCoCombineCoreWithSpilling",
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "CombineCoreWithSpilling"},
"Children": [
{"Index": 1, "Name": "KeyExtractor", "Type": "TCoLambda"},
{"Index": 2, "Name": "InitHandler", "Type": "TCoLambda"},
{"Index": 3, "Name": "UpdateHandler", "Type": "TCoLambda"},
{"Index": 4, "Name": "FinishHandler", "Type": "TCoLambda"},
{"Index": 5, "Name": "LoadHandler", "Type": "TCoLambda"},
{"Index": 6, "Name": "MemLimit", "Type": "TCoAtom", "Optional": true}
]
},
{
"Name": "TCoWideCombiner",
"Base": "TCoInputBase",
Expand All @@ -1989,6 +2028,20 @@
{"Index": 5, "Name": "FinishHandler", "Type": "TCoLambda"}
]
},
{
"Name": "TCoWideCombinerWithSpilling",
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "WideCombinerWithSpilling"},
"Children": [
{"Index": 1, "Name": "MemLimit", "Type": "TCoAtom"},
{"Index": 2, "Name": "KeyExtractor", "Type": "TCoLambda"},
{"Index": 3, "Name": "InitHandler", "Type": "TCoLambda"},
{"Index": 4, "Name": "UpdateHandler", "Type": "TCoLambda"},
{"Index": 5, "Name": "FinishHandler", "Type": "TCoLambda"},
{"Index": 6, "Name": "SerializeHandler", "Type": "TCoLambda"},
{"Index": 7, "Name": "DeserializeHandler", "Type": "TCoLambda"}
]
},
{
"Name": "TCoGroupingCore",
"Base": "TCoInputBase",
Expand Down
Loading

0 comments on commit bf069a7

Please sign in to comment.