Skip to content

Commit

Permalink
move allocate resources call before actor launch from node service to…
Browse files Browse the repository at this point in the history
… factory (#6583)
  • Loading branch information
gridnevvvit authored Jul 12, 2024
1 parent 91c2833 commit f04a115
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 44 deletions.
13 changes: 12 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,24 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold());
}

TActorId CreateKqpComputeActor(TCreateArgs&& args) {
TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) {
NYql::NDq::TComputeMemoryLimits memoryLimits;
memoryLimits.ChannelBufferSize = 0;
memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load();
memoryLimits.MkqlHeavyProgramMemoryLimit = MkqlHeavyProgramMemoryLimit.load();

auto estimation = ResourceManager_->EstimateTaskResources(*args.Task, args.NumberOfTasks);
NRm::TKqpResourcesRequest resourcesRequest;
resourcesRequest.MemoryPool = args.MemoryPool;
resourcesRequest.ExecutionUnits = 1;
resourcesRequest.Memory = memoryLimits.MkqlLightProgramMemoryLimit;

auto rmResult = ResourceManager_->AllocateResources(
args.TxId, args.Task->GetId(), resourcesRequest);

if (!rmResult) {
return NRm::TKqpRMAllocateResult{rmResult};
}

{
ui32 inputChannelsCount = 0;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ struct IKqpNodeComputeActorFactory {
std::shared_ptr<IKqpNodeState> State = nullptr;
};

virtual NActors::TActorId CreateKqpComputeActor(TCreateArgs&& args) = 0;
typedef std::variant<TActorId, NKikimr::NKqp::NRm::TKqpRMAllocateResult> TActorStartResult;
virtual TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) = 0;

virtual void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) = 0;
};
Expand Down
34 changes: 29 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ std::unique_ptr<TEvKqp::TEvAbortExecution> CheckTaskSize(ui64 TxId, const TIntru
return nullptr;
}

// Wraps a compute-actor start failure into an abort-execution event handle.
//
// executerId - passed as both actor-id arguments of the resulting handle.
// reason     - failure text forwarded to the abort event.
//
// Returns a handle carrying a TEvKqp::TEvAbortExecution with status OVERLOADED.
std::unique_ptr<IEventHandle> MakeActorStartFailureError(const TActorId& executerId, const TString& reason) {
    auto abortEvent = std::make_unique<TEvKqp::TEvAbortExecution>(
        NYql::NDqProto::StatusIds::OVERLOADED,
        reason);

    // The handle is constructed from a raw event pointer, so the unique_ptr
    // gives up ownership right before the hand-off.
    auto* rawEvent = abortEvent.release();
    auto handle = std::make_unique<IEventHandle>(executerId, executerId, rawEvent);
    return handle;
}

void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskResourceEstimation& ret) {
const auto& task = graph.GetTask(taskId);
const auto& stageInfo = graph.GetStageInfo(task.StageId);
Expand Down Expand Up @@ -337,12 +342,12 @@ const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const {

// optimizeProtoForLocalExecution: when the compute actor is executed locally, we do not want to
// serialize and then deserialize the proto message; instead, we pass a pointer to the proto message and swap/copy it afterwards.
void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) {
TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) {
auto& task = TasksGraph.GetTask(taskId);
NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, true);
NYql::NDq::TComputeRuntimeSettings settings;

task.ComputeActorId = CaFactory_->CreateKqpComputeActor({
auto startResult = CaFactory_->CreateKqpComputeActor({
.ExecuterId = ExecuterId,
.TxId = TxId,
.Task = taskDesc,
Expand All @@ -360,10 +365,19 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize) {
.RlPath = Nothing()
});

if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&startResult)) {
return rmResult->GetFailReason();
}

TActorId* actorId = std::get_if<TActorId>(&startResult);
Y_ABORT_UNLESS(actorId);
task.ComputeActorId = *actorId;

LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);

auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat());
YQL_ENSURE(result.second);
return TString();
}

ui32 TKqpPlanner::GetnScanTasks() {
Expand Down Expand Up @@ -401,7 +415,10 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
// on datashard tx.
if (LocalComputeTasks) {
for (ui64 taskId : ComputeTasks) {
ExecuteDataComputeTask(taskId, ComputeTasks.size());
auto result = ExecuteDataComputeTask(taskId, ComputeTasks.size());
if (!result.empty()) {
return MakeActorStartFailureError(ExecuterId, result);
}
}
ComputeTasks.clear();
}
Expand All @@ -411,7 +428,10 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
// to execute this task locally so we can avoid useless overhead for remote task launching.
for (auto& [shardId, tasks]: TasksPerNode) {
for (ui64 taskId: tasks) {
ExecuteDataComputeTask(taskId, tasks.size());
auto result = ExecuteDataComputeTask(taskId, tasks.size());
if (!result.empty()) {
return MakeActorStartFailureError(ExecuterId, result);
}
}
}

Expand All @@ -437,7 +457,11 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
if (tasksOnNodeIt != TasksPerNode.end()) {
auto& tasks = tasksOnNodeIt->second;
for (ui64 taskId: tasks) {
ExecuteDataComputeTask(taskId, tasks.size());
auto result = ExecuteDataComputeTask(taskId, tasks.size());
if (!result.empty()) {
return MakeActorStartFailureError(ExecuterId, result);
}

PendingComputeTasks.erase(taskId);
}
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class TKqpPlanner {
private:

const IKqpGateway::TKqpSnapshot& GetSnapshot() const;
void ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize);
TString ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize);
void PrepareToProcess();
TString GetEstimationsInfo() const;

Expand Down
54 changes: 18 additions & 36 deletions ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,35 +164,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
memoryPool = NRm::EKqpMemoryPool::Unspecified;
}

TVector<ui64> allocatedTasks;
allocatedTasks.reserve(msg.GetTasks().size());
for (auto& task : *msg.MutableTasks()) {
NKqpNode::TTaskContext& taskCtx = request.InFlyTasks[task.GetId()];
YQL_ENSURE(taskCtx.TaskId == 0);
taskCtx.TaskId = task.GetId();

NRm::TKqpResourcesRequest resourcesRequest;
resourcesRequest.MemoryPool = memoryPool;
resourcesRequest.ExecutionUnits = 1;

// !!!!!!!!!!!!!!!!!!!!!
// we have to allocate memory instead of reserve only. currently, this memory will not be used for request processing.
resourcesRequest.Memory = (1 << 19) /* 512kb limit for check that memory exists for processing with minimal requirements */;

auto result = ResourceManager_->AllocateResources(txId, task.GetId(), resourcesRequest);

if (!result) {
for (ui64 taskId : allocatedTasks) {
ResourceManager_->FreeResources(txId, taskId);
}

ReplyError(txId, request.Executer, msg, result.GetStatus(), result.GetFailReason());
return;
}

allocatedTasks.push_back(task.GetId());
}

auto reply = MakeHolder<TEvKqpNode::TEvStartKqpTasksResponse>();
reply->Record.SetTxId(txId);

Expand All @@ -213,13 +184,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
}

const ui32 tasksCount = msg.GetTasks().size();
for (int i = 0; i < msg.GetTasks().size(); ++i) {
auto& dqTask = *msg.MutableTasks(i);
auto& taskCtx = request.InFlyTasks[dqTask.GetId()];
taskCtx.TaskId = dqTask.GetId();
YQL_ENSURE(taskCtx.TaskId != 0);

taskCtx.ComputeActorId = CaFactory_->CreateKqpComputeActor({
for (auto& dqTask: *msg.MutableTasks()) {
auto result = CaFactory_->CreateKqpComputeActor({
.ExecuterId = request.Executer,
.TxId = txId,
.Task = &dqTask,
Expand All @@ -239,6 +205,22 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
.State = State_
});

if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&result)) {
ReplyError(txId, request.Executer, msg, rmResult->GetStatus(), rmResult->GetFailReason());
bucket.NewRequest(std::move(request));
TerminateTx(txId, rmResult->GetFailReason());
return;
}

auto& taskCtx = request.InFlyTasks[dqTask.GetId()];
YQL_ENSURE(taskCtx.TaskId == 0);
taskCtx.TaskId = dqTask.GetId();
YQL_ENSURE(taskCtx.TaskId != 0);

TActorId* actorId = std::get_if<TActorId>(&result);
Y_ABORT_UNLESS(actorId);
taskCtx.ComputeActorId = *actorId;

LOG_D("TxId: " << txId << ", executing task: " << taskCtx.TaskId << " on compute actor: " << taskCtx.ComputeActorId);

auto* startedTask = reply->Record.AddStartedTasks();
Expand Down

0 comments on commit f04a115

Please sign in to comment.