Skip to content

Commit

Permalink
Merge 2106be6 into 29f1fa1
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jul 16, 2024
2 parents 29f1fa1 + 2106be6 commit b09c5bf
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 123 deletions.
252 changes: 163 additions & 89 deletions ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions ydb/core/kqp/workload_service/common/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct TEvPrivate {
EvCpuQuotaRequest,
EvCpuQuotaResponse,
EvCpuLoadResponse,
EvNodesInfoRequest,
EvNodesInfoResponse,

EvTablesCreationFinished,
EvCleanupTableResponse,
Expand Down Expand Up @@ -183,6 +185,17 @@ struct TEvPrivate {
const NYql::TIssues Issues;
};

// Local request for the node count known to the workload service.
// Carries no payload; the reply is sent back to the sender as TEvNodesInfoResponse.
struct TEvNodesInfoRequest : public NActors::TEventLocal<TEvNodesInfoRequest, EvNodesInfoRequest> {
};

// Local reply to TEvNodesInfoRequest carrying an immutable node count.
struct TEvNodesInfoResponse : public NActors::TEventLocal<TEvNodesInfoResponse, EvNodesInfoResponse> {
    // Number of nodes reported by the responder at the time of the reply.
    const ui32 NodeCount;

    explicit TEvNodesInfoResponse(ui32 nodeCount)
        : NodeCount{nodeCount}
    {}
};

// Tables queries events
struct TEvTablesCreationFinished : public NActors::TEventLocal<TEvTablesCreationFinished, EvTablesCreationFinished> {
TEvTablesCreationFinished(bool success, NYql::TIssues issues)
Expand Down
37 changes: 36 additions & 1 deletion ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <ydb/core/protos/console_config.pb.h>

#include <ydb/library/actors/interconnect/interconnect.h>


namespace NKikimr::NKqp {

Expand All @@ -34,7 +36,8 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {

// Tags passed through TEvents::TEvWakeup to select which deferred action
// the service runs when the wake-up fires.
enum class EWakeUp {
IdleCheck,
StartCpuLoadRequest
StartCpuLoadRequest,
StartNodeInfoRequest
};

public:
Expand Down Expand Up @@ -92,6 +95,13 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
Send(ev->Sender, responseEvent.release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
}

// Handles a TEvNodesInfo event: caches the number of listed nodes and arms
// the timer for the next periodic refresh.
void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev) {
    NodeCount = ev->Get()->Nodes.size();
    ScheduleNodeInfoRequest();

    // Fixed typo in the trace message ("noode" -> "node").
    LOG_T("Updated node info, node count: " << NodeCount);
}

void Handle(TEvents::TEvUndelivered::TPtr& ev) const {
switch (ev->Get()->SourceType) {
case NConsole::TEvConfigsDispatcher::EvSetConfigSubscriptionRequest:
Expand All @@ -102,6 +112,11 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
LOG_E("Failed to deliver config notification response");
break;

case TEvInterconnect::EvListNodes:
LOG_W("Failed to deliver list nodes request");
ScheduleNodeInfoRequest();
break;

default:
LOG_E("Undelivered event with unexpected source type: " << ev->Get()->SourceType);
break;
Expand Down Expand Up @@ -145,13 +160,18 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
case EWakeUp::StartCpuLoadRequest:
RunCpuLoadRequest();
break;

case EWakeUp::StartNodeInfoRequest:
RunNodeInfoRequest();
break;
}
}

STRICT_STFUNC(MainState,
sFunc(TEvents::TEvPoison, HandlePoison);
sFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleSetConfigSubscriptionResponse);
hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
hFunc(TEvInterconnect::TEvNodesInfo, Handle);
hFunc(TEvents::TEvUndelivered, Handle);

hFunc(TEvPlaceRequestIntoPool, Handle);
Expand All @@ -160,6 +180,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {

hFunc(TEvPrivate::TEvResolvePoolResponse, Handle);
hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle);
hFunc(TEvPrivate::TEvNodesInfoRequest, Handle);
hFunc(TEvPrivate::TEvRefreshPoolState, Handle);
hFunc(TEvPrivate::TEvCpuQuotaRequest, Handle);
hFunc(TEvPrivate::TEvFinishRequestInPool, Handle);
Expand Down Expand Up @@ -214,6 +235,10 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
}
}

// Answers a node-count query by sending the currently cached NodeCount
// back to the actor that issued the request.
void Handle(TEvPrivate::TEvNodesInfoRequest::TPtr& ev) const {
    const auto& requester = ev->Sender;
    auto* response = new TEvPrivate::TEvNodesInfoResponse(NodeCount);
    Send(requester, response);
}

void Handle(TEvPrivate::TEvRefreshPoolState::TPtr& ev) {
const auto& event = ev->Get()->Record;
const TString& database = event.GetDatabase();
Expand Down Expand Up @@ -333,6 +358,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {

LOG_I("Started workload service initialization");
Register(CreateCleanupTablesActor());
RunNodeInfoRequest();
}

void PrepareWorkloadServiceTables() {
Expand Down Expand Up @@ -420,6 +446,14 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
Register(CreateCpuLoadFetcherActor(SelfId()));
}

// Arms a delayed wake-up (after IDLE_DURATION * 2) that triggers the next
// node list refresh.
void ScheduleNodeInfoRequest() const {
    // The wake-up must carry the StartNodeInfoRequest tag: the wake-up handler
    // dispatches on this tag, and StartCpuLoadRequest would start a CPU load
    // request instead of RunNodeInfoRequest(), leaving NodeCount stale forever.
    Schedule(IDLE_DURATION * 2, new TEvents::TEvWakeup(static_cast<ui64>(EWakeUp::StartNodeInfoRequest)));
}

// Asks the nameservice actor for the current node list. Delivery tracking is
// requested so that a failed delivery is reported back as TEvUndelivered.
void RunNodeInfoRequest() const {
    const auto nameservice = GetNameserviceActorId();
    Send(nameservice, new TEvInterconnect::TEvListNodes(), IEventHandle::FlagTrackDelivery);
}

private:
// Convenience overload: wraps a plain text message into a single NYql::TIssue
// and forwards to the ReplyContinueError overload that accepts issues
// (that overload is defined elsewhere in the class).
void ReplyContinueError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const {
ReplyContinueError(replyActorId, status, {NYql::TIssue(message)});
}
std::unordered_set<TString> DatabasesWithDefaultPool;
std::unordered_map<TString, TPoolState> PoolIdToState;
std::unique_ptr<TCpuQuotaManagerState> CpuQuotaManager;
ui32 NodeCount = 0;

NMonitoring::TDynamicCounters::TCounterPtr ActivePools;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
}
}

// Common helpers
// Exposes the test actor runtime obtained from Server_.
TTestActorRuntime* GetRuntime() const override {
    TTestActorRuntime* runtime = Server_->GetRuntime();
    return runtime;
}
Expand Down Expand Up @@ -491,19 +492,6 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
return event;
}

// Polls `callback` about once per second until it returns true or `timeout`
// elapses. After every unsuccessful poll the elapsed time and the message the
// callback stored in its argument are printed to Cerr. Fails the current unit
// test if the timeout is exceeded.
static void WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback) {
    const TInstant start = TInstant::Now();
    for (;;) {
        if (TInstant::Now() - start > timeout) {
            break;
        }

        TString errorString;
        const bool finished = callback(errorString);
        if (finished) {
            return;
        }

        const TDuration elapsed = TInstant::Now() - start;
        Cerr << "Wait " << description << " " << elapsed << ": " << errorString << "\n";
        Sleep(TDuration::Seconds(1));
    }
    UNIT_ASSERT_C(false, "Waiting " << description << " timeout");
}

NMonitoring::TDynamicCounterPtr GetWorkloadManagerCounters(ui32 nodeIndex) const {
return GetServiceCounters(GetRuntime()->GetAppData(nodeIndex).Counters, "kqp")
->GetSubgroup("subsystem", "workload_manager");
Expand Down Expand Up @@ -598,6 +586,21 @@ TIntrusivePtr<IYdbSetup> TYdbSetupSettings::Create() const {
return MakeIntrusive<TWorkloadServiceYdbSetup>(*this);
}

//// IYdbSetup

// Polls `callback` about once per second until it returns true or `timeout`
// elapses. After every unsuccessful poll the elapsed time and the message the
// callback stored in its argument are printed to Cerr. Fails the current unit
// test, reporting the spent time and the limit, if the timeout is exceeded.
void IYdbSetup::WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback) {
    const TInstant start = TInstant::Now();
    for (;;) {
        if (TInstant::Now() - start > timeout) {
            break;
        }

        TString errorString;
        const bool finished = callback(errorString);
        if (finished) {
            return;
        }

        const TDuration elapsed = TInstant::Now() - start;
        Cerr << "Wait " << description << " " << elapsed << ": " << errorString << "\n";
        Sleep(TDuration::Seconds(1));
    }
    UNIT_ASSERT_C(false, "Waiting " << description << " timeout. Spent time " << TInstant::Now() - start << " exceeds limit " << timeout);
}

//// TSampleQueries

void TSampleQueries::CompareYson(const TString& expected, const TString& actual) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ class IYdbSetup : public TThrRefBase {
virtual void StopWorkloadService(ui64 nodeIndex = 0) const = 0;
virtual void ValidateWorkloadServiceCounters(bool checkTableCounters = true, const TString& poolId = "") const = 0;

// Common helpers
virtual TTestActorRuntime* GetRuntime() const = 0;
virtual const TYdbSetupSettings& GetSettings() const = 0;
static void WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback);
};

// Test queries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,22 +133,26 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceTables) {
// Checks that, after the workload service is stopped, the pool description
// eventually reports zero requests (i.e. the leases of the recorded requests expire).
Y_UNIT_TEST(TestLeaseExpiration) {
auto ydb = TYdbSetupSettings()
.ConcurrentQueryLimit(1)
.QueryCancelAfter(TDuration::Zero())
.Create();

// Create tables
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query));
auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true));
ydb->WaitQueryExecution(hangingRequest);

const TDuration leaseDuration = TDuration::Seconds(10);
StartRequest(ydb, "test_session", leaseDuration);
DelayRequest(ydb, "test_session", leaseDuration);
CheckPoolDescription(ydb, 1, 1, leaseDuration);
auto delayedRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 1});

// Stop the service and wait until no pool handlers remain
ydb->StopWorkloadService();
ydb->WaitPoolHandlersCount(0);

// Check that lease expired
Sleep(leaseDuration + TDuration::Seconds(5));
CheckPoolDescription(ydb, 0, 0);
IYdbSetup::WaitFor(TDuration::Seconds(60), "lease expiration", [ydb](TString& errorString) {
auto description = ydb->GetPoolDescription(TDuration::Zero());

// Progress string that WaitFor prints between polls
errorString = TStringBuilder() << "delayed = " << description.DelayedRequests << ", running = " << description.RunningRequests;
return description.AmountRequests() == 0;
});
}

Y_UNIT_TEST(TestLeaseUpdates) {
Expand Down
23 changes: 10 additions & 13 deletions ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,19 +444,16 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
DROP RESOURCE POOL )" << poolId << ";"
);

TInstant start = TInstant::Now();
while (TInstant::Now() - start <= FUTURE_WAIT_TIMEOUT) {
if (ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown) {
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found");
return;
}

Cerr << "WaitPoolDrop " << TInstant::Now() - start << "\n";
Sleep(TDuration::Seconds(1));
}
UNIT_ASSERT_C(false, "Pool drop waiting timeout");
IYdbSetup::WaitFor(FUTURE_WAIT_TIMEOUT, "pool drop", [ydb, poolId](TString& errorString) {
auto kind = ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind;

errorString = TStringBuilder() << "kind = " << kind;
return kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown;
});

auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found");
}

Y_UNIT_TEST(TestResourcePoolAcl) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/workload_service/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ PEERDIR(
ydb/core/fq/libs/compute/common

ydb/core/kqp/workload_service/actors

ydb/library/actors/interconnect
)

YQL_LAST_ABI_VERSION()
Expand Down

0 comments on commit b09c5bf

Please sign in to comment.