Skip to content

Commit

Permalink
Merge d5afc6b into 4d425b0
Browse files (browse the repository at this point in the history)
  • Loading branch information
abyss7 authored Aug 28, 2024
2 parents 4d425b0 + d5afc6b commit 48dbf41
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 34 deletions.
18 changes: 6 additions & 12 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
CancelProposal(0);
}
HandleComputeStats(ev);
HandleComputeState(ev);
}

void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
Expand Down Expand Up @@ -1009,7 +1009,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvState, HandleComputeState);
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
Expand Down Expand Up @@ -2608,6 +2608,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
this->Become(&TThis::WaitShutdownState);
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
<< Planner->GetPendingComputeActors().size() << " compute actors");
// TODO(ilezhankin): the timeout for awaiting compute actor (CA) shutdown should be configurable.
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
}
} else {
Expand Down Expand Up @@ -2643,17 +2644,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
YQL_ENSURE(Planner);

TActorId actor = ev->Sender;
ui64 taskId = ev->Get()->Record.GetTaskId();

Planner->CompletedCA(taskId, actor);
HandleComputeStats(ev);

if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
}

Expand Down
58 changes: 40 additions & 18 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
}

void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
bool HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
TActorId computeActor = ev->Sender;
auto& state = ev->Get()->Record;
ui64 taskId = state.GetTaskId();
Expand Down Expand Up @@ -409,7 +409,40 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

YQL_ENSURE(Planner);
bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state);
bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state);

// Don't finalize stats twice.
if (Planner->CompletedCA(taskId, computeActor)) {
switch (state.GetState()) {
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
ExtraData[computeActor].Swap(state.MutableExtraData());

if (Stats) {
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);
}

LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();
}
default:
; // ignore all other states.
}
}

return ack;
}

void HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
TActorId computeActor = ev->Sender;
auto& state = ev->Get()->Record;
ui64 taskId = state.GetTaskId();

bool populateChannels = HandleComputeStats(ev);

switch (state.GetState()) {
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
Expand All @@ -427,22 +460,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
break;
}

case NYql::NDqProto::COMPUTE_STATE_FAILURE:
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
ExtraData[computeActor].Swap(state.MutableExtraData());
if (Stats) {
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);
}

LastTaskId = taskId;
LastComputeActorId = computeActor.ToString();
YQL_ENSURE(Planner);
Planner->CompletedCA(taskId, computeActor);
}
default:
; // ignore all other states.
}

if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
Expand Down Expand Up @@ -1854,6 +1873,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

void PassAway() override {
YQL_ENSURE(AlreadyReplied && ResponseEv);

// Update the response stats with the latest stats from terminated CAs, but keep the status unchanged.
FillResponseStats(ResponseEv->Record.GetResponse().GetStatus());
this->Send(Target, ResponseEv.release());

for (auto channelPair: ResultChannelProxies) {
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,11 @@ bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::
return false;
}

void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
auto& task = TasksGraph.GetTask(taskId);
if (task.Meta.Completed) {
YQL_ENSURE(!PendingComputeActors.contains(computeActor));
return;
return false;
}

task.Meta.Completed = true;
Expand All @@ -565,6 +565,8 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
PendingComputeActors.erase(it);

LOG_I("Compute actor has finished execution: " << computeActor.ToString());

return true;
}

void TKqpPlanner::TaskNotStarted(ui64 taskId) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class TKqpPlanner {
std::unique_ptr<IEventHandle> PlanExecution();
std::unique_ptr<IEventHandle> AssignTasksToNodes();
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
void CompletedCA(ui64 taskId, TActorId computeActor);
bool CompletedCA(ui64 taskId, TActorId computeActor);
void TaskNotStarted(ui64 taskId);
TProgressStat::TEntry CalculateConsumptionUpdate();
void ShiftConsumption();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
STATEFN(ExecuteState) {
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvState, HandleComputeState);
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
Expand Down

0 comments on commit 48dbf41

Please sign in to comment.