Skip to content

Commit

Permalink
WM passed resource pool settings to user request context (#5734)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jun 20, 2024
1 parent 5c18e12 commit a3d73ea
Show file tree
Hide file tree
Showing 18 changed files with 189 additions and 145 deletions.
5 changes: 4 additions & 1 deletion ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>

#include <ydb/library/aclib/aclib.h>
#include <ydb/library/actors/core/event_local.h>
Expand All @@ -24,12 +25,14 @@ struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestInto
};

struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqpWorkloadServiceEvents::EvContinueRequest> {
explicit TEvContinueRequest(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
TEvContinueRequest(Ydb::StatusIds::StatusCode status, const NResourcePool::TPoolSettings& poolConfig, NYql::TIssues issues = {})
: Status(status)
, PoolConfig(poolConfig)
, Issues(std::move(issues))
{}

const Ydb::StatusIds::StatusCode Status;
const NResourcePool::TPoolSettings PoolConfig;
const NYql::TIssues Issues;
};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/events/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ PEERDIR(
ydb/core/grpc_services/cancelation
ydb/core/kqp/common/shutdown
ydb/core/kqp/common/compilation
ydb/core/resource_pools

ydb/library/yql/dq/actors
ydb/public/api/protos
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/common/kqp_user_request_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <util/generic/fwd.h>
#include <contrib/libs/protobuf/src/google/protobuf/map.h>

#include <ydb/core/resource_pools/resource_pool_settings.h>

namespace NKikimr::NKqp {

struct TUserRequestContext : public TAtomicRefCount<TUserRequestContext> {
Expand All @@ -13,6 +15,7 @@ namespace NKikimr::NKqp {
TString CurrentExecutionId;
TString CustomerSuppliedId;
TString PoolId;
NResourcePool::TPoolSettings PoolConfig;

TUserRequestContext() = default;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}

LOG_D("continue request, pool id: " << poolId);
QueryState->UserRequestContext->PoolConfig = ev->Get()->PoolConfig;
CompileQuery();
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/ut/common/columnshard.h>
#include <ydb/core/kqp/workload_service/kqp_workload_service.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>
#include <ydb/core/testlib/common_helper.h>
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
Expand Down Expand Up @@ -238,7 +239,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {

Y_UNIT_TEST(ExecuteQueryWithWorkloadManager) {
NWorkload::TWorkloadManagerConfig workloadManagerConfig;
workloadManagerConfig.Pools.insert({"sample_pool_id", NWorkload::TWorkloadManagerConfig::TPoolConfig()});
workloadManagerConfig.Pools.insert({"sample_pool_id", NResourcePool::TPoolSettings()});
SetWorkloadManagerConfig(workloadManagerConfig);

NKikimrConfig::TAppConfig config;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/workload_service/kqp_workload_service.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
Expand Down Expand Up @@ -101,7 +102,7 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) {

Y_UNIT_TEST(ExecuteScriptWithWorkloadManager) {
NWorkload::TWorkloadManagerConfig workloadManagerConfig;
workloadManagerConfig.Pools.insert({"sample_pool_id", NWorkload::TWorkloadManagerConfig::TPoolConfig()});
workloadManagerConfig.Pools.insert({"sample_pool_id", NResourcePool::TPoolSettings()});
SetWorkloadManagerConfig(workloadManagerConfig);

NKikimrConfig::TAppConfig config;
Expand Down
7 changes: 1 addition & 6 deletions ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
return;
}

if (!poolState->HasAccess(ev->Get()->UserToken)) {
ReplyContinueError(workerActorId, Ydb::StatusIds::UNAUTHORIZED, TStringBuilder() << "You do not have access permissions for pool " << poolId);
return;
}

if (poolState->PlaceRequest(workerActorId, ev->Get()->SessionId) && poolState->TablesRequired()) {
ScheduleLeaseUpdate();
PrepareWorkloadServiceTables();
Expand Down Expand Up @@ -258,7 +253,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {

void ReplyContinueError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const {
LOG_W("Reply continue error " << status << " to " << replyActorId << ": " << message);
Send(replyActorId, new TEvContinueRequest(status, {NYql::TIssue(message)}));
Send(replyActorId, new TEvContinueRequest(status, {}, {NYql::TIssue(message)}));
}

void ReplyCleanupError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const {
Expand Down
11 changes: 2 additions & 9 deletions ydb/core/kqp/workload_service/kqp_workload_service.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <ydb/core/resource_pools/resource_pool_settings.h>
#include <ydb/library/actors/core/actor.h>


Expand All @@ -8,15 +9,7 @@ namespace NKikimr::NKqp {
namespace NWorkload {

struct TWorkloadManagerConfig {
struct TPoolConfig {
ui64 ConcurrentQueryLimit = 0; // 0 = infinity
ui64 QueryCountLimit = 0; // 0 = infinity
TDuration QueryCancelAfter = TDuration::Days(1);

TString ACL = ""; // empty = full access for all users
};

std::unordered_map<TString, TPoolConfig> Pools;
std::unordered_map<TString, NResourcePool::TPoolSettings> Pools;
};

void SetWorkloadManagerConfig(const TWorkloadManagerConfig& workloadManagerConfig);
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/kqp/workload_service/kqp_workload_service_impl.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#pragma once

#include "kqp_workload_service.h"
#include "kqp_workload_service_tables_impl.h"

#include <ydb/core/kqp/common/events/events.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>

#include <ydb/library/aclib/aclib.h>
#include <ydb/library/actors/core/actor.h>


Expand All @@ -18,7 +17,6 @@ namespace NQueue {
class IState : public TThrRefBase {
public:
virtual bool TablesRequired() const = 0;
virtual bool HasAccess(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken) const = 0;
virtual ui64 GetLocalPoolSize() const = 0;

virtual void OnPreparingFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) = 0;
Expand All @@ -36,7 +34,7 @@ class IState : public TThrRefBase {

using TStatePtr = TIntrusivePtr<IState>;

TStatePtr CreateState(const NActors::TActorContext& actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters);
TStatePtr CreateState(const NActors::TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);

} // NQueue

Expand Down
Loading

0 comments on commit a3d73ea

Please sign in to comment.