diff --git a/ydb/core/change_exchange/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp index b2b3e90718fa..a2450d21746e 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.cpp +++ b/ydb/core/change_exchange/change_sender_common_ops.cpp @@ -95,7 +95,7 @@ void TBaseChangeSender::EnqueueRecords(TVectorBodySize) > MemLimit) { - break; + if (!forceAtLeastOne) { + break; + } + + forceAtLeastOne = false; } MemUsage += it->BodySize; @@ -161,7 +165,20 @@ void TBaseChangeSender::SendRecords() { THashSet registrations; bool needToResolve = false; + // used to avoid deadlock between RequestRecords & SendRecords + bool processedAtLeastOne = false; + while (it != PendingSent.end()) { + if (Enqueued && Enqueued.begin()->Order <= it->first) { + break; + } + + processedAtLeastOne = true; + + if (PendingBody && PendingBody.begin()->Order <= it->first) { + break; + } + if (!it->second->IsBroadcast()) { const ui64 partitionId = Resolver->GetPartitionId(it->second); if (!Senders.contains(partitionId)) { @@ -215,7 +232,7 @@ void TBaseChangeSender::SendRecords() { Resolver->Resolve(); } - RequestRecords(); + RequestRecords(!processedAtLeastOne); } void TBaseChangeSender::ForgetRecords(TVector&& records) { diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index cdaed3fe37b8..a4ed10b8aa3b 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -108,7 +108,7 @@ class TBaseChangeSender: public IChangeSender { void CreateMissingSenders(const TVector& partitionIds); void RecreateSenders(const TVector& partitionIds); - bool RequestRecords(); + bool RequestRecords(bool forceAtLeastOne = false); void SendRecords(); void SendPreparedRecords(ui64 partitionId);