Skip to content

Commit

Permalink
Logging & tagging KIKIMR-21006 (ydb-platform#1881)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jun 6, 2024
1 parent 6da4b1e commit 05172e7
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 7 deletions.
12 changes: 12 additions & 0 deletions ydb/core/tx/replication/service/table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/services/services.pb.h>

#include <util/generic/map.h>
#include <util/generic/maybe.h>
#include <util/string/builder.h>

namespace NKikimr::NReplication::NService {

Expand Down Expand Up @@ -128,6 +132,10 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
}

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::REPLICATION_TABLE_PARTITION_WRITER;
}

explicit TTablePartitionWriter(const TActorId& parent, ui64 tabletId, const TPathId& tablePathId)
: Parent(parent)
, TabletId(tabletId)
Expand Down Expand Up @@ -461,6 +469,10 @@ class TLocalTableWriter
}

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::REPLICATION_LOCAL_TABLE_WRITER;
}

explicit TLocalTableWriter(const TPathId& tablePathId)
: TActor(&TThis::StateWork)
, TBaseChangeSender(this, this, tablePathId)
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/tx/replication/service/topic_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/services/services.pb.h>

#include <util/generic/maybe.h>
#include <util/string/builder.h>

namespace NKikimr::NReplication::NService {

Expand Down Expand Up @@ -64,7 +68,7 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
LOG_D("Handle " << ev->Get()->ToString());

auto& result = ev->Get()->Result;
TVector<TEvWorker::TEvData::TRecord> records(Reserve(result.Messages.size()));
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(result.Messages.size()));

for (auto& msg : result.Messages) {
Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
Expand Down Expand Up @@ -100,6 +104,10 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
}

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::REPLICATION_REMOTE_TOPIC_READER;
}

explicit TRemoteTopicReader(const TActorId& ydbProxy, const TReadSessionSettings& opts)
: TActor(&TThis::StateWork)
, YdbProxy(ydbProxy)
Expand Down
45 changes: 39 additions & 6 deletions ydb/core/tx/replication/service/worker.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#include "logging.h"
#include "worker.h"

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/services/services.pb.h>

#include <util/generic/maybe.h>
#include <util/string/builder.h>
#include <util/string/join.h>

Expand Down Expand Up @@ -60,6 +62,16 @@ class TWorker: public TActorBootstrapped<TWorker> {
}
};

TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
LogPrefix = TStringBuilder()
<< "[Worker]"
<< SelfId() << " ";
}

return LogPrefix.GetRef();
}

TActorId RegisterActor(TActorInfo& info) {
Y_ABORT_UNLESS(info.Actor);
info.ActorId = RegisterWithSameMailbox(info.Actor.Release());
Expand All @@ -73,31 +85,46 @@ class TWorker: public TActorBootstrapped<TWorker> {
}

void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());

if (ev->Sender == Reader) {
LOG_I("Handshake with reader"
<< ": sender# " << ev->Sender);
Reader.InitDone = true;
} else if (ev->Sender == Writer) {
LOG_I("Handshake with writer"
<< ": sender# " << ev->Sender);
Writer.InitDone = true;
} else {
// TODO: log warn
LOG_W("Handshake from unknown actor"
<< ": sender# " << ev->Sender);
return;
}

if (Reader && Writer) {
LOG_N("Start working");
Send(Reader, new TEvWorker::TEvPoll());
}
}

void Handle(TEvWorker::TEvPoll::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());

if (ev->Sender != Writer) {
// TODO: log warn
LOG_W("Poll from unknown actor"
<< ": sender# " << ev->Sender);
return;
}

Send(ev->Forward(Reader));
}

void Handle(TEvWorker::TEvData::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());

if (ev->Sender != Reader) {
// TODO: log warn
LOG_W("Data from unknown actor"
<< ": sender# " << ev->Sender);
return;
}

Expand All @@ -106,11 +133,16 @@ class TWorker: public TActorBootstrapped<TWorker> {

void Handle(TEvents::TEvGone::TPtr& ev) {
if (ev->Sender == Reader) {
// TODO
LOG_I("Reader has gone"
<< ": sender# " << ev->Sender);
// TODO: handle
} else if (ev->Sender == Writer) {
// TODO
LOG_I("Writer has gone"
<< ": sender# " << ev->Sender);
// TODO: handle
} else {
// TODO: log warn
LOG_W("Unknown actor has gone"
<< ": sender# " << ev->Sender);
}
}

Expand Down Expand Up @@ -153,6 +185,7 @@ class TWorker: public TActorBootstrapped<TWorker> {
}

private:
mutable TMaybe<TString> LogPrefix;
TActorInfo Reader;
TActorInfo Writer;
};
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/services/services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1018,5 +1018,8 @@ message TActivity {
GRAPH_SERVICE = 624;
REPLICATION_WORKER = 625;
SCHEMESHARD_BACKGROUND_CLEANING = 626;
REPLICATION_REMOTE_TOPIC_READER = 628;
REPLICATION_LOCAL_TABLE_WRITER = 629;
REPLICATION_TABLE_PARTITION_WRITER = 630;
};
};

0 comments on commit 05172e7

Please sign in to comment.