From f04a115be07a8b3f47c8c5d3294191b59fb8c476 Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Fri, 12 Jul 2024 17:50:35 +0400 Subject: [PATCH] move allocate resources call before actor launch from node service to factory (#6583) --- .../kqp_compute_actor_factory.cpp | 13 ++++- .../compute_actor/kqp_compute_actor_factory.h | 3 +- ydb/core/kqp/executer_actor/kqp_planner.cpp | 34 ++++++++++-- ydb/core/kqp/executer_actor/kqp_planner.h | 2 +- .../kqp/node_service/kqp_node_service.cpp | 54 +++++++------------ 5 files changed, 62 insertions(+), 44 deletions(-) diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp index 9f3bb0c6d73d..5f0a48064907 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp @@ -114,13 +114,24 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory { ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold()); } - TActorId CreateKqpComputeActor(TCreateArgs&& args) { + TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) { NYql::NDq::TComputeMemoryLimits memoryLimits; memoryLimits.ChannelBufferSize = 0; memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load(); memoryLimits.MkqlHeavyProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load(); auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks); + NRm::TKqpResourcesRequest resourcesRequest; + resourcesRequest.MemoryPool = args.MemoryPool; + resourcesRequest.ExecutionUnits = 1; + resourcesRequest.Memory = memoryLimits.MkqlLightProgramMemoryLimit; + + auto rmResult = ResourceManager_->AllocateResources( + args.TxId, args.Task->GetId(), resourcesRequest); + + if (!rmResult) { + return NRm::TKqpRMAllocateResult{rmResult}; + } { ui32 inputChannelsCount = 0; diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h index e89fcabce098..db9df8830812 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h @@ -123,7 +123,8 @@ struct IKqpNodeComputeActorFactory { std::shared_ptr State = nullptr; }; - virtual NActors::TActorId CreateKqpComputeActor(TCreateArgs&& args) = 0; + typedef std::variant TActorStartResult; + virtual TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) = 0; virtual void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) = 0; }; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index b611624d1ff1..c3964ab9f32d 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -37,6 +37,11 @@ std::unique_ptr CheckTaskSize(ui64 TxId, const TIntru return nullptr; } +std::unique_ptr MakeActorStartFailureError(const TActorId& executerId, const TString& reason) { + auto ev = std::make_unique(NYql::NDqProto::StatusIds::OVERLOADED, reason); + return std::make_unique(executerId, executerId, ev.release()); +} + void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskResourceEstimation& ret) { const auto& task = graph.GetTask(taskId); const auto& stageInfo = graph.GetStageInfo(task.StageId); @@ -337,12 +342,12 @@ const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const { // optimizeProtoForLocalExecution - if we want to execute compute actor locally and don't want to serialize & then deserialize proto message // instead we just give ptr to proto message and after that we swap/copy it -void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) { +TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) { auto& task = TasksGraph.GetTask(taskId); NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, true); NYql::NDq::TComputeRuntimeSettings settings; - task.ComputeActorId = CaFactory_->CreateKqpComputeActor({ + auto startResult = CaFactory_->CreateKqpComputeActor({ .ExecuterId = ExecuterId, .TxId = TxId, .Task = taskDesc, @@ -360,10 +365,19 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) { .RlPath = Nothing() }); + if (const auto* rmResult = std::get_if(&startResult)) { + return rmResult->GetFailReason(); + } + + TActorId* actorId = std::get_if(&startResult); + Y_ABORT_UNLESS(actorId); + task.ComputeActorId = *actorId; + LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId); auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat()); YQL_ENSURE(result.second); + return TString(); } ui32 TKqpPlanner::GetnScanTasks() { @@ -401,7 +415,10 @@ std::unique_ptr TKqpPlanner::PlanExecution() { // on datashard tx. if (LocalComputeTasks) { for (ui64 taskId : ComputeTasks) { - ExecuteDataComputeTask(taskId, ComputeTasks.size()); + auto result = ExecuteDataComputeTask(taskId, ComputeTasks.size()); + if (!result.empty()) { + return MakeActorStartFailureError(ExecuterId, result); + } } ComputeTasks.clear(); } @@ -411,7 +428,10 @@ std::unique_ptr TKqpPlanner::PlanExecution() { // to execute this task locally so we can avoid useless overhead for remote task launching. for (auto& [shardId, tasks]: TasksPerNode) { for (ui64 taskId: tasks) { - ExecuteDataComputeTask(taskId, tasks.size()); + auto result = ExecuteDataComputeTask(taskId, tasks.size()); + if (!result.empty()) { + return MakeActorStartFailureError(ExecuterId, result); + } } } @@ -437,7 +457,11 @@ std::unique_ptr TKqpPlanner::PlanExecution() { if (tasksOnNodeIt != TasksPerNode.end()) { auto& tasks = tasksOnNodeIt->second; for (ui64 taskId: tasks) { - ExecuteDataComputeTask(taskId, tasks.size()); + auto result = ExecuteDataComputeTask(taskId, tasks.size()); + if (!result.empty()) { + return MakeActorStartFailureError(ExecuterId, result); + } + PendingComputeTasks.erase(taskId); } } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 574827bf917b..08d3a5b47a58 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -85,7 +85,7 @@ class TKqpPlanner { private: const IKqpGateway::TKqpSnapshot& GetSnapshot() const; - void ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize); + TString ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize); void PrepareToProcess(); TString GetEstimationsInfo() const; diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 0ed46e259b8f..d06d8d5f4fb0 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -164,35 +164,6 @@ class TKqpNodeService : public TActorBootstrapped { memoryPool = NRm::EKqpMemoryPool::Unspecified; } - TVector allocatedTasks; - allocatedTasks.reserve(msg.GetTasks().size()); - for (auto& task : *msg.MutableTasks()) { - NKqpNode::TTaskContext& taskCtx = request.InFlyTasks[task.GetId()]; - YQL_ENSURE(taskCtx.TaskId == 0); - taskCtx.TaskId = task.GetId(); - - NRm::TKqpResourcesRequest resourcesRequest; - resourcesRequest.MemoryPool = memoryPool; - resourcesRequest.ExecutionUnits = 1; - - // !!!!!!!!!!!!!!!!!!!!! - // we have to allocate memory instead of reserve only. currently, this memory will not be used for request processing. - resourcesRequest.Memory = (1 << 19) /* 512kb limit for check that memory exists for processing with minimal requirements */; - - auto result = ResourceManager_->AllocateResources(txId, task.GetId(), resourcesRequest); - - if (!result) { - for (ui64 taskId : allocatedTasks) { - ResourceManager_->FreeResources(txId, taskId); - } - - ReplyError(txId, request.Executer, msg, result.GetStatus(), result.GetFailReason()); - return; - } - - allocatedTasks.push_back(task.GetId()); - } - auto reply = MakeHolder(); reply->Record.SetTxId(txId); @@ -213,13 +184,8 @@ class TKqpNodeService : public TActorBootstrapped { } const ui32 tasksCount = msg.GetTasks().size(); - for (int i = 0; i < msg.GetTasks().size(); ++i) { - auto& dqTask = *msg.MutableTasks(i); - auto& taskCtx = request.InFlyTasks[dqTask.GetId()]; - taskCtx.TaskId = dqTask.GetId(); - YQL_ENSURE(taskCtx.TaskId != 0); - - taskCtx.ComputeActorId = CaFactory_->CreateKqpComputeActor({ + for (auto& dqTask: *msg.MutableTasks()) { + auto result = CaFactory_->CreateKqpComputeActor({ .ExecuterId = request.Executer, .TxId = txId, .Task = &dqTask, @@ -239,6 +205,22 @@ class TKqpNodeService : public TActorBootstrapped { .State = State_ }); + if (const auto* rmResult = std::get_if(&result)) { + ReplyError(txId, request.Executer, msg, rmResult->GetStatus(), rmResult->GetFailReason()); + bucket.NewRequest(std::move(request)); + TerminateTx(txId, rmResult->GetFailReason()); + return; + } + + auto& taskCtx = request.InFlyTasks[dqTask.GetId()]; + YQL_ENSURE(taskCtx.TaskId == 0); + taskCtx.TaskId = dqTask.GetId(); + YQL_ENSURE(taskCtx.TaskId != 0); + + TActorId* actorId = std::get_if(&result); + Y_ABORT_UNLESS(actorId); + taskCtx.ComputeActorId = *actorId; + LOG_D("TxId: " << txId << ", executing task: " << taskCtx.TaskId << " on compute actor: " << taskCtx.ComputeActorId); auto* startedTask = reply->Record.AddStartedTasks();