Skip to content

Commit

Permalink
Workload read without consumer and metrics fixes (#1792)
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Feb 29, 2024
1 parent 16c59a0 commit dcce157
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 53 deletions.
20 changes: 17 additions & 3 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
auto& userInfo = userInfoPair.second;
if (!userInfo.LabeledCounters)
continue;
if (!userInfo.HasReadRule && !userInfo.Important)
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
continue;
auto* cac = ac->AddConsumerAggregatedCounters();
cac->SetConsumer(userInfo.User);
Expand Down Expand Up @@ -1124,7 +1124,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
auto& userInfo = userInfoPair.second;
if (!userInfo.LabeledCounters)
continue;
if (!userInfo.HasReadRule && !userInfo.Important)
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
continue;
bool haveChanges = false;
userInfo.EndOffset = EndOffset;
Expand Down Expand Up @@ -1228,6 +1228,12 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Set(quotaUsage);
}
}

if (userInfoPair.first == CLIENTID_WITHOUT_CONSUMER ) {
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get());
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_USAGE].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Get());
}

if (haveChanges) {
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters));
}
Expand Down Expand Up @@ -1339,6 +1345,14 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
}
}

if (PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Get()) {
ui64 quotaUsage = ui64(AvgReadBytes.GetValue()) * 1000000 / PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get() / 60;
if (quotaUsage != PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Get()) {
haveChanges = true;
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
}
}
return haveChanges;
}

Expand Down Expand Up @@ -1853,7 +1867,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
if (LastOffsetHasBeenCommited(userInfo)) {
SendReadingFinished(user);
}
} else {
} else if (user != CLIENTID_WITHOUT_CONSUMER) {
auto ui = UsersInfoStorage->GetIfExists(user);
if (ui && ui->LabeledCounters) {
ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
if (IsQuotingEnabled()) {
subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"});
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless), {},
Expand All @@ -896,7 +896,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
subgroups.pop_back();
}

subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"});
subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"});
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/counters_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ Y_UNIT_TEST(PartitionWriteQuota) {
TStringStream histogramStr;
histogram->OutputHtml(histogramStr);
Cerr << "**** Total histogram: **** \n " << histogramStr.Str() << "**** **** **** ****" << Endl;
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "1000ms")->Val(), 3);
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "2500ms")->Val(), 1);
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "0ms")->Val(),2);
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "2500ms")->Val(), 5);
}
}

Expand Down
34 changes: 17 additions & 17 deletions ydb/core/persqueue/ut/resources/counters_datastreams.html
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,15 @@
bin=60000: 0
bin=999999: 0

name=api.grpc.topic.stream_write.partition_throttled_milliseconds:
bin=0: 30
bin=1: 0
bin=10: 0
bin=100: 0
bin=1000: 0
bin=10000: 0
bin=20: 0
bin=2500: 0
bin=5: 0
bin=50: 0
bin=500: 0
bin=5000: 0
bin=999999: 0

name=topic.write.lag_milliseconds:
bin=100: 0
bin=1000: 10
bin=1000: 0
bin=10000: 0
bin=180000: 0
bin=200: 0
bin=2000: 0
bin=30000: 0
bin=500: 20
bin=500: 30
bin=5000: 0
bin=60000: 0
bin=999999: 0
Expand All @@ -68,4 +53,19 @@
bin=5242880: 0
bin=67108864: 0
bin=99999999: 0

name=topic.write.partition_throttled_milliseconds:
bin=0: 30
bin=1: 0
bin=10: 0
bin=100: 0
bin=1000: 0
bin=10000: 0
bin=20: 0
bin=2500: 0
bin=5: 0
bin=50: 0
bin=500: 0
bin=5000: 0
bin=999999: 0
</pre>
28 changes: 14 additions & 14 deletions ydb/core/persqueue/ut/resources/counters_pqproxy.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,26 @@
Account=asdfgs:
Duration=10000ms: 0
Duration=1000ms: 0
Duration=100ms: 0
Duration=100ms: 3
Duration=1500ms: 0
Duration=2000ms: 0
Duration=200ms: 0
Duration=30000ms: 0
Duration=5000ms: 0
Duration=500ms: 3
Duration=500ms: 0
Duration=550ms: 0
Duration=99999999ms: 0

Account=total:
Duration=10000ms: 0
Duration=1000ms: 0
Duration=100ms: 0
Duration=100ms: 3
Duration=1500ms: 0
Duration=2000ms: 0
Duration=200ms: 0
Duration=30000ms: 0
Duration=5000ms: 0
Duration=500ms: 3
Duration=500ms: 0
Duration=550ms: 0
Duration=99999999ms: 0

Expand Down Expand Up @@ -543,29 +543,29 @@

sensor=TimeLagsOriginal:
Interval=10000ms: 0
Interval=1000ms: 10
Interval=1000ms: 0
Interval=100ms: 0
Interval=180000ms: 0
Interval=2000ms: 0
Interval=200ms: 0
Interval=30000ms: 0
Interval=5000ms: 0
Interval=500ms: 20
Interval=500ms: 30
Interval=60000ms: 0
Interval=999999ms: 0

OriginDC=cluster:

sensor=TimeLagsOriginal:
Interval=10000ms: 0
Interval=1000ms: 10
Interval=1000ms: 0
Interval=100ms: 0
Interval=180000ms: 0
Interval=2000ms: 0
Interval=200ms: 0
Interval=30000ms: 0
Interval=5000ms: 0
Interval=500ms: 20
Interval=500ms: 30
Interval=60000ms: 0
Interval=999999ms: 0

Expand All @@ -577,14 +577,14 @@

sensor=TimeLagsOriginal:
Interval=10000ms: 0
Interval=1000ms: 10
Interval=1000ms: 0
Interval=100ms: 0
Interval=180000ms: 0
Interval=2000ms: 0
Interval=200ms: 0
Interval=30000ms: 0
Interval=5000ms: 0
Interval=500ms: 20
Interval=500ms: 30
Interval=60000ms: 0
Interval=999999ms: 0

Expand All @@ -598,14 +598,14 @@

sensor=TimeLagsOriginal:
Interval=10000ms: 0
Interval=1000ms: 10
Interval=1000ms: 0
Interval=100ms: 0
Interval=180000ms: 0
Interval=2000ms: 0
Interval=200ms: 0
Interval=30000ms: 0
Interval=5000ms: 0
Interval=500ms: 20
Interval=500ms: 30
Interval=60000ms: 0
Interval=999999ms: 0

Expand All @@ -621,14 +621,14 @@

sensor=TimeLagsOriginal:
Interval=10000ms: 0
Interval=1000ms: 10
Interval=1000ms: 0
Interval=100ms: 0
Interval=180000ms: 0
Interval=2000ms: 0
Interval=200ms: 0
Interval=30000ms: 0
Interval=5000ms: 0
Interval=500ms: 20
Interval=500ms: 30
Interval=60000ms: 0
Interval=999999ms: 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,26 @@
Account=federationAccount:
Duration=10000ms: 0
Duration=1000ms: 0
Duration=100ms: 0
Duration=100ms: 3
Duration=1500ms: 0
Duration=2000ms: 0
Duration=200ms: 0
Duration=30000ms: 0
Duration=5000ms: 0
Duration=500ms: 3
Duration=500ms: 0
Duration=550ms: 0
Duration=99999999ms: 0

Account=total:
Duration=10000ms: 0
Duration=1000ms: 0
Duration=100ms: 0
Duration=100ms: 3
Duration=1500ms: 0
Duration=2000ms: 0
Duration=200ms: 0
Duration=30000ms: 0
Duration=5000ms: 0
Duration=500ms: 3
Duration=500ms: 0
Duration=550ms: 0
Duration=99999999ms: 0
</pre>
2 changes: 2 additions & 0 deletions ydb/core/persqueue/ut/resources/counters_topics.html
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
name=topic.partition.read.inflight_throttled_microseconds_max: 0
name=topic.partition.read.speed_limit_bytes_per_second: 20000000000
name=topic.partition.read.throttled_microseconds_max: 0
name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0
name=topic.partition.read_without_consumer.throttled_microseconds_max: 0
name=topic.partition.storage_bytes_max: 0
name=topic.partition.total_count: 2
name=topic.partition.uptime_milliseconds_min: 30000
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/protos/counters_pq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,7 @@ enum EPartitionLabeledCounters {
METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES = 38 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read.speed_limit_bytes_per_second"}];

METRIC_READ_INFLIGHT_LIMIT_THROTTLED = 39 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read.inflight_throttled_microseconds_max"}];

METRIC_READ_QUOTA_NO_CONSUMER_BYTES = 40 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read_without_consumer.speed_limit_bytes_per_second"}];
METRIC_READ_QUOTA_NO_CONSUMER_USAGE = 41 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read_without_consumer.throttled_microseconds_max"}];
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ void TTopicOperationsScenario::StartConsumerThreads(std::vector<std::future<void
.UseTopicCommit = OnlyTableInTx,
.UseTableSelect = UseTableSelect && !OnlyTopicInTx,
.UseTableUpsert = !OnlyTopicInTx,
.ReadWithoutConsumer = ReadWithoutConsumer,
.CommitPeriod = CommitPeriod,
.CommitMessages = CommitMessages
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class TTopicOperationsScenario {
bool OnlyTopicInTx = false;
bool OnlyTableInTx = false;
bool UseTableSelect = true;
bool ReadWithoutConsumer = false;

protected:
void CreateTopic(const TString& database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,33 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
auto topicClient = std::make_unique<NYdb::NTopic::TTopicClient>(params.Driver);
std::optional<TTransactionSupport> txSupport;

auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
auto describeTopicResult = TCommandWorkloadTopicDescribe::DescribeTopic(params.Database, params.TopicName, params.Driver);
auto consumers = describeTopicResult.GetConsumers();
NYdb::NTopic::TReadSessionSettings settings;

if (!params.ReadWithoutConsumer) {
auto consumerName = TCommandWorkloadTopicDescribe::GenerateConsumerName(params.ConsumerPrefix, params.ConsumerIdx);
auto consumers = describeTopicResult.GetConsumers();

if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
{
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
exit(EXIT_FAILURE);
if (!std::any_of(consumers.begin(), consumers.end(), [consumerName](const auto& consumer) { return consumer.GetConsumerName() == consumerName; }))
{
WRITE_LOG(params.Log, ELogPriority::TLOG_EMERG, TStringBuilder() << "Topic '" << params.TopicName << "' doesn't have a consumer '" << consumerName << "'. Run command 'workload init' with parameter '--consumers'.");
exit(EXIT_FAILURE);
}
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);
} else {
NYdb::NTopic::TTopicReadSettings topic = params.TopicName;
auto partitions = describeTopicResult.GetPartitions();
for(auto partition: partitions) {
topic.AppendPartitionIds(partition.GetPartitionId());
}
settings.WithoutConsumer().AppendTopics(topic);
}


if (params.UseTransactions) {
txSupport.emplace(params.Driver, params.ReadOnlyTableName, params.TableName);
}

NYdb::NTopic::TReadSessionSettings settings;
settings.ConsumerName(consumerName).AppendTopics(params.TopicName);

auto readSession = topicClient->CreateReadSession(settings);
WRITE_LOG(params.Log, ELogPriority::TLOG_INFO, "Reader session was created.");

Expand Down Expand Up @@ -93,7 +103,7 @@ void TTopicWorkloadReader::ReaderLoop(TTopicWorkloadReaderParams& params, TInsta
<< " createTime " << message.GetCreateTime() << " fullTimeMs " << fullTime);
}

if (!txSupport || params.UseTopicCommit) {
if (!params.ReadWithoutConsumer && (!txSupport || params.UseTopicCommit)) {
dataEvent->Commit();
}
} else if (auto* createPartitionStreamEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace NYdb {
bool UseTopicCommit = false;
bool UseTableSelect = true;
bool UseTableUpsert = true;
bool ReadWithoutConsumer = false;
size_t CommitPeriod = 15;
size_t CommitMessages = 1'000'000;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ void TCommandWorkloadTopicRunRead::Config(TConfig& config)
config.Opts->AddLongOption("topic", "Topic name.")
.DefaultValue(TOPIC)
.StoreResult(&Scenario.TopicName);
config.Opts->AddLongOption("no-consumer", "Read without consumer")
.Hidden()
.StoreTrue(&Scenario.ReadWithoutConsumer);

// Specific params
config.Opts->AddLongOption("consumer-prefix", "Use consumers with names '<consumer-prefix>-0' ... '<consumer-prefix>-<n-1>' where n is set in the '--consumers' option.")
Expand Down
2 changes: 1 addition & 1 deletion ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ namespace NKikimr::NPersQueueTests {
"topic.write.bytes",
"topic.write.messages",
"api.grpc.topic.stream_write.bytes",
"api.grpc.topic.stream_write.partition_throttled_milliseconds",
"topic.write.partition_throttled_milliseconds",
"topic.write.message_size_bytes",
"api.grpc.topic.stream_write.messages",
"topic.write.lag_milliseconds",
Expand Down

0 comments on commit dcce157

Please sign in to comment.