Skip to content

Commit

Permalink
Merge 7557e9e into 2a0bfb0
Browse files Browse the repository at this point in the history
  • Loading branch information
APozdniakov authored Sep 19, 2024
2 parents 2a0bfb0 + 7557e9e commit d5247f0
Show file tree
Hide file tree
Showing 11 changed files with 836 additions and 570 deletions.
110 changes: 110 additions & 0 deletions ydb/library/yql/core/common_opt/yql_co_simple1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/library/yql/core/yql_atom_enums.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/yql_join.h>
#include <ydb/library/yql/core/yql_opt_hopping.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/core/yql_opt_window.h>
#include <ydb/library/yql/core/yql_type_helpers.h>
Expand Down Expand Up @@ -3301,6 +3302,111 @@ TExprNode::TPtr RemoveDeadPayloadColumns(const TCoAggregate& aggr, TExprContext&
return aggr.Ptr();
}

TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) {
const auto pos = aggregate.Pos();

NHopping::EnsureNotDistinct(aggregate);

const auto maybeHopTraits = NHopping::ExtractHopTraits(aggregate, ctx, false);
if (!maybeHopTraits) {
return nullptr;
}
const auto hopTraits = *maybeHopTraits;

const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast<TStructExprType>();
NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column);

// if (keysDescription.NeedPickle()) {
// return Build<TCoMap>(ctx, pos)
// .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType))
// .Input<TCoAggregate>()
// .InitFrom(aggregate)
// .Input<TCoMap>()
// .Lambda(keysDescription.BuildPickleLambda(ctx, pos))
// .Input(aggregate.Input())
// .Build()
// .Settings(RemoveSetting(aggregate.Settings().Ref(), "output_columns", ctx))
// .Build()
// .Done()
// .Ptr();
// }

const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType);
const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx);
const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx);
const auto updateLambda = NHopping::BuildUpdateHopLambda(aggregate, ctx);
const auto saveLambda = NHopping::BuildSaveHopLambda(aggregate, ctx);
const auto loadLambda = NHopping::BuildLoadHopLambda(aggregate, ctx);
const auto mergeLambda = NHopping::BuildMergeHopLambda(aggregate, ctx);
const auto finishLambda = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx);

const auto streamArg = Build<TCoArgument>(ctx, pos).Name("stream").Done();
auto multiHoppingCoreBuilder = Build<TCoMultiHoppingCore>(ctx, pos)
.KeyExtractor(keyLambda)
.TimeExtractor(timeExtractorLambda)
.Hop(hopTraits.Traits.Hop())
.Interval(hopTraits.Traits.Interval())
.Delay(hopTraits.Traits.Delay())
.DataWatermarks(hopTraits.Traits.DataWatermarks())
.InitHandler(initLambda)
.UpdateHandler(updateLambda)
.MergeHandler(mergeLambda)
.FinishHandler(finishLambda)
.SaveHandler(saveLambda)
.LoadHandler(loadLambda)
.template WatermarkMode<TCoAtom>().Build(ToString(false));

return Build<TCoPartitionsByKeys>(ctx, pos)
.Input(aggregate.Input())
.KeySelectorLambda(keyLambda)
.SortDirections<TCoBool>()
.Literal()
.Value("true")
.Build()
.Build()
.SortKeySelectorLambda(timeExtractorLambda)
.ListHandlerLambda()
.Args(streamArg)
.template Body<TCoForwardList>()
.Stream(multiHoppingCoreBuilder
.template Input<TCoIterator>()
.List(streamArg)
.Build()
.Done())
.Build()
.Build()
.Done()
.Ptr();
}

TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) {
const auto aggregate = TCoAggregate(node);

if (aggregate.Input().Ptr()->GetTypeAnn()->GetKind() != ETypeAnnotationKind::List) {
return nullptr;
}

if (!GetSetting(aggregate.Settings().Ref(), "hopping")) {
return nullptr;
}

auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx);
if (!result) {
return result;
}

auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns");
if (!outputColumnSetting) {
return result;
}

return Build<TCoExtractMembers>(ctx, aggregate.Pos())
.Input(result)
.Members(outputColumnSetting->ChildPtr(1))
.Done()
.Ptr();
}

TExprNode::TPtr PullAssumeColumnOrderOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
TVector<ui32> withAssume;
for (ui32 i = 0; i < node->ChildrenSize() - 2; i++) {
Expand Down Expand Up @@ -5080,6 +5186,10 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
return clean;
}

if (auto hopping = RewriteAsHoppingWindow(node, ctx); hopping) {
return hopping;
}

return DropReorder<false>(node, ctx);
};

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ SRCS(
yql_join.cpp
yql_join.h
yql_library_compiler.cpp
yql_opt_hopping.cpp
yql_opt_match_recognize.cpp
yql_opt_match_recognize.h
yql_opt_proposed_by_data.cpp
Expand Down
20 changes: 19 additions & 1 deletion ydb/library/yql/core/yql_expr_constraint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
Functions["ReplicateScalars"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
Functions["BlockMergeFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
Functions["BlockMergeManyFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
Functions["MultiHoppingCore"] = &TCallableConstraintTransformer::MultiHoppingCoreWrap;
}

std::optional<IGraphTransformer::TStatus> ProcessCore(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
Expand Down Expand Up @@ -2920,6 +2921,23 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable

return TStatus::Ok;
}

TStatus MultiHoppingCoreWrap(const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) const {
if (const auto status = UpdateAllChildLambdasConstraints(*input); status != TStatus::Ok) {
return status;
}

TExprNode::TPtr keySelectorLambda = input->Child(TCoMultiHoppingCore::idx_KeyExtractor);
std::vector<std::string_view> columns;
ExtractKeys(*keySelectorLambda, columns);
if (!columns.empty()) {
input->AddConstraint(ctx.MakeConstraint<TUniqueConstraintNode>(columns));
input->AddConstraint(ctx.MakeConstraint<TDistinctConstraintNode>(columns));
}

return TStatus::Ok;
}

private:
template <class TConstraintContainer>
static void CopyExcept(TConstraintContainer& dst, const TConstraintContainer& from, const TSet<TStringBuf>& except) {
Expand All @@ -2939,7 +2957,7 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
}
}

static void ExtractKeys(const TExprNode& keySelectorLambda, TVector<TStringBuf>& columns) {
static void ExtractKeys(const TExprNode& keySelectorLambda, std::vector<std::string_view>& columns) {
const auto arg = keySelectorLambda.Head().Child(0);
auto body = keySelectorLambda.Child(1);
if (body->IsCallable("StablePickle")) {
Expand Down
Loading

0 comments on commit d5247f0

Please sign in to comment.