From 4a3f12b6002a8c925f3472df97fc798ea43b154f Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Tue, 27 Feb 2024 08:40:29 +0000 Subject: [PATCH 01/15] intermediate refactoring --- ydb/core/persqueue/partition.cpp | 2 +- ydb/core/persqueue/partition.h | 2 +- ydb/core/persqueue/partition_read.cpp | 10 ++++---- ydb/core/persqueue/read_balancer.cpp | 26 ++++++++----------- ydb/core/persqueue/utils.cpp | 36 +++++++++++++++++++++++++++ ydb/core/persqueue/utils.h | 2 ++ ydb/core/protos/pqconfig.proto | 28 +++++++++++++++------ 7 files changed, 76 insertions(+), 30 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 1162ff585eb7..8461b5f408e5 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -544,7 +544,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { InitDone = true; TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds()); - FillReadFromTimestamps(Config, ctx); + FillReadFromTimestamps(ctx); ResendPendingEvents(ctx); ProcessTxsAndUserActs(ctx); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index cb8f2ea872d2..d5772b51a6bf 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -117,7 +117,7 @@ class TPartition : public TActorBootstrapped { void CreateMirrorerActor(); void DoRead(TEvPQ::TEvRead::TPtr&& ev, TDuration waitQuotaTime, const TActorContext& ctx); void FailBadClient(const TActorContext& ctx); - void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx); + void FillReadFromTimestamps(const TActorContext& ctx); void FilterDeadlinedWrites(const TActorContext& ctx); void Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 3cf09386dfe1..87902262655f 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -33,7 +33,7 @@ void TPartition::SendReadingFinished(const TString& consumer) { Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId)); } -void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) { +void TPartition::FillReadFromTimestamps(const TActorContext& ctx) { TSet hasReadRule; for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { @@ -41,11 +41,11 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config userInfo.HasReadRule = false; hasReadRule.insert(consumer); } - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - const auto& consumer = config.GetReadRules(i); + for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) { + const auto& consumer = Config.GetReadRules(i); auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx, 0); userInfo.HasReadRule = true; - ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0; + ui64 rrGen = i < Config.ReadRuleGenerationsSize() ? Config.GetReadRuleGenerations(i) : 0; if (userInfo.ReadRuleGeneration != rrGen) { THolder event = MakeHolder( 0, consumer, 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen @@ -62,7 +62,7 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config userInfo.Step = userInfo.Generation = 0; } hasReadRule.erase(consumer); - TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero(); + TInstant ts = i < Config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(Config.GetReadFromTimestampsMs(i)) : TInstant::Zero(); if (!ts) ts += TDuration::MilliSeconds(1); if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts) userInfo.ReadFromTimestamp = ts; diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 70c235487c4f..06b1ade91f34 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -66,16 +66,12 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA if (!config.empty()) { bool res = Self->TabletConfig.ParseFromString(config); Y_ABORT_UNLESS(res); + + Migrate(Self->TabletConfig); Self->Consumers.clear(); - if (Self->TabletConfig.ReadRulesSize() == Self->TabletConfig.ConsumerScalingSupportSize()) { - for (size_t i = 0; i < Self->TabletConfig.ReadRulesSize(); ++i) { - Self->Consumers[Self->TabletConfig.GetReadRules(i)].ScalingSupport = Self->TabletConfig.GetConsumerScalingSupport(i); - } - } else { - for (const auto& rr : Self->TabletConfig.GetReadRules()) { - Self->Consumers[rr].ScalingSupport = DefaultScalingSupport(); - } + for (auto& consumer : Self->TabletConfig.GetConsumers()) { + Self->Consumers[consumer.GetName()].ScalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport(); } Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig); @@ -536,6 +532,8 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr Path = record.GetPath(); TxId = record.GetTxId(); TabletConfig = record.GetTabletConfig(); + Migrate(TabletConfig); + SchemeShardId = record.GetSchemeShardId(); TotalGroups = record.HasTotalGroupCount() ? record.GetTotalGroupCount() : 0; ui32 prevNextPartitionId = NextPartitionId; @@ -547,17 +545,15 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr auto oldConsumers = std::move(Consumers); Consumers.clear(); - for (size_t i = 0; i < TabletConfig.ReadRulesSize(); ++i) { - auto& rr = TabletConfig.GetReadRules(i); + for (auto& consumer : TabletConfig.GetConsumers()) { + auto scalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport(); - auto scalingSupport = i < TabletConfig.ConsumerScalingSupportSize() ? TabletConfig.GetConsumerScalingSupport(i) - : DefaultScalingSupport(); - auto it = oldConsumers.find(rr); + auto it = oldConsumers.find(consumer.GetName()); if (it != oldConsumers.end()) { - auto& c = Consumers[rr] = std::move(it->second); + auto& c = Consumers[consumer.GetName()] = std::move(it->second); c.ScalingSupport = scalingSupport; } else { - Consumers[rr].ScalingSupport = scalingSupport; + Consumers[consumer.GetName()].ScalingSupport = scalingSupport; } } diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 6008d848e232..ffe125f16c1d 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -45,6 +45,42 @@ ui64 PutUnitsSize(const ui64 size) { return putUnitsCount; } +void Migrate(NKikimrPQ::TPQTabletConfig& config) { + if (!config.ConsumersSize()) { + for(size_t i = 0; i < config.ReadRulesSize(); ++i) { + auto* consumer = config.AddConsumers(); + + consumer->SetName(config.GetReadRules(i)); + if (i < config.ReadFromTimestampsMsSize()) { + consumer->SetReadFromTimestampsMs(config.GetReadFromTimestampsMs(i)); + } + if (i < config.ConsumerFormatVersionsSize()) { + consumer->SetFormatVersion(config.GetConsumerFormatVersions(i)); + } + if (i < config.ConsumerCodecsSize()) { + auto& src = config.GetConsumerCodecs(i); + auto* dst = consumer->MutableCodec(); + + for (auto value : src.GetIds()) { + dst->AddIds(value); + } + for (auto& value : src.GetCodecs()) { + dst->AddCodecs(value); + } + } + if (i < config.ReadRuleServiceTypesSize()) { + consumer->SetServiceType(config.GetReadRuleServiceTypes(i)); + } + if (i < config.ReadRuleVersionsSize()) { + consumer->SetVersion(config.GetReadRuleVersions(i)); + } + if (i < config.ReadRuleGenerationsSize()) { + consumer->SetGeneration(config.GetReadRuleGenerations(i)); + } + } + } +} + const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId) { for(const auto& p : config.GetPartitions()) { if (partitionId == p.GetPartitionId()) { diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index df6fe683a907..71ea15791ed3 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -14,6 +14,8 @@ ui64 PutUnitsSize(const ui64 size); TString SourceIdHash(const TString& sourceId); +void Migrate(NKikimrPQ::TPQTabletConfig& config); + const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId); // The graph of split-merge operations. diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 5817c6c93fa3..34e88ff20452 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -307,22 +307,34 @@ message TPQTabletConfig { optional string DC = 12; // ReadRules, ReadTopicTimestampMs, ReadRuleVersions, ConsumerFormatVersions, ConsumersCodecs and ConsumerScalingSupport form a consumer data array stored by columns - repeated string ReadRules = 13; - repeated uint64 ReadFromTimestampsMs = 14; - repeated uint64 ConsumerFormatVersions = 15; + repeated string ReadRules = 13; // Deprecated. Use Consumers.Name + repeated uint64 ReadFromTimestampsMs = 14; // Deprecated. Use Consumers.ReadFromTimestampsMs + repeated uint64 ConsumerFormatVersions = 15; // Deprecated. Use Consumers.FormatVersion message TCodecs { repeated int64 Ids = 1; repeated string Codecs = 2; } - repeated TCodecs ConsumerCodecs = 16; - repeated EConsumerScalingSupport ConsumerScalingSupport = 37; - repeated string ReadRuleServiceTypes = 17; + repeated TCodecs ConsumerCodecs = 16; // Deprecated. Use Consumers.Codec + repeated string ReadRuleServiceTypes = 17; // Deprecated. Use Consumers.ServiceType optional uint64 FormatVersion = 20; optional TCodecs Codecs = 21; - repeated uint64 ReadRuleVersions = 22; - repeated uint64 ReadRuleGenerations = 32; + repeated uint64 ReadRuleVersions = 22; // Deprecated. Use Consumers.Version + repeated uint64 ReadRuleGenerations = 32; // Deprecated. Use Consumers.Generation + + message TConsumer { + optional string Name = 1; + optional uint64 ReadFromTimestampsMs = 2; + optional uint64 FormatVersion = 3; + optional TCodecs Codec = 4; + optional string ServiceType = 5; + optional EConsumerScalingSupport ScalingSupport = 6; + optional uint64 Version = 7; + optional uint64 Generation = 8; + } + + repeated TConsumer Consumers = 37; optional string TopicPath = 23; From c2f10dc75a65acdf245f29f81bc7022e6e8e60b2 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Tue, 27 Feb 2024 14:18:41 +0000 Subject: [PATCH 02/15] intermediate --- ydb/core/persqueue/partition.cpp | 15 +++++++-------- ydb/core/persqueue/partition_init.cpp | 3 +++ ydb/core/persqueue/partition_read.cpp | 17 +++++++++-------- ydb/core/persqueue/pq_impl.cpp | 6 ++++++ ydb/core/persqueue/transaction.cpp | 4 ++++ ydb/core/persqueue/ut/common/pq_ut_common.cpp | 8 +++++--- ydb/core/persqueue/ut/common/pq_ut_common.h | 2 +- ydb/core/persqueue/ut/make_config.cpp | 4 ++++ ydb/core/persqueue/ut/pq_ut.cpp | 1 - ydb/core/persqueue/utils.cpp | 9 ++------- ydb/core/protos/pqconfig.proto | 8 ++++---- 11 files changed, 45 insertions(+), 32 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 8461b5f408e5..6dae4beddd07 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1777,25 +1777,24 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co important.insert(importantUser); } - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - const auto& consumer = config.GetReadRules(i); - auto& userInfo = GetOrCreatePendingUser(consumer, 0); + for (auto& consumer : config.GetConsumers()) { + auto& userInfo = GetOrCreatePendingUser(consumer.GetName(), 0); - TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero(); + TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs()); if (!ts) { ts += TDuration::MilliSeconds(1); } userInfo.ReadFromTimestamp = ts; - userInfo.Important = important.contains(consumer); + userInfo.Important = important.contains(consumer.GetName()); - ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0; + ui64 rrGen = consumer.GetGeneration(); if (userInfo.ReadRuleGeneration != rrGen) { - TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0, 0, TActorId{}, + TEvPQ::TEvSetClientInfo act(0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen); ProcessUserAct(act, ctx); } - hasReadRule.erase(consumer); + hasReadRule.erase(consumer.GetName()); } for (auto& consumer : hasReadRule) { diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 76e431156940..dbaa1092fe70 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -163,6 +163,9 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon switch (response.GetStatus()) { case NKikimrProto::OK: Y_ABORT_UNLESS(Partition()->Config.ParseFromString(response.GetValue())); + + Migrate(Partition()->Config); + if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) { auto event = MakeHolder(Partition()->TopicConverter, Partition()->TabletConfig); diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 87902262655f..188586c095d8 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -41,14 +41,14 @@ void TPartition::FillReadFromTimestamps(const TActorContext& ctx) { userInfo.HasReadRule = false; hasReadRule.insert(consumer); } - for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) { - const auto& consumer = Config.GetReadRules(i); - auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx, 0); + + for (auto& consumer : Config.GetConsumers()) { + auto& userInfo = UsersInfoStorage->GetOrCreate(consumer.GetName(), ctx, 0); userInfo.HasReadRule = true; - ui64 rrGen = i < Config.ReadRuleGenerationsSize() ? Config.GetReadRuleGenerations(i) : 0; - if (userInfo.ReadRuleGeneration != rrGen) { + + if (userInfo.ReadRuleGeneration != consumer.GetGeneration()) { THolder event = MakeHolder( - 0, consumer, 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen + 0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, consumer.GetGeneration() ); // // TODO(abcdef): заменить на вызов ProcessUserAct @@ -61,12 +61,13 @@ void TPartition::FillReadFromTimestamps(const TActorContext& ctx) { } userInfo.Step = userInfo.Generation = 0; } - hasReadRule.erase(consumer); - TInstant ts = i < Config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(Config.GetReadFromTimestampsMs(i)) : TInstant::Zero(); + hasReadRule.erase(consumer.GetName()); + TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs()); if (!ts) ts += TDuration::MilliSeconds(1); if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts) userInfo.ReadFromTimestamp = ts; } + for (auto& consumer : hasReadRule) { auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx); if (userInfo.NoConsumer) { diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 9e989af4cb80..5ee4061c02dc 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -4,6 +4,8 @@ #include "partition_log.h" #include "partition.h" #include "read.h" +#include "utils.h" + #include #include #include @@ -998,6 +1000,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& bool res = Config.ParseFromString(read.GetValue()); Y_ABORT_UNLESS(res); + Migrate(Config); + if (!Config.PartitionsSize()) { for (const auto partitionId : Config.GetPartitionIds()) { Config.AddPartitions()->SetPartitionId(partitionId); @@ -1506,6 +1510,8 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr #include #include +#include #include #include #include @@ -49,7 +50,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters, } request->Record.MutableTabletConfig()->SetCacheSize(10_MB); request->Record.SetTxId(12345); - auto tabletConfig = request->Record.MutableTabletConfig(); + auto* tabletConfig = request->Record.MutableTabletConfig(); if (runtime.GetAppData().PQConfig.GetTopicsAreFirstClassCitizen()) { tabletConfig->SetTopicName("topic"); tabletConfig->SetTopicPath(runtime.GetAppData().PQConfig.GetDatabase() + "/topic"); @@ -93,6 +94,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters, if (u.first != "user") tabletConfig->AddReadRules(u.first); } + runtime.SendToPipe(tabletId, edge, request.Release(), 0, GetPipeConfigWithRetries()); TEvPersQueue::TEvUpdateConfigResponse* result = runtime.GrabEdgeEvent(handle); @@ -136,7 +138,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters, } -void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestContext& tc, i64 ctime, +void CmdGetOffset(const ui32 partition, const TString& user, i64 expectedOffset, TTestContext& tc, i64 ctime, ui64 writeTime) { TAutoPtr handle; TEvPersQueue::TEvResponse *result; @@ -174,7 +176,7 @@ void CmdGetOffset(const ui32 partition, const TString& user, i64 offset, TTestCo } } } - UNIT_ASSERT((offset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == offset); + UNIT_ASSERT((expectedOffset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == expectedOffset); if (writeTime > 0) { UNIT_ASSERT(resp.HasWriteTimestampEstimateMS()); UNIT_ASSERT(resp.GetWriteTimestampEstimateMS() >= writeTime); diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index 301e836a1f47..e431ed13b389 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -486,7 +486,7 @@ TActorId CmdCreateSession(const TPQCmdSettings& settings, TTestContext& tc); void CmdGetOffset( const ui32 partition, const TString& user, - i64 offset, + i64 expectedOffset, TTestContext& tc, i64 ctime = -1, ui64 writeTime = 0); diff --git a/ydb/core/persqueue/ut/make_config.cpp b/ydb/core/persqueue/ut/make_config.cpp index ceeed50b440a..8fbe5cfb63c3 100644 --- a/ydb/core/persqueue/ut/make_config.cpp +++ b/ydb/core/persqueue/ut/make_config.cpp @@ -1,5 +1,7 @@ #include "make_config.h" +#include + namespace NKikimr::NPQ::NHelpers { NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version, @@ -27,6 +29,8 @@ NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version, config.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); + Migrate(config); + return config; } diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index 9af2d4c4e71a..ec3680b0cca4 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -397,7 +397,6 @@ Y_UNIT_TEST(TestUserInfoCompatibility) { CmdGetOffset(1, client, 1, tc); CmdGetOffset(2, client, 1, tc); CmdGetOffset(3, client, 1, tc); - }); } diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index ffe125f16c1d..663dc8789ad0 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -46,6 +46,7 @@ ui64 PutUnitsSize(const ui64 size) { } void Migrate(NKikimrPQ::TPQTabletConfig& config) { + Cerr << ">>>>> Migrate" << Endl; if (!config.ConsumersSize()) { for(size_t i = 0; i < config.ReadRulesSize(); ++i) { auto* consumer = config.AddConsumers(); @@ -60,13 +61,7 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) { if (i < config.ConsumerCodecsSize()) { auto& src = config.GetConsumerCodecs(i); auto* dst = consumer->MutableCodec(); - - for (auto value : src.GetIds()) { - dst->AddIds(value); - } - for (auto& value : src.GetCodecs()) { - dst->AddCodecs(value); - } + dst->CopyFrom(src); } if (i < config.ReadRuleServiceTypesSize()) { consumer->SetServiceType(config.GetReadRuleServiceTypes(i)); diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 34e88ff20452..d846313fb4b8 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -325,13 +325,13 @@ message TPQTabletConfig { message TConsumer { optional string Name = 1; - optional uint64 ReadFromTimestampsMs = 2; - optional uint64 FormatVersion = 3; + optional uint64 ReadFromTimestampsMs = 2 [default = 0]; + optional uint64 FormatVersion = 3 [default = 0]; optional TCodecs Codec = 4; optional string ServiceType = 5; optional EConsumerScalingSupport ScalingSupport = 6; - optional uint64 Version = 7; - optional uint64 Generation = 8; + optional uint64 Version = 7 [default = 0]; + optional uint64 Generation = 8 [default = 0]; } repeated TConsumer Consumers = 37; From 0e01aec29902d1782074fb3625eb346fa299ed7c Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Tue, 27 Feb 2024 15:12:06 +0000 Subject: [PATCH 03/15] fix tests --- ydb/core/persqueue/pq_impl.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 5ee4061c02dc..165fd99ae7a6 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1510,8 +1510,6 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr Date: Wed, 28 Feb 2024 14:22:08 +0000 Subject: [PATCH 04/15] intermediate --- ydb/core/kqp/topics/kqp_topics.cpp | 12 +-- ydb/core/persqueue/partition.cpp | 2 - ydb/core/persqueue/pq_impl.cpp | 10 +- ydb/core/persqueue/read_balancer.cpp | 6 +- ydb/core/persqueue/user_info.cpp | 6 +- ydb/core/persqueue/user_info.h | 6 +- ydb/core/persqueue/utils.cpp | 81 ++++++++++----- ydb/core/persqueue/utils.h | 4 + ydb/services/lib/actors/pq_schema_actor.cpp | 99 +++++++++++++------ .../actors/read_init_auth_actor.cpp | 10 +- .../persqueue_v1/actors/schema_actors.cpp | 28 ++---- ydb/services/persqueue_v1/topic_yql_ut.cpp | 6 ++ 12 files changed, 160 insertions(+), 110 deletions(-) diff --git a/ydb/core/kqp/topics/kqp_topics.cpp b/ydb/core/kqp/topics/kqp_topics.cpp index e31b2f45b5ca..80589a7acb2b 100644 --- a/ydb/core/kqp/topics/kqp_topics.cpp +++ b/ydb/core/kqp/topics/kqp_topics.cpp @@ -1,4 +1,5 @@ #include "kqp_topics.h" +#include "ydb/core/persqueue/utils.h" #include #include @@ -293,16 +294,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac result.PQGroupInfo->Description; if (Consumer_) { - bool found = false; - - for (auto& consumer : description.GetPQTabletConfig().GetReadRules()) { - if (Consumer_ == consumer) { - found = true; - break; - } - } - - if (!found) { + if (!NPQ::HasConsumer(description.GetPQTabletConfig(), *Consumer_)) { builder << "Unknown consumer '" << *Consumer_ << "'"; status = Ydb::StatusIds::BAD_REQUEST; diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 6dae4beddd07..fb1f30ded888 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1896,8 +1896,6 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0); - UsersInfoStorage->UpdateConfig(Config); - Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config)); Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config)); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 165fd99ae7a6..433e1cd71707 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1134,10 +1134,12 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) { } auto countReadRulesWithPricing = [&](const TActorContext& ctx, const auto& config) { + auto& defaultClientServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName(); + ui32 result = 0; - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - TString rrServiceType = config.ReadRuleServiceTypesSize() <= i ? "" : config.GetReadRuleServiceTypes(i); - if (rrServiceType.empty() || rrServiceType == AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName()) + for (auto& consumer : config.GetConsumers()) { + TString serviceType = consumer.GetServiceType(); + if (serviceType.empty() || serviceType == defaultClientServiceType) ++result; } return result; @@ -1639,7 +1641,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtrPQConfig.GetDefaultClientServiceType().GetName(); TString userServiceType = ""; - for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) { - if (Config.GetReadRules(i) == user) { - userServiceType = Config.ReadRuleServiceTypesSize() > i ? Config.GetReadRuleServiceTypes(i) : ""; + for (auto& consumer : Config.GetConsumers()) { + if (consumer.GetName() == user) { + userServiceType = consumer.GetServiceType(); break; } } diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 328fdef7165a..25fd902c951f 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -383,10 +383,6 @@ class TUsersInfoStorage { const TUserInfo* GetIfExists(const TString& user) const; TUserInfo* GetIfExists(const TString& user); - void UpdateConfig(const NKikimrPQ::TPQTabletConfig& config) { - Config = config; - } - THashMap& GetAll(); TUserInfoBase CreateUserInfo(const TString& user, @@ -422,7 +418,7 @@ class TUsersInfoStorage { TMaybe TabletActor; TMaybe PartitionActor; - NKikimrPQ::TPQTabletConfig Config; + const NKikimrPQ::TPQTabletConfig& Config; TString CloudId; TString DbId; diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 663dc8789ad0..ebef7e4a1ae8 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -46,34 +46,67 @@ ui64 PutUnitsSize(const ui64 size) { } void Migrate(NKikimrPQ::TPQTabletConfig& config) { - Cerr << ">>>>> Migrate" << Endl; - if (!config.ConsumersSize()) { - for(size_t i = 0; i < config.ReadRulesSize(); ++i) { - auto* consumer = config.AddConsumers(); - - consumer->SetName(config.GetReadRules(i)); - if (i < config.ReadFromTimestampsMsSize()) { - consumer->SetReadFromTimestampsMs(config.GetReadFromTimestampsMs(i)); - } - if (i < config.ConsumerFormatVersionsSize()) { - consumer->SetFormatVersion(config.GetConsumerFormatVersions(i)); - } - if (i < config.ConsumerCodecsSize()) { - auto& src = config.GetConsumerCodecs(i); - auto* dst = consumer->MutableCodec(); - dst->CopyFrom(src); - } - if (i < config.ReadRuleServiceTypesSize()) { - consumer->SetServiceType(config.GetReadRuleServiceTypes(i)); - } - if (i < config.ReadRuleVersionsSize()) { - consumer->SetVersion(config.GetReadRuleVersions(i)); + // Remove it after version 25.1 and removing modification ReadRulese fields. + // For back compatibility ReadRules is source for Consumers + config.ClearConsumers(); + + for(size_t i = 0; i < config.ReadRulesSize(); ++i) { + auto* consumer = config.AddConsumers(); + + consumer->SetName(config.GetReadRules(i)); + if (i < config.ReadFromTimestampsMsSize()) { + consumer->SetReadFromTimestampsMs(config.GetReadFromTimestampsMs(i)); + } + if (i < config.ConsumerFormatVersionsSize()) { + consumer->SetFormatVersion(config.GetConsumerFormatVersions(i)); + } + if (i < config.ConsumerCodecsSize()) { + auto& src = config.GetConsumerCodecs(i); + auto* dst = consumer->MutableCodec(); + dst->CopyFrom(src); + } + if (i < config.ReadRuleServiceTypesSize()) { + consumer->SetServiceType(config.GetReadRuleServiceTypes(i)); + } + if (i < config.ReadRuleVersionsSize()) { + consumer->SetVersion(config.GetReadRuleVersions(i)); + } + if (i < config.ReadRuleGenerationsSize()) { + consumer->SetGeneration(config.GetReadRuleGenerations(i)); + } + } +} + +bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { + if (config.ConsumersSize()) { + for (auto& cons : config.GetConsumers()) { + if (cons.GetName() == consumerName) { + return true; } - if (i < config.ReadRuleGenerationsSize()) { - consumer->SetGeneration(config.GetReadRuleGenerations(i)); + } + } else { + for (auto& cons : config.GetReadRules()) { + if (cons == consumerName) { + return true; } } } + + return false; +} + +size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config) { + return std::max(config.ReadRulesSize(), config.ConsumersSize()); +} + +bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { + for (const auto& i : config.GetPartitionConfig().GetImportantClientId()) { + if (consumerName == i) { + return true; + } + } + + return false; } const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId) { diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index 71ea15791ed3..ea40a1944d47 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -15,6 +15,10 @@ ui64 PutUnitsSize(const ui64 size); TString SourceIdHash(const TString& sourceId); void Migrate(NKikimrPQ::TPQTabletConfig& config); +bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName); +size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config); + +bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName); const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId); diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index c6564b247985..7480dbfc282b 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -92,6 +93,9 @@ namespace NKikimr::NGRpcProxy::V1 { } } + auto* consumer = config->AddConsumers(); + + consumer->SetName(consumerName); config->AddReadRules(consumerName); if (rr.starting_message_timestamp_ms() < 0) { @@ -100,6 +104,7 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ); } + consumer->SetReadFromTimestampsMs(rr.starting_message_timestamp_ms()); config->AddReadFromTimestampsMs(rr.starting_message_timestamp_ms()); if (!Ydb::PersQueue::V1::TopicSettings::Format_IsValid((int)rr.supported_format()) || rr.supported_format() == 0) { @@ -108,6 +113,7 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ); } + consumer->SetFormatVersion(rr.supported_format() - 1); config->AddConsumerFormatVersions(rr.supported_format() - 1); if (rr.version() < 0) { @@ -116,7 +122,10 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ); } + consumer->SetVersion(rr.version()); config->AddReadRuleVersions(rr.version()); + + auto cct = consumer->MutableCodec(); auto ct = config->AddConsumerCodecs(); if (rr.supported_codecs().size() > MAX_SUPPORTED_CODECS_COUNT) { return TMsgPqCodes( @@ -131,8 +140,13 @@ namespace NKikimr::NGRpcProxy::V1 { TStringBuilder() << "Unknown codec with value " << codec << " for " << rr.consumer_name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ); - ct->AddIds(codec - 1); - ct->AddCodecs(to_lower(Ydb::PersQueue::V1::Codec_Name((Ydb::PersQueue::V1::Codec)codec)).substr(6)); + + auto codecName = to_lower(Ydb::PersQueue::V1::Codec_Name((Ydb::PersQueue::V1::Codec)codec)).substr(6); + + cct->AddIds(codec - 1); + cct->AddCodecs(codecName); + + ct->CopyFrom(*cct); } if (rr.important()) { @@ -205,6 +219,9 @@ namespace NKikimr::NGRpcProxy::V1 { } } + auto* consumer = config->AddConsumers(); + + consumer->SetName(consumerName); config->AddReadRules(consumerName); if (rr.read_from().seconds() < 0) { @@ -213,9 +230,12 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ); } + consumer->SetReadFromTimestampsMs(rr.read_from().seconds() * 1000); config->AddReadFromTimestampsMs(rr.read_from().seconds() * 1000); + consumer->SetFormatVersion(0); config->AddConsumerFormatVersions(0); + TString serviceType; const auto& pqConfig = AppData(ctx)->PQConfig; @@ -276,9 +296,13 @@ namespace NKikimr::NGRpcProxy::V1 { } } + consumer->SetServiceType(serviceType); config->AddReadRuleServiceTypes(serviceType); + + consumer->SetVersion(version); config->AddReadRuleVersions(version); + auto cct = consumer->MutableCodec(); auto ct = config->AddConsumerCodecs(); for(const auto& codec : rr.supported_codecs().codecs()) { @@ -288,8 +312,10 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ); } - ct->AddIds(codec - 1); - ct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM"); + cct->AddIds(codec - 1); + cct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM"); + + ct->CopyFrom(*cct); } if (rr.important()) { @@ -309,9 +335,6 @@ namespace NKikimr::NGRpcProxy::V1 { const TString& consumerName, const TActorContext& ctx ) { - THashSet rulesToRemove; - rulesToRemove.insert(consumerName); - config->ClearReadRuleVersions(); config->ClearReadRules(); config->ClearReadFromTimestampsMs(); @@ -320,21 +343,27 @@ namespace NKikimr::NGRpcProxy::V1 { config->MutablePartitionConfig()->ClearImportantClientId(); config->ClearReadRuleServiceTypes(); + config->ClearConsumers(); + for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) { - if (rulesToRemove.find(importantConsumer) == rulesToRemove.end()) { + if (importantConsumer != consumerName) { config->MutablePartitionConfig()->AddImportantClientId(importantConsumer); } } + bool removed = false; + const auto& pqConfig = AppData(ctx)->PQConfig; for (size_t i = 0; i < originalConfig.ReadRulesSize(); i++) { - if (auto it = rulesToRemove.find(originalConfig.GetReadRules(i)); it != rulesToRemove.end()) { - rulesToRemove.erase(it); + auto& readRule = originalConfig.GetReadRules(i); + + if (readRule == consumerName) { + removed = true; continue; } config->AddReadRuleVersions(originalConfig.GetReadRuleVersions(i)); - config->AddReadRules(originalConfig.GetReadRules(i)); + config->AddReadRules(readRule); config->AddReadFromTimestampsMs(originalConfig.GetReadFromTimestampsMs(i)); config->AddConsumerFormatVersions(originalConfig.GetConsumerFormatVersions(i)); auto ct = config->AddConsumerCodecs(); @@ -347,14 +376,24 @@ namespace NKikimr::NGRpcProxy::V1 { } else { if (pqConfig.GetDisallowDefaultClientServiceType()) { return TStringBuilder() << "service type cannot be empty for consumer '" - << originalConfig.GetReadRules(i) << "'"; + << readRule << "'"; } config->AddReadRuleServiceTypes(pqConfig.GetDefaultClientServiceType().GetName()); } } - if (rulesToRemove.size() > 0) { - return TStringBuilder() << "Rule for consumer " << *rulesToRemove.begin() << " doesn't exist"; + for (auto& consumer : originalConfig.GetConsumers()) { + if (consumerName == consumer.GetName()) { + removed = true; + continue; + } + + auto* dst = config->AddConsumers(); + dst->CopyFrom(consumer); + } + + if (!removed) { + return TStringBuilder() << "Rule for consumer " << consumerName << " doesn't exist"; } return ""; @@ -364,9 +403,10 @@ namespace NKikimr::NGRpcProxy::V1 { const TClientServiceTypes& supportedClientServiceTypes, TString& error, const TActorContext& ctx) { - if (config.GetReadRules().size() > MAX_READ_RULES_COUNT) { + size_t consumerCount = NPQ::ConsumerCount(config); + if (consumerCount > MAX_READ_RULES_COUNT) { error = TStringBuilder() << "read rules count cannot be more than " - << MAX_READ_RULES_COUNT << ", provided " << config.GetReadRules().size(); + << MAX_READ_RULES_COUNT << ", provided " << consumerCount; return false; } @@ -1108,6 +1148,7 @@ namespace NKikimr::NGRpcProxy::V1 { auto config = pqDescr.MutablePQTabletConfig(); + NPQ::Migrate(*config); auto partConfig = config->MutablePartitionConfig(); if (request.alter_attributes().size()) { @@ -1186,11 +1227,11 @@ namespace NKikimr::NGRpcProxy::V1 { i32 dropped = 0; - for (ui32 i = 0; i < config->ReadRulesSize(); ++i) { - TString oldName = config->GetReadRules(i); - TString name = NPersQueue::ConvertOldConsumerName(oldName, ctx); + for (const auto& c : config->GetConsumers()) { + auto& oldName = c.GetName(); + auto name = NPersQueue::ConvertOldConsumerName(oldName, ctx); + bool erase = false; - bool important = false; for (auto consumer: request.drop_consumers()) { if (consumer == name || consumer == oldName) { erase = true; @@ -1199,20 +1240,15 @@ namespace NKikimr::NGRpcProxy::V1 { } } if (erase) continue; - for (auto imp : partConfig->GetImportantClientId()) { - if (imp == oldName) { - important = true; - break; - } - } + consumers.push_back({false, Ydb::Topic::Consumer{}}); // do not check service type for presented consumers auto& consumer = consumers.back().second; consumer.set_name(name); - consumer.set_important(important); - consumer.mutable_read_from()->set_seconds(config->GetReadFromTimestampsMs(i) / 1000); - (*consumer.mutable_attributes())["_service_type"] = config->GetReadRuleServiceTypes(i); - (*consumer.mutable_attributes())["_version"] = TStringBuilder() << config->GetReadRuleVersions(i); - for (ui32 codec : config->GetConsumerCodecs(i).GetIds()) { + consumer.set_important(NPQ::IsImportantClient(*config, oldName)); + consumer.mutable_read_from()->set_seconds(c.GetReadFromTimestampsMs() / 1000); + (*consumer.mutable_attributes())["_service_type"] = c.GetServiceType(); + (*consumer.mutable_attributes())["_version"] = TStringBuilder() << c.GetVersion(); + for (ui32 codec : c.GetCodec().GetIds()) { consumer.mutable_supported_codecs()->add_codecs(codec + 1); } } @@ -1252,6 +1288,7 @@ namespace NKikimr::NGRpcProxy::V1 { config->ClearReadRuleServiceTypes(); config->ClearReadRuleGenerations(); config->ClearReadRuleVersions(); + config->ClearConsumers(); for (const auto& rr : consumers) { auto messageAndCode = AddReadRuleToConfig(config, rr.second, supportedClientServiceTypes, rr.first, ctx); diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index a2a7c0e6f069..b47c98f1e2e6 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -4,6 +4,7 @@ #include "persqueue_utils.h" #include +#include namespace NKikimr::NGRpcProxy::V1 { @@ -197,15 +198,8 @@ bool TReadInitAndAuthActor::CheckTopicACL( return false; } if (!SkipReadRuleCheck && (Token || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen())) { - bool found = false; - for (auto& cons : pqDescr.GetPQTabletConfig().GetReadRules() ) { - if (cons == ClientId) { - found = true; - break; - } - } //TODO : add here checking of client-service-type password. Provide it via API-call. - if (!found) { + if (!NPQ::HasConsumer(pqDescr.GetPQTabletConfig(), ClientId)) { CloseSession( TStringBuilder() << "no read rule provided for consumer '" << ClientPath << "' in topic '" << topic << "' in current cluster '" << LocalCluster, PersQueue::ErrorCode::BAD_REQUEST, ctx diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 636f7b4e2041..bb335f3241df 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -2,10 +2,10 @@ #include "persqueue_utils.h" +#include +#include #include - #include -#include namespace NKikimr::NGRpcProxy::V1 { @@ -135,8 +135,10 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T const auto& pqConfig = AppData(ActorContext())->PQConfig; for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { + auto& readRuleName = config.GetReadRules(i); + auto rr = settings->add_read_rules(); - auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ActorContext()); + auto consumerName = NPersQueue::ConvertOldConsumerName(readRuleName, ActorContext()); rr->set_consumer_name(consumerName); rr->set_starting_message_timestamp_ms(config.GetReadFromTimestampsMs(i)); rr->set_supported_format( @@ -145,14 +147,7 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T for (const auto &codec : config.GetConsumerCodecs(i).GetIds()) { rr->add_supported_codecs((Ydb::PersQueue::V1::Codec) (codec + 1)); } - bool important = false; - for (const auto &c : partConfig.GetImportantClientId()) { - if (c == config.GetReadRules(i)) { - important = true; - break; - } - } - rr->set_important(important); + rr->set_important(NPQ::IsImportantClient(config, readRuleName)); if (i < config.ReadRuleServiceTypesSize()) { rr->set_service_type(config.GetReadRuleServiceTypes(i)); @@ -1031,7 +1026,6 @@ bool TDescribeConsumerActor::ApplyResponse( bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig& config, ui32 i, const NActors::TActorContext& ctx, Ydb::StatusIds::StatusCode& status, TString& error) { - const auto &partConfig = config.GetPartitionConfig(); const auto& pqConfig = AppData(ctx)->PQConfig; auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx); @@ -1043,14 +1037,8 @@ bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfi for (const auto &codec : config.GetConsumerCodecs(i).GetIds()) { rr->mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1)); } - bool important = false; - for (const auto &c : partConfig.GetImportantClientId()) { - if (c == config.GetReadRules(i)) { - important = true; - break; - } - } - rr->set_important(important); + + rr->set_important(NPQ::IsImportantClient(config, config.GetReadRules(i))); TString serviceType = ""; if (i < config.ReadRuleServiceTypesSize()) { serviceType = config.GetReadRuleServiceTypes(i); diff --git a/ydb/services/persqueue_v1/topic_yql_ut.cpp b/ydb/services/persqueue_v1/topic_yql_ut.cpp index 9d2a1734db96..9c0d45eae52d 100644 --- a/ydb/services/persqueue_v1/topic_yql_ut.cpp +++ b/ydb/services/persqueue_v1/topic_yql_ut.cpp @@ -67,6 +67,8 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) { partCfg->SetWriteSpeedInBytesPerSecond(9001); auto* rtfs = descrCopy.MutableReadFromTimestampsMs(); rtfs->Set(1, 1609462861000); + + descrCopy.MutableConsumers(1)->SetReadFromTimestampsMs(1609462861000); } const char *query2 = R"__( ALTER TOPIC `/Root/PQ/rt3.dc1--legacy--topic1` @@ -80,6 +82,10 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) { auto pqGroup2 = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription() .GetPersQueueGroup(); const auto& descr2 = pqGroup2.GetPQTabletConfig(); + + Cerr << ">>>>> 1: " << descrCopy.DebugString() << Endl; + Cerr << ">>>>> 2: " << descr2.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(descrCopy.DebugString(), descr2.DebugString()); const char *query3 = R"__( From 2263e6f86cae8b985ef31ce23e93f0a1bd2dca8b Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 29 Feb 2024 10:06:37 +0000 Subject: [PATCH 05/15] if for code for remove --- ydb/core/persqueue/utils.cpp | 66 +++++------- ydb/core/persqueue/utils.h | 4 + ydb/core/tx/scheme_board/cache.cpp | 2 + .../datastreams/datastreams_proxy.cpp | 24 ++--- ydb/services/lib/actors/pq_schema_actor.cpp | 101 +++++++++++------- ydb/services/persqueue_v1/persqueue_ut.cpp | 25 +++++ 6 files changed, 137 insertions(+), 85 deletions(-) diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index ebef7e4a1ae8..433b88457c5c 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -46,49 +46,41 @@ ui64 PutUnitsSize(const ui64 size) { } void Migrate(NKikimrPQ::TPQTabletConfig& config) { - // Remove it after version 25.1 and removing modification ReadRulese fields. - // For back compatibility ReadRules is source for Consumers - config.ClearConsumers(); + if (config.ReadRulesSize()) { + config.ClearConsumers(); - for(size_t i = 0; i < config.ReadRulesSize(); ++i) { - auto* consumer = config.AddConsumers(); + for(size_t i = 0; i < config.ReadRulesSize(); ++i) { + auto* consumer = config.AddConsumers(); - consumer->SetName(config.GetReadRules(i)); - if (i < config.ReadFromTimestampsMsSize()) { - consumer->SetReadFromTimestampsMs(config.GetReadFromTimestampsMs(i)); - } - if (i < config.ConsumerFormatVersionsSize()) { - consumer->SetFormatVersion(config.GetConsumerFormatVersions(i)); - } - if (i < config.ConsumerCodecsSize()) { - auto& src = config.GetConsumerCodecs(i); - auto* dst = consumer->MutableCodec(); - dst->CopyFrom(src); - } - if (i < config.ReadRuleServiceTypesSize()) { - consumer->SetServiceType(config.GetReadRuleServiceTypes(i)); - } - if (i < config.ReadRuleVersionsSize()) { - consumer->SetVersion(config.GetReadRuleVersions(i)); - } - if (i < config.ReadRuleGenerationsSize()) { - consumer->SetGeneration(config.GetReadRuleGenerations(i)); + consumer->SetName(config.GetReadRules(i)); + if (i < config.ReadFromTimestampsMsSize()) { + consumer->SetReadFromTimestampsMs(config.GetReadFromTimestampsMs(i)); + } + if (i < config.ConsumerFormatVersionsSize()) { + consumer->SetFormatVersion(config.GetConsumerFormatVersions(i)); + } + if (i < config.ConsumerCodecsSize()) { + auto& src = config.GetConsumerCodecs(i); + auto* dst = consumer->MutableCodec(); + dst->CopyFrom(src); + } + if (i < config.ReadRuleServiceTypesSize()) { + consumer->SetServiceType(config.GetReadRuleServiceTypes(i)); + } + if (i < config.ReadRuleVersionsSize()) { + consumer->SetVersion(config.GetReadRuleVersions(i)); + } + if (i < config.ReadRuleGenerationsSize()) { + consumer->SetGeneration(config.GetReadRuleGenerations(i)); + } } } } bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { - if (config.ConsumersSize()) { - for (auto& cons : config.GetConsumers()) { - if (cons.GetName() == consumerName) { - return true; - } - } - } else { - for (auto& cons : config.GetReadRules()) { - if (cons == consumerName) { - return true; - } + for (auto& cons : config.GetConsumers()) { + if (cons.GetName() == consumerName) { + return true; } } @@ -96,7 +88,7 @@ bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consum } size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config) { - return std::max(config.ReadRulesSize(), config.ConsumersSize()); + return config.ConsumersSize(); } bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index ea40a1944d47..b05658d0c605 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -15,6 +15,10 @@ ui64 PutUnitsSize(const ui64 size); TString SourceIdHash(const TString& sourceId); void Migrate(NKikimrPQ::TPQTabletConfig& config); + +// This function required for marking the code which required remove after 25-1 +constexpr bool ReadRuleCompatible() { return true; } + bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName); size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config); diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 96443917beb6..3230bfef94c6 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -1481,6 +1482,7 @@ class TSchemeCache: public TMonitorableActor { Kind = TNavigate::KindTopic; IsPrivatePath = CalcPathIsPrivate(entryDesc.GetPathType(), entryDesc.GetPathSubType()); if (Created) { + NPQ::Migrate(*pathDesc.MutablePersQueueGroup()->MutablePQTabletConfig()); FillInfo(Kind, PQGroupInfo, std::move(*pathDesc.MutablePersQueueGroup())); } break; diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 8b2e373acf28..cac2f2ef9d62 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -240,9 +240,8 @@ namespace NKikimr::NDataStreams::V1 { const auto& response = ev->Get()->Request.Get()->ResultSet.front(); const auto& pqGroupDescription = response.PQGroupInfo->Description; - const auto& readRules = pqGroupDescription.GetPQTabletConfig().GetReadRules(); - if (readRules.size() > 0 && EnforceDeletion == false) { + if (NPQ::ConsumerCount(pqGroupDescription.GetPQTabletConfig()) > 0 && EnforceDeletion == false) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::IN_USE), TStringBuilder() << "Stream has registered consumers" << "and EnforceConsumerDeletion flag is false", ActorContext()); @@ -983,24 +982,25 @@ namespace NKikimr::NDataStreams::V1 { ui32 leftToRead{0}; const auto& response = result->ResultSet.front(); const auto& pqGroupDescription = response.PQGroupInfo->Description; - const auto& streamReadRulesNames = pqGroupDescription.GetPQTabletConfig().GetReadRules(); - const auto& streamReadRulesReadFromTimestamps = pqGroupDescription.GetPQTabletConfig().GetReadFromTimestampsMs(); + const auto& streamConsumers = pqGroupDescription.GetPQTabletConfig().GetConsumers(); const auto alreadyRead = NextToken.GetAlreadyRead(); - if (alreadyRead > (ui32)streamReadRulesNames.size()) { + ui32 consumerCount = NPQ::ConsumerCount(pqGroupDescription.GetPQTabletConfig()); + + if (alreadyRead > consumerCount) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Provided next_token is malformed - " << "everything is already read", ActorContext()); } - const auto rulesToRead = std::min(streamReadRulesNames.size() - alreadyRead, MaxResults); + const auto rulesToRead = std::min(consumerCount - alreadyRead, MaxResults); readRules.reserve(rulesToRead); - auto itName = streamReadRulesNames.begin() + alreadyRead; - auto itTs = streamReadRulesReadFromTimestamps.begin() + alreadyRead; - for (auto i = rulesToRead; i > 0; --i, ++itName, ++itTs) { - readRules.push_back({*itName, *itTs}); + + auto consumer = streamConsumers.begin() + alreadyRead; + for (auto i = rulesToRead; i > 0; --i, ++consumer) { + readRules.push_back({consumer->GetName(), consumer->GetReadFromTimestampsMs()}); } - leftToRead = streamReadRulesNames.size() - alreadyRead - rulesToRead; + leftToRead = consumerCount - alreadyRead - rulesToRead; SendResponse(ActorContext(), readRules, leftToRead); } @@ -1929,7 +1929,7 @@ namespace NKikimr::NDataStreams::V1 { : Ydb::DataStreams::V1::StreamDescription::CREATING ); descriptionSummary.set_open_shard_count(PQGroup.GetPartitions().size()); - descriptionSummary.set_consumer_count(PQGroup.MutablePQTabletConfig()->GetReadRules().size()); + descriptionSummary.set_consumer_count(NPQ::ConsumerCount(PQGroup.GetPQTabletConfig())); descriptionSummary.set_encryption_type(Ydb::DataStreams::V1::EncryptionType::NONE); Request_->SendResult(result, Ydb::StatusIds::SUCCESS); diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 7480dbfc282b..8ee1616a5a75 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -96,7 +96,9 @@ namespace NKikimr::NGRpcProxy::V1 { auto* consumer = config->AddConsumers(); consumer->SetName(consumerName); - config->AddReadRules(consumerName); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRules(consumerName); + } if (rr.starting_message_timestamp_ms() < 0) { return TMsgPqCodes( @@ -105,7 +107,9 @@ namespace NKikimr::NGRpcProxy::V1 { ); } consumer->SetReadFromTimestampsMs(rr.starting_message_timestamp_ms()); - config->AddReadFromTimestampsMs(rr.starting_message_timestamp_ms()); + if (NPQ::ReadRuleCompatible()) { + config->AddReadFromTimestampsMs(rr.starting_message_timestamp_ms()); + } if (!Ydb::PersQueue::V1::TopicSettings::Format_IsValid((int)rr.supported_format()) || rr.supported_format() == 0) { return TMsgPqCodes( @@ -114,7 +118,9 @@ namespace NKikimr::NGRpcProxy::V1 { ); } consumer->SetFormatVersion(rr.supported_format() - 1); - config->AddConsumerFormatVersions(rr.supported_format() - 1); + if (NPQ::ReadRuleCompatible()) { + config->AddConsumerFormatVersions(rr.supported_format() - 1); + } if (rr.version() < 0) { return TMsgPqCodes( @@ -123,7 +129,9 @@ namespace NKikimr::NGRpcProxy::V1 { ); } consumer->SetVersion(rr.version()); - config->AddReadRuleVersions(rr.version()); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRuleVersions(rr.version()); + } auto cct = consumer->MutableCodec(); auto ct = config->AddConsumerCodecs(); @@ -146,7 +154,9 @@ namespace NKikimr::NGRpcProxy::V1 { cct->AddIds(codec - 1); cct->AddCodecs(codecName); - ct->CopyFrom(*cct); + if (NPQ::ReadRuleCompatible()) { + ct->CopyFrom(*cct); + } } if (rr.important()) { @@ -167,7 +177,10 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ); } - config->AddReadRuleServiceTypes(rr.service_type()); + consumer->SetServiceType(rr.service_type()); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRuleServiceTypes(rr.service_type()); + } } else { const auto& pqConfig = AppData(ctx)->PQConfig; if (pqConfig.GetDisallowDefaultClientServiceType()) { @@ -177,7 +190,10 @@ namespace NKikimr::NGRpcProxy::V1 { ); } const auto& defaultCientServiceType = pqConfig.GetDefaultClientServiceType().GetName(); - config->AddReadRuleServiceTypes(defaultCientServiceType); + consumer->SetServiceType(defaultCientServiceType); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRuleServiceTypes(defaultCientServiceType); + } } return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK); } @@ -222,7 +238,9 @@ namespace NKikimr::NGRpcProxy::V1 { auto* consumer = config->AddConsumers(); consumer->SetName(consumerName); - config->AddReadRules(consumerName); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRules(consumerName); + } if (rr.read_from().seconds() < 0) { return TMsgPqCodes( @@ -231,10 +249,14 @@ namespace NKikimr::NGRpcProxy::V1 { ); } consumer->SetReadFromTimestampsMs(rr.read_from().seconds() * 1000); - config->AddReadFromTimestampsMs(rr.read_from().seconds() * 1000); + if (NPQ::ReadRuleCompatible()) { + config->AddReadFromTimestampsMs(rr.read_from().seconds() * 1000); + } consumer->SetFormatVersion(0); - config->AddConsumerFormatVersions(0); + if (NPQ::ReadRuleCompatible()) { + config->AddConsumerFormatVersions(0); + } TString serviceType; const auto& pqConfig = AppData(ctx)->PQConfig; @@ -297,10 +319,14 @@ namespace NKikimr::NGRpcProxy::V1 { } consumer->SetServiceType(serviceType); - config->AddReadRuleServiceTypes(serviceType); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRuleServiceTypes(serviceType); + } consumer->SetVersion(version); - config->AddReadRuleVersions(version); + if (NPQ::ReadRuleCompatible()) { + config->AddReadRuleVersions(version); + } auto cct = consumer->MutableCodec(); auto ct = config->AddConsumerCodecs(); @@ -315,7 +341,9 @@ namespace NKikimr::NGRpcProxy::V1 { cct->AddIds(codec - 1); cct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM"); - ct->CopyFrom(*cct); + if (NPQ::ReadRuleCompatible()) { + ct->CopyFrom(*cct); + } } if (rr.important()) { @@ -342,7 +370,6 @@ namespace NKikimr::NGRpcProxy::V1 { config->ClearConsumerCodecs(); config->MutablePartitionConfig()->ClearImportantClientId(); config->ClearReadRuleServiceTypes(); - config->ClearConsumers(); for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) { @@ -354,31 +381,33 @@ namespace NKikimr::NGRpcProxy::V1 { bool removed = false; const auto& pqConfig = AppData(ctx)->PQConfig; - for (size_t i = 0; i < originalConfig.ReadRulesSize(); i++) { - auto& readRule = originalConfig.GetReadRules(i); + if (NPQ::ReadRuleCompatible()) { + for (size_t i = 0; i < originalConfig.ReadRulesSize(); i++) { + auto& readRule = originalConfig.GetReadRules(i); - if (readRule == consumerName) { - removed = true; - continue; - } + if (readRule == consumerName) { + removed = true; + continue; + } - config->AddReadRuleVersions(originalConfig.GetReadRuleVersions(i)); - config->AddReadRules(readRule); - config->AddReadFromTimestampsMs(originalConfig.GetReadFromTimestampsMs(i)); - config->AddConsumerFormatVersions(originalConfig.GetConsumerFormatVersions(i)); - auto ct = config->AddConsumerCodecs(); - for (size_t j = 0; j < originalConfig.GetConsumerCodecs(i).CodecsSize(); j++) { - ct->AddCodecs(originalConfig.GetConsumerCodecs(i).GetCodecs(j)); - ct->AddIds(originalConfig.GetConsumerCodecs(i).GetIds(j)); - } - if (i < originalConfig.ReadRuleServiceTypesSize()) { - config->AddReadRuleServiceTypes(originalConfig.GetReadRuleServiceTypes(i)); - } else { - if (pqConfig.GetDisallowDefaultClientServiceType()) { - return TStringBuilder() << "service type cannot be empty for consumer '" - << readRule << "'"; + config->AddReadRuleVersions(originalConfig.GetReadRuleVersions(i)); + config->AddReadRules(readRule); + config->AddReadFromTimestampsMs(originalConfig.GetReadFromTimestampsMs(i)); + config->AddConsumerFormatVersions(originalConfig.GetConsumerFormatVersions(i)); + auto ct = config->AddConsumerCodecs(); + for (size_t j = 0; j < originalConfig.GetConsumerCodecs(i).CodecsSize(); j++) { + ct->AddCodecs(originalConfig.GetConsumerCodecs(i).GetCodecs(j)); + ct->AddIds(originalConfig.GetConsumerCodecs(i).GetIds(j)); + } + if (i < originalConfig.ReadRuleServiceTypesSize()) { + config->AddReadRuleServiceTypes(originalConfig.GetReadRuleServiceTypes(i)); + } else { + if (pqConfig.GetDisallowDefaultClientServiceType()) { + return TStringBuilder() << "service type cannot be empty for consumer '" + << readRule << "'"; + } + config->AddReadRuleServiceTypes(pqConfig.GetDefaultClientServiceType().GetName()); } - config->AddReadRuleServiceTypes(pqConfig.GetDefaultClientServiceType().GetName()); } } diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 312604fe48c2..1f50d9a6b734 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -5020,10 +5020,35 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { ReadRuleVersions: 567 TopicPath: "/Root/PQ/rt3.dc1--acc--topic3" YdbDatabasePath: "/Root" + Consumers { + Name: "first-consumer" + ReadFromTimestampsMs: 11223344000 + FormatVersion: 0 + Codec { + } + ServiceType: "data-streams" + Version: 0 + } + Consumers { + Name: "consumer" + ReadFromTimestampsMs: 111000 + FormatVersion: 0 + Codec { + Ids: 2 + Ids: 10004 + Codecs: "lzop" + Codecs: "CUSTOM" + } + ServiceType: "data-streams" + Version: 567 + } } ErrorCode: OK } )___"; + + Cerr << ">>>>> " << res.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(res.DebugString(), resultDescribe); Cerr << "DESCRIBES:\n"; From 2ca71d6a7dbfcbf4fc97335899ed1690d9916819 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 29 Feb 2024 10:29:23 +0000 Subject: [PATCH 06/15] more replaces --- ydb/core/persqueue/utils.cpp | 2 ++ .../persqueue_v1/actors/schema_actors.cpp | 32 +++++++++++-------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 433b88457c5c..a7506ffec781 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -46,6 +46,8 @@ ui64 PutUnitsSize(const ui64 size) { } void Migrate(NKikimrPQ::TPQTabletConfig& config) { + // if ReadRules isn`t empty than it is old configuration format + // when modify new format (add or alter a consumer) readRules is cleared if (config.ReadRulesSize()) { config.ClearConsumers(); diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index bb335f3241df..d78cecab158b 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1023,25 +1023,25 @@ bool TDescribeConsumerActor::ApplyResponse( } -bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig& config, ui32 i, +bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig& config, const NKikimrPQ::TPQTabletConfig::TConsumer& consumer, const NActors::TActorContext& ctx, Ydb::StatusIds::StatusCode& status, TString& error) { const auto& pqConfig = AppData(ctx)->PQConfig; - auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx); + auto consumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx); rr->set_name(consumerName); - rr->mutable_read_from()->set_seconds(config.GetReadFromTimestampsMs(i) / 1000); - auto version = config.GetReadRuleVersions(i); + rr->mutable_read_from()->set_seconds(consumer.GetReadFromTimestampsMs() / 1000); + auto version = consumer.GetVersion(); if (version != 0) (*rr->mutable_attributes())["_version"] = TStringBuilder() << version; - for (const auto &codec : config.GetConsumerCodecs(i).GetIds()) { + for (const auto &codec : consumer.GetCodec().GetIds()) { rr->mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1)); } - rr->set_important(NPQ::IsImportantClient(config, config.GetReadRules(i))); + rr->set_important(NPQ::IsImportantClient(config, consumer.GetName())); TString serviceType = ""; - if (i < config.ReadRuleServiceTypesSize()) { - serviceType = config.GetReadRuleServiceTypes(i); + if (consumer.HasServiceType()) { + serviceType = consumer.GetServiceType(); } else { if (pqConfig.GetDisallowDefaultClientServiceType()) { error = "service type must be set for all read rules"; @@ -1143,12 +1143,14 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv } auto consumerName = NPersQueue::ConvertNewConsumerName(Settings.Consumer, ActorContext()); bool found = false; - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - if (consumerName == config.GetReadRules(i)) found = true; + for (const auto& consumer : config.GetConsumers()) { + if (consumerName == consumer.GetName()) { + found = true; + } auto rr = Result.add_consumers(); Ydb::StatusIds::StatusCode status; TString error; - if (!FillConsumerProto(rr, config, i, ActorContext(), status, error)) { + if (!FillConsumerProto(rr, config, consumer, ActorContext(), status, error)) { return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ActorContext()); } } @@ -1198,14 +1200,16 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache:: auto consumerName = NPersQueue::ConvertNewConsumerName(Settings.Consumer, ActorContext()); bool found = false; - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - if (consumerName != config.GetReadRules(i)) + for (const auto& consumer : config.GetConsumers()) { + if (consumerName != consumer.GetName()) { continue; + } found = true; + auto rr = Result.mutable_consumer(); Ydb::StatusIds::StatusCode status; TString error; - if (!FillConsumerProto(rr, config, i, ActorContext(), status, error)) { + if (!FillConsumerProto(rr, config, consumer, ActorContext(), status, error)) { return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ActorContext()); } break; From 031e8a878f563de7f10f0d8b3bb87302b9be8bd2 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 29 Feb 2024 11:03:04 +0000 Subject: [PATCH 07/15] more replaces --- .../persqueue_v1/actors/schema_actors.cpp | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index d78cecab158b..7d2415920908 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -134,23 +134,33 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T } const auto& pqConfig = AppData(ActorContext())->PQConfig; - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - auto& readRuleName = config.GetReadRules(i); + Cerr << ">>>>> ReadRules=" << config.ReadRulesSize() << " "; + for (auto& c : config.GetReadRules()) { + Cerr << c << ", "; + } + Cerr << Endl; + Cerr << ">>>>> Consumers=" << config.ConsumersSize() << " "; + for (auto& c : config.GetConsumers()) { + Cerr << c.GetName() << ", "; + } + Cerr << Endl; + + for (const auto& consumer : config.GetConsumers()) { auto rr = settings->add_read_rules(); - auto consumerName = NPersQueue::ConvertOldConsumerName(readRuleName, ActorContext()); + auto consumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ActorContext()); rr->set_consumer_name(consumerName); - rr->set_starting_message_timestamp_ms(config.GetReadFromTimestampsMs(i)); + rr->set_starting_message_timestamp_ms(consumer.GetReadFromTimestampsMs()); rr->set_supported_format( - (Ydb::PersQueue::V1::TopicSettings::Format) (config.GetConsumerFormatVersions(i) + 1)); - rr->set_version(config.GetReadRuleVersions(i)); - for (const auto &codec : config.GetConsumerCodecs(i).GetIds()) { + (Ydb::PersQueue::V1::TopicSettings::Format) (consumer.GetFormatVersion() + 1)); + rr->set_version(consumer.GetVersion()); + for (const auto &codec : consumer.GetCodec().GetIds()) { rr->add_supported_codecs((Ydb::PersQueue::V1::Codec) (codec + 1)); } - rr->set_important(NPQ::IsImportantClient(config, readRuleName)); + rr->set_important(NPQ::IsImportantClient(config, consumer.GetName())); - if (i < config.ReadRuleServiceTypesSize()) { - rr->set_service_type(config.GetReadRuleServiceTypes(i)); + if (consumer.HasServiceType()) { + rr->set_service_type(consumer.GetServiceType()); } else { if (pqConfig.GetDisallowDefaultClientServiceType()) { this->Request_->RaiseIssue(FillIssue( From 0ef0d84e62ce172787658c833517189428ed03a1 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 29 Feb 2024 13:30:01 +0000 Subject: [PATCH 08/15] more replaces --- ydb/services/lib/actors/pq_schema_actor.cpp | 23 +++++++++---------- .../persqueue_v1/actors/schema_actors.cpp | 11 --------- 2 files changed, 11 insertions(+), 23 deletions(-) diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 8ee1616a5a75..4750164d93f8 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -440,20 +440,20 @@ namespace NKikimr::NGRpcProxy::V1 { } THashSet readRuleConsumers; - for (auto consumerName : config.GetReadRules()) { - if (readRuleConsumers.find(consumerName) != readRuleConsumers.end()) { - error = TStringBuilder() << "Duplicate consumer name " << consumerName; + for (auto consumer : config.GetConsumers()) { + if (readRuleConsumers.find(consumer.GetName()) != readRuleConsumers.end()) { + error = TStringBuilder() << "Duplicate consumer name " << consumer.GetName(); return true; } - readRuleConsumers.insert(consumerName); + readRuleConsumers.insert(consumer.GetName()); } for (const auto& t : supportedClientServiceTypes) { auto type = t.first; - auto count = std::count_if(config.GetReadRuleServiceTypes().begin(), config.GetReadRuleServiceTypes().end(), - [type](const TString& cType){ - return type == cType; + auto count = std::count_if(config.GetConsumers().begin(), config.GetConsumers().end(), + [type](const auto& c){ + return type == c.GetServiceType(); }); auto limit = t.second.MaxCount; if (count > limit) { @@ -462,13 +462,12 @@ namespace NKikimr::NGRpcProxy::V1 { } } if (config.GetCodecs().IdsSize() > 0) { - for (ui32 i = 0; i < config.ConsumerCodecsSize(); ++i) { - TString name = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx); + for (const auto& consumer : config.GetConsumers()) { + TString name = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ctx); - auto& consumerCodecs = config.GetConsumerCodecs(i); - if (consumerCodecs.IdsSize() > 0) { + if (consumer.GetCodec().IdsSize() > 0) { THashSet codecs; - for (auto& cc : consumerCodecs.GetIds()) { + for (auto& cc : consumer.GetCodec().GetIds()) { codecs.insert(cc); } for (auto& cc : config.GetCodecs().GetIds()) { diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 7d2415920908..4094a9457979 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -135,17 +135,6 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T const auto& pqConfig = AppData(ActorContext())->PQConfig; - Cerr << ">>>>> ReadRules=" << config.ReadRulesSize() << " "; - for (auto& c : config.GetReadRules()) { - Cerr << c << ", "; - } - Cerr << Endl; - Cerr << ">>>>> Consumers=" << config.ConsumersSize() << " "; - for (auto& c : config.GetConsumers()) { - Cerr << c.GetName() << ", "; - } - Cerr << Endl; - for (const auto& consumer : config.GetConsumers()) { auto rr = settings->add_read_rules(); auto consumerName = NPersQueue::ConvertOldConsumerName(consumer.GetName(), ActorContext()); From cbab5d32a65638d1bea85bb41e6d62218a8e028d Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 29 Feb 2024 13:44:19 +0000 Subject: [PATCH 09/15] more replaces --- ydb/core/persqueue/pq_impl.cpp | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 433e1cd71707..3101fa91b175 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1610,24 +1610,27 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr> existed; // map name -> rrVersion, rrGeneration - for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) { - auto version = i < Config.ReadRuleVersionsSize() ? Config.GetReadRuleVersions(i) : 0; - auto generation = i < Config.ReadRuleGenerationsSize() ? Config.GetReadRuleGenerations(i) : 0; - existed[Config.GetReadRules(i)] = std::make_pair(version, generation); + for (const auto& c : Config.GetConsumers()) { + existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration()); } - for (ui32 i = 0; i < cfg.ReadRulesSize(); ++i) { - auto version = i < cfg.ReadRuleVersionsSize() ? cfg.GetReadRuleVersions(i) : 0; - auto it = existed.find(cfg.GetReadRules(i)); + + for (auto& c : *cfg.MutableConsumers()) { + auto it = existed.find(c.GetName()); ui64 generation = 0; - if (it != existed.end() && it->second.first == version) { + if (it != existed.end() && it->second.first == c.GetVersion()) { generation = it->second.second; } else { generation = curConfigVersion; } - cfg.AddReadRuleGenerations(generation); + c.SetGeneration(generation); + if (ReadRuleCompatible()) { + cfg.AddReadRuleGenerations(generation); + } } } @@ -1638,7 +1641,6 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr Date: Thu, 29 Feb 2024 13:49:28 +0000 Subject: [PATCH 10/15] fix include --- ydb/core/kqp/topics/kqp_topics.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/kqp/topics/kqp_topics.cpp b/ydb/core/kqp/topics/kqp_topics.cpp index 80589a7acb2b..60064182a399 100644 --- a/ydb/core/kqp/topics/kqp_topics.cpp +++ b/ydb/core/kqp/topics/kqp_topics.cpp @@ -1,7 +1,7 @@ #include "kqp_topics.h" -#include "ydb/core/persqueue/utils.h" #include +#include #include #define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) From 01a27bb4ff918fa29ff750391485015ea19e93df Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 29 Feb 2024 14:18:02 +0000 Subject: [PATCH 11/15] fix --- ydb/services/lib/actors/pq_schema_actor.cpp | 10 +++++----- ydb/services/persqueue_v1/actors/schema_actors.cpp | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 4750164d93f8..33bfab78447e 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -133,8 +133,8 @@ namespace NKikimr::NGRpcProxy::V1 { config->AddReadRuleVersions(rr.version()); } - auto cct = consumer->MutableCodec(); - auto ct = config->AddConsumerCodecs(); + auto* cct = consumer->MutableCodec(); + auto* ct = NPQ::ReadRuleCompatible() ? config->AddConsumerCodecs() : nullptr; if (rr.supported_codecs().size() > MAX_SUPPORTED_CODECS_COUNT) { return TMsgPqCodes( TStringBuilder() << "supported_codecs count cannot be more than " @@ -328,8 +328,8 @@ namespace NKikimr::NGRpcProxy::V1 { config->AddReadRuleVersions(version); } - auto cct = consumer->MutableCodec(); - auto ct = config->AddConsumerCodecs(); + auto* cct = consumer->MutableCodec(); + auto* ct = NPQ::ReadRuleCompatible() ? config->AddConsumerCodecs() : nullptr; for(const auto& codec : rr.supported_codecs().codecs()) { if ((!Ydb::Topic::Codec_IsValid(codec) && codec < Ydb::Topic::CODEC_CUSTOM) || codec == 0) { @@ -394,7 +394,7 @@ namespace NKikimr::NGRpcProxy::V1 { config->AddReadRules(readRule); config->AddReadFromTimestampsMs(originalConfig.GetReadFromTimestampsMs(i)); config->AddConsumerFormatVersions(originalConfig.GetConsumerFormatVersions(i)); - auto ct = config->AddConsumerCodecs(); + auto* ct = config->AddConsumerCodecs(); for (size_t j = 0; j < originalConfig.GetConsumerCodecs(i).CodecsSize(); j++) { ct->AddCodecs(originalConfig.GetConsumerCodecs(i).GetCodecs(j)); ct->AddIds(originalConfig.GetConsumerCodecs(i).GetIds(j)); diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 4094a9457979..6db48045ec71 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1016,7 +1016,6 @@ bool TDescribeConsumerActor::ApplyResponse( const auto& location = record.GetLocations(i); auto* locationResult = Result.mutable_partitions(i)->mutable_partition_location(); SetPartitionLocation(location, locationResult); - } return true; } From b5badc1c92480ad3ee0d22a024c1d43865804c85 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 29 Feb 2024 15:08:03 +0000 Subject: [PATCH 12/15] replace ImportantClientIds --- ydb/core/persqueue/partition.cpp | 16 ++++++++------ ydb/core/persqueue/partition_read.cpp | 15 ++++++++----- ydb/core/persqueue/utils.cpp | 21 ++++++++++--------- ydb/core/persqueue/utils.h | 2 -- ydb/core/protos/pqconfig.proto | 3 ++- ydb/services/lib/actors/pq_schema_actor.cpp | 20 ++++++++++++------ .../persqueue_v1/actors/schema_actors.cpp | 4 ++-- 7 files changed, 49 insertions(+), 32 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index fb1f30ded888..9d19fe57681c 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -279,11 +279,13 @@ ui64 TPartition::GetUsedStorage(const TActorContext& ctx) { } ui64 TPartition::ImportantClientsMinOffset() const { - const auto& partConfig = Config.GetPartitionConfig(); - ui64 minOffset = EndOffset; - for (const auto& importantClientId : partConfig.GetImportantClientId()) { - const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId); + for (const auto& consumer : Config.GetConsumers()) { + if (!consumer.GetImportant()) { + continue; + } + + const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName()); ui64 curOffset = StartOffset; if (userInfo && userInfo->Offset >= 0) //-1 means no offset curOffset = userInfo->Offset; @@ -1773,8 +1775,10 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co } TSet important; - for (const auto& importantUser : config.GetPartitionConfig().GetImportantClientId()) { - important.insert(importantUser); + for (const auto& consumer : config.GetConsumers()) { + if (consumer.GetImportant()) { + important.insert(consumer.GetName()); + } } for (auto& consumer : config.GetConsumers()) { diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 188586c095d8..f5070d2b23d9 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -180,9 +180,14 @@ void TPartition::Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr& void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { TSet important; - for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) { - important.insert(importantUser); - TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantUser); + for (const auto& consumer : Config.GetConsumers()) { + if (!consumer.GetImportant()) { + continue; + } + + important.insert(consumer.GetName()); + + TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName()); if (userInfo && !userInfo->Important && userInfo->LabeledCounters) { ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup())); userInfo->SetImportant(true); @@ -190,12 +195,12 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { } if (!userInfo) { userInfo = &UsersInfoStorage->Create( - ctx, importantUser, 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {} + ctx, consumer.GetName(), 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {} ); } if (userInfo->Offset < (i64)StartOffset) userInfo->Offset = StartOffset; - ReadTimestampForOffset(importantUser, *userInfo, ctx); + ReadTimestampForOffset(consumer.GetName(), *userInfo, ctx); } for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) { diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index a7506ffec781..08e8f1fcb961 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -45,6 +45,16 @@ ui64 PutUnitsSize(const ui64 size) { return putUnitsCount; } +bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { + for (const auto& i : config.GetPartitionConfig().GetImportantClientId()) { + if (consumerName == i) { + return true; + } + } + + return false; +} + void Migrate(NKikimrPQ::TPQTabletConfig& config) { // if ReadRules isn`t empty than it is old configuration format // when modify new format (add or alter a consumer) readRules is cleared @@ -75,6 +85,7 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) { if (i < config.ReadRuleGenerationsSize()) { consumer->SetGeneration(config.GetReadRuleGenerations(i)); } + consumer->SetImportant(IsImportantClient(config, consumer.GetName())); } } } @@ -93,16 +104,6 @@ size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config) { return config.ConsumersSize(); } -bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName) { - for (const auto& i : config.GetPartitionConfig().GetImportantClientId()) { - if (consumerName == i) { - return true; - } - } - - return false; -} - const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId) { for(const auto& p : config.GetPartitions()) { if (partitionId == p.GetPartitionId()) { diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index b05658d0c605..1952f5912b6d 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -22,8 +22,6 @@ constexpr bool ReadRuleCompatible() { return true; } bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName); size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config); -bool IsImportantClient(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName); - const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const ui32 partitionId); // The graph of split-merge operations. diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index d846313fb4b8..4471ebd1907c 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -235,7 +235,7 @@ message TPartitionConfig { optional uint64 StorageLimitBytes = 16; // List of ClientIds, for which we don't delete data until they are read by these clients - repeated string ImportantClientId = 4; //can be empty + repeated string ImportantClientId = 4; //can be empty . Deprecated. Use Consumer.Important optional uint32 LowWatermark = 5 [default = 6291456]; //6Mb, compact blobs if they at least this big. optional uint32 SourceIdLifetimeSeconds = 6 [ default = 1382400]; //16 days optional uint32 SourceIdMaxCounts = 31 [default = 6000000]; // Maximum number of stored sourceId records in partition @@ -332,6 +332,7 @@ message TPQTabletConfig { optional EConsumerScalingSupport ScalingSupport = 6; optional uint64 Version = 7 [default = 0]; optional uint64 Generation = 8 [default = 0]; + optional bool Important = 9 [default = false]; } repeated TConsumer Consumers = 37; diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 33bfab78447e..3ff734d11d48 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -166,7 +166,10 @@ namespace NKikimr::NGRpcProxy::V1 { Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ); } - config->MutablePartitionConfig()->AddImportantClientId(consumerName); + consumer->SetImportant(true); + if (NPQ::ReadRuleCompatible()) { + config->MutablePartitionConfig()->AddImportantClientId(consumerName); + } } if (!rr.service_type().empty()) { @@ -350,7 +353,10 @@ namespace NKikimr::NGRpcProxy::V1 { if (pqConfig.GetTopicsAreFirstClassCitizen() && !AppData(ctx)->FeatureFlags.GetEnableTopicDiskSubDomainQuota()) { return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } - config->MutablePartitionConfig()->AddImportantClientId(consumerName); + consumer->SetImportant(true); + if (NPQ::ReadRuleCompatible()) { + config->MutablePartitionConfig()->AddImportantClientId(consumerName); + } } return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK); @@ -372,9 +378,11 @@ namespace NKikimr::NGRpcProxy::V1 { config->ClearReadRuleServiceTypes(); config->ClearConsumers(); - for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) { - if (importantConsumer != consumerName) { - config->MutablePartitionConfig()->AddImportantClientId(importantConsumer); + if (NPQ::ReadRuleCompatible()) { + for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) { + if (importantConsumer != consumerName) { + config->MutablePartitionConfig()->AddImportantClientId(importantConsumer); + } } } @@ -1272,7 +1280,7 @@ namespace NKikimr::NGRpcProxy::V1 { consumers.push_back({false, Ydb::Topic::Consumer{}}); // do not check service type for presented consumers auto& consumer = consumers.back().second; consumer.set_name(name); - consumer.set_important(NPQ::IsImportantClient(*config, oldName)); + consumer.set_important(c.GetImportant()); consumer.mutable_read_from()->set_seconds(c.GetReadFromTimestampsMs() / 1000); (*consumer.mutable_attributes())["_service_type"] = c.GetServiceType(); (*consumer.mutable_attributes())["_version"] = TStringBuilder() << c.GetVersion(); diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 6db48045ec71..2f043e4f1670 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -146,7 +146,7 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T for (const auto &codec : consumer.GetCodec().GetIds()) { rr->add_supported_codecs((Ydb::PersQueue::V1::Codec) (codec + 1)); } - rr->set_important(NPQ::IsImportantClient(config, consumer.GetName())); + rr->set_important(consumer.GetImportant()); if (consumer.HasServiceType()) { rr->set_service_type(consumer.GetServiceType()); @@ -1036,7 +1036,7 @@ bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfi rr->mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1)); } - rr->set_important(NPQ::IsImportantClient(config, consumer.GetName())); + rr->set_important(consumer.GetImportant()); TString serviceType = ""; if (consumer.HasServiceType()) { serviceType = consumer.GetServiceType(); From 33a6c2def0bdeb2ca7128f27d8cf23e3c7670d18 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 1 Mar 2024 06:09:24 +0000 Subject: [PATCH 13/15] compilation fix --- ydb/core/persqueue/utils.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 08e8f1fcb961..f9589c40416f 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -85,7 +85,7 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) { if (i < config.ReadRuleGenerationsSize()) { consumer->SetGeneration(config.GetReadRuleGenerations(i)); } - consumer->SetImportant(IsImportantClient(config, consumer.GetName())); + consumer->SetImportant(IsImportantClient(config, consumer->GetName())); } } } From 17d4c6be80c6457b5f905f1566a6a51647c3c794 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 1 Mar 2024 06:23:43 +0000 Subject: [PATCH 14/15] compilation fix --- ydb/services/persqueue_v1/actors/schema_actors.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 2f043e4f1670..d4e12a033d34 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1021,7 +1021,7 @@ bool TDescribeConsumerActor::ApplyResponse( } -bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig& config, const NKikimrPQ::TPQTabletConfig::TConsumer& consumer, +bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig::TConsumer& consumer, const NActors::TActorContext& ctx, Ydb::StatusIds::StatusCode& status, TString& error) { const auto& pqConfig = AppData(ctx)->PQConfig; @@ -1148,7 +1148,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv auto rr = Result.add_consumers(); Ydb::StatusIds::StatusCode status; TString error; - if (!FillConsumerProto(rr, config, consumer, ActorContext(), status, error)) { + if (!FillConsumerProto(rr, consumer, ActorContext(), status, error)) { return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ActorContext()); } } @@ -1207,7 +1207,7 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache:: auto rr = Result.mutable_consumer(); Ydb::StatusIds::StatusCode status; TString error; - if (!FillConsumerProto(rr, config, consumer, ActorContext(), status, error)) { + if (!FillConsumerProto(rr, consumer, ActorContext(), status, error)) { return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ActorContext()); } break; From ca3e8bc0fa01b7b852479301bcbe31887dc4070b Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 1 Mar 2024 15:18:06 +0000 Subject: [PATCH 15/15] fix --- ydb/services/persqueue_v1/persqueue_ut.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 1f50d9a6b734..74510d7c3222 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -5028,6 +5028,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } ServiceType: "data-streams" Version: 0 + Important: false } Consumers { Name: "consumer" @@ -5041,6 +5042,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } ServiceType: "data-streams" Version: 567 + Important: true } } ErrorCode: OK