Skip to content

Commit

Permalink
Handle NOT Ready GetOperationResponse for Scripts (ydb-platform#1479)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 authored and EgorkaZ committed Apr 5, 2024
1 parent 0e6b83f commit ac466f7
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 18 deletions.
5 changes: 4 additions & 1 deletion ydb/core/fq/libs/compute/ydb/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ struct TEvYdbCompute {
};

struct TEvGetOperationResponse : public NActors::TEventLocal<TEvGetOperationResponse, EvGetOperationResponse> {
TEvGetOperationResponse(NYql::TIssues issues, NYdb::EStatus status)
TEvGetOperationResponse(NYql::TIssues issues, NYdb::EStatus status, bool ready)
: Issues(std::move(issues))
, Status(status)
, Ready(ready)
{}

TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, Ydb::StatusIds::StatusCode statusCode, const TVector<Ydb::Query::ResultSetMeta>& resultSetsMeta, const Ydb::TableStats::QueryStats& queryStats, NYql::TIssues issues)
Expand All @@ -129,6 +130,7 @@ struct TEvYdbCompute {
, QueryStats(queryStats)
, Issues(std::move(issues))
, Status(NYdb::EStatus::SUCCESS)
, Ready(true)
{}

NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
Expand All @@ -137,6 +139,7 @@ struct TEvYdbCompute {
Ydb::TableStats::QueryStats QueryStats;
NYql::TIssues Issues;
NYdb::EStatus Status;
bool Ready;
};

struct TEvFetchScriptResultRequest : public NActors::TEventLocal<TEvFetchScriptResultRequest, EvFetchScriptResultRequest> {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
void Handle(const TEvYdbCompute::TEvGetOperationResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();

if (response.Status == NYdb::EStatus::SUCCESS && !response.Ready) {
LOG_D("GetOperation IS NOT READY, repeating");
SendGetOperation(TDuration::MilliSeconds(BackoffTimer.NextBackoffMs()));
return;
}

if (response.Status == NYdb::EStatus::NOT_FOUND) { // FAILING / ABORTING_BY_USER / ABORTING_BY_SYSTEM
LOG_I("Operation has been already removed");
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(response.Issues, response.Status, ExecStatus, ComputeStatus));
Expand Down
55 changes: 38 additions & 17 deletions ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,18 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
actorSystem->Send(
recipient,
MakeResponse<TEvYdbCompute::TEvExecuteScriptResponse>(
database,
response.Status().GetIssues(),
response.Status().GetStatus(), database),
response.Status().GetStatus()),
0, cookie);
}
} catch (...) {
actorSystem->Send(
recipient,
MakeResponse<TEvYdbCompute::TEvExecuteScriptResponse>(
database,
CurrentExceptionMessage(),
NYdb::EStatus::GENERIC_ERROR, database),
NYdb::EStatus::GENERIC_ERROR),
0, cookie);
}
});
Expand All @@ -89,22 +91,35 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie, database = ComputeConnection.database()](auto future) {
try {
auto response = future.ExtractValueSync();
if (response.Id().GetKind() != Ydb::TOperationId::UNUSED) {
if (!response.Ready()) {
actorSystem->Send(
recipient,
MakeResponse<TEvYdbCompute::TEvGetOperationResponse>(
database,
response.Status().GetIssues(),
response.Status().GetStatus(),
false),
0, cookie);
} else if (response.Id().GetKind() != Ydb::TOperationId::UNUSED) {
actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, static_cast<Ydb::StatusIds::StatusCode>(response.Status().GetStatus()), response.Metadata().ResultSetsMeta, response.Metadata().ExecStats, RemoveDatabaseFromIssues(response.Status().GetIssues(), database)), 0, cookie);
} else {
actorSystem->Send(
recipient,
MakeResponse<TEvYdbCompute::TEvGetOperationResponse>(
database,
response.Status().GetIssues(),
response.Status().GetStatus(), database),
response.Status().GetStatus(),
true),
0, cookie);
}
} catch (...) {
actorSystem->Send(
recipient,
MakeResponse<TEvYdbCompute::TEvGetOperationResponse>(
database,
CurrentExceptionMessage(),
NYdb::EStatus::GENERIC_ERROR, database),
NYdb::EStatus::GENERIC_ERROR,
true),
0, cookie);
}
});
Expand All @@ -124,16 +139,18 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
actorSystem->Send(
recipient,
MakeResponse<TEvYdbCompute::TEvFetchScriptResultResponse>(
database,
response.GetIssues(),
response.GetStatus(), database),
response.GetStatus()),
0, cookie);
}
} catch (...) {
actorSystem->Send(
recipient,
MakeResponse<TEvYdbCompute::TEvFetchScriptResultResponse>(
database,
CurrentExceptionMessage(),
NYdb::EStatus::GENERIC_ERROR, database),
NYdb::EStatus::GENERIC_ERROR),
0, cookie);
}
});
Expand All @@ -148,15 +165,17 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
actorSystem->Send(
recipient,
MakeResponse<TEvYdbCompute::TEvCancelOperationResponse>(
database,
response.GetIssues(),
response.GetStatus(), database),
response.GetStatus()),
0, cookie);
} catch (...) {
actorSystem->Send(
recipient,
MakeResponse<TEvYdbCompute::TEvCancelOperationResponse>(
database,
CurrentExceptionMessage(),
NYdb::EStatus::GENERIC_ERROR, database),
NYdb::EStatus::GENERIC_ERROR),
0, cookie);
}
});
Expand All @@ -171,28 +190,30 @@ class TYdbConnectorActor : public NActors::TActorBootstrapped<TYdbConnectorActor
actorSystem->Send(
recipient,
MakeResponse<TEvYdbCompute::TEvForgetOperationResponse>(
database,
response.GetIssues(),
response.GetStatus(), database),
response.GetStatus()),
0, cookie);
} catch (...) {
actorSystem->Send(
recipient,
MakeResponse<TEvYdbCompute::TEvForgetOperationResponse>(
database,
CurrentExceptionMessage(),
NYdb::EStatus::GENERIC_ERROR, database),
NYdb::EStatus::GENERIC_ERROR),
0, cookie);
}
});
}

template<typename TResponse>
static TResponse* MakeResponse(TString msg, NYdb::EStatus status, TString databasePath) {
return new TResponse(NYql::TIssues{NYql::TIssue{RemoveDatabaseFromStr(msg, databasePath)}}, status);
template<typename TResponse, typename... TArgs>
static TResponse* MakeResponse(TString databasePath, TString msg, TArgs&&... args) {
return new TResponse(NYql::TIssues{NYql::TIssue{RemoveDatabaseFromStr(msg, databasePath)}}, std::forward<TArgs>(args)...);
}

template<typename TResponse>
static TResponse* MakeResponse(const NYql::TIssues& issues, NYdb::EStatus status, TString databasePath) {
return new TResponse(RemoveDatabaseFromIssues(issues, databasePath), status);
template<typename TResponse, typename... TArgs>
static TResponse* MakeResponse(TString databasePath, const NYql::TIssues& issues, TArgs&&... args) {
return new TResponse(RemoveDatabaseFromIssues(issues, databasePath), std::forward<TArgs>(args)...);
}

private:
Expand Down

0 comments on commit ac466f7

Please sign in to comment.