From f563a183ef27c289b3b79538cc29b23388084ef9 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Wed, 24 Apr 2024 23:06:14 +0300 Subject: [PATCH 1/3] Fix reordering of change records --- ydb/core/change_exchange/change_sender_common_ops.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ydb/core/change_exchange/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp index b2b3e90718fa..07d20e38d213 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.cpp +++ b/ydb/core/change_exchange/change_sender_common_ops.cpp @@ -162,6 +162,14 @@ void TBaseChangeSender::SendRecords() { bool needToResolve = false; while (it != PendingSent.end()) { + if (Enqueued && Enqueued.begin()->Order <= it->first) { + break; + } + + if (PendingBody && PendingBody.begin()->Order <= it->first) { + break; + } + if (!it->second->IsBroadcast()) { const ui64 partitionId = Resolver->GetPartitionId(it->second); if (!Senders.contains(partitionId)) { From 084d4776be2adf568974413586fcef4cf5668816 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Thu, 25 Apr 2024 12:38:54 +0300 Subject: [PATCH 2/3] Fix review issues --- .../change_exchange/change_sender_common_ops.cpp | 15 ++++++++++++--- .../change_exchange/change_sender_common_ops.h | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/ydb/core/change_exchange/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp index 07d20e38d213..e478fba083fe 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.cpp +++ b/ydb/core/change_exchange/change_sender_common_ops.cpp @@ -95,7 +95,7 @@ void TBaseChangeSender::EnqueueRecords(TVectorBodySize) > MemLimit) { - break; + if (!forceAtLeastOne) { + break; + } + + forceAtLeastOne = false; } MemUsage += it->BodySize; @@ -161,6 +165,9 @@ void TBaseChangeSender::SendRecords() { THashSet registrations; bool needToResolve = false; + // used to avoid deadlock between RequestRecords & SendRecords + bool processedAtLeastOne = false; + while (it != PendingSent.end()) { if (Enqueued && Enqueued.begin()->Order <= it->first) { break; @@ -170,6 +177,8 @@ void TBaseChangeSender::SendRecords() { break; } + processedAtLeastOne = true; + if (!it->second->IsBroadcast()) { const ui64 partitionId = Resolver->GetPartitionId(it->second); if (!Senders.contains(partitionId)) { @@ -223,7 +232,7 @@ void TBaseChangeSender::SendRecords() { Resolver->Resolve(); } - RequestRecords(); + RequestRecords(!processedAtLeastOne); } void TBaseChangeSender::ForgetRecords(TVector&& records) { diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index cdaed3fe37b8..a4ed10b8aa3b 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -108,7 +108,7 @@ class TBaseChangeSender: public IChangeSender { void CreateMissingSenders(const TVector& partitionIds); void RecreateSenders(const TVector& partitionIds); - bool RequestRecords(); + bool RequestRecords(bool forceAtLeastOne = false); void SendRecords(); void SendPreparedRecords(ui64 partitionId); From 2705037ac3fe44e55febfd4f885c539e7bd66515 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Thu, 25 Apr 2024 12:47:13 +0300 Subject: [PATCH 3/3] Fix review issues --- ydb/core/change_exchange/change_sender_common_ops.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/change_exchange/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp index e478fba083fe..a2450d21746e 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.cpp +++ b/ydb/core/change_exchange/change_sender_common_ops.cpp @@ -173,12 +173,12 @@ void TBaseChangeSender::SendRecords() { break; } + processedAtLeastOne = true; + if (PendingBody && PendingBody.begin()->Order <= it->first) { break; } - processedAtLeastOne = true; - if (!it->second->IsBroadcast()) { const ui64 partitionId = Resolver->GetPartitionId(it->second); if (!Senders.contains(partitionId)) {