Skip to content

Commit

Permalink
Remove distinct fields usage from load state functions in CombineByKey
Browse files Browse the repository at this point in the history
  • Loading branch information
Darych committed Aug 21, 2024
1 parent 817b042 commit 0df07f6
Showing 1 changed file with 21 additions and 116 deletions.
137 changes: 21 additions & 116 deletions ydb/library/yql/core/yql_aggregate_expander.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,9 @@ TExprNode::TPtr TAggregateExpander::GeneratePartialAggregate(const TExprNode::TP
}

TExprNode::TPtr partialAgg = nullptr;

if (!NonDistinctColumns.empty()) {
partialAgg = GeneratePartialAggregateForNonDistinct(keyExtractor, pickleTypeNode);
}
for (ui32 index = 0; index < DistinctFields.size(); ++index) {
auto distinctField = DistinctFields[index];

Expand All @@ -405,9 +407,7 @@ TExprNode::TPtr TAggregateExpander::GeneratePartialAggregate(const TExprNode::TP
.Build();
}
}
if (!NonDistinctColumns.empty()) {
partialAgg = GeneratePartialAggregateForNonDistinct(keyExtractor, pickleTypeNode);
}

// If no aggregation functions then add additional combiner
if (AggregatedColumns->ChildrenSize() == 0 && KeyColumns->ChildrenSize() > 0 && !SessionWindowParams.Update) {
if (!partialAgg) {
Expand Down Expand Up @@ -978,125 +978,30 @@ TExprNode::TPtr TAggregateExpander::GeneratePartialAggregateForNonDistinct(const
})
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
for (ui32 i = 0; i < columnNames.size(); ++i) {
auto child = AggregatedColumns->Child(i);
auto trait = Traits[i];
if (!EffectiveCompact) {
auto loadLambda = trait->Child(4);
auto extractorLambda = GetFinalAggStateExtractor(i);
auto loadLambda = trait->Child(4);
auto extractorLambda = GetFinalAggStateExtractor(i);

if (!DistinctFields.empty() || Suffix == "MergeManyFinalize") {
parent.List(index++)
.Add(0, columnNames[i])
.Callable(1, "Map")
.Apply(0, *extractorLambda)
.With(0, "item")
.Seal()
.Add(1, loadLambda)
if (!DistinctFields.empty() || Suffix == "MergeManyFinalize") {
parent.List(index++)
.Add(0, columnNames[i])
.Callable(1, "Map")
.Apply(0, *extractorLambda)
.With(0, "item")
.Seal()
.Seal();
} else {
parent.List(index++)
.Add(0, columnNames[i])
.Apply(1, *loadLambda)
.With(0)
.Apply(*extractorLambda)
.With(0, "item")
.Seal()
.Done()
.Seal();
}
.Add(1, loadLambda)
.Seal()
.Seal();
} else {
auto initLambda = trait->Child(1);
auto distinctField = (child->ChildrenSize() == 3) ? child->Child(2) : nullptr;
auto initApply = [&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
parent.Apply(1, *initLambda)
parent.List(index++)
.Add(0, columnNames[i])
.Apply(1, *loadLambda)
.With(0)
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
if (distinctField) {
parent
.Callable("Member")
.Arg(0, "item")
.Add(1, distinctField)
.Seal();
} else {
parent
.Callable("CastStruct")
.Arg(0, "item")
.Add(1, ExpandType(Node->Pos(), *initLambda->Head().Head().GetTypeAnn(), Ctx))
.Seal();
}

return parent;
})
.Done()
.Do([&](TExprNodeReplaceBuilder& parent) -> TExprNodeReplaceBuilder& {
if (initLambda->Head().ChildrenSize() == 2) {
parent.With(1)
.Callable("Uint32")
.Atom(0, ToString(i), TNodeFlags::Default)
.Seal()
.Done();
}

return parent;
})
.Seal();

return parent;
};

if (distinctField) {
const bool isFirst = *Distinct2Columns[distinctField->Content()].begin() == i;
if (isFirst) {
parent.List(index++)
.Add(0, columnNames[i])
.List(1)
.Callable(0, "NamedApply")
.Add(0, UdfSetCreate[distinctField->Content()])
.List(1)
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
if (!DistinctFieldNeedsPickle[distinctField->Content()]) {
parent.Callable(0, "Member")
.Arg(0, "item")
.Add(1, distinctField)
.Seal();
} else {
parent.Callable(0, "StablePickle")
.Callable(0, "Member")
.Arg(0, "item")
.Add(1, distinctField)
.Seal()
.Seal();
}

return parent;
})
.Callable(1, "Uint32")
.Atom(0, "0", TNodeFlags::Default)
.Seal()
.Seal()
.Callable(2, "AsStruct").Seal()
.Callable(3, "DependsOn")
.Callable(0, "String")
.Add(0, distinctField)
.Seal()
.Seal()
.Seal()
.Do(initApply)
.Apply(*extractorLambda)
.With(0, "item")
.Seal()
.Seal();
} else {
parent.List(index++)
.Add(0, columnNames[i])
.Do(initApply)
.Seal();
}
} else {
parent.List(index++)
.Add(0, columnNames[i])
.Do(initApply)
.Done()
.Seal();
}
}
}
return parent;
Expand Down

0 comments on commit 0df07f6

Please sign in to comment.