Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3345 fixed WM counters and unit test #6643

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 163 additions & 89 deletions ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions ydb/core/kqp/workload_service/common/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct TEvPrivate {
EvCpuQuotaRequest,
EvCpuQuotaResponse,
EvCpuLoadResponse,
EvNodesInfoRequest,
EvNodesInfoResponse,

EvTablesCreationFinished,
EvCleanupTableResponse,
Expand Down Expand Up @@ -183,6 +185,17 @@ struct TEvPrivate {
const NYql::TIssues Issues;
};

// Payload-free local event asking for the current node count.
// The workload service answers it with TEvNodesInfoResponse sent back to the requester.
struct TEvNodesInfoRequest : public NActors::TEventLocal<TEvNodesInfoRequest, EvNodesInfoRequest> {
};

// Local reply to TEvNodesInfoRequest; carries the node count known to the sender
// at the moment the reply was built.
struct TEvNodesInfoResponse : public NActors::TEventLocal<TEvNodesInfoResponse, EvNodesInfoResponse> {
    // Immutable after construction.
    const ui32 NodeCount;

    explicit TEvNodesInfoResponse(ui32 nodeCount)
        : NodeCount{nodeCount}
    {}
};

// Tables queries events
struct TEvTablesCreationFinished : public NActors::TEventLocal<TEvTablesCreationFinished, EvTablesCreationFinished> {
TEvTablesCreationFinished(bool success, NYql::TIssues issues)
Expand Down
37 changes: 36 additions & 1 deletion ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <ydb/core/protos/console_config.pb.h>

#include <ydb/library/actors/interconnect/interconnect.h>


namespace NKikimr::NKqp {

Expand All @@ -34,7 +36,8 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {

// Tags carried by TEvWakeup so the wakeup handler can tell which action to start.
// Each enumerator is listed exactly once; a duplicated StartCpuLoadRequest entry
// without a separating comma does not compile.
enum class EWakeUp {
    IdleCheck,
    StartCpuLoadRequest,
    StartNodeInfoRequest
};

public:
Expand Down Expand Up @@ -92,6 +95,13 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
Send(ev->Sender, responseEvent.release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
}

// Handles the node list reply: remembers how many nodes were reported and
// arms the timer for the next refresh.
void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev) {
    NodeCount = ev->Get()->Nodes.size();
    ScheduleNodeInfoRequest();

    // Message text fixed: "noode" -> "node".
    LOG_T("Updated node info, node count: " << NodeCount);
}

void Handle(TEvents::TEvUndelivered::TPtr& ev) const {
switch (ev->Get()->SourceType) {
case NConsole::TEvConfigsDispatcher::EvSetConfigSubscriptionRequest:
Expand All @@ -102,6 +112,11 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
LOG_E("Failed to deliver config notification response");
break;

case TEvInterconnect::EvListNodes:
LOG_W("Failed to deliver list nodes request");
ScheduleNodeInfoRequest();
break;

default:
LOG_E("Undelivered event with unexpected source type: " << ev->Get()->SourceType);
break;
Expand Down Expand Up @@ -145,13 +160,18 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
case EWakeUp::StartCpuLoadRequest:
RunCpuLoadRequest();
break;

case EWakeUp::StartNodeInfoRequest:
RunNodeInfoRequest();
break;
}
}

STRICT_STFUNC(MainState,
sFunc(TEvents::TEvPoison, HandlePoison);
sFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleSetConfigSubscriptionResponse);
hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
hFunc(TEvInterconnect::TEvNodesInfo, Handle);
hFunc(TEvents::TEvUndelivered, Handle);

hFunc(TEvPlaceRequestIntoPool, Handle);
Expand All @@ -160,6 +180,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {

hFunc(TEvPrivate::TEvResolvePoolResponse, Handle);
hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle);
hFunc(TEvPrivate::TEvNodesInfoRequest, Handle);
hFunc(TEvPrivate::TEvRefreshPoolState, Handle);
hFunc(TEvPrivate::TEvCpuQuotaRequest, Handle);
hFunc(TEvPrivate::TEvFinishRequestInPool, Handle);
Expand Down Expand Up @@ -214,6 +235,10 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
}
}

// Replies to the requester with the most recently stored node count.
void Handle(TEvPrivate::TEvNodesInfoRequest::TPtr& ev) const {
    const TActorId requester = ev->Sender;
    Send(requester, new TEvPrivate::TEvNodesInfoResponse(NodeCount));
}

void Handle(TEvPrivate::TEvRefreshPoolState::TPtr& ev) {
const auto& event = ev->Get()->Record;
const TString& database = event.GetDatabase();
Expand Down Expand Up @@ -333,6 +358,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {

LOG_I("Started workload service initialization");
Register(CreateCleanupTablesActor());
RunNodeInfoRequest();
}

void PrepareWorkloadServiceTables() {
Expand Down Expand Up @@ -420,6 +446,14 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
Register(CreateCpuLoadFetcherActor(SelfId()));
}

// Arms a wakeup after IDLE_DURATION * 2 that triggers the next node list request.
void ScheduleNodeInfoRequest() const {
    // The tag must be StartNodeInfoRequest: the wakeup handler dispatches on it, and
    // StartCpuLoadRequest would run RunCpuLoadRequest() instead of RunNodeInfoRequest(),
    // so the node count would never be refreshed after the first reply.
    Schedule(IDLE_DURATION * 2, new TEvents::TEvWakeup(static_cast<ui64>(EWakeUp::StartNodeInfoRequest)));
}

// Asks the nameservice actor for the node list; delivery is tracked so a failed
// send comes back as TEvUndelivered.
void RunNodeInfoRequest() const {
    const auto nameservice = GetNameserviceActorId();
    Send(nameservice, new TEvInterconnect::TEvListNodes(), IEventHandle::FlagTrackDelivery);
}

private:
void ReplyContinueError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const {
ReplyContinueError(replyActorId, status, {NYql::TIssue(message)});
Expand Down Expand Up @@ -494,6 +528,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
std::unordered_set<TString> DatabasesWithDefaultPool;
std::unordered_map<TString, TPoolState> PoolIdToState;
std::unique_ptr<TCpuQuotaManagerState> CpuQuotaManager;
ui32 NodeCount = 0;

NMonitoring::TDynamicCounters::TCounterPtr ActivePools;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
}
}

// Common helpers
// Returns the test actor runtime owned by the underlying test server.
TTestActorRuntime* GetRuntime() const override {
    TTestActorRuntime* const runtime = Server_->GetRuntime();
    return runtime;
}
Expand Down Expand Up @@ -491,19 +492,6 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
return event;
}

// Polls `callback` about once per second until it returns true or `timeout` elapses.
// The callback may fill its string argument with a progress message, which is printed
// to Cerr after every unsuccessful attempt. Fails the unit test on timeout.
static void WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback) {
    const TInstant begin = TInstant::Now();
    const auto elapsed = [begin] { return TInstant::Now() - begin; };

    while (elapsed() <= timeout) {
        TString progress;
        const bool done = callback(progress);
        if (done) {
            return;
        }
        Cerr << "Wait " << description << " " << elapsed() << ": " << progress << "\n";
        Sleep(TDuration::Seconds(1));
    }
    UNIT_ASSERT_C(false, "Waiting " << description << " timeout");
}

NMonitoring::TDynamicCounterPtr GetWorkloadManagerCounters(ui32 nodeIndex) const {
return GetServiceCounters(GetRuntime()->GetAppData(nodeIndex).Counters, "kqp")
->GetSubgroup("subsystem", "workload_manager");
Expand Down Expand Up @@ -598,6 +586,21 @@ TIntrusivePtr<IYdbSetup> TYdbSetupSettings::Create() const {
return MakeIntrusive<TWorkloadServiceYdbSetup>(*this);
}

//// IYdbSetup

// Repeatedly invokes `callback` (roughly once per second) until it reports success
// or more than `timeout` has passed since the first attempt was started.
// After each failed attempt the text the callback wrote into its argument is logged
// to Cerr; on timeout the unit test is failed with the spent time and the limit.
void IYdbSetup::WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback) {
    const TInstant startedAt = TInstant::Now();
    for (;;) {
        if (TInstant::Now() - startedAt > timeout) {
            break;
        }

        TString lastError;
        if (callback(lastError)) {
            return;
        }

        Cerr << "Wait " << description << " " << TInstant::Now() - startedAt << ": " << lastError << "\n";
        Sleep(TDuration::Seconds(1));
    }
    UNIT_ASSERT_C(false, "Waiting " << description << " timeout. Spent time " << TInstant::Now() - startedAt << " exceeds limit " << timeout);
}

//// TSampleQueries

void TSampleQueries::CompareYson(const TString& expected, const TString& actual) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ class IYdbSetup : public TThrRefBase {
virtual void StopWorkloadService(ui64 nodeIndex = 0) const = 0;
virtual void ValidateWorkloadServiceCounters(bool checkTableCounters = true, const TString& poolId = "") const = 0;

// Common helpers
virtual TTestActorRuntime* GetRuntime() const = 0;
virtual const TYdbSetupSettings& GetSettings() const = 0;
static void WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback);
};

// Test queries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,22 +133,26 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceTables) {
// Checks that, once the workload service is stopped, the pool description eventually
// reports no delayed or running requests (i.e. their leases have expired).
// NOTE(review): this text comes from a rendered diff. The StartRequest / DelayRequest /
// CheckPoolDescription / Sleep statements look like the pre-change variant shown next to
// the new ExecuteQueryAsync / WaitPoolState / WaitFor flow — confirm against the repository
// which of them are actually present.
Y_UNIT_TEST(TestLeaseExpiration) {
// Pool limited to one concurrent query; query cancellation timeout set to zero.
auto ydb = TYdbSetupSettings()
.ConcurrentQueryLimit(1)
.QueryCancelAfter(TDuration::Zero())
.Create();

// Create tables
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query));
// Occupy the single execution slot with a request that hangs during execution.
auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true));
ydb->WaitQueryExecution(hangingRequest);

const TDuration leaseDuration = TDuration::Seconds(10);
StartRequest(ydb, "test_session", leaseDuration);
DelayRequest(ydb, "test_session", leaseDuration);
CheckPoolDescription(ydb, 1, 1, leaseDuration);
// A second request that is not expected to execute; wait until the pool reports
// one delayed and one running request.
auto delayedRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 1});

// Stop the service and wait until no pool handlers remain.
ydb->StopWorkloadService();
ydb->WaitPoolHandlersCount(0);

// Check that lease expired
Sleep(leaseDuration + TDuration::Seconds(5));
CheckPoolDescription(ydb, 0, 0);
// Poll the pool description for up to 60 seconds until it reports zero requests;
// the current counters are reported as the progress message on each failed attempt.
IYdbSetup::WaitFor(TDuration::Seconds(60), "lease expiration", [ydb](TString& errorString) {
auto description = ydb->GetPoolDescription(TDuration::Zero());

errorString = TStringBuilder() << "delayed = " << description.DelayedRequests << ", running = " << description.RunningRequests;
return description.AmountRequests() == 0;
});
}

Y_UNIT_TEST(TestLeaseUpdates) {
Expand Down
23 changes: 10 additions & 13 deletions ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,19 +444,16 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
DROP RESOURCE POOL )" << poolId << ";"
);

TInstant start = TInstant::Now();
while (TInstant::Now() - start <= FUTURE_WAIT_TIMEOUT) {
if (ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown) {
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found");
return;
}

Cerr << "WaitPoolDrop " << TInstant::Now() - start << "\n";
Sleep(TDuration::Seconds(1));
}
UNIT_ASSERT_C(false, "Pool drop waiting timeout");
IYdbSetup::WaitFor(FUTURE_WAIT_TIMEOUT, "pool drop", [ydb, poolId](TString& errorString) {
auto kind = ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind;

errorString = TStringBuilder() << "kind = " << kind;
return kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown;
});

auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found");
}

Y_UNIT_TEST(TestResourcePoolAcl) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/workload_service/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ PEERDIR(
ydb/core/fq/libs/compute/common

ydb/core/kqp/workload_service/actors

ydb/library/actors/interconnect
)

YQL_LAST_ABI_VERSION()
Expand Down
Loading