diff --git a/ydb/core/kqp/workload_service/actors/actors.h b/ydb/core/kqp/workload_service/actors/actors.h index 770867a58f1c..c575842faf78 100644 --- a/ydb/core/kqp/workload_service/actors/actors.h +++ b/ydb/core/kqp/workload_service/actors/actors.h @@ -9,12 +9,15 @@ namespace NKikimr::NKqp::NWorkload { NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters); // Fetch pool and create default pool if needed -NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless); +NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists); // Fetch and create pool in scheme shard -NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless); +NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken); NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl); +// Checks that database is serverless +NActors::IActor* CreateDatabaseFetcherActor(const NActors::TActorId& replyActorId, const TString& database); + // Cpu load fetcher actor NActors::IActor* CreateCpuLoadFetcherActor(const NActors::TActorId& replyActorId); diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 55b2cd3085d7..7647bc4f348d 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -22,9 +22,8 @@ using namespace NActors; class TPoolResolverActor : public TActorBootstrapped { public: - TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless) + TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists) : Event(std::move(event)) - , EnableOnServerless(enableOnServerless) { if (!Event->Get()->PoolId) { Event->Get()->PoolId = NResourcePool::DEFAULT_POOL_ID; @@ -39,7 +38,7 @@ class TPoolResolverActor : public TActorBootstrapped { void StartPoolFetchRequest() const { LOG_D("Start pool fetching"); - Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken, EnableOnServerless)); + Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken)); } void Handle(TEvPrivate::TEvFetchPoolResponse::TPtr& ev) { @@ -116,7 +115,6 @@ class TPoolResolverActor : public TActorBootstrapped { private: TEvPlaceRequestIntoPool::TPtr Event; - const bool EnableOnServerless; bool CanCreatePool = false; bool DefaultPoolCreated = false; }; @@ -124,12 +122,11 @@ class TPoolResolverActor : public TActorBootstrapped { class TPoolFetcherActor : public TSchemeActorBase { public: - TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) + TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken) : ReplyActorId(replyActorId) , Database(database) , PoolId(poolId) , UserToken(userToken) - , EnableOnServerless(enableOnServerless) {} void DoBootstrap() { @@ -144,11 +141,6 @@ class TPoolFetcherActor : public TSchemeActorBase { } const auto& result = results[0]; - if (!EnableOnServerless && result.DomainInfo && result.DomainInfo->IsServerless()) { - Reply(Ydb::StatusIds::UNSUPPORTED, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it"); - return; - } - switch (result.Status) { case EStatus::Unknown: case EStatus::PathNotTable: @@ -238,7 +230,6 @@ class TPoolFetcherActor : public TSchemeActorBase { const TString Database; const TString PoolId; const TIntrusiveConstPtr UserToken; - const bool EnableOnServerless; NResourcePool::TPoolSettings PoolConfig; NKikimrProto::TPathID PathId; @@ -451,18 +442,113 @@ class TPoolCreatorActor : public TSchemeActorBase { TActorId SchemePipeActorId; }; + +class TDatabaseFetcherActor : public TSchemeActorBase { +public: + TDatabaseFetcherActor(const TActorId& replyActorId, const TString& database) + : ReplyActorId(replyActorId) + , Database(database) + {} + + void DoBootstrap() { + Become(&TDatabaseFetcherActor::StateFunc); + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + const auto& results = ev->Get()->Request->ResultSet; + if (results.size() != 1) { + Reply(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected scheme cache response"); + return; + } + + const auto& result = results[0]; + switch (result.Status) { + case EStatus::Unknown: + case EStatus::PathNotTable: + case EStatus::PathNotPath: + case EStatus::RedirectLookupError: + case EStatus::AccessDenied: + case EStatus::RootUnknown: + case EStatus::PathErrorUnknown: + Reply(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Database " << Database << " not found or you don't have access permissions"); + return; + case EStatus::LookupError: + case EStatus::TableCreationNotComplete: + if (!ScheduleRetry(TStringBuilder() << "Retry error " << result.Status)) { + Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on scheme error: " << result.Status); + } + return; + case EStatus::Ok: + Serverless = result.DomainInfo && result.DomainInfo->IsServerless(); + Reply(Ydb::StatusIds::SUCCESS); + return; + } + } + + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + default: + StateFuncBase(ev); + } + } + +protected: + void StartRequest() override { + LOG_D("Start database fetching"); + auto event = NTableCreator::BuildSchemeCacheNavigateRequest({{}}, Database, nullptr); + event->ResultSet[0].Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(event.Release()), IEventHandle::FlagTrackDelivery); + } + + void OnFatalError(Ydb::StatusIds::StatusCode status, NYql::TIssue issue) override { + Reply(status, {std::move(issue)}); + } + + TString LogPrefix() const override { + return TStringBuilder() << "[TDatabaseFetcherActor] ActorId: " << SelfId() << ", Database: " << Database << ", "; + } + +private: + void Reply(Ydb::StatusIds::StatusCode status, const TString& message) { + Reply(status, {NYql::TIssue(message)}); + } + + void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { + if (status == Ydb::StatusIds::SUCCESS) { + LOG_D("Database info successfully fetched"); + } else { + LOG_W("Failed to fetch database info, " << status << ", issues: " << issues.ToOneLineString()); + } + + Issues.AddIssues(std::move(issues)); + Send(ReplyActorId, new TEvPrivate::TEvFetchDatabaseResponse(status, Database, Serverless, std::move(Issues))); + PassAway(); + } + +private: + const TActorId ReplyActorId; + const TString Database; + + bool Serverless = false; +}; + } // anonymous namespace -IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless) { - return new TPoolResolverActor(std::move(event), defaultPoolExists, enableOnServerless); +IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists) { + return new TPoolResolverActor(std::move(event), defaultPoolExists); } -IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) { - return new TPoolFetcherActor(replyActorId, database, poolId, userToken, enableOnServerless); +IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken) { + return new TPoolFetcherActor(replyActorId, database, poolId, userToken); } IActor* CreatePoolCreatorActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl) { return new TPoolCreatorActor(replyActorId, database, poolId, poolConfig, userToken, diffAcl); } +IActor* CreateDatabaseFetcherActor(const TActorId& replyActorId, const TString& database) { + return new TDatabaseFetcherActor(replyActorId, database); +} + } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h index c32f4cd4f4d5..a0db39a644b5 100644 --- a/ydb/core/kqp/workload_service/common/events.h +++ b/ydb/core/kqp/workload_service/common/events.h @@ -22,6 +22,7 @@ struct TEvPrivate { EvRefreshPoolState = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvResolvePoolResponse, EvFetchPoolResponse, + EvFetchDatabaseResponse, EvCreatePoolResponse, EvPrepareTablesRequest, EvPlaceRequestIntoPoolResponse, @@ -85,6 +86,20 @@ struct TEvPrivate { const NYql::TIssues Issues; }; + struct TEvFetchDatabaseResponse : public NActors::TEventLocal { + TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, NYql::TIssues issues) + : Status(status) + , Database(database) + , Serverless(serverless) + , Issues(std::move(issues)) + {} + + const Ydb::StatusIds::StatusCode Status; + const TString Database; + const bool Serverless; + const NYql::TIssues Issues; + }; + struct TEvCreatePoolResponse : public NActors::TEventLocal { TEvCreatePoolResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) : Status(status) diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 66a6aaaaf64b..38498b498d96 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -27,6 +27,23 @@ using namespace NActors; class TKqpWorkloadService : public TActorBootstrapped { + struct TCounters { + const NMonitoring::TDynamicCounterPtr Counters; + + NMonitoring::TDynamicCounters::TCounterPtr ActivePools; + + TCounters(NMonitoring::TDynamicCounterPtr counters) + : Counters(counters) + { + Register(); + } + + private: + void Register() { + ActivePools = Counters->GetCounter("ActivePools", false); + } + }; + enum class ETablesCreationStatus { Cleanup, NotStarted, @@ -43,9 +60,7 @@ class TKqpWorkloadService : public TActorBootstrapped { public: explicit TKqpWorkloadService(NMonitoring::TDynamicCounterPtr counters) : Counters(counters) - { - RegisterCounters(); - } + {} void Bootstrap() { Become(&TKqpWorkloadService::MainState); @@ -55,7 +70,7 @@ class TKqpWorkloadService : public TActorBootstrapped { (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem }), IEventHandle::FlagTrackDelivery); - CpuQuotaManager = std::make_unique(ActorContext(), Counters->GetSubgroup("subcomponent", "CpuQuotaManager")); + CpuQuotaManager = std::make_unique(ActorContext(), Counters.Counters->GetSubgroup("subcomponent", "CpuQuotaManager")); EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools(); EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless(); @@ -132,9 +147,9 @@ class TKqpWorkloadService : public TActorBootstrapped { return; } - LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId); - bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database)); - Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless)); + const TString& database = ev->Get()->Database; + LOG_D("Recieved new request from " << workerActorId << ", Database: " << database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId); + GetOrCreateDatabaseState(database)->DoPlaceRequest(std::move(ev)); } void Handle(TEvCleanupRequest::TPtr& ev) { @@ -177,6 +192,7 @@ class TKqpWorkloadService : public TActorBootstrapped { hFunc(TEvCleanupRequest, Handle); hFunc(TEvents::TEvWakeup, Handle); + hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle); hFunc(TEvPrivate::TEvResolvePoolResponse, Handle); hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle); hFunc(TEvPrivate::TEvNodesInfoRequest, Handle); @@ -191,11 +207,15 @@ class TKqpWorkloadService : public TActorBootstrapped { ) private: + void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { + GetOrCreateDatabaseState(ev->Get()->Database)->UpdateDatabaseInfo(ev); + } + void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) { const auto& event = ev->Get()->Event; const TString& database = event->Get()->Database; if (ev->Get()->DefaultPoolCreated) { - DatabasesWithDefaultPool.insert(CanonizePath(database)); + GetOrCreateDatabaseState(database)->HasDefaultPool = true; } const TString& poolId = event->Get()->PoolId; @@ -211,10 +231,10 @@ class TKqpWorkloadService : public TActorBootstrapped { TString poolKey = GetPoolKey(database, poolId); LOG_I("Creating new handler for pool " << poolKey); - auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, ev->Get()->PoolConfig, Counters)); + auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, ev->Get()->PoolConfig, Counters.Counters)); poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second; - ActivePools->Inc(); + Counters.ActivePools->Inc(); ScheduleIdleCheck(); } @@ -409,7 +429,7 @@ class TKqpWorkloadService : public TActorBootstrapped { } for (const auto& poolKey : poolsToDelete) { PoolIdToState.erase(poolKey); - ActivePools->Dec(); + Counters.ActivePools->Dec(); } if (!PoolIdToState.empty()) { @@ -472,6 +492,14 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)})); } + TDatabaseState* GetOrCreateDatabaseState(const TString& database) { + auto databaseIt = DatabaseToState.find(database); + if (databaseIt != DatabaseToState.end()) { + return &databaseIt->second; + } + return &DatabaseToState.insert({database, TDatabaseState{.ActorContext = ActorContext(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second; + } + TPoolState* GetPoolState(const TString& database, const TString& poolId) { return GetPoolState(GetPoolKey(database, poolId)); } @@ -492,12 +520,8 @@ class TKqpWorkloadService : public TActorBootstrapped { return "[Service] "; } - void RegisterCounters() { - ActivePools = Counters->GetCounter("ActivePools", false); - } - private: - NMonitoring::TDynamicCounterPtr Counters; + TCounters Counters; bool EnabledResourcePools = false; bool EnabledResourcePoolsOnServerless = false; @@ -506,12 +530,10 @@ class TKqpWorkloadService : public TActorBootstrapped { ETablesCreationStatus TablesCreationStatus = ETablesCreationStatus::Cleanup; std::unordered_set PendingHandlers; - std::unordered_set DatabasesWithDefaultPool; + std::unordered_map DatabaseToState; std::unordered_map PoolIdToState; std::unique_ptr CpuQuotaManager; ui32 NodeCount = 0; - - NMonitoring::TDynamicCounters::TCounterPtr ActivePools; }; } // anonymous namespace diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index 9ee91f077720..8503a4fb7949 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -2,14 +2,70 @@ #include +#include #include #include +#include namespace NKikimr::NKqp::NWorkload { constexpr TDuration IDLE_DURATION = TDuration::Seconds(60); + +struct TDatabaseState { + NActors::TActorContext ActorContext; + bool& EnabledResourcePoolsOnServerless; + + std::vector PendingRequersts = {}; + bool HasDefaultPool = false; + bool Serverless = false; + + TInstant LastUpdateTime = TInstant::Zero(); + + void DoPlaceRequest(TEvPlaceRequestIntoPool::TPtr ev) { + TString database = ev->Get()->Database; + PendingRequersts.emplace_back(std::move(ev)); + + if (!EnabledResourcePoolsOnServerless && (TInstant::Now() - LastUpdateTime) > IDLE_DURATION) { + ActorContext.Register(CreateDatabaseFetcherActor(ActorContext.SelfID, database)); + } else { + StartPendingRequests(); + } + } + + void UpdateDatabaseInfo(const TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { + if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { + ReplyContinueError(ev->Get()->Status, GroupIssues(ev->Get()->Issues, "Failed to fetch database info")); + return; + } + + LastUpdateTime = TInstant::Now(); + Serverless = ev->Get()->Serverless; + StartPendingRequests(); + } + +private: + void StartPendingRequests() { + if (!EnabledResourcePoolsOnServerless && Serverless) { + ReplyContinueError(Ydb::StatusIds::UNSUPPORTED, {NYql::TIssue("Resource pools are disabled for serverless domains. Please contact your system administrator to enable it")}); + return; + } + + for (auto& ev : PendingRequersts) { + ActorContext.Register(CreatePoolResolverActor(std::move(ev), HasDefaultPool)); + } + PendingRequersts.clear(); + } + + void ReplyContinueError(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) { + for (const auto& ev : PendingRequersts) { + ActorContext.Send(ev->Sender, new TEvContinueRequest(status, {}, {}, issues)); + } + PendingRequersts.clear(); + } +}; + struct TPoolState { NActors::TActorId PoolHandler; NActors::TActorContext ActorContext; diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp index 271d7accbbfd..8d6880d3eb58 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp @@ -18,7 +18,7 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr ydb, c auto userToken = MakeIntrusive(userSID, TVector{}); userToken->SaveSerializationInfo(); - runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken, true)); + runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken)); return runtime->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); }