Skip to content

Commit

Permalink
Merge 2705037 into ac93f55
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Apr 25, 2024
2 parents ac93f55 + 2705037 commit a8848b7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
23 changes: 20 additions & 3 deletions ydb/core/change_exchange/change_sender_common_ops.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueReco
RequestRecords();
}

bool TBaseChangeSender::RequestRecords() {
bool TBaseChangeSender::RequestRecords(bool forceAtLeastOne) {
if (!Enqueued) {
return false;
}
Expand All @@ -105,7 +105,11 @@ bool TBaseChangeSender::RequestRecords() {

while (it != Enqueued.end()) {
if (MemUsage && (MemUsage + it->BodySize) > MemLimit) {
break;
if (!forceAtLeastOne) {
break;
}

forceAtLeastOne = false;
}

MemUsage += it->BodySize;
Expand Down Expand Up @@ -161,7 +165,20 @@ void TBaseChangeSender::SendRecords() {
THashSet<ui64> registrations;
bool needToResolve = false;

// used to avoid deadlock between RequestRecords & SendRecords
bool processedAtLeastOne = false;

while (it != PendingSent.end()) {
if (Enqueued && Enqueued.begin()->Order <= it->first) {
break;
}

processedAtLeastOne = true;

if (PendingBody && PendingBody.begin()->Order <= it->first) {
break;
}

if (!it->second->IsBroadcast()) {
const ui64 partitionId = Resolver->GetPartitionId(it->second);
if (!Senders.contains(partitionId)) {
Expand Down Expand Up @@ -215,7 +232,7 @@ void TBaseChangeSender::SendRecords() {
Resolver->Resolve();
}

RequestRecords();
RequestRecords(!processedAtLeastOne);
}

void TBaseChangeSender::ForgetRecords(TVector<ui64>&& records) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class TBaseChangeSender: public IChangeSender {
void CreateMissingSenders(const TVector<ui64>& partitionIds);
void RecreateSenders(const TVector<ui64>& partitionIds);

bool RequestRecords();
bool RequestRecords(bool forceAtLeastOne = false);
void SendRecords();

void SendPreparedRecords(ui64 partitionId);
Expand Down

0 comments on commit a8848b7

Please sign in to comment.