diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 70dc9981bdad..c2ac5186770d 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -234,7 +234,7 @@ class TKqpCompileActor : public TActorBootstrapped { break; case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: - AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings); + AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings, SplitExpr); break; default: diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index ba70cc89303b..a8f9dd4b28e4 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1142,10 +1142,10 @@ class TKqpHost : public IKqpHost { }); } - IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) override { + IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) override { return CheckedProcessQuery(*ExprCtx, - [this, &query, settings] (TExprContext& ctx) mutable { - return PrepareQueryInternal(query, nullptr, EKikimrQueryType::Script, settings, ctx); + [this, &query, settings, expr] (TExprContext& ctx) mutable { + return PrepareQueryInternal(query, expr, EKikimrQueryType::Script, settings, ctx); }); } diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index b52c5dbb488e..ecf27af83fc7 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -89,7 +89,7 @@ class IKqpHost : public TThrRefBase { virtual IAsyncQueryResultPtr PrepareGenericQuery(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) = 0; /* Federated queries */ - virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0; + virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) = 0; /* Scripting */ virtual IAsyncQueryResultPtr ValidateYqlScript(const TKqpQueryRef& script) = 0; diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp index cc5ec37a0215..975fcbadcd76 100644 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp @@ -1804,13 +1804,13 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { ExecuteSelectQuery("test_bucket_execute_script_with_large_file", 5_MB, 500000); } - std::shared_ptr CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName) { + std::shared_ptr CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName, bool enableOltp) { const TString bucket = "test_bucket3"; const TString object = "test_object"; NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); - appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(enableOltp); appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true); appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true); appConfig.MutableFeatureFlags()->SetEnableTempTables(true); @@ -1863,8 +1863,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { } - void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable) { - { + void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable, bool enableOltp) { + if (enableOltp) { const TString query = TStringBuilder() << "SELECT Unwrap(key), Unwrap(value) FROM `" << oltpTable << "`;"; ValidateResult(client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync()); } @@ -1875,15 +1875,15 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { } } - Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSource) { + void DoCreateTableAsSelectFromExternalDataSource(std::function requestRunner, bool enableOltp) { const TString externalDataSourceName = "external_data_source"; const TString externalTableName = "test_binding_resolve"; - auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName); + auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName, enableOltp); auto client = kikimr->GetQueryClient(); const TString oltpTable = "DestinationOltp"; - { + if (enableOltp) { const TString query = fmt::format(R"( PRAGMA TablePathPrefix = "TestDomain"; CREATE TABLE `{destination}` ( @@ -1900,8 +1900,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { "destination"_a = oltpTable, "external_source"_a = externalDataSourceName ); - auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + requestRunner(query, client, kikimr->GetDriver()); } const TString olapTable = "DestinationOlap"; @@ -1923,22 +1922,43 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { "destination"_a = olapTable, "external_source"_a = externalDataSourceName ); - auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + requestRunner(query, client, kikimr->GetDriver()); } - ValidateTables(client, oltpTable, olapTable); + ValidateTables(client, oltpTable, olapTable, enableOltp); + } + + void RunGenericQuery(const TString& query, TQueryClient& client, const TDriver&) { + auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + void RunGenericScript(const TString& script, TQueryClient& client, const TDriver& driver) { + auto scriptExecutionOperation = client.ExecuteScript(script).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), driver); + UNIT_ASSERT_VALUES_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToOneLineString()); } - Y_UNIT_TEST(CreateTableAsSelectFromExternalTable) { + Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSourceGenericQuery) { + DoCreateTableAsSelectFromExternalDataSource(&RunGenericQuery, true); + } + + Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSourceGenericScript) { + DoCreateTableAsSelectFromExternalDataSource(&RunGenericScript, false); + } + + void DoCreateTableAsSelectFromExternalTable(std::function requestRunner, bool enableOltp) { const TString externalDataSourceName = "external_data_source"; const TString externalTableName = "test_binding_resolve"; - auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName); + auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName, enableOltp); auto client = kikimr->GetQueryClient(); const TString oltpTable = "DestinationOltp"; - { + if (enableOltp) { const TString query = fmt::format(R"( PRAGMA TablePathPrefix = "TestDomain"; CREATE TABLE `{destination}` ( @@ -1949,8 +1969,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { "destination"_a = oltpTable, "external_table"_a = externalTableName ); - auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + requestRunner(query, client, kikimr->GetDriver()); } const TString olapTable = "DestinationOlap"; @@ -1966,11 +1985,18 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { "destination"_a = olapTable, "external_table"_a = externalTableName ); - auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + requestRunner(query, client, kikimr->GetDriver()); } - ValidateTables(client, oltpTable, olapTable); + ValidateTables(client, oltpTable, olapTable, enableOltp); + } + + Y_UNIT_TEST(CreateTableAsSelectFromExternalTableGenericQuery) { + DoCreateTableAsSelectFromExternalTable(&RunGenericQuery, true); + } + + Y_UNIT_TEST(CreateTableAsSelectFromExternalTableGenericScript) { + DoCreateTableAsSelectFromExternalTable(&RunGenericScript, false); } Y_UNIT_TEST(OverridePlannerDefaults) {