From a512394f28ac668485df6798fd2a729191447297 Mon Sep 17 00:00:00 2001 From: Vasily Gerasimov Date: Thu, 8 Feb 2024 15:14:27 +0200 Subject: [PATCH] Convert mismatched types of arg/sequence when applying optimization of ShuffleByKeys to empty sequence (#1568) --- .../s3/kqp_federated_query_ut.cpp | 72 +++++++++++++++++++ .../yql/core/common_opt/yql_co_simple1.cpp | 38 +++++++--- 2 files changed, 99 insertions(+), 11 deletions(-) diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp index 69d72ee6e089..cefcb956b0d0 100644 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp @@ -1591,6 +1591,78 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_STRING_CONTAINS(queryExecutionOperation.GetIssues().ToString(), "\"/Root/external_table\" is expected to be external data source"); } } + + Y_UNIT_TEST(QueryWithNoDataInS3) { + const TString externalDataSourceName = "tpc_h_s3_storage_connection"; + const TString bucket = "test_bucket_no_data"; + + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucket(bucket, s3Client); + // Uncomment if you want to compare with query with data + //UploadObject(bucket, "l/l", R"json({"l_extendedprice": 0.0, "l_discount": 1.0, "l_partkey": 1})json", s3Client); + //UploadObject(bucket, "p/p", R"json({"p_partkey": 1, "p_type": "t"})json", s3Client); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + auto client = kikimr->GetQueryClient(); + + { + const TString query = fmt::format(R"sql( + CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + )sql", + "external_source"_a = externalDataSourceName, + "location"_a = GetBucketLocation(bucket) + ); + auto result = client.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + // YQ-2750 + const TString query = fmt::format(R"sql( + $border = Date("1994-08-01"); + select + 100.00 * sum(case + when StartsWith(p.p_type, 'PROMO') + then l.l_extendedprice * (1 - l.l_discount) + else 0 + end) / sum(l.l_extendedprice * (1 - l.l_discount)) as promo_revenue + from + {external_source}.`l/` with ( schema ( + l_extendedprice double, + l_discount double, + l_partkey int64, + l_shipdate date + ), + format = "json_each_row" + ) as l + join + {external_source}.`p/` with ( schema ( + p_partkey int64, + p_type string + ), + format = "json_each_row" + ) as p + on + l.l_partkey = p.p_partkey + where + cast(l.l_shipdate as timestamp) >= $border + and cast(l.l_shipdate as timestamp) < ($border + Interval("P31D")); + )sql", + "external_source"_a = externalDataSourceName + ); + auto result = client.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + auto rs = result.GetResultSetParser(0); + UNIT_ASSERT_VALUES_EQUAL(rs.RowsCount(), 1); + rs.TryNextRow(); + TMaybe sum = rs.ColumnParser(0).GetOptionalDouble(); + UNIT_ASSERT(!sum); + } + } } } // namespace NKikimr::NKqp diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp index 490a089f5c1e..006fb580d2de 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp @@ -75,7 +75,7 @@ TExprNode::TPtr OptimizePgCastOverPgConst(const TExprNode::TPtr& input, TExprCon return input; } auto val = input->Child(0); - if (!val->IsCallable("PgConst")) { + if (!val->IsCallable("PgConst")) { return input; } @@ -85,7 +85,7 @@ TExprNode::TPtr OptimizePgCastOverPgConst(const TExprNode::TPtr& input, TExprCon YQL_CLOG(DEBUG, Core) << "Remove PgCast unknown->text over PgConst"; return ctx.ChangeChild(*val, 1, castToType); } - + return input; } @@ -6282,17 +6282,33 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) { return node; }; - map["ShuffleByKeys"] = map["PartitionsByKeys"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { + map["PartitionsByKeys"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { if (IsEmpty(node->Head(), *optCtx.Types)) { YQL_CLOG(DEBUG, Core) << node->Content() << " over empty input."; - auto lambdaResult = ctx.Builder(node->Pos()).Apply(node->Tail()).With(0, KeepConstraints(node->HeadPtr(), node->Tail().Head().Head(), ctx)).Seal().Build(); - if (node->IsCallable("ShuffleByKeys")) { - auto lambdaType = node->Tail().GetTypeAnn(); - if (lambdaType->GetKind() == ETypeAnnotationKind::Optional) { - lambdaResult = ctx.NewCallable(lambdaResult->Pos(), "ToList", { lambdaResult }); - } else if (lambdaType->GetKind() == ETypeAnnotationKind::Stream) { - lambdaResult = ctx.NewCallable(lambdaResult->Pos(), "ForwardList", { lambdaResult }); - } + + TExprNode::TPtr sequence = KeepConstraints(node->HeadPtr(), node->Tail().Head().Head(), ctx); + auto lambdaResult = ctx.Builder(node->Pos()).Apply(node->Tail()).With(0, sequence).Seal().Build(); + return lambdaResult; + } + return node; + }; + + map["ShuffleByKeys"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) { + if (IsEmpty(node->Head(), *optCtx.Types)) { + YQL_CLOG(DEBUG, Core) << node->Content() << " over empty input."; + + auto& lambdaArg = node->Tail().Head().Head(); + + TExprNode::TPtr sequence = node->HeadPtr(); // param (list) + sequence = ctx.NewCallable(sequence->Pos(), "ToStream", { sequence }); // lambda accepts stream, but we have list type + sequence = KeepConstraints(sequence, lambdaArg, ctx); + + auto lambdaResult = ctx.Builder(node->Pos()).Apply(node->Tail()).With(0, sequence).Seal().Build(); + auto lambdaType = node->Tail().GetTypeAnn(); + if (lambdaType->GetKind() == ETypeAnnotationKind::Optional) { + lambdaResult = ctx.NewCallable(lambdaResult->Pos(), "ToList", { lambdaResult }); + } else if (lambdaType->GetKind() == ETypeAnnotationKind::Stream || lambdaType->GetKind() == ETypeAnnotationKind::Flow) { + lambdaResult = ctx.NewCallable(lambdaResult->Pos(), "ForwardList", { lambdaResult }); } return lambdaResult; }