Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pg] support of table row #2350

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/library/yql/core/type_ann/type_ann_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12086,6 +12086,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["PgGroupRef"] = &PgGroupRefWrapper;
Functions["PgGrouping"] = &PgGroupingWrapper;
Functions["PgGroupingSet"] = &PgGroupingSetWrapper;
Functions["PgToRecord"] = &PgToRecordWrapper;

Functions["AutoDemux"] = &AutoDemuxWrapper;
Functions["AggrCountInit"] = &AggrCountInitWrapper;
Expand Down
167 changes: 163 additions & 4 deletions ydb/library/yql/core/type_ann/type_ann_pg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,10 @@ bool ScanColumns(TExprNode::TPtr root, TInputs& inputs, const THashSet<TString>&
}

TString foundAlias;
bool matchedAlias = false;
ui32 matchedAliasI = 0;
TMaybe<ui32> matchedAliasIndex;
TMaybe<ui32> matchedAliasIndexI;
for (ui32 priority : {TInput::Projection, TInput::Current, TInput::External}) {
ui32 matches = 0;
for (ui32 inputIndex = 0; inputIndex < inputs.size(); ++inputIndex) {
Expand All @@ -1597,6 +1601,16 @@ bool ScanColumns(TExprNode::TPtr root, TInputs& inputs, const THashSet<TString>&
}
}

if (!x.Alias.empty()) {
if (node->Tail().Content() == x.Alias) {
matchedAlias = true;
matchedAliasIndex = inputIndex;
} else if (AsciiEqualsIgnoreCase(node->Tail().Content(), x.Alias)) {
++matchedAliasI;
matchedAliasIndexI = inputIndex;
}
}

auto pos = x.Type->FindItemI(node->Tail().Content());
if (pos) {
foundAlias = x.Alias;
Expand Down Expand Up @@ -1624,10 +1638,39 @@ bool ScanColumns(TExprNode::TPtr root, TInputs& inputs, const THashSet<TString>&
return true;
}

ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()),
TStringBuilder() << "No such column: " << node->Tail().Content()));
isError = true;
return false;
TInput* tableRefInput = nullptr;
if (matchedAlias) {
tableRefInput = &inputs[*matchedAliasIndex];
} else {
if (matchedAliasI > 1) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()),
TStringBuilder() << "Table reference is ambiguous: " << node->Tail().Content()));
isError = true;
return false;
}

if (matchedAliasI == 1) {
tableRefInput = &inputs[*matchedAliasIndexI];
}
}

if (!tableRefInput) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()),
TStringBuilder() << "No such column: " << node->Tail().Content()));
isError = true;
return false;
}

for (const auto& item : tableRefInput->Type->GetItems()) {
if (!item->GetName().StartsWith("_yql_")) {
refs.insert(TString(item->GetName()));
if (tableRefInput->Priority == TInput::External) {
tableRefInput->UsedExternalColumns.insert(TString(item->GetName()));
}
}
}

return true;
}
}

Expand Down Expand Up @@ -1890,6 +1933,8 @@ IGraphTransformer::TStatus RebuildLambdaColumns(const TExprNode::TPtr& root, con
}

if (node->IsCallable("PgColumnRef")) {
const TInput* matchedAliasInput = nullptr;
const TInput* matchedAliasInputI = nullptr;
for (ui32 priority : { TInput::Projection, TInput::Current, TInput::External }) {
for (const auto& x : inputs) {
if (priority != x.Priority) {
Expand All @@ -1902,6 +1947,14 @@ IGraphTransformer::TStatus RebuildLambdaColumns(const TExprNode::TPtr& root, con
}
}

if (!x.Alias.empty()) {
if (node->Tail().Content() == x.Alias) {
matchedAliasInput = &x;
} else if (AsciiEqualsIgnoreCase(node->Tail().Content(), x.Alias)) {
matchedAliasInputI = &x;
}
}

auto pos = x.Type->FindItemI(node->Tail().Content());
if (pos) {
return ctx.Expr.Builder(node->Pos())
Expand All @@ -1914,6 +1967,41 @@ IGraphTransformer::TStatus RebuildLambdaColumns(const TExprNode::TPtr& root, con
}
}

if (!matchedAliasInput && matchedAliasInputI) {
matchedAliasInput = matchedAliasInputI;
}

if (matchedAliasInput) {
return ctx.Expr.Builder(node->Pos())
.Callable("PgToRecord")
.Callable(0, "DivePrefixMembers")
.Add(0, argNode)
.List(1)
.Atom(0, MakeAliasedColumn(matchedAliasInput->Alias, ""))
.Seal()
.Seal()
.List(1)
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder & {
ui32 pos = 0;
for (ui32 i = 0; i < matchedAliasInput->Type->GetSize(); ++i) {
auto columnName = matchedAliasInput->Order ?
matchedAliasInput->Order.GetRef()[i] :
matchedAliasInput->Type->GetItems()[i]->GetName();
if (!columnName.StartsWith("_yql_")) {
parent.List(pos++)
.Atom(0, columnName)
.Atom(1, columnName)
.Seal();
}
}

return parent;
})
.Seal()
.Seal()
.Build();
}

YQL_ENSURE(false, "Missing input");
}

Expand Down Expand Up @@ -5276,5 +5364,76 @@ IGraphTransformer::TStatus PgGroupingSetWrapper(const TExprNode::TPtr& input, TE
return IGraphTransformer::TStatus::Ok;
}

IGraphTransformer::TStatus PgToRecordWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

if (!EnsureStructType(input->Head(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

auto structType = input->Head().GetTypeAnn()->Cast<TStructExprType>();
bool hasConversions = false;
for (ui32 pass = 0; pass < 2; ++pass) {
TExprNode::TListType convItems;
for (ui32 i = 0; i < structType->GetSize(); ++i) {
ui32 pgType;
bool convertToPg;
if (!ExtractPgType(structType->GetItems()[i]->GetItemType(), pgType, convertToPg, input->Head().Pos(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

hasConversions = hasConversions || convertToPg;
if (pass == 1) {
auto name = ctx.Expr.NewAtom(input->Head().Pos(), structType->GetItems()[i]->GetName());
auto member = ctx.Expr.NewCallable(input->Head().Pos(), "Member", { input->HeadPtr(), name} );
if (convertToPg) {
member = ctx.Expr.NewCallable(input->Head().Pos(), "ToPg", { member });
}

convItems.push_back(ctx.Expr.NewList(input->Head().Pos(), { name, member }));
}
}

if (!hasConversions) {
break;
}

if (pass == 1) {
output = ctx.Expr.ChangeChild(*input, 0, ctx.Expr.NewCallable(input->Head().Pos(), "AsStruct", std::move(convItems)));
return IGraphTransformer::TStatus::Repeat;
}
}

if (!EnsureTuple(input->Tail(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

for (const auto& child : input->Tail().Children()) {
if (!EnsureTupleSize(*child, 2, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

if (!EnsureAtom(child->Head(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

if (!EnsureAtom(child->Tail(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

auto pos = structType->FindItem(child->Tail().Content());
if (!pos) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(child->Pos()),
TStringBuilder() << "Missing member: " << child->Tail().Content()));
return IGraphTransformer::TStatus::Error;
}
}

input->SetTypeAnn(ctx.Expr.MakeType<TPgExprType>(NPg::LookupType("record").TypeId));
return IGraphTransformer::TStatus::Ok;
}

} // namespace NTypeAnnImpl
}
1 change: 1 addition & 0 deletions ydb/library/yql/core/type_ann/type_ann_pg.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ IGraphTransformer::TStatus PgSubLinkWrapper(const TExprNode::TPtr& input, TExprN
IGraphTransformer::TStatus PgGroupRefWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus PgGroupingWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus PgGroupingSetWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus PgToRecordWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);

} // namespace NTypeAnnImpl
} // namespace NYql
26 changes: 26 additions & 0 deletions ydb/library/yql/minikql/mkql_program_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "ydb/library/yql/minikql/mkql_type_builder.h"
#include "ydb/library/yql/core/sql_types/match_recognize.h"
#include "ydb/library/yql/core/sql_types/time_order_recover.h"
#include <ydb/library/yql/parser/pg_catalog/catalog.h>

#include <util/string/cast.h>
#include <util/string/printf.h>
Expand Down Expand Up @@ -5470,6 +5471,31 @@ TRuntimeNode TProgramBuilder::PgTableContent(
return TRuntimeNode(callableBuilder.Build(), false);
}

TRuntimeNode TProgramBuilder::PgToRecord(TRuntimeNode input, const TArrayRef<std::pair<std::string_view, std::string_view>>& members) {
if constexpr (RuntimeVersion < 48U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
}

MKQL_ENSURE(input.GetStaticType()->IsStruct(), "Expected struct");
auto structType = AS_TYPE(TStructType, input.GetStaticType());
for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
auto itemType = structType->GetMemberType(i);
MKQL_ENSURE(itemType->IsNull() || itemType->IsPg(), "Expected null or pg");
}

auto returnType = NewPgType(NYql::NPg::LookupType("record").TypeId);
TCallableBuilder callableBuilder(Env, __func__, returnType);
callableBuilder.Add(input);
TVector<TRuntimeNode> names;
for (const auto& x : members) {
names.push_back(NewDataLiteral<NUdf::EDataSlot::String>(x.first));
names.push_back(NewDataLiteral<NUdf::EDataSlot::String>(x.second));
}

callableBuilder.Add(NewTuple(names));
return TRuntimeNode(callableBuilder.Build(), false);
}

TRuntimeNode TProgramBuilder::PgCast(TRuntimeNode input, TType* returnType, TRuntimeNode typeMod) {
if constexpr (RuntimeVersion < 30U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/minikql/mkql_program_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ class TProgramBuilder : public TTypeBuilder {
const std::string_view& cluster,
const std::string_view& table,
TType* returnType);
TRuntimeNode PgToRecord(TRuntimeNode input, const TArrayRef<std::pair<std::string_view, std::string_view>>& members);

TRuntimeNode ScalarApply(const TArrayRef<const TRuntimeNode>& args, const TArrayLambda& handler);

Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/minikql/mkql_runtime_version.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace NMiniKQL {
// 1. Bump this version every time incompatible runtime nodes are introduced.
// 2. Make sure you provide runtime node generation for previous runtime versions.
#ifndef MKQL_RUNTIME_VERSION
#define MKQL_RUNTIME_VERSION 47U
#define MKQL_RUNTIME_VERSION 48U
#endif

// History:
Expand Down
Loading
Loading