Skip to content

Commit

Permalink
Custom TEvWorker::TEvGone with status KIKIMR-21006 (ydb-platform#1946)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jun 6, 2024
1 parent 05172e7 commit 9e4128e
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 25 deletions.
6 changes: 3 additions & 3 deletions ydb/core/tx/replication/service/table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class TLocalTableWriter

void LogCritAndLeave(const TString& error) {
LOG_C(error);
Leave();
Leave(TEvWorker::TEvGone::SCHEME_ERROR);
}

void LogWarnAndRetry(const TString& error) {
Expand Down Expand Up @@ -458,8 +458,8 @@ class TLocalTableWriter
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
}

void Leave() {
Send(Worker, new TEvents::TEvGone());
void Leave(TEvWorker::TEvGone::EStatus status) {
Send(Worker, new TEvWorker::TEvGone(status));
PassAway();
}

Expand Down
18 changes: 14 additions & 4 deletions ydb/core/tx/replication/service/topic_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,23 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {

if (!ev->Get()->Result.IsSuccess()) {
LOG_N("Unsuccessful commit offset");
Leave();
Leave(TEvWorker::TEvGone::UNAVAILABLE);
}
}

void Leave() {
void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
switch (ev->Get()->Result.GetStatus()) {
case NYdb::EStatus::SCHEME_ERROR:
return Leave(TEvWorker::TEvGone::SCHEME_ERROR);
default:
return Leave(TEvWorker::TEvGone::UNAVAILABLE);
}
}

void Leave(TEvWorker::TEvGone::EStatus status) {
LOG_I("Leave");
Send(Worker, new TEvents::TEvGone());
Send(Worker, new TEvWorker::TEvGone(status));
PassAway();
}

Expand Down Expand Up @@ -124,7 +134,7 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
hFunc(TEvYdbProxy::TEvCreateTopicReaderResponse, Handle);
hFunc(TEvYdbProxy::TEvReadTopicResponse, Handle);
hFunc(TEvYdbProxy::TEvCommitOffsetResponse, Handle);
sFunc(TEvents::TEvGone, Leave);
hFunc(TEvYdbProxy::TEvTopicReaderGone, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/replication/service/topic_reader_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) {

while (true) {
TAutoPtr<IEventHandle> handle;
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvWorker::TEvHandshake, TEvents::TEvGone>(handle);
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvWorker::TEvHandshake, TEvWorker::TEvGone>(handle);
if (handle->Sender != reader) {
continue;
}

if (auto* ev = std::get<TEvWorker::TEvHandshake*>(result)) {
return reader;
} else if (std::get<TEvents::TEvGone*>(result)) {
} else if (std::get<TEvWorker::TEvGone*>(result)) {
reader = env.GetRuntime().Register(CreateRemoteTopicReader(env.GetYdbProxy(), settings));
env.SendAsync(reader, new TEvWorker::TEvHandshake());
continue;
Expand All @@ -42,14 +42,14 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) {

while (true) {
TAutoPtr<IEventHandle> handle;
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvWorker::TEvData, TEvents::TEvGone>(handle);
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvWorker::TEvData, TEvWorker::TEvGone>(handle);
if (handle->Sender != reader) {
continue;
}

if (auto* ev = std::get<TEvWorker::TEvData*>(result)) {
return ev->Records;
} else if (std::get<TEvents::TEvGone*>(result)) {
} else if (std::get<TEvWorker::TEvGone*>(result)) {
reader = CreateReader(env, settings);
env.SendAsync(reader, new TEvWorker::TEvPoll());
continue;
Expand Down
18 changes: 14 additions & 4 deletions ydb/core/tx/replication/service/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ TString TEvWorker::TEvData::ToString() const {
<< " }";
}

TEvWorker::TEvGone::TEvGone(EStatus status)
: Status(status)
{
}

TString TEvWorker::TEvGone::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Status: " << Status
<< " }";
}

class TWorker: public TActorBootstrapped<TWorker> {
struct TActorInfo {
THolder<IActor> Actor;
Expand Down Expand Up @@ -131,15 +142,14 @@ class TWorker: public TActorBootstrapped<TWorker> {
Send(ev->Forward(Writer));
}

void Handle(TEvents::TEvGone::TPtr& ev) {
void Handle(TEvWorker::TEvGone::TPtr& ev) {
// TODO: handle status
if (ev->Sender == Reader) {
LOG_I("Reader has gone"
<< ": sender# " << ev->Sender);
// TODO: handle
} else if (ev->Sender == Writer) {
LOG_I("Writer has gone"
<< ": sender# " << ev->Sender);
// TODO: handle
} else {
LOG_W("Unknown actor has gone"
<< ": sender# " << ev->Sender);
Expand Down Expand Up @@ -179,7 +189,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvPoll, Handle);
hFunc(TEvWorker::TEvData, Handle);
hFunc(TEvents::TEvGone, Handle);
hFunc(TEvWorker::TEvGone, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/replication/service/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ struct TEvWorker {
EvHandshake,
EvPoll,
EvData,
EvGone,

EvEnd,
};
Expand All @@ -38,6 +39,18 @@ struct TEvWorker {
explicit TEvData(TVector<TRecord>&& records);
TString ToString() const override;
};

struct TEvGone: public TEventLocal<TEvGone, EvGone> {
enum EStatus {
SCHEME_ERROR,
UNAVAILABLE,
};

EStatus Status;

explicit TEvGone(EStatus status);
TString ToString() const override;
};
};

IActor* CreateWorker(THolder<IActor>&& reader, THolder<IActor>&& writer);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/replication/service/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ SRCS(
worker.cpp
)

GENERATE_ENUM_SERIALIZATION(worker.h)

YQL_LAST_ABI_VERSION()

END()
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,18 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> {
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
} else if (std::get_if<TReadSessionEvent::TPartitionSessionStatusEvent>(&*event)) {
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
} else if (std::get_if<TReadSessionEvent::TPartitionSessionClosedEvent>(&*event)) {
return Leave(ev->Get()->Sender);
} else if (std::get_if<TSessionClosedEvent>(&*event)) {
return Leave(ev->Get()->Sender);
} else if (auto* x = std::get_if<TReadSessionEvent::TPartitionSessionClosedEvent>(&*event)) {
auto status = TStatus(EStatus::UNAVAILABLE, NYql::TIssues{NYql::TIssue(x->DebugString())});
return Leave(ev->Get()->Sender, std::move(status));
} else if (auto* x = std::get_if<TSessionClosedEvent>(&*event)) {
return Leave(ev->Get()->Sender, std::move(*x));
} else {
Y_ABORT("Unexpected event");
}
}

void Leave(const TActorId& client) {
Send(client, new TEvents::TEvGone());
void Leave(const TActorId& client, TStatus&& status) {
Send(client, new TEvYdbProxy::TEvTopicReaderGone(std::move(status)));
PassAway();
}

Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct TEvYdbProxy {
EV_REQUEST_RESPONSE(ModifyPermissions),

EvTable = EvBegin + 1 * 100,
EV_REQUEST_RESPONSE(CreateSession),
EvCreateSessionResponse,
EV_REQUEST_RESPONSE(CreateTable),
EV_REQUEST_RESPONSE(DropTable),
EV_REQUEST_RESPONSE(AlterTable),
Expand All @@ -48,6 +48,7 @@ struct TEvYdbProxy {
EV_REQUEST_RESPONSE(DescribeTopic),
EV_REQUEST_RESPONSE(DescribeConsumer),
EV_REQUEST_RESPONSE(CreateTopicReader),
EvTopicReaderGone,
EV_REQUEST_RESPONSE(ReadTopic),
EV_REQUEST_RESPONSE(CommitOffset),

Expand Down Expand Up @@ -119,6 +120,10 @@ struct TEvYdbProxy {
using TBase = TGenericResponse<TDerived, EventType, T>;
};

struct TEvTopicReaderGone: public TGenericResponse<TEvTopicReaderGone, EvTopicReaderGone, NYdb::TStatus> {
using TBase::TBase;
};

struct TReadTopicResult {
class TMessage {
using TDataEvent = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent;
Expand Down
16 changes: 13 additions & 3 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,14 +587,14 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {

while (true) {
TAutoPtr<IEventHandle> handle;
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvYdbProxy::TEvReadTopicResponse, TEvents::TEvGone>(handle);
auto result = env.GetRuntime().template GrabEdgeEventsRethrow<TEvYdbProxy::TEvReadTopicResponse, TEvYdbProxy::TEvTopicReaderGone>(handle);
if (handle->Recipient != env.GetSender()) {
continue;
}

if (auto* ev = std::get<TEvYdbProxy::TEvReadTopicResponse*>(result)) {
return ev->Result;
} else if (std::get<TEvents::TEvGone*>(result)) {
} else if (std::get<TEvYdbProxy::TEvTopicReaderGone*>(result)) {
reader = CreateTopicReader(env, topicPath);
env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest());
} else {
Expand Down Expand Up @@ -645,7 +645,7 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {

// wait event from previous session
try {
auto ev = env.GetRuntime().GrabEdgeEventRethrow<TEvents::TEvGone>(env.GetSender());
auto ev = env.GetRuntime().GrabEdgeEventRethrow<TEvYdbProxy::TEvTopicReaderGone>(env.GetSender());
if (ev->Sender != reader) {
ythrow yexception();
}
Expand All @@ -670,6 +670,16 @@ Y_UNIT_TEST_SUITE(YdbProxyTests) {
}
}

Y_UNIT_TEST(ReadNonExistentTopic) {
TEnv env;

auto reader = CreateTopicReader(env, "/Root/topic");
auto ev = env.template Send<TEvYdbProxy::TEvTopicReaderGone>(reader, new TEvYdbProxy::TEvReadTopicRequest());

UNIT_ASSERT(ev);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR);
}

} // YdbProxyTests

}

0 comments on commit 9e4128e

Please sign in to comment.