Skip to content

Commit

Permalink
YQ-3644 added validations for resource pool parametres (#8958)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Sep 11, 2024
1 parent 89fd4d0 commit b82247b
Show file tree
Hide file tree
Showing 21 changed files with 239 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ NMetadata::NModifications::TOperationParsingResult TResourcePoolClassifierManage
} catch (...) {
throw yexception() << "Failed to parse property " << property << ": " << CurrentExceptionMessage();
}
} else if (!featuresExtractor.ExtractResetFeature(property)) {
} else if (featuresExtractor.ExtractResetFeature(property)) {
if (property == "resource_pool") {
ythrow yexception() << "Cannot reset required property resource_pool";
}
} else {
continue;
}

Expand All @@ -65,6 +69,19 @@ NMetadata::NModifications::TOperationParsingResult TResourcePoolClassifierManage
}
}

if (context.GetActivityType() == EActivityType::Create) {
if (!configJson.GetMap().contains("resource_pool")) {
ythrow yexception() << "Missing required property resource_pool";
}

static const TString extraPathSymbolsAllowed = "!\"#$%&'()*+,-.:;<=>?@[\\]^_`{|}~";
const auto& name = settings.GetObjectId();
if (const auto brokenAt = PathPartBrokenAt(name, extraPathSymbolsAllowed); brokenAt != name.end()) {
ythrow yexception() << "Symbol '" << *brokenAt << "'" << " is not allowed in the resource pool classifier name '" << name << "'";
}
}
resourcePoolClassifierSettings.Validate();

NJsonWriter::TBuf writer;
writer.WriteJsonValue(&configJson);
result.SetColumn(TResourcePoolClassifierConfig::TDecoder::ConfigJson, NMetadata::NInternal::TYDBValue::Utf8(writer.Str()));
Expand Down
24 changes: 14 additions & 10 deletions ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -422,17 +422,19 @@ class TResourcePoolsCache {
struct TClassifierInfo {
const TString Membername;
const TString PoolId;
const i64 Rank;

TClassifierInfo(const NResourcePool::TClassifierSettings& classifierSettings)
: Membername(classifierSettings.Membername)
, PoolId(classifierSettings.ResourcePool)
, Rank(classifierSettings.Rank)
{}
};

struct TDatabaseInfo {
std::unordered_map<TString, TResourcePoolClassifierConfig> ResourcePoolsClassifiers = {};
std::map<i64, TClassifierInfo> RankToClassifierInfo = {};
std::unordered_map<TString, TString> UserToResourcePool = {};
std::unordered_map<TString, std::pair<TString, i64>> UserToResourcePool = {};
bool Serverless = false;
};

Expand Down Expand Up @@ -462,16 +464,16 @@ class TResourcePoolsCache {
}

TDatabaseInfo& databaseInfo = *GetOrCreateDatabaseInfo(database);
if (const auto& poolId = GetPoolIdFromClassifiers(database, userToken->GetUserSID(), databaseInfo, userToken, actorContext)) {
return poolId;
}
auto [resultPoolId, resultRank] = GetPoolIdFromClassifiers(database, userToken->GetUserSID(), databaseInfo, userToken, actorContext);
for (const auto& userSID : userToken->GetGroupSIDs()) {
if (const auto& poolId = GetPoolIdFromClassifiers(database, userSID, databaseInfo, userToken, actorContext)) {
return poolId;
const auto& [poolId, rank] = GetPoolIdFromClassifiers(database, userSID, databaseInfo, userToken, actorContext);
if (poolId && (!resultPoolId || resultRank > rank)) {
resultPoolId = poolId;
resultRank = rank;
}
}

return NResourcePool::DEFAULT_POOL_ID;
return resultPoolId ? resultPoolId : NResourcePool::DEFAULT_POOL_ID;
}

std::optional<TPoolInfo> GetPoolInfo(const TString& database, const TString& poolId, TActorContext actorContext) const {
Expand Down Expand Up @@ -582,13 +584,14 @@ class TResourcePoolsCache {
}
}

TString GetPoolIdFromClassifiers(const TString& database, const TString& userSID, TDatabaseInfo& databaseInfo, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TActorContext actorContext) const {
std::pair<TString, i64> GetPoolIdFromClassifiers(const TString& database, const TString& userSID, TDatabaseInfo& databaseInfo, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TActorContext actorContext) const {
auto& usersMap = databaseInfo.UserToResourcePool;
if (const auto it = usersMap.find(userSID); it != usersMap.end()) {
return it->second;
}

TString poolId = "";
i64 rank = -1;
for (const auto& [_, classifier] : databaseInfo.RankToClassifierInfo) {
if (classifier.Membername != userSID) {
continue;
Expand All @@ -605,11 +608,12 @@ class TResourcePoolsCache {
}

poolId = classifier.PoolId;
rank = classifier.Rank;
break;
}

usersMap[userSID] = poolId;
return poolId;
usersMap[userSID] = {poolId, rank};
return {poolId, rank};
}

TDatabaseInfo* GetOrCreateDatabaseInfo(const TString& database) {
Expand Down
76 changes: 65 additions & 11 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6316,6 +6316,20 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Failed to parse property concurrent_query_limit:");

result = session.ExecuteSchemeQuery(TStringBuilder() << R"(
CREATE RESOURCE POOL MyResourcePool WITH (
CONCURRENT_QUERY_LIMIT=)" << NResourcePool::POOL_MAX_CONCURRENT_QUERY_LIMIT + 1 << R"(
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Invalid resource pool configuration, concurrent_query_limit is " << NResourcePool::POOL_MAX_CONCURRENT_QUERY_LIMIT + 1 << ", that exceeds limit in " << NResourcePool::POOL_MAX_CONCURRENT_QUERY_LIMIT);

result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL MyResourcePool WITH (
QUEUE_SIZE=1
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SCHEME_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Invalid resource pool configuration, queue_size unsupported without concurrent_query_limit or database_load_cpu_threshold");
}

Y_UNIT_TEST(CreateResourcePool) {
Expand Down Expand Up @@ -6536,8 +6550,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
// ALTER RESOURCE POOL CLASSIFIER
checkDisabled(R"(
ALTER RESOURCE POOL CLASSIFIER MyResourcePoolClassifier
SET (RANK = 1, MEMBERNAME = "test@user"),
RESET (RESOURCE_POOL);
SET (RANK = 1, RESOURCE_POOL = "test"),
RESET (MEMBERNAME);
)");

// DROP RESOURCE POOL CLASSIFIER
Expand Down Expand Up @@ -6570,8 +6584,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {

const auto& alterSql = R"(
ALTER RESOURCE POOL CLASSIFIER MyResourcePoolClassifier
SET (RANK = 1, MEMBERNAME = "test@user"),
RESET (RESOURCE_POOL);
SET (RANK = 1, RESOURCE_POOL = "test"),
RESET (MEMBERNAME);
)";

const auto& dropSql = "DROP RESOURCE POOL CLASSIFIER MyResourcePoolClassifier;";
Expand Down Expand Up @@ -6610,6 +6624,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {

auto result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL CLASSIFIER MyResourcePoolClassifier WITH (
RESOURCE_POOL="test",
ANOTHER_PROPERTY=20
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
Expand All @@ -6625,10 +6640,40 @@ Y_UNIT_TEST_SUITE(KqpScheme) {

result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL CLASSIFIER MyResourcePoolClassifier WITH (
RESOURCE_POOL="test",
RANK="StringValue"
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Failed to parse property rank:");

result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL CLASSIFIER MyResourcePoolClassifier WITH (
RANK="0"
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Missing required property resource_pool");

result = session.ExecuteSchemeQuery(R"(
ALTER RESOURCE POOL CLASSIFIER MyResourcePoolClassifier
RESET (RESOURCE_POOL);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Cannot reset required property resource_pool");

result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL CLASSIFIER `MyResource/PoolClassifier` WITH (
RESOURCE_POOL="test"
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Symbol '/' is not allowed in the resource pool classifier name 'MyResource/PoolClassifier'");

result = session.ExecuteSchemeQuery(TStringBuilder() << R"(
CREATE RESOURCE POOL CLASSIFIER MyResourcePoolClassifier WITH (
RESOURCE_POOL="test",
MEMBERNAME=")" << BUILTIN_ACL_METADATA << R"("
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Invalid resource pool classifier configuration, cannot create classifier for system user " << BUILTIN_ACL_METADATA);
}

Y_UNIT_TEST(ResourcePoolClassifiersRankValidation) {
Expand All @@ -6645,13 +6690,15 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
// Create with sample rank
auto result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL CLASSIFIER ClassifierRank42 WITH (
RESOURCE_POOL="test_pool",
RANK=42
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToOneLineString());

// Try to create with same rank
result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL CLASSIFIER AnotherClassifierRank42 WITH (
RESOURCE_POOL="test_pool",
RANK=42
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
Expand All @@ -6660,13 +6707,15 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
// Create with high rank
result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL CLASSIFIER `ClassifierRank2^63` WITH (
RESOURCE_POOL="test_pool",
RANK=9223372036854775807
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToOneLineString());

// Try to create with auto rank
result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL CLASSIFIER ClassifierRankAuto WITH (
RESOURCE_POOL="test_pool",
MEMBERNAME="test@user"
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
Expand Down Expand Up @@ -6724,11 +6773,12 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
// Auto rank
query = R"(
CREATE RESOURCE POOL CLASSIFIER AnotherResourcePoolClassifier WITH (
RESOURCE_POOL="test_pool",
MEMBERNAME="another@user"
);)";
result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(FetchResourcePoolClassifiers(kikimr), "{\"resource_pool_classifiers\":[{\"rank\":20,\"name\":\"MyResourcePoolClassifier\",\"config\":{\"membername\":\"test@user\",\"resource_pool\":\"test_pool\"},\"database\":\"\\/Root\"},{\"rank\":1020,\"name\":\"AnotherResourcePoolClassifier\",\"config\":{\"membername\":\"another@user\"},\"database\":\"\\/Root\"}]}");
UNIT_ASSERT_VALUES_EQUAL(FetchResourcePoolClassifiers(kikimr), "{\"resource_pool_classifiers\":[{\"rank\":20,\"name\":\"MyResourcePoolClassifier\",\"config\":{\"membername\":\"test@user\",\"resource_pool\":\"test_pool\"},\"database\":\"\\/Root\"},{\"rank\":1020,\"name\":\"AnotherResourcePoolClassifier\",\"config\":{\"membername\":\"another@user\",\"resource_pool\":\"test_pool\"},\"database\":\"\\/Root\"}]}");
}

Y_UNIT_TEST(DoubleCreateResourcePoolClassifier) {
Expand All @@ -6745,6 +6795,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
{
auto query = R"(
CREATE RESOURCE POOL CLASSIFIER MyResourcePoolClassifier WITH (
RESOURCE_POOL="test_pool",
RANK=20
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
Expand All @@ -6754,6 +6805,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
{
auto query = R"(
CREATE RESOURCE POOL CLASSIFIER MyResourcePoolClassifier WITH (
RESOURCE_POOL="test_pool",
RANK=1
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
Expand Down Expand Up @@ -6800,22 +6852,23 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
{
auto query = R"(
CREATE RESOURCE POOL CLASSIFIER AnotherResourcePoolClassifier WITH (
RESOURCE_POOL="test_pool",
RANK=42
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(FetchResourcePoolClassifiers(kikimr), "{\"resource_pool_classifiers\":[{\"rank\":20,\"name\":\"MyResourcePoolClassifier\",\"config\":{\"membername\":\"test@user\",\"resource_pool\":\"test_pool\"},\"database\":\"\\/Root\"},{\"rank\":42,\"name\":\"AnotherResourcePoolClassifier\",\"config\":{},\"database\":\"\\/Root\"}]}");
UNIT_ASSERT_VALUES_EQUAL(FetchResourcePoolClassifiers(kikimr), "{\"resource_pool_classifiers\":[{\"rank\":20,\"name\":\"MyResourcePoolClassifier\",\"config\":{\"membername\":\"test@user\",\"resource_pool\":\"test_pool\"},\"database\":\"\\/Root\"},{\"rank\":42,\"name\":\"AnotherResourcePoolClassifier\",\"config\":{\"resource_pool\":\"test_pool\"},\"database\":\"\\/Root\"}]}");
}

// Test reset
{
auto query = R"(
ALTER RESOURCE POOL CLASSIFIER MyResourcePoolClassifier
RESET (RANK, RESOURCE_POOL);
RESET (RANK, MEMBERNAME);
)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(FetchResourcePoolClassifiers(kikimr), "{\"resource_pool_classifiers\":[{\"rank\":1042,\"name\":\"MyResourcePoolClassifier\",\"config\":{\"membername\":\"test@user\",\"resource_pool\":\"default\"},\"database\":\"\\/Root\"},{\"rank\":42,\"name\":\"AnotherResourcePoolClassifier\",\"config\":{},\"database\":\"\\/Root\"}]}");
UNIT_ASSERT_VALUES_EQUAL(FetchResourcePoolClassifiers(kikimr), "{\"resource_pool_classifiers\":[{\"rank\":1042,\"name\":\"MyResourcePoolClassifier\",\"config\":{\"membername\":\"\",\"resource_pool\":\"test_pool\"},\"database\":\"\\/Root\"},{\"rank\":42,\"name\":\"AnotherResourcePoolClassifier\",\"config\":{\"resource_pool\":\"test_pool\"},\"database\":\"\\/Root\"}]}");
}
}

Expand All @@ -6832,8 +6885,8 @@ Y_UNIT_TEST_SUITE(KqpScheme) {

auto query = R"(
ALTER RESOURCE POOL CLASSIFIER MyResourcePoolClassifier
SET (MEMBERNAME = "test@user", RANK = 100),
RESET (RESOURCE_POOL);
SET (RESOURCE_POOL = "test", RANK = 100),
RESET (MEMBERNAME);
)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
Expand All @@ -6854,11 +6907,12 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
{
auto query = R"(
CREATE RESOURCE POOL CLASSIFIER MyResourcePoolClassifier WITH (
RESOURCE_POOL="test_pool",
RANK=20
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(FetchResourcePoolClassifiers(kikimr), "{\"resource_pool_classifiers\":[{\"rank\":20,\"name\":\"MyResourcePoolClassifier\",\"config\":{},\"database\":\"\\/Root\"}]}");
UNIT_ASSERT_VALUES_EQUAL(FetchResourcePoolClassifiers(kikimr), "{\"resource_pool_classifiers\":[{\"rank\":20,\"name\":\"MyResourcePoolClassifier\",\"config\":{\"resource_pool\":\"test_pool\"},\"database\":\"\\/Root\"}]}");
}

{
Expand Down
7 changes: 1 addition & 6 deletions ydb/core/kqp/workload_service/common/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@ NYql::TIssues GroupIssues(const NYql::TIssues& issues, const TString& message) {
}

void ParsePoolSettings(const NKikimrSchemeOp::TResourcePoolDescription& description, NResourcePool::TPoolSettings& poolConfig) {
const auto& properties = description.GetProperties().GetProperties();
for (auto& [property, value] : poolConfig.GetPropertiesMap()) {
if (auto propertyIt = properties.find(property); propertyIt != properties.end()) {
std::visit(NResourcePool::TPoolSettings::TParser{propertyIt->second}, value);
}
}
poolConfig = NResourcePool::TPoolSettings(description.GetProperties().GetProperties());
}

ui64 SaturationSub(ui64 x, ui64 y) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
UNIT_ASSERT_C(settings.PoolId_, "Query pool id is not specified");

auto event = std::make_unique<TEvKqp::TEvQueryRequest>();
event->Record.SetUserToken(NACLib::TUserToken("", settings.UserSID_, {}).SerializeAsString());
event->Record.SetUserToken(NACLib::TUserToken("", settings.UserSID_, settings.GroupSIDs_).SerializeAsString());

auto request = event->Record.MutableRequest();
request->SetQuery(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct TQueryRunnerSettings {
FLUENT_SETTING_DEFAULT(ui32, NodeIndex, 0);
FLUENT_SETTING_DEFAULT(std::optional<TString>, PoolId, std::nullopt);
FLUENT_SETTING_DEFAULT(TString, UserSID, "user@" BUILTIN_SYSTEM_DOMAIN);
FLUENT_SETTING_DEFAULT(TVector<TString>, GroupSIDs, {});
FLUENT_SETTING_DEFAULT(TString, Database, "");

// Runner settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) {
// Check alter access
TSampleQueries::CheckSuccess(ydb->ExecuteQuery(TStringBuilder() << R"(
ALTER RESOURCE POOL )" << NResourcePool::DEFAULT_POOL_ID << R"( SET (
QUEUE_SIZE=1
QUERY_MEMORY_LIMIT_PERCENT_PER_NODE=1
);
)", settings));

Expand Down Expand Up @@ -205,7 +205,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceSubscriptions) {

ydb->ExecuteSchemeQuery(TStringBuilder() << R"(
ALTER RESOURCE POOL )" << ydb->GetSettings().PoolId_ << R"( SET (
QUEUE_SIZE=42
CONCURRENT_QUERY_LIMIT=42
);
)");

Expand All @@ -214,7 +214,7 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceSubscriptions) {

const auto& config = response->Get()->Config;
UNIT_ASSERT_C(config, "Pool config not found");
UNIT_ASSERT_VALUES_EQUAL(config->QueueSize, 42);
UNIT_ASSERT_VALUES_EQUAL(config->ConcurrentQueryLimit, 42);
}

Y_UNIT_TEST(TestResourcePoolSubscriptionAfterAclChange) {
Expand Down
Loading

0 comments on commit b82247b

Please sign in to comment.