From 3e257cfc8a9d3b4e14a0363f2cd5e7af904b0a84 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Tue, 17 Sep 2024 10:41:01 +0000 Subject: [PATCH 01/11] add simple optimization 'RewriteAggregate' --- .../yql/core/common_opt/yql_co_simple1.cpp | 12 + ydb/library/yql/core/ya.make | 1 + ydb/library/yql/core/yql_opt_hopping.cpp | 744 ++++++++++++++++++ ydb/library/yql/core/yql_opt_hopping.h | 17 + 4 files changed, 774 insertions(+) create mode 100644 ydb/library/yql/core/yql_opt_hopping.cpp create mode 100644 ydb/library/yql/core/yql_opt_hopping.h diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index c63dc75d344b..c0165961c3ae 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -5080,6 +5081,17 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { return clean; } + if (auto hopping = NHopping::RewriteAsHoppingWindow( + node, + ctx, + false, // analyticsHopping + TDuration::MilliSeconds(5'000), // TDqSettings::TDefault::WatermarksLateArrivalDelayMs + true, // defaultWatermarksMode + true); // syncActor + hopping) { + return hopping; + } + return DropReorder(node, ctx); }; diff --git a/ydb/library/yql/core/ya.make b/ydb/library/yql/core/ya.make index ab1abc133ab5..f68a92cab417 100644 --- a/ydb/library/yql/core/ya.make +++ b/ydb/library/yql/core/ya.make @@ -28,6 +28,7 @@ SRCS( yql_join.cpp yql_join.h yql_library_compiler.cpp + yql_opt_hopping.cpp yql_opt_match_recognize.cpp yql_opt_match_recognize.h yql_opt_proposed_by_data.cpp diff --git a/ydb/library/yql/core/yql_opt_hopping.cpp b/ydb/library/yql/core/yql_opt_hopping.cpp new file mode 100644 index 000000000000..fa003c39b324 --- /dev/null +++ b/ydb/library/yql/core/yql_opt_hopping.cpp @@ -0,0 +1,744 @@ +#include "yql_opt_hopping.h" + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +using namespace NYql; +using namespace NYql::NNodes; + +namespace { + +struct THoppingTraits { + TString Column; + TCoHoppingTraits Traits; + ui64 Hop; + ui64 Interval; + ui64 Delay; +}; + +struct TKeysDescription { + TVector PickleKeys; + TVector MemberKeys; + TVector FakeKeys; + + TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) { + for (const auto& key : keys) { + if (key.StringValue() == hoppingColumn) { + FakeKeys.emplace_back(key.StringValue()); + continue; + } + + const auto index = rowType.FindItem(key.StringValue()); + Y_ENSURE(index); + + auto itemType = rowType.GetItems()[*index]->GetItemType(); + if (RemoveOptionalType(itemType)->GetKind() == ETypeAnnotationKind::Data) { + MemberKeys.emplace_back(key.StringValue()); + continue; + } + + PickleKeys.emplace_back(key.StringValue()); + } + } + + TExprNode::TPtr BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const { + TCoArgument arg = Build(ctx, pos) + .Name("item") + .Done(); + + TExprBase body = arg; + + for (const auto& key : PickleKeys) { + const auto member = Build(ctx, pos) + .Name().Build(key) + .Struct(arg) + .Done() + .Ptr(); + + body = Build(ctx, pos) + .Struct(body) + .Name().Build(key) + .Item(ctx.NewCallable(pos, "StablePickle", { member })) + .Done(); + } + + return Build(ctx, pos) + .Args({arg}) + .Body(body) + .Done() + .Ptr(); + } + + TExprNode::TPtr BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType) { + TCoArgument arg = Build(ctx, pos) + .Name("item") + .Done(); + + TExprBase body = arg; + + for (const auto& key : PickleKeys) { + const auto index = rowType.FindItem(key); + Y_ENSURE(index); + + auto itemType = rowType.GetItems().at(*index)->GetItemType(); + const auto member = Build(ctx, pos) + .Name().Build(key) + .Struct(arg) + .Done() + .Ptr(); + + body = Build(ctx, pos) + .Struct(body) + .Name().Build(key) + .Item(ctx.NewCallable(pos, "Unpickle", { ExpandType(pos, *itemType, ctx), member })) + .Done(); + } + + return Build(ctx, pos) + .Args({arg}) + .Body(body) + .Done() + .Ptr(); + } + + TVector GetKeysList(TExprContext& ctx, TPositionHandle pos) const { + TVector res; + res.reserve(PickleKeys.size() + MemberKeys.size()); + + for (const auto& pickleKey : PickleKeys) { + res.emplace_back(Build(ctx, pos).Value(pickleKey).Done()); + } + for (const auto& memberKey : MemberKeys) { + res.emplace_back(Build(ctx, pos).Value(memberKey).Done()); + } + return res; + } + + TVector GetActualGroupKeys() { + TVector result; + result.reserve(PickleKeys.size() + MemberKeys.size()); + result.insert(result.end(), PickleKeys.begin(), PickleKeys.end()); + result.insert(result.end(), MemberKeys.begin(), MemberKeys.end()); + return result; + } + + bool NeedPickle() const { + return !PickleKeys.empty(); + } + + TExprNode::TPtr GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) { + auto builder = Build(ctx, pos); + for (auto key : GetKeysList(ctx, pos)) { + builder.Add(std::move(key)); + } + return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx); + } +}; + +TString BuildColumnName(const TExprBase& column) { + if (const auto columnName = column.Maybe()) { + return columnName.Cast().StringValue(); + } + + if (const auto columnNames = column.Maybe()) { + TStringBuilder columnNameBuilder; + for (const auto columnName : columnNames.Cast()) { + columnNameBuilder.append(columnName.StringValue()); + columnNameBuilder.append("_"); + } + return columnNameBuilder; + } + + YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " + << column.Ptr()->Dump()); +} + +bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting) { + return !hoppingSetting->Child(1)->IsList(); +} + +void EnsureNotDistinct(const TCoAggregate& aggregate) { + const auto& aggregateHandlers = aggregate.Handlers(); + + YQL_ENSURE( + AllOf(aggregateHandlers, [](const auto& t){ return !t.DistinctName(); }), + "Distinct is not supported for aggregation with hop"); +} + +TMaybe ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode) { + const auto pos = aggregate.Pos(); + + const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); + if (!hopSetting) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Aggregate over stream must have 'hopping' setting")); + return Nothing(); + } + + const auto hoppingColumn = IsLegacyHopping(hopSetting) + ? "_yql_time" + : TString(hopSetting->Child(1)->Child(0)->Content()); + + const auto traitsNode = IsLegacyHopping(hopSetting) + ? hopSetting->Child(1) + : hopSetting->Child(1)->Child(1); + + const auto maybeTraits = TMaybeNode(traitsNode); + if (!maybeTraits) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Invalid 'hopping' setting in Aggregate")); + return Nothing(); + } + + const auto traits = maybeTraits.Cast(); + + const auto checkIntervalParam = [&] (TExprBase param) -> ui64 { + if (param.Maybe()) { + param = param.Cast().Input(); + } + if (!param.Maybe()) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Not an interval data ctor")); + return 0; + } + auto value = FromString(param.Cast().Literal().Value()); + if (value <= 0) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval value must be positive")); + return 0; + } + return (ui64)value; + }; + + const auto hop = checkIntervalParam(traits.Hop()); + if (!hop) { + return Nothing(); + } + const auto interval = checkIntervalParam(traits.Interval()); + if (!interval) { + return Nothing(); + } + const auto delay = checkIntervalParam(traits.Delay()); + if (!delay) { + return Nothing(); + } + + if (interval < hop) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval must be greater or equal then hop")); + return Nothing(); + } + if (delay < hop) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Delay must be greater or equal then hop")); + return Nothing(); + } + + const auto newTraits = Build(ctx, aggregate.Pos()) + .InitFrom(traits) + .DataWatermarks(analyticsMode + ? ctx.NewAtom(aggregate.Pos(), "false") + : traits.DataWatermarks().Ptr()) + .Done(); + + return THoppingTraits { + hoppingColumn, + newTraits, + hop, + interval, + delay + }; +} + +TExprNode::TPtr BuildTimeExtractor(const TCoHoppingTraits& hoppingTraits, TExprContext& ctx) { + const auto pos = hoppingTraits.Pos(); + + if (hoppingTraits.ItemType().Ref().GetTypeAnn()->Cast()->GetType()->Cast()->GetSize() == 0) { + // The case when no fields are used in lambda. F.e. when it has only DependsOn. + return ctx.DeepCopyLambda(hoppingTraits.TimeExtractor().Ref()); + } + + return Build(ctx, pos) + .Args({"item"}) + .Body() + .Apply(hoppingTraits.TimeExtractor()) + .With(0) + .Type(hoppingTraits.ItemType()) + .Value("item") + .Build() + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildInitHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto& aggregateHandlers = aggregate.Handlers(); + + const auto initItemArg = Build(ctx, pos).Name("item").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + ui32 index = 0; + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + + TMaybeNode applier; + if (tuple.Trait().Cast().InitHandler().Args().Size() == 1) { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().InitHandler()) + .With(0, initItemArg) + .Done(); + } else { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().InitHandler()) + .With(0, initItemArg) + .With(1) + .Literal().Build(ToString(index)) + .Build() + .Done(); + } + + structItems.push_back(Build(ctx, pos) + .Name().Build(BuildColumnName(tuple.ColumnName())) + .Value(applier) + .Done()); + ++index; + } + + return Build(ctx, pos) + .Args({initItemArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildUpdateHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + const auto updateItemArg = Build(ctx, pos).Name("item").Done(); + const auto updateStateArg = Build(ctx, pos).Name("state").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + i32 index = 0; + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(updateStateArg) + .Name().Build(columnName) + .Done(); + + TMaybeNode applier; + if (tuple.Trait().Cast().UpdateHandler().Args().Size() == 2) { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().UpdateHandler()) + .With(0, updateItemArg) + .With(1, member) + .Done(); + } else { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().UpdateHandler()) + .With(0, updateItemArg) + .With(1, member) + .With(2) + .Literal().Build(ToString(index)) + .Build() + .Done(); + } + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value(applier) + .Done()); + ++index; + } + + return Build(ctx, pos) + .Args({updateItemArg, updateStateArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildMergeHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto& aggregateHandlers = aggregate.Handlers(); + + const auto mergeState1Arg = Build(ctx, pos).Name("state1").Done(); + const auto mergeState2Arg = Build(ctx, pos).Name("state2").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member1 = Build(ctx, pos) + .Struct(mergeState1Arg) + .Name().Build(columnName) + .Done(); + const auto member2 = Build(ctx, pos) + .Struct(mergeState2Arg) + .Name().Build(columnName) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value() + .Apply(tuple.Trait().Cast().MergeHandler()) + .With(0, member1) + .With(1, member2) + .Build() + .Done()); + } + + return Build(ctx, pos) + .Args({mergeState1Arg, mergeState2Arg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildFinishHopLambda( + const TCoAggregate& aggregate, + const TVector& actualGroupKeys, + const TString& hoppingColumn, + TExprContext& ctx) +{ + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + const auto finishKeyArg = Build(ctx, pos).Name("key").Done(); + const auto finishStateArg = Build(ctx, pos).Name("state").Done(); + const auto finishTimeArg = Build(ctx, pos).Name("time").Done(); + + TVector structItems; + structItems.reserve(actualGroupKeys.size() + aggregateHandlers.Size() + 1); + + if (actualGroupKeys.size() == 1) { + structItems.push_back(Build(ctx, pos) + .Name().Build(actualGroupKeys[0]) + .Value(finishKeyArg) + .Done()); + } else { + for (size_t i = 0; i < actualGroupKeys.size(); ++i) { + structItems.push_back(Build(ctx, pos) + .Name().Build(actualGroupKeys[i]) + .Value() + .Tuple(finishKeyArg) + .Index() + .Value(ToString(i)) + .Build() + .Build() + .Done()); + } + } + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString compoundColumnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(finishStateArg) + .Name().Build(compoundColumnName) + .Done(); + + if (tuple.ColumnName().Maybe()) { + structItems.push_back(Build(ctx, pos) + .Name().Build(compoundColumnName) + .Value() + .Apply(tuple.Trait().Cast().FinishHandler()) + .With(0, member) + .Build() + .Done()); + + continue; + } + + if (const auto namesList = tuple.ColumnName().Maybe()) { + const auto expApplier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().FinishHandler()) + .With(0, member) + .Done(); + + int index = 0; + for (const auto columnName : namesList.Cast()) { + const auto extracter = Build(ctx, pos) + .Tuple(expApplier) + .Index().Build(index++) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name(columnName) + .Value(extracter) + .Done()); + } + + continue; + } + + YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " + << tuple.ColumnName().Ptr()->Dump()); + } + + structItems.push_back(Build(ctx, pos) + .Name().Build(hoppingColumn) + .Value(finishTimeArg) + .Done()); + + return Build(ctx, pos) + .Args({finishKeyArg, finishStateArg, finishTimeArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildSaveHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + const auto saveStateArg = Build(ctx, pos).Name("state").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(saveStateArg) + .Name().Build(columnName) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value() + .Apply(tuple.Trait().Cast().SaveHandler()) + .With(0, member) + .Build() + .Done()); + } + + return Build(ctx, pos) + .Args({saveStateArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildLoadHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + TCoArgument loadStateArg = Build(ctx, pos).Name("state").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(loadStateArg) + .Name().Build(columnName) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value() + .Apply(tuple.Trait().Cast().LoadHandler()) + .With(0, member) + .Build() + .Done()); + } + + return Build(ctx, pos) + .Args({loadStateArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TMaybe BuildWatermarkMode( + const TCoAggregate& aggregate, + const TCoHoppingTraits& hoppingTraits, + TExprContext& ctx, + bool analyticsMode, + bool defaultWatermarksMode, + bool syncActor) +{ + const bool enableWatermarks = !analyticsMode && + defaultWatermarksMode && + hoppingTraits.Version().Cast().StringValue() == "v2"; + if (enableWatermarks && syncActor) { + ctx.AddError(TIssue(ctx.GetPosition(aggregate.Pos()), "Watermarks should be used only with async compute actor")); + return Nothing(); + } + + if (hoppingTraits.Version().Cast().StringValue() == "v2" && !enableWatermarks) { + ctx.AddError(TIssue( + ctx.GetPosition(aggregate.Pos()), + "HoppingWindow requires watermarks to be enabled. If you don't want to do that, you can use HOP instead.")); + return Nothing(); + } + + return enableWatermarks; +} + +TExprNode::TPtr RewriteAsHoppingWindowFullOutput( + const TCoAggregate& aggregate, + TExprContext& ctx, + bool analyticsMode, + TDuration lateArrivalDelay, + bool defaultWatermarksMode, + bool syncActor +) { + const auto pos = aggregate.Pos(); + + EnsureNotDistinct(aggregate); + + const auto maybeHopTraits = ExtractHopTraits(aggregate, ctx, analyticsMode); + if (!maybeHopTraits) { + return nullptr; + } + const auto hopTraits = *maybeHopTraits; + + const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast(); + TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); + + const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); + const auto timeExtractorLambda = BuildTimeExtractor(hopTraits.Traits, ctx); + const auto initLambda = BuildInitHopLambda(aggregate, ctx); + const auto updateLambda = BuildUpdateHopLambda(aggregate, ctx); + const auto saveLambda = BuildSaveHopLambda(aggregate, ctx); + const auto loadLambda = BuildLoadHopLambda(aggregate, ctx); + const auto mergeLambda = BuildMergeHopLambda(aggregate, ctx); + const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx); + const auto enableWatermarks = BuildWatermarkMode(aggregate, hopTraits.Traits, ctx, analyticsMode, defaultWatermarksMode, syncActor); + if (!enableWatermarks) { + return nullptr; + } + + const auto streamArg = Build(ctx, pos).Name("stream").Done(); + auto multiHoppingCoreBuilder = Build(ctx, pos) + .KeyExtractor(keyLambda) + .TimeExtractor(timeExtractorLambda) + .Hop(hopTraits.Traits.Hop()) + .Interval(hopTraits.Traits.Interval()) + .DataWatermarks(hopTraits.Traits.DataWatermarks()) + .InitHandler(initLambda) + .UpdateHandler(updateLambda) + .MergeHandler(mergeLambda) + .FinishHandler(finishLambda) + .SaveHandler(saveLambda) + .LoadHandler(loadLambda) + .template WatermarkMode().Build(ToString(*enableWatermarks)); + + if (*enableWatermarks) { + const auto hop = TDuration::MicroSeconds(hopTraits.Hop); + multiHoppingCoreBuilder.template Delay() + .Literal().Build(ToString(Max(hop, lateArrivalDelay).MicroSeconds())) + .Build(); + } else { + multiHoppingCoreBuilder.Delay(hopTraits.Traits.Delay()); + } + + if (true) { + return Build(ctx, pos) + .Input(aggregate.Input()) + .KeySelectorLambda(keyLambda) + .SortDirections() + .Literal() + .Value("true") + .Build() + .Build() + .SortKeySelectorLambda(timeExtractorLambda) + .ListHandlerLambda() + .Args(streamArg) + .template Body() + .Stream(multiHoppingCoreBuilder + .template Input() + .List(streamArg) + .Build() + .Done()) + .Build() + .Build() + .Done() + .Ptr(); + } else { + return Build(ctx, pos) + .Input(multiHoppingCoreBuilder + .Input() + .Input(aggregate.Input()) + .Build() + .Done()) + .Done() + .Ptr(); + } +} + +} // namespace + +namespace NYql::NHopping { + +TExprNode::TPtr RewriteAsHoppingWindow( + TExprNode::TPtr node, + TExprContext& ctx, + bool analyticsMode, + TDuration lateArrivalDelay, + bool defaultWatermarksMode, + bool syncActor +) { + const auto aggregate = TCoAggregate(node); + const auto pos = aggregate.Pos(); + + if (!aggregate.Input().Ref().IsCallable("AsList")) { + return nullptr; + } + + if (!GetSetting(aggregate.Settings().Ref(), "hopping")) { + return nullptr; + } + + auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx, analyticsMode, lateArrivalDelay, defaultWatermarksMode, syncActor); + if (!result) { + return result; + } + + auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns"); + if (!outputColumnSetting) { + return result; + } + + return Build(ctx, pos) + .Input(result) + .Members(outputColumnSetting->ChildPtr(1)) + .Done() + .Ptr(); +} + +} // NYql::NHopping diff --git a/ydb/library/yql/core/yql_opt_hopping.h b/ydb/library/yql/core/yql_opt_hopping.h new file mode 100644 index 000000000000..183de8cf0a50 --- /dev/null +++ b/ydb/library/yql/core/yql_opt_hopping.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +#include + +namespace NYql::NHopping { + +TExprNode::TPtr RewriteAsHoppingWindow( + TExprNode::TPtr node, + TExprContext& ctx, + bool analyticsMode, + TDuration lateArrivalDelay, + bool defaultWatermarksMode, + bool syncActor); + +} // namespace NYql::NHopping From cbb7ad3004f56d3129d255e2309698e8fbf310f0 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Wed, 18 Sep 2024 15:55:07 +0000 Subject: [PATCH 02/11] refactor RewriteAggregate optimization --- .../yql/core/common_opt/yql_co_simple1.cpp | 9 +- ydb/library/yql/core/yql_opt_hopping.cpp | 174 ++++++------------ ydb/library/yql/core/yql_opt_hopping.h | 8 +- 3 files changed, 54 insertions(+), 137 deletions(-) diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index c0165961c3ae..1dc635cd47b3 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -5081,14 +5081,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { return clean; } - if (auto hopping = NHopping::RewriteAsHoppingWindow( - node, - ctx, - false, // analyticsHopping - TDuration::MilliSeconds(5'000), // TDqSettings::TDefault::WatermarksLateArrivalDelayMs - true, // defaultWatermarksMode - true); // syncActor - hopping) { + if (auto hopping = NHopping::RewriteAsHoppingWindow(node, ctx); hopping) { return hopping; } diff --git a/ydb/library/yql/core/yql_opt_hopping.cpp b/ydb/library/yql/core/yql_opt_hopping.cpp index fa003c39b324..bd2d0c8cb01e 100644 --- a/ydb/library/yql/core/yql_opt_hopping.cpp +++ b/ydb/library/yql/core/yql_opt_hopping.cpp @@ -17,14 +17,6 @@ using namespace NYql::NNodes; namespace { -struct THoppingTraits { - TString Column; - TCoHoppingTraits Traits; - ui64 Hop; - ui64 Interval; - ui64 Delay; -}; - struct TKeysDescription { TVector PickleKeys; TVector MemberKeys; @@ -174,7 +166,7 @@ void EnsureNotDistinct(const TCoAggregate& aggregate) { "Distinct is not supported for aggregation with hop"); } -TMaybe ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode) { +TMaybe> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx) { const auto pos = aggregate.Pos(); const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); @@ -237,20 +229,7 @@ TMaybe ExtractHopTraits(const TCoAggregate& aggregate, TExprCont return Nothing(); } - const auto newTraits = Build(ctx, aggregate.Pos()) - .InitFrom(traits) - .DataWatermarks(analyticsMode - ? ctx.NewAtom(aggregate.Pos(), "false") - : traits.DataWatermarks().Ptr()) - .Done(); - - return THoppingTraits { - hoppingColumn, - newTraits, - hop, - interval, - delay - }; + return std::tuple{hoppingColumn, traits}; } TExprNode::TPtr BuildTimeExtractor(const TCoHoppingTraits& hoppingTraits, TExprContext& ctx) { @@ -583,140 +562,91 @@ TExprNode::TPtr BuildLoadHopLambda(const TCoAggregate& aggregate, TExprContext& .Ptr(); } -TMaybe BuildWatermarkMode( - const TCoAggregate& aggregate, - const TCoHoppingTraits& hoppingTraits, - TExprContext& ctx, - bool analyticsMode, - bool defaultWatermarksMode, - bool syncActor) -{ - const bool enableWatermarks = !analyticsMode && - defaultWatermarksMode && - hoppingTraits.Version().Cast().StringValue() == "v2"; - if (enableWatermarks && syncActor) { - ctx.AddError(TIssue(ctx.GetPosition(aggregate.Pos()), "Watermarks should be used only with async compute actor")); - return Nothing(); - } - - if (hoppingTraits.Version().Cast().StringValue() == "v2" && !enableWatermarks) { - ctx.AddError(TIssue( - ctx.GetPosition(aggregate.Pos()), - "HoppingWindow requires watermarks to be enabled. If you don't want to do that, you can use HOP instead.")); - return Nothing(); - } - - return enableWatermarks; -} - -TExprNode::TPtr RewriteAsHoppingWindowFullOutput( - const TCoAggregate& aggregate, - TExprContext& ctx, - bool analyticsMode, - TDuration lateArrivalDelay, - bool defaultWatermarksMode, - bool syncActor -) { +TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) { const auto pos = aggregate.Pos(); EnsureNotDistinct(aggregate); - const auto maybeHopTraits = ExtractHopTraits(aggregate, ctx, analyticsMode); + const auto maybeHopTraits = ExtractHopTraits(aggregate, ctx); if (!maybeHopTraits) { return nullptr; } - const auto hopTraits = *maybeHopTraits; + const auto [hoppingColumn, hoppingTraits] = *maybeHopTraits; const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast(); - TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); + TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hoppingColumn); + + // if (keysDescription.NeedPickle()) { + // return Build(ctx, pos) + // .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) + // .Input() + // .InitFrom(aggregate) + // .Input() + // .Lambda(keysDescription.BuildPickleLambda(ctx, pos)) + // .Input(aggregate.Input()) + // .Build() + // .Settings(RemoveSetting(aggregate.Settings().Ref(), "output_columns", ctx)) + // .Build() + // .Done() + // .Ptr(); + // } const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); - const auto timeExtractorLambda = BuildTimeExtractor(hopTraits.Traits, ctx); + const auto timeExtractorLambda = BuildTimeExtractor(hoppingTraits, ctx); const auto initLambda = BuildInitHopLambda(aggregate, ctx); const auto updateLambda = BuildUpdateHopLambda(aggregate, ctx); const auto saveLambda = BuildSaveHopLambda(aggregate, ctx); const auto loadLambda = BuildLoadHopLambda(aggregate, ctx); const auto mergeLambda = BuildMergeHopLambda(aggregate, ctx); - const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx); - const auto enableWatermarks = BuildWatermarkMode(aggregate, hopTraits.Traits, ctx, analyticsMode, defaultWatermarksMode, syncActor); - if (!enableWatermarks) { - return nullptr; - } + const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hoppingColumn, ctx); const auto streamArg = Build(ctx, pos).Name("stream").Done(); auto multiHoppingCoreBuilder = Build(ctx, pos) .KeyExtractor(keyLambda) .TimeExtractor(timeExtractorLambda) - .Hop(hopTraits.Traits.Hop()) - .Interval(hopTraits.Traits.Interval()) - .DataWatermarks(hopTraits.Traits.DataWatermarks()) + .Hop(hoppingTraits.Hop()) + .Interval(hoppingTraits.Interval()) + .Delay(hoppingTraits.Delay()) + .DataWatermarks(hoppingTraits.DataWatermarks()) .InitHandler(initLambda) .UpdateHandler(updateLambda) .MergeHandler(mergeLambda) .FinishHandler(finishLambda) .SaveHandler(saveLambda) .LoadHandler(loadLambda) - .template WatermarkMode().Build(ToString(*enableWatermarks)); - - if (*enableWatermarks) { - const auto hop = TDuration::MicroSeconds(hopTraits.Hop); - multiHoppingCoreBuilder.template Delay() - .Literal().Build(ToString(Max(hop, lateArrivalDelay).MicroSeconds())) - .Build(); - } else { - multiHoppingCoreBuilder.Delay(hopTraits.Traits.Delay()); - } - - if (true) { - return Build(ctx, pos) - .Input(aggregate.Input()) - .KeySelectorLambda(keyLambda) - .SortDirections() - .Literal() - .Value("true") - .Build() + .template WatermarkMode().Build(ToString(false)); + + return Build(ctx, pos) + .Input(aggregate.Input()) + .KeySelectorLambda(keyLambda) + .SortDirections() + .Literal() + .Value("true") .Build() - .SortKeySelectorLambda(timeExtractorLambda) - .ListHandlerLambda() - .Args(streamArg) - .template Body() - .Stream(multiHoppingCoreBuilder - .template Input() - .List(streamArg) - .Build() - .Done()) - .Build() + .Build() + .SortKeySelectorLambda(timeExtractorLambda) + .ListHandlerLambda() + .Args(streamArg) + .template Body() + .Stream(multiHoppingCoreBuilder + .template Input() + .List(streamArg) + .Build() + .Done()) .Build() - .Done() - .Ptr(); - } else { - return Build(ctx, pos) - .Input(multiHoppingCoreBuilder - .Input() - .Input(aggregate.Input()) - .Build() - .Done()) - .Done() - .Ptr(); - } + .Build() + .Done() + .Ptr(); } } // namespace namespace NYql::NHopping { -TExprNode::TPtr RewriteAsHoppingWindow( - TExprNode::TPtr node, - TExprContext& ctx, - bool analyticsMode, - TDuration lateArrivalDelay, - bool defaultWatermarksMode, - bool syncActor -) { +TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) { const auto aggregate = TCoAggregate(node); - const auto pos = aggregate.Pos(); - if (!aggregate.Input().Ref().IsCallable("AsList")) { + if (aggregate.Input().Ptr()->GetTypeAnn()->GetKind() != ETypeAnnotationKind::List) { return nullptr; } @@ -724,7 +654,7 @@ TExprNode::TPtr RewriteAsHoppingWindow( return nullptr; } - auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx, analyticsMode, lateArrivalDelay, defaultWatermarksMode, syncActor); + auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx); if (!result) { return result; } @@ -734,7 +664,7 @@ TExprNode::TPtr RewriteAsHoppingWindow( return result; } - return Build(ctx, pos) + return Build(ctx, aggregate.Pos()) .Input(result) .Members(outputColumnSetting->ChildPtr(1)) .Done() diff --git a/ydb/library/yql/core/yql_opt_hopping.h b/ydb/library/yql/core/yql_opt_hopping.h index 183de8cf0a50..3e90b45e2df1 100644 --- a/ydb/library/yql/core/yql_opt_hopping.h +++ b/ydb/library/yql/core/yql_opt_hopping.h @@ -6,12 +6,6 @@ namespace NYql::NHopping { -TExprNode::TPtr RewriteAsHoppingWindow( - TExprNode::TPtr node, - TExprContext& ctx, - bool analyticsMode, - TDuration lateArrivalDelay, - bool defaultWatermarksMode, - bool syncActor); +TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx); } // namespace NYql::NHopping From 32d10581f81a57700e9c81a563467187e8f2efc1 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Wed, 18 Sep 2024 15:56:51 +0000 Subject: [PATCH 03/11] add constraint processing for MultiHoppingCore --- ydb/library/yql/core/yql_expr_constraint.cpp | 20 +++++++++++++++++++- ydb/library/yql/core/yql_opt_utils.cpp | 2 +- ydb/library/yql/core/yql_opt_utils.h | 4 ++-- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/ydb/library/yql/core/yql_expr_constraint.cpp b/ydb/library/yql/core/yql_expr_constraint.cpp index 8a9f5e79c5aa..e1b3a4a66c05 100644 --- a/ydb/library/yql/core/yql_expr_constraint.cpp +++ b/ydb/library/yql/core/yql_expr_constraint.cpp @@ -244,6 +244,7 @@ class TCallableConstraintTransformer : public TCallableTransformerBase; Functions["BlockMergeFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap; Functions["BlockMergeManyFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap; + Functions["MultiHoppingCore"] = &TCallableConstraintTransformer::MultiHoppingCoreWrap; } std::optional ProcessCore(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { @@ -2920,6 +2921,23 @@ class TCallableConstraintTransformer : public TCallableTransformerBaseChild(TCoMultiHoppingCore::idx_KeyExtractor); + std::vector columns; + ExtractKeys(*keySelectorLambda, columns); + if (!columns.empty()) { + input->AddConstraint(ctx.MakeConstraint(columns)); + input->AddConstraint(ctx.MakeConstraint(columns)); + } + + return TStatus::Ok; + } + private: template static void CopyExcept(TConstraintContainer& dst, const TConstraintContainer& from, const TSet& except) { @@ -2939,7 +2957,7 @@ class TCallableConstraintTransformer : public TCallableTransformerBase& columns) { + static void ExtractKeys(const TExprNode& keySelectorLambda, std::vector& columns) { const auto arg = keySelectorLambda.Head().Child(0); auto body = keySelectorLambda.Child(1); if (body->IsCallable("StablePickle")) { diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp index dc026663547f..9a59a7e5a0b2 100644 --- a/ydb/library/yql/core/yql_opt_utils.cpp +++ b/ydb/library/yql/core/yql_opt_utils.cpp @@ -1258,7 +1258,7 @@ TExprNode::TPtr ExpandSkipNullFields(const TExprNode::TPtr& node, TExprContext& .Seal().Build(); } -void ExtractSimpleKeys(const TExprNode* keySelectorBody, const TExprNode* keySelectorArg, TVector& columns) { +void ExtractSimpleKeys(const TExprNode* keySelectorBody, const TExprNode* keySelectorArg, std::vector& columns) { if (keySelectorBody->IsList()) { for (auto& child: keySelectorBody->Children()) { if (child->IsCallable("Member") && child->Child(0) == keySelectorArg) { diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h index 452768fbce52..744a9cf9b2a3 100644 --- a/ydb/library/yql/core/yql_opt_utils.h +++ b/ydb/library/yql/core/yql_opt_utils.h @@ -87,8 +87,8 @@ TExprNode::TPtr ExpandFlattenByColumns(const TExprNode::TPtr& node, TExprContext TExprNode::TPtr ExpandCastStruct(const TExprNode::TPtr& node, TExprContext& ctx); TExprNode::TPtr ExpandSkipNullFields(const TExprNode::TPtr& node, TExprContext& ctx); -void ExtractSimpleKeys(const TExprNode* keySelectorBody, const TExprNode* keySelectorArg, TVector& columns); -inline void ExtractSimpleKeys(const TExprNode& keySelectorLambda, TVector& columns) { +void ExtractSimpleKeys(const TExprNode* keySelectorBody, const TExprNode* keySelectorArg, std::vector& columns); +inline void ExtractSimpleKeys(const TExprNode& keySelectorLambda, std::vector& columns) { ExtractSimpleKeys(keySelectorLambda.Child(1), keySelectorLambda.Child(0)->Child(0), columns); } From 8ae12f40202fe6179a23cc33611e060cc27665b8 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Thu, 19 Sep 2024 07:58:54 +0000 Subject: [PATCH 04/11] add test --- .../sql/dq_file/part10/canondata/result.json | 28 +++++++++++++++++++ .../tests/sql/sql2yql/canondata/result.json | 14 ++++++++++ .../suites/aggregate/group_by_hop_static.sql | 26 +++++++++++++++++ 3 files changed, 68 insertions(+) create mode 100644 ydb/library/yql/tests/sql/suites/aggregate/group_by_hop_static.sql diff --git a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json index 9763237a6133..682aa6fea9d5 100644 --- a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json @@ -294,6 +294,34 @@ } ], "test.test[aggregate-group_by_column-default.txt-Results]": [], + "test.test[aggregate-group_by_hop_static-default.txt-Analyze]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1689644/763d9bd4404423a24deab02585b884f08692c90b/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Analyze_/plan.txt" + } + ], + "test.test[aggregate-group_by_hop_static-default.txt-Debug]": [ + { + "checksum": "07d9a8f046f4661ba479dbaf70979aac", + "size": 1630, + "uri": "https://{canondata_backend}/1689644/763d9bd4404423a24deab02585b884f08692c90b/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Debug_/opt.yql_patched" + } + ], + "test.test[aggregate-group_by_hop_static-default.txt-Plan]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1689644/763d9bd4404423a24deab02585b884f08692c90b/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Plan_/plan.txt" + } + ], + "test.test[aggregate-group_by_hop_static-default.txt-Results]": [ + { + "checksum": "dc21a63cca5d7481363c2b47840f1e38", + "size": 3102, + "uri": "https://{canondata_backend}/1689644/763d9bd4404423a24deab02585b884f08692c90b/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Results_/results.txt" + } + ], "test.test[aggregate-group_by_mul_gs_ru--Analyze]": [ { "checksum": "e78b8c0f6855d3df92663efab505204b", diff --git a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json index ecb1c1e2c373..aaa5e832d31b 100644 --- a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json +++ b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json @@ -2190,6 +2190,13 @@ "uri": "https://{canondata_backend}/1784117/d56ae82ad9d30397a41490647be1bd2124718f98/resource.tar.gz#test_sql2yql.test_aggregate-group_by_hop_star_/sql.yql" } ], + "test_sql2yql.test[aggregate-group_by_hop_static]": [ + { + "checksum": "a7a563dc87672b141c8209b38c0d446c", + "size": 3368, + "uri": "https://{canondata_backend}/1925821/aca60c4aca6b335189396eb0d636b37dbc38e5d9/resource.tar.gz#test_sql2yql.test_aggregate-group_by_hop_static_/sql.yql" + } + ], "test_sql2yql.test[aggregate-group_by_mul_gb_ru]": [ { "checksum": "002e7ddce42c228debb7382e9f8ea1d3", @@ -21846,6 +21853,13 @@ "uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_aggregate-group_by_hop_star_/formatted.sql" } ], + "test_sql_format.test[aggregate-group_by_hop_static]": [ + { + "checksum": "a6f19201a2a81c7308fe9947b59276bf", + "size": 955, + "uri": "https://{canondata_backend}/1925821/aca60c4aca6b335189396eb0d636b37dbc38e5d9/resource.tar.gz#test_sql_format.test_aggregate-group_by_hop_static_/formatted.sql" + } + ], "test_sql_format.test[aggregate-group_by_mul_gb_ru]": [ { "checksum": "adae92846c7098e2ea3468096a13ffae", diff --git a/ydb/library/yql/tests/sql/suites/aggregate/group_by_hop_static.sql b/ydb/library/yql/tests/sql/suites/aggregate/group_by_hop_static.sql new file mode 100644 index 000000000000..7959c5c7df89 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/aggregate/group_by_hop_static.sql @@ -0,0 +1,26 @@ +/* syntax version 1 */ +/* postgres can not */ +/* ytfile can not */ +/* yt can not */ + +$input = SELECT * FROM AS_TABLE([ + <|"time":"2024-01-01T00:00:01Z", "user": 1|>, + <|"time":"2024-01-01T00:00:02Z", "user": 1|>, + <|"time":"2024-01-01T00:00:03Z", "user": 1|>, + <|"time":"2024-01-01T00:00:01Z", "user": 2|>, + <|"time":"2024-01-01T00:00:02Z", "user": 2|>, + <|"time":"2024-01-01T00:00:03Z", "user": 2|>, + <|"time":"2024-01-01T00:00:01Z", "user": 2|>, + <|"time":"2024-01-01T00:00:02Z", "user": 2|>, + <|"time":"2024-01-01T00:00:03Z", "user": 2|>, + <|"time":"2024-01-01T00:00:01Z", "user": 3|>, + <|"time":"2024-01-01T00:00:02Z", "user": 3|>, + <|"time":"2024-01-01T00:00:03Z", "user": 3|> +]); + +SELECT + user, + COUNT(*) as count, + HOP_START() as start, +FROM $input +GROUP BY HOP(CAST(time as Timestamp), 'PT1S', 'PT1S', 'PT1S'), user From 7557e9e2c30ecda400868a70a16c8f2b10c581a9 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Thu, 19 Sep 2024 08:27:29 +0000 Subject: [PATCH 05/11] remove code duplication --- .../yql/core/common_opt/yql_co_simple1.cpp | 107 +++- ydb/library/yql/core/yql_opt_hopping.cpp | 307 ++++------ ydb/library/yql/core/yql_opt_hopping.h | 55 +- ydb/library/yql/dq/opt/dq_opt_hopping.cpp | 568 +----------------- 4 files changed, 263 insertions(+), 774 deletions(-) diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index 1dc635cd47b3..0f8db8841e72 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -3302,6 +3302,111 @@ TExprNode::TPtr RemoveDeadPayloadColumns(const TCoAggregate& aggr, TExprContext& return aggr.Ptr(); } +TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + + NHopping::EnsureNotDistinct(aggregate); + + const auto maybeHopTraits = NHopping::ExtractHopTraits(aggregate, ctx, false); + if (!maybeHopTraits) { + return nullptr; + } + const auto hopTraits = *maybeHopTraits; + + const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast(); + NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); + + // if (keysDescription.NeedPickle()) { + // return Build(ctx, pos) + // .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) + // .Input() + // .InitFrom(aggregate) + // .Input() + // .Lambda(keysDescription.BuildPickleLambda(ctx, pos)) + // .Input(aggregate.Input()) + // .Build() + // .Settings(RemoveSetting(aggregate.Settings().Ref(), "output_columns", ctx)) + // .Build() + // .Done() + // .Ptr(); + // } + + const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); + const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx); + const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx); + const auto updateLambda = NHopping::BuildUpdateHopLambda(aggregate, ctx); + const auto saveLambda = NHopping::BuildSaveHopLambda(aggregate, ctx); + const auto loadLambda = NHopping::BuildLoadHopLambda(aggregate, ctx); + const auto mergeLambda = NHopping::BuildMergeHopLambda(aggregate, ctx); + const auto finishLambda = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx); + + const auto streamArg = Build(ctx, pos).Name("stream").Done(); + auto multiHoppingCoreBuilder = Build(ctx, pos) + .KeyExtractor(keyLambda) + .TimeExtractor(timeExtractorLambda) + .Hop(hopTraits.Traits.Hop()) + .Interval(hopTraits.Traits.Interval()) + .Delay(hopTraits.Traits.Delay()) + .DataWatermarks(hopTraits.Traits.DataWatermarks()) + .InitHandler(initLambda) + .UpdateHandler(updateLambda) + .MergeHandler(mergeLambda) + .FinishHandler(finishLambda) + .SaveHandler(saveLambda) + .LoadHandler(loadLambda) + .template WatermarkMode().Build(ToString(false)); + + return Build(ctx, pos) + .Input(aggregate.Input()) + .KeySelectorLambda(keyLambda) + .SortDirections() + .Literal() + .Value("true") + .Build() + .Build() + .SortKeySelectorLambda(timeExtractorLambda) + .ListHandlerLambda() + .Args(streamArg) + .template Body() + .Stream(multiHoppingCoreBuilder + .template Input() + .List(streamArg) + .Build() + .Done()) + .Build() + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) { + const auto aggregate = TCoAggregate(node); + + if (aggregate.Input().Ptr()->GetTypeAnn()->GetKind() != ETypeAnnotationKind::List) { + return nullptr; + } + + if (!GetSetting(aggregate.Settings().Ref(), "hopping")) { + return nullptr; + } + + auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx); + if (!result) { + return result; + } + + auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns"); + if (!outputColumnSetting) { + return result; + } + + return Build(ctx, aggregate.Pos()) + .Input(result) + .Members(outputColumnSetting->ChildPtr(1)) + .Done() + .Ptr(); +} + TExprNode::TPtr PullAssumeColumnOrderOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { TVector withAssume; for (ui32 i = 0; i < node->ChildrenSize() - 2; i++) { @@ -5081,7 +5186,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { return clean; } - if (auto hopping = NHopping::RewriteAsHoppingWindow(node, ctx); hopping) { + if (auto hopping = RewriteAsHoppingWindow(node, ctx); hopping) { return hopping; } diff --git a/ydb/library/yql/core/yql_opt_hopping.cpp b/ydb/library/yql/core/yql_opt_hopping.cpp index bd2d0c8cb01e..2cfce04cfadc 100644 --- a/ydb/library/yql/core/yql_opt_hopping.cpp +++ b/ydb/library/yql/core/yql_opt_hopping.cpp @@ -1,10 +1,7 @@ #include "yql_opt_hopping.h" -#include #include -#include -#include #include #include @@ -15,126 +12,120 @@ using namespace NYql; using namespace NYql::NNodes; -namespace { - -struct TKeysDescription { - TVector PickleKeys; - TVector MemberKeys; - TVector FakeKeys; - - TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) { - for (const auto& key : keys) { - if (key.StringValue() == hoppingColumn) { - FakeKeys.emplace_back(key.StringValue()); - continue; - } +namespace NYql::NHopping { - const auto index = rowType.FindItem(key.StringValue()); - Y_ENSURE(index); +TKeysDescription::TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) { + for (const auto& key : keys) { + if (key.StringValue() == hoppingColumn) { + FakeKeys.emplace_back(key.StringValue()); + continue; + } - auto itemType = rowType.GetItems()[*index]->GetItemType(); - if (RemoveOptionalType(itemType)->GetKind() == ETypeAnnotationKind::Data) { - MemberKeys.emplace_back(key.StringValue()); - continue; - } + const auto index = rowType.FindItem(key.StringValue()); + Y_ENSURE(index); - PickleKeys.emplace_back(key.StringValue()); + auto itemType = rowType.GetItems()[*index]->GetItemType(); + if (RemoveOptionalType(itemType)->GetKind() == ETypeAnnotationKind::Data) { + MemberKeys.emplace_back(key.StringValue()); + continue; } - } - TExprNode::TPtr BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const { - TCoArgument arg = Build(ctx, pos) - .Name("item") - .Done(); + PickleKeys.emplace_back(key.StringValue()); + } +} - TExprBase body = arg; +TExprNode::TPtr TKeysDescription::BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const { + TCoArgument arg = Build(ctx, pos) + .Name("item") + .Done(); - for (const auto& key : PickleKeys) { - const auto member = Build(ctx, pos) - .Name().Build(key) - .Struct(arg) - .Done() - .Ptr(); + TExprBase body = arg; - body = Build(ctx, pos) - .Struct(body) + for (const auto& key : PickleKeys) { + const auto member = Build(ctx, pos) .Name().Build(key) - .Item(ctx.NewCallable(pos, "StablePickle", { member })) - .Done(); - } - - return Build(ctx, pos) - .Args({arg}) - .Body(body) + .Struct(arg) .Done() .Ptr(); - } - TExprNode::TPtr BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType) { - TCoArgument arg = Build(ctx, pos) - .Name("item") + body = Build(ctx, pos) + .Struct(body) + .Name().Build(key) + .Item(ctx.NewCallable(pos, "StablePickle", { member })) .Done(); + } - TExprBase body = arg; + return Build(ctx, pos) + .Args({arg}) + .Body(body) + .Done() + .Ptr(); +} - for (const auto& key : PickleKeys) { - const auto index = rowType.FindItem(key); - Y_ENSURE(index); +TExprNode::TPtr TKeysDescription::BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType) { + TCoArgument arg = Build(ctx, pos) + .Name("item") + .Done(); - auto itemType = rowType.GetItems().at(*index)->GetItemType(); - const auto member = Build(ctx, pos) - .Name().Build(key) - .Struct(arg) - .Done() - .Ptr(); + TExprBase body = arg; - body = Build(ctx, pos) - .Struct(body) - .Name().Build(key) - .Item(ctx.NewCallable(pos, "Unpickle", { ExpandType(pos, *itemType, ctx), member })) - .Done(); - } + for (const auto& key : PickleKeys) { + const auto index = rowType.FindItem(key); + Y_ENSURE(index); - return Build(ctx, pos) - .Args({arg}) - .Body(body) + auto itemType = rowType.GetItems().at(*index)->GetItemType(); + const auto member = Build(ctx, pos) + .Name().Build(key) + .Struct(arg) .Done() .Ptr(); + + body = Build(ctx, pos) + .Struct(body) + .Name().Build(key) + .Item(ctx.NewCallable(pos, "Unpickle", { ExpandType(pos, *itemType, ctx), member })) + .Done(); } - TVector GetKeysList(TExprContext& ctx, TPositionHandle pos) const { - TVector res; - res.reserve(PickleKeys.size() + MemberKeys.size()); + return Build(ctx, pos) + .Args({arg}) + .Body(body) + .Done() + .Ptr(); +} - for (const auto& pickleKey : PickleKeys) { - res.emplace_back(Build(ctx, pos).Value(pickleKey).Done()); - } - for (const auto& memberKey : MemberKeys) { - res.emplace_back(Build(ctx, pos).Value(memberKey).Done()); - } - return res; - } +TVector TKeysDescription::GetKeysList(TExprContext& ctx, TPositionHandle pos) const { + TVector res; + res.reserve(PickleKeys.size() + MemberKeys.size()); - TVector GetActualGroupKeys() { - TVector result; - result.reserve(PickleKeys.size() + MemberKeys.size()); - result.insert(result.end(), PickleKeys.begin(), PickleKeys.end()); - result.insert(result.end(), MemberKeys.begin(), MemberKeys.end()); - return result; + for (const auto& pickleKey : PickleKeys) { + res.emplace_back(Build(ctx, pos).Value(pickleKey).Done()); } - - bool NeedPickle() const { - return !PickleKeys.empty(); + for (const auto& memberKey : MemberKeys) { + res.emplace_back(Build(ctx, pos).Value(memberKey).Done()); } + return res; +} - TExprNode::TPtr GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) { - auto builder = Build(ctx, pos); - for (auto key : GetKeysList(ctx, pos)) { - builder.Add(std::move(key)); - } - return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx); +TVector TKeysDescription::GetActualGroupKeys() const { + TVector result; + result.reserve(PickleKeys.size() + MemberKeys.size()); + result.insert(result.end(), PickleKeys.begin(), PickleKeys.end()); + result.insert(result.end(), MemberKeys.begin(), MemberKeys.end()); + return result; +} + +bool TKeysDescription::NeedPickle() const { + return !PickleKeys.empty(); +} + +TExprNode::TPtr TKeysDescription::GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) { + auto builder = Build(ctx, pos); + for (auto key : GetKeysList(ctx, pos)) { + builder.Add(std::move(key)); } -}; + return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx); +} TString BuildColumnName(const TExprBase& column) { if (const auto columnName = column.Maybe()) { @@ -166,7 +157,7 @@ void EnsureNotDistinct(const TCoAggregate& aggregate) { "Distinct is not supported for aggregation with hop"); } -TMaybe> ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx) { +TMaybe ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode) { const auto pos = aggregate.Pos(); const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); @@ -229,7 +220,20 @@ TMaybe> ExtractHopTraits(const TCoAggregat return Nothing(); } - return std::tuple{hoppingColumn, traits}; + const auto newTraits = Build(ctx, aggregate.Pos()) + .InitFrom(traits) + .DataWatermarks(analyticsMode + ? ctx.NewAtom(aggregate.Pos(), "false") + : traits.DataWatermarks().Ptr()) + .Done(); + + return THoppingTraits { + hoppingColumn, + newTraits, + hop, + interval, + delay + }; } TExprNode::TPtr BuildTimeExtractor(const TCoHoppingTraits& hoppingTraits, TExprContext& ctx) { @@ -562,113 +566,4 @@ TExprNode::TPtr BuildLoadHopLambda(const TCoAggregate& aggregate, TExprContext& .Ptr(); } -TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - - EnsureNotDistinct(aggregate); - - const auto maybeHopTraits = ExtractHopTraits(aggregate, ctx); - if (!maybeHopTraits) { - return nullptr; - } - const auto [hoppingColumn, hoppingTraits] = *maybeHopTraits; - - const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast(); - TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hoppingColumn); - - // if (keysDescription.NeedPickle()) { - // return Build(ctx, pos) - // .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) - // .Input() - // .InitFrom(aggregate) - // .Input() - // .Lambda(keysDescription.BuildPickleLambda(ctx, pos)) - // .Input(aggregate.Input()) - // .Build() - // .Settings(RemoveSetting(aggregate.Settings().Ref(), "output_columns", ctx)) - // .Build() - // .Done() - // .Ptr(); - // } - - const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); - const auto timeExtractorLambda = BuildTimeExtractor(hoppingTraits, ctx); - const auto initLambda = BuildInitHopLambda(aggregate, ctx); - const auto updateLambda = BuildUpdateHopLambda(aggregate, ctx); - const auto saveLambda = BuildSaveHopLambda(aggregate, ctx); - const auto loadLambda = BuildLoadHopLambda(aggregate, ctx); - const auto mergeLambda = BuildMergeHopLambda(aggregate, ctx); - const auto finishLambda = BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hoppingColumn, ctx); - - const auto streamArg = Build(ctx, pos).Name("stream").Done(); - auto multiHoppingCoreBuilder = Build(ctx, pos) - .KeyExtractor(keyLambda) - .TimeExtractor(timeExtractorLambda) - .Hop(hoppingTraits.Hop()) - .Interval(hoppingTraits.Interval()) - .Delay(hoppingTraits.Delay()) - .DataWatermarks(hoppingTraits.DataWatermarks()) - .InitHandler(initLambda) - .UpdateHandler(updateLambda) - .MergeHandler(mergeLambda) - .FinishHandler(finishLambda) - .SaveHandler(saveLambda) - .LoadHandler(loadLambda) - .template WatermarkMode().Build(ToString(false)); - - return Build(ctx, pos) - .Input(aggregate.Input()) - .KeySelectorLambda(keyLambda) - .SortDirections() - .Literal() - .Value("true") - .Build() - .Build() - .SortKeySelectorLambda(timeExtractorLambda) - .ListHandlerLambda() - .Args(streamArg) - .template Body() - .Stream(multiHoppingCoreBuilder - .template Input() - .List(streamArg) - .Build() - .Done()) - .Build() - .Build() - .Done() - .Ptr(); -} - -} // namespace - -namespace NYql::NHopping { - -TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) { - const auto aggregate = TCoAggregate(node); - - if (aggregate.Input().Ptr()->GetTypeAnn()->GetKind() != ETypeAnnotationKind::List) { - return nullptr; - } - - if (!GetSetting(aggregate.Settings().Ref(), "hopping")) { - return nullptr; - } - - auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx); - if (!result) { - return result; - } - - auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns"); - if (!outputColumnSetting) { - return result; - } - - return Build(ctx, aggregate.Pos()) - .Input(result) - .Members(outputColumnSetting->ChildPtr(1)) - .Done() - .Ptr(); -} - } // NYql::NHopping diff --git a/ydb/library/yql/core/yql_opt_hopping.h b/ydb/library/yql/core/yql_opt_hopping.h index 3e90b45e2df1..a9c2f458bb08 100644 --- a/ydb/library/yql/core/yql_opt_hopping.h +++ b/ydb/library/yql/core/yql_opt_hopping.h @@ -1,11 +1,64 @@ #pragma once +#include #include #include namespace NYql::NHopping { -TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx); +struct THoppingTraits { + TString Column; + NYql::NNodes::TCoHoppingTraits Traits; + ui64 Hop; + ui64 Interval; + ui64 Delay; +}; + +struct TKeysDescription { + TVector PickleKeys; + TVector MemberKeys; + TVector FakeKeys; + + TKeysDescription(const TStructExprType& rowType, const NYql::NNodes::TCoAtomList& keys, const TString& hoppingColumn); + + TExprNode::TPtr BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const; + + TExprNode::TPtr BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType); + + TVector GetKeysList(TExprContext& ctx, TPositionHandle pos) const; + + TVector GetActualGroupKeys() const; + + bool NeedPickle() const; + + TExprNode::TPtr GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType); +}; + +TString BuildColumnName(const NYql::NNodes::TExprBase& column); + +bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting); + +void EnsureNotDistinct(const NYql::NNodes::TCoAggregate& aggregate); + +TMaybe ExtractHopTraits(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode); + +TExprNode::TPtr BuildTimeExtractor(const NYql::NNodes::TCoHoppingTraits& hoppingTraits, TExprContext& ctx); + +TExprNode::TPtr BuildInitHopLambda(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx); + +TExprNode::TPtr BuildUpdateHopLambda(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx); + +TExprNode::TPtr BuildMergeHopLambda(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx); + +TExprNode::TPtr BuildFinishHopLambda( + const NYql::NNodes::TCoAggregate& aggregate, + const TVector& actualGroupKeys, + const TString& hoppingColumn, + TExprContext& ctx); + +TExprNode::TPtr BuildSaveHopLambda(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx); + +TExprNode::TPtr BuildLoadHopLambda(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx); } // namespace NYql::NHopping diff --git a/ydb/library/yql/dq/opt/dq_opt_hopping.cpp b/ydb/library/yql/dq/opt/dq_opt_hopping.cpp index 661fc2b6d724..daae3444ad6d 100644 --- a/ydb/library/yql/dq/opt/dq_opt_hopping.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_hopping.cpp @@ -1,6 +1,7 @@ #include "dq_opt_hopping.h" #include +#include #include #include @@ -19,366 +20,11 @@ using namespace NYql; using namespace NYql::NDq; +using namespace NYql::NHopping; using namespace NYql::NNodes; namespace { -struct THoppingTraits { - TString Column; - TCoHoppingTraits Traits; - ui64 Hop; - ui64 Interval; - ui64 Delay; -}; - - struct TKeysDescription { - TVector PickleKeys; - TVector MemberKeys; - TVector FakeKeys; - - TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) { - for (const auto& key : keys) { - if (key.StringValue() == hoppingColumn) { - FakeKeys.emplace_back(key.StringValue()); - continue; - } - - const auto index = rowType.FindItem(key.StringValue()); - Y_ENSURE(index); - - auto itemType = rowType.GetItems()[*index]->GetItemType(); - if (RemoveOptionalType(itemType)->GetKind() == ETypeAnnotationKind::Data) { - MemberKeys.emplace_back(key.StringValue()); - continue; - } - - PickleKeys.emplace_back(key.StringValue()); - } - } - - TExprNode::TPtr BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const { - TCoArgument arg = Build(ctx, pos) - .Name("item") - .Done(); - - TExprBase body = arg; - - for (const auto& key : PickleKeys) { - const auto member = Build(ctx, pos) - .Name().Build(key) - .Struct(arg) - .Done() - .Ptr(); - - body = Build(ctx, pos) - .Struct(body) - .Name().Build(key) - .Item(ctx.NewCallable(pos, "StablePickle", { member })) - .Done(); - } - - return Build(ctx, pos) - .Args({arg}) - .Body(body) - .Done() - .Ptr(); - } - - TExprNode::TPtr BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType) { - TCoArgument arg = Build(ctx, pos) - .Name("item") - .Done(); - - TExprBase body = arg; - - for (const auto& key : PickleKeys) { - const auto index = rowType.FindItem(key); - Y_ENSURE(index); - - auto itemType = rowType.GetItems().at(*index)->GetItemType(); - const auto member = Build(ctx, pos) - .Name().Build(key) - .Struct(arg) - .Done() - .Ptr(); - - body = Build(ctx, pos) - .Struct(body) - .Name().Build(key) - .Item(ctx.NewCallable(pos, "Unpickle", { ExpandType(pos, *itemType, ctx), member })) - .Done(); - } - - return Build(ctx, pos) - .Args({arg}) - .Body(body) - .Done() - .Ptr(); - } - - TVector GetKeysList(TExprContext& ctx, TPositionHandle pos) const { - TVector res; - res.reserve(PickleKeys.size() + MemberKeys.size()); - - for (const auto& pickleKey : PickleKeys) { - res.emplace_back(Build(ctx, pos).Value(pickleKey).Done()); - } - for (const auto& memberKey : MemberKeys) { - res.emplace_back(Build(ctx, pos).Value(memberKey).Done()); - } - return res; - } - - TVector GetActualGroupKeys() { - TVector result; - result.reserve(PickleKeys.size() + MemberKeys.size()); - result.insert(result.end(), PickleKeys.begin(), PickleKeys.end()); - result.insert(result.end(), MemberKeys.begin(), MemberKeys.end()); - return result; - } - - bool NeedPickle() const { - return !PickleKeys.empty(); - } - - TExprNode::TPtr GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) { - auto builder = Build(ctx, pos); - for (auto key : GetKeysList(ctx, pos)) { - builder.Add(std::move(key)); - } - return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx); - } -}; - -TString BuildColumnName(const TExprBase& column) { - if (const auto columnName = column.Maybe()) { - return columnName.Cast().StringValue(); - } - - if (const auto columnNames = column.Maybe()) { - TStringBuilder columnNameBuilder; - for (const auto columnName : columnNames.Cast()) { - columnNameBuilder.append(columnName.StringValue()); - columnNameBuilder.append("_"); - } - return columnNameBuilder; - } - - YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " - << column.Ptr()->Dump()); -} - -bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting) { - return !hoppingSetting->Child(1)->IsList(); -} - -void EnsureNotDistinct(const TCoAggregate& aggregate) { - const auto& aggregateHandlers = aggregate.Handlers(); - - YQL_ENSURE( - AllOf(aggregateHandlers, [](const auto& t){ return !t.DistinctName(); }), - "Distinct is not supported for aggregation with hop"); -} - -TMaybe ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode) { - const auto pos = aggregate.Pos(); - - const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); - if (!hopSetting) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Aggregate over stream must have 'hopping' setting")); - return Nothing(); - } - - const auto hoppingColumn = IsLegacyHopping(hopSetting) - ? "_yql_time" - : TString(hopSetting->Child(1)->Child(0)->Content()); - - const auto traitsNode = IsLegacyHopping(hopSetting) - ? hopSetting->Child(1) - : hopSetting->Child(1)->Child(1); - - const auto maybeTraits = TMaybeNode(traitsNode); - if (!maybeTraits) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Invalid 'hopping' setting in Aggregate")); - return Nothing(); - } - - const auto traits = maybeTraits.Cast(); - - const auto checkIntervalParam = [&] (TExprBase param) -> ui64 { - if (param.Maybe()) { - param = param.Cast().Input(); - } - if (!param.Maybe()) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Not an interval data ctor")); - return 0; - } - auto value = FromString(param.Cast().Literal().Value()); - if (value <= 0) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval value must be positive")); - return 0; - } - return (ui64)value; - }; - - const auto hop = checkIntervalParam(traits.Hop()); - if (!hop) { - return Nothing(); - } - const auto interval = checkIntervalParam(traits.Interval()); - if (!interval) { - return Nothing(); - } - const auto delay = checkIntervalParam(traits.Delay()); - if (!delay) { - return Nothing(); - } - - if (interval < hop) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval must be greater or equal then hop")); - return Nothing(); - } - if (delay < hop) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Delay must be greater or equal then hop")); - return Nothing(); - } - - const auto newTraits = Build(ctx, aggregate.Pos()) - .InitFrom(traits) - .DataWatermarks(analyticsMode - ? ctx.NewAtom(aggregate.Pos(), "false") - : traits.DataWatermarks().Ptr()) - .Done(); - - return THoppingTraits { - hoppingColumn, - newTraits, - hop, - interval, - delay - }; -} - -TExprNode::TPtr BuildTimeExtractor(const TCoHoppingTraits& hoppingTraits, TExprContext& ctx) { - const auto pos = hoppingTraits.Pos(); - - if (hoppingTraits.ItemType().Ref().GetTypeAnn()->Cast()->GetType()->Cast()->GetSize() == 0) { - // The case when no fields are used in lambda. F.e. when it has only DependsOn. - return ctx.DeepCopyLambda(hoppingTraits.TimeExtractor().Ref()); - } - - return Build(ctx, pos) - .Args({"item"}) - .Body() - .Apply(hoppingTraits.TimeExtractor()) - .With(0) - .Type(hoppingTraits.ItemType()) - .Value("item") - .Build() - .Build() - .Done() - .Ptr(); -} - -TExprNode::TPtr BuildInitHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto& aggregateHandlers = aggregate.Handlers(); - - const auto initItemArg = Build(ctx, pos).Name("item").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - ui32 index = 0; - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - - TMaybeNode applier; - if (tuple.Trait().Cast().InitHandler().Args().Size() == 1) { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().InitHandler()) - .With(0, initItemArg) - .Done(); - } else { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().InitHandler()) - .With(0, initItemArg) - .With(1) - .Literal().Build(ToString(index)) - .Build() - .Done(); - } - - structItems.push_back(Build(ctx, pos) - .Name().Build(BuildColumnName(tuple.ColumnName())) - .Value(applier) - .Done()); - ++index; - } - - return Build(ctx, pos) - .Args({initItemArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - -TExprNode::TPtr BuildUpdateHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - const auto updateItemArg = Build(ctx, pos).Name("item").Done(); - const auto updateStateArg = Build(ctx, pos).Name("state").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - i32 index = 0; - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(updateStateArg) - .Name().Build(columnName) - .Done(); - - TMaybeNode applier; - if (tuple.Trait().Cast().UpdateHandler().Args().Size() == 2) { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().UpdateHandler()) - .With(0, updateItemArg) - .With(1, member) - .Done(); - } else { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().UpdateHandler()) - .With(0, updateItemArg) - .With(1, member) - .With(2) - .Literal().Build(ToString(index)) - .Build() - .Done(); - } - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value(applier) - .Done()); - ++index; - } - - return Build(ctx, pos) - .Args({updateItemArg, updateStateArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - TExprNode::TPtr WrapToShuffle( const TKeysDescription& keysDescription, const TCoAggregate& aggregate, @@ -421,216 +67,6 @@ TExprNode::TPtr WrapToShuffle( .Ptr(); } -TExprNode::TPtr BuildMergeHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto& aggregateHandlers = aggregate.Handlers(); - - const auto mergeState1Arg = Build(ctx, pos).Name("state1").Done(); - const auto mergeState2Arg = Build(ctx, pos).Name("state2").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member1 = Build(ctx, pos) - .Struct(mergeState1Arg) - .Name().Build(columnName) - .Done(); - const auto member2 = Build(ctx, pos) - .Struct(mergeState2Arg) - .Name().Build(columnName) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value() - .Apply(tuple.Trait().Cast().MergeHandler()) - .With(0, member1) - .With(1, member2) - .Build() - .Done()); - } - - return Build(ctx, pos) - .Args({mergeState1Arg, mergeState2Arg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - -TExprNode::TPtr BuildFinishHopLambda( - const TCoAggregate& aggregate, - const TVector& actualGroupKeys, - const TString& hoppingColumn, - TExprContext& ctx) -{ - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - const auto finishKeyArg = Build(ctx, pos).Name("key").Done(); - const auto finishStateArg = Build(ctx, pos).Name("state").Done(); - const auto finishTimeArg = Build(ctx, pos).Name("time").Done(); - - TVector structItems; - structItems.reserve(actualGroupKeys.size() + aggregateHandlers.Size() + 1); - - if (actualGroupKeys.size() == 1) { - structItems.push_back(Build(ctx, pos) - .Name().Build(actualGroupKeys[0]) - .Value(finishKeyArg) - .Done()); - } else { - for (size_t i = 0; i < actualGroupKeys.size(); ++i) { - structItems.push_back(Build(ctx, pos) - .Name().Build(actualGroupKeys[i]) - .Value() - .Tuple(finishKeyArg) - .Index() - .Value(ToString(i)) - .Build() - .Build() - .Done()); - } - } - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString compoundColumnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(finishStateArg) - .Name().Build(compoundColumnName) - .Done(); - - if (tuple.ColumnName().Maybe()) { - structItems.push_back(Build(ctx, pos) - .Name().Build(compoundColumnName) - .Value() - .Apply(tuple.Trait().Cast().FinishHandler()) - .With(0, member) - .Build() - .Done()); - - continue; - } - - if (const auto namesList = tuple.ColumnName().Maybe()) { - const auto expApplier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().FinishHandler()) - .With(0, member) - .Done(); - - int index = 0; - for (const auto columnName : namesList.Cast()) { - const auto extracter = Build(ctx, pos) - .Tuple(expApplier) - .Index().Build(index++) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name(columnName) - .Value(extracter) - .Done()); - } - - continue; - } - - YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " - << tuple.ColumnName().Ptr()->Dump()); - } - - structItems.push_back(Build(ctx, pos) - .Name().Build(hoppingColumn) - .Value(finishTimeArg) - .Done()); - - return Build(ctx, pos) - .Args({finishKeyArg, finishStateArg, finishTimeArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - -TExprNode::TPtr BuildSaveHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - const auto saveStateArg = Build(ctx, pos).Name("state").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(saveStateArg) - .Name().Build(columnName) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value() - .Apply(tuple.Trait().Cast().SaveHandler()) - .With(0, member) - .Build() - .Done()); - } - - return Build(ctx, pos) - .Args({saveStateArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - -TExprNode::TPtr BuildLoadHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - TCoArgument loadStateArg = Build(ctx, pos).Name("state").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(loadStateArg) - .Name().Build(columnName) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value() - .Apply(tuple.Trait().Cast().LoadHandler()) - .With(0, member) - .Build() - .Done()); - } - - return Build(ctx, pos) - .Args({loadStateArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - TMaybe BuildWatermarkMode( const TCoAggregate& aggregate, const TCoHoppingTraits& hoppingTraits, From a6ea71c58efb20b7917ad3e9865f88d699ef5d08 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Fri, 20 Sep 2024 13:55:57 +0000 Subject: [PATCH 06/11] make RewriteAggregate optimization filter more proper --- ydb/library/yql/core/common_opt/yql_co_simple1.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index 0f8db8841e72..9eccb4bb6f79 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -3382,7 +3382,7 @@ TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) { const auto aggregate = TCoAggregate(node); - if (aggregate.Input().Ptr()->GetTypeAnn()->GetKind() != ETypeAnnotationKind::List) { + if (!IsPureIsolatedLambda(*aggregate.Ptr())) { return nullptr; } @@ -5187,6 +5187,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { } if (auto hopping = RewriteAsHoppingWindow(node, ctx); hopping) { + YQL_CLOG(DEBUG, Core) << "RewriteAggregate"; return hopping; } From 2973ebcd90d34b0e72913a094722eac9a212e91f Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Tue, 24 Sep 2024 11:56:30 +0000 Subject: [PATCH 07/11] fix after review --- ydb/library/yql/core/common_opt/yql_co_simple1.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index 9eccb4bb6f79..8a05bd1651ec 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -3316,6 +3316,7 @@ TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast(); NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); + // TODO(YQ-3699): uncomment // if (keysDescription.NeedPickle()) { // return Build(ctx, pos) // .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) @@ -5186,7 +5187,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { return clean; } - if (auto hopping = RewriteAsHoppingWindow(node, ctx); hopping) { + if (auto hopping = RewriteAsHoppingWindow(node, ctx)) { YQL_CLOG(DEBUG, Core) << "RewriteAggregate"; return hopping; } From 3f6bbfe546f8f984e4690f8952eb19d3778bc283 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Tue, 1 Oct 2024 13:54:23 +0000 Subject: [PATCH 08/11] make comment more detailed --- ydb/library/yql/core/common_opt/yql_co_simple1.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index 8a05bd1651ec..88c1dac9ce67 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -3316,7 +3316,9 @@ TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast(); NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); - // TODO(YQ-3699): uncomment + // TODO(YQ-3699) + // To enable aggregation not only for simple types (int, string, etc) but also for complex types (list, dict, set, null, etc), + // uncomment these lines and fix the constaints for TCoMap in ydb/library/yql/core/yql_expr_constraint.cpp // if (keysDescription.NeedPickle()) { // return Build(ctx, pos) // .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) From dba1e7a754ee0074776b17fea35bcad1b58dfbf2 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Thu, 3 Oct 2024 14:11:45 +0000 Subject: [PATCH 09/11] fix aggregation for 1 non-trivial column --- ydb/library/yql/core/common_opt/yql_co_simple1.cpp | 13 ++++++++----- ydb/library/yql/core/yql_expr_constraint.cpp | 2 ++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index 88c1dac9ce67..647285f52574 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -3371,10 +3371,13 @@ TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, .ListHandlerLambda() .Args(streamArg) .template Body() - .Stream(multiHoppingCoreBuilder - .template Input() - .List(streamArg) - .Build() + .Stream(Build(ctx, pos) + .Input(multiHoppingCoreBuilder + .template Input() + .List(streamArg) + .Build() + .Done()) + .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) .Done()) .Build() .Build() @@ -5190,7 +5193,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { } if (auto hopping = RewriteAsHoppingWindow(node, ctx)) { - YQL_CLOG(DEBUG, Core) << "RewriteAggregate"; + YQL_CLOG(DEBUG, Core) << "RewriteAsHoppingWindow"; return hopping; } diff --git a/ydb/library/yql/core/yql_expr_constraint.cpp b/ydb/library/yql/core/yql_expr_constraint.cpp index e1b3a4a66c05..7367911ae995 100644 --- a/ydb/library/yql/core/yql_expr_constraint.cpp +++ b/ydb/library/yql/core/yql_expr_constraint.cpp @@ -245,6 +245,8 @@ class TCallableConstraintTransformer : public TCallableTransformerBase; Functions["BlockMergeManyFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap; Functions["MultiHoppingCore"] = &TCallableConstraintTransformer::MultiHoppingCoreWrap; + Functions["StablePickle"] = &TCallableConstraintTransformer::FromFirst; + Functions["Unpickle"] = &TCallableConstraintTransformer::FromSecond; } std::optional ProcessCore(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { From e384e3ebddb2d4e1788f080c6f82d0adc193d23e Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Fri, 4 Oct 2024 12:45:47 +0000 Subject: [PATCH 10/11] final fix of constraints --- .../yql/core/ut/yql_expr_constraint_ut.cpp | 29 +++++++++++++++++++ ydb/library/yql/core/yql_expr_constraint.cpp | 9 ++++-- ydb/library/yql/core/yql_opt_utils.h | 4 +-- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp b/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp index 2a44fef51a89..df3fdef8e5b3 100644 --- a/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp +++ b/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp @@ -3270,6 +3270,35 @@ Y_UNIT_TEST_SUITE(TYqlExprConstraints) { CheckConstraint(exprRoot, "LazyList", ""); CheckConstraint(exprRoot, "LazyList", ""); } + + Y_UNIT_TEST(GroupByHop) { + const TStringBuf s = R"(( +(let list (AsList + (AsStruct '('"time" (String '"2024-01-01T00:00:01Z")) '('"user" (Int32 '"1")) '('"data" (Null))) + (AsStruct '('"time" (String '"2024-01-01T00:00:02Z")) '('"user" (Int32 '"1")) '('"data" (Null))) + (AsStruct '('"time" (String '"2024-01-01T00:00:03Z")) '('"user" (Int32 '"1")) '('"data" (Null))) +)) +(let input (FlatMap list (lambda '(row) (Just (AsStruct '('"data" (Member row '"data")) '('group0 (AsList (Member row '"user"))) '('"time" (Member row '"time")) '('"user" (Member row '"user"))))))) +(let keySelector (lambda '(row) '((StablePickle (Member row '"data")) (StablePickle (Member row 'group0))))) +(let sortKeySelector (lambda '(row) (SafeCast (Member row '"time") (OptionalType (DataType 'Timestamp))))) +(let res (PartitionsByKeys input keySelector (Bool 'true) sortKeySelector (lambda '(row) (block '( + (let interval (Interval '1000000)) + (let map (lambda '(item) (AsStruct))) + (let reduce (lambda '(lhs rhs) (AsStruct))) + (let hopping (MultiHoppingCore (Iterator row) keySelector sortKeySelector interval interval interval 'true map reduce map map reduce (lambda '(key state time) (AsStruct '('_yql_time time) '('"data" (Nth key '"0")) '('group0 (Nth key '"1")))) '"0")) + (return (ForwardList (FlatMap hopping (lambda '(row) (Just (AsStruct '('_yql_time (Member row '_yql_time)) '('"data" (Unpickle (NullType) (Member row '"data"))) '('group0 (Unpickle (ListType (DataType 'Int32)) (Member row 'group0))))))))) +))))) + +(let res_sink (DataSink 'yt (quote plato))) +(let world (Write! world res_sink (Key '('table (String 'Output))) res '('('mode 'renew)))) +(return (Commit! world res_sink)) + ))"; + + TExprContext exprCtx; + const auto exprRoot = ParseAndAnnotate(s, exprCtx); + CheckConstraint(exprRoot, "PartitionsByKeys", "Distinct((data,group0))"); + CheckConstraint(exprRoot, "PartitionsByKeys", "Unique((data,group0))"); + } } } // namespace NYql diff --git a/ydb/library/yql/core/yql_expr_constraint.cpp b/ydb/library/yql/core/yql_expr_constraint.cpp index 7367911ae995..63f2aca8a746 100644 --- a/ydb/library/yql/core/yql_expr_constraint.cpp +++ b/ydb/library/yql/core/yql_expr_constraint.cpp @@ -2930,8 +2930,11 @@ class TCallableConstraintTransformer : public TCallableTransformerBaseChild(TCoMultiHoppingCore::idx_KeyExtractor); - std::vector columns; - ExtractKeys(*keySelectorLambda, columns); + const auto keys = GetPathsToKeys(keySelectorLambda->Tail(), keySelectorLambda->Head().Head()); + std::vector columns(keys.size()); + std::transform(keys.begin(), keys.end(), columns.begin(), [](const TPartOfConstraintBase::TPathType& path) -> std::string_view { + return path.front(); + }); if (!columns.empty()) { input->AddConstraint(ctx.MakeConstraint(columns)); input->AddConstraint(ctx.MakeConstraint(columns)); @@ -2959,7 +2962,7 @@ class TCallableConstraintTransformer : public TCallableTransformerBase& columns) { + static void ExtractKeys(const TExprNode& keySelectorLambda, TVector& columns) { const auto arg = keySelectorLambda.Head().Child(0); auto body = keySelectorLambda.Child(1); if (body->IsCallable("StablePickle")) { diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h index 744a9cf9b2a3..452768fbce52 100644 --- a/ydb/library/yql/core/yql_opt_utils.h +++ b/ydb/library/yql/core/yql_opt_utils.h @@ -87,8 +87,8 @@ TExprNode::TPtr ExpandFlattenByColumns(const TExprNode::TPtr& node, TExprContext TExprNode::TPtr ExpandCastStruct(const TExprNode::TPtr& node, TExprContext& ctx); TExprNode::TPtr ExpandSkipNullFields(const TExprNode::TPtr& node, TExprContext& ctx); -void ExtractSimpleKeys(const TExprNode* keySelectorBody, const TExprNode* keySelectorArg, std::vector& columns); -inline void ExtractSimpleKeys(const TExprNode& keySelectorLambda, std::vector& columns) { +void ExtractSimpleKeys(const TExprNode* keySelectorBody, const TExprNode* keySelectorArg, TVector& columns); +inline void ExtractSimpleKeys(const TExprNode& keySelectorLambda, TVector& columns) { ExtractSimpleKeys(keySelectorLambda.Child(1), keySelectorLambda.Child(0)->Child(0), columns); } From e38b47a008946ea8bfa0e1c3123bc88a33b17169 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Fri, 4 Oct 2024 12:48:33 +0000 Subject: [PATCH 11/11] remove provocative comment --- .../yql/core/common_opt/yql_co_simple1.cpp | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index 647285f52574..adeb5106c36a 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -3316,24 +3316,6 @@ TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast(); NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); - // TODO(YQ-3699) - // To enable aggregation not only for simple types (int, string, etc) but also for complex types (list, dict, set, null, etc), - // uncomment these lines and fix the constaints for TCoMap in ydb/library/yql/core/yql_expr_constraint.cpp - // if (keysDescription.NeedPickle()) { - // return Build(ctx, pos) - // .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) - // .Input() - // .InitFrom(aggregate) - // .Input() - // .Lambda(keysDescription.BuildPickleLambda(ctx, pos)) - // .Input(aggregate.Input()) - // .Build() - // .Settings(RemoveSetting(aggregate.Settings().Ref(), "output_columns", ctx)) - // .Build() - // .Done() - // .Ptr(); - // } - const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx); const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx);