Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 committed Sep 17, 2024
1 parent ba1c0f7 commit 182ccdf
Showing 1 changed file with 11 additions and 40 deletions.
51 changes: 11 additions & 40 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
case NYql::NDqProto::COMPUTE_STATE_FINISHED:
// Don't finalize stats twice.
if (Planner->CompletedCA(taskId, computeActor)) {
ExtraData[computeActor].Swap(state.MutableExtraData());
auto& extraData = ExtraData[computeActor];
extraData.TaskId = taskId;
extraData.Data.Swap(state.MutableExtraData());


Stats->AddComputeActorStats(
computeActor.NodeId(),
Expand Down Expand Up @@ -458,45 +461,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
break;
}

case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
if (Stats) {
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);
}
auto& extraData = ExtraData[computeActor];
extraData.TaskId = taskId;
extraData.Data.Swap(state.MutableExtraData());

LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();

if (Planner) {
auto it = Planner->GetPendingComputeActors().find(computeActor);
if (it == Planner->GetPendingComputeActors().end()) {
LOG_W("Got execution state for compute actor: " << computeActor
<< ", task: " << taskId
<< ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
<< ", too early (waiting reply from RM)");

if (Planner && Planner->GetPendingComputeTasks().erase(taskId)) {
LOG_E("Got execution state for compute actor: " << computeActor
<< ", for unknown task: " << state.GetTaskId()
<< ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()));
return;
}
} else {
if (state.HasStats()) {
it->second.Set(state.GetStats());
}
LastStats.emplace_back(std::move(it->second));
Planner->GetPendingComputeActors().erase(it);
YQL_ENSURE(Planner->GetPendingComputeTasks().find(taskId) == Planner->GetPendingComputeTasks().end());
}
}
}
default:
; // ignore all other states.
}

if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
ReplyErrorAndDie(NYql::NDq::DqStatusToYdbStatus(state.GetStatusCode()), state.MutableIssues());
return;
}

static_cast<TDerived*>(this)->CheckExecutionComplete();
Expand Down

0 comments on commit 182ccdf

Please sign in to comment.