Skip to content

Commit

Permalink
Merge 25df095 into cb9a972
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jun 15, 2024
2 parents cb9a972 + 25df095 commit ea00dc4
Show file tree
Hide file tree
Showing 20 changed files with 552 additions and 4 deletions.
28 changes: 28 additions & 0 deletions ydb/core/kqp/gateway/behaviour/resource_pool/behaviour.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include "behaviour.h"
#include "manager.h"

#include <ydb/services/metadata/abstract/initialization.h>


namespace NKikimr::NKqp {

TResourcePoolBehaviour::TFactory::TRegistrator<TResourcePoolBehaviour> TResourcePoolBehaviour::Registrator(TResourcePoolConfig::GetTypeId());

NMetadata::NInitializer::IInitializationBehaviour::TPtr TResourcePoolBehaviour::ConstructInitializer() const {
return nullptr;
}

NMetadata::NModifications::IOperationsManager::TPtr TResourcePoolBehaviour::ConstructOperationsManager() const {
return std::make_shared<TResourcePoolManager>();
}

TString TResourcePoolBehaviour::GetInternalStorageTablePath() const {
return TResourcePoolConfig::GetTypeId();
}


TString TResourcePoolBehaviour::GetTypeId() const {
return TResourcePoolConfig::GetTypeId();
}

} // namespace NKikimr::NKqp
27 changes: 27 additions & 0 deletions ydb/core/kqp/gateway/behaviour/resource_pool/behaviour.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <ydb/services/metadata/abstract/kqp_common.h>


namespace NKikimr::NKqp {

class TResourcePoolConfig {
public:
static TString GetTypeId() {
return "RESOURCE_POOL";
}
};

class TResourcePoolBehaviour: public NMetadata::TClassBehaviour<TResourcePoolConfig> {
static TFactory::TRegistrator<TResourcePoolBehaviour> Registrator;

protected:
virtual std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour> ConstructInitializer() const override;
virtual std::shared_ptr<NMetadata::NModifications::IOperationsManager> ConstructOperationsManager() const override;
virtual TString GetInternalStorageTablePath() const override;

public:
virtual TString GetTypeId() const override;
};

} // namespace NKikimr::NKqp
80 changes: 80 additions & 0 deletions ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#include "manager.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/base/feature_flags.h>


namespace NKikimr::NKqp {

namespace {

void CheckFeatureFlag(TResourcePoolManager::TInternalModificationContext& context) {
auto* actorSystem = context.GetExternalData().GetActorSystem();
if (!actorSystem) {
ythrow yexception() << "This place needs an actor system. Please contact internal support";
}

if (!AppData(actorSystem)->FeatureFlags.GetEnableResourcePools()) {
throw std::runtime_error("Resource pools are disabled. Please contact your system administrator to enable it");
}
}

void ValidateObjectId(const TString& objectId) {
if (objectId.find('/') != TString::npos) {
throw std::runtime_error("Resource pool id should not contain '/' symbol");
}
}

TResourcePoolManager::TYqlConclusionStatus StatusFromActivityType(TResourcePoolManager::EActivityType activityType) {
using TYqlConclusionStatus = TResourcePoolManager::TYqlConclusionStatus;
using EActivityType = TResourcePoolManager::EActivityType;

switch (activityType) {
case EActivityType::Undefined:
return TYqlConclusionStatus::Fail("Undefined operation for RESOURCE_POOL object");
case EActivityType::Upsert:
return TYqlConclusionStatus::Fail("Upsert operation for RESOURCE_POOL objects is not implemented");
case EActivityType::Create:
return TYqlConclusionStatus::Fail("Create operation for RESOURCE_POOL objects is not implemented");
case EActivityType::Alter:
return TYqlConclusionStatus::Fail("Alter operation for RESOURCE_POOL objects is not implemented");
case EActivityType::Drop:
return TYqlConclusionStatus::Fail("Drop operation for RESOURCE_POOL objects is not implemented");
}
}

} // anonymous namespace

NThreading::TFuture<TResourcePoolManager::TYqlConclusionStatus> TResourcePoolManager::DoModify(const NYql::TObjectSettingsImpl& settings, ui32 nodeId, const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const {
Y_UNUSED(nodeId, manager);

try {
CheckFeatureFlag(context);
ValidateObjectId(settings.GetObjectId());

return NThreading::MakeFuture<TYqlConclusionStatus>(StatusFromActivityType(context.GetActivityType()));
} catch (...) {
return NThreading::MakeFuture<TYqlConclusionStatus>(TYqlConclusionStatus::Fail(CurrentExceptionMessage()));
}
}

TResourcePoolManager::TYqlConclusionStatus TResourcePoolManager::DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const {
Y_UNUSED(schemeOperation, manager);

try {
CheckFeatureFlag(context);
ValidateObjectId(settings.GetObjectId());

return StatusFromActivityType(context.GetActivityType());
} catch (...) {
return TYqlConclusionStatus::Fail(CurrentExceptionMessage());
}
}

NThreading::TFuture<TResourcePoolManager::TYqlConclusionStatus> TResourcePoolManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, ui32 nodeId, const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const {
Y_UNUSED(nodeId, manager, context);

return NThreading::MakeFuture(TYqlConclusionStatus::Fail(TStringBuilder() << "Execution of prepare operation for RESOURCE_POOL object: unsupported operation: " << static_cast<i32>(schemeOperation.GetOperationCase())));
}

} // namespace NKikimr::NKqp
25 changes: 25 additions & 0 deletions ydb/core/kqp/gateway/behaviour/resource_pool/manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <ydb/services/metadata/manager/abstract.h>


namespace NKikimr::NKqp {

class TResourcePoolManager : public NMetadata::NModifications::IOperationsManager {
public:
using NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus;

protected:
NThreading::TFuture<TYqlConclusionStatus> DoModify(const NYql::TObjectSettingsImpl& settings, ui32 nodeId,
const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const override;

TYqlConclusionStatus DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override;

public:

NThreading::TFuture<TYqlConclusionStatus> ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation,
const ui32 nodeId, const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override;
};

} // namespace NKikimr::NKqp
15 changes: 15 additions & 0 deletions ydb/core/kqp/gateway/behaviour/resource_pool/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
LIBRARY()

SRCS(
manager.cpp
GLOBAL behaviour.cpp
)

PEERDIR(
ydb/services/metadata/abstract
ydb/services/metadata/manager
)

YQL_LAST_ABI_VERSION()

END()
1 change: 1 addition & 0 deletions ydb/core/kqp/gateway/behaviour/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
RECURSE(
external_data_source
resource_pool
table
tablestore
)
1 change: 1 addition & 0 deletions ydb/core/kqp/gateway/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ PEERDIR(
ydb/core/kqp/gateway/behaviour/tablestore
ydb/core/kqp/gateway/behaviour/table
ydb/core/kqp/gateway/behaviour/external_data_source
ydb/core/kqp/gateway/behaviour/resource_pool
ydb/core/kqp/gateway/behaviour/view
ydb/core/kqp/gateway/utils
ydb/library/yql/providers/result/expr_nodes
Expand Down
45 changes: 45 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5988,6 +5988,51 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_EQUAL_C(describe.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
}
}

Y_UNIT_TEST(DisableResourcePools) {
TKikimrRunner kikimr(TKikimrSettings().SetEnableResourcePools(false));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto checkDisabled = [&session](const TString& query) {
Cerr << "Check query:\n" << query << "\n";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Resource pools are disabled. Please contact your system administrator to enable it");
};

// CREATE RESOURCE POOL
checkDisabled(R"(
CREATE RESOURCE POOL `MyResourcePool` WITH (
CONCURRENT_QUERY_LIMIT=20,
QUERY_CANCEL_AFTER_SECONDS=86400,
QUERY_COUNT_LIMIT=1000
);)");

// ALTER RESOURCE POOL
checkDisabled(R"(
ALTER RESOURCE POOL `MyResourcePool`
SET (CONCURRENT_QUERY_LIMIT = 30),
SET QUERY_COUNT_LIMIT 100,
RESET (QUERY_CANCEL_AFTER_SECONDS);
)");

// DROP RESOURCE POOL
checkDisabled("DROP RESOURCE POOL `MyResourcePool`;");
}

Y_UNIT_TEST(ResourcePoolsValidation) {
TKikimrRunner kikimr(TKikimrSettings().SetEnableResourcePools(true));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL `MyFolder/MyResourcePool` WITH (
CONCURRENT_QUERY_LIMIT=20
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Resource pool id should not contain '/' symbol");
}
}

Y_UNIT_TEST_SUITE(KqpOlapScheme) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,5 @@ message TFeatureFlags {
optional bool EnableExternalSourceSchemaInference = 126 [default = false];
optional bool EnableDbMetadataCache = 127 [default = false];
optional bool EnableTableDatetime64 = 128 [default = false];
optional bool EnableResourcePools = 129 [default = false];
}
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableReplaceIfExistsForExternalEntities)
FEATURE_FLAG_SETTER(EnableCMSRequestPriorities)
FEATURE_FLAG_SETTER(EnableTableDatetime64)
FEATURE_FLAG_SETTER(EnableResourcePools)

#undef FEATURE_FLAG_SETTER
};
Expand Down
21 changes: 21 additions & 0 deletions ydb/library/yql/sql/v1/SQLv1.g.in
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ sql_stmt_core:
| create_view_stmt
| drop_view_stmt
| alter_replication_stmt
| create_resource_pool_stmt
| alter_resource_pool_stmt
| drop_resource_pool_stmt
;

expr:
Expand Down Expand Up @@ -798,6 +801,21 @@ permission_name: permission_id | STRING_VALUE;

permission_name_target: permission_name (COMMA permission_name)* COMMA? | ALL PRIVILEGES?;

create_resource_pool_stmt: CREATE RESOURCE POOL object_ref
with_table_settings
;

alter_resource_pool_stmt: ALTER RESOURCE POOL object_ref
alter_resource_pool_action (COMMA alter_resource_pool_action)*
;
alter_resource_pool_action:
alter_table_set_table_setting_uncompat
| alter_table_set_table_setting_compat
| alter_table_reset_table_setting
;

drop_resource_pool_stmt: DROP RESOURCE POOL object_ref;

create_replication_stmt: CREATE ASYNC REPLICATION object_ref
FOR replication_target (COMMA replication_target)*
WITH LPAREN replication_settings RPAREN
Expand Down Expand Up @@ -1189,6 +1207,7 @@ keyword_as_compat:
| PATTERN
| PER
| PERMUTE
| POOL
| PRIVILEGES
| QUEUE
// | READ
Expand Down Expand Up @@ -1347,6 +1366,7 @@ keyword_compat: (
| PER
| PERMUTE
| PLAN
| POOL
| PRAGMA
| PRECEDING
| PRESORT
Expand Down Expand Up @@ -1695,6 +1715,7 @@ PATTERN: P A T T E R N;
PER: P E R;
PERMUTE: P E R M U T E;
PLAN: P L A N;
POOL: P O O L;
PRAGMA: P R A G M A;
PRECEDING: P R E C E D I N G;
PRESORT: P R E S O R T;
Expand Down
37 changes: 36 additions & 1 deletion ydb/library/yql/sql/v1/format/sql_format.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1495,6 +1495,38 @@ friend struct TStaticData;
VisitAllFields(TRule_drop_replication_stmt::GetDescriptor(), msg);
}

void VisitCreateResourcePool(const TRule_create_resource_pool_stmt& msg) {
PosFromToken(msg.GetToken1());
NewLine();
VisitAllFields(TRule_create_resource_pool_stmt::GetDescriptor(), msg);
}

void VisitAlterResourcePool(const TRule_alter_resource_pool_stmt& msg) {
PosFromToken(msg.GetToken1());
NewLine();
VisitToken(msg.GetToken1());
VisitToken(msg.GetToken2());
VisitToken(msg.GetToken3());
Visit(msg.GetRule_object_ref4());

NewLine();
PushCurrentIndent();
Visit(msg.GetRule_alter_resource_pool_action5());
for (const auto& action : msg.GetBlock6()) {
Visit(action.GetToken1()); // comma
NewLine();
Visit(action.GetRule_alter_resource_pool_action2());
}

PopCurrentIndent();
}

void VisitDropResourcePool(const TRule_drop_resource_pool_stmt& msg) {
PosFromToken(msg.GetToken1());
NewLine();
VisitAllFields(TRule_drop_resource_pool_stmt::GetDescriptor(), msg);
}

void VisitAllFields(const NProtoBuf::Descriptor* descr, const NProtoBuf::Message& msg) {
VisitAllFieldsImpl<TPrettyVisitor, &TPrettyVisitor::Visit>(this, descr, msg);
}
Expand Down Expand Up @@ -2708,7 +2740,10 @@ TStaticData::TStaticData()
{TRule_revoke_permissions_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitRevokePermissions)},
{TRule_alter_table_store_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterTableStore)},
{TRule_create_view_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateView)},
{TRule_drop_view_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropView)}
{TRule_drop_view_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropView)},
{TRule_create_resource_pool_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateResourcePool)},
{TRule_alter_resource_pool_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterResourcePool)},
{TRule_drop_resource_pool_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropResourcePool)}
})
, ObfuscatingVisitDispatch({
{TToken::GetDescriptor(), MakeObfuscatingFunctor(&TObfuscatingVisitor::VisitToken)},
Expand Down
18 changes: 18 additions & 0 deletions ydb/library/yql/sql/v1/format/sql_format_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1581,4 +1581,22 @@ FROM Input MATCH_RECOGNIZE (PATTERN (A) DEFINE A AS A);
TSetup setup;
setup.Run(cases);
}

Y_UNIT_TEST(ResourcePoolOperations) {
TCases cases = {
{"creAte reSourCe poOl naMe With (a = \"b\")",
"CREATE RESOURCE POOL naMe WITH (a = \"b\");\n"},
{"create resource pool eds with (a=\"a\",b=\"b\",c = true)",
"CREATE RESOURCE POOL eds WITH (\n\ta = \"a\",\n\tb = \"b\",\n\tc = TRUE\n);\n"},
{"alTer reSOurcE poOl naMe sEt a tRue, resEt (b, c), seT (x=y, z=false)",
"ALTER RESOURCE POOL naMe\n\tSET a TRUE,\n\tRESET (b, c),\n\tSET (x = y, z = FALSE);\n"},
{"alter resource pool eds reset (a), set (x=y)",
"ALTER RESOURCE POOL eds\n\tRESET (a),\n\tSET (x = y);\n"},
{"dRop reSourCe poOl naMe",
"DROP RESOURCE POOL naMe;\n"},
};

TSetup setup;
setup.Run(cases);
}
}
Loading

0 comments on commit ea00dc4

Please sign in to comment.