Skip to content

Commit

Permalink
Revert separate lock for Processor->Write calls
Browse files Browse the repository at this point in the history
This reverts commit 4bce249.
  • Loading branch information
qyryq committed Aug 15, 2024
1 parent 1d65019 commit a8b0837
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 27 deletions.
29 changes: 8 additions & 21 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -958,8 +958,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
FirstTokenSent = true;
}
// Kickstart send after session reestablishment
FormGrpcMessagesImpl();
SendGrpcMessages();
SendImpl();
break;
}
case TServerMessage::kWriteResponse: {
Expand Down Expand Up @@ -1147,15 +1146,13 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {

void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) {
TMemoryUsageChange memoryUsage;
if (isSyncCompression) {
// The Lock is already held somewhere up the stack.
memoryUsage = OnCompressedImpl(std::move(block));
} else {
if (!isSyncCompression) {
with_lock(Lock) {
memoryUsage = OnCompressedImpl(std::move(block));
}
} else {
memoryUsage = OnCompressedImpl(std::move(block));
}
SendGrpcMessages();
if (memoryUsage.NowOk && !memoryUsage.WasOk) {
EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
}
Expand All @@ -1171,7 +1168,7 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
(*Counters->BytesInflightCompressed) += block.Data.size();

PackedMessagesToSend.emplace(std::move(block));
FormGrpcMessagesImpl();
SendImpl();
return memoryUsage;
}

Expand Down Expand Up @@ -1288,7 +1285,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
}
CurrentBatch.Reset();
if (skipCompression) {
FormGrpcMessagesImpl();
SendImpl();
}
return size;
}
Expand Down Expand Up @@ -1352,16 +1349,7 @@ bool TWriteSessionImpl::TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRe
return GetTransactionId(*writeRequest) != GetTransactionId(OriginalMessagesToSend.front().Tx);
}

void TWriteSessionImpl::SendGrpcMessages() {
with_lock(ProcessorLock) {
TClientMessage message;
while (GrpcMessagesToSend.Dequeue(&message)) {
Processor->Write(std::move(message));
}
}
}

void TWriteSessionImpl::FormGrpcMessagesImpl() {
void TWriteSessionImpl::SendImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());

// External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB.
Expand Down Expand Up @@ -1431,7 +1419,7 @@ void TWriteSessionImpl::FormGrpcMessagesImpl() {
<< OriginalMessagesToSend.size() << " left), first sequence number is "
<< writeRequest->messages(0).seq_no()
);
GrpcMessagesToSend.Enqueue(std::move(clientMessage));
Processor->Write(std::move(clientMessage));
}
}

Expand Down Expand Up @@ -1493,7 +1481,6 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
with_lock(self->Lock) {
self->HandleWakeUpImpl();
}
self->SendGrpcMessages();
}
};
if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h>

#include <util/generic/buffer.h>
#include <util/thread/lfqueue.h>


namespace NYdb::NTopic {
Expand Down Expand Up @@ -386,8 +385,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
ui64 GetNextIdImpl(const TMaybe<ui64>& seqNo);
ui64 GetSeqNoImpl(ui64 id);
ui64 GetIdImpl(ui64 seqNo);
void FormGrpcMessagesImpl();
void SendGrpcMessages();
void SendImpl();
void AbortImpl();
void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
void CloseImpl(EStatus statusCode, const TString& message);
Expand Down Expand Up @@ -448,9 +446,6 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
std::queue<TOriginalMessage> SentOriginalMessages;
std::queue<TBlock> SentPackedMessage;

TLockFreeQueue<TClientMessage> GrpcMessagesToSend;
TAdaptiveLock ProcessorLock;

const size_t MaxBlockSize = std::numeric_limits<size_t>::max();
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
bool Connected = false;
Expand Down

0 comments on commit a8b0837

Please sign in to comment.