diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp index bfbb2096944f..5faa30f86dc7 100644 --- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp +++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp @@ -22,6 +22,74 @@ template class TPoolHandlerActorBase : public TActor { using TBase = TActor; + struct TCommonCounters { + const NMonitoring::TDynamicCounterPtr CountersRoot; + const NMonitoring::TDynamicCounterPtr CountersSubgroup; + + // Workload service counters + NMonitoring::TDynamicCounters::TCounterPtr ActivePoolHandlers; + + // Pool counters + NMonitoring::TDynamicCounters::TCounterPtr LocalInFly; + NMonitoring::TDynamicCounters::TCounterPtr LocalDelayedRequests; + NMonitoring::TDynamicCounters::TCounterPtr ContinueOk; + NMonitoring::TDynamicCounters::TCounterPtr ContinueOverloaded; + NMonitoring::TDynamicCounters::TCounterPtr ContinueError; + NMonitoring::TDynamicCounters::TCounterPtr CleanupOk; + NMonitoring::TDynamicCounters::TCounterPtr CleanupError; + NMonitoring::TDynamicCounters::TCounterPtr Cancelled; + NMonitoring::THistogramPtr DelayedTimeMs; + NMonitoring::THistogramPtr RequestsLatencyMs; + + // Config counters + NMonitoring::TDynamicCounters::TCounterPtr InFlightLimit; + NMonitoring::TDynamicCounters::TCounterPtr QueueSizeLimit; + NMonitoring::TDynamicCounters::TCounterPtr LoadCpuThreshold; + + TCommonCounters(NMonitoring::TDynamicCounterPtr counters, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig) + : CountersRoot(counters) + , CountersSubgroup(counters->GetSubgroup("pool", CanonizePath(TStringBuilder() << database << "/" << poolId))) + { + Register(); + UpdateConfigCounters(poolConfig); + } + + void UpdateConfigCounters(const NResourcePool::TPoolSettings& poolConfig) { + InFlightLimit->Set(std::max(poolConfig.ConcurrentQueryLimit, 0)); + QueueSizeLimit->Set(std::max(poolConfig.QueueSize, 0)); + LoadCpuThreshold->Set(std::max(poolConfig.DatabaseLoadCpuThreshold, 0.0)); + } + + void OnCleanup() { + ActivePoolHandlers->Dec(); + + InFlightLimit->Set(0); + QueueSizeLimit->Set(0); + LoadCpuThreshold->Set(0); + } + + private: + void Register() { + ActivePoolHandlers = CountersRoot->GetCounter("ActivePoolHandlers", false); + ActivePoolHandlers->Inc(); + + LocalInFly = CountersSubgroup->GetCounter("LocalInFly", false); + LocalDelayedRequests = CountersSubgroup->GetCounter("LocalDelayedRequests", false); + ContinueOk = CountersSubgroup->GetCounter("ContinueOk", true); + ContinueOverloaded = CountersSubgroup->GetCounter("ContinueOverloaded", true); + ContinueError = CountersSubgroup->GetCounter("ContinueError", true); + CleanupOk = CountersSubgroup->GetCounter("CleanupOk", true); + CleanupError = CountersSubgroup->GetCounter("CleanupError", true); + Cancelled = CountersSubgroup->GetCounter("Cancelled", true); + DelayedTimeMs = CountersSubgroup->GetHistogram("DelayedTimeMs", NMonitoring::ExponentialHistogram(20, 2, 4)); + RequestsLatencyMs = CountersSubgroup->GetHistogram("RequestsLatencyMs", NMonitoring::ExponentialHistogram(20, 2, 4)); + + InFlightLimit = CountersSubgroup->GetCounter("InFlightLimit", false); + QueueSizeLimit = CountersSubgroup->GetCounter("QueueSizeLimit", false); + LoadCpuThreshold = CountersSubgroup->GetCounter("LoadCpuThreshold", false); + } + }; + protected: struct TRequest { enum class EState { @@ -50,16 +118,13 @@ class TPoolHandlerActorBase : public TActor { public: TPoolHandlerActorBase(void 
(TDerived::* requestFunc)(TAutoPtr& ev), const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters) : TBase(requestFunc) - , CountersRoot(counters) - , CountersSubgroup(counters->GetSubgroup("pool", CanonizePath(TStringBuilder() << database << "/" << poolId))) + , Counters(counters, database, poolId, poolConfig) , Database(database) , PoolId(poolId) , QueueSizeLimit(GetMaxQueueSize(poolConfig)) , InFlightLimit(GetMaxInFlight(poolConfig)) , PoolConfig(poolConfig) - { - RegisterCounters(); - } + {} STRICT_STFUNC(StateFuncBase, // Workload service events @@ -85,7 +150,7 @@ class TPoolHandlerActorBase : public TActor { this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0)); } - ActivePoolHandlers->Dec(); + Counters.OnCleanup(); TBase::PassAway(); } @@ -125,7 +190,7 @@ class TPoolHandlerActorBase : public TActor { } TRequest* request = &LocalSessions.insert({sessionId, TRequest(workerActorId, sessionId)}).first->second; - LocalDelayedRequests->Inc(); + Counters.LocalDelayedRequests->Inc(); UpdatePoolConfig(ev->Get()->PoolConfig); UpdateSchemeboardSubscription(ev->Get()->PathId); @@ -202,25 +267,25 @@ class TPoolHandlerActorBase : public TActor { if (status == Ydb::StatusIds::SUCCESS) { LocalInFlight++; request->Started = true; - LocalInFly->Inc(); - ContinueOk->Inc(); - DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); + Counters.LocalInFly->Inc(); + Counters.ContinueOk->Inc(); + Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); LOG_D("Reply continue success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight); } else { if (status == Ydb::StatusIds::OVERLOADED) { - ContinueOverloaded->Inc(); + Counters.ContinueOverloaded->Inc(); LOG_I("Reply overloaded to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString()); } else if (status == Ydb::StatusIds::CANCELLED) { - Cancelled->Inc(); + Counters.Cancelled->Inc(); LOG_I("Reply cancelled to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString()); } else { - ContinueError->Inc(); + Counters.ContinueError->Inc(); LOG_W("Reply continue error " << status << " to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString()); } RemoveRequest(request); } - LocalDelayedRequests->Dec(); + Counters.LocalDelayedRequests->Dec(); } void FinalReply(TRequest* request, Ydb::StatusIds::StatusCode status, const TString& message) { @@ -239,9 +304,9 @@ class TPoolHandlerActorBase : public TActor { if (request->Started) { LocalInFlight--; - LocalInFly->Dec(); + Counters.LocalInFly->Dec(); } else { - LocalDelayedRequests->Dec(); + Counters.LocalDelayedRequests->Dec(); } if (request->State == TRequest::EState::Canceling) { @@ -321,11 +386,11 @@ class TPoolHandlerActorBase : public TActor { this->Send(request->WorkerActorId, new TEvCleanupResponse(status, issues)); if (status == Ydb::StatusIds::SUCCESS) { - CleanupOk->Inc(); - RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); + Counters.CleanupOk->Inc(); + Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); LOG_D("Reply cleanup success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight); } else 
{ - CleanupError->Inc(); + Counters.CleanupError->Inc(); LOG_W("Reply cleanup error " << status << " to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString()); } } @@ -335,8 +400,8 @@ class TPoolHandlerActorBase : public TActor { ev->Record.MutableRequest()->SetSessionId(request->SessionId); this->Send(MakeKqpProxyID(this->SelfId().NodeId()), ev.release()); - Cancelled->Inc(); - RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); + Counters.Cancelled->Inc(); + Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight); } @@ -366,10 +431,11 @@ class TPoolHandlerActorBase : public TActor { PoolConfig = poolConfig; QueueSizeLimit = GetMaxQueueSize(poolConfig); InFlightLimit = GetMaxInFlight(poolConfig); + Counters.UpdateConfigCounters(poolConfig); RefreshState(true); if (ShouldResign()) { - const TActorId& newHandler = this->RegisterWithSameMailbox(CreatePoolHandlerActor(Database, PoolId, poolConfig, CountersRoot)); + const TActorId& newHandler = this->RegisterWithSameMailbox(CreatePoolHandlerActor(Database, PoolId, poolConfig, Counters.CountersRoot)); this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvResignPoolHandler(Database, PoolId, newHandler)); } } @@ -384,25 +450,8 @@ class TPoolHandlerActorBase : public TActor { return concurrentQueryLimit == -1 ? std::numeric_limits::max() : static_cast(concurrentQueryLimit); } - void RegisterCounters() { - ActivePoolHandlers = CountersRoot->GetCounter("ActivePoolHandlers", false); - ActivePoolHandlers->Inc(); - - LocalInFly = CountersSubgroup->GetCounter("LocalInFly", false); - LocalDelayedRequests = CountersSubgroup->GetCounter("LocalDelayedRequests", false); - ContinueOk = CountersSubgroup->GetCounter("ContinueOk", true); - ContinueOverloaded = CountersSubgroup->GetCounter("ContinueOverloaded", true); - ContinueError = CountersSubgroup->GetCounter("ContinueError", true); - CleanupOk = CountersSubgroup->GetCounter("CleanupOk", true); - CleanupError = CountersSubgroup->GetCounter("CleanupError", true); - Cancelled = CountersSubgroup->GetCounter("Cancelled", true); - DelayedTimeMs = CountersSubgroup->GetHistogram("DelayedTimeMs", NMonitoring::ExponentialHistogram(20, 2, 4)); - RequestsLatencyMs = CountersSubgroup->GetHistogram("RequestsLatencyMs", NMonitoring::ExponentialHistogram(20, 2, 4)); - } - protected: - NMonitoring::TDynamicCounterPtr CountersRoot; - NMonitoring::TDynamicCounterPtr CountersSubgroup; + TCommonCounters Counters; // Configuration const TString Database; @@ -421,19 +470,6 @@ class TPoolHandlerActorBase : public TActor { ui64 LocalInFlight = 0; std::unordered_map LocalSessions; bool StopHandler = false; // Stop than all requests finished - - // Counters - NMonitoring::TDynamicCounters::TCounterPtr ActivePoolHandlers; - NMonitoring::TDynamicCounters::TCounterPtr LocalInFly; - NMonitoring::TDynamicCounters::TCounterPtr LocalDelayedRequests; - NMonitoring::TDynamicCounters::TCounterPtr ContinueOk; - NMonitoring::TDynamicCounters::TCounterPtr ContinueOverloaded; - NMonitoring::TDynamicCounters::TCounterPtr ContinueError; - NMonitoring::TDynamicCounters::TCounterPtr CleanupOk; - NMonitoring::TDynamicCounters::TCounterPtr CleanupError; - NMonitoring::TDynamicCounters::TCounterPtr Cancelled; - NMonitoring::THistogramPtr DelayedTimeMs; - 
NMonitoring::THistogramPtr RequestsLatencyMs; }; @@ -465,6 +501,38 @@ class TUnlimitedPoolHandlerActor : public TPoolHandlerActorBase { using TBase = TPoolHandlerActorBase; + struct TCounters { + // Fifo pool counters + NMonitoring::TDynamicCounters::TCounterPtr PendingRequestsCount; + NMonitoring::TDynamicCounters::TCounterPtr FinishingRequestsCount; + NMonitoring::TDynamicCounters::TCounterPtr GlobalInFly; + NMonitoring::TDynamicCounters::TCounterPtr GlobalDelayedRequests; + NMonitoring::THistogramPtr PoolStateUpdatesBacklogMs; + + TCounters(NMonitoring::TDynamicCounterPtr countersSubgroup) { + Register(countersSubgroup); + } + + void UpdateGlobalState(const TPoolStateDescription& description) { + GlobalInFly->Set(description.RunningRequests); + GlobalDelayedRequests->Set(description.DelayedRequests); + } + + void OnCleanup() { + GlobalInFly->Set(0); + GlobalDelayedRequests->Set(0); + } + + private: + void Register(NMonitoring::TDynamicCounterPtr countersSubgroup) { + PendingRequestsCount = countersSubgroup->GetCounter("PendingRequestsCount", false); + FinishingRequestsCount = countersSubgroup->GetCounter("FinishingRequestsCount", false); + GlobalInFly = countersSubgroup->GetCounter("GlobalInFly", false); + GlobalDelayedRequests = countersSubgroup->GetCounter("GlobalDelayedRequests", false); + PoolStateUpdatesBacklogMs = countersSubgroup->GetHistogram("PoolStateUpdatesBacklogMs", NMonitoring::LinearHistogram(20, 0, 3 * LEASE_DURATION.MillisecondsFloat() / 40)); + } + }; + enum class EStartRequestCase { Pending, Delayed @@ -475,9 +543,9 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseSet(0); - GlobalDelayedRequests->Set(0); + FifoCounters.OnCleanup(); TBase::PassAway(); } @@ -515,7 +583,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseSessionId); - PendingRequestsCount->Inc(); + FifoCounters.PendingRequestsCount->Inc(); if (!PreparingFinished) { this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPrepareTablesRequest(Database, PoolId)); @@ -534,6 +602,11 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase LEASE_DURATION) { + WaitingNodesInfo = true; + this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvNodesInfoRequest()); + } + RefreshRequired |= refreshRequired; if (!PreparingFinished) { return; @@ -551,7 +624,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateRefreshPoolStateActor(this->SelfId(), Database, PoolId, LEASE_DURATION, CountersSubgroup)); + this->Register(CreateRefreshPoolStateActor(this->SelfId(), Database, PoolId, LEASE_DURATION, Counters.CountersSubgroup)); } } @@ -571,6 +644,14 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseGet()->NodeCount; + + LOG_T("Updated node info, noode count: " << NodeCount); + } + void Handle(TEvPrivate::TEvTablesCreationFinished::TPtr& ev) { if (ev->Get()->Success) { PreparingFinished = true; @@ -584,7 +665,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseSet(0); + FifoCounters.PendingRequestsCount->Set(0); } void Handle(TEvPrivate::TEvRefreshPoolStateResponse::TPtr& ev) { @@ -598,7 +679,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseCollect((TInstant::Now() - LastRefreshTime).MilliSeconds()); + FifoCounters.PoolStateUpdatesBacklogMs->Collect((TInstant::Now() - LastRefreshTime).MilliSeconds()); } LastRefreshTime = TInstant::Now(); @@ -606,8 +687,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseSet(GlobalState.RunningRequests); - 
GlobalDelayedRequests->Set(GlobalState.DelayedRequests); + FifoCounters.UpdateGlobalState(GlobalState); LOG_T("succefully refreshed pool state, in flight: " << GlobalState.RunningRequests << ", delayed: " << GlobalState.DelayedRequests); RemoveFinishedRequests(); @@ -619,7 +699,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseSet(PendingRequests.size()); + FifoCounters.PendingRequestsCount->Set(PendingRequests.size()); } if (PendingRequests.empty() && delayedRequestsCount > QueueSizeLimit) { @@ -649,7 +729,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseInc(); + FifoCounters.GlobalDelayedRequests->Inc(); LOG_D("succefully delayed request, session id: " << ev->Get()->SessionId); DoStartDelayedRequest(GetLoadCpuThreshold()); @@ -677,7 +757,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, CountersSubgroup)); + this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, Counters.CountersSubgroup)); GetRequest(sessionId)->CleanupRequired = true; } break; @@ -724,12 +804,12 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseUsedCpuQuota = !!GetLoadCpuThreshold(); requestFound = true; GlobalState.RunningRequests++; - GlobalInFly->Inc(); + FifoCounters.GlobalInFly->Inc(); ReplyContinue(request); } else { // Request was dropped due to lease expiration PendingRequests.emplace_front(request->SessionId); - PendingRequestsCount->Inc(); + FifoCounters.PendingRequestsCount->Inc(); } }); DelayedRequests.pop_front(); @@ -766,14 +846,16 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, CountersSubgroup)); + this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, sessionId, LEASE_DURATION, Counters.CountersSubgroup)); GetRequest(sessionId)->CleanupRequired = true; } } @@ -790,7 +872,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateStartRequestActor(this->SelfId(), Database, PoolId, std::nullopt, LEASE_DURATION, CountersSubgroup)); + this->Register(CreateStartRequestActor(this->SelfId(), Database, PoolId, std::nullopt, LEASE_DURATION, Counters.CountersSubgroup)); } } } @@ -805,7 +887,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateDelayRequestActor(this->SelfId(), Database, PoolId, sessionId, request->StartTime, GetWaitDeadline(request->StartTime), LEASE_DURATION, CountersSubgroup)); + this->Register(CreateDelayRequestActor(this->SelfId(), Database, PoolId, sessionId, request->StartTime, GetWaitDeadline(request->StartTime), LEASE_DURATION, Counters.CountersSubgroup)); DelayedRequests.emplace_back(sessionId); request->CleanupRequired = true; } @@ -818,9 +900,9 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseRegister(CreateCleanupRequestsActor(this->SelfId(), Database, PoolId, FinishedRequests, CountersSubgroup)); + this->Register(CreateCleanupRequestsActor(this->SelfId(), Database, PoolId, FinishedRequests, Counters.CountersSubgroup)); FinishedRequests.clear(); - FinishingRequestsCount->Set(0); + FifoCounters.FinishingRequestsCount->Set(0); } } @@ -844,7 +926,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBaseSet(PendingRequests.size()); + FifoCounters.PendingRequestsCount->Set(PendingRequests.size()); } void RemoveFinishedRequests(std::deque& requests) { @@ -885,24 +967,18 @@ class TFifoPoolHandlerActor : public 
TPoolHandlerActorBaseDec(); + FifoCounters.PendingRequestsCount->Dec(); return sessionId; } void AddFinishedRequest(const TString& sessionId) { FinishedRequests.emplace_back(sessionId); - FinishingRequestsCount->Inc(); - } - - void RegisterCounters() { - PendingRequestsCount = CountersSubgroup->GetCounter("PendingRequestsCount", false); - FinishingRequestsCount = CountersSubgroup->GetCounter("FinishingRequestsCount", false); - GlobalInFly = CountersSubgroup->GetCounter("GlobalInFly", false); - GlobalDelayedRequests = CountersSubgroup->GetCounter("GlobalDelayedRequests", false); - PoolStateUpdatesBacklogMs = CountersSubgroup->GetHistogram("PoolStateUpdatesBacklogMs", NMonitoring::LinearHistogram(20, 0, 3 * LEASE_DURATION.MillisecondsFloat() / 40)); + FifoCounters.FinishingRequestsCount->Inc(); } private: + TCounters FifoCounters; + bool PreparingFinished = false; bool RefreshRequired = false; bool RunningOperation = false; @@ -915,11 +991,9 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase { + }; + + struct TEvNodesInfoResponse : public NActors::TEventLocal { + explicit TEvNodesInfoResponse(ui32 nodeCount) + : NodeCount(nodeCount) + {} + + const ui32 NodeCount; + }; + // Tables queries events struct TEvTablesCreationFinished : public NActors::TEventLocal { TEvTablesCreationFinished(bool success, NYql::TIssues issues) diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 718f831c245b..d3de8654636e 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -14,6 +14,8 @@ #include +#include + namespace NKikimr::NKqp { @@ -34,7 +36,8 @@ class TKqpWorkloadService : public TActorBootstrapped { enum class EWakeUp { IdleCheck, - StartCpuLoadRequest + StartCpuLoadRequest, + StartNodeInfoRequest }; public: @@ -92,6 +95,13 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(ev->Sender, responseEvent.release(), IEventHandle::FlagTrackDelivery, ev->Cookie); } + void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev) { + NodeCount = ev->Get()->Nodes.size(); + ScheduleNodeInfoRequest(); + + LOG_T("Updated node info, noode count: " << NodeCount); + } + void Handle(TEvents::TEvUndelivered::TPtr& ev) const { switch (ev->Get()->SourceType) { case NConsole::TEvConfigsDispatcher::EvSetConfigSubscriptionRequest: @@ -102,6 +112,11 @@ class TKqpWorkloadService : public TActorBootstrapped { LOG_E("Failed to deliver config notification response"); break; + case TEvInterconnect::EvListNodes: + LOG_W("Failed to deliver list nodes request"); + ScheduleNodeInfoRequest(); + break; + default: LOG_E("Undelivered event with unexpected source type: " << ev->Get()->SourceType); break; @@ -145,6 +160,10 @@ class TKqpWorkloadService : public TActorBootstrapped { case EWakeUp::StartCpuLoadRequest: RunCpuLoadRequest(); break; + + case EWakeUp::StartNodeInfoRequest: + RunNodeInfoRequest(); + break; } } @@ -152,6 +171,7 @@ class TKqpWorkloadService : public TActorBootstrapped { sFunc(TEvents::TEvPoison, HandlePoison); sFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleSetConfigSubscriptionResponse); hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle); + hFunc(TEvInterconnect::TEvNodesInfo, Handle); hFunc(TEvents::TEvUndelivered, Handle); hFunc(TEvPlaceRequestIntoPool, Handle); @@ -160,6 +180,7 @@ class TKqpWorkloadService : public TActorBootstrapped { hFunc(TEvPrivate::TEvResolvePoolResponse, Handle); 
hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle); + hFunc(TEvPrivate::TEvNodesInfoRequest, Handle); hFunc(TEvPrivate::TEvRefreshPoolState, Handle); hFunc(TEvPrivate::TEvCpuQuotaRequest, Handle); hFunc(TEvPrivate::TEvFinishRequestInPool, Handle); @@ -214,6 +235,10 @@ class TKqpWorkloadService : public TActorBootstrapped { } } + void Handle(TEvPrivate::TEvNodesInfoRequest::TPtr& ev) const { + Send(ev->Sender, new TEvPrivate::TEvNodesInfoResponse(NodeCount)); + } + void Handle(TEvPrivate::TEvRefreshPoolState::TPtr& ev) { const auto& event = ev->Get()->Record; const TString& database = event.GetDatabase(); @@ -333,6 +358,7 @@ class TKqpWorkloadService : public TActorBootstrapped { LOG_I("Started workload service initialization"); Register(CreateCleanupTablesActor()); + RunNodeInfoRequest(); } void PrepareWorkloadServiceTables() { @@ -420,6 +446,14 @@ class TKqpWorkloadService : public TActorBootstrapped { Register(CreateCpuLoadFetcherActor(SelfId())); } + void ScheduleNodeInfoRequest() const { + Schedule(IDLE_DURATION * 2, new TEvents::TEvWakeup(static_cast(EWakeUp::StartNodeInfoRequest))); + } + + void RunNodeInfoRequest() const { + Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes(), IEventHandle::FlagTrackDelivery); + } + private: void ReplyContinueError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const { ReplyContinueError(replyActorId, status, {NYql::TIssue(message)}); @@ -494,6 +528,7 @@ class TKqpWorkloadService : public TActorBootstrapped { std::unordered_set DatabasesWithDefaultPool; std::unordered_map PoolIdToState; std::unique_ptr CpuQuotaManager; + ui32 NodeCount = 0; NMonitoring::TDynamicCounters::TCounterPtr ActivePools; }; diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp index 557b4a41b649..825ea33bbe2f 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp @@ -460,6 +460,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { } } + // Common helpers TTestActorRuntime* GetRuntime() const override { return Server_->GetRuntime(); } @@ -491,19 +492,6 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { return event; } - static void WaitFor(TDuration timeout, TString description, std::function callback) { - TInstant start = TInstant::Now(); - while (TInstant::Now() - start <= timeout) { - TString errorString; - if (callback(errorString)) { - return; - } - Cerr << "Wait " << description << " " << TInstant::Now() - start << ": " << errorString << "\n"; - Sleep(TDuration::Seconds(1)); - } - UNIT_ASSERT_C(false, "Waiting " << description << " timeout"); - } - NMonitoring::TDynamicCounterPtr GetWorkloadManagerCounters(ui32 nodeIndex) const { return GetServiceCounters(GetRuntime()->GetAppData(nodeIndex).Counters, "kqp") ->GetSubgroup("subsystem", "workload_manager"); @@ -598,6 +586,21 @@ TIntrusivePtr TYdbSetupSettings::Create() const { return MakeIntrusive(*this); } +//// IYdbSetup + +void IYdbSetup::WaitFor(TDuration timeout, TString description, std::function callback) { + TInstant start = TInstant::Now(); + while (TInstant::Now() - start <= timeout) { + TString errorString; + if (callback(errorString)) { + return; + } + Cerr << "Wait " << description << " " << TInstant::Now() - start << ": " << errorString << "\n"; + Sleep(TDuration::Seconds(1)); + } + UNIT_ASSERT_C(false, "Waiting " <<
description << " timeout. Spent time " << TInstant::Now() - start << " exceeds limit " << timeout); +} + //// TSampleQueriess void TSampleQueries::CompareYson(const TString& expected, const TString& actual) { diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h index e2e05141247d..35f1a1693140 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h @@ -106,8 +106,10 @@ class IYdbSetup : public TThrRefBase { virtual void StopWorkloadService(ui64 nodeIndex = 0) const = 0; virtual void ValidateWorkloadServiceCounters(bool checkTableCounters = true, const TString& poolId = "") const = 0; + // Coomon helpers virtual TTestActorRuntime* GetRuntime() const = 0; virtual const TYdbSetupSettings& GetSettings() const = 0; + static void WaitFor(TDuration timeout, TString description, std::function callback); }; // Test queries diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp index 288f3b72edeb..4d37370a8599 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp @@ -133,22 +133,26 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceTables) { Y_UNIT_TEST(TestLeaseExpiration) { auto ydb = TYdbSetupSettings() .ConcurrentQueryLimit(1) + .QueryCancelAfter(TDuration::Zero()) .Create(); // Create tables - TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query)); + auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true)); + ydb->WaitQueryExecution(hangingRequest); - const TDuration leaseDuration = TDuration::Seconds(10); - StartRequest(ydb, "test_session", leaseDuration); - DelayRequest(ydb, "test_session", leaseDuration); - CheckPoolDescription(ydb, 1, 1, leaseDuration); + auto delayedRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)); + ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 1}); ydb->StopWorkloadService(); ydb->WaitPoolHandlersCount(0); // Check that lease expired - Sleep(leaseDuration + TDuration::Seconds(5)); - CheckPoolDescription(ydb, 0, 0); + IYdbSetup::WaitFor(TDuration::Seconds(60), "lease expiration", [ydb](TString& errorString) { + auto description = ydb->GetPoolDescription(TDuration::Zero()); + + errorString = TStringBuilder() << "delayed = " << description.DelayedRequests << ", running = " << description.RunningRequests; + return description.AmountRequests() == 0; + }); } Y_UNIT_TEST(TestLeaseUpdates) { diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index 807ec953ffd1..5456074990f4 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -444,19 +444,16 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) { DROP RESOURCE POOL )" << poolId << ";" ); - TInstant start = TInstant::Now(); - while (TInstant::Now() - start <= FUTURE_WAIT_TIMEOUT) { - if (ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown) { - auto result = 
ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString()); - UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found"); - return; - } - - Cerr << "WaitPoolDrop " << TInstant::Now() - start << "\n"; - Sleep(TDuration::Seconds(1)); - } - UNIT_ASSERT_C(false, "Pool drop waiting timeout"); + IYdbSetup::WaitFor(FUTURE_WAIT_TIMEOUT, "pool drop", [ydb, poolId](TString& errorString) { + auto kind = ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind; + + errorString = TStringBuilder() << "kind = " << kind; + return kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown; + }); + + auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found"); } Y_UNIT_TEST(TestResourcePoolAcl) { diff --git a/ydb/core/kqp/workload_service/ya.make b/ydb/core/kqp/workload_service/ya.make index d36580965937..b8b5704044e2 100644 --- a/ydb/core/kqp/workload_service/ya.make +++ b/ydb/core/kqp/workload_service/ya.make @@ -10,6 +10,8 @@ PEERDIR( ydb/core/fq/libs/compute/common ydb/core/kqp/workload_service/actors + + ydb/library/actors/interconnect ) YQL_LAST_ABI_VERSION()
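
Note on the relocated polling helper: WaitFor is now a static member of IYdbSetup, so any suite that holds a setup object can reuse the same retry loop instead of open-coding sleeps. A minimal usage sketch, mirroring the TestLeaseExpiration change above (the ydb setup object, GetPoolDescription and AmountRequests all come from the existing test harness):

    // Poll an externally observable condition until it holds or the timeout fires.
    // The error string is printed on every unsuccessful iteration to simplify debugging of hung waits.
    IYdbSetup::WaitFor(TDuration::Seconds(60), "lease expiration", [ydb](TString& errorString) {
        auto description = ydb->GetPoolDescription(TDuration::Zero());

        errorString = TStringBuilder() << "delayed = " << description.DelayedRequests
                                       << ", running = " << description.RunningRequests;
        return description.AmountRequests() == 0;
    });

Note on the new config sensors: TCommonCounters::UpdateConfigCounters clamps negative settings, so ConcurrentQueryLimit = -1 (unlimited) and a negative DatabaseLoadCpuThreshold (the "not set" default) are both exported as 0 on InFlightLimit and LoadCpuThreshold rather than as negative values.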