diff --git a/ydb/core/kqp/common/events/workload_service.h b/ydb/core/kqp/common/events/workload_service.h index 33c2f30ac7ae..0442b2233bda 100644 --- a/ydb/core/kqp/common/events/workload_service.h +++ b/ydb/core/kqp/common/events/workload_service.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -24,12 +25,14 @@ struct TEvPlaceRequestIntoPool : public NActors::TEventLocal { - explicit TEvContinueRequest(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) + TEvContinueRequest(Ydb::StatusIds::StatusCode status, const NResourcePool::TPoolSettings& poolConfig, NYql::TIssues issues = {}) : Status(status) + , PoolConfig(poolConfig) , Issues(std::move(issues)) {} const Ydb::StatusIds::StatusCode Status; + const NResourcePool::TPoolSettings PoolConfig; const NYql::TIssues Issues; }; diff --git a/ydb/core/kqp/common/events/ya.make b/ydb/core/kqp/common/events/ya.make index 5aa90fb52e12..76d8e0fe6274 100644 --- a/ydb/core/kqp/common/events/ya.make +++ b/ydb/core/kqp/common/events/ya.make @@ -15,6 +15,7 @@ PEERDIR( ydb/core/grpc_services/cancelation ydb/core/kqp/common/shutdown ydb/core/kqp/common/compilation + ydb/core/resource_pools ydb/library/yql/dq/actors ydb/public/api/protos diff --git a/ydb/core/kqp/common/kqp_user_request_context.h b/ydb/core/kqp/common/kqp_user_request_context.h index 2ff8e50da589..1d5a966bd0fb 100644 --- a/ydb/core/kqp/common/kqp_user_request_context.h +++ b/ydb/core/kqp/common/kqp_user_request_context.h @@ -4,6 +4,8 @@ #include #include +#include + namespace NKikimr::NKqp { struct TUserRequestContext : public TAtomicRefCount { @@ -13,6 +15,7 @@ namespace NKikimr::NKqp { TString CurrentExecutionId; TString CustomerSuppliedId; TString PoolId; + NResourcePool::TPoolSettings PoolConfig; TUserRequestContext() = default; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index bac69990f5ef..22b43d7afc26 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -481,6 +481,7 @@ class TKqpSessionActor : public TActorBootstrapped { } LOG_D("continue request, pool id: " << poolId); + QueryState->UserRequestContext->PoolConfig = ev->Get()->PoolConfig; CompileQuery(); } diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 608649f195a6..2b13e225fa88 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -238,7 +239,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { Y_UNIT_TEST(ExecuteQueryWithWorkloadManager) { NWorkload::TWorkloadManagerConfig workloadManagerConfig; - workloadManagerConfig.Pools.insert({"sample_pool_id", NWorkload::TWorkloadManagerConfig::TPoolConfig()}); + workloadManagerConfig.Pools.insert({"sample_pool_id", NResourcePool::TPoolSettings()}); SetWorkloadManagerConfig(workloadManagerConfig); NKikimrConfig::TAppConfig config; diff --git a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp index 3ea346056dbf..c186f4d3f865 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -101,7 +102,7 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) { Y_UNIT_TEST(ExecuteScriptWithWorkloadManager) { NWorkload::TWorkloadManagerConfig workloadManagerConfig; - workloadManagerConfig.Pools.insert({"sample_pool_id", NWorkload::TWorkloadManagerConfig::TPoolConfig()}); + workloadManagerConfig.Pools.insert({"sample_pool_id", NResourcePool::TPoolSettings()}); SetWorkloadManagerConfig(workloadManagerConfig); NKikimrConfig::TAppConfig config; diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index e169dc19c3c4..cedba9fe5ba4 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -118,11 +118,6 @@ class TKqpWorkloadService : public TActorBootstrapped { return; } - if (!poolState->HasAccess(ev->Get()->UserToken)) { - ReplyContinueError(workerActorId, Ydb::StatusIds::UNAUTHORIZED, TStringBuilder() << "You do not have access permissions for pool " << poolId); - return; - } - if (poolState->PlaceRequest(workerActorId, ev->Get()->SessionId) && poolState->TablesRequired()) { ScheduleLeaseUpdate(); PrepareWorkloadServiceTables(); @@ -258,7 +253,7 @@ class TKqpWorkloadService : public TActorBootstrapped { void ReplyContinueError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const { LOG_W("Reply continue error " << status << " to " << replyActorId << ": " << message); - Send(replyActorId, new TEvContinueRequest(status, {NYql::TIssue(message)})); + Send(replyActorId, new TEvContinueRequest(status, {}, {NYql::TIssue(message)})); } void ReplyCleanupError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const { diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.h b/ydb/core/kqp/workload_service/kqp_workload_service.h index 2c776a7ee6f3..fd09fd7d6f37 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service.h @@ -1,5 +1,6 @@ #pragma once +#include #include @@ -8,15 +9,7 @@ namespace NKikimr::NKqp { namespace NWorkload { struct TWorkloadManagerConfig { - struct TPoolConfig { - ui64 ConcurrentQueryLimit = 0; // 0 = infinity - ui64 QueryCountLimit = 0; // 0 = infinity - TDuration QueryCancelAfter = TDuration::Days(1); - - TString ACL = ""; // empty = full access for all users - }; - - std::unordered_map Pools; + std::unordered_map Pools; }; void SetWorkloadManagerConfig(const TWorkloadManagerConfig& workloadManagerConfig); diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index 57daf9bd7a34..4f1d96378471 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -1,11 +1,10 @@ #pragma once -#include "kqp_workload_service.h" #include "kqp_workload_service_tables_impl.h" #include +#include -#include #include @@ -18,7 +17,6 @@ namespace NQueue { class IState : public TThrRefBase { public: virtual bool TablesRequired() const = 0; - virtual bool HasAccess(const TIntrusiveConstPtr& userToken) const = 0; virtual ui64 GetLocalPoolSize() const = 0; virtual void OnPreparingFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) = 0; @@ -36,7 +34,7 @@ class IState : public TThrRefBase { using TStatePtr = TIntrusivePtr; -TStatePtr CreateState(const NActors::TActorContext& actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters); +TStatePtr CreateState(const NActors::TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters); } // NQueue diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_queues.cpp b/ydb/core/kqp/workload_service/kqp_workload_service_queues.cpp index 9f711fb16966..579071efc689 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_queues.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service_queues.cpp @@ -4,7 +4,6 @@ #include #include -#include #include @@ -24,33 +23,6 @@ using namespace NActors; class TStateBase : public IState { - class TAccessChecker { - public: - TAccessChecker(const TString& poolId, const TString& acl) { - if (acl) { - SecurityObject = NACLib::TSecurityObject(); - try { - SecurityObject->FromString(acl); - } catch (const yexception& error) { - Y_ENSURE(false, "Invalid ACL format for pool " << poolId << ": " << error.what()); - } - } - } - - bool HasAccess(const TIntrusiveConstPtr& userToken) const { - if (!SecurityObject) { - return true; - } - if (!userToken) { - return false; - } - return SecurityObject->CheckAccess(NACLib::EAccessRights::GenericUse, *userToken); - } - - private: - std::optional SecurityObject; - }; - protected: struct TRequest { enum class EState { @@ -74,34 +46,32 @@ class TStateBase : public IState { }; public: - TStateBase(const TActorContext& actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters) + TStateBase(const TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters) : Counters(counters) , ActorContext(actorContext) , PoolId(poolId) - , CancelAfter(poolConfig.QueryCancelAfter) , PoolSizeLimit(GetMaxPoolSize(poolConfig)) , InFlightLimit(GetMaxInFlight(poolConfig)) - , AccessChecker(poolId, poolConfig.ACL) + , PoolConfig(poolConfig) + , CancelAfter(poolConfig.QueryCancelAfter) { RegisterCounters(); } - bool HasAccess(const TIntrusiveConstPtr& userToken) const final { - return AccessChecker.HasAccess(userToken); - } - ui64 GetLocalPoolSize() const final { return LocalSessions.size(); } bool PlaceRequest(const TActorId& workerActorId, const TString& sessionId) final { if (LocalSessions.contains(sessionId)) { - ActorContext.Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::INTERNAL_ERROR, {NYql::TIssue(TStringBuilder() << "Got duplicate session id " << sessionId << " for pool " << PoolId)})); + ActorContext.Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::INTERNAL_ERROR, PoolConfig, {NYql::TIssue(TStringBuilder() << "Got duplicate session id " << sessionId << " for pool " << PoolId)})); return false; } LOG_D("received new request, worker id: " << workerActorId << ", session id: " << sessionId); - ActorContext.Schedule(CancelAfter, new TEvPrivate::TEvCancelRequest(PoolId, sessionId)); + if (CancelAfter) { + ActorContext.Schedule(CancelAfter, new TEvPrivate::TEvCancelRequest(PoolId, sessionId)); + } TRequest* request = &LocalSessions.insert({sessionId, TRequest(workerActorId, sessionId)}).first->second; LocalDelayedRequests->Inc(); @@ -143,7 +113,7 @@ class TStateBase : public IState { } void ReplyContinue(TRequest* request, Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, NYql::TIssues issues = {}) { - ActorContext.Send(request->WorkerActorId, new TEvContinueRequest(status, std::move(issues))); + ActorContext.Send(request->WorkerActorId, new TEvContinueRequest(status, PoolConfig, std::move(issues))); if (status == Ydb::StatusIds::SUCCESS) { LocalInFlight++; @@ -177,7 +147,7 @@ class TStateBase : public IState { if (!request->Started && request->State != TRequest::EState::Finishing) { if (request->State == TRequest::EState::Canceling && status == Ydb::StatusIds::SUCCESS) { status = Ydb::StatusIds::CANCELLED; - issues.AddIssue("Delay deadline exceeded"); + issues.AddIssue(TStringBuilder() << "Delay deadline exceeded in pool " << PoolId); } ReplyContinue(request, status, issues); return; @@ -222,6 +192,21 @@ class TStateBase : public IState { return LocalInFlight; } + TMaybe GetWaitDeadline(TInstant startTime) { + if (!CancelAfter) { + return Nothing(); + } + return startTime + CancelAfter; + } + + NYql::TIssue GroupIssues(const TString& message, NYql::TIssues issues) { + NYql::TIssue rootIssue(message); + for (const NYql::TIssue& issue : issues) { + rootIssue.AddSubIssue(MakeIntrusive(issue)); + } + return rootIssue; + } + TString LogPrefix() const { return TStringBuilder() << "PoolId: " << PoolId << ", "; } @@ -250,12 +235,12 @@ class TStateBase : public IState { LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight); } - static ui64 GetMaxPoolSize(const TWorkloadManagerConfig::TPoolConfig& poolConfig) { + static ui64 GetMaxPoolSize(const NResourcePool::TPoolSettings& poolConfig) { const ui64 queryCountLimit = poolConfig.QueryCountLimit; return queryCountLimit ? queryCountLimit : std::numeric_limits::max(); } - static ui64 GetMaxInFlight(const TWorkloadManagerConfig::TPoolConfig& poolConfig) { + static ui64 GetMaxInFlight(const NResourcePool::TPoolSettings& poolConfig) { const ui64 queueSizeLimit = GetMaxPoolSize(poolConfig); const ui64 concurrentQueryLimit = poolConfig.ConcurrentQueryLimit; return std::min(concurrentQueryLimit ? concurrentQueryLimit : std::numeric_limits::max(), queueSizeLimit); @@ -279,12 +264,12 @@ class TStateBase : public IState { const TActorContext ActorContext; const TString PoolId; - const TDuration CancelAfter; const ui64 PoolSizeLimit; const ui64 InFlightLimit; private: - const TAccessChecker AccessChecker; + const NResourcePool::TPoolSettings PoolConfig; + const TDuration CancelAfter; ui64 LocalInFlight = 0; std::unordered_map LocalSessions; @@ -306,7 +291,7 @@ class TUnlimitedState : public TStateBase { using TBase = TStateBase; public: - TUnlimitedState(const TActorContext& actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters) + TUnlimitedState(const TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters) : TBase(actorContext, poolId, poolConfig, counters) { Y_ENSURE(InFlightLimit == std::numeric_limits::max()); @@ -341,7 +326,7 @@ class TFifoState : public TStateBase { static constexpr ui64 MAX_PENDING_REQUESTS = 1000; public: - TFifoState(TActorContext actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters) + TFifoState(TActorContext actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters) : TBase(actorContext, poolId, poolConfig, counters) { Y_ENSURE(InFlightLimit < std::numeric_limits::max()); @@ -370,9 +355,12 @@ class TFifoState : public TStateBase { void RefreshState(bool refreshRequired = false) override { RefreshRequired |= refreshRequired; - DoCleanupRequests(); + if (!PreparingFinished) { + return; + } - if (RunningOperation || !PreparingFinished) { + DoCleanupRequests(); + if (RunningOperation) { return; } @@ -451,17 +439,17 @@ class TFifoState : public TStateBase { if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { LOG_E("failed to delay request " << ev->Get()->Status << ", session id: " << ev->Get()->SessionId << ", issues: " << ev->Get()->Issues.ToOneLineString()); - ForUnfinished(ev->Get()->SessionId, [this, ev](TRequest* request) { - ReplyContinue(request, ev->Get()->Status, ev->Get()->Issues); + NYql::TIssue issue = GroupIssues("Failed to put request in queue", ev->Get()->Issues); + ForUnfinished(ev->Get()->SessionId, [this, ev, issue](TRequest* request) { + ReplyContinue(request, ev->Get()->Status, {issue}); }); RefreshRequired = true; return; } - LOG_D("succefully delayed request, session id: " << ev->Get()->SessionId); - GlobalState.DelayedRequests++; GlobalDelayedRequests->Inc(); + LOG_D("succefully delayed request, session id: " << ev->Get()->SessionId); DoStartDelayedRequest(); RefreshState(); @@ -473,9 +461,10 @@ class TFifoState : public TStateBase { const TString& sessionId = ev->Get()->SessionId; if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { LOG_E("failed start request " << ev->Get()->Status << ", session id: " << sessionId << ", issues: " << ev->Get()->Issues.ToOneLineString()); - ForUnfinished(sessionId, [this, ev](TRequest* request) { + NYql::TIssue issue = GroupIssues("Failed to start request", ev->Get()->Issues); + ForUnfinished(sessionId, [this, ev, issue](TRequest* request) { AddFinishedRequest(request->SessionId); - ReplyContinue(request, ev->Get()->Status, ev->Get()->Issues); + ReplyContinue(request, ev->Get()->Status, {issue}); }); RefreshState(); return; @@ -500,8 +489,9 @@ class TFifoState : public TStateBase { GlobalInFly->Inc(); ReplyContinue(request); } else { - AddFinishedRequest(request->SessionId); - request->State = TRequest::EState::Canceling; + // Request was dropped due to lease expiration + PendingRequests.emplace_front(request->SessionId); + PendingRequestsCount->Inc(); } }); DelayedRequests.pop_front(); @@ -566,9 +556,10 @@ class TFifoState : public TStateBase { if (!PendingRequests.empty()) { RunningOperation = true; const TString& sessionId = PopPendingRequest(); - ActorContext.Register(CreateDelayRequestActor(ActorContext.SelfID, PoolId, sessionId, GetRequest(sessionId)->StartTime + CancelAfter, Counters)); + TRequest* request = GetRequest(sessionId); + ActorContext.Register(CreateDelayRequestActor(ActorContext.SelfID, PoolId, sessionId, request->StartTime, GetWaitDeadline(request->StartTime), LEASE_DURATION, Counters)); DelayedRequests.emplace_back(sessionId); - GetRequest(sessionId)->CleanupRequired = true; + request->CleanupRequired = true; } } @@ -580,7 +571,6 @@ class TFifoState : public TStateBase { if (!FinishedRequests.empty()) { RunningOperation = true; ActorContext.Register(CreateCleanupRequestsActor(ActorContext.SelfID, PoolId, FinishedRequests, Counters)); - FinishedRequests.clear(); FinishingRequestsCount->Set(0); } @@ -667,7 +657,7 @@ class TFifoState : public TStateBase { } // anonymous namespace -TStatePtr CreateState(const TActorContext& actorContext, const TString& poolId, const TWorkloadManagerConfig::TPoolConfig& poolConfig, NMonitoring::TDynamicCounterPtr counters) { +TStatePtr CreateState(const TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters) { if (!poolConfig.ConcurrentQueryLimit && !poolConfig.QueryCountLimit) { return MakeIntrusive(actorContext, poolId, poolConfig, counters); } diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_tables.cpp b/ydb/core/kqp/workload_service/kqp_workload_service_tables.cpp index 0abc5874cd61..80905d4e5dbf 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_tables.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service_tables.cpp @@ -92,13 +92,15 @@ class TTablesCreator : public NTableCreator::TMultiTableCreator { { Col("database", NScheme::NTypeIds::Text), Col("pool_id", NScheme::NTypeIds::Text), + Col("start_time", NScheme::NTypeIds::Timestamp), Col("session_id", NScheme::NTypeIds::Text), Col("node_id", NScheme::NTypeIds::Uint32), Col("wait_deadline", NScheme::NTypeIds::Timestamp), + Col("lease_deadline", NScheme::NTypeIds::Timestamp), }, - { "database", "pool_id", "wait_deadline", "session_id" }, + { "database", "pool_id", "start_time", "session_id" }, NKikimrServices::KQP_WORKLOAD_SERVICE, - TtlCol("wait_deadline", DEADLINE_OFFSET, BRO_RUN_INTERVAL) + TtlCol("lease_deadline", DEADLINE_OFFSET, BRO_RUN_INTERVAL) ); } @@ -338,18 +340,32 @@ class TRefreshPoolStateQuery : public TQueryBase { void OnRunQuery() override { TString sql = TStringBuilder() << R"( -- TRefreshPoolStateQuery::OnRunQuery + DECLARE $database AS Text; DECLARE $pool_id AS Text; DECLARE $node_id AS Uint32; DECLARE $lease_duration AS Interval; + UPDATE `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` + SET lease_deadline = CurrentUtcTimestamp() + $lease_duration + WHERE database = $database + AND pool_id = $pool_id + AND node_id = $node_id + AND (wait_deadline IS NULL OR wait_deadline >= CurrentUtcTimestamp()) + AND lease_deadline >= CurrentUtcTimestamp(); + UPDATE `)" << TTablesCreator::GetRunningRequestsPath() << R"(` SET lease_deadline = CurrentUtcTimestamp() + $lease_duration - WHERE pool_id = $pool_id - AND node_id = $node_id; + WHERE database = $database + AND pool_id = $pool_id + AND node_id = $node_id + AND lease_deadline >= CurrentUtcTimestamp(); )"; NYdb::TParamsBuilder params; params + .AddParam("$database") + .Utf8(GetDefaultDatabase()) + .Build() .AddParam("$pool_id") .Utf8(PoolId) .Build() @@ -367,21 +383,28 @@ class TRefreshPoolStateQuery : public TQueryBase { void OnLeaseUpdated() { TString sql = TStringBuilder() << R"( -- TRefreshPoolStateQuery::OnLeaseUpdated + DECLARE $database AS Text; DECLARE $pool_id AS Text; SELECT COUNT(*) AS delayed_requests FROM `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` - WHERE pool_id = $pool_id - AND wait_deadline >= CurrentUtcTimestamp(); + WHERE database = $database + AND pool_id = $pool_id + AND (wait_deadline IS NULL OR wait_deadline >= CurrentUtcTimestamp()) + AND lease_deadline >= CurrentUtcTimestamp(); SELECT COUNT(*) AS running_requests FROM `)" << TTablesCreator::GetRunningRequestsPath() << R"(` - WHERE pool_id = $pool_id + WHERE database = $database + AND pool_id = $pool_id AND lease_deadline >= CurrentUtcTimestamp(); )"; NYdb::TParamsBuilder params; params + .AddParam("$database") + .Utf8(GetDefaultDatabase()) + .Build() .AddParam("$pool_id") .Utf8(PoolId) .Build(); @@ -433,11 +456,13 @@ class TRefreshPoolStateQuery : public TQueryBase { class TDelayRequestQuery : public TQueryBase { public: - TDelayRequestQuery(const TString& poolId, const TString& sessionId, TInstant waitDeadline, NMonitoring::TDynamicCounterPtr counters) + TDelayRequestQuery(const TString& poolId, const TString& sessionId, TInstant startTime, TMaybe waitDeadline, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) : TQueryBase(__func__, poolId, sessionId, counters) , PoolId(poolId) , SessionId(sessionId) + , StartTime(startTime) , WaitDeadline(waitDeadline) + , LeaseDuration(leaseDuration) {} void OnRunQuery() override { @@ -445,14 +470,17 @@ class TDelayRequestQuery : public TQueryBase { -- TDelayRequestQuery::OnRunQuery DECLARE $database AS Text; DECLARE $pool_id AS Text; + DECLARE $start_time AS Timestamp; DECLARE $session_id AS Text; DECLARE $node_id AS Uint32; - DECLARE $wait_deadline AS Timestamp; + DECLARE $wait_deadline AS Optional; + DECLARE $lease_duration AS Interval; UPSERT INTO `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` - (database, pool_id, session_id, node_id, wait_deadline) + (database, pool_id, start_time, session_id, node_id, wait_deadline, lease_deadline) VALUES ( - $database, $pool_id, $session_id, $node_id, $wait_deadline + $database, $pool_id, $start_time, $session_id, $node_id, $wait_deadline, + CurrentUtcTimestamp() + $lease_duration ); )"; @@ -464,6 +492,9 @@ class TDelayRequestQuery : public TQueryBase { .AddParam("$pool_id") .Utf8(PoolId) .Build() + .AddParam("$start_time") + .Timestamp(StartTime) + .Build() .AddParam("$session_id") .Utf8(SessionId) .Build() @@ -471,7 +502,10 @@ class TDelayRequestQuery : public TQueryBase { .Uint32(SelfId().NodeId()) .Build() .AddParam("$wait_deadline") - .Timestamp(WaitDeadline) + .OptionalTimestamp(WaitDeadline) + .Build() + .AddParam("$lease_duration") + .Interval(static_cast(LeaseDuration.MicroSeconds())) .Build(); RunDataQuery(sql, ¶ms); @@ -488,7 +522,9 @@ class TDelayRequestQuery : public TQueryBase { private: const TString PoolId; const TString SessionId; - const TInstant WaitDeadline; + const TInstant StartTime; + const TMaybe WaitDeadline; + const TDuration LeaseDuration; }; @@ -503,18 +539,24 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { void OnRunQuery() override { TString sql = TStringBuilder() << R"( -- TStartFirstDelayedRequestQuery::OnRunQuery + DECLARE $database AS Text; DECLARE $pool_id AS Text; - SELECT pool_id, wait_deadline, session_id, node_id + SELECT database, pool_id, start_time, session_id, node_id FROM `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` - WHERE pool_id = $pool_id - AND wait_deadline >= CurrentUtcTimestamp() - ORDER BY pool_id, wait_deadline + WHERE database = $database + AND pool_id = $pool_id + AND (wait_deadline IS NULL OR wait_deadline >= CurrentUtcTimestamp()) + AND lease_deadline >= CurrentUtcTimestamp() + ORDER BY database, pool_id, start_time LIMIT 1; )"; NYdb::TParamsBuilder params; params + .AddParam("$database") + .Utf8(GetDefaultDatabase()) + .Build() .AddParam("$pool_id") .Utf8(PoolId) .Build(); @@ -556,13 +598,13 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { RequestSessionId = *sessionId; UpdateLogInfo(PoolId, RequestSessionId); - TMaybe waitDeadline = result.ColumnParser("wait_deadline").GetOptionalTimestamp(); - if (!waitDeadline) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Wait deadline is not specified for delayed request"); + TMaybe startTime = result.ColumnParser("start_time").GetOptionalTimestamp(); + if (!startTime) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Start time is not specified for delayed request"); return; } - RequestWaitDeadline = *waitDeadline; + RequestStartTime = *startTime; StartQueuedRequest(); } @@ -571,14 +613,15 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { -- TStartFirstDelayedRequestQuery::StartQueuedRequest DECLARE $database AS Text; DECLARE $pool_id AS Text; + DECLARE $start_time AS Timestamp; DECLARE $session_id AS Text; DECLARE $node_id AS Uint32; - DECLARE $wait_deadline AS Timestamp; DECLARE $lease_duration AS Interval; DELETE FROM `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` - WHERE pool_id = $pool_id - AND wait_deadline = $wait_deadline + WHERE database = $database + AND pool_id = $pool_id + AND start_time = $start_time AND session_id = $session_id; UPSERT INTO `)" << TTablesCreator::GetRunningRequestsPath() << R"(` @@ -597,15 +640,15 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { .AddParam("$pool_id") .Utf8(PoolId) .Build() + .AddParam("$start_time") + .Timestamp(RequestStartTime) + .Build() .AddParam("$session_id") .Utf8(RequestSessionId) .Build() .AddParam("$node_id") .Uint32(SelfId().NodeId()) .Build() - .AddParam("$wait_deadline") - .Timestamp(RequestWaitDeadline) - .Build() .AddParam("$lease_duration") .Interval(static_cast(LeaseDuration.MicroSeconds())) .Build(); @@ -628,7 +671,7 @@ class TStartFirstDelayedRequestQuery : public TQueryBase { ui32 RequestNodeId = 0; TString RequestSessionId; - TInstant RequestWaitDeadline; + TInstant RequestStartTime; }; class TStartRequestQuery : public TQueryBase { @@ -743,22 +786,28 @@ class TCleanupRequestsQuery : public TQueryBase { -- TCleanupRequestsQuery::OnRunQuery PRAGMA AnsiInForEmptyOrNullableItemsCollections; + DECLARE $database AS Text; DECLARE $pool_id AS Text; DECLARE $node_id AS Uint32; DECLARE $session_ids AS List; DELETE FROM `)" << TTablesCreator::GetDelayedRequestsPath() << R"(` - WHERE pool_id = $pool_id + WHERE database = $database + AND pool_id = $pool_id AND session_id IN $session_ids; DELETE FROM `)" << TTablesCreator::GetRunningRequestsPath() << R"(` - WHERE pool_id = $pool_id + WHERE database = $database + AND pool_id = $pool_id AND node_id = $node_id AND session_id IN $session_ids; )"; NYdb::TParamsBuilder params; params + .AddParam("$database") + .Utf8(GetDefaultDatabase()) + .Build() .AddParam("$pool_id") .Utf8(PoolId) .Build() @@ -808,8 +857,8 @@ IActor* CreateRefreshPoolStateActor(const TActorId& replyActorId, const TString& return new TQueryRetryActor(replyActorId, poolId, leaseDuration, counters); } -IActor* CreateDelayRequestActor(const TActorId& replyActorId, const TString& poolId, const TString& sessionId, TInstant waitDeadline, NMonitoring::TDynamicCounterPtr counters) { - return new TQueryRetryActor(replyActorId, poolId, sessionId, waitDeadline, counters); +IActor* CreateDelayRequestActor(const TActorId& replyActorId, const TString& poolId, const TString& sessionId, TInstant startTime, TMaybe waitDeadline, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) { + return new TQueryRetryActor, TDuration, NMonitoring::TDynamicCounterPtr>(replyActorId, poolId, sessionId, startTime, waitDeadline, leaseDuration, counters); } IActor* CreateStartRequestActor(const TString& poolId, const std::optional& sessionId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters) { diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_tables.h b/ydb/core/kqp/workload_service/kqp_workload_service_tables.h index 3d02d36cf780..6b6a5d066d98 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_tables.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_tables.h @@ -15,7 +15,7 @@ NActors::IActor* CreateCleanupTablesActor(); NActors::IActor* CreateRefreshPoolStateActor(const NActors::TActorId& replyActorId, const TString& poolId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters); // Push / Start / Finish requests in queue -NActors::IActor* CreateDelayRequestActor(const NActors::TActorId& replyActorId, const TString& poolId, const TString& sessionId, TInstant waitDeadline, NMonitoring::TDynamicCounterPtr counters); +NActors::IActor* CreateDelayRequestActor(const NActors::TActorId& replyActorId, const TString& poolId, const TString& sessionId, TInstant startTime, TMaybe waitDeadline, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters); NActors::IActor* CreateStartRequestActor(const TString& poolId, const std::optional& sessionId, TDuration leaseDuration, NMonitoring::TDynamicCounterPtr counters); NActors::IActor* CreateCleanupRequestsActor(const NActors::TActorId& replyActorId, const TString& poolId, const std::vector& sessionIds, NMonitoring::TDynamicCounterPtr counters); diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/kqp_workload_service_ut.cpp index 51d6b444de10..abc9504617b2 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service_ut.cpp @@ -5,6 +5,8 @@ #include #include +#include + namespace NKikimr::NKqp { @@ -19,7 +21,7 @@ using namespace Tests; namespace { -constexpr TDuration FUTURE_WAIT_TIMEOUT = TDuration::Minutes(4); +constexpr TDuration FUTURE_WAIT_TIMEOUT = TDuration::Minutes(2); // Query runner @@ -284,18 +286,16 @@ struct TYdbSetupSettings { FLUENT_FIELD_DEFAULT(TString, PoolId, "sample_pool_id"); FLUENT_FIELD_DEFAULT(ui64, ConcurrentQueryLimit, 0); FLUENT_FIELD_DEFAULT(ui64, QueryCountLimit, 0); - FLUENT_FIELD_DEFAULT(TDuration, QueryCancelAfter, TDuration::Days(1)); - FLUENT_FIELD_DEFAULT(TString, ACL, ""); + FLUENT_FIELD_DEFAULT(TDuration, QueryCancelAfter, TDuration::Zero()); }; class TWorkloadServiceYdbSetup { private: TAppConfig GetAppConfig() const { - TWorkloadManagerConfig::TPoolConfig defaultPoolConfig; + NResourcePool::TPoolSettings defaultPoolConfig; defaultPoolConfig.ConcurrentQueryLimit = Settings_.ConcurrentQueryLimit_; defaultPoolConfig.QueryCountLimit = Settings_.QueryCountLimit_; defaultPoolConfig.QueryCancelAfter = Settings_.QueryCancelAfter_; - defaultPoolConfig.ACL = Settings_.ACL_; TWorkloadManagerConfig workloadManagerConfig; workloadManagerConfig.Pools.insert({Settings_.PoolId_, defaultPoolConfig}); @@ -470,31 +470,6 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { TSampleQueries::TSelect42::CheckResult(ydb.ExecuteQueryGrpc(TSampleQueries::TSelect42::Query, "another_pool_id")); } - Y_UNIT_TEST(ValidationOfPoolACL) { - auto settings = TYdbSetupSettings().ACL(TStringBuilder() << "+U:" << BUILTIN_ACL_ROOT); - TWorkloadServiceYdbSetup ydb(settings); - - auto checkFail = [settings](const auto& result) { - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::UNAUTHORIZED, result.GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "You do not have access permissions for pool " << settings.PoolId_); - }; - - // Auth fail without token - checkFail(ydb.ExecuteQueryGrpc(TSampleQueries::TSelect42::Query)); - - // Successful auth - TSampleQueries::TSelect42::CheckResult(ydb.ExecuteQuery( - TSampleQueries::TSelect42::Query, - TQueryRunnerSettings().UserSID(BUILTIN_ACL_ROOT) - )); - - // Auth fail with invalid token - checkFail(ydb.ExecuteQuery( - TSampleQueries::TSelect42::Query, - TQueryRunnerSettings().UserSID("invalid@sid") - )); - } - Y_UNIT_TEST(ValidationOfQueryCountLimit) { auto settings = TYdbSetupSettings() .ConcurrentQueryLimit(1) diff --git a/ydb/core/kqp/workload_service/ya.make b/ydb/core/kqp/workload_service/ya.make index 4ca18298f14c..c8bb970d09d7 100644 --- a/ydb/core/kqp/workload_service/ya.make +++ b/ydb/core/kqp/workload_service/ya.make @@ -8,8 +8,9 @@ SRCS( PEERDIR( ydb/core/cms/console - ydb/core/protos ydb/core/kqp/common/events + ydb/core/protos + ydb/core/resource_pools ydb/library/actors/core ydb/library/query_actor diff --git a/ydb/core/resource_pools/resource_pool_settings.cpp b/ydb/core/resource_pools/resource_pool_settings.cpp new file mode 100644 index 000000000000..5d04cf9bc259 --- /dev/null +++ b/ydb/core/resource_pools/resource_pool_settings.cpp @@ -0,0 +1,5 @@ +#include "resource_pool_settings.h" + + +namespace NKikimr::NResourcePool { +} // namespace NKikimr::NResourcePool diff --git a/ydb/core/resource_pools/resource_pool_settings.h b/ydb/core/resource_pools/resource_pool_settings.h new file mode 100644 index 000000000000..4dafdacf0c0c --- /dev/null +++ b/ydb/core/resource_pools/resource_pool_settings.h @@ -0,0 +1,16 @@ +#pragma once + +#include + + +namespace NKikimr::NResourcePool { + +struct TPoolSettings { + ui64 ConcurrentQueryLimit = 0; // 0 = infinity + ui64 QueryCountLimit = 0; // 0 = infinity + TDuration QueryCancelAfter = TDuration::Zero(); // 0 = disabled + + double QueryMemoryLimitRatioPerNode = 100; // Percent from node memory capacity +}; + +} // namespace NKikimr::NResourcePool diff --git a/ydb/core/resource_pools/ya.make b/ydb/core/resource_pools/ya.make new file mode 100644 index 000000000000..c0ceb67af3f7 --- /dev/null +++ b/ydb/core/resource_pools/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + resource_pool_settings.cpp +) + +PEERDIR( + util +) + +END() diff --git a/ydb/core/ya.make b/ydb/core/ya.make index ba842b1babb7..66133a2d2fa4 100644 --- a/ydb/core/ya.make +++ b/ydb/core/ya.make @@ -46,6 +46,7 @@ RECURSE( public_http quoter raw_socket + resource_pools scheme scheme_types security