From 01305242b16309256a89b57d7d1e7b12bcceecc0 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Wed, 21 Feb 2024 12:13:02 +0300 Subject: [PATCH] Fix & unmute ReadTopic test KIKIMR-21110 (#2121) --- .github/config/muted_ya.txt | 1 - .../tx/replication/ydb_proxy/ydb_proxy_ut.cpp | 68 +++++++++++-------- 2 files changed, 38 insertions(+), 31 deletions(-) diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 46a896df34a4..b13e82ff0c8c 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -37,7 +37,6 @@ ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.RebootForgetAfterFail ydb/core/tx/columnshard/engines/ut * ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium -ydb/core/tx/replication/ydb_proxy/ut YdbProxyTests.ReadTopic ydb/core/tx/schemeshard/ut_restore TImportTests.ShouldSucceedOnManyTables ydb/core/tx/schemeshard/ut_split_merge TSchemeShardSplitBySizeTest.Merge1KShards ydb/core/tx/tx_proxy/ut_ext_tenant TExtSubDomainTest.CreateTableInsideAndAlterDomainAndTable-AlterDatabaseCreateHiveFirst* diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp index c7e7aad10384..504d6da3afb8 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp @@ -10,7 +10,7 @@ namespace NKikimr::NReplication { -Y_UNIT_TEST_SUITE(YdbProxyTests) { +Y_UNIT_TEST_SUITE(YdbProxy) { template class TEnv: public NTestHelpers::TEnv { using TBase = NTestHelpers::TEnv; @@ -599,24 +599,24 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { template TEvYdbProxy::TReadTopicResult ReadTopicData(Env& env, TActorId& reader, const TString& topicPath) { - env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest()); - - while (true) { - TAutoPtr handle; - auto result = env.GetRuntime().template GrabEdgeEventsRethrow(handle); - if (handle->Recipient != env.GetSender()) { - continue; - } - - if (auto* ev = std::get(result)) { - return ev->Result; - } else if (std::get(result)) { + do { + env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest()); + + try { + TAutoPtr ev; + env.GetRuntime().template GrabEdgeEventsRethrow(ev); + UNIT_ASSERT_VALUES_EQUAL(ev->Sender, reader); + + switch (ev->GetTypeRewrite()) { + case TEvYdbProxy::EvReadTopicResponse: + return ev->Get()->Result; + case TEvYdbProxy::EvTopicReaderGone: + ythrow yexception(); + } + } catch (yexception&) { reader = CreateTopicReader(env, topicPath); - env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest()); - } else { - UNIT_ASSERT("Unexpected event"); } - } + } while (true); } Y_UNIT_TEST(ReadTopic) { @@ -650,20 +650,28 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { // wait next event env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest()); - TActorId newReader = CreateTopicReader(env, "/Root/topic"); - // wait next event - env.SendAsync(newReader, new TEvYdbProxy::TEvReadTopicRequest()); - - // wait event from previous session - try { - auto ev = env.GetRuntime().GrabEdgeEventRethrow(env.GetSender()); - if (ev->Sender != reader) { - ythrow yexception(); + TActorId newReader; + do { + newReader = CreateTopicReader(env, "/Root/topic"); + // wait next event + env.SendAsync(newReader, new TEvYdbProxy::TEvReadTopicRequest()); + + // wait event from previous session + try { + auto ev = env.GetRuntime().GrabEdgeEventRethrow(env.GetSender()); + if (ev->Sender == reader) { + break; + } else if (ev->Sender == newReader) { + continue; + } else { + UNIT_ASSERT("Unexpected reader has gone"); + } + } catch (yexception&) { + // bad luck, previous session was not closed, close it manually + env.SendAsync(reader, new TEvents::TEvPoison()); + break; } - } catch (yexception&) { - // bad luck, previous session was not closed, close it manually - env.SendAsync(reader, new TEvents::TEvPoison()); - } + } while (true); UNIT_ASSERT(NTestHelpers::WriteTopic(env, "/Root/topic", "message-1")); {