From 1e46621bcec4dbb353d97b75ab3186e650431d53 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Thu, 25 Apr 2024 16:26:44 +0300 Subject: [PATCH] 24-1-14-hotfix: Fix reordering of change records (#4087) (#4106) --- .../change_sender_common_ops.cpp | 23 ++++++++++++++++--- .../change_sender_common_ops.h | 2 +- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/ydb/core/change_exchange/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp index b2b3e90718fa..a2450d21746e 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.cpp +++ b/ydb/core/change_exchange/change_sender_common_ops.cpp @@ -95,7 +95,7 @@ void TBaseChangeSender::EnqueueRecords(TVectorBodySize) > MemLimit) { - break; + if (!forceAtLeastOne) { + break; + } + + forceAtLeastOne = false; } MemUsage += it->BodySize; @@ -161,7 +165,20 @@ void TBaseChangeSender::SendRecords() { THashSet registrations; bool needToResolve = false; + // used to avoid deadlock between RequestRecords & SendRecords + bool processedAtLeastOne = false; + while (it != PendingSent.end()) { + if (Enqueued && Enqueued.begin()->Order <= it->first) { + break; + } + + processedAtLeastOne = true; + + if (PendingBody && PendingBody.begin()->Order <= it->first) { + break; + } + if (!it->second->IsBroadcast()) { const ui64 partitionId = Resolver->GetPartitionId(it->second); if (!Senders.contains(partitionId)) { @@ -215,7 +232,7 @@ void TBaseChangeSender::SendRecords() { Resolver->Resolve(); } - RequestRecords(); + RequestRecords(!processedAtLeastOne); } void TBaseChangeSender::ForgetRecords(TVector&& records) { diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index 2e25c8aa72f4..a370d20e9587 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -99,7 +99,7 @@ class TBaseChangeSender: public IChangeSender { void CreateMissingSenders(const TVector& partitionIds); void RecreateSenders(const TVector& partitionIds); - bool RequestRecords(); + bool RequestRecords(bool forceAtLeastOne = false); void SendRecords(); void SendPreparedRecords(ui64 partitionId);