Skip to content

Commit

Permalink
Apply the last stats received from terminated CAs (ydb-platform#8356)
Browse files Browse the repository at this point in the history
  • Loading branch information
abyss7 authored and stanislav-shchetinin committed Aug 30, 2024
1 parent 103793b commit ea8cb6b
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 34 deletions.
18 changes: 6 additions & 12 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
CancelProposal(0);
}
HandleComputeStats(ev);
HandleComputeState(ev);
}

void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
Expand Down Expand Up @@ -1015,7 +1015,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvState, HandleComputeState);
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
Expand Down Expand Up @@ -2646,6 +2646,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
this->Become(&TThis::WaitShutdownState);
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
<< Planner->GetPendingComputeActors().size() << " compute actors");
// TODO(ilezhankin): the CA awaiting timeout should be configurable.
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
}
} else {
Expand Down Expand Up @@ -2681,17 +2682,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
YQL_ENSURE(Planner);

TActorId actor = ev->Sender;
ui64 taskId = ev->Get()->Record.GetTaskId();

Planner->CompletedCA(taskId, actor);
HandleComputeStats(ev);

if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
}

Expand Down
57 changes: 39 additions & 18 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
}

void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
bool HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
TActorId computeActor = ev->Sender;
auto& state = ev->Get()->Record;
ui64 taskId = state.GetTaskId();
Expand Down Expand Up @@ -409,7 +409,39 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

YQL_ENSURE(Planner);
bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state);
bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state);

switch (state.GetState()) {
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
case NYql::NDqProto::COMPUTE_STATE_FINISHED:
// Don't finalize stats twice.
if (Planner->CompletedCA(taskId, computeActor)) {
ExtraData[computeActor].Swap(state.MutableExtraData());

if (Stats) {
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);
}

LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();
}
default:
; // ignore all other states.
}

return ack;
}

void HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
TActorId computeActor = ev->Sender;
auto& state = ev->Get()->Record;
ui64 taskId = state.GetTaskId();

bool populateChannels = HandleComputeStats(ev);

switch (state.GetState()) {
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
Expand All @@ -427,22 +459,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
break;
}

case NYql::NDqProto::COMPUTE_STATE_FAILURE:
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
ExtraData[computeActor].Swap(state.MutableExtraData());
if (Stats) {
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);
}

LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();
YQL_ENSURE(Planner);
Planner->CompletedCA(taskId, computeActor);
}
default:
; // ignore all other states.
}

if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
Expand Down Expand Up @@ -1854,6 +1872,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

void PassAway() override {
YQL_ENSURE(AlreadyReplied && ResponseEv);

// Actualize stats with the last stats from terminated CAs, but keep the status.
FillResponseStats(ResponseEv->Record.GetResponse().GetStatus());
this->Send(Target, ResponseEv.release());

for (auto channelPair: ResultChannelProxies) {
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -592,11 +592,11 @@ bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::
return false;
}

void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
auto& task = TasksGraph.GetTask(taskId);
if (task.Meta.Completed) {
YQL_ENSURE(!PendingComputeActors.contains(computeActor));
return;
return false;
}

task.Meta.Completed = true;
Expand All @@ -606,6 +606,8 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
PendingComputeActors.erase(it);

LOG_I("Compute actor has finished execution: " << computeActor.ToString());

return true;
}

void TKqpPlanner::TaskNotStarted(ui64 taskId) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class TKqpPlanner {
std::unique_ptr<IEventHandle> PlanExecution();
std::unique_ptr<IEventHandle> AssignTasksToNodes();
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
void CompletedCA(ui64 taskId, TActorId computeActor);
bool CompletedCA(ui64 taskId, TActorId computeActor);
void TaskNotStarted(ui64 taskId);
TProgressStat::TEntry CalculateConsumptionUpdate();
void ShiftConsumption();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
STATEFN(ExecuteState) {
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvState, HandleComputeState);
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
Expand Down

0 comments on commit ea8cb6b

Please sign in to comment.