Skip to content

Commit

Permalink
Use a separate lock for Processor->Write calls
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq committed Aug 12, 2024
1 parent 169c92d commit fbed661
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
29 changes: 21 additions & 8 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,8 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
FirstTokenSent = true;
}
// Kickstart send after session reestablishment
SendImpl();
FormGrpcMessagesImpl();
SendGrpcMessages();
break;
}
case TServerMessage::kWriteResponse: {
Expand Down Expand Up @@ -1140,13 +1141,15 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {

void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) {
TMemoryUsageChange memoryUsage;
if (!isSyncCompression) {
if (isSyncCompression) {
// The Lock is already held somewhere up the stack.
memoryUsage = OnCompressedImpl(std::move(block));
} else {
with_lock(Lock) {
memoryUsage = OnCompressedImpl(std::move(block));
}
} else {
memoryUsage = OnCompressedImpl(std::move(block));
}
SendGrpcMessages();
if (memoryUsage.NowOk && !memoryUsage.WasOk) {
EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
}
Expand All @@ -1162,7 +1165,7 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
(*Counters->BytesInflightCompressed) += block.Data.size();

PackedMessagesToSend.emplace(std::move(block));
SendImpl();
FormGrpcMessagesImpl();
return memoryUsage;
}

Expand Down Expand Up @@ -1279,7 +1282,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
}
CurrentBatch.Reset();
if (skipCompression) {
SendImpl();
FormGrpcMessagesImpl();
}
return size;
}
Expand Down Expand Up @@ -1343,7 +1346,16 @@ bool TWriteSessionImpl::TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRe
return GetTransactionId(*writeRequest) != GetTransactionId(OriginalMessagesToSend.front().Tx);
}

void TWriteSessionImpl::SendImpl() {
void TWriteSessionImpl::SendGrpcMessages() {
with_lock(ProcessorLock) {
TClientMessage message;
while (GrpcMessagesToSend.Dequeue(&message)) {
Processor->Write(std::move(message));
}
}
}

void TWriteSessionImpl::FormGrpcMessagesImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());

// External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB.
Expand Down Expand Up @@ -1408,7 +1420,7 @@ void TWriteSessionImpl::SendImpl() {
<< OriginalMessagesToSend.size() << " left), first sequence number is "
<< writeRequest->messages(0).seq_no()
);
Processor->Write(std::move(clientMessage));
GrpcMessagesToSend.Enqueue(std::move(clientMessage));
}
}

Expand Down Expand Up @@ -1470,6 +1482,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
with_lock(self->Lock) {
self->HandleWakeUpImpl();
}
self->SendGrpcMessages();
}
};
if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h>

#include <util/generic/buffer.h>
#include <util/thread/lfqueue.h>


namespace NYdb::NTopic {
Expand Down Expand Up @@ -385,7 +386,8 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
ui64 GetNextIdImpl(const TMaybe<ui64>& seqNo);
ui64 GetSeqNoImpl(ui64 id);
ui64 GetIdImpl(ui64 seqNo);
void SendImpl();
void FormGrpcMessagesImpl();
void SendGrpcMessages();
void AbortImpl();
void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
void CloseImpl(EStatus statusCode, const TString& message);
Expand Down Expand Up @@ -446,6 +448,9 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
std::queue<TOriginalMessage> SentOriginalMessages;
std::queue<TBlock> SentPackedMessage;

TLockFreeQueue<TClientMessage> GrpcMessagesToSend;
TAdaptiveLock ProcessorLock;

const size_t MaxBlockSize = std::numeric_limits<size_t>::max();
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
bool Connected = false;
Expand Down

0 comments on commit fbed661

Please sign in to comment.