Skip to content

Commit

Permalink
[yt provider] Add ForceTransform optimizer (#8874)
Browse files Browse the repository at this point in the history
  • Loading branch information
rvu1024 authored Sep 6, 2024
1 parent de1d90b commit 37cf155
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 16 deletions.
10 changes: 0 additions & 10 deletions ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3207,16 +3207,6 @@ class TYtNativeGateway : public IYtGateway {
bool combineChunks = NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::CombineChunks);
TMaybe<ui64> limit = GetLimit(merge.Settings().Ref());

const auto cluster = merge.DataSink().Cluster().StringValue();
const bool hasOutGroup = !execCtx->OutTables_.front().ColumnGroups.IsUndefined();
const bool lookup = execCtx->Options_.Config()->OptimizeFor.Get(cluster).GetOrElse(NYT::OF_LOOKUP_ATTR) == NYT::OF_LOOKUP_ATTR;
const bool enabledColGroup = execCtx->Options_.Config()->ColumnGroupMode.Get().GetOrElse(EColumnGroupMode::Disable) != EColumnGroupMode::Disable;
const bool hasNonTmpInput = !AllOf(execCtx->InputTables_, [](const auto& table) { return table.Temp; });

forceTransform = forceTransform
|| (!lookup && enabledColGroup != hasOutGroup)
|| (!lookup && hasOutGroup && hasNonTmpInput);

return execCtx->Session_->Queue_->Async([forceTransform, combineChunks, limit, execCtx]() {
return execCtx->LookupQueryCacheAsync().Apply([forceTransform, combineChunks, limit, execCtx] (const auto& f) {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
AddHandler(2, &TYtPublish::Match, HNDL(UnorderedPublishTarget));
AddHandler(2, &TYtMap::Match, HNDL(PushDownYtMapOverSortedMerge));
AddHandler(2, &TYtMerge::Match, HNDL(MergeToCopy));
AddHandler(2, &TYtMerge::Match, HNDL(ForceTransform));
#undef HNDL
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {

NNodes::TMaybeNode<NNodes::TExprBase> MergeToCopy(NNodes::TExprBase node, TExprContext& ctx) const;

// Optimizer handler registered for TYtMerge nodes. Returns the merge with the ForceTransform
// setting added when its definition decides the setting is needed (input sampling, or a
// column-group mismatch on a cluster that is not optimized for lookup); otherwise returns
// the node unchanged.
NNodes::TMaybeNode<NNodes::TExprBase> ForceTransform(NNodes::TExprBase node, TExprContext& ctx) const;

template <typename TLMapType>
NNodes::TMaybeNode<NNodes::TExprBase> LMap(NNodes::TExprBase node, TExprContext& ctx) const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,35 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::MergeToCopy(TExprBase n
.Done();
}

// Decides whether a YtMerge must carry the ForceTransform setting and, if so, returns a copy
// of the merge with that setting appended.
//
// The node is returned untouched when the merge already has a result or already carries
// ForceTransform. Otherwise the setting is added when any of the following holds:
//   * the first input section has a Sample setting;
//   * the cluster is not optimized for lookup (lookup is the default when OptimizeFor is unset)
//     and "column group mode enabled" disagrees with "first output has ColumnGroups";
//   * the cluster is not optimized for lookup, the first output has ColumnGroups, and at least
//     one input path is a TYtTable without the Anonymous setting.
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::ForceTransform(TExprBase node, TExprContext& ctx) const {
    auto mergeOp = node.Cast<TYtMerge>();

    // Nothing to do for an already executed merge or one that is already marked.
    if (mergeOp.Ref().HasResult()
        || NYql::HasSetting(mergeOp.Settings().Ref(), EYtSettingType::ForceTransform))
    {
        return node;
    }

    // A path counts as non-temporary input when it is a TYtTable that is not anonymous.
    const auto isNonAnonymousTable = [](const TYtPath& path) {
        if (!path.Table().Maybe<TYtTable>()) {
            return false;
        }
        return !NYql::HasSetting(path.Table().Cast<TYtTable>().Settings().Ref(), EYtSettingType::Anonymous);
    };

    const auto clusterName = mergeOp.DataSink().Cluster().StringValue();
    const bool outputHasColumnGroups =
        NYql::HasSetting(mergeOp.Output().Item(0).Settings().Ref(), EYtSettingType::ColumnGroups);
    const bool optimizedForLookup =
        State_->Configuration->OptimizeFor.Get(clusterName).GetOrElse(NYT::OF_LOOKUP_ATTR) == NYT::OF_LOOKUP_ATTR;
    const bool columnGroupsEnabled =
        State_->Configuration->ColumnGroupMode.Get().GetOrElse(EColumnGroupMode::Disable) != EColumnGroupMode::Disable;
    const bool readsNonTmpTable = AnyOf(mergeOp.Input().Item(0).Paths(), isNonAnonymousTable);
    const bool inputIsSampled =
        NYql::HasSetting(mergeOp.Input().Item(0).Settings().Ref(), EYtSettingType::Sample);

    // Both column-group reasons only matter when the cluster is not optimized for lookup.
    const bool columnGroupsRequireTransform = !optimizedForLookup
        && (columnGroupsEnabled != outputHasColumnGroups || (outputHasColumnGroups && readsNonTmpTable));

    if (!inputIsSampled && !columnGroupsRequireTransform) {
        return node;
    }

    return TExprBase(ctx.ChangeChild(
        mergeOp.Ref(),
        TYtMerge::idx_Settings,
        NYql::AddSetting(mergeOp.Settings().Ref(), EYtSettingType::ForceTransform, {}, ctx)));
}

} // namespace NYql
Original file line number Diff line number Diff line change
Expand Up @@ -2607,9 +2607,9 @@
],
"test.test[sampling-map-keyfilter-Debug]": [
{
"checksum": "6c131543166d58f45f026cc0cc278256",
"size": 1450,
"uri": "https://{canondata_backend}/1923547/38a836c67ec3650a7755f75c235e64558a2634bf/resource.tar.gz#test.test_sampling-map-keyfilter-Debug_/opt.yql"
"checksum": "a5b19c8a8fb474cd970ff612f068651c",
"size": 1470,
"uri": "https://{canondata_backend}/1889210/7f4d1cfb08cbb8e5d57565ff98a6e424f57d7280/resource.tar.gz#test.test_sampling-map-keyfilter-Debug_/opt.yql"
}
],
"test.test[sampling-map-keyfilter-Plan]": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -854,9 +854,9 @@
],
"test.test[column_group-min_group-default.txt-Debug]": [
{
"checksum": "1aaf16e9c5132c0572386fd4d56c7277",
"size": 2735,
"uri": "https://{canondata_backend}/1924537/b951bd7340443ed2b617ca5ca2a4a02ac6a08281/resource.tar.gz#test.test_column_group-min_group-default.txt-Debug_/opt.yql"
"checksum": "e773cf7052b0fc660a18e8315628900d",
"size": 2755,
"uri": "https://{canondata_backend}/1814674/38d85c1805c6536b3e75cfe9e8d9b3260b17f2bc/resource.tar.gz#test.test_column_group-min_group-default.txt-Debug_/opt.yql"
}
],
"test.test[column_group-min_group-default.txt-Plan]": [
Expand Down

0 comments on commit 37cf155

Please sign in to comment.