Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add auto partitioning options to CLI, rename autoscaling and PQ_V1 SDK auto partitioning support #5895

4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,15 @@ std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& cl
}


TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount, bool autoCommit, std::set<ui32> partitions, bool autoscalingSupport) {
TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount, bool autoCommit, std::set<ui32> partitions, bool autoPartitioningSupport) {
Impl = std::make_shared<TImpl>(name, autoCommit);

Impl->Acquire();

auto readSettings = TReadSessionSettings()
.ConsumerName(TEST_CONSUMER)
.AppendTopics(TEST_TOPIC)
.AutoscalingSupport(autoscalingSupport);
.AutoPartitioningSupport(autoPartitioningSupport);
for (auto partitionId : partitions) {
readSettings.Topics_[0].AppendPartitionIds(partitionId);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/autoscaling_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct TTestReadSession {

static constexpr size_t SemCount = 1;

TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount = Max<size_t>(), bool autoCommit = true, std::set<ui32> partitions = {}, bool autoscalingSupport = true);
TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount = Max<size_t>(), bool autoCommit = true, std::set<ui32> partitions = {}, bool autoPartitioningSupport = true);

void WaitAllMessages();

Expand Down
76 changes: 38 additions & 38 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,19 +631,19 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
auto scaleUpPercent = 80;
auto scaleDownPercent = 20;
auto threshold = 500;
auto strategy = EAutoscalingStrategy::ScaleUp;
auto strategy = EAutoPartitioningStrategy::ScaleUp;

TCreateTopicSettings createSettings;
createSettings
.BeginConfigurePartitioningSettings()
.MinActivePartitions(minParts)
.MaxActivePartitions(maxParts)
.BeginConfigureAutoscalingSettings()
.ScaleUpThresholdPercent(scaleUpPercent)
.ScaleDownThresholdPercent(scaleDownPercent)
.ThresholdTime(TDuration::Seconds(threshold))
.BeginConfigureAutoPartitioningSettings()
.UpUtilizationPercent(scaleUpPercent)
.DownUtilizationPercent(scaleDownPercent)
.StabilizationWindow(TDuration::Seconds(threshold))
.Strategy(strategy)
.EndConfigureAutoscalingSettings()
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings();
client.CreateTopic(autoscalingTestTopic, createSettings).Wait();

Expand All @@ -655,29 +655,29 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), minParts);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), maxParts);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), strategy);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), scaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), scaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), threshold);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), strategy);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetDownUtilizationPercent(), scaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetUpUtilizationPercent(), scaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStabilizationWindow().Seconds(), threshold);

auto alterMinParts = 10;
auto alterMaxParts = 20;
auto alterScaleUpPercent = 90;
auto alterScaleDownPercent = 10;
auto alterThreshold = 700;
auto alterStrategy = EAutoscalingStrategy::ScaleUpAndDown;
auto alterStrategy = EAutoPartitioningStrategy::ScaleUpAndDown;

TAlterTopicSettings alterSettings;
alterSettings
.BeginAlterPartitioningSettings()
.MinActivePartitions(alterMinParts)
.MaxActivePartitions(alterMaxParts)
.BeginAlterAutoscalingSettings()
.ScaleDownThresholdPercent(alterScaleDownPercent)
.ScaleUpThresholdPercent(alterScaleUpPercent)
.ThresholdTime(TDuration::Seconds(alterThreshold))
.BeginAlterAutoPartitioningSettings()
.DownUtilizationPercent(alterScaleDownPercent)
.UpUtilizationPercent(alterScaleUpPercent)
.StabilizationWindow(TDuration::Seconds(alterThreshold))
.Strategy(alterStrategy)
.EndAlterAutoscalingSettings()
.EndAlterAutoPartitioningSettings()
.EndAlterTopicPartitioningSettings();

client.AlterTopic(autoscalingTestTopic, alterSettings).Wait();
Expand All @@ -686,10 +686,10 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), alterMinParts);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), alterMaxParts);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), alterStrategy);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), alterScaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), alterScaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), alterStrategy);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetUpUtilizationPercent(), alterScaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetDownUtilizationPercent(), alterScaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStabilizationWindow().Seconds(), alterThreshold);
}

Y_UNIT_TEST(ControlPlane_DisableAutoPartitioning) {
Expand All @@ -704,9 +704,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
.BeginConfigurePartitioningSettings()
.MinActivePartitions(1)
.MaxActivePartitions(100)
.BeginConfigureAutoscalingSettings()
.Strategy(EAutoscalingStrategy::ScaleUp)
.EndConfigureAutoscalingSettings()
.BeginConfigureAutoPartitioningSettings()
.Strategy(EAutoPartitioningStrategy::ScaleUp)
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings();
client.CreateTopic(topicName, createSettings).Wait();
}
Expand All @@ -715,9 +715,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
TAlterTopicSettings alterSettings;
alterSettings
.BeginAlterPartitioningSettings()
.BeginAlterAutoscalingSettings()
.Strategy(EAutoscalingStrategy::Disabled)
.EndAlterAutoscalingSettings()
.BeginAlterAutoPartitioningSettings()
.Strategy(EAutoPartitioningStrategy::Disabled)
.EndAlterAutoPartitioningSettings()
.EndAlterTopicPartitioningSettings();
auto f = client.AlterTopic(topicName, alterSettings);
f.Wait();
Expand All @@ -731,9 +731,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
alterSettings
.BeginAlterPartitioningSettings()
.MaxActivePartitions(0)
.BeginAlterAutoscalingSettings()
.Strategy(EAutoscalingStrategy::Disabled)
.EndAlterAutoscalingSettings()
.BeginAlterAutoPartitioningSettings()
.Strategy(EAutoPartitioningStrategy::Disabled)
.EndAlterAutoPartitioningSettings()
.EndAlterTopicPartitioningSettings();
auto f = client.AlterTopic(topicName, alterSettings);
f.Wait();
Expand All @@ -752,9 +752,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
createSettings
.RetentionStorageMb(1024)
.BeginConfigurePartitioningSettings()
.BeginConfigureAutoscalingSettings()
.Strategy(EAutoscalingStrategy::ScaleUp)
.EndConfigureAutoscalingSettings()
.BeginConfigureAutoPartitioningSettings()
.Strategy(EAutoPartitioningStrategy::ScaleUp)
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings();
auto result = client.CreateTopic(autoscalingTestTopic, createSettings).GetValueSync();

Expand Down Expand Up @@ -782,12 +782,12 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
.BeginConfigurePartitioningSettings()
.MinActivePartitions(1)
.MaxActivePartitions(100)
.BeginConfigureAutoscalingSettings()
.ScaleUpThresholdPercent(2)
.ScaleDownThresholdPercent(1)
.ThresholdTime(TDuration::Seconds(1))
.Strategy(EAutoscalingStrategy::ScaleUp)
.EndConfigureAutoscalingSettings()
.BeginConfigureAutoPartitioningSettings()
.UpUtilizationPercent(2)
.DownUtilizationPercent(1)
.StabilizationWindow(TDuration::Seconds(1))
.Strategy(EAutoPartitioningStrategy::ScaleUp)
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings();
client.CreateTopic(TEST_TOPIC, createSettings).Wait();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class TAlterPQ: public TSubOperation {
if (alterConfig.HasPartitionStrategy() && !NPQ::SplitMergeEnabled(alterConfig)
&& tabletConfig->HasPartitionStrategy() && NPQ::SplitMergeEnabled(*tabletConfig)) {
if (!alterConfig.GetPartitionStrategy().HasMaxPartitionCount() || 0 != alterConfig.GetPartitionStrategy().GetMaxPartitionCount()) {
errStr = TStringBuilder() << "Can`t disable autoscaling. Disabling autoscaling is a destructive operation, "
errStr = TStringBuilder() << "Can`t disable auto partitioning. Disabling auto partitioning is a destructive operation, "
<< "after which all partitions will become active and the message order guarantee will be violated. "
<< "If you are sure of this, then set max_active_partitions to 0.";
return nullptr;
Expand Down
60 changes: 30 additions & 30 deletions ydb/public/api/protos/ydb_persqueue_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1082,16 +1082,16 @@ message Credentials {
}
}

enum AutoscalingStrategy {
// The autoscaling algorithm is not specified. The default value will be used.
AUTOSCALING_STRATEGY_UNSPECIFIED = 0;
// The autoscaling is disabled.
AUTOSCALING_STRATEGY_DISABLED = 1;
// The autoscaling algorithm will increase partitions count depending on the load characteristics.
// The autoscaling algorithm will never decrease the number of partitions.
AUTOSCALING_STRATEGY_SCALE_UP = 2;
// The autoscaling algorithm will both increase and decrease partitions count depending on the load characteristics.
AUTOSCALING_STRATEGY_SCALE_UP_AND_DOWN = 3;
enum AutoPartitioningStrategy {
// The auto partitioning algorithm is not specified. The default value will be used.
AUTO_PARTITIONING_STRATEGY_UNSPECIFIED = 0;
// The auto partitioning is disabled.
AUTO_PARTITIONING_STRATEGY_DISABLED = 1;
// The auto partitioning algorithm will increase partitions count depending on the load characteristics.
// The auto partitioning algorithm will never decrease the number of partitions.
AUTO_PARTITIONING_STRATEGY_SCALE_UP = 2;
// The auto partitioning algorithm will both increase and decrease partitions count depending on the load characteristics.
AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN = 3;
}

/**
Expand All @@ -1103,12 +1103,12 @@ message TopicSettings {
FORMAT_BASE = 1;
}

oneof partitioning {
// How many partitions in topic. Must less than database limit. Default limit - 10.
int32 partitions_count = 1 [(value) = "> 0"];
// Settings for the partitions count autoscaling.
AutoscalingSettings autoscaling_settings = 15;
};

// How many partitions in topic. Must less than database limit. Default limit - 10.
int32 partitions_count = 1 [(value) = "> 0"];
// Settings for the partitions count auto partitioning.
AutoPartitioningSettings auto_partitioning_settings = 15;


oneof retention {
// How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
Expand Down Expand Up @@ -1184,9 +1184,9 @@ message TopicSettings {
RemoteMirrorRule remote_mirror_rule = 11;
}

message AutoscalingSettings {
// Strategy of autoscaling.
AutoscalingStrategy strategy = 1;
message AutoPartitioningSettings {
// Strategy of auto partitioning.
AutoPartitioningStrategy strategy = 1;

// Auto merge would stop working when the partitions count reaches min_active_partitions.
// Zero value means default - 1.
Expand All @@ -1198,24 +1198,24 @@ message AutoscalingSettings {
// Zero value means default - 100.
int64 partition_count_limit = 4 [(Ydb.value) = ">= 0", deprecated = true];

// Partition write speed autoscaling options.
AutoscalingPartitionWriteSpeedStrategy partition_write_speed = 5;
// Partition write speed auto partitioning options.
AutoPartitioningWriteSpeedStrategy partition_write_speed = 5;
}

message AutoscalingPartitionWriteSpeedStrategy {
//Partition will be autoscaled up (divided into 2 partitions)
//after write speed to the partition exceeds scale_up_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds
message AutoPartitioningWriteSpeedStrategy {
//Partition will be auto partitioning up (divided into 2 partitions)
//after write speed to the partition exceeds up_utilization_percent (in percentage of maximum write speed to the partition) for the period of time stabilization_window

//Partition will become a candidate to the autoscaling down
//after write speed doesn’t reach scale_down_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds
//This candidate partition will be autoscaled down when other neighbour partition will become a candidate to the autoscaling down and not earlier than a retention period.
//Partition will become a candidate to the auto partitioning down
//after write speed doesn’t reach down_utilization_percent (in percentage of maximum write speed to the partition) for the period of time stabilization_window
//This candidate partition will be auto partitioned down when other neighbour partition will become a candidate to the auto partitioning down and not earlier than a retention period.

// Zero value means default - 300.
google.protobuf.Duration threshold_time = 1;
google.protobuf.Duration stabilization_window = 1;
// Zero value means default - 90.
int32 scale_up_threshold_percent = 2 [(Ydb.value) = ">= 0"];
int32 up_utilization_percent = 2 [(Ydb.value) = ">= 0"];
// Zero value means default - 30.
int32 scale_down_threshold_percent = 3 [(Ydb.value) = ">= 0"];
int32 down_utilization_percent = 3 [(Ydb.value) = ">= 0"];
}

/**
Expand Down
Loading
Loading