Skip to content

Commit

Permalink
Delete TDeferredWrite
Browse files Browse the repository at this point in the history
As we use our own single-threaded event handlers executor,
deadlocks should not happen there, so we don't need TDeferredWrite.
  • Loading branch information
qyryq committed Jul 24, 2024
1 parent 36f589c commit ca06f15
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,9 @@ std::shared_ptr<NTopic::IWriteSession> TFederatedWriteSessionImpl::OpenSubsessio
return;
}

TDeferredWrite deferred;
Y_ABORT_UNLESS(self->PendingToken.Empty());
self->PendingToken = std::move(ev.ContinuationToken);
self->PrepareDeferredWriteImpl(deferred);
deferred.DoWrite();
self->MaybeWriteImpl();
}
}
})
Expand Down Expand Up @@ -387,24 +385,20 @@ void TFederatedWriteSessionImpl::WriteEncoded(NTopic::TContinuationToken&& token
}

void TFederatedWriteSessionImpl::WriteInternal(NTopic::TContinuationToken&&, TWrappedWriteMessage&& wrapped) {
TDeferredWrite deferred(Subsession);

with_lock(Lock) {
ClientHasToken = false;
if (!wrapped.Message.CreateTimestamp_.Defined()) {
wrapped.Message.CreateTimestamp_ = TInstant::Now();
}
BufferFreeSpace -= wrapped.Message.Data.size();
OriginalMessagesToPassDown.emplace_back(std::move(wrapped));
PrepareDeferredWriteImpl(deferred);
MaybeWriteImpl();
}

deferred.DoWrite();

IssueTokenIfAllowed();
}

bool TFederatedWriteSessionImpl::PrepareDeferredWriteImpl(TDeferredWrite& deferred) {
bool TFederatedWriteSessionImpl::MaybeWriteImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());
if (PendingToken.Empty()) {
return false;
Expand All @@ -414,9 +408,7 @@ bool TFederatedWriteSessionImpl::PrepareDeferredWriteImpl(TDeferredWrite& deferr
}
OriginalMessagesToGetAck.push_back(std::move(OriginalMessagesToPassDown.front()));
OriginalMessagesToPassDown.pop_front();
deferred.Writer = Subsession;
deferred.Token.ConstructInPlace(std::move(*PendingToken));
deferred.Message.ConstructInPlace(std::move(OriginalMessagesToGetAck.back().Message));
Subsession->Write(std::move(*PendingToken), std::move(OriginalMessagesToGetAck.back().Message));
PendingToken.Clear();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,6 @@ class TFederatedWriteSessionImpl : public NTopic::TContinuationTokenIssuer,
}
};

struct TDeferredWrite {
TDeferredWrite() {}
explicit TDeferredWrite(std::shared_ptr<NTopic::IWriteSession> writer)
: Writer(std::move(writer)) {
}

void DoWrite() {
if (Token.Empty() && Message.Empty()) {
return;
}
Y_ABORT_UNLESS(Token.Defined() && Message.Defined());
return Writer->Write(std::move(*Token), std::move(*Message));
}

std::shared_ptr<NTopic::IWriteSession> Writer;
TMaybe<NTopic::TContinuationToken> Token;
TMaybe<NTopic::TWriteMessage> Message;
};

private:
void Start();

Expand All @@ -92,7 +73,7 @@ class TFederatedWriteSessionImpl : public NTopic::TContinuationTokenIssuer,
void ScheduleFederationStateUpdateImpl(TDuration delay);

void WriteInternal(NTopic::TContinuationToken&&, TWrappedWriteMessage&& message);
bool PrepareDeferredWriteImpl(TDeferredWrite& deferred);
bool MaybeWriteImpl();

void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
void CloseImpl(NTopic::TSessionClosedEvent const& ev);
Expand Down

0 comments on commit ca06f15

Please sign in to comment.