Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfixes #8676

Merged
merged 5 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
IssuesFromMessage(result->Get()->Record.GetIssues(), issues);
LOG_E("TS3ReadActor", "Error while object listing, details: TEvObjectPathReadError: " << issues.ToOneLineString());
issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while object listing", std::move(issues));
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, result->Get()->Record.GetFatalCode()));
}

void HandleAck(TEvS3Provider::TEvAck::TPtr& ev) {
Expand Down
16 changes: 9 additions & 7 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
message = ErrorText;
}
Issues.AddIssues(BuildIssues(HttpResponseCode, errorCode, message));
FatalCode = StatusFromS3ErrorCode(errorCode);
}

if (ev->Get()->Issues) {
Expand Down Expand Up @@ -1115,15 +1116,15 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
DecompressorActorId = Register(CreateS3DecompressorActor(SelfActorId, ReadSpec->Compression));
}

NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
FatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;

StartCycleCount = GetCycleCountFast();

try {
if (ReadSpec->Arrow) {
if (ReadSpec->Compression) {
Issues.AddIssue(TIssue("Blocks optimisations are incompatible with external compression"));
fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
} else {
try {
if (Url.StartsWith("file://")) {
Expand All @@ -1133,7 +1134,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
}
} catch (const parquet::ParquetException& ex) {
Issues.AddIssue(TIssue(ex.what()));
fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
RetryStuff->Cancel();
}
}
Expand All @@ -1149,7 +1150,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
LOG_CORO_D("S3 read ERROR");
} catch (const NDB::Exception& ex) {
Issues.AddIssue(TIssue(ex.message()));
fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
RetryStuff->Cancel();
}
}
Expand All @@ -1162,15 +1163,15 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
return;
} catch (const std::exception& err) {
Issues.AddIssue(TIssue(err.what()));
fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
FatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
RetryStuff->Cancel();
}

CpuTime += GetCpuTimeDelta();

auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues));
if (issues)
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), FatalCode));
else
Send(ParentActorId, new TEvS3Provider::TEvFileFinished(PathIndex, TakeIngressDelta(), TakeCpuTimeDelta(), RetryStuff->SizeLimit));
}
Expand Down Expand Up @@ -1230,6 +1231,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
bool ServerReturnedError = false;
TString ErrorText;
TIssues Issues;
NYql::NDqProto::StatusIds::StatusCode FatalCode;

NActors::TActorId DecompressorActorId;
std::size_t LastOffset = 0;
Expand Down Expand Up @@ -1719,7 +1721,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
IssuesFromMessage(result->Get()->Record.GetIssues(), issues);
LOG_W("TS3StreamReadActor", "Error while object listing, details: TEvObjectPathReadError: " << issues.ToOneLineString());
issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while object listing", std::move(issues));
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), result->Get()->Record.GetFatalCode()));
}

void HandleRetry(TEvS3Provider::TEvRetryEventFunc::TPtr& retry) {
Expand Down
7 changes: 5 additions & 2 deletions ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
, FileSizeLimit(fileSizeLimit)
, ReadLimit(readLimit)
, MaybeIssues(Nothing())
, FatalCode(NYql::NDqProto::StatusIds::EXTERNAL_ERROR)
, UseRuntimeListing(useRuntimeListing)
, ConsumersCount(consumersCount)
, BatchSizeLimit(batchSizeLimit)
Expand Down Expand Up @@ -302,6 +303,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
<< " and exceeds limit = " << FileSizeLimit;
LOG_E("TS3FileQueueActor", errorMessage);
MaybeIssues = TIssues{TIssue{errorMessage}};
FatalCode = NYql::NDqProto::StatusIds::PRECONDITION_FAILED;
return false;
}
LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path);
Expand Down Expand Up @@ -377,7 +379,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
LOG_D(
"TS3FileQueueActor",
"HandleGetNextBatchForErrorState Giving away rest of Objects");
Send(ev->Sender, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, ev->Get()->Record.GetTransportMeta()));
Send(ev->Sender, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, FatalCode, ev->Get()->Record.GetTransportMeta()));
TryFinish(ev->Sender, ev->Get()->Record.GetTransportMeta().GetSeqNo());
}

Expand Down Expand Up @@ -558,7 +560,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
if (!MaybeIssues.Defined()) {
SendObjects(consumer, requests.front());
} else {
Send(consumer, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, requests.front()));
Send(consumer, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, FatalCode, requests.front()));
TryFinish(consumer, requests.front().GetSeqNo());
}
requests.pop_front();
Expand Down Expand Up @@ -603,6 +605,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
size_t CurrentDirectoryPathIndex = 0;
THashMap<NActors::TActorId, TDeque<NDqProto::TMessageTransportMeta>> PendingRequests;
TMaybe<TIssues> MaybeIssues;
NYql::NDqProto::StatusIds::StatusCode FatalCode;
bool UseRuntimeListing;
ui64 ConsumersCount;
ui64 BatchSizeLimit;
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/s3/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ struct TEvS3Provider {

TEvObjectPathReadError() = default;

TEvObjectPathReadError(TIssues issues, const NDqProto::TMessageTransportMeta& transportMeta) {
TEvObjectPathReadError(TIssues issues, NYql::NDqProto::StatusIds::StatusCode code, const NDqProto::TMessageTransportMeta& transportMeta) {
NYql::IssuesToMessage(issues, Record.MutableIssues());
Record.MutableTransportMeta()->CopyFrom(transportMeta);
Record.SetFatalCode(code);
}
};

Expand Down
20 changes: 14 additions & 6 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,15 @@ TS3ListObjectV2Response ParseListObjectV2Response(
if (const auto& root = xml.Root(); root.Name() == "Error") {
const auto& code = root.Node("Code", true).Value<TString>();
const auto& message = root.Node("Message", true).Value<TString>();
ythrow yexception() << message << ", error: code: " << code << ", request id: ["
<< requestId << "]";
const auto errorMessage = TStringBuilder{} << message << ", error: code: " << code
<< ", request id: [" << requestId << "]";
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::ParseListObjectV2Response] " << errorMessage;
throw yexception() << errorMessage;
} else if (root.Name() != "ListBucketResult") {
ythrow yexception() << "Unexpected response '" << root.Name()
const auto errorMessage = TStringBuilder{} << "Unexpected response '" << root.Name()
<< "' on discovery, request id: [" << requestId << "]";
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::ParseListObjectV2Response] " << errorMessage;
throw yexception() << errorMessage;
} else {
const NXml::TNamespacesForXPath nss(
1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
Expand Down Expand Up @@ -313,7 +317,8 @@ class TS3Lister : public IS3Lister {

auto gateway = ctx.GatewayWeak.lock();
if (!gateway) {
ythrow yexception() << "Gateway disappeared";
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::SubmitRequestIntoGateway] Gateway disappeared";
throw yexception() << "Gateway disappeared";
}

auto sharedCtx = ctx.SharedCtx;
Expand Down Expand Up @@ -360,7 +365,8 @@ class TS3Lister : public IS3Lister {
static void OnDiscovery(TListingContext ctx, IHTTPGateway::TResult&& result) try {
auto gateway = ctx.GatewayWeak.lock();
if (!gateway) {
ythrow yexception() << "Gateway disappeared";
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::OnDiscovery] Gateway disappeared";
throw yexception() << "Gateway disappeared";
}
if (!result.Issues) {
auto xmlString = result.Content.Extract();
Expand Down Expand Up @@ -539,8 +545,10 @@ IS3Lister::TPtr MakeS3Lister(
}

if (!allowLocalFiles) {
ythrow yexception() << "Using local files as DataSource isn't allowed, but trying access "
const auto errorMessage = TStringBuilder{} << "Using local files as DataSource isn't allowed, but trying access "
<< listingRequest.Url;
YQL_CLOG(DEBUG, ProviderS3) << "[IS3Lister::MakeS3Lister] " << errorMessage;
throw yexception() << errorMessage;
}
return std::make_shared<TLocalS3Lister>(listingRequest, delimiter);
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/s3/proto/file_queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ option cc_enable_arenas = true;
package NYql.NS3.FileQueue;

import "ydb/library/yql/dq/actors/protos/dq_events.proto";
import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";

message TEvUpdateConsumersCount {
Expand All @@ -29,6 +30,7 @@ message TEvObjectPathBatch {

// Error notification for an object path read/listing request.
// Carries the issues describing the failure, the status code to report,
// and the transport metadata of the request being answered.
message TEvObjectPathReadError {
// Serialized issues describing what went wrong.
repeated Ydb.Issue.IssueMessage Issues = 1;
// Status code the receiver should report for this error.
// NOTE(review): the field is optional; a receiver that calls GetFatalCode()
// without a presence check gets the enum's default value when the sender
// did not set it - confirm that default is an acceptable error status.
optional NYql.NDqProto.StatusIds.StatusCode FatalCode = 2;

// Transport metadata copied from the request this error replies to.
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}
Expand Down
Loading