Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of pqconfig.proto - extract message Consumer instead of separeted arrays of ReadRules #2347

Merged
merged 17 commits into from
Mar 4, 2024
12 changes: 2 additions & 10 deletions ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "kqp_topics.h"

#include <ydb/core/base/path.h>
#include <ydb/core/persqueue/utils.h>
#include <ydb/library/actors/core/log.h>

#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
Expand Down Expand Up @@ -294,16 +295,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
result.PQGroupInfo->Description;

if (Consumer_) {
bool found = false;

for (auto& consumer : description.GetPQTabletConfig().GetReadRules()) {
if (Consumer_ == consumer) {
found = true;
break;
}
}

if (!found) {
if (!NPQ::HasConsumer(description.GetPQTabletConfig(), *Consumer_)) {
builder << "Unknown consumer '" << *Consumer_ << "'";

status = Ydb::StatusIds::BAD_REQUEST;
Expand Down
35 changes: 18 additions & 17 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,13 @@ ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
}

ui64 TPartition::ImportantClientsMinOffset() const {
const auto& partConfig = Config.GetPartitionConfig();

ui64 minOffset = EndOffset;
for (const auto& importantClientId : partConfig.GetImportantClientId()) {
const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantClientId);
for (const auto& consumer : Config.GetConsumers()) {
if (!consumer.GetImportant()) {
continue;
}

const TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName());
ui64 curOffset = StartOffset;
if (userInfo && userInfo->Offset >= 0) //-1 means no offset
curOffset = userInfo->Offset;
Expand Down Expand Up @@ -544,7 +546,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
InitDone = true;
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());

FillReadFromTimestamps(Config, ctx);
FillReadFromTimestamps(ctx);
ResendPendingEvents(ctx);
ProcessTxsAndUserActs(ctx);

Expand Down Expand Up @@ -1791,29 +1793,30 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
}

TSet<TString> important;
for (const auto& importantUser : config.GetPartitionConfig().GetImportantClientId()) {
important.insert(importantUser);
for (const auto& consumer : config.GetConsumers()) {
if (consumer.GetImportant()) {
important.insert(consumer.GetName());
}
}

for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
auto& userInfo = GetOrCreatePendingUser(consumer, 0);
for (auto& consumer : config.GetConsumers()) {
auto& userInfo = GetOrCreatePendingUser(consumer.GetName(), 0);

TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs());
if (!ts) {
ts += TDuration::MilliSeconds(1);
}
userInfo.ReadFromTimestamp = ts;
userInfo.Important = important.contains(consumer);
userInfo.Important = important.contains(consumer.GetName());

ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
ui64 rrGen = consumer.GetGeneration();
if (userInfo.ReadRuleGeneration != rrGen) {
TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0, 0, TActorId{},
TEvPQ::TEvSetClientInfo act(0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{},
TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen);

ProcessUserAct(act, ctx);
}
hasReadRule.erase(consumer);
hasReadRule.erase(consumer.GetName());
}

for (auto& consumer : hasReadRule) {
Expand Down Expand Up @@ -1915,8 +1918,6 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf

Y_ABORT_UNLESS(Config.GetPartitionConfig().GetTotalPartitions() > 0);

UsersInfoStorage->UpdateConfig(Config);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be this is a bug. Where is config in UsersInfoStorage updated?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used reference now


Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
void CreateMirrorerActor();
void DoRead(TEvPQ::TEvRead::TPtr&& ev, TDuration waitQuotaTime, const TActorContext& ctx);
void FailBadClient(const TActorContext& ctx);
void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx);
void FillReadFromTimestamps(const TActorContext& ctx);
void FilterDeadlinedWrites(const TActorContext& ctx);

void Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr& ev, const TActorContext& ctx);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
switch (response.GetStatus()) {
case NKikimrProto::OK:
Y_ABORT_UNLESS(Partition()->Config.ParseFromString(response.GetValue()));

Migrate(Partition()->Config);

if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) {
auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(Partition()->TopicConverter,
Partition()->TabletConfig);
Expand Down
34 changes: 20 additions & 14 deletions ydb/core/persqueue/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@ void TPartition::SendReadingFinished(const TString& consumer) {
Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId));
}

void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) {
void TPartition::FillReadFromTimestamps(const TActorContext& ctx) {
TSet<TString> hasReadRule;

for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
userInfo.ReadFromTimestamp = TInstant::Zero();
userInfo.HasReadRule = false;
hasReadRule.insert(consumer);
}
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx, 0);

for (auto& consumer : Config.GetConsumers()) {
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer.GetName(), ctx, 0);
userInfo.HasReadRule = true;
ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
if (userInfo.ReadRuleGeneration != rrGen) {

if (userInfo.ReadRuleGeneration != consumer.GetGeneration()) {
THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(
0, consumer, 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen
0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, consumer.GetGeneration()
);
//
// TODO(abcdef): заменить на вызов ProcessUserAct
Expand All @@ -61,12 +61,13 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
}
userInfo.Step = userInfo.Generation = 0;
}
hasReadRule.erase(consumer);
TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
hasReadRule.erase(consumer.GetName());
TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs());
if (!ts) ts += TDuration::MilliSeconds(1);
if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts)
userInfo.ReadFromTimestamp = ts;
}

for (auto& consumer : hasReadRule) {
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx);
if (userInfo.NoConsumer) {
Expand Down Expand Up @@ -179,22 +180,27 @@ void TPartition::Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr&

void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
TSet<TString> important;
for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) {
important.insert(importantUser);
TUserInfo* userInfo = UsersInfoStorage->GetIfExists(importantUser);
for (const auto& consumer : Config.GetConsumers()) {
if (!consumer.GetImportant()) {
continue;
}

important.insert(consumer.GetName());

TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumer.GetName());
if (userInfo && !userInfo->Important && userInfo->LabeledCounters) {
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup()));
userInfo->SetImportant(true);
continue;
}
if (!userInfo) {
userInfo = &UsersInfoStorage->Create(
ctx, importantUser, 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}
ctx, consumer.GetName(), 0, true, "", 0, 0, 0, 0, 0, TInstant::Zero(), {}
);
}
if (userInfo->Offset < (i64)StartOffset)
userInfo->Offset = StartOffset;
ReadTimestampForOffset(importantUser, *userInfo, ctx);
ReadTimestampForOffset(consumer.GetName(), *userInfo, ctx);
}
for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) {
Expand Down
35 changes: 22 additions & 13 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "partition_log.h"
#include "partition.h"
#include "read.h"
#include "utils.h"

#include <ydb/core/base/tx_processing.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/persqueue/config/config.h>
Expand Down Expand Up @@ -1013,6 +1015,8 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
bool res = Config.ParseFromString(read.GetValue());
Y_ABORT_UNLESS(res);

Migrate(Config);

if (!Config.PartitionsSize()) {
for (const auto partitionId : Config.GetPartitionIds()) {
Config.AddPartitions()->SetPartitionId(partitionId);
Expand Down Expand Up @@ -1145,10 +1149,12 @@ void TPersQueue::InitializeMeteringSink(const TActorContext& ctx) {
}

auto countReadRulesWithPricing = [&](const TActorContext& ctx, const auto& config) {
auto& defaultClientServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName();

ui32 result = 0;
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
TString rrServiceType = config.ReadRuleServiceTypesSize() <= i ? "" : config.GetReadRuleServiceTypes(i);
if (rrServiceType.empty() || rrServiceType == AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName())
for (auto& consumer : config.GetConsumers()) {
TString serviceType = consumer.GetServiceType();
if (serviceType.empty() || serviceType == defaultClientServiceType)
++result;
}
return result;
Expand Down Expand Up @@ -1618,24 +1624,27 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
if (!cfg.HasCacheSize() && Config.HasCacheSize()) //if not set and it is alter - preserve old cache size
cfg.SetCacheSize(Config.GetCacheSize());

Migrate(cfg);

// set rr generation for provided read rules
{
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) {
auto version = i < Config.ReadRuleVersionsSize() ? Config.GetReadRuleVersions(i) : 0;
auto generation = i < Config.ReadRuleGenerationsSize() ? Config.GetReadRuleGenerations(i) : 0;
existed[Config.GetReadRules(i)] = std::make_pair(version, generation);
for (const auto& c : Config.GetConsumers()) {
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
}
for (ui32 i = 0; i < cfg.ReadRulesSize(); ++i) {
auto version = i < cfg.ReadRuleVersionsSize() ? cfg.GetReadRuleVersions(i) : 0;
auto it = existed.find(cfg.GetReadRules(i));

for (auto& c : *cfg.MutableConsumers()) {
auto it = existed.find(c.GetName());
ui64 generation = 0;
if (it != existed.end() && it->second.first == version) {
if (it != existed.end() && it->second.first == c.GetVersion()) {
generation = it->second.second;
} else {
generation = curConfigVersion;
}
cfg.AddReadRuleGenerations(generation);
c.SetGeneration(generation);
if (ReadRuleCompatible()) {
cfg.AddReadRuleGenerations(generation);
}
}
}

Expand All @@ -1648,7 +1657,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf

BeginWriteConfig(cfg, bootstrapCfg, ctx);

NewConfig = cfg;
NewConfig = std::move(cfg);
}

void TPersQueue::BeginWriteConfig(const NKikimrPQ::TPQTabletConfig& cfg,
Expand Down
32 changes: 14 additions & 18 deletions ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,12 @@ bool TPersQueueReadBalancer::TTxInit::Execute(TTransactionContext& txc, const TA
if (!config.empty()) {
bool res = Self->TabletConfig.ParseFromString(config);
Y_ABORT_UNLESS(res);

Migrate(Self->TabletConfig);
Self->Consumers.clear();

if (Self->TabletConfig.ReadRulesSize() == Self->TabletConfig.ConsumerScalingSupportSize()) {
for (size_t i = 0; i < Self->TabletConfig.ReadRulesSize(); ++i) {
Self->Consumers[Self->TabletConfig.GetReadRules(i)].ScalingSupport = Self->TabletConfig.GetConsumerScalingSupport(i);
}
} else {
for (const auto& rr : Self->TabletConfig.GetReadRules()) {
Self->Consumers[rr].ScalingSupport = DefaultScalingSupport();
}
for (auto& consumer : Self->TabletConfig.GetConsumers()) {
Self->Consumers[consumer.GetName()].ScalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport();
}

Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);
Expand Down Expand Up @@ -532,10 +528,12 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
Version = record.GetVersion();
MaxPartsPerTablet = record.GetPartitionPerTablet();
PathId = record.GetPathId();
Topic = record.GetTopicName();
Path = record.GetPath();
Topic = std::move(record.GetTopicName());
Path = std::move(record.GetPath());
TxId = record.GetTxId();
TabletConfig = record.GetTabletConfig();
TabletConfig = std::move(record.GetTabletConfig());
Migrate(TabletConfig);

SchemeShardId = record.GetSchemeShardId();
TotalGroups = record.HasTotalGroupCount() ? record.GetTotalGroupCount() : 0;
ui32 prevNextPartitionId = NextPartitionId;
Expand All @@ -547,17 +545,15 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr

auto oldConsumers = std::move(Consumers);
Consumers.clear();
for (size_t i = 0; i < TabletConfig.ReadRulesSize(); ++i) {
auto& rr = TabletConfig.GetReadRules(i);
for (auto& consumer : TabletConfig.GetConsumers()) {
auto scalingSupport = consumer.HasScalingSupport() ? consumer.GetScalingSupport() : DefaultScalingSupport();

auto scalingSupport = i < TabletConfig.ConsumerScalingSupportSize() ? TabletConfig.GetConsumerScalingSupport(i)
: DefaultScalingSupport();
auto it = oldConsumers.find(rr);
auto it = oldConsumers.find(consumer.GetName());
if (it != oldConsumers.end()) {
auto& c = Consumers[rr] = std::move(it->second);
auto& c = Consumers[consumer.GetName()] = std::move(it->second);
c.ScalingSupport = scalingSupport;
} else {
Consumers[rr].ScalingSupport = scalingSupport;
Consumers[consumer.GetName()].ScalingSupport = scalingSupport;
}
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransactio
TabletConfig = tx.GetTabletConfig();
BootstrapConfig = tx.GetBootstrapConfig();

Migrate(TabletConfig);

InitPartitions();
}

Expand Down Expand Up @@ -157,6 +159,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
TabletConfig = txBody.GetTabletConfig();
BootstrapConfig = txBody.GetBootstrapConfig();

Migrate(TabletConfig);

TPartitionGraph graph = MakePartitionGraph(TabletConfig);

for (const auto& p : TabletConfig.GetPartitions()) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/user_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx,
{
TString defaultServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName();
TString userServiceType = "";
for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) {
if (Config.GetReadRules(i) == user) {
userServiceType = Config.ReadRuleServiceTypesSize() > i ? Config.GetReadRuleServiceTypes(i) : "";
for (auto& consumer : Config.GetConsumers()) {
if (consumer.GetName() == user) {
userServiceType = consumer.GetServiceType();
break;
}
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/persqueue/user_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,6 @@ class TUsersInfoStorage {
const TUserInfo* GetIfExists(const TString& user) const;
TUserInfo* GetIfExists(const TString& user);

void UpdateConfig(const NKikimrPQ::TPQTabletConfig& config) {
Config = config;
}

THashMap<TString, TUserInfo>& GetAll();

TUserInfoBase CreateUserInfo(const TString& user,
Expand Down Expand Up @@ -422,7 +418,7 @@ class TUsersInfoStorage {

TMaybe<TActorId> TabletActor;
TMaybe<TActorId> PartitionActor;
NKikimrPQ::TPQTabletConfig Config;
const NKikimrPQ::TPQTabletConfig& Config;

TString CloudId;
TString DbId;
Expand Down
Loading
Loading