Skip to content

Commit

Permalink
Merge 8694134 into 1078162
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Jun 25, 2024
2 parents 1078162 + 8694134 commit fd60865
Show file tree
Hide file tree
Showing 15 changed files with 236 additions and 236 deletions.
4 changes: 2 additions & 2 deletions ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& cl
}


TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount, bool autoCommit, std::set<ui32> partitions, bool autoscalingSupport) {
TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount, bool autoCommit, std::set<ui32> partitions, bool autoPartitioningSupport) {
Impl = std::make_shared<TImpl>(name, autoCommit);

Impl->Acquire();

auto readSettings = TReadSessionSettings()
.ConsumerName(TEST_CONSUMER)
.AppendTopics(TEST_TOPIC)
.AutoscalingSupport(autoscalingSupport);
.AutoPartitioningSupport(autoPartitioningSupport);
for (auto partitionId : partitions) {
readSettings.Topics_[0].AppendPartitionIds(partitionId);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/autoscaling_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct TTestReadSession {

static constexpr size_t SemCount = 1;

TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount = Max<size_t>(), bool autoCommit = true, std::set<ui32> partitions = {}, bool autoscalingSupport = true);
TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount = Max<size_t>(), bool autoCommit = true, std::set<ui32> partitions = {}, bool AutoPartitioningSupport = true);

void WaitAllMessages();

Expand Down
68 changes: 34 additions & 34 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,19 +631,19 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
auto scaleUpPercent = 80;
auto scaleDownPercent = 20;
auto threshold = 500;
auto strategy = EAutoscalingStrategy::ScaleUp;
auto strategy = EAutoPartitioningStrategy::ScaleUp;

TCreateTopicSettings createSettings;
createSettings
.BeginConfigurePartitioningSettings()
.MinActivePartitions(minParts)
.MaxActivePartitions(maxParts)
.BeginConfigureAutoscalingSettings()
.ScaleUpThresholdPercent(scaleUpPercent)
.ScaleDownThresholdPercent(scaleDownPercent)
.ThresholdTime(TDuration::Seconds(threshold))
.BeginConfigureAutoPartitioningSettings()
.UpUtilizationPercent(scaleUpPercent)
.DownUtilizationPercent(scaleDownPercent)
.StabilizationWindow(TDuration::Seconds(threshold))
.Strategy(strategy)
.EndConfigureAutoscalingSettings()
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings();
client.CreateTopic(autoscalingTestTopic, createSettings).Wait();

Expand All @@ -655,29 +655,29 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), minParts);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), maxParts);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), strategy);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), scaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), scaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), threshold);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), strategy);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetDownUtilizationPercent(), scaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetUpUtilizationPercent(), scaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStabilizationWindow().Seconds(), threshold);

auto alterMinParts = 10;
auto alterMaxParts = 20;
auto alterScaleUpPercent = 90;
auto alterScaleDownPercent = 10;
auto alterThreshold = 700;
auto alterStrategy = EAutoscalingStrategy::ScaleUpAndDown;
auto alterStrategy = EAutoPartitioningStrategy::ScaleUpAndDown;

TAlterTopicSettings alterSettings;
alterSettings
.BeginAlterPartitioningSettings()
.MinActivePartitions(alterMinParts)
.MaxActivePartitions(alterMaxParts)
.BeginAlterAutoscalingSettings()
.ScaleDownThresholdPercent(alterScaleDownPercent)
.ScaleUpThresholdPercent(alterScaleUpPercent)
.ThresholdTime(TDuration::Seconds(alterThreshold))
.BeginAlterAutoPartitioningSettings()
.DownUtilizationPercent(alterScaleDownPercent)
.UpUtilizationPercent(alterScaleUpPercent)
.StabilizationWindow(TDuration::Seconds(alterThreshold))
.Strategy(alterStrategy)
.EndAlterAutoscalingSettings()
.EndAlterAutoPartitioningSettings()
.EndAlterTopicPartitioningSettings();

client.AlterTopic(autoscalingTestTopic, alterSettings).Wait();
Expand All @@ -686,10 +686,10 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {

UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), alterMinParts);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), alterMaxParts);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), alterStrategy);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), alterScaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), alterScaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), alterStrategy);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetUpUtilizationPercent(), alterScaleDownPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetDownUtilizationPercent(), alterScaleUpPercent);
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStabilizationWindow().Seconds(), alterThreshold);
}

Y_UNIT_TEST(ControlPlane_DisableAutoPartitioning) {
Expand All @@ -704,9 +704,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
.BeginConfigurePartitioningSettings()
.MinActivePartitions(1)
.MaxActivePartitions(100)
.BeginConfigureAutoscalingSettings()
.Strategy(EAutoscalingStrategy::ScaleUp)
.EndConfigureAutoscalingSettings()
.BeginConfigureAutoPartitioningSettings()
.Strategy(EAutoPartitioningStrategy::ScaleUp)
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings();
client.CreateTopic(topicName, createSettings).Wait();
}
Expand All @@ -716,7 +716,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
alterSettings
.BeginAlterPartitioningSettings()
.BeginAlterAutoscalingSettings()
.Strategy(EAutoscalingStrategy::Disabled)
.Strategy(EAutoPartitioningStrategy::Disabled)
.EndAlterAutoscalingSettings()
.EndAlterTopicPartitioningSettings();
auto f = client.AlterTopic(topicName, alterSettings);
Expand All @@ -732,7 +732,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
.BeginAlterPartitioningSettings()
.MaxActivePartitions(0)
.BeginAlterAutoscalingSettings()
.Strategy(EAutoscalingStrategy::Disabled)
.Strategy(EAutoPartitioningStrategy::Disabled)
.EndAlterAutoscalingSettings()
.EndAlterTopicPartitioningSettings();
auto f = client.AlterTopic(topicName, alterSettings);
Expand All @@ -752,9 +752,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
createSettings
.RetentionStorageMb(1024)
.BeginConfigurePartitioningSettings()
.BeginConfigureAutoscalingSettings()
.Strategy(EAutoscalingStrategy::ScaleUp)
.EndConfigureAutoscalingSettings()
.BeginConfigureAutoPartitioningSettings()
.Strategy(EAutoPartitioningStrategy::ScaleUp)
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings();
auto result = client.CreateTopic(autoscalingTestTopic, createSettings).GetValueSync();

Expand Down Expand Up @@ -782,12 +782,12 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
.BeginConfigurePartitioningSettings()
.MinActivePartitions(1)
.MaxActivePartitions(100)
.BeginConfigureAutoscalingSettings()
.ScaleUpThresholdPercent(2)
.ScaleDownThresholdPercent(1)
.ThresholdTime(TDuration::Seconds(1))
.Strategy(EAutoscalingStrategy::ScaleUp)
.EndConfigureAutoscalingSettings()
.BeginConfigureAutoPartitioningSettings()
.UpUtilizationPercent(2)
.DownUtilizationPercent(1)
.StabilizationWindow(TDuration::Seconds(1))
.Strategy(EAutoPartitioningStrategy::ScaleUp)
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings();
client.CreateTopic(TEST_TOPIC, createSettings).Wait();

Expand Down
60 changes: 30 additions & 30 deletions ydb/public/api/protos/ydb_persqueue_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1082,16 +1082,16 @@ message Credentials {
}
}

enum AutoscalingStrategy {
// The autoscaling algorithm is not specified. The default value will be used.
AUTOSCALING_STRATEGY_UNSPECIFIED = 0;
// The autoscaling is disabled.
AUTOSCALING_STRATEGY_DISABLED = 1;
// The autoscaling algorithm will increase partitions count depending on the load characteristics.
// The autoscaling algorithm will never decrease the number of partitions.
AUTOSCALING_STRATEGY_SCALE_UP = 2;
// The autoscaling algorithm will both increase and decrease partitions count depending on the load characteristics.
AUTOSCALING_STRATEGY_SCALE_UP_AND_DOWN = 3;
enum AutoPartitioningStrategy {
// The auto partitioning algorithm is not specified. The default value will be used.
AUTO_PARTITIONING_STRATEGY_UNSPECIFIED = 0;
// The auto partitioning is disabled.
AUTO_PARTITIONING_STRATEGY_DISABLED = 1;
// The auto partitioning algorithm will increase partitions count depending on the load characteristics.
// The auto partitioning algorithm will never decrease the number of partitions.
AUTO_PARTITIONING_STRATEGY_SCALE_UP = 2;
// The auto partitioning algorithm will both increase and decrease partitions count depending on the load characteristics.
AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN = 3;
}

/**
Expand All @@ -1103,12 +1103,12 @@ message TopicSettings {
FORMAT_BASE = 1;
}

oneof partitioning {
// How many partitions in topic. Must less than database limit. Default limit - 10.
int32 partitions_count = 1 [(value) = "> 0"];
// Settings for the partitions count autoscaling.
AutoscalingSettings autoscaling_settings = 15;
};

// How many partitions in topic. Must less than database limit. Default limit - 10.
int32 partitions_count = 1 [(value) = "> 0"];
// Settings for the partitions count auto partitioning.
AutoPartitioningSettings auto_partitioning_settings = 15;


oneof retention {
// How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
Expand Down Expand Up @@ -1184,9 +1184,9 @@ message TopicSettings {
RemoteMirrorRule remote_mirror_rule = 11;
}

message AutoscalingSettings {
// Strategy of autoscaling.
AutoscalingStrategy strategy = 1;
message AutoPartitioningSettings {
// Strategy of auto partitioning.
AutoPartitioningStrategy strategy = 1;

// Auto merge would stop working when the partitions count reaches min_active_partitions.
// Zero value means default - 1.
Expand All @@ -1198,24 +1198,24 @@ message AutoscalingSettings {
// Zero value means default - 100.
int64 partition_count_limit = 4 [(Ydb.value) = ">= 0", deprecated = true];

// Partition write speed autoscaling options.
AutoscalingPartitionWriteSpeedStrategy partition_write_speed = 5;
// Partition write speed auto partitioning options.
AutoPartitioningWriteSpeedStrategy partition_write_speed = 5;
}

message AutoscalingPartitionWriteSpeedStrategy {
//Partition will be autoscaled up (divided into 2 partitions)
//after write speed to the partition exceeds scale_up_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds
message AutoPartitioningWriteSpeedStrategy {
//Partition will be auto partitioning up (divided into 2 partitions)
//after write speed to the partition exceeds up_utilization_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds

//Partition will become a candidate to the autoscaling down
//after write speed doesn’t reach scale_down_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds
//This candidate partition will be autoscaled down when other neighbour partition will become a candidate to the autoscaling down and not earlier than a retention period.
//Partition will become a candidate to the auto partitioning down
//after write speed doesn’t reach down_utilization_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds
//This candidate partition will be autoscaled down when other neighbour partition will become a candidate to the auto partitioning down and not earlier than a retention period.

// Zero value means default - 300.
google.protobuf.Duration threshold_time = 1;
google.protobuf.Duration stabilization_window = 1;
// Zero value means default - 90.
int32 scale_up_threshold_percent = 2 [(Ydb.value) = ">= 0"];
int32 up_utilization_percent = 2 [(Ydb.value) = ">= 0"];
// Zero value means default - 30.
int32 scale_down_threshold_percent = 3 [(Ydb.value) = ">= 0"];
int32 down_utilization_percent = 3 [(Ydb.value) = ">= 0"];
}

/**
Expand Down
Loading

0 comments on commit fd60865

Please sign in to comment.