From e367b83d6e3c2b9b9da949049256cdb1e9d35ad5 Mon Sep 17 00:00:00 2001 From: FloatingCrowbar <103565628+FloatingCrowbar@users.noreply.github.com> Date: Sun, 3 Mar 2024 21:07:59 +0300 Subject: [PATCH 1/2] Add deduplication options checks (#2254) --- ydb/core/persqueue/writer/writer.cpp | 22 ++++----- .../ydb_topic/impl/write_session_impl.h | 1 - .../client/ydb_topic/ut/basic_usage_ut.cpp | 26 ++++++++-- .../actors/write_session_actor.ipp | 24 +++++---- ydb/services/persqueue_v1/persqueue_ut.cpp | 49 +++++++++++++++++++ 5 files changed, 96 insertions(+), 26 deletions(-) diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 5fac083b6c6d..5cd55141d045 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -33,7 +33,7 @@ namespace NKikimr::NPQ { #define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message); #define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message); -static const ui64 WRITE_BLOCK_SIZE = 4_KB; +static const ui64 WRITE_BLOCK_SIZE = 4_KB; TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const { auto out = TStringBuilder() << "Success {" @@ -106,7 +106,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl using EErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrorCode; static constexpr size_t MAX_QUOTA_INFLIGHT = 3; - + static void FillHeader(NKikimrClient::TPersQueuePartitionRequest& request, ui32 partitionId, const TActorId& pipeClient) { @@ -272,12 +272,9 @@ class TPartitionWriter: public TActorBootstrapped, private TRl void GetOwnership() { auto ev = MakeRequest(PartitionId, PipeClient); - auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetOwnership(); - if (Opts.UseDeduplication) { - cmd.SetOwner(SourceId); - } else { - cmd.SetOwner(CreateGuidAsString()); - } + auto& request = *ev->Record.MutablePartitionRequest(); + auto& cmd = *request.MutableCmdGetOwnership(); + cmd.SetOwner(SourceId); cmd.SetForce(true); NTabletPipe::SendData(SelfId(), PipeClient, ev.Release()); @@ -741,15 +738,14 @@ class TPartitionWriter: public TActorBootstrapped, private TRl PendingQuota.clear(); ProcessQuota(); - break; case EWakeupTag::RlNoResource: - // Re-requesting the quota. We do this until we get a quota. + // Re-requesting the quota. We do this until we get a quota. // We do not request a quota with a long waiting time because the writer may already be a destroyer, and the quota will still be waiting to be received. RequestDataQuota(PendingQuotaAmount, ctx); break; - + default: Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast(tag)); } @@ -770,7 +766,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl , TabletId(tabletId) , PartitionId(partitionId) , ExpectedGeneration(opts.ExpectedGeneration) - , SourceId(opts.SourceId) + , SourceId(opts.UseDeduplication ? opts.SourceId : CreateGuidAsString()) , Opts(opts) { if (Opts.MeteringMode) { @@ -856,7 +852,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl IActor* CreatePartitionWriter(const TActorId& client, // const NKikimrSchemeOp::TPersQueueGroupDescription& config, ui64 tabletId, - ui32 partitionId, + ui32 partitionId, const TPartitionWriterOpts& opts) { return new TPartitionWriter(client, tabletId, partitionId, opts); } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index 8c527ac8437f..1b25030a8a4d 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -317,7 +317,6 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, }; THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action - public: TWriteSessionImpl(const TWriteSessionSettings& settings, std::shared_ptr client, diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp index d0b53205782a..09a378760e4b 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp @@ -697,9 +697,29 @@ Y_UNIT_TEST_SUITE(BasicUsage) { UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count); } +} // Y_UNIT_TEST_SUITE(BasicUsage) +Y_UNIT_TEST_SUITE(TSettingsValidation) { + Y_UNIT_TEST(TWriteSessionProducerSettings) { + TTopicSdkTestSetup setup(TEST_CASE_NAME); + TTopicClient client = setup.MakeClient(); + { + auto writeSettings = TWriteSessionSettings() + .Path(TEST_TOPIC) + .ProducerId("something") + .DeduplicationEnabled(false); + try { + auto writeSession = client.CreateWriteSession(writeSettings); + auto event = writeSession->GetEvent(true); + UNIT_ASSERT(event.Defined()); + auto* closedEvent = std::get_if(&event.GetRef()); + UNIT_ASSERT(closedEvent); + } catch (NYdb::TContractViolation&) { + //pass + } + } + } +} // Y_UNIT_TEST_SUITE(TSettingsValidation) -} - -} +} // namespace diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 972e76a179f5..b76ab8fb24a7 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -389,11 +389,12 @@ void TWriteSessionActor::Handle(typename TEvWriteInit::TPt // 1.2. non-empty partition_id (explicit partitioning) // 1.3. non-empty partition_with_generation (explicit partitioning && direct write to partition host) // 2. Empty producer id (no deduplication, partition is selected using round-robin). - bool isScenarioSupported = + bool isScenarioSupported = !InitRequest.producer_id().empty() && ( - InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() || + InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() || InitRequest.has_partition_id() || - InitRequest.has_partition_with_generation()) || + InitRequest.has_partition_with_generation()) + || InitRequest.producer_id().empty(); if (!isScenarioSupported) { @@ -424,7 +425,6 @@ void TWriteSessionActor::Handle(typename TEvWriteInit::TPt return InitRequest.has_message_group_id() ? InitRequest.message_group_id() : InitRequest.producer_id(); } }(); - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << InitRequest.ShortDebugString() << " from " << PeerName); if (!UseDeduplication) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << ". Disable deduplication for empty producer id"); @@ -467,8 +467,9 @@ template void TWriteSessionActor::InitAfterDiscovery(const TActorContext& ctx) { Y_UNUSED(ctx); - if (SourceId.empty()) { - Y_ABORT_UNLESS(!UseDeduplication); + if (SourceId.empty() && UseDeduplication) { + CloseSession("Internal server error: got empty SourceId with enabled deduplication", PersQueue::ErrorCode::ERROR, ctx); + return; } InitMeta = GetInitialDataChunk(InitRequest, FullConverter->GetClientsideName(), PeerName); // ToDo[migration] - check? @@ -835,9 +836,14 @@ void TWriteSessionActor::Handle(NPQ::TEvPartitionWriter::T OwnerCookie = result.GetResult().OwnerCookie; const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo(); - if (!UseDeduplication) { - Y_ABORT_UNLESS(maxSeqNo == 0); - } + + // ToDo: uncomment after fixing KIKIMR-21124 + // if (!UseDeduplication) { + // if (maxSeqNo != 0) { + // return CloseSession("Internal server error: have maxSeqNo != with deduplication disabled", + // PersQueue::ErrorCode::ERROR, ctx); + // } + // } OwnerCookie = result.GetResult().OwnerCookie; MakeAndSentInitResponse(maxSeqNo, ctx); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 73b29d3bdd11..4ca583f438ec 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -6694,6 +6694,55 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } } + Y_UNIT_TEST(DisableWrongSettings) { + NPersQueue::TTestServer server; + server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::BLACKBOX_VALIDATOR }); + server.EnableLogs({NKikimrServices::PERSQUEUE}, NActors::NLog::EPriority::PRI_INFO); + TString topicFullName = "rt3.dc1--acc--topic1"; + auto driver = SetupTestAndGetDriver(server, topicFullName, 3); + + std::shared_ptr Channel_; + std::unique_ptr TopicStubP_; + { + Channel_ = grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials()); + TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_); + } + + { + grpc::ClientContext rcontext1; + auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1); + UNIT_ASSERT(writeStream1); + Ydb::Topic::StreamWriteMessage::FromClient req; + Ydb::Topic::StreamWriteMessage::FromServer resp; + + req.mutable_init_request()->set_path("acc/topic1"); + req.mutable_init_request()->set_message_group_id("some-group"); + if (!writeStream1->Write(req)) { + ythrow yexception() << "write fail"; + } + UNIT_ASSERT(writeStream1->Read(&resp)); + Cerr << "===Got response: " << resp.ShortDebugString() << Endl; + UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST); + } + { + grpc::ClientContext rcontext1; + auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1); + UNIT_ASSERT(writeStream1); + Ydb::Topic::StreamWriteMessage::FromClient req; + Ydb::Topic::StreamWriteMessage::FromServer resp; + + req.mutable_init_request()->set_path("acc/topic1"); + req.mutable_init_request()->set_message_group_id("some-group"); + req.mutable_init_request()->set_producer_id("producer"); + if (!writeStream1->Write(req)) { + ythrow yexception() << "write fail"; + } + UNIT_ASSERT(writeStream1->Read(&resp)); + Cerr << "===Got response: " << resp.ShortDebugString() << Endl; + UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST); + } + } + Y_UNIT_TEST(DisableDeduplication) { NPersQueue::TTestServer server; TString topicFullName = "rt3.dc1--topic1"; From 4105093c6630cc0072f46afc1f571973ea981f6d Mon Sep 17 00:00:00 2001 From: Konstantin Melekhov Date: Tue, 5 Mar 2024 08:00:39 +0000 Subject: [PATCH 2/2] Fix test --- ydb/services/persqueue_v1/persqueue_ut.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 4ca583f438ec..c0cc198921cd 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -6722,7 +6722,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } UNIT_ASSERT(writeStream1->Read(&resp)); Cerr << "===Got response: " << resp.ShortDebugString() << Endl; - UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST); + UNIT_ASSERT(resp.status() == Ydb::StatusIds::SUCCESS); } { grpc::ClientContext rcontext1;