diff --git a/ydb/core/tx/replication/service/table_writer_ut.cpp b/ydb/core/tx/replication/service/table_writer_ut.cpp index 6b89e0159879..1763cedab9c5 100644 --- a/ydb/core/tx/replication/service/table_writer_ut.cpp +++ b/ydb/core/tx/replication/service/table_writer_ut.cpp @@ -12,6 +12,8 @@ namespace NKikimr::NReplication::NService { Y_UNIT_TEST_SUITE(LocalTableWriter) { + using namespace NTestHelpers; + Y_UNIT_TEST(WriteTable) { TEnv env; env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG); diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index e39647aaa9f7..a787c6c49099 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -66,6 +66,7 @@ class TRemoteTopicReader: public TActor { void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); + switch (ev->Get()->Result.GetStatus()) { case NYdb::EStatus::SCHEME_ERROR: return Leave(TEvWorker::TEvGone::SCHEME_ERROR); diff --git a/ydb/core/tx/replication/service/topic_reader_ut.cpp b/ydb/core/tx/replication/service/topic_reader_ut.cpp index 1b66e29e0d44..5cfcf70a79c7 100644 --- a/ydb/core/tx/replication/service/topic_reader_ut.cpp +++ b/ydb/core/tx/replication/service/topic_reader_ut.cpp @@ -11,50 +11,46 @@ namespace NKikimr::NReplication::NService { Y_UNIT_TEST_SUITE(RemoteTopicReader) { + using namespace NTestHelpers; + template TActorId CreateReader(Env& env, const TEvYdbProxy::TTopicReaderSettings& settings) { - auto reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings)); - env.SendAsync(reader, new TEvWorker::TEvHandshake()); + do { + auto reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings)); + env.SendAsync(reader, new TEvWorker::TEvHandshake()); - while (true) { - TAutoPtr handle; - auto result = env.GetRuntime().template GrabEdgeEventsRethrow(handle); - if (handle->Sender != reader) { - continue; - } + TAutoPtr ev; + do { + env.GetRuntime().template GrabEdgeEvents(ev); + } while (ev->Sender != reader); - if (auto* ev = std::get(result)) { + switch (ev->GetTypeRewrite()) { + case TEvWorker::EvHandshake: return reader; - } else if (std::get(result)) { - reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings)); - env.SendAsync(reader, new TEvWorker::TEvHandshake()); + case TEvWorker::EvGone: continue; - } else { - UNIT_ASSERT("Unexpected event"); } - } + } while (true); } template auto ReadData(Env& env, TActorId& reader, const TEvYdbProxy::TTopicReaderSettings& settings) { - reader = CreateReader(env, settings); - env.SendAsync(reader, new TEvWorker::TEvPoll()); + do { + reader = CreateReader(env, settings); + env.SendAsync(reader, new TEvWorker::TEvPoll()); - while (true) { - TAutoPtr handle; - auto result = env.GetRuntime().template GrabEdgeEventsRethrow(handle); - if (handle->Sender != reader) { - continue; - } + TAutoPtr ev; + do { + env.GetRuntime().template GrabEdgeEvents(ev); + } while (ev->Sender != reader); - if (auto* ev = std::get(result)) { - return ev->Records; - } else if (std::get(result)) { - reader = CreateReader(env, settings); - env.SendAsync(reader, new TEvWorker::TEvPoll()); + switch (ev->GetTypeRewrite()) { + case TEvWorker::EvData: + return ev->Get()->Records; + case TEvWorker::EvGone: continue; } - } + } while (true); } Y_UNIT_TEST(ReadTopic) { @@ -68,7 +64,7 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) { .ConsumerName("consumer") .EndAddConsumer(); - auto ev = env.Send( + auto ev = env.Send(env.GetYdbProxy(), new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", settings)); UNIT_ASSERT(ev); UNIT_ASSERT(ev->Get()->Result.IsSuccess()); diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp index bb5cd1cc39c3..8f2be72c6aa5 100644 --- a/ydb/core/tx/replication/service/worker.cpp +++ b/ydb/core/tx/replication/service/worker.cpp @@ -36,7 +36,7 @@ TEvWorker::TEvData::TEvData(TVector&& records) void TEvWorker::TEvData::TRecord::Out(IOutputStream& out) const { out << "{" << " Offset: " << Offset - << " Data: " << Data + << " Data: " << Data.size() << "b" << " }"; } diff --git a/ydb/core/tx/replication/service/worker_ut.cpp b/ydb/core/tx/replication/service/worker_ut.cpp index 18895a1e73ce..d03ff144f460 100644 --- a/ydb/core/tx/replication/service/worker_ut.cpp +++ b/ydb/core/tx/replication/service/worker_ut.cpp @@ -13,12 +13,14 @@ namespace NKikimr::NReplication::NService { Y_UNIT_TEST_SUITE(Worker) { + using namespace NTestHelpers; + Y_UNIT_TEST(Basic) { TEnv env; env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG); { - auto ev = env.Send( + auto ev = env.Send(env.GetYdbProxy(), new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", NYdb::NTopic::TCreateTopicSettings() .BeginAddConsumer() diff --git a/ydb/core/tx/replication/ut_helpers/test_env.h b/ydb/core/tx/replication/ut_helpers/test_env.h index 84d17737558b..5a4a1b72c14a 100644 --- a/ydb/core/tx/replication/ut_helpers/test_env.h +++ b/ydb/core/tx/replication/ut_helpers/test_env.h @@ -8,7 +8,7 @@ #include -namespace NKikimr::NReplication { +namespace NKikimr::NReplication::NTestHelpers { template class TEnv { @@ -136,11 +136,6 @@ class TEnv { return Server.GetRuntime()->GrabEdgeEvent(Sender); } - template - auto Send(IEventBase* ev) { - return Send(YdbProxy, ev); - } - auto& GetRuntime() { return *Server.GetRuntime(); } diff --git a/ydb/core/tx/replication/ut_helpers/test_table.cpp b/ydb/core/tx/replication/ut_helpers/test_table.cpp index 1fe8d388e59d..1b762c25274c 100644 --- a/ydb/core/tx/replication/ut_helpers/test_table.cpp +++ b/ydb/core/tx/replication/ut_helpers/test_table.cpp @@ -2,7 +2,7 @@ #include -namespace NKikimr::NReplication { +namespace NKikimr::NReplication::NTestHelpers { void TTestTableDescription::TColumn::SerializeTo(NKikimrSchemeOp::TColumnDescription& proto) const { proto.SetName(Name); diff --git a/ydb/core/tx/replication/ut_helpers/test_table.h b/ydb/core/tx/replication/ut_helpers/test_table.h index c730c51df1b1..0314bc1a2be4 100644 --- a/ydb/core/tx/replication/ut_helpers/test_table.h +++ b/ydb/core/tx/replication/ut_helpers/test_table.h @@ -9,7 +9,7 @@ namespace NKikimrSchemeOp { class TTableDescription; } -namespace NKikimr::NReplication { +namespace NKikimr::NReplication::NTestHelpers { struct TTestTableDescription { struct TColumn { diff --git a/ydb/core/tx/replication/ut_helpers/write_topic.h b/ydb/core/tx/replication/ut_helpers/write_topic.h index 3fc8955d7b77..8d384a8d653a 100644 --- a/ydb/core/tx/replication/ut_helpers/write_topic.h +++ b/ydb/core/tx/replication/ut_helpers/write_topic.h @@ -1,6 +1,6 @@ #include -namespace NKikimr::NReplication { +namespace NKikimr::NReplication::NTestHelpers { template bool WriteTopic(const Env& env, const TString& topicPath, const TString& data) { diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp index 6b5a749ef4ae..c7e7aad10384 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp @@ -11,6 +11,20 @@ namespace NKikimr::NReplication { Y_UNIT_TEST_SUITE(YdbProxyTests) { + template + class TEnv: public NTestHelpers::TEnv { + using TBase = NTestHelpers::TEnv; + + public: + using TBase::TBase; + using TBase::Send; + + template + auto Send(IEventBase* ev) { + return TBase::template Send(this->GetYdbProxy(), ev); + } + }; + Y_UNIT_TEST(MakeDirectory) { TEnv env; // ok @@ -131,7 +145,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { } Y_UNIT_TEST(StaticCreds) { - TEnv env("user1", "password1"); + TEnv env("user1", "password1"); // make dir { auto ev = env.Send( @@ -623,7 +637,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { TActorId reader = CreateTopicReader(env, "/Root/topic"); - UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-0")); + UNIT_ASSERT(NTestHelpers::WriteTopic(env, "/Root/topic", "message-0")); { auto data = ReadTopicData(env, reader, "/Root/topic"); UNIT_ASSERT_VALUES_EQUAL(data.Messages.size(), 1); @@ -651,7 +665,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { env.SendAsync(reader, new TEvents::TEvPoison()); } - UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-1")); + UNIT_ASSERT(NTestHelpers::WriteTopic(env, "/Root/topic", "message-1")); { auto data = ReadTopicData(env, newReader, "/Root/topic"); UNIT_ASSERT(data.Messages.size() >= 1); @@ -669,7 +683,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) { TEnv env; auto reader = CreateTopicReader(env, "/Root/topic"); - auto ev = env.template Send(reader, new TEvYdbProxy::TEvReadTopicRequest()); + auto ev = env.Send(reader, new TEvYdbProxy::TEvReadTopicRequest()); UNIT_ASSERT(ev); UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR);