From a510fe8463d318d17a7fa3f921284995e495d4c7 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 14 Mar 2024 13:06:41 +0500 Subject: [PATCH] Fix Verify in WriteSessionActor (#2651) (#2700) --- ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h | 2 ++ .../deprecated/persqueue_v0/grpc_pq_write_actor.cpp | 12 ++++++++++-- .../persqueue_v1/actors/write_session_actor.h | 2 ++ .../persqueue_v1/actors/write_session_actor.ipp | 9 ++++++++- 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h index 0d5e472ca443..cb1ca35ecd86 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h @@ -554,6 +554,8 @@ class TWriteSessionActor : public NActors::TActorBootstrapped { diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp index e29b4bc4f304..6172305476af 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp @@ -310,7 +310,10 @@ void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& db void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx) { - Y_ABORT_UNLESS(State == ES_WAIT_SCHEME || State == ES_INITED); + if (State != ES_WAIT_SCHEME && State != ES_INITED) { + return CloseSession("erroneous internal state", NPersQueue::NErrorCode::ERROR, ctx); + } + auto& res = ev->Get()->Result; Y_ABORT_UNLESS(res->ResultSet.size() == 1); @@ -503,6 +506,11 @@ void TWriteSessionActor::ProceedPartition(const ui32 partition, const TActorCont } void TWriteSessionActor::CloseSession(const TString& errorReason, const NPersQueue::NErrorCode::EErrorCode errorCode, const NActors::TActorContext& ctx) { + if (SessionClosed) { + return; + } + SessionClosed = true; + if (errorCode != NPersQueue::NErrorCode::OK) { if (InternalErrorCode(errorCode)) { SLIErrors.Inc(); @@ -865,7 +873,7 @@ void TWriteSessionActor::LogSession(const TActorContext& ctx) { void TWriteSessionActor::HandleWakeup(const TActorContext& ctx) { if (State != ES_INITED) { - return; + return CloseSession("erroneous internal state", NPersQueue::NErrorCode::ERROR, ctx); } auto now = ctx.Now(); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 2d69db971d7f..17c47aab7837 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -271,6 +271,8 @@ class TWriteSessionActor TActorId PartitionWriterCache; TActorId PartitionChooser; + + bool SessionClosed = false; }; } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index b76ab8fb24a7..825e342b7ead 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -724,6 +724,10 @@ void TWriteSessionActor::DestroyPartitionWriterCache(const template void TWriteSessionActor::CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx) { + if (SessionClosed) { + return; + } + SessionClosed = true; if (errorCode != PersQueue::ErrorCode::OK) { @@ -1503,7 +1507,10 @@ void TWriteSessionActor::Handle(TEvents::TEvWakeup::TPtr& template void TWriteSessionActor::RecheckACL(const TActorContext& ctx) { - Y_ABORT_UNLESS(State == ES_INITED); + if (State != ES_INITED) { + LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "WriteSessionActor state is wrong. Actual state '" << (int)State << "'"); + return CloseSession("erroneous internal state", PersQueue::ErrorCode::ERROR, ctx); + } auto now = ctx.Now();