diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp index 8c425a072f3d..bdfb2abeb81a 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp @@ -134,11 +134,9 @@ std::shared_ptr TFederatedWriteSessionImpl::OpenSubsessio return; } - TDeferredWrite deferred; Y_ABORT_UNLESS(self->PendingToken.Empty()); self->PendingToken = std::move(ev.ContinuationToken); - self->PrepareDeferredWriteImpl(deferred); - deferred.DoWrite(); + self->MaybeWriteImpl(); } } }) @@ -387,8 +385,6 @@ void TFederatedWriteSessionImpl::WriteEncoded(NTopic::TContinuationToken&& token } void TFederatedWriteSessionImpl::WriteInternal(NTopic::TContinuationToken&&, TWrappedWriteMessage&& wrapped) { - TDeferredWrite deferred(Subsession); - with_lock(Lock) { ClientHasToken = false; if (!wrapped.Message.CreateTimestamp_.Defined()) { @@ -396,15 +392,13 @@ void TFederatedWriteSessionImpl::WriteInternal(NTopic::TContinuationToken&&, TWr } BufferFreeSpace -= wrapped.Message.Data.size(); OriginalMessagesToPassDown.emplace_back(std::move(wrapped)); - PrepareDeferredWriteImpl(deferred); + MaybeWriteImpl(); } - deferred.DoWrite(); - IssueTokenIfAllowed(); } -bool TFederatedWriteSessionImpl::PrepareDeferredWriteImpl(TDeferredWrite& deferred) { +bool TFederatedWriteSessionImpl::MaybeWriteImpl() { Y_ABORT_UNLESS(Lock.IsLocked()); if (PendingToken.Empty()) { return false; @@ -414,9 +408,7 @@ bool TFederatedWriteSessionImpl::PrepareDeferredWriteImpl(TDeferredWrite& deferr } OriginalMessagesToGetAck.push_back(std::move(OriginalMessagesToPassDown.front())); OriginalMessagesToPassDown.pop_front(); - deferred.Writer = Subsession; - deferred.Token.ConstructInPlace(std::move(*PendingToken)); - deferred.Message.ConstructInPlace(std::move(OriginalMessagesToGetAck.back().Message)); + Subsession->Write(std::move(*PendingToken), std::move(OriginalMessagesToGetAck.back().Message)); PendingToken.Clear(); return true; } diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h index 96589cc34243..5b3483958871 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h @@ -63,25 +63,6 @@ class TFederatedWriteSessionImpl : public NTopic::TContinuationTokenIssuer, } }; - struct TDeferredWrite { - TDeferredWrite() {} - explicit TDeferredWrite(std::shared_ptr writer) - : Writer(std::move(writer)) { - } - - void DoWrite() { - if (Token.Empty() && Message.Empty()) { - return; - } - Y_ABORT_UNLESS(Token.Defined() && Message.Defined()); - return Writer->Write(std::move(*Token), std::move(*Message)); - } - - std::shared_ptr Writer; - TMaybe Token; - TMaybe Message; - }; - private: void Start(); @@ -92,7 +73,7 @@ class TFederatedWriteSessionImpl : public NTopic::TContinuationTokenIssuer, void ScheduleFederationStateUpdateImpl(TDuration delay); void WriteInternal(NTopic::TContinuationToken&&, TWrappedWriteMessage&& message); - bool PrepareDeferredWriteImpl(TDeferredWrite& deferred); + bool MaybeWriteImpl(); void CloseImpl(EStatus statusCode, NYql::TIssues&& issues); void CloseImpl(NTopic::TSessionClosedEvent const& ev);