Skip to content

Commit

Permalink
Fix & unmute ReadTopic test KIKIMR-21110
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Feb 20, 2024
1 parent 9683aeb commit b55fed7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 31 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.RebootForgetAfterFail
ydb/core/tx/columnshard/engines/ut *
ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration
ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium
ydb/core/tx/replication/ydb_proxy/ut YdbProxyTests.ReadTopic
ydb/core/tx/schemeshard/ut_restore TImportTests.ShouldSucceedOnManyTables
ydb/core/tx/schemeshard/ut_split_merge TSchemeShardSplitBySizeTest.Merge1KShards
ydb/core/tx/tx_proxy/ut_ext_tenant TExtSubDomainTest.CreateTableInsideAndAlterDomainAndTable-AlterDatabaseCreateHiveFirst*
Expand Down
68 changes: 38 additions & 30 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

namespace NKikimr::NReplication {

Y_UNIT_TEST_SUITE(YdbProxyTests) {
Y_UNIT_TEST_SUITE(YdbProxy) {
template <bool UseDatabase = true>
class TEnv: public NTestHelpers::TEnv<UseDatabase> {
using TBase = NTestHelpers::TEnv<UseDatabase>;
Expand Down Expand Up @@ -599,24 +599,24 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {

template <typename Env>
TEvYdbProxy::TReadTopicResult ReadTopicData(Env& env, TActorId& reader, const TString& topicPath) {
env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest());

while (true) {
TAutoPtr<IEventHandle> handle;
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvYdbProxy::TEvReadTopicResponse, TEvYdbProxy::TEvTopicReaderGone>(handle);
if (handle->Recipient != env.GetSender()) {
continue;
}

if (auto* ev = std::get<TEvYdbProxy::TEvReadTopicResponse*>(result)) {
return ev->Result;
} else if (std::get<TEvYdbProxy::TEvTopicReaderGone*>(result)) {
do {
env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest());

try {
TAutoPtr<IEventHandle> ev;
env.GetRuntime().template GrabEdgeEventsRethrow<TEvYdbProxy::TEvReadTopicResponse, TEvYdbProxy::TEvTopicReaderGone>(ev);
UNIT_ASSERT_VALUES_EQUAL(ev->Sender, reader);

switch (ev->GetTypeRewrite()) {
case TEvYdbProxy::EvReadTopicResponse:
return ev->Get<TEvYdbProxy::TEvReadTopicResponse>()->Result;
case TEvYdbProxy::EvTopicReaderGone:
ythrow yexception();
}
} catch (yexception&) {
reader = CreateTopicReader(env, topicPath);
env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest());
} else {
UNIT_ASSERT("Unexpected event");
}
}
} while (true);
}

Y_UNIT_TEST(ReadTopic) {
Expand Down Expand Up @@ -650,20 +650,28 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
// wait next event
env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest());

TActorId newReader = CreateTopicReader(env, "/Root/topic");
// wait next event
env.SendAsync(newReader, new TEvYdbProxy::TEvReadTopicRequest());

// wait event from previous session
try {
auto ev = env.GetRuntime().GrabEdgeEventRethrow<TEvYdbProxy::TEvTopicReaderGone>(env.GetSender());
if (ev->Sender != reader) {
ythrow yexception();
TActorId newReader;
do {
newReader = CreateTopicReader(env, "/Root/topic");
// wait next event
env.SendAsync(newReader, new TEvYdbProxy::TEvReadTopicRequest());

// wait event from previous session
try {
auto ev = env.GetRuntime().GrabEdgeEventRethrow<TEvYdbProxy::TEvTopicReaderGone>(env.GetSender());
if (ev->Sender == reader) {
break;
} else if (ev->Sender == newReader) {
continue;
} else {
UNIT_ASSERT("Unexpected reader has gone");
}
} catch (yexception&) {
// bad luck, previous session was not closed, close it manually
env.SendAsync(reader, new TEvents::TEvPoison());
break;
}
} catch (yexception&) {
// bad luck, previous session was not closed, close it manually
env.SendAsync(reader, new TEvents::TEvPoison());
}
} while (true);

UNIT_ASSERT(NTestHelpers::WriteTopic(env, "/Root/topic", "message-1"));
{
Expand Down

0 comments on commit b55fed7

Please sign in to comment.