diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index c63dc75d344b..adeb5106c36a 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -3301,6 +3302,99 @@ TExprNode::TPtr RemoveDeadPayloadColumns(const TCoAggregate& aggr, TExprContext& return aggr.Ptr(); } +TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + + NHopping::EnsureNotDistinct(aggregate); + + const auto maybeHopTraits = NHopping::ExtractHopTraits(aggregate, ctx, false); + if (!maybeHopTraits) { + return nullptr; + } + const auto hopTraits = *maybeHopTraits; + + const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast(); + NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column); + + const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType); + const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx); + const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx); + const auto updateLambda = NHopping::BuildUpdateHopLambda(aggregate, ctx); + const auto saveLambda = NHopping::BuildSaveHopLambda(aggregate, ctx); + const auto loadLambda = NHopping::BuildLoadHopLambda(aggregate, ctx); + const auto mergeLambda = NHopping::BuildMergeHopLambda(aggregate, ctx); + const auto finishLambda = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx); + + const auto streamArg = Build(ctx, pos).Name("stream").Done(); + auto multiHoppingCoreBuilder = Build(ctx, pos) + .KeyExtractor(keyLambda) + .TimeExtractor(timeExtractorLambda) + .Hop(hopTraits.Traits.Hop()) + .Interval(hopTraits.Traits.Interval()) + .Delay(hopTraits.Traits.Delay()) + .DataWatermarks(hopTraits.Traits.DataWatermarks()) + .InitHandler(initLambda) + .UpdateHandler(updateLambda) + .MergeHandler(mergeLambda) + .FinishHandler(finishLambda) + .SaveHandler(saveLambda) + .LoadHandler(loadLambda) + .template WatermarkMode().Build(ToString(false)); + + return Build(ctx, pos) + .Input(aggregate.Input()) + .KeySelectorLambda(keyLambda) + .SortDirections() + .Literal() + .Value("true") + .Build() + .Build() + .SortKeySelectorLambda(timeExtractorLambda) + .ListHandlerLambda() + .Args(streamArg) + .template Body() + .Stream(Build(ctx, pos) + .Input(multiHoppingCoreBuilder + .template Input() + .List(streamArg) + .Build() + .Done()) + .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType)) + .Done()) + .Build() + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) { + const auto aggregate = TCoAggregate(node); + + if (!IsPureIsolatedLambda(*aggregate.Ptr())) { + return nullptr; + } + + if (!GetSetting(aggregate.Settings().Ref(), "hopping")) { + return nullptr; + } + + auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx); + if (!result) { + return result; + } + + auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns"); + if (!outputColumnSetting) { + return result; + } + + return Build(ctx, aggregate.Pos()) + .Input(result) + .Members(outputColumnSetting->ChildPtr(1)) + .Done() + .Ptr(); +} + TExprNode::TPtr PullAssumeColumnOrderOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { TVector withAssume; for (ui32 i = 0; i < node->ChildrenSize() - 2; i++) { @@ -5080,6 +5174,11 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { return clean; } + if (auto hopping = RewriteAsHoppingWindow(node, ctx)) { + YQL_CLOG(DEBUG, Core) << "RewriteAsHoppingWindow"; + return hopping; + } + return DropReorder(node, ctx); }; diff --git a/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp b/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp index 2a44fef51a89..df3fdef8e5b3 100644 --- a/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp +++ b/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp @@ -3270,6 +3270,35 @@ Y_UNIT_TEST_SUITE(TYqlExprConstraints) { CheckConstraint(exprRoot, "LazyList", ""); CheckConstraint(exprRoot, "LazyList", ""); } + + Y_UNIT_TEST(GroupByHop) { + const TStringBuf s = R"(( +(let list (AsList + (AsStruct '('"time" (String '"2024-01-01T00:00:01Z")) '('"user" (Int32 '"1")) '('"data" (Null))) + (AsStruct '('"time" (String '"2024-01-01T00:00:02Z")) '('"user" (Int32 '"1")) '('"data" (Null))) + (AsStruct '('"time" (String '"2024-01-01T00:00:03Z")) '('"user" (Int32 '"1")) '('"data" (Null))) +)) +(let input (FlatMap list (lambda '(row) (Just (AsStruct '('"data" (Member row '"data")) '('group0 (AsList (Member row '"user"))) '('"time" (Member row '"time")) '('"user" (Member row '"user"))))))) +(let keySelector (lambda '(row) '((StablePickle (Member row '"data")) (StablePickle (Member row 'group0))))) +(let sortKeySelector (lambda '(row) (SafeCast (Member row '"time") (OptionalType (DataType 'Timestamp))))) +(let res (PartitionsByKeys input keySelector (Bool 'true) sortKeySelector (lambda '(row) (block '( + (let interval (Interval '1000000)) + (let map (lambda '(item) (AsStruct))) + (let reduce (lambda '(lhs rhs) (AsStruct))) + (let hopping (MultiHoppingCore (Iterator row) keySelector sortKeySelector interval interval interval 'true map reduce map map reduce (lambda '(key state time) (AsStruct '('_yql_time time) '('"data" (Nth key '"0")) '('group0 (Nth key '"1")))) '"0")) + (return (ForwardList (FlatMap hopping (lambda '(row) (Just (AsStruct '('_yql_time (Member row '_yql_time)) '('"data" (Unpickle (NullType) (Member row '"data"))) '('group0 (Unpickle (ListType (DataType 'Int32)) (Member row 'group0))))))))) +))))) + +(let res_sink (DataSink 'yt (quote plato))) +(let world (Write! world res_sink (Key '('table (String 'Output))) res '('('mode 'renew)))) +(return (Commit! world res_sink)) + ))"; + + TExprContext exprCtx; + const auto exprRoot = ParseAndAnnotate(s, exprCtx); + CheckConstraint(exprRoot, "PartitionsByKeys", "Distinct((data,group0))"); + CheckConstraint(exprRoot, "PartitionsByKeys", "Unique((data,group0))"); + } } } // namespace NYql diff --git a/ydb/library/yql/core/ya.make b/ydb/library/yql/core/ya.make index ab1abc133ab5..f68a92cab417 100644 --- a/ydb/library/yql/core/ya.make +++ b/ydb/library/yql/core/ya.make @@ -28,6 +28,7 @@ SRCS( yql_join.cpp yql_join.h yql_library_compiler.cpp + yql_opt_hopping.cpp yql_opt_match_recognize.cpp yql_opt_match_recognize.h yql_opt_proposed_by_data.cpp diff --git a/ydb/library/yql/core/yql_expr_constraint.cpp b/ydb/library/yql/core/yql_expr_constraint.cpp index 8a9f5e79c5aa..63f2aca8a746 100644 --- a/ydb/library/yql/core/yql_expr_constraint.cpp +++ b/ydb/library/yql/core/yql_expr_constraint.cpp @@ -244,6 +244,9 @@ class TCallableConstraintTransformer : public TCallableTransformerBase; Functions["BlockMergeFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap; Functions["BlockMergeManyFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap; + Functions["MultiHoppingCore"] = &TCallableConstraintTransformer::MultiHoppingCoreWrap; + Functions["StablePickle"] = &TCallableConstraintTransformer::FromFirst; + Functions["Unpickle"] = &TCallableConstraintTransformer::FromSecond; } std::optional ProcessCore(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { @@ -2920,6 +2923,26 @@ class TCallableConstraintTransformer : public TCallableTransformerBaseChild(TCoMultiHoppingCore::idx_KeyExtractor); + const auto keys = GetPathsToKeys(keySelectorLambda->Tail(), keySelectorLambda->Head().Head()); + std::vector columns(keys.size()); + std::transform(keys.begin(), keys.end(), columns.begin(), [](const TPartOfConstraintBase::TPathType& path) -> std::string_view { + return path.front(); + }); + if (!columns.empty()) { + input->AddConstraint(ctx.MakeConstraint(columns)); + input->AddConstraint(ctx.MakeConstraint(columns)); + } + + return TStatus::Ok; + } + private: template static void CopyExcept(TConstraintContainer& dst, const TConstraintContainer& from, const TSet& except) { diff --git a/ydb/library/yql/core/yql_opt_hopping.cpp b/ydb/library/yql/core/yql_opt_hopping.cpp new file mode 100644 index 000000000000..2cfce04cfadc --- /dev/null +++ b/ydb/library/yql/core/yql_opt_hopping.cpp @@ -0,0 +1,569 @@ +#include "yql_opt_hopping.h" + +#include + +#include + +#include +#include +#include +#include + +using namespace NYql; +using namespace NYql::NNodes; + +namespace NYql::NHopping { + +TKeysDescription::TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) { + for (const auto& key : keys) { + if (key.StringValue() == hoppingColumn) { + FakeKeys.emplace_back(key.StringValue()); + continue; + } + + const auto index = rowType.FindItem(key.StringValue()); + Y_ENSURE(index); + + auto itemType = rowType.GetItems()[*index]->GetItemType(); + if (RemoveOptionalType(itemType)->GetKind() == ETypeAnnotationKind::Data) { + MemberKeys.emplace_back(key.StringValue()); + continue; + } + + PickleKeys.emplace_back(key.StringValue()); + } +} + +TExprNode::TPtr TKeysDescription::BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const { + TCoArgument arg = Build(ctx, pos) + .Name("item") + .Done(); + + TExprBase body = arg; + + for (const auto& key : PickleKeys) { + const auto member = Build(ctx, pos) + .Name().Build(key) + .Struct(arg) + .Done() + .Ptr(); + + body = Build(ctx, pos) + .Struct(body) + .Name().Build(key) + .Item(ctx.NewCallable(pos, "StablePickle", { member })) + .Done(); + } + + return Build(ctx, pos) + .Args({arg}) + .Body(body) + .Done() + .Ptr(); +} + +TExprNode::TPtr TKeysDescription::BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType) { + TCoArgument arg = Build(ctx, pos) + .Name("item") + .Done(); + + TExprBase body = arg; + + for (const auto& key : PickleKeys) { + const auto index = rowType.FindItem(key); + Y_ENSURE(index); + + auto itemType = rowType.GetItems().at(*index)->GetItemType(); + const auto member = Build(ctx, pos) + .Name().Build(key) + .Struct(arg) + .Done() + .Ptr(); + + body = Build(ctx, pos) + .Struct(body) + .Name().Build(key) + .Item(ctx.NewCallable(pos, "Unpickle", { ExpandType(pos, *itemType, ctx), member })) + .Done(); + } + + return Build(ctx, pos) + .Args({arg}) + .Body(body) + .Done() + .Ptr(); +} + +TVector TKeysDescription::GetKeysList(TExprContext& ctx, TPositionHandle pos) const { + TVector res; + res.reserve(PickleKeys.size() + MemberKeys.size()); + + for (const auto& pickleKey : PickleKeys) { + res.emplace_back(Build(ctx, pos).Value(pickleKey).Done()); + } + for (const auto& memberKey : MemberKeys) { + res.emplace_back(Build(ctx, pos).Value(memberKey).Done()); + } + return res; +} + +TVector TKeysDescription::GetActualGroupKeys() const { + TVector result; + result.reserve(PickleKeys.size() + MemberKeys.size()); + result.insert(result.end(), PickleKeys.begin(), PickleKeys.end()); + result.insert(result.end(), MemberKeys.begin(), MemberKeys.end()); + return result; +} + +bool TKeysDescription::NeedPickle() const { + return !PickleKeys.empty(); +} + +TExprNode::TPtr TKeysDescription::GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) { + auto builder = Build(ctx, pos); + for (auto key : GetKeysList(ctx, pos)) { + builder.Add(std::move(key)); + } + return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx); +} + +TString BuildColumnName(const TExprBase& column) { + if (const auto columnName = column.Maybe()) { + return columnName.Cast().StringValue(); + } + + if (const auto columnNames = column.Maybe()) { + TStringBuilder columnNameBuilder; + for (const auto columnName : columnNames.Cast()) { + columnNameBuilder.append(columnName.StringValue()); + columnNameBuilder.append("_"); + } + return columnNameBuilder; + } + + YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " + << column.Ptr()->Dump()); +} + +bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting) { + return !hoppingSetting->Child(1)->IsList(); +} + +void EnsureNotDistinct(const TCoAggregate& aggregate) { + const auto& aggregateHandlers = aggregate.Handlers(); + + YQL_ENSURE( + AllOf(aggregateHandlers, [](const auto& t){ return !t.DistinctName(); }), + "Distinct is not supported for aggregation with hop"); +} + +TMaybe ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode) { + const auto pos = aggregate.Pos(); + + const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); + if (!hopSetting) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Aggregate over stream must have 'hopping' setting")); + return Nothing(); + } + + const auto hoppingColumn = IsLegacyHopping(hopSetting) + ? "_yql_time" + : TString(hopSetting->Child(1)->Child(0)->Content()); + + const auto traitsNode = IsLegacyHopping(hopSetting) + ? hopSetting->Child(1) + : hopSetting->Child(1)->Child(1); + + const auto maybeTraits = TMaybeNode(traitsNode); + if (!maybeTraits) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Invalid 'hopping' setting in Aggregate")); + return Nothing(); + } + + const auto traits = maybeTraits.Cast(); + + const auto checkIntervalParam = [&] (TExprBase param) -> ui64 { + if (param.Maybe()) { + param = param.Cast().Input(); + } + if (!param.Maybe()) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Not an interval data ctor")); + return 0; + } + auto value = FromString(param.Cast().Literal().Value()); + if (value <= 0) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval value must be positive")); + return 0; + } + return (ui64)value; + }; + + const auto hop = checkIntervalParam(traits.Hop()); + if (!hop) { + return Nothing(); + } + const auto interval = checkIntervalParam(traits.Interval()); + if (!interval) { + return Nothing(); + } + const auto delay = checkIntervalParam(traits.Delay()); + if (!delay) { + return Nothing(); + } + + if (interval < hop) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval must be greater or equal then hop")); + return Nothing(); + } + if (delay < hop) { + ctx.AddError(TIssue(ctx.GetPosition(pos), "Delay must be greater or equal then hop")); + return Nothing(); + } + + const auto newTraits = Build(ctx, aggregate.Pos()) + .InitFrom(traits) + .DataWatermarks(analyticsMode + ? ctx.NewAtom(aggregate.Pos(), "false") + : traits.DataWatermarks().Ptr()) + .Done(); + + return THoppingTraits { + hoppingColumn, + newTraits, + hop, + interval, + delay + }; +} + +TExprNode::TPtr BuildTimeExtractor(const TCoHoppingTraits& hoppingTraits, TExprContext& ctx) { + const auto pos = hoppingTraits.Pos(); + + if (hoppingTraits.ItemType().Ref().GetTypeAnn()->Cast()->GetType()->Cast()->GetSize() == 0) { + // The case when no fields are used in lambda. F.e. when it has only DependsOn. + return ctx.DeepCopyLambda(hoppingTraits.TimeExtractor().Ref()); + } + + return Build(ctx, pos) + .Args({"item"}) + .Body() + .Apply(hoppingTraits.TimeExtractor()) + .With(0) + .Type(hoppingTraits.ItemType()) + .Value("item") + .Build() + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildInitHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto& aggregateHandlers = aggregate.Handlers(); + + const auto initItemArg = Build(ctx, pos).Name("item").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + ui32 index = 0; + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + + TMaybeNode applier; + if (tuple.Trait().Cast().InitHandler().Args().Size() == 1) { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().InitHandler()) + .With(0, initItemArg) + .Done(); + } else { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().InitHandler()) + .With(0, initItemArg) + .With(1) + .Literal().Build(ToString(index)) + .Build() + .Done(); + } + + structItems.push_back(Build(ctx, pos) + .Name().Build(BuildColumnName(tuple.ColumnName())) + .Value(applier) + .Done()); + ++index; + } + + return Build(ctx, pos) + .Args({initItemArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildUpdateHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + const auto updateItemArg = Build(ctx, pos).Name("item").Done(); + const auto updateStateArg = Build(ctx, pos).Name("state").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + i32 index = 0; + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(updateStateArg) + .Name().Build(columnName) + .Done(); + + TMaybeNode applier; + if (tuple.Trait().Cast().UpdateHandler().Args().Size() == 2) { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().UpdateHandler()) + .With(0, updateItemArg) + .With(1, member) + .Done(); + } else { + applier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().UpdateHandler()) + .With(0, updateItemArg) + .With(1, member) + .With(2) + .Literal().Build(ToString(index)) + .Build() + .Done(); + } + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value(applier) + .Done()); + ++index; + } + + return Build(ctx, pos) + .Args({updateItemArg, updateStateArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildMergeHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto& aggregateHandlers = aggregate.Handlers(); + + const auto mergeState1Arg = Build(ctx, pos).Name("state1").Done(); + const auto mergeState2Arg = Build(ctx, pos).Name("state2").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member1 = Build(ctx, pos) + .Struct(mergeState1Arg) + .Name().Build(columnName) + .Done(); + const auto member2 = Build(ctx, pos) + .Struct(mergeState2Arg) + .Name().Build(columnName) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value() + .Apply(tuple.Trait().Cast().MergeHandler()) + .With(0, member1) + .With(1, member2) + .Build() + .Done()); + } + + return Build(ctx, pos) + .Args({mergeState1Arg, mergeState2Arg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildFinishHopLambda( + const TCoAggregate& aggregate, + const TVector& actualGroupKeys, + const TString& hoppingColumn, + TExprContext& ctx) +{ + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + const auto finishKeyArg = Build(ctx, pos).Name("key").Done(); + const auto finishStateArg = Build(ctx, pos).Name("state").Done(); + const auto finishTimeArg = Build(ctx, pos).Name("time").Done(); + + TVector structItems; + structItems.reserve(actualGroupKeys.size() + aggregateHandlers.Size() + 1); + + if (actualGroupKeys.size() == 1) { + structItems.push_back(Build(ctx, pos) + .Name().Build(actualGroupKeys[0]) + .Value(finishKeyArg) + .Done()); + } else { + for (size_t i = 0; i < actualGroupKeys.size(); ++i) { + structItems.push_back(Build(ctx, pos) + .Name().Build(actualGroupKeys[i]) + .Value() + .Tuple(finishKeyArg) + .Index() + .Value(ToString(i)) + .Build() + .Build() + .Done()); + } + } + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString compoundColumnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(finishStateArg) + .Name().Build(compoundColumnName) + .Done(); + + if (tuple.ColumnName().Maybe()) { + structItems.push_back(Build(ctx, pos) + .Name().Build(compoundColumnName) + .Value() + .Apply(tuple.Trait().Cast().FinishHandler()) + .With(0, member) + .Build() + .Done()); + + continue; + } + + if (const auto namesList = tuple.ColumnName().Maybe()) { + const auto expApplier = Build(ctx, pos) + .Apply(tuple.Trait().Cast().FinishHandler()) + .With(0, member) + .Done(); + + int index = 0; + for (const auto columnName : namesList.Cast()) { + const auto extracter = Build(ctx, pos) + .Tuple(expApplier) + .Index().Build(index++) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name(columnName) + .Value(extracter) + .Done()); + } + + continue; + } + + YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " + << tuple.ColumnName().Ptr()->Dump()); + } + + structItems.push_back(Build(ctx, pos) + .Name().Build(hoppingColumn) + .Value(finishTimeArg) + .Done()); + + return Build(ctx, pos) + .Args({finishKeyArg, finishStateArg, finishTimeArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildSaveHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + const auto saveStateArg = Build(ctx, pos).Name("state").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(saveStateArg) + .Name().Build(columnName) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value() + .Apply(tuple.Trait().Cast().SaveHandler()) + .With(0, member) + .Build() + .Done()); + } + + return Build(ctx, pos) + .Args({saveStateArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +TExprNode::TPtr BuildLoadHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto pos = aggregate.Pos(); + const auto aggregateHandlers = aggregate.Handlers(); + + TCoArgument loadStateArg = Build(ctx, pos).Name("state").Done(); + + TVector structItems; + structItems.reserve(aggregateHandlers.Size()); + + for (const auto& handler : aggregateHandlers) { + const auto tuple = handler.Cast(); + const TString columnName = BuildColumnName(tuple.ColumnName()); + + const auto member = Build(ctx, pos) + .Struct(loadStateArg) + .Name().Build(columnName) + .Done(); + + structItems.push_back(Build(ctx, pos) + .Name().Build(columnName) + .Value() + .Apply(tuple.Trait().Cast().LoadHandler()) + .With(0, member) + .Build() + .Done()); + } + + return Build(ctx, pos) + .Args({loadStateArg}) + .Body() + .Add(structItems) + .Build() + .Done() + .Ptr(); +} + +} // NYql::NHopping diff --git a/ydb/library/yql/core/yql_opt_hopping.h b/ydb/library/yql/core/yql_opt_hopping.h new file mode 100644 index 000000000000..a9c2f458bb08 --- /dev/null +++ b/ydb/library/yql/core/yql_opt_hopping.h @@ -0,0 +1,64 @@ +#pragma once + +#include +#include + +#include + +namespace NYql::NHopping { + +struct THoppingTraits { + TString Column; + NYql::NNodes::TCoHoppingTraits Traits; + ui64 Hop; + ui64 Interval; + ui64 Delay; +}; + +struct TKeysDescription { + TVector PickleKeys; + TVector MemberKeys; + TVector FakeKeys; + + TKeysDescription(const TStructExprType& rowType, const NYql::NNodes::TCoAtomList& keys, const TString& hoppingColumn); + + TExprNode::TPtr BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const; + + TExprNode::TPtr BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType); + + TVector GetKeysList(TExprContext& ctx, TPositionHandle pos) const; + + TVector GetActualGroupKeys() const; + + bool NeedPickle() const; + + TExprNode::TPtr GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType); +}; + +TString BuildColumnName(const NYql::NNodes::TExprBase& column); + +bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting); + +void EnsureNotDistinct(const NYql::NNodes::TCoAggregate& aggregate); + +TMaybe ExtractHopTraits(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode); + +TExprNode::TPtr BuildTimeExtractor(const NYql::NNodes::TCoHoppingTraits& hoppingTraits, TExprContext& ctx); + +TExprNode::TPtr BuildInitHopLambda(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx); + +TExprNode::TPtr BuildUpdateHopLambda(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx); + +TExprNode::TPtr BuildMergeHopLambda(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx); + +TExprNode::TPtr BuildFinishHopLambda( + const NYql::NNodes::TCoAggregate& aggregate, + const TVector& actualGroupKeys, + const TString& hoppingColumn, + TExprContext& ctx); + +TExprNode::TPtr BuildSaveHopLambda(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx); + +TExprNode::TPtr BuildLoadHopLambda(const NYql::NNodes::TCoAggregate& aggregate, TExprContext& ctx); + +} // namespace NYql::NHopping diff --git a/ydb/library/yql/dq/opt/dq_opt_hopping.cpp b/ydb/library/yql/dq/opt/dq_opt_hopping.cpp index 661fc2b6d724..daae3444ad6d 100644 --- a/ydb/library/yql/dq/opt/dq_opt_hopping.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_hopping.cpp @@ -1,6 +1,7 @@ #include "dq_opt_hopping.h" #include +#include #include #include @@ -19,366 +20,11 @@ using namespace NYql; using namespace NYql::NDq; +using namespace NYql::NHopping; using namespace NYql::NNodes; namespace { -struct THoppingTraits { - TString Column; - TCoHoppingTraits Traits; - ui64 Hop; - ui64 Interval; - ui64 Delay; -}; - - struct TKeysDescription { - TVector PickleKeys; - TVector MemberKeys; - TVector FakeKeys; - - TKeysDescription(const TStructExprType& rowType, const TCoAtomList& keys, const TString& hoppingColumn) { - for (const auto& key : keys) { - if (key.StringValue() == hoppingColumn) { - FakeKeys.emplace_back(key.StringValue()); - continue; - } - - const auto index = rowType.FindItem(key.StringValue()); - Y_ENSURE(index); - - auto itemType = rowType.GetItems()[*index]->GetItemType(); - if (RemoveOptionalType(itemType)->GetKind() == ETypeAnnotationKind::Data) { - MemberKeys.emplace_back(key.StringValue()); - continue; - } - - PickleKeys.emplace_back(key.StringValue()); - } - } - - TExprNode::TPtr BuildPickleLambda(TExprContext& ctx, TPositionHandle pos) const { - TCoArgument arg = Build(ctx, pos) - .Name("item") - .Done(); - - TExprBase body = arg; - - for (const auto& key : PickleKeys) { - const auto member = Build(ctx, pos) - .Name().Build(key) - .Struct(arg) - .Done() - .Ptr(); - - body = Build(ctx, pos) - .Struct(body) - .Name().Build(key) - .Item(ctx.NewCallable(pos, "StablePickle", { member })) - .Done(); - } - - return Build(ctx, pos) - .Args({arg}) - .Body(body) - .Done() - .Ptr(); - } - - TExprNode::TPtr BuildUnpickleLambda(TExprContext& ctx, TPositionHandle pos, const TStructExprType& rowType) { - TCoArgument arg = Build(ctx, pos) - .Name("item") - .Done(); - - TExprBase body = arg; - - for (const auto& key : PickleKeys) { - const auto index = rowType.FindItem(key); - Y_ENSURE(index); - - auto itemType = rowType.GetItems().at(*index)->GetItemType(); - const auto member = Build(ctx, pos) - .Name().Build(key) - .Struct(arg) - .Done() - .Ptr(); - - body = Build(ctx, pos) - .Struct(body) - .Name().Build(key) - .Item(ctx.NewCallable(pos, "Unpickle", { ExpandType(pos, *itemType, ctx), member })) - .Done(); - } - - return Build(ctx, pos) - .Args({arg}) - .Body(body) - .Done() - .Ptr(); - } - - TVector GetKeysList(TExprContext& ctx, TPositionHandle pos) const { - TVector res; - res.reserve(PickleKeys.size() + MemberKeys.size()); - - for (const auto& pickleKey : PickleKeys) { - res.emplace_back(Build(ctx, pos).Value(pickleKey).Done()); - } - for (const auto& memberKey : MemberKeys) { - res.emplace_back(Build(ctx, pos).Value(memberKey).Done()); - } - return res; - } - - TVector GetActualGroupKeys() { - TVector result; - result.reserve(PickleKeys.size() + MemberKeys.size()); - result.insert(result.end(), PickleKeys.begin(), PickleKeys.end()); - result.insert(result.end(), MemberKeys.begin(), MemberKeys.end()); - return result; - } - - bool NeedPickle() const { - return !PickleKeys.empty(); - } - - TExprNode::TPtr GetKeySelector(TExprContext& ctx, TPositionHandle pos, const TStructExprType* rowType) { - auto builder = Build(ctx, pos); - for (auto key : GetKeysList(ctx, pos)) { - builder.Add(std::move(key)); - } - return BuildKeySelector(pos, *rowType, builder.Build().Value().Ptr(), ctx); - } -}; - -TString BuildColumnName(const TExprBase& column) { - if (const auto columnName = column.Maybe()) { - return columnName.Cast().StringValue(); - } - - if (const auto columnNames = column.Maybe()) { - TStringBuilder columnNameBuilder; - for (const auto columnName : columnNames.Cast()) { - columnNameBuilder.append(columnName.StringValue()); - columnNameBuilder.append("_"); - } - return columnNameBuilder; - } - - YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " - << column.Ptr()->Dump()); -} - -bool IsLegacyHopping(const TExprNode::TPtr& hoppingSetting) { - return !hoppingSetting->Child(1)->IsList(); -} - -void EnsureNotDistinct(const TCoAggregate& aggregate) { - const auto& aggregateHandlers = aggregate.Handlers(); - - YQL_ENSURE( - AllOf(aggregateHandlers, [](const auto& t){ return !t.DistinctName(); }), - "Distinct is not supported for aggregation with hop"); -} - -TMaybe ExtractHopTraits(const TCoAggregate& aggregate, TExprContext& ctx, bool analyticsMode) { - const auto pos = aggregate.Pos(); - - const auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping"); - if (!hopSetting) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Aggregate over stream must have 'hopping' setting")); - return Nothing(); - } - - const auto hoppingColumn = IsLegacyHopping(hopSetting) - ? "_yql_time" - : TString(hopSetting->Child(1)->Child(0)->Content()); - - const auto traitsNode = IsLegacyHopping(hopSetting) - ? hopSetting->Child(1) - : hopSetting->Child(1)->Child(1); - - const auto maybeTraits = TMaybeNode(traitsNode); - if (!maybeTraits) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Invalid 'hopping' setting in Aggregate")); - return Nothing(); - } - - const auto traits = maybeTraits.Cast(); - - const auto checkIntervalParam = [&] (TExprBase param) -> ui64 { - if (param.Maybe()) { - param = param.Cast().Input(); - } - if (!param.Maybe()) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Not an interval data ctor")); - return 0; - } - auto value = FromString(param.Cast().Literal().Value()); - if (value <= 0) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval value must be positive")); - return 0; - } - return (ui64)value; - }; - - const auto hop = checkIntervalParam(traits.Hop()); - if (!hop) { - return Nothing(); - } - const auto interval = checkIntervalParam(traits.Interval()); - if (!interval) { - return Nothing(); - } - const auto delay = checkIntervalParam(traits.Delay()); - if (!delay) { - return Nothing(); - } - - if (interval < hop) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Interval must be greater or equal then hop")); - return Nothing(); - } - if (delay < hop) { - ctx.AddError(TIssue(ctx.GetPosition(pos), "Delay must be greater or equal then hop")); - return Nothing(); - } - - const auto newTraits = Build(ctx, aggregate.Pos()) - .InitFrom(traits) - .DataWatermarks(analyticsMode - ? ctx.NewAtom(aggregate.Pos(), "false") - : traits.DataWatermarks().Ptr()) - .Done(); - - return THoppingTraits { - hoppingColumn, - newTraits, - hop, - interval, - delay - }; -} - -TExprNode::TPtr BuildTimeExtractor(const TCoHoppingTraits& hoppingTraits, TExprContext& ctx) { - const auto pos = hoppingTraits.Pos(); - - if (hoppingTraits.ItemType().Ref().GetTypeAnn()->Cast()->GetType()->Cast()->GetSize() == 0) { - // The case when no fields are used in lambda. F.e. when it has only DependsOn. - return ctx.DeepCopyLambda(hoppingTraits.TimeExtractor().Ref()); - } - - return Build(ctx, pos) - .Args({"item"}) - .Body() - .Apply(hoppingTraits.TimeExtractor()) - .With(0) - .Type(hoppingTraits.ItemType()) - .Value("item") - .Build() - .Build() - .Done() - .Ptr(); -} - -TExprNode::TPtr BuildInitHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto& aggregateHandlers = aggregate.Handlers(); - - const auto initItemArg = Build(ctx, pos).Name("item").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - ui32 index = 0; - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - - TMaybeNode applier; - if (tuple.Trait().Cast().InitHandler().Args().Size() == 1) { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().InitHandler()) - .With(0, initItemArg) - .Done(); - } else { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().InitHandler()) - .With(0, initItemArg) - .With(1) - .Literal().Build(ToString(index)) - .Build() - .Done(); - } - - structItems.push_back(Build(ctx, pos) - .Name().Build(BuildColumnName(tuple.ColumnName())) - .Value(applier) - .Done()); - ++index; - } - - return Build(ctx, pos) - .Args({initItemArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - -TExprNode::TPtr BuildUpdateHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - const auto updateItemArg = Build(ctx, pos).Name("item").Done(); - const auto updateStateArg = Build(ctx, pos).Name("state").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - i32 index = 0; - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(updateStateArg) - .Name().Build(columnName) - .Done(); - - TMaybeNode applier; - if (tuple.Trait().Cast().UpdateHandler().Args().Size() == 2) { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().UpdateHandler()) - .With(0, updateItemArg) - .With(1, member) - .Done(); - } else { - applier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().UpdateHandler()) - .With(0, updateItemArg) - .With(1, member) - .With(2) - .Literal().Build(ToString(index)) - .Build() - .Done(); - } - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value(applier) - .Done()); - ++index; - } - - return Build(ctx, pos) - .Args({updateItemArg, updateStateArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - TExprNode::TPtr WrapToShuffle( const TKeysDescription& keysDescription, const TCoAggregate& aggregate, @@ -421,216 +67,6 @@ TExprNode::TPtr WrapToShuffle( .Ptr(); } -TExprNode::TPtr BuildMergeHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto& aggregateHandlers = aggregate.Handlers(); - - const auto mergeState1Arg = Build(ctx, pos).Name("state1").Done(); - const auto mergeState2Arg = Build(ctx, pos).Name("state2").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member1 = Build(ctx, pos) - .Struct(mergeState1Arg) - .Name().Build(columnName) - .Done(); - const auto member2 = Build(ctx, pos) - .Struct(mergeState2Arg) - .Name().Build(columnName) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value() - .Apply(tuple.Trait().Cast().MergeHandler()) - .With(0, member1) - .With(1, member2) - .Build() - .Done()); - } - - return Build(ctx, pos) - .Args({mergeState1Arg, mergeState2Arg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - -TExprNode::TPtr BuildFinishHopLambda( - const TCoAggregate& aggregate, - const TVector& actualGroupKeys, - const TString& hoppingColumn, - TExprContext& ctx) -{ - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - const auto finishKeyArg = Build(ctx, pos).Name("key").Done(); - const auto finishStateArg = Build(ctx, pos).Name("state").Done(); - const auto finishTimeArg = Build(ctx, pos).Name("time").Done(); - - TVector structItems; - structItems.reserve(actualGroupKeys.size() + aggregateHandlers.Size() + 1); - - if (actualGroupKeys.size() == 1) { - structItems.push_back(Build(ctx, pos) - .Name().Build(actualGroupKeys[0]) - .Value(finishKeyArg) - .Done()); - } else { - for (size_t i = 0; i < actualGroupKeys.size(); ++i) { - structItems.push_back(Build(ctx, pos) - .Name().Build(actualGroupKeys[i]) - .Value() - .Tuple(finishKeyArg) - .Index() - .Value(ToString(i)) - .Build() - .Build() - .Done()); - } - } - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString compoundColumnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(finishStateArg) - .Name().Build(compoundColumnName) - .Done(); - - if (tuple.ColumnName().Maybe()) { - structItems.push_back(Build(ctx, pos) - .Name().Build(compoundColumnName) - .Value() - .Apply(tuple.Trait().Cast().FinishHandler()) - .With(0, member) - .Build() - .Done()); - - continue; - } - - if (const auto namesList = tuple.ColumnName().Maybe()) { - const auto expApplier = Build(ctx, pos) - .Apply(tuple.Trait().Cast().FinishHandler()) - .With(0, member) - .Done(); - - int index = 0; - for (const auto columnName : namesList.Cast()) { - const auto extracter = Build(ctx, pos) - .Tuple(expApplier) - .Index().Build(index++) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name(columnName) - .Value(extracter) - .Done()); - } - - continue; - } - - YQL_ENSURE(false, "Invalid node. Expected Atom or AtomList, but received: " - << tuple.ColumnName().Ptr()->Dump()); - } - - structItems.push_back(Build(ctx, pos) - .Name().Build(hoppingColumn) - .Value(finishTimeArg) - .Done()); - - return Build(ctx, pos) - .Args({finishKeyArg, finishStateArg, finishTimeArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - -TExprNode::TPtr BuildSaveHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - const auto saveStateArg = Build(ctx, pos).Name("state").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(saveStateArg) - .Name().Build(columnName) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value() - .Apply(tuple.Trait().Cast().SaveHandler()) - .With(0, member) - .Build() - .Done()); - } - - return Build(ctx, pos) - .Args({saveStateArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - -TExprNode::TPtr BuildLoadHopLambda(const TCoAggregate& aggregate, TExprContext& ctx) { - const auto pos = aggregate.Pos(); - const auto aggregateHandlers = aggregate.Handlers(); - - TCoArgument loadStateArg = Build(ctx, pos).Name("state").Done(); - - TVector structItems; - structItems.reserve(aggregateHandlers.Size()); - - for (const auto& handler : aggregateHandlers) { - const auto tuple = handler.Cast(); - const TString columnName = BuildColumnName(tuple.ColumnName()); - - const auto member = Build(ctx, pos) - .Struct(loadStateArg) - .Name().Build(columnName) - .Done(); - - structItems.push_back(Build(ctx, pos) - .Name().Build(columnName) - .Value() - .Apply(tuple.Trait().Cast().LoadHandler()) - .With(0, member) - .Build() - .Done()); - } - - return Build(ctx, pos) - .Args({loadStateArg}) - .Body() - .Add(structItems) - .Build() - .Done() - .Ptr(); -} - TMaybe BuildWatermarkMode( const TCoAggregate& aggregate, const TCoHoppingTraits& hoppingTraits, diff --git a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json index 9763237a6133..682aa6fea9d5 100644 --- a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json @@ -294,6 +294,34 @@ } ], "test.test[aggregate-group_by_column-default.txt-Results]": [], + "test.test[aggregate-group_by_hop_static-default.txt-Analyze]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1689644/763d9bd4404423a24deab02585b884f08692c90b/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Analyze_/plan.txt" + } + ], + "test.test[aggregate-group_by_hop_static-default.txt-Debug]": [ + { + "checksum": "07d9a8f046f4661ba479dbaf70979aac", + "size": 1630, + "uri": "https://{canondata_backend}/1689644/763d9bd4404423a24deab02585b884f08692c90b/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Debug_/opt.yql_patched" + } + ], + "test.test[aggregate-group_by_hop_static-default.txt-Plan]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1689644/763d9bd4404423a24deab02585b884f08692c90b/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Plan_/plan.txt" + } + ], + "test.test[aggregate-group_by_hop_static-default.txt-Results]": [ + { + "checksum": "dc21a63cca5d7481363c2b47840f1e38", + "size": 3102, + "uri": "https://{canondata_backend}/1689644/763d9bd4404423a24deab02585b884f08692c90b/resource.tar.gz#test.test_aggregate-group_by_hop_static-default.txt-Results_/results.txt" + } + ], "test.test[aggregate-group_by_mul_gs_ru--Analyze]": [ { "checksum": "e78b8c0f6855d3df92663efab505204b", diff --git a/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json index 07cf9e36e7a9..59be82e1fc70 100644 --- a/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part18/canondata/result.json @@ -350,6 +350,34 @@ "uri": "https://{canondata_backend}/1775059/e6328418d209e6f2afe65be714175e5a3ade006c/resource.tar.gz#test.test_aggregate-group_by_hop_only--Results_/results.txt" } ], + "test.test[aggregate-group_by_hop_static_list_key-default.txt-Analyze]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1130705/da7974592864104e97d4cfb7947d82f2379f0266/resource.tar.gz#test.test_aggregate-group_by_hop_static_list_key-default.txt-Analyze_/plan.txt" + } + ], + "test.test[aggregate-group_by_hop_static_list_key-default.txt-Debug]": [ + { + "checksum": "41d48b8937d3e4bcc583915a7460727d", + "size": 1946, + "uri": "https://{canondata_backend}/1925821/6132b4b967a7c6d2d9c522d4a344e781b4121793/resource.tar.gz#test.test_aggregate-group_by_hop_static_list_key-default.txt-Debug_/opt.yql_patched" + } + ], + "test.test[aggregate-group_by_hop_static_list_key-default.txt-Plan]": [ + { + "checksum": "b4dd508a329723c74293d80f0278c705", + "size": 505, + "uri": "https://{canondata_backend}/1130705/da7974592864104e97d4cfb7947d82f2379f0266/resource.tar.gz#test.test_aggregate-group_by_hop_static_list_key-default.txt-Plan_/plan.txt" + } + ], + "test.test[aggregate-group_by_hop_static_list_key-default.txt-Results]": [ + { + "checksum": "dc21a63cca5d7481363c2b47840f1e38", + "size": 3102, + "uri": "https://{canondata_backend}/1130705/da7974592864104e97d4cfb7947d82f2379f0266/resource.tar.gz#test.test_aggregate-group_by_hop_static_list_key-default.txt-Results_/results.txt" + } + ], "test.test[aggregate-group_compact_sorted_distinct--Analyze]": [ { "checksum": "683fe495c075d2b1f1efcc8737139f4c", diff --git a/ydb/library/yql/tests/sql/hybrid_file/part2/canondata/result.json b/ydb/library/yql/tests/sql/hybrid_file/part2/canondata/result.json index 2a9e9cb8560f..abc4ebae2743 100644 --- a/ydb/library/yql/tests/sql/hybrid_file/part2/canondata/result.json +++ b/ydb/library/yql/tests/sql/hybrid_file/part2/canondata/result.json @@ -2913,9 +2913,9 @@ ], "test.test[window-full/session--Debug]": [ { - "checksum": "3686ec3be8fa6640e428268fc0c16598", - "size": 13069, - "uri": "https://{canondata_backend}/1809005/ad7c074711ee8d1675aebabbf8025a2c8bd317d8/resource.tar.gz#test.test_window-full_session--Debug_/opt.yql_patched" + "checksum": "b06da41f9a9ea38646c43487f4b8b96a", + "size": 13340, + "uri": "https://{canondata_backend}/1775319/8ac8c87858e0db34f5a3c99b3f4ca1084cccbace/resource.tar.gz#test.test_window-full_session--Debug_/opt.yql_patched" } ], "test.test[window-full/session--Plan]": [ @@ -2927,9 +2927,9 @@ ], "test.test[window-full/session_aliases--Debug]": [ { - "checksum": "47d22b82d599d4f9d30a2fdcda4406d8", - "size": 13912, - "uri": "https://{canondata_backend}/1809005/ad7c074711ee8d1675aebabbf8025a2c8bd317d8/resource.tar.gz#test.test_window-full_session_aliases--Debug_/opt.yql_patched" + "checksum": "e021555a47e83d0b792765a8ee82be94", + "size": 14124, + "uri": "https://{canondata_backend}/1775319/8ac8c87858e0db34f5a3c99b3f4ca1084cccbace/resource.tar.gz#test.test_window-full_session_aliases--Debug_/opt.yql_patched" } ], "test.test[window-full/session_aliases--Plan]": [ diff --git a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json index 3ca08f1d2b11..e5017f06145d 100644 --- a/ydb/library/yql/tests/sql/sql2yql/canondata/result.json +++ b/ydb/library/yql/tests/sql/sql2yql/canondata/result.json @@ -2197,6 +2197,20 @@ "uri": "https://{canondata_backend}/1784117/d56ae82ad9d30397a41490647be1bd2124718f98/resource.tar.gz#test_sql2yql.test_aggregate-group_by_hop_star_/sql.yql" } ], + "test_sql2yql.test[aggregate-group_by_hop_static]": [ + { + "checksum": "a7a563dc87672b141c8209b38c0d446c", + "size": 3368, + "uri": "https://{canondata_backend}/1925821/aca60c4aca6b335189396eb0d636b37dbc38e5d9/resource.tar.gz#test_sql2yql.test_aggregate-group_by_hop_static_/sql.yql" + } + ], + "test_sql2yql.test[aggregate-group_by_hop_static_list_key]": [ + { + "checksum": "4b8a74647da998a54e0ccffae0f365d6", + "size": 3547, + "uri": "https://{canondata_backend}/1937492/6205ff455a623f62222bc8ee2c2ee5c2e7ee4174/resource.tar.gz#test_sql2yql.test_aggregate-group_by_hop_static_list_key_/sql.yql" + } + ], "test_sql2yql.test[aggregate-group_by_mul_gb_ru]": [ { "checksum": "002e7ddce42c228debb7382e9f8ea1d3", @@ -21881,6 +21895,20 @@ "uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_aggregate-group_by_hop_star_/formatted.sql" } ], + "test_sql_format.test[aggregate-group_by_hop_static]": [ + { + "checksum": "a6f19201a2a81c7308fe9947b59276bf", + "size": 955, + "uri": "https://{canondata_backend}/1925821/aca60c4aca6b335189396eb0d636b37dbc38e5d9/resource.tar.gz#test_sql_format.test_aggregate-group_by_hop_static_/formatted.sql" + } + ], + "test_sql_format.test[aggregate-group_by_hop_static_list_key]": [ + { + "checksum": "3d3184e982097fa7fed63bdeef6c1fae", + "size": 976, + "uri": "https://{canondata_backend}/1937492/6205ff455a623f62222bc8ee2c2ee5c2e7ee4174/resource.tar.gz#test_sql_format.test_aggregate-group_by_hop_static_list_key_/formatted.sql" + } + ], "test_sql_format.test[aggregate-group_by_mul_gb_ru]": [ { "checksum": "adae92846c7098e2ea3468096a13ffae", diff --git a/ydb/library/yql/tests/sql/suites/aggregate/group_by_hop_static.sql b/ydb/library/yql/tests/sql/suites/aggregate/group_by_hop_static.sql new file mode 100644 index 000000000000..fec507c827a5 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/aggregate/group_by_hop_static.sql @@ -0,0 +1,26 @@ +/* syntax version 1 */ +/* postgres can not */ +/* ytfile can not */ +/* yt can not */ + +$input = SELECT * FROM AS_TABLE([ + <|"time":"2024-01-01T00:00:01Z", "user": 1|>, + <|"time":"2024-01-01T00:00:02Z", "user": 1|>, + <|"time":"2024-01-01T00:00:03Z", "user": 1|>, + <|"time":"2024-01-01T00:00:01Z", "user": 2|>, + <|"time":"2024-01-01T00:00:02Z", "user": 2|>, + <|"time":"2024-01-01T00:00:03Z", "user": 2|>, + <|"time":"2024-01-01T00:00:01Z", "user": 2|>, + <|"time":"2024-01-01T00:00:02Z", "user": 2|>, + <|"time":"2024-01-01T00:00:03Z", "user": 2|>, + <|"time":"2024-01-01T00:00:01Z", "user": 3|>, + <|"time":"2024-01-01T00:00:02Z", "user": 3|>, + <|"time":"2024-01-01T00:00:03Z", "user": 3|> +]); + +SELECT + user, + COUNT(*) as count, + HOP_START() as start, +FROM $input +GROUP BY HOP(CAST(time as Timestamp), 'PT1S', 'PT1S', 'PT1S'), user; diff --git a/ydb/library/yql/tests/sql/suites/aggregate/group_by_hop_static_list_key.sql b/ydb/library/yql/tests/sql/suites/aggregate/group_by_hop_static_list_key.sql new file mode 100644 index 000000000000..3639207bb340 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/aggregate/group_by_hop_static_list_key.sql @@ -0,0 +1,26 @@ +/* syntax version 1 */ +/* postgres can not */ +/* ytfile can not */ +/* yt can not */ + +$input = SELECT * FROM AS_TABLE([ + <|"time":"2024-01-01T00:00:01Z", "user": 1|>, + <|"time":"2024-01-01T00:00:02Z", "user": 1|>, + <|"time":"2024-01-01T00:00:03Z", "user": 1|>, + <|"time":"2024-01-01T00:00:01Z", "user": 2|>, + <|"time":"2024-01-01T00:00:02Z", "user": 2|>, + <|"time":"2024-01-01T00:00:03Z", "user": 2|>, + <|"time":"2024-01-01T00:00:01Z", "user": 2|>, + <|"time":"2024-01-01T00:00:02Z", "user": 2|>, + <|"time":"2024-01-01T00:00:03Z", "user": 2|>, + <|"time":"2024-01-01T00:00:01Z", "user": 3|>, + <|"time":"2024-01-01T00:00:02Z", "user": 3|>, + <|"time":"2024-01-01T00:00:03Z", "user": 3|> +]); + +SELECT + user, + COUNT(*) as count, + HOP_START() as start, +FROM $input +GROUP BY HOP(CAST(time as Timestamp), 'PT1S', 'PT1S', 'PT1S'), user, AsList(user, 0); diff --git a/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json index d94ae46e45ba..9e4768204440 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part19/canondata/result.json @@ -2924,9 +2924,9 @@ ], "test.test[window-full/session--Debug]": [ { - "checksum": "a7f5b924c596e4861cfff98981b5f071", - "size": 11042, - "uri": "https://{canondata_backend}/1937027/16b7289b1b8f5fdff728155d836fa2b238949b2d/resource.tar.gz#test.test_window-full_session--Debug_/opt.yql" + "checksum": "fd79f82807ae5a2b2ac7181f3da01c58", + "size": 11314, + "uri": "https://{canondata_backend}/1942173/f70acaf8d9dbbd62a5305d5424f4de9ac3080ddc/resource.tar.gz#test.test_window-full_session--Debug_/opt.yql" } ], "test.test[window-full/session--Plan]": [ diff --git a/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json b/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json index 8590ffd11ae3..0876482dc5aa 100644 --- a/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json +++ b/ydb/library/yql/tests/sql/yt_native_file/part6/canondata/result.json @@ -2772,9 +2772,9 @@ ], "test.test[window-full/session_aliases--Debug]": [ { - "checksum": "88d37ebd17099f93d640d857b6198de6", - "size": 11552, - "uri": "https://{canondata_backend}/1917492/ddc0a6b96495a49628829c42f1882eff49e71e11/resource.tar.gz#test.test_window-full_session_aliases--Debug_/opt.yql" + "checksum": "751c1ae97702b51753f626bfa02facbd", + "size": 11764, + "uri": "https://{canondata_backend}/212715/c96504db58dd13ce5e79be71afa29b676fde90a1/resource.tar.gz#test.test_window-full_session_aliases--Debug_/opt.yql" } ], "test.test[window-full/session_aliases--Plan]": [