From 05172e7de6688d0a3e241714a51707c33369898c Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Tue, 13 Feb 2024 16:32:11 +0300 Subject: [PATCH] Logging & tagging KIKIMR-21006 (#1881) --- .../tx/replication/service/table_writer.cpp | 12 +++++ .../tx/replication/service/topic_reader.cpp | 10 ++++- ydb/core/tx/replication/service/worker.cpp | 45 ++++++++++++++++--- ydb/library/services/services.proto | 3 ++ 4 files changed, 63 insertions(+), 7 deletions(-) diff --git a/ydb/core/tx/replication/service/table_writer.cpp b/ydb/core/tx/replication/service/table_writer.cpp index 05a3a4003e16..6e8373e7e9ba 100644 --- a/ydb/core/tx/replication/service/table_writer.cpp +++ b/ydb/core/tx/replication/service/table_writer.cpp @@ -9,10 +9,14 @@ #include #include #include +#include #include #include +#include #include +#include +#include namespace NKikimr::NReplication::NService { @@ -128,6 +132,10 @@ class TTablePartitionWriter: public TActorBootstrapped { } public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_TABLE_PARTITION_WRITER; + } + explicit TTablePartitionWriter(const TActorId& parent, ui64 tabletId, const TPathId& tablePathId) : Parent(parent) , TabletId(tabletId) @@ -461,6 +469,10 @@ class TLocalTableWriter } public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_LOCAL_TABLE_WRITER; + } + explicit TLocalTableWriter(const TPathId& tablePathId) : TActor(&TThis::StateWork) , TBaseChangeSender(this, this, tablePathId) diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index 2c1d088fe702..13d5b8d94d1f 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -5,6 +5,10 @@ #include #include #include +#include + +#include +#include namespace NKikimr::NReplication::NService { @@ -64,7 +68,7 @@ class TRemoteTopicReader: public TActor { LOG_D("Handle " << ev->Get()->ToString()); auto& result = ev->Get()->Result; - TVector records(Reserve(result.Messages.size())); + TVector records(::Reserve(result.Messages.size())); for (auto& msg : result.Messages) { Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW); @@ -100,6 +104,10 @@ class TRemoteTopicReader: public TActor { } public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_REMOTE_TOPIC_READER; + } + explicit TRemoteTopicReader(const TActorId& ydbProxy, const TReadSessionSettings& opts) : TActor(&TThis::StateWork) , YdbProxy(ydbProxy) diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp index 453307046d01..b63852d64d3a 100644 --- a/ydb/core/tx/replication/service/worker.cpp +++ b/ydb/core/tx/replication/service/worker.cpp @@ -1,9 +1,11 @@ +#include "logging.h" #include "worker.h" #include #include #include +#include #include #include @@ -60,6 +62,16 @@ class TWorker: public TActorBootstrapped { } }; + TStringBuf GetLogPrefix() const { + if (!LogPrefix) { + LogPrefix = TStringBuilder() + << "[Worker]" + << SelfId() << " "; + } + + return LogPrefix.GetRef(); + } + TActorId RegisterActor(TActorInfo& info) { Y_ABORT_UNLESS(info.Actor); info.ActorId = RegisterWithSameMailbox(info.Actor.Release()); @@ -73,22 +85,34 @@ class TWorker: public TActorBootstrapped { } void Handle(TEvWorker::TEvHandshake::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + if (ev->Sender == Reader) { + LOG_I("Handshake with reader" + << ": sender# " << ev->Sender); Reader.InitDone = true; } else if (ev->Sender == Writer) { + LOG_I("Handshake with writer" + << ": sender# " << ev->Sender); Writer.InitDone = true; } else { - // TODO: log warn + LOG_W("Handshake from unknown actor" + << ": sender# " << ev->Sender); + return; } if (Reader && Writer) { + LOG_N("Start working"); Send(Reader, new TEvWorker::TEvPoll()); } } void Handle(TEvWorker::TEvPoll::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + if (ev->Sender != Writer) { - // TODO: log warn + LOG_W("Poll from unknown actor" + << ": sender# " << ev->Sender); return; } @@ -96,8 +120,11 @@ class TWorker: public TActorBootstrapped { } void Handle(TEvWorker::TEvData::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + if (ev->Sender != Reader) { - // TODO: log warn + LOG_W("Data from unknown actor" + << ": sender# " << ev->Sender); return; } @@ -106,11 +133,16 @@ class TWorker: public TActorBootstrapped { void Handle(TEvents::TEvGone::TPtr& ev) { if (ev->Sender == Reader) { - // TODO + LOG_I("Reader has gone" + << ": sender# " << ev->Sender); + // TODO: handle } else if (ev->Sender == Writer) { - // TODO + LOG_I("Writer has gone" + << ": sender# " << ev->Sender); + // TODO: handle } else { - // TODO: log warn + LOG_W("Unknown actor has gone" + << ": sender# " << ev->Sender); } } @@ -153,6 +185,7 @@ class TWorker: public TActorBootstrapped { } private: + mutable TMaybe LogPrefix; TActorInfo Reader; TActorInfo Writer; }; diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 92c8eccd3357..afd9378750f3 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1018,5 +1018,8 @@ message TActivity { GRAPH_SERVICE = 624; REPLICATION_WORKER = 625; SCHEMESHARD_BACKGROUND_CLEANING = 626; + REPLICATION_REMOTE_TOPIC_READER = 628; + REPLICATION_LOCAL_TABLE_WRITER = 629; + REPLICATION_TABLE_PARTITION_WRITER = 630; }; };