Skip to content

Commit

Permalink
Fixed CTAS for script requests
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Sep 13, 2024
1 parent 2313a5e commit 9d88fbb
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 25 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
break;

case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings);
AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings, SplitExpr);
break;

default:
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1142,10 +1142,10 @@ class TKqpHost : public IKqpHost {
});
}

IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) override {
IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) override {
return CheckedProcessQuery(*ExprCtx,
[this, &query, settings] (TExprContext& ctx) mutable {
return PrepareQueryInternal(query, nullptr, EKikimrQueryType::Script, settings, ctx);
[this, &query, settings, expr] (TExprContext& ctx) mutable {
return PrepareQueryInternal(query, expr, EKikimrQueryType::Script, settings, ctx);
});
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class IKqpHost : public TThrRefBase {
virtual IAsyncQueryResultPtr PrepareGenericQuery(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) = 0;

/* Federated queries */
virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0;
virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) = 0;

/* Scripting */
virtual IAsyncQueryResultPtr ValidateYqlScript(const TKqpQueryRef& script) = 0;
Expand Down
66 changes: 46 additions & 20 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1804,13 +1804,13 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
ExecuteSelectQuery("test_bucket_execute_script_with_large_file", 5_MB, 500000);
}

std::shared_ptr<TKikimrRunner> CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName) {
std::shared_ptr<TKikimrRunner> CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName, bool enableOltp) {
const TString bucket = "test_bucket3";
const TString object = "test_object";

NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(enableOltp);
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
appConfig.MutableFeatureFlags()->SetEnableTempTables(true);
Expand Down Expand Up @@ -1863,8 +1863,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {

}

void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable) {
{
void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable, bool enableOltp) {
if (enableOltp) {
const TString query = TStringBuilder() << "SELECT Unwrap(key), Unwrap(value) FROM `" << oltpTable << "`;";
ValidateResult(client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync());
}
Expand All @@ -1875,15 +1875,15 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
}
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSource) {
void DoCreateTableAsSelectFromExternalDataSource(std::function<void(const TString&, TQueryClient&, const TDriver&)> requestRunner, bool enableOltp) {
const TString externalDataSourceName = "external_data_source";
const TString externalTableName = "test_binding_resolve";

auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName, enableOltp);
auto client = kikimr->GetQueryClient();

const TString oltpTable = "DestinationOltp";
{
if (enableOltp) {
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
Expand All @@ -1900,8 +1900,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
"destination"_a = oltpTable,
"external_source"_a = externalDataSourceName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
requestRunner(query, client, kikimr->GetDriver());
}

const TString olapTable = "DestinationOlap";
Expand All @@ -1923,22 +1922,43 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
"destination"_a = olapTable,
"external_source"_a = externalDataSourceName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
requestRunner(query, client, kikimr->GetDriver());
}

ValidateTables(client, oltpTable, olapTable);
ValidateTables(client, oltpTable, olapTable, enableOltp);
}

void RunGenericQuery(const TString& query, TQueryClient& client, const TDriver&) {
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void RunGenericScript(const TString& script, TQueryClient& client, const TDriver& driver) {
auto scriptExecutionOperation = client.ExecuteScript(script).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), driver);
UNIT_ASSERT_VALUES_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToOneLineString());
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalTable) {
Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSourceGenericQuery) {
DoCreateTableAsSelectFromExternalDataSource(&RunGenericQuery, true);
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSourceGenericScript) {
DoCreateTableAsSelectFromExternalDataSource(&RunGenericScript, false);
}

void DoCreateTableAsSelectFromExternalTable(std::function<void(const TString&, TQueryClient&, const TDriver&)> requestRunner, bool enableOltp) {
const TString externalDataSourceName = "external_data_source";
const TString externalTableName = "test_binding_resolve";

auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName, enableOltp);
auto client = kikimr->GetQueryClient();

const TString oltpTable = "DestinationOltp";
{
if (enableOltp) {
const TString query = fmt::format(R"(
PRAGMA TablePathPrefix = "TestDomain";
CREATE TABLE `{destination}` (
Expand All @@ -1949,8 +1969,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
"destination"_a = oltpTable,
"external_table"_a = externalTableName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
requestRunner(query, client, kikimr->GetDriver());
}

const TString olapTable = "DestinationOlap";
Expand All @@ -1966,11 +1985,18 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
"destination"_a = olapTable,
"external_table"_a = externalTableName
);
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
requestRunner(query, client, kikimr->GetDriver());
}

ValidateTables(client, oltpTable, olapTable);
ValidateTables(client, oltpTable, olapTable, enableOltp);
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalTableGenericQuery) {
DoCreateTableAsSelectFromExternalTable(&RunGenericQuery, true);
}

Y_UNIT_TEST(CreateTableAsSelectFromExternalTableGenericScript) {
DoCreateTableAsSelectFromExternalTable(&RunGenericScript, false);
}

Y_UNIT_TEST(OverridePlannerDefaults) {
Expand Down

0 comments on commit 9d88fbb

Please sign in to comment.