From 3acd7256c6dc37625356cc999f28c8e5503fe2c0 Mon Sep 17 00:00:00 2001 From: Ivan Sukhov Date: Fri, 6 Sep 2024 20:43:39 +0300 Subject: [PATCH] Bugfixes (#8676) --- .../s3/actors/yql_s3_raw_read_actor.cpp | 2 +- .../providers/s3/actors/yql_s3_read_actor.cpp | 16 ++++++++------- .../s3/actors/yql_s3_source_queue.cpp | 7 +++++-- ydb/library/yql/providers/s3/events/events.h | 3 ++- .../s3/object_listers/yql_s3_list.cpp | 20 +++++++++++++------ .../yql/providers/s3/proto/file_queue.proto | 2 ++ 6 files changed, 33 insertions(+), 17 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp index 50fdf1894ef8..5b223ec6921d 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp @@ -284,7 +284,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped, public ID IssuesFromMessage(result->Get()->Record.GetIssues(), issues); LOG_E("TS3ReadActor", "Error while object listing, details: TEvObjectPathReadError: " << issues.ToOneLineString()); issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while object listing", std::move(issues)); - Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, result->Get()->Record.GetFatalCode())); } void HandleAck(TEvS3Provider::TEvAck::TPtr& ev) { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index b36836319d4f..93c867488fba 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -965,6 +965,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { message = ErrorText; } Issues.AddIssues(BuildIssues(HttpResponseCode, errorCode, message)); + FatalCode = StatusFromS3ErrorCode(errorCode); } if (ev->Get()->Issues) { @@ -1115,7 +1116,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { DecompressorActorId = Register(CreateS3DecompressorActor(SelfActorId, ReadSpec->Compression)); } - NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR; + FatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR; StartCycleCount = GetCycleCountFast(); @@ -1123,7 +1124,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { if (ReadSpec->Arrow) { if (ReadSpec->Compression) { Issues.AddIssue(TIssue("Blocks optimisations are incompatible with external compression")); - fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; + FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; } else { try { if (Url.StartsWith("file://")) { @@ -1133,7 +1134,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { } } catch (const parquet::ParquetException& ex) { Issues.AddIssue(TIssue(ex.what())); - fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; + FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; RetryStuff->Cancel(); } } @@ -1149,7 +1150,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { LOG_CORO_D("S3 read ERROR"); } catch (const NDB::Exception& ex) { Issues.AddIssue(TIssue(ex.message())); - fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; + FatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; RetryStuff->Cancel(); } } @@ -1162,7 +1163,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { return; } catch (const std::exception& err) { Issues.AddIssue(TIssue(err.what())); - fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR; + FatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR; RetryStuff->Cancel(); } @@ -1170,7 +1171,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues)); if (issues) - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode)); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), FatalCode)); else Send(ParentActorId, new TEvS3Provider::TEvFileFinished(PathIndex, TakeIngressDelta(), TakeCpuTimeDelta(), RetryStuff->SizeLimit)); } @@ -1230,6 +1231,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { bool ServerReturnedError = false; TString ErrorText; TIssues Issues; + NYql::NDqProto::StatusIds::StatusCode FatalCode; NActors::TActorId DecompressorActorId; std::size_t LastOffset = 0; @@ -1719,7 +1721,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public IssuesFromMessage(result->Get()->Record.GetIssues(), issues); LOG_W("TS3StreamReadActor", "Error while object listing, details: TEvObjectPathReadError: " << issues.ToOneLineString()); issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while object listing", std::move(issues)); - Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), result->Get()->Record.GetFatalCode())); } void HandleRetry(TEvS3Provider::TEvRetryEventFunc::TPtr& retry) { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp index 918953ad5b8d..5adab4011331 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp @@ -183,6 +183,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped , FileSizeLimit(fileSizeLimit) , ReadLimit(readLimit) , MaybeIssues(Nothing()) + , FatalCode(NYql::NDqProto::StatusIds::EXTERNAL_ERROR) , UseRuntimeListing(useRuntimeListing) , ConsumersCount(consumersCount) , BatchSizeLimit(batchSizeLimit) @@ -302,6 +303,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped << " and exceeds limit = " << FileSizeLimit; LOG_E("TS3FileQueueActor", errorMessage); MaybeIssues = TIssues{TIssue{errorMessage}}; + FatalCode = NYql::NDqProto::StatusIds::PRECONDITION_FAILED; return false; } LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path); @@ -377,7 +379,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped LOG_D( "TS3FileQueueActor", "HandleGetNextBatchForErrorState Giving away rest of Objects"); - Send(ev->Sender, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, ev->Get()->Record.GetTransportMeta())); + Send(ev->Sender, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, FatalCode, ev->Get()->Record.GetTransportMeta())); TryFinish(ev->Sender, ev->Get()->Record.GetTransportMeta().GetSeqNo()); } @@ -558,7 +560,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped if (!MaybeIssues.Defined()) { SendObjects(consumer, requests.front()); } else { - Send(consumer, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, requests.front())); + Send(consumer, new TEvS3Provider::TEvObjectPathReadError(*MaybeIssues, FatalCode, requests.front())); TryFinish(consumer, requests.front().GetSeqNo()); } requests.pop_front(); @@ -603,6 +605,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped size_t CurrentDirectoryPathIndex = 0; THashMap> PendingRequests; TMaybe MaybeIssues; + NYql::NDqProto::StatusIds::StatusCode FatalCode; bool UseRuntimeListing; ui64 ConsumersCount; ui64 BatchSizeLimit; diff --git a/ydb/library/yql/providers/s3/events/events.h b/ydb/library/yql/providers/s3/events/events.h index 8101d54deb0d..916da45cfb47 100644 --- a/ydb/library/yql/providers/s3/events/events.h +++ b/ydb/library/yql/providers/s3/events/events.h @@ -100,9 +100,10 @@ struct TEvS3Provider { TEvObjectPathReadError() = default; - TEvObjectPathReadError(TIssues issues, const NDqProto::TMessageTransportMeta& transportMeta) { + TEvObjectPathReadError(TIssues issues, NYql::NDqProto::StatusIds::StatusCode code, const NDqProto::TMessageTransportMeta& transportMeta) { NYql::IssuesToMessage(issues, Record.MutableIssues()); Record.MutableTransportMeta()->CopyFrom(transportMeta); + Record.SetFatalCode(code); } }; diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp index 5a764a7782ec..197e20630f35 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp @@ -133,11 +133,15 @@ TS3ListObjectV2Response ParseListObjectV2Response( if (const auto& root = xml.Root(); root.Name() == "Error") { const auto& code = root.Node("Code", true).Value(); const auto& message = root.Node("Message", true).Value(); - ythrow yexception() << message << ", error: code: " << code << ", request id: [" - << requestId << "]"; + const auto errorMessage = TStringBuilder{} << message << ", error: code: " << code + << ", request id: [" << requestId << "]"; + YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::ParseListObjectV2Response] " << errorMessage; + throw yexception() << errorMessage; } else if (root.Name() != "ListBucketResult") { - ythrow yexception() << "Unexpected response '" << root.Name() + const auto errorMessage = TStringBuilder{} << "Unexpected response '" << root.Name() << "' on discovery, request id: [" << requestId << "]"; + YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::ParseListObjectV2Response] " << errorMessage; + throw yexception() << errorMessage; } else { const NXml::TNamespacesForXPath nss( 1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); @@ -313,7 +317,8 @@ class TS3Lister : public IS3Lister { auto gateway = ctx.GatewayWeak.lock(); if (!gateway) { - ythrow yexception() << "Gateway disappeared"; + YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::SubmitRequestIntoGateway] Gateway disappeared"; + throw yexception() << "Gateway disappeared"; } auto sharedCtx = ctx.SharedCtx; @@ -360,7 +365,8 @@ class TS3Lister : public IS3Lister { static void OnDiscovery(TListingContext ctx, IHTTPGateway::TResult&& result) try { auto gateway = ctx.GatewayWeak.lock(); if (!gateway) { - ythrow yexception() << "Gateway disappeared"; + YQL_CLOG(DEBUG, ProviderS3) << "[TS3Lister::OnDiscovery] Gateway disappeared"; + throw yexception() << "Gateway disappeared"; } if (!result.Issues) { auto xmlString = result.Content.Extract(); @@ -539,8 +545,10 @@ IS3Lister::TPtr MakeS3Lister( } if (!allowLocalFiles) { - ythrow yexception() << "Using local files as DataSource isn't allowed, but trying access " + const auto errorMessage = TStringBuilder{} << "Using local files as DataSource isn't allowed, but trying access " << listingRequest.Url; + YQL_CLOG(DEBUG, ProviderS3) << "[IS3Lister::MakeS3Lister] " << errorMessage; + throw yexception() << errorMessage; } return std::make_shared(listingRequest, delimiter); } diff --git a/ydb/library/yql/providers/s3/proto/file_queue.proto b/ydb/library/yql/providers/s3/proto/file_queue.proto index 75ec283f20f2..6a450203914e 100644 --- a/ydb/library/yql/providers/s3/proto/file_queue.proto +++ b/ydb/library/yql/providers/s3/proto/file_queue.proto @@ -4,6 +4,7 @@ option cc_enable_arenas = true; package NYql.NS3.FileQueue; import "ydb/library/yql/dq/actors/protos/dq_events.proto"; +import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto"; import "ydb/public/api/protos/ydb_issue_message.proto"; message TEvUpdateConsumersCount { @@ -29,6 +30,7 @@ message TEvObjectPathBatch { message TEvObjectPathReadError { repeated Ydb.Issue.IssueMessage Issues = 1; + optional NYql.NDqProto.StatusIds.StatusCode FatalCode = 2; optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; }