Skip to content

Commit

Permalink
YQ WM fix for sls feature flag (#7057)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jul 30, 2024
1 parent a87900c commit 9d12242
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 38 deletions.
7 changes: 5 additions & 2 deletions ydb/core/kqp/workload_service/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ namespace NKikimr::NKqp::NWorkload {
NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);

// Fetch pool and create default pool if needed
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless);
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists);

// Fetch and create pool in scheme shard
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless);
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken);
NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl);

// Checks that database is serverless
NActors::IActor* CreateDatabaseFetcherActor(const NActors::TActorId& replyActorId, const TString& database);

// Cpu load fetcher actor
NActors::IActor* CreateCpuLoadFetcherActor(const NActors::TActorId& replyActorId);

Expand Down
118 changes: 102 additions & 16 deletions ydb/core/kqp/workload_service/actors/scheme_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ using namespace NActors;

class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
public:
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless)
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists)
: Event(std::move(event))
, EnableOnServerless(enableOnServerless)
{
if (!Event->Get()->PoolId) {
Event->Get()->PoolId = NResourcePool::DEFAULT_POOL_ID;
Expand All @@ -39,7 +38,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {

void StartPoolFetchRequest() const {
LOG_D("Start pool fetching");
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken, EnableOnServerless));
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken));
}

void Handle(TEvPrivate::TEvFetchPoolResponse::TPtr& ev) {
Expand Down Expand Up @@ -116,20 +115,18 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {

private:
TEvPlaceRequestIntoPool::TPtr Event;
const bool EnableOnServerless;
bool CanCreatePool = false;
bool DefaultPoolCreated = false;
};


class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
public:
TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
: ReplyActorId(replyActorId)
, Database(database)
, PoolId(poolId)
, UserToken(userToken)
, EnableOnServerless(enableOnServerless)
{}

void DoBootstrap() {
Expand All @@ -144,11 +141,6 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
}

const auto& result = results[0];
if (!EnableOnServerless && result.DomainInfo && result.DomainInfo->IsServerless()) {
Reply(Ydb::StatusIds::UNSUPPORTED, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
return;
}

switch (result.Status) {
case EStatus::Unknown:
case EStatus::PathNotTable:
Expand Down Expand Up @@ -238,7 +230,6 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
const TString Database;
const TString PoolId;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
const bool EnableOnServerless;

NResourcePool::TPoolSettings PoolConfig;
NKikimrProto::TPathID PathId;
Expand Down Expand Up @@ -451,18 +442,113 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
TActorId SchemePipeActorId;
};


class TDatabaseFetcherActor : public TSchemeActorBase<TDatabaseFetcherActor> {
public:
TDatabaseFetcherActor(const TActorId& replyActorId, const TString& database)
: ReplyActorId(replyActorId)
, Database(database)
{}

void DoBootstrap() {
Become(&TDatabaseFetcherActor::StateFunc);
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const auto& results = ev->Get()->Request->ResultSet;
if (results.size() != 1) {
Reply(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected scheme cache response");
return;
}

const auto& result = results[0];
switch (result.Status) {
case EStatus::Unknown:
case EStatus::PathNotTable:
case EStatus::PathNotPath:
case EStatus::RedirectLookupError:
case EStatus::AccessDenied:
case EStatus::RootUnknown:
case EStatus::PathErrorUnknown:
Reply(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Database " << Database << " not found or you don't have access permissions");
return;
case EStatus::LookupError:
case EStatus::TableCreationNotComplete:
if (!ScheduleRetry(TStringBuilder() << "Retry error " << result.Status)) {
Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on scheme error: " << result.Status);
}
return;
case EStatus::Ok:
Serverless = result.DomainInfo && result.DomainInfo->IsServerless();
Reply(Ydb::StatusIds::SUCCESS);
return;
}
}

STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
default:
StateFuncBase(ev);
}
}

protected:
void StartRequest() override {
LOG_D("Start database fetching");
auto event = NTableCreator::BuildSchemeCacheNavigateRequest({{}}, Database, nullptr);
event->ResultSet[0].Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(event.Release()), IEventHandle::FlagTrackDelivery);
}

void OnFatalError(Ydb::StatusIds::StatusCode status, NYql::TIssue issue) override {
Reply(status, {std::move(issue)});
}

TString LogPrefix() const override {
return TStringBuilder() << "[TDatabaseFetcherActor] ActorId: " << SelfId() << ", Database: " << Database << ", ";
}

private:
void Reply(Ydb::StatusIds::StatusCode status, const TString& message) {
Reply(status, {NYql::TIssue(message)});
}

void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
if (status == Ydb::StatusIds::SUCCESS) {
LOG_D("Database info successfully fetched");
} else {
LOG_W("Failed to fetch database info, " << status << ", issues: " << issues.ToOneLineString());
}

Issues.AddIssues(std::move(issues));
Send(ReplyActorId, new TEvPrivate::TEvFetchDatabaseResponse(status, Database, Serverless, std::move(Issues)));
PassAway();
}

private:
const TActorId ReplyActorId;
const TString Database;

bool Serverless = false;
};

} // anonymous namespace

IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless) {
return new TPoolResolverActor(std::move(event), defaultPoolExists, enableOnServerless);
IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists) {
return new TPoolResolverActor(std::move(event), defaultPoolExists);
}

IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless) {
return new TPoolFetcherActor(replyActorId, database, poolId, userToken, enableOnServerless);
IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
return new TPoolFetcherActor(replyActorId, database, poolId, userToken);
}

IActor* CreatePoolCreatorActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl) {
return new TPoolCreatorActor(replyActorId, database, poolId, poolConfig, userToken, diffAcl);
}

IActor* CreateDatabaseFetcherActor(const TActorId& replyActorId, const TString& database) {
return new TDatabaseFetcherActor(replyActorId, database);
}

} // NKikimr::NKqp::NWorkload
15 changes: 15 additions & 0 deletions ydb/core/kqp/workload_service/common/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ struct TEvPrivate {
EvRefreshPoolState = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvResolvePoolResponse,
EvFetchPoolResponse,
EvFetchDatabaseResponse,
EvCreatePoolResponse,
EvPrepareTablesRequest,
EvPlaceRequestIntoPoolResponse,
Expand Down Expand Up @@ -85,6 +86,20 @@ struct TEvPrivate {
const NYql::TIssues Issues;
};

struct TEvFetchDatabaseResponse : public NActors::TEventLocal<TEvFetchDatabaseResponse, EvFetchDatabaseResponse> {
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, NYql::TIssues issues)
: Status(status)
, Database(database)
, Serverless(serverless)
, Issues(std::move(issues))
{}

const Ydb::StatusIds::StatusCode Status;
const TString Database;
const bool Serverless;
const NYql::TIssues Issues;
};

struct TEvCreatePoolResponse : public NActors::TEventLocal<TEvCreatePoolResponse, EvCreatePoolResponse> {
TEvCreatePoolResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: Status(status)
Expand Down
60 changes: 41 additions & 19 deletions ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@ using namespace NActors;


class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
struct TCounters {
const NMonitoring::TDynamicCounterPtr Counters;

NMonitoring::TDynamicCounters::TCounterPtr ActivePools;

TCounters(NMonitoring::TDynamicCounterPtr counters)
: Counters(counters)
{
Register();
}

private:
void Register() {
ActivePools = Counters->GetCounter("ActivePools", false);
}
};

enum class ETablesCreationStatus {
Cleanup,
NotStarted,
Expand All @@ -43,9 +60,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
public:
explicit TKqpWorkloadService(NMonitoring::TDynamicCounterPtr counters)
: Counters(counters)
{
RegisterCounters();
}
{}

void Bootstrap() {
Become(&TKqpWorkloadService::MainState);
Expand All @@ -55,7 +70,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
(ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem
}), IEventHandle::FlagTrackDelivery);

CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(ActorContext(), Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));
CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(ActorContext(), Counters.Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));

EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools();
EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless();
Expand Down Expand Up @@ -132,9 +147,9 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
return;
}

LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId);
bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database));
Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless));
const TString& database = ev->Get()->Database;
LOG_D("Recieved new request from " << workerActorId << ", Database: " << database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId);
GetOrCreateDatabaseState(database)->DoPlaceRequest(std::move(ev));
}

void Handle(TEvCleanupRequest::TPtr& ev) {
Expand Down Expand Up @@ -177,6 +192,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
hFunc(TEvCleanupRequest, Handle);
hFunc(TEvents::TEvWakeup, Handle);

hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle);
hFunc(TEvPrivate::TEvResolvePoolResponse, Handle);
hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle);
hFunc(TEvPrivate::TEvNodesInfoRequest, Handle);
Expand All @@ -191,11 +207,15 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
)

private:
void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) {
GetOrCreateDatabaseState(ev->Get()->Database)->UpdateDatabaseInfo(ev);
}

void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) {
const auto& event = ev->Get()->Event;
const TString& database = event->Get()->Database;
if (ev->Get()->DefaultPoolCreated) {
DatabasesWithDefaultPool.insert(CanonizePath(database));
GetOrCreateDatabaseState(database)->HasDefaultPool = true;
}

const TString& poolId = event->Get()->PoolId;
Expand All @@ -211,10 +231,10 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
TString poolKey = GetPoolKey(database, poolId);
LOG_I("Creating new handler for pool " << poolKey);

auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, ev->Get()->PoolConfig, Counters));
auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, ev->Get()->PoolConfig, Counters.Counters));
poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second;

ActivePools->Inc();
Counters.ActivePools->Inc();
ScheduleIdleCheck();
}

Expand Down Expand Up @@ -409,7 +429,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
}
for (const auto& poolKey : poolsToDelete) {
PoolIdToState.erase(poolKey);
ActivePools->Dec();
Counters.ActivePools->Dec();
}

if (!PoolIdToState.empty()) {
Expand Down Expand Up @@ -472,6 +492,14 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)}));
}

TDatabaseState* GetOrCreateDatabaseState(const TString& database) {
auto databaseIt = DatabaseToState.find(database);
if (databaseIt != DatabaseToState.end()) {
return &databaseIt->second;
}
return &DatabaseToState.insert({database, TDatabaseState{.ActorContext = ActorContext(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second;
}

TPoolState* GetPoolState(const TString& database, const TString& poolId) {
return GetPoolState(GetPoolKey(database, poolId));
}
Expand All @@ -492,12 +520,8 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
return "[Service] ";
}

void RegisterCounters() {
ActivePools = Counters->GetCounter("ActivePools", false);
}

private:
NMonitoring::TDynamicCounterPtr Counters;
TCounters Counters;

bool EnabledResourcePools = false;
bool EnabledResourcePoolsOnServerless = false;
Expand All @@ -506,12 +530,10 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
ETablesCreationStatus TablesCreationStatus = ETablesCreationStatus::Cleanup;
std::unordered_set<TString> PendingHandlers;

std::unordered_set<TString> DatabasesWithDefaultPool;
std::unordered_map<TString, TDatabaseState> DatabaseToState;
std::unordered_map<TString, TPoolState> PoolIdToState;
std::unique_ptr<TCpuQuotaManagerState> CpuQuotaManager;
ui32 NodeCount = 0;

NMonitoring::TDynamicCounters::TCounterPtr ActivePools;
};

} // anonymous namespace
Expand Down
Loading

0 comments on commit 9d12242

Please sign in to comment.