Skip to content

Commit

Permalink
Bugfixes (#8676)
Browse files Browse the repository at this point in the history
  • Loading branch information
evanevanevanevannnn authored Sep 6, 2024
1 parent 18a91ed commit 3acd725
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
IssuesFromMessage(result->Get()->Record.GetIssues(), issues);
LOG_E("TS3ReadActor", "Error while object listing, details: TEvObjectPathReadError: " << issues.ToOneLineString());
issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while object listing", std::move(issues));
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, result->Get()->Record.GetFatalCode()));
}

void HandleAck(TEvS3Provider::TEvAck::TPtr& ev) {
Expand Down
16 changes: 9 additions & 7 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
message = ErrorText;
}
Issues.AddIssues(BuildIssues(HttpResponseCode, errorCode, message));
FatalCode = StatusFromS3ErrorCode(errorCode);
}

if (ev->Get()->Issues) {
Expand Down Expand Up @@ -1115,15 +1116,15 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
DecompressorActorId = Register(CreateS3DecompressorActor(SelfActorId, ReadSpec->Compression));
}

NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
FatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;

StartCycleCount = GetCycleCountFast();

try {
if (ReadSpec->Arrow) {
if (ReadSpec->Compression) {
Issues.AddIssue(TIssue("Blocks optimisations are incompatible with external compression"));
fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
} else {
try {
if (Url.StartsWith("file://")) {
Expand All @@ -1133,7 +1134,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
}
} catch (const parquet::ParquetException& ex) {
Issues.AddIssue(TIssue(ex.what()));
fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
RetryStuff->Cancel();
}
}
Expand All @@ -1149,7 +1150,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
LOG_CORO_D("S3 read ERROR");
} catch (const NDB::Exception& ex) {
Issues.AddIssue(TIssue(ex.message()));
fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
RetryStuff->Cancel();
}
}
Expand All @@ -1162,15 +1163,15 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
return;
} catch (const std::exception& err) {
Issues.AddIssue(TIssue(err.what()));
fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
FatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
RetryStuff->Cancel();
}

CpuTime += GetCpuTimeDelta();

auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues));
if (issues)
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), FatalCode));
else
Send(ParentActorId, new TEvS3Provider::TEvFileFinished(PathIndex, TakeIngressDelta(), TakeCpuTimeDelta(), RetryStuff->SizeLimit));
}
Expand Down Expand Up @@ -1230,6 +1231,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
bool ServerReturnedError = false;
TString ErrorText;
TIssues Issues;
NYql::NDqProto::StatusIds::StatusCode FatalCode;

NActors::TActorId DecompressorActorId;
std::size_t LastOffset = 0;
Expand Down Expand Up @@ -1719,7 +1721,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
IssuesFromMessage(result->Get()->Record.GetIssues(), issues);
LOG_W("TS3StreamReadActor", "Error while object listing, details: TEvObjectPathReadError: " << issues.ToOneLineString());
issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while object listing", std::move(issues));
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), result->Get()->Record.GetFatalCode()));
}

void HandleRetry(TEvS3Provider::TEvRetryEventFunc::TPtr& retry) {
Expand Down
7 changes: 5 additions & 2 deletions ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
, FileSizeLimit(fileSizeLimit)
, ReadLimit(readLimit)
, MaybeIssues(Nothing())
, FatalCode(NYql::NDqProto::StatusIds::EXTERNAL_ERROR)
, UseRuntimeListing(useRuntimeListing)
, ConsumersCount(consumersCount)
, BatchSizeLimit(batchSizeLimit)
Expand Down Expand Up @@ -302,6 +303,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
<< " and exceeds limit = " << FileSizeLimit;
LOG_E("TS3FileQueueActor", errorMessage);
MaybeIssues = TIssues{TIssue{errorMessage}};
FatalCode = NYql::NDqProto::StatusIds::PRECONDITION_FAILED;
return false;
}
LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path);
Expand Down Expand Up @@ -377,7 +379,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
LOG_D(
"TS3FileQueueActor",
"HandleGetNextBatchForErrorState Giving away rest of Objects");
Send(ev->Sender, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, ev->Get()->Record.GetTransportMeta()));
Send(ev->Sender, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, FatalCode, ev->Get()->Record.GetTransportMeta()));
TryFinish(ev->Sender, ev->Get()->Record.GetTransportMeta().GetSeqNo());
}

Expand Down Expand Up @@ -558,7 +560,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
if (!MaybeIssues.Defined()) {
SendObjects(consumer, requests.front());
} else {
Send(consumer, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, requests.front()));
Send(consumer, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, FatalCode, requests.front()));
TryFinish(consumer, requests.front().GetSeqNo());
}
requests.pop_front();
Expand Down Expand Up @@ -603,6 +605,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
size_t CurrentDirectoryPathIndex = 0;
THashMap<NActors::TActorId, TDeque<NDqProto::TMessageTransportMeta>> PendingRequests;
TMaybe<TIssues> MaybeIssues;
NYql::NDqProto::StatusIds::StatusCode FatalCode;
bool UseRuntimeListing;
ui64 ConsumersCount;
ui64 BatchSizeLimit;
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/s3/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ struct TEvS3Provider {

TEvObjectPathReadError() = default;

TEvObjectPathReadError(TIssues issues, const NDqProto::TMessageTransportMeta& transportMeta) {
TEvObjectPathReadError(TIssues issues, NYql::NDqProto::StatusIds::StatusCode code, const NDqProto::TMessageTransportMeta& transportMeta) {
NYql::IssuesToMessage(issues, Record.MutableIssues());
Record.MutableTransportMeta()->CopyFrom(transportMeta);
Record.SetFatalCode(code);
}
};

Expand Down
20 changes: 14 additions & 6 deletions ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,15 @@ TS3ListObjectV2Response ParseListObjectV2Response(
if (const auto& root = xml.Root(); root.Name() == "Error") {
const auto& code = root.Node("Code", true).Value<TString>();
const auto& message = root.Node("Message", true).Value<TString>();
ythrow yexception() << message << ", error: code: " << code << ", request id: ["
<< requestId << "]";
const auto errorMessage = TStringBuilder{} << message << ", error: code: " << code
<< ", request id: [" << requestId << "]";
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::ParseListObjectV2Response] " << errorMessage;
throw yexception() << errorMessage;
} else if (root.Name() != "ListBucketResult") {
ythrow yexception() << "Unexpected response '" << root.Name()
const auto errorMessage = TStringBuilder{} << "Unexpected response '" << root.Name()
<< "' on discovery, request id: [" << requestId << "]";
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::ParseListObjectV2Response] " << errorMessage;
throw yexception() << errorMessage;
} else {
const NXml::TNamespacesForXPath nss(
1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
Expand Down Expand Up @@ -313,7 +317,8 @@ class TS3Lister : public IS3Lister {

auto gateway = ctx.GatewayWeak.lock();
if (!gateway) {
ythrow yexception() << "Gateway disappeared";
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::SubmitRequestIntoGateway] Gateway disappeared";
throw yexception() << "Gateway disappeared";
}

auto sharedCtx = ctx.SharedCtx;
Expand Down Expand Up @@ -360,7 +365,8 @@ class TS3Lister : public IS3Lister {
static void OnDiscovery(TListingContext ctx, IHTTPGateway::TResult&& result) try {
auto gateway = ctx.GatewayWeak.lock();
if (!gateway) {
ythrow yexception() << "Gateway disappeared";
YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::OnDiscovery] Gateway disappeared";
throw yexception() << "Gateway disappeared";
}
if (!result.Issues) {
auto xmlString = result.Content.Extract();
Expand Down Expand Up @@ -539,8 +545,10 @@ IS3Lister::TPtr MakeS3Lister(
}

if (!allowLocalFiles) {
ythrow yexception() << "Using local files as DataSource isn't allowed, but trying access "
const auto errorMessage = TStringBuilder{} << "Using local files as DataSource isn't allowed, but trying access "
<< listingRequest.Url;
YQL_CLOG(DEBUG, ProviderS3) << "[IS3Lister::MakeS3Lister] " << errorMessage;
throw yexception() << errorMessage;
}
return std::make_shared<TLocalS3Lister>(listingRequest, delimiter);
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/s3/proto/file_queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ option cc_enable_arenas = true;
package NYql.NS3.FileQueue;

import "ydb/library/yql/dq/actors/protos/dq_events.proto";
import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";

message TEvUpdateConsumersCount {
Expand All @@ -29,6 +30,7 @@ message TEvObjectPathBatch {

message TEvObjectPathReadError {
repeated Ydb.Issue.IssueMessage Issues = 1;
optional NYql.NDqProto.StatusIds.StatusCode FatalCode = 2;

optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}
Expand Down

0 comments on commit 3acd725

Please sign in to comment.