Skip to content

Commit

Permalink
Cosmetic changes KIKIMR-21006 (ydb-platform#2120)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jun 10, 2024
1 parent dc01fc8 commit aca7eef
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 45 deletions.
2 changes: 2 additions & 0 deletions ydb/core/tx/replication/service/table_writer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
namespace NKikimr::NReplication::NService {

Y_UNIT_TEST_SUITE(LocalTableWriter) {
using namespace NTestHelpers;

Y_UNIT_TEST(WriteTable) {
TEnv env;
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/service/topic_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {

void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());

switch (ev->Get()->Result.GetStatus()) {
case NYdb::EStatus::SCHEME_ERROR:
return Leave(TEvWorker::TEvGone::SCHEME_ERROR);
Expand Down
56 changes: 26 additions & 30 deletions ydb/core/tx/replication/service/topic_reader_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,50 +11,46 @@
namespace NKikimr::NReplication::NService {

Y_UNIT_TEST_SUITE(RemoteTopicReader) {
using namespace NTestHelpers;

template <typename Env>
TActorId CreateReader(Env& env, const TEvYdbProxy::TTopicReaderSettings& settings) {
auto reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings));
env.SendAsync(reader, new TEvWorker::TEvHandshake());
do {
auto reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings));
env.SendAsync(reader, new TEvWorker::TEvHandshake());

while (true) {
TAutoPtr<IEventHandle> handle;
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvWorker::TEvHandshake, TEvWorker::TEvGone>(handle);
if (handle->Sender != reader) {
continue;
}
TAutoPtr<IEventHandle> ev;
do {
env.GetRuntime().template GrabEdgeEvents<TEvWorker::TEvHandshake, TEvWorker::TEvGone>(ev);
} while (ev->Sender != reader);

if (auto* ev = std::get<TEvWorker::TEvHandshake*>(result)) {
switch (ev->GetTypeRewrite()) {
case TEvWorker::EvHandshake:
return reader;
} else if (std::get<TEvWorker::TEvGone*>(result)) {
reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings));
env.SendAsync(reader, new TEvWorker::TEvHandshake());
case TEvWorker::EvGone:
continue;
} else {
UNIT_ASSERT("Unexpected event");
}
}
} while (true);
}

template <typename Env>
auto ReadData(Env& env, TActorId& reader, const TEvYdbProxy::TTopicReaderSettings& settings) {
reader = CreateReader(env, settings);
env.SendAsync(reader, new TEvWorker::TEvPoll());
do {
reader = CreateReader(env, settings);
env.SendAsync(reader, new TEvWorker::TEvPoll());

while (true) {
TAutoPtr<IEventHandle> handle;
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvWorker::TEvData, TEvWorker::TEvGone>(handle);
if (handle->Sender != reader) {
continue;
}
TAutoPtr<IEventHandle> ev;
do {
env.GetRuntime().template GrabEdgeEvents<TEvWorker::TEvData, TEvWorker::TEvGone>(ev);
} while (ev->Sender != reader);

if (auto* ev = std::get<TEvWorker::TEvData*>(result)) {
return ev->Records;
} else if (std::get<TEvWorker::TEvGone*>(result)) {
reader = CreateReader(env, settings);
env.SendAsync(reader, new TEvWorker::TEvPoll());
switch (ev->GetTypeRewrite()) {
case TEvWorker::EvData:
return ev->Get<TEvWorker::TEvData>()->Records;
case TEvWorker::EvGone:
continue;
}
}
} while (true);
}

Y_UNIT_TEST(ReadTopic) {
Expand All @@ -68,7 +64,7 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) {
.ConsumerName("consumer")
.EndAddConsumer();

auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(
auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(env.GetYdbProxy(),
new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic", settings));
UNIT_ASSERT(ev);
UNIT_ASSERT(ev->Get()->Result.IsSuccess());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/service/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ TEvWorker::TEvData::TEvData(TVector<TRecord>&& records)
void TEvWorker::TEvData::TRecord::Out(IOutputStream& out) const {
out << "{"
<< " Offset: " << Offset
<< " Data: " << Data
<< " Data: " << Data.size() << "b"
<< " }";
}

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/replication/service/worker_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
namespace NKikimr::NReplication::NService {

Y_UNIT_TEST_SUITE(Worker) {
using namespace NTestHelpers;

Y_UNIT_TEST(Basic) {
TEnv env;
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG);

{
auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(
auto ev = env.Send<TEvYdbProxy::TEvCreateTopicResponse>(env.GetYdbProxy(),
new TEvYdbProxy::TEvCreateTopicRequest("/Root/topic",
NYdb::NTopic::TCreateTopicSettings()
.BeginAddConsumer()
Expand Down
7 changes: 1 addition & 6 deletions ydb/core/tx/replication/ut_helpers/test_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr::NReplication {
namespace NKikimr::NReplication::NTestHelpers {

template <bool UseDatabase = true>
class TEnv {
Expand Down Expand Up @@ -136,11 +136,6 @@ class TEnv {
return Server.GetRuntime()->GrabEdgeEvent<TEvResponse>(Sender);
}

template <typename TEvResponse>
auto Send(IEventBase* ev) {
return Send<TEvResponse>(YdbProxy, ev);
}

auto& GetRuntime() {
return *Server.GetRuntime();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/ut_helpers/test_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <ydb/core/protos/flat_scheme_op.pb.h>

namespace NKikimr::NReplication {
namespace NKikimr::NReplication::NTestHelpers {

void TTestTableDescription::TColumn::SerializeTo(NKikimrSchemeOp::TColumnDescription& proto) const {
proto.SetName(Name);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/ut_helpers/test_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace NKikimrSchemeOp {
class TTableDescription;
}

namespace NKikimr::NReplication {
namespace NKikimr::NReplication::NTestHelpers {

struct TTestTableDescription {
struct TColumn {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/ut_helpers/write_topic.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

namespace NKikimr::NReplication {
namespace NKikimr::NReplication::NTestHelpers {

template <typename Env>
bool WriteTopic(const Env& env, const TString& topicPath, const TString& data) {
Expand Down
22 changes: 18 additions & 4 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@
namespace NKikimr::NReplication {

Y_UNIT_TEST_SUITE(YdbProxyTests) {
template <bool UseDatabase = true>
class TEnv: public NTestHelpers::TEnv<UseDatabase> {
using TBase = NTestHelpers::TEnv<UseDatabase>;

public:
using TBase::TBase;
using TBase::Send;

template <typename TEvResponse>
auto Send(IEventBase* ev) {
return TBase::template Send<TEvResponse>(this->GetYdbProxy(), ev);
}
};

Y_UNIT_TEST(MakeDirectory) {
TEnv env;
// ok
Expand Down Expand Up @@ -131,7 +145,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
}

Y_UNIT_TEST(StaticCreds) {
TEnv env("user1", "password1");
TEnv<true> env("user1", "password1");
// make dir
{
auto ev = env.Send<TEvYdbProxy::TEvMakeDirectoryResponse>(
Expand Down Expand Up @@ -623,7 +637,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {

TActorId reader = CreateTopicReader(env, "/Root/topic");

UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-0"));
UNIT_ASSERT(NTestHelpers::WriteTopic(env, "/Root/topic", "message-0"));
{
auto data = ReadTopicData(env, reader, "/Root/topic");
UNIT_ASSERT_VALUES_EQUAL(data.Messages.size(), 1);
Expand Down Expand Up @@ -651,7 +665,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
env.SendAsync(reader, new TEvents::TEvPoison());
}

UNIT_ASSERT(WriteTopic(env, "/Root/topic", "message-1"));
UNIT_ASSERT(NTestHelpers::WriteTopic(env, "/Root/topic", "message-1"));
{
auto data = ReadTopicData(env, newReader, "/Root/topic");
UNIT_ASSERT(data.Messages.size() >= 1);
Expand All @@ -669,7 +683,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
TEnv env;

auto reader = CreateTopicReader(env, "/Root/topic");
auto ev = env.template Send<TEvYdbProxy::TEvTopicReaderGone>(reader, new TEvYdbProxy::TEvReadTopicRequest());
auto ev = env.Send<TEvYdbProxy::TEvTopicReaderGone>(reader, new TEvYdbProxy::TEvReadTopicRequest());

UNIT_ASSERT(ev);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR);
Expand Down

0 comments on commit aca7eef

Please sign in to comment.