Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WM passed resource pool settings to user request context #5734

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>

#include <ydb/library/aclib/aclib.h>
#include <ydb/library/actors/core/event_local.h>
Expand All @@ -24,12 +25,14 @@ struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestInto
};

struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqpWorkloadServiceEvents::EvContinueRequest> {
explicit TEvContinueRequest(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
TEvContinueRequest(Ydb::StatusIds::StatusCode status, const NResourcePool::TPoolSettings& poolConfig, NYql::TIssues issues = {})
: Status(status)
, PoolConfig(poolConfig)
, Issues(std::move(issues))
{}

const Ydb::StatusIds::StatusCode Status;
const NResourcePool::TPoolSettings PoolConfig;
const NYql::TIssues Issues;
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/events/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ PEERDIR(
ydb/core/grpc_services/cancelation
ydb/core/kqp/common/shutdown
ydb/core/kqp/common/compilation
ydb/core/resource_pools

ydb/library/yql/dq/actors
ydb/public/api/protos
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/common/kqp_user_request_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <util/generic/fwd.h>
#include <contrib/libs/protobuf/src/google/protobuf/map.h>

#include <ydb/core/resource_pools/resource_pool_settings.h>

namespace NKikimr::NKqp {

struct TUserRequestContext : public TAtomicRefCount<TUserRequestContext> {
Expand All @@ -13,6 +15,7 @@ namespace NKikimr::NKqp {
TString CurrentExecutionId;
TString CustomerSuppliedId;
TString PoolId;
NResourcePool::TPoolSettings PoolConfig;

TUserRequestContext() = default;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}

LOG_D("continue request, pool id: " << poolId);
QueryState->UserRequestContext->PoolConfig = ev->Get()->PoolConfig;
CompileQuery();
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/ut/common/columnshard.h>
#include <ydb/core/kqp/workload_service/kqp_workload_service.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>
#include <ydb/core/testlib/common_helper.h>
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
Expand Down Expand Up @@ -238,7 +239,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {

Y_UNIT_TEST(ExecuteQueryWithWorkloadManager) {
NWorkload::TWorkloadManagerConfig workloadManagerConfig;
workloadManagerConfig.Pools.insert({"sample_pool_id", NWorkload::TWorkloadManagerConfig::TPoolConfig()});
workloadManagerConfig.Pools.insert({"sample_pool_id", NResourcePool::TPoolSettings()});
SetWorkloadManagerConfig(workloadManagerConfig);

NKikimrConfig::TAppConfig config;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/workload_service/kqp_workload_service.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
Expand Down Expand Up @@ -101,7 +102,7 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) {

Y_UNIT_TEST(ExecuteScriptWithWorkloadManager) {
NWorkload::TWorkloadManagerConfig workloadManagerConfig;
workloadManagerConfig.Pools.insert({"sample_pool_id", NWorkload::TWorkloadManagerConfig::TPoolConfig()});
workloadManagerConfig.Pools.insert({"sample_pool_id", NResourcePool::TPoolSettings()});
SetWorkloadManagerConfig(workloadManagerConfig);

NKikimrConfig::TAppConfig config;
Expand Down
7 changes: 1 addition & 6 deletions ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
return;
}

if (!poolState->HasAccess(ev->Get()->UserToken)) {
ReplyContinueError(workerActorId, Ydb::StatusIds::UNAUTHORIZED, TStringBuilder() << "You do not have access permissions for pool " << poolId);
return;
}

if (poolState->PlaceRequest(workerActorId, ev->Get()->SessionId) && poolState->TablesRequired()) {
ScheduleLeaseUpdate();
PrepareWorkloadServiceTables();
Expand Down Expand Up @@ -258,7 +253,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {

void ReplyContinueError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const {
LOG_W("Reply continue error " << status << " to " << replyActorId << ": " << message);
Send(replyActorId, new TEvContinueRequest(status, {NYql::TIssue(message)}));
Send(replyActorId, new TEvContinueRequest(status, {}, {NYql::TIssue(message)}));
}

void ReplyCleanupError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const {
Expand Down
11 changes: 2 additions & 9 deletions ydb/core/kqp/workload_service/kqp_workload_service.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <ydb/core/resource_pools/resource_pool_settings.h>
#include <ydb/library/actors/core/actor.h>


Expand All @@ -8,15 +9,7 @@ namespace NKikimr::NKqp {
namespace NWorkload {

struct TWorkloadManagerConfig {
struct TPoolConfig {
ui64 ConcurrentQueryLimit = 0; // 0 = infinity
ui64 QueryCountLimit = 0; // 0 = infinity
TDuration QueryCancelAfter = TDuration::Days(1);

TString ACL = ""; // empty = full access for all users
};

std::unordered_map<TString, TPoolConfig> Pools;
std::unordered_map<TString, NResourcePool::TPoolSettings> Pools;
};

void SetWorkloadManagerConfig(const TWorkloadManagerConfig& workloadManagerConfig);
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/kqp/workload_service/kqp_workload_service_impl.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#pragma once

#include "kqp_workload_service.h"
#include "kqp_workload_service_tables_impl.h"

#include <ydb/core/kqp/common/events/events.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>

#include <ydb/library/aclib/aclib.h>
#include <ydb/library/actors/core/actor.h>


Expand All @@ -18,7 +17,6 @@ namespace NQueue {
class IState : public TThrRefBase {
public:
virtual bool TablesRequired() const = 0;
virtual bool HasAccess(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken) const = 0;
virtual ui64 GetLocalPoolSize() const = 0;

virtual void OnPreparingFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) = 0;
Expand All @@ -36,7 +34,7 @@ class IState : public TThrRefBase {

using TStatePtr = TIntrusivePtr<IState>;

TStatePtr CreateState(const NActors::TActorContext& actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters);
TStatePtr CreateState(const NActors::TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);

} // NQueue

Expand Down
Loading
Loading