diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index d251be747186..00a544a83fe0 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -958,7 +958,8 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess FirstTokenSent = true; } // Kickstart send after session reestablishment - SendImpl(); + FormGrpcMessagesImpl(); + SendGrpcMessages(); break; } case TServerMessage::kWriteResponse: { @@ -1140,13 +1141,15 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) { void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) { TMemoryUsageChange memoryUsage; - if (!isSyncCompression) { + if (isSyncCompression) { + // The Lock is already held somewhere up the stack. + memoryUsage = OnCompressedImpl(std::move(block)); + } else { with_lock(Lock) { memoryUsage = OnCompressedImpl(std::move(block)); } - } else { - memoryUsage = OnCompressedImpl(std::move(block)); } + SendGrpcMessages(); if (memoryUsage.NowOk && !memoryUsage.WasOk) { EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); } @@ -1162,7 +1165,7 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) { (*Counters->BytesInflightCompressed) += block.Data.size(); PackedMessagesToSend.emplace(std::move(block)); - SendImpl(); + FormGrpcMessagesImpl(); return memoryUsage; } @@ -1279,7 +1282,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() { } CurrentBatch.Reset(); if (skipCompression) { - SendImpl(); + FormGrpcMessagesImpl(); } return size; } @@ -1343,7 +1346,16 @@ bool TWriteSessionImpl::TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRe return GetTransactionId(*writeRequest) != GetTransactionId(OriginalMessagesToSend.front().Tx); } -void TWriteSessionImpl::SendImpl() { +void TWriteSessionImpl::SendGrpcMessages() { + with_lock(ProcessorLock) { + TClientMessage message; + while (GrpcMessagesToSend.Dequeue(&message)) { + Processor->Write(std::move(message)); + } + } +} + +void TWriteSessionImpl::FormGrpcMessagesImpl() { Y_ABORT_UNLESS(Lock.IsLocked()); // External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB. @@ -1408,7 +1420,7 @@ void TWriteSessionImpl::SendImpl() { << OriginalMessagesToSend.size() << " left), first sequence number is " << writeRequest->messages(0).seq_no() ); - Processor->Write(std::move(clientMessage)); + GrpcMessagesToSend.Enqueue(std::move(clientMessage)); } } @@ -1470,6 +1482,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() { with_lock(self->Lock) { self->HandleWakeUpImpl(); } + self->SendGrpcMessages(); } }; if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index fccbef38c7f4..ad0c89a219f7 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -5,6 +5,7 @@ #include #include +#include namespace NYdb::NTopic { @@ -385,7 +386,8 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, ui64 GetNextIdImpl(const TMaybe& seqNo); ui64 GetSeqNoImpl(ui64 id); ui64 GetIdImpl(ui64 seqNo); - void SendImpl(); + void FormGrpcMessagesImpl(); + void SendGrpcMessages(); void AbortImpl(); void CloseImpl(EStatus statusCode, NYql::TIssues&& issues); void CloseImpl(EStatus statusCode, const TString& message); @@ -446,6 +448,9 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, std::queue SentOriginalMessages; std::queue SentPackedMessage; + TLockFreeQueue GrpcMessagesToSend; + TAdaptiveLock ProcessorLock; + const size_t MaxBlockSize = std::numeric_limits::max(); const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility bool Connected = false;