From 4b752c6553413167e2b2465cb0e747c43a582c40 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 24 Jun 2024 13:54:54 +0000 Subject: [PATCH 01/18] Rename autosaceling to auto partitioning --- .../ut/ut_with_sdk/autoscaling_ut.cpp | 68 +++++------ ydb/public/api/protos/ydb_persqueue_v1.proto | 60 +++++----- ydb/public/api/protos/ydb_topic.proto | 80 ++++++------- .../sdk/cpp/client/ydb_topic/impl/topic.cpp | 30 ++--- .../cpp/client/ydb_topic/impl/topic_impl.h | 26 ++--- .../client/ydb_topic/include/control_plane.h | 106 +++++++++--------- .../ut/ut_utils/topic_sdk_test_setup.cpp | 4 +- ydb/services/lib/actors/pq_schema_actor.cpp | 58 +++++----- .../actors/read_session_actor.cpp | 2 +- .../persqueue_v1/actors/schema_actors.cpp | 12 +- 10 files changed, 223 insertions(+), 223 deletions(-) diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index e1b9002b0a6a..7df06f1870db 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -631,19 +631,19 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { auto scaleUpPercent = 80; auto scaleDownPercent = 20; auto threshold = 500; - auto strategy = EAutoscalingStrategy::ScaleUp; + auto strategy = EAutoPartitioningStrategy::ScaleUp; TCreateTopicSettings createSettings; createSettings .BeginConfigurePartitioningSettings() .MinActivePartitions(minParts) .MaxActivePartitions(maxParts) - .BeginConfigureAutoscalingSettings() - .ScaleUpThresholdPercent(scaleUpPercent) - .ScaleDownThresholdPercent(scaleDownPercent) - .ThresholdTime(TDuration::Seconds(threshold)) + .BeginConfigureAutoPartitioningSettings() + .UpUtilizationPercent(scaleUpPercent) + .DownUtilizationPercent(scaleDownPercent) + .StabilizationWindow(TDuration::Seconds(threshold)) .Strategy(strategy) - .EndConfigureAutoscalingSettings() + .EndConfigureAutoPartitioningSettings() .EndConfigurePartitioningSettings(); client.CreateTopic(autoscalingTestTopic, createSettings).Wait(); @@ -655,29 +655,29 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), minParts); UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), maxParts); - UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), strategy); - UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), scaleDownPercent); - UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), scaleUpPercent); - UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), threshold); + UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), strategy); + UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetDownUtilizationPercent(), scaleDownPercent); + UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetUpUtilizationPercent(), scaleUpPercent); + UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStabilizationWindow().Seconds(), threshold); auto alterMinParts = 10; auto alterMaxParts = 20; auto alterScaleUpPercent = 90; auto alterScaleDownPercent = 10; auto alterThreshold = 700; - auto alterStrategy = EAutoscalingStrategy::ScaleUpAndDown; + auto alterStrategy = EAutoPartitioningStrategy::ScaleUpAndDown; TAlterTopicSettings alterSettings; alterSettings .BeginAlterPartitioningSettings() .MinActivePartitions(alterMinParts) .MaxActivePartitions(alterMaxParts) - .BeginAlterAutoscalingSettings() - .ScaleDownThresholdPercent(alterScaleDownPercent) - .ScaleUpThresholdPercent(alterScaleUpPercent) - .ThresholdTime(TDuration::Seconds(alterThreshold)) + .BeginAlterAutoPartitioningSettings() + .DownUtilizationPercent(alterScaleDownPercent) + .UpUtilizationPercent(alterScaleUpPercent) + .StabilizationWindow(TDuration::Seconds(alterThreshold)) .Strategy(alterStrategy) - .EndAlterAutoscalingSettings() + .EndAlterAutoPartitioningSettings() .EndAlterTopicPartitioningSettings(); client.AlterTopic(autoscalingTestTopic, alterSettings).Wait(); @@ -686,10 +686,10 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), alterMinParts); UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), alterMaxParts); - UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), alterStrategy); - UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), alterScaleDownPercent); - UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), alterScaleUpPercent); - UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold); + UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), alterStrategy); + UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetUpUtilizationPercent(), alterScaleDownPercent); + UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetDownUtilizationPercent(), alterScaleUpPercent); + UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStabilizationWindow().Seconds(), alterThreshold); } Y_UNIT_TEST(ControlPlane_DisableAutoPartitioning) { @@ -704,9 +704,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { .BeginConfigurePartitioningSettings() .MinActivePartitions(1) .MaxActivePartitions(100) - .BeginConfigureAutoscalingSettings() - .Strategy(EAutoscalingStrategy::ScaleUp) - .EndConfigureAutoscalingSettings() + .BeginConfigureAutoPartitioningSettings() + .Strategy(EAutoPartitioningStrategy::ScaleUp) + .EndConfigureAutoPartitioningSettings() .EndConfigurePartitioningSettings(); client.CreateTopic(topicName, createSettings).Wait(); } @@ -716,7 +716,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { alterSettings .BeginAlterPartitioningSettings() .BeginAlterAutoscalingSettings() - .Strategy(EAutoscalingStrategy::Disabled) + .Strategy(EAutoPartitioningStrategy::Disabled) .EndAlterAutoscalingSettings() .EndAlterTopicPartitioningSettings(); auto f = client.AlterTopic(topicName, alterSettings); @@ -732,7 +732,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { .BeginAlterPartitioningSettings() .MaxActivePartitions(0) .BeginAlterAutoscalingSettings() - .Strategy(EAutoscalingStrategy::Disabled) + .Strategy(EAutoPartitioningStrategy::Disabled) .EndAlterAutoscalingSettings() .EndAlterTopicPartitioningSettings(); auto f = client.AlterTopic(topicName, alterSettings); @@ -752,9 +752,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { createSettings .RetentionStorageMb(1024) .BeginConfigurePartitioningSettings() - .BeginConfigureAutoscalingSettings() - .Strategy(EAutoscalingStrategy::ScaleUp) - .EndConfigureAutoscalingSettings() + .BeginConfigureAutoPartitioningSettings() + .Strategy(EAutoPartitioningStrategy::ScaleUp) + .EndConfigureAutoPartitioningSettings() .EndConfigurePartitioningSettings(); auto result = client.CreateTopic(autoscalingTestTopic, createSettings).GetValueSync(); @@ -782,12 +782,12 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { .BeginConfigurePartitioningSettings() .MinActivePartitions(1) .MaxActivePartitions(100) - .BeginConfigureAutoscalingSettings() - .ScaleUpThresholdPercent(2) - .ScaleDownThresholdPercent(1) - .ThresholdTime(TDuration::Seconds(1)) - .Strategy(EAutoscalingStrategy::ScaleUp) - .EndConfigureAutoscalingSettings() + .BeginConfigureAutoPartitioningSettings() + .UpUtilizationPercent(2) + .DownUtilizationPercent(1) + .StabilizationWindow(TDuration::Seconds(1)) + .Strategy(EAutoPartitioningStrategy::ScaleUp) + .EndConfigureAutoPartitioningSettings() .EndConfigurePartitioningSettings(); client.CreateTopic(TEST_TOPIC, createSettings).Wait(); diff --git a/ydb/public/api/protos/ydb_persqueue_v1.proto b/ydb/public/api/protos/ydb_persqueue_v1.proto index f2b58f65a509..026adefa6c1a 100644 --- a/ydb/public/api/protos/ydb_persqueue_v1.proto +++ b/ydb/public/api/protos/ydb_persqueue_v1.proto @@ -1082,16 +1082,16 @@ message Credentials { } } -enum AutoscalingStrategy { - // The autoscaling algorithm is not specified. The default value will be used. - AUTOSCALING_STRATEGY_UNSPECIFIED = 0; - // The autoscaling is disabled. - AUTOSCALING_STRATEGY_DISABLED = 1; - // The autoscaling algorithm will increase partitions count depending on the load characteristics. - // The autoscaling algorithm will never decrease the number of partitions. - AUTOSCALING_STRATEGY_SCALE_UP = 2; - // The autoscaling algorithm will both increase and decrease partitions count depending on the load characteristics. - AUTOSCALING_STRATEGY_SCALE_UP_AND_DOWN = 3; +enum AutoPartitioningStrategy { + // The auto partitioning algorithm is not specified. The default value will be used. + AUTO_PARTITIONING_STRATEGY_UNSPECIFIED = 0; + // The auto partitioning is disabled. + AUTO_PARTITIONING_STRATEGY_DISABLED = 1; + // The auto partitioning algorithm will increase partitions count depending on the load characteristics. + // The auto partitioning algorithm will never decrease the number of partitions. + AUTO_PARTITIONING_STRATEGY_SCALE_UP = 2; + // The auto partitioning algorithm will both increase and decrease partitions count depending on the load characteristics. + AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN = 3; } /** @@ -1103,12 +1103,12 @@ message TopicSettings { FORMAT_BASE = 1; } - oneof partitioning { - // How many partitions in topic. Must less than database limit. Default limit - 10. - int32 partitions_count = 1 [(value) = "> 0"]; - // Settings for the partitions count autoscaling. - AutoscalingSettings autoscaling_settings = 15; - }; + + // How many partitions in topic. Must less than database limit. Default limit - 10. + int32 partitions_count = 1 [(value) = "> 0"]; + // Settings for the partitions count auto partitioning. + AutoPartitioningSettings auto_partitioning_settings = 15; + oneof retention { // How long data in partition should be stored. Must be greater than 0 and less than limit for this database. @@ -1184,9 +1184,9 @@ message TopicSettings { RemoteMirrorRule remote_mirror_rule = 11; } -message AutoscalingSettings { - // Strategy of autoscaling. - AutoscalingStrategy strategy = 1; +message AutoPartitioningSettings { + // Strategy of auto partitioning. + AutoPartitioningStrategy strategy = 1; // Auto merge would stop working when the partitions count reaches min_active_partitions. // Zero value means default - 1. @@ -1198,24 +1198,24 @@ message AutoscalingSettings { // Zero value means default - 100. int64 partition_count_limit = 4 [(Ydb.value) = ">= 0", deprecated = true]; - // Partition write speed autoscaling options. - AutoscalingPartitionWriteSpeedStrategy partition_write_speed = 5; + // Partition write speed auto partitioning options. + AutoPartitioningWriteSpeedStrategy partition_write_speed = 5; } -message AutoscalingPartitionWriteSpeedStrategy { - //Partition will be autoscaled up (divided into 2 partitions) - //after write speed to the partition exceeds scale_up_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds +message AutoPartitioningWriteSpeedStrategy { + //Partition will be auto partitioning up (divided into 2 partitions) + //after write speed to the partition exceeds up_utilization_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds - //Partition will become a candidate to the autoscaling down - //after write speed doesn’t reach scale_down_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds - //This candidate partition will be autoscaled down when other neighbour partition will become a candidate to the autoscaling down and not earlier than a retention period. + //Partition will become a candidate to the auto partitioning down + //after write speed doesn’t reach down_utilization_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds + //This candidate partition will be autoscaled down when other neighbour partition will become a candidate to the auto partitioning down and not earlier than a retention period. // Zero value means default - 300. - google.protobuf.Duration threshold_time = 1; + google.protobuf.Duration stabilization_window = 1; // Zero value means default - 90. - int32 scale_up_threshold_percent = 2 [(Ydb.value) = ">= 0"]; + int32 up_utilization_percent = 2 [(Ydb.value) = ">= 0"]; // Zero value means default - 30. - int32 scale_down_threshold_percent = 3 [(Ydb.value) = ">= 0"]; + int32 down_utilization_percent = 3 [(Ydb.value) = ">= 0"]; } /** diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index fca6d330da00..fc1d2a2dba5c 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -322,8 +322,8 @@ message StreamReadMessage { string reader_name = 3; // Direct reading from a partition node. bool direct_read = 4; - // Indicates that the SDK supports autoscaling. - bool autoscaling_support = 5; + // Indicates that the SDK supports auto partitioning. + bool auto_partitioning_support = 5; message TopicReadSettings { // Topic path. @@ -832,16 +832,16 @@ message AlterConsumer { map alter_attributes = 6; } -enum AutoscalingStrategy { - // The autoscaling algorithm is not specified. The default value will be used. - AUTOSCALING_STRATEGY_UNSPECIFIED = 0; - // The autoscaling is disabled. - AUTOSCALING_STRATEGY_DISABLED = 1; - // The autoscaling algorithm will increase partitions count depending on the load characteristics. - // The autoscaling algorithm will never decrease the number of partitions. - AUTOSCALING_STRATEGY_SCALE_UP = 2; - // The autoscaling algorithm will both increase and decrease partitions count depending on the load characteristics. - AUTOSCALING_STRATEGY_SCALE_UP_AND_DOWN = 3; +enum AutoPartitioningStrategy { + // The auto partitioning algorithm is not specified. The default value will be used. + AUTO_PARTITIONING_STRATEGY_UNSPECIFIED = 0; + // The auto partitioning is disabled. + AUTO_PARTITIONING_STRATEGY_DISABLED = 1; + // The auto partitioning algorithm will increase partitions count depending on the load characteristics. + // The auto partitioning algorithm will never decrease the number of partitions. + AUTO_PARTITIONING_STRATEGY_SCALE_UP = 2; + // The auto partitioning algorithm will both increase and decrease partitions count depending on the load characteristics. + AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN = 3; } // Partitioning settings for topic. @@ -856,31 +856,31 @@ message PartitioningSettings { // Zero value means default - 100. // Use max_active_partitions int64 partition_count_limit = 2 [(Ydb.value) = ">= 0", deprecated = true]; - // Settings for the partitions count autoscaling. - AutoscalingSettings autoscaling_settings = 4; + // Settings for the partitions count auto partitioning. + AutoPartitioningSettings auto_partitioning_settings = 4; } -message AutoscalingSettings { - // Strategy of autoscaling. - AutoscalingStrategy strategy = 1; - // Partition write speed autoscaling options. - AutoscalingPartitionWriteSpeedStrategy partition_write_speed = 2; +message AutoPartitioningSettings { + // Strategy of auto partitioning. + AutoPartitioningStrategy strategy = 1; + // Partition write speed auto partitioning options. + AutoPartitioningWriteSpeedStrategy partition_write_speed = 2; } -message AutoscalingPartitionWriteSpeedStrategy { - //Partition will be autoscaled up (divided into 2 partitions) - //after write speed to the partition exceeds scale_up_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds +message AutoPartitioningWriteSpeedStrategy { + //Partition will be auto partitioned up (divided into 2 partitions) + //after write speed to the partition exceeds up_utilization_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds - //Partition will become a candidate to the autoscaling down - //after write speed doesn’t reach scale_down_threshold_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds - //This candidate partition will be autoscaled down when other neighbour partition will become a candidate to the autoscaling down and not earlier than a retention period. + //Partition will become a candidate to the auto partitioned down + //after write speed doesn’t reach down_utilization_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds + //This candidate partition will be auto partitioned down when other neighbour partition will become a candidate to the auto partitioning down and not earlier than a retention period. // Zero value means default - 300. - google.protobuf.Duration threshold_time = 1; + google.protobuf.Duration stabilization_window = 1; // Zero value means default - 90. - int32 scale_up_threshold_percent = 2 [(Ydb.value) = ">= 0"]; + int32 up_utilization_percent = 2 [(Ydb.value) = ">= 0"]; // Zero value means default - 30. - int32 scale_down_threshold_percent = 3 [(Ydb.value) = ">= 0"]; + int32 down_utilization_percent = 3 [(Ydb.value) = ">= 0"]; } // Partitioning settings for topic. @@ -895,30 +895,30 @@ message AlterPartitioningSettings { // Zero value means default - 100. // Use set_max_active_partitions optional int64 set_partition_count_limit = 2 [(Ydb.value) = ">= 0", deprecated = true]; - // Settings for autoscaling the partition number - optional AlterAutoscalingSettings alter_autoscaling_settings = 4; + // Settings for auto partitioning the partition number + optional AlterAutoPartitioningSettings alter_auto_partitioning_settings = 4; } -message AlterAutoscalingSettings { - // Strategy of autoscaling - optional AutoscalingStrategy set_strategy = 1; - // Autoscaling partition write speed options. - optional AlterAutoscalingPartitionWriteSpeedStrategy set_partition_write_speed = 2; +message AlterAutoPartitioningSettings { + // Strategy of auto partitioning + optional AutoPartitioningStrategy set_strategy = 1; + // Auto partitioning write speed options. + optional AlterAutoPartitioningWriteSpeedStrategy set_partition_write_speed = 2; } -message AlterAutoscalingPartitionWriteSpeedStrategy { +message AlterAutoPartitioningWriteSpeedStrategy { // The time of exceeding the threshold value, after which the partition will be - // autoscaling. + // auto partitioning. // Zero value means default - 300. - optional google.protobuf.Duration set_threshold_time = 1; + optional google.protobuf.Duration set_stabilization_window = 1; // The threshold value of the write speed to the partition as a percentage, when exceeded, // the partition will be auto split. // Zero value means default - 90. - optional int32 set_scale_up_threshold_percent = 2 [(Ydb.value) = ">= 0"]; + optional int32 set_up_utilization_percent = 2 [(Ydb.value) = ">= 0"]; // The threshold value of the write speed to the partition as a percentage, if it is not reached, // the partition will be auto merged. // Zero value means default - 30. - optional int32 set_scale_down_threshold_percent = 3 [(Ydb.value) = ">= 0"]; + optional int32 set_down_utilization_percent = 3 [(Ydb.value) = ">= 0"]; } // Metering mode specifies the method used to determine consumption of resources by the topic. diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp index 626ad1c81643..486e6a747d39 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp @@ -224,7 +224,7 @@ TPartitioningSettings::TPartitioningSettings(const Ydb::Topic::PartitioningSetti : MinActivePartitions_(settings.min_active_partitions()) , MaxActivePartitions_(settings.max_active_partitions()) , PartitionCountLimit_(settings.partition_count_limit()) - , AutoscalingSettings_(settings.autoscaling_settings()) + , AutoPartitioningSettings_(settings.auto_partitioning_settings()) {} ui64 TPartitioningSettings::GetMinActivePartitions() const { @@ -239,31 +239,31 @@ ui64 TPartitioningSettings::GetPartitionCountLimit() const { return PartitionCountLimit_; } -TAutoscalingSettings TPartitioningSettings::GetAutoscalingSettings() const { - return AutoscalingSettings_; +TAutoPartitioningSettings TPartitioningSettings::GetAutoPartitioningSettings() const { + return AutoPartitioningSettings_; } -TAutoscalingSettings::TAutoscalingSettings(const Ydb::Topic::AutoscalingSettings& settings) - : Strategy_(static_cast(settings.strategy())) - , ThresholdTime_(TDuration::Seconds(settings.partition_write_speed().threshold_time().seconds())) - , ScaleDownThresholdPercent_(settings.partition_write_speed().scale_down_threshold_percent()) - , ScaleUpThresholdPercent_(settings.partition_write_speed().scale_up_threshold_percent()) +TAutoPartitioningSettings::TAutoPartitioningSettings(const Ydb::Topic::AutoPartitioningSettings& settings) + : Strategy_(static_cast(settings.strategy())) + , StabilizationWindow_(TDuration::Seconds(settings.partition_write_speed().stabilization_window().seconds())) + , DownUtilizationPercent_(settings.partition_write_speed().down_utilization_percent()) + , UpUtilizationPercent_(settings.partition_write_speed().up_utilization_percent()) {} -EAutoscalingStrategy TAutoscalingSettings::GetStrategy() const { +EAutoPartitioningStrategy TAutoPartitioningSettings::GetStrategy() const { return Strategy_; } -TDuration TAutoscalingSettings::GetThresholdTime() const { - return ThresholdTime_; +TDuration TAutoPartitioningSettings::GetStabilizationWindow() const { + return StabilizationWindow_; } -ui32 TAutoscalingSettings::GetScaleUpThresholdPercent() const { - return ScaleUpThresholdPercent_; +ui32 TAutoPartitioningSettings::GetUpUtilizationPercent() const { + return UpUtilizationPercent_; } -ui32 TAutoscalingSettings::GetScaleDownThresholdPercent() const { - return ScaleDownThresholdPercent_; +ui32 TAutoPartitioningSettings::GetDownUtilizationPercent() const { + return DownUtilizationPercent_; } TTopicStats::TTopicStats(const Ydb::Topic::DescribeTopicResult::TopicStats& topicStats) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h index 305b3aba46c9..bbbeb056c706 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h @@ -81,10 +81,10 @@ class TTopicClient::TImpl : public TClientImplCommon { request.mutable_partitioning_settings()->set_min_active_partitions(settings.PartitioningSettings_.GetMinActivePartitions()); request.mutable_partitioning_settings()->set_partition_count_limit(settings.PartitioningSettings_.GetPartitionCountLimit()); request.mutable_partitioning_settings()->set_max_active_partitions(settings.PartitioningSettings_.GetMaxActivePartitions()); - request.mutable_partitioning_settings()->mutable_autoscaling_settings()->set_strategy(static_cast(settings.PartitioningSettings_.GetAutoscalingSettings().GetStrategy())); - request.mutable_partitioning_settings()->mutable_autoscaling_settings()->mutable_partition_write_speed()->mutable_threshold_time()->set_seconds(settings.PartitioningSettings_.GetAutoscalingSettings().GetThresholdTime().Seconds()); - request.mutable_partitioning_settings()->mutable_autoscaling_settings()->mutable_partition_write_speed()->set_scale_up_threshold_percent(settings.PartitioningSettings_.GetAutoscalingSettings().GetScaleUpThresholdPercent()); - request.mutable_partitioning_settings()->mutable_autoscaling_settings()->mutable_partition_write_speed()->set_scale_down_threshold_percent(settings.PartitioningSettings_.GetAutoscalingSettings().GetScaleDownThresholdPercent()); + request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(static_cast(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetStrategy())); + request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetStabilizationWindow().Seconds()); + request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetUpUtilizationPercent()); + request.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(settings.PartitioningSettings_.GetAutoPartitioningSettings().GetDownUtilizationPercent()); request.mutable_retention_period()->set_seconds(settings.RetentionPeriod_.Seconds()); @@ -129,18 +129,18 @@ class TTopicClient::TImpl : public TClientImplCommon { if (settings.AlterPartitioningSettings_->MaxActivePartitions_) { request.mutable_alter_partitioning_settings()->set_set_max_active_partitions(*settings.AlterPartitioningSettings_->MaxActivePartitions_); } - if (settings.AlterPartitioningSettings_->AutoscalingSettings_) { - if (settings.AlterPartitioningSettings_->AutoscalingSettings_->Strategy_) { - request.mutable_alter_partitioning_settings()->mutable_alter_autoscaling_settings()->set_set_strategy(static_cast(*settings.AlterPartitioningSettings_->AutoscalingSettings_->Strategy_)); + if (settings.AlterPartitioningSettings_->AutoPartitioningSettings_) { + if (settings.AlterPartitioningSettings_->AutoPartitioningSettings_->Strategy_) { + request.mutable_alter_partitioning_settings()->mutable_alter_auto_partitioning_settings()->set_set_strategy(static_cast(*settings.AlterPartitioningSettings_->AutoPartitioningSettings_->Strategy_)); } - if (settings.AlterPartitioningSettings_->AutoscalingSettings_->ScaleDownThresholdPercent_) { - request.mutable_alter_partitioning_settings()->mutable_alter_autoscaling_settings()->mutable_set_partition_write_speed()->set_set_scale_down_threshold_percent(*settings.AlterPartitioningSettings_->AutoscalingSettings_->ScaleDownThresholdPercent_); + if (settings.AlterPartitioningSettings_->AutoPartitioningSettings_->DownUtilizationPercent_) { + request.mutable_alter_partitioning_settings()->mutable_alter_auto_partitioning_settings()->mutable_set_partition_write_speed()->set_set_down_utilization_percent(*settings.AlterPartitioningSettings_->AutoPartitioningSettings_->DownUtilizationPercent_); } - if (settings.AlterPartitioningSettings_->AutoscalingSettings_->ScaleUpThresholdPercent_) { - request.mutable_alter_partitioning_settings()->mutable_alter_autoscaling_settings()->mutable_set_partition_write_speed()->set_set_scale_up_threshold_percent(*settings.AlterPartitioningSettings_->AutoscalingSettings_->ScaleUpThresholdPercent_); + if (settings.AlterPartitioningSettings_->AutoPartitioningSettings_->UpUtilizationPercent_) { + request.mutable_alter_partitioning_settings()->mutable_alter_auto_partitioning_settings()->mutable_set_partition_write_speed()->set_set_up_utilization_percent(*settings.AlterPartitioningSettings_->AutoPartitioningSettings_->UpUtilizationPercent_); } - if (settings.AlterPartitioningSettings_->AutoscalingSettings_->ThresholdTime_) { - request.mutable_alter_partitioning_settings()->mutable_alter_autoscaling_settings()->mutable_set_partition_write_speed()->mutable_set_threshold_time()->set_seconds(settings.AlterPartitioningSettings_->AutoscalingSettings_->ThresholdTime_->Seconds()); + if (settings.AlterPartitioningSettings_->AutoPartitioningSettings_->StabilizationWindow_) { + request.mutable_alter_partitioning_settings()->mutable_alter_auto_partitioning_settings()->mutable_set_partition_write_speed()->mutable_set_stabilization_window()->set_seconds(settings.AlterPartitioningSettings_->AutoPartitioningSettings_->StabilizationWindow_->Seconds()); } } } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h b/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h index 4b73daa5cab2..30bfd3d6383a 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h @@ -30,7 +30,7 @@ enum class EMeteringMode : ui32 { Unknown = std::numeric_limits::max(), }; -enum class EAutoscalingStrategy: ui32 { +enum class EAutoPartitioningStrategy: ui32 { Unspecified = 0, Disabled = 1, ScaleUp = 2, @@ -157,44 +157,44 @@ class TPartitionInfo { struct TAlterPartitioningSettings; struct TAlterTopicSettings; -struct TAutoscalingSettings { +struct TAutoPartitioningSettings { friend struct TAutoscalingSettingsBuilder; public: - TAutoscalingSettings() - : Strategy_(EAutoscalingStrategy::Disabled) - , ThresholdTime_(TDuration::Seconds(0)) - , ScaleDownThresholdPercent_(0) - , ScaleUpThresholdPercent_(0) { - } - TAutoscalingSettings(const Ydb::Topic::AutoscalingSettings& settings); - TAutoscalingSettings(EAutoscalingStrategy strategy, TDuration thresholdTime, ui64 scaleUpThresholdPercent, ui64 scaleDownThresholdPercent) + TAutoPartitioningSettings() + : Strategy_(EAutoPartitioningStrategy::Disabled) + , StabilizationWindow_(TDuration::Seconds(0)) + , DownUtilizationPercent_(0) + , UpUtilizationPercent_(0) { + } + TAutoPartitioningSettings(const Ydb::Topic::AutoPartitioningSettings& settings); + TAutoPartitioningSettings(EAutoPartitioningStrategy strategy, TDuration stabilizationWindow, ui64 downUtilizationPercent, ui64 upUtilizationPercent) : Strategy_(strategy) - , ThresholdTime_(thresholdTime) - , ScaleDownThresholdPercent_(scaleDownThresholdPercent) - , ScaleUpThresholdPercent_(scaleUpThresholdPercent) {} - - EAutoscalingStrategy GetStrategy() const; - TDuration GetThresholdTime() const; - ui32 GetScaleDownThresholdPercent() const; - ui32 GetScaleUpThresholdPercent() const; + , StabilizationWindow_(stabilizationWindow) + , DownUtilizationPercent_(downUtilizationPercent) + , UpUtilizationPercent_(upUtilizationPercent) {} + + EAutoPartitioningStrategy GetStrategy() const; + TDuration GetStabilizationWindow() const; + ui32 GetDownUtilizationPercent() const; + ui32 GetUpUtilizationPercent() const; private: - EAutoscalingStrategy Strategy_; - TDuration ThresholdTime_; - ui32 ScaleDownThresholdPercent_; - ui32 ScaleUpThresholdPercent_; + EAutoPartitioningStrategy Strategy_; + TDuration StabilizationWindow_; + ui32 DownUtilizationPercent_; + ui32 UpUtilizationPercent_; }; -struct TAlterAutoscalingSettings { - using TSelf = TAlterAutoscalingSettings; +struct TAlterAutoPartitioningSettings { + using TSelf = TAlterAutoPartitioningSettings; public: - TAlterAutoscalingSettings(TAlterPartitioningSettings& parent): Parent_(parent) {} + TAlterAutoPartitioningSettings(TAlterPartitioningSettings& parent): Parent_(parent) {} - FLUENT_SETTING_OPTIONAL(EAutoscalingStrategy, Strategy); - FLUENT_SETTING_OPTIONAL(TDuration, ThresholdTime); - FLUENT_SETTING_OPTIONAL(ui64, ScaleUpThresholdPercent); - FLUENT_SETTING_OPTIONAL(ui64, ScaleDownThresholdPercent); + FLUENT_SETTING_OPTIONAL(EAutoPartitioningStrategy, Strategy); + FLUENT_SETTING_OPTIONAL(TDuration, StabilizationWindow); + FLUENT_SETTING_OPTIONAL(ui64, DownUtilizationPercent); + FLUENT_SETTING_OPTIONAL(ui64, UpUtilizationPercent); - TAlterPartitioningSettings& EndAlterAutoscalingSettings() { return Parent_; }; + TAlterPartitioningSettings& EndAlterAutoPartitioningSettings() { return Parent_; }; private: TAlterPartitioningSettings& Parent_; @@ -204,25 +204,25 @@ class TPartitioningSettings { using TSelf = TPartitioningSettings; friend struct TPartitioningSettingsBuilder; public: - TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), PartitionCountLimit_(0), AutoscalingSettings_(){} + TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), PartitionCountLimit_(0), AutoPartitioningSettings_(){} TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings); - TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoscalingSettings autoscalingSettings = {}) + TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoscalingSettings = {}) : MinActivePartitions_(minActivePartitions) , MaxActivePartitions_(maxActivePartitions) , PartitionCountLimit_(0) - , AutoscalingSettings_(autoscalingSettings) + , AutoPartitioningSettings_(autoscalingSettings) { } ui64 GetMinActivePartitions() const; ui64 GetMaxActivePartitions() const; ui64 GetPartitionCountLimit() const; - TAutoscalingSettings GetAutoscalingSettings() const; + TAutoPartitioningSettings GetAutoPartitioningSettings() const; private: ui64 MinActivePartitions_; ui64 MaxActivePartitions_; ui64 PartitionCountLimit_; - TAutoscalingSettings AutoscalingSettings_; + TAutoPartitioningSettings AutoPartitioningSettings_; }; struct TAlterTopicSettings; @@ -237,12 +237,12 @@ struct TAlterPartitioningSettings { TAlterTopicSettings& EndAlterTopicPartitioningSettings() { return Parent_; }; - TAlterAutoscalingSettings& BeginAlterAutoscalingSettings() { - AutoscalingSettings_.ConstructInPlace(*this); - return *AutoscalingSettings_; + TAlterAutoPartitioningSettings& BeginAlterAutoPartitioningSettings() { + AutoPartitioningSettings_.ConstructInPlace(*this); + return *AutoPartitioningSettings_; } - TMaybe AutoscalingSettings_; + TMaybe AutoPartitioningSettings_; private: TAlterTopicSettings& Parent_; @@ -558,8 +558,8 @@ struct TCreateTopicSettings : public TOperationRequestSettings partitionCount) { topics .BeginConfigurePartitioningSettings() - .BeginConfigureAutoscalingSettings() - .Strategy(EAutoscalingStrategy::ScaleUp); + .BeginConfigureAutoPartitioningSettings() + .Strategy(EAutoPartitioningStrategy::ScaleUp); } TConsumerSettings consumers(topics, consumer); diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 7309dc7d18fa..7a84aba01169 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -763,26 +763,26 @@ namespace NKikimr::NGRpcProxy::V1 { } } - if (!settings.has_autoscaling_settings()) { + if (!settings.has_auto_partitioning_settings()) { minParts = settings.partitions_count(); } else { - const auto& autoScalteSettings = settings.autoscaling_settings(); - if (autoScalteSettings.min_active_partitions() > 0) { - minParts = autoScalteSettings.min_active_partitions(); + const auto& autoPartitioningSettings = settings.auto_partitioning_settings(); + if (autoPartitioningSettings.min_active_partitions() > 0) { + minParts = autoPartitioningSettings.min_active_partitions(); } if (AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge()) { auto pqTabletConfigPartStrategy = pqTabletConfig->MutablePartitionStrategy(); pqTabletConfigPartStrategy->SetMinPartitionCount(minParts); - pqTabletConfigPartStrategy->SetMaxPartitionCount(IfEqualThenDefault(autoScalteSettings.max_active_partitions(), 0L, minParts)); - pqTabletConfigPartStrategy->SetScaleUpPartitionWriteSpeedThresholdPercent(IfEqualThenDefault(autoScalteSettings.partition_write_speed().scale_up_threshold_percent(), 0 ,30)); - pqTabletConfigPartStrategy->SetScaleDownPartitionWriteSpeedThresholdPercent(IfEqualThenDefault(autoScalteSettings.partition_write_speed().scale_down_threshold_percent(), 0, 90)); - pqTabletConfigPartStrategy->SetScaleThresholdSeconds(IfEqualThenDefault(autoScalteSettings.partition_write_speed().threshold_time().seconds(), 0L, 300L)); - switch(autoScalteSettings.strategy()) { - case ::Ydb::PersQueue::V1::AutoscalingStrategy::AUTOSCALING_STRATEGY_SCALE_UP: + pqTabletConfigPartStrategy->SetMaxPartitionCount(IfEqualThenDefault(autoPartitioningSettings.max_active_partitions(), 0L, minParts)); + pqTabletConfigPartStrategy->SetScaleUpPartitionWriteSpeedThresholdPercent(IfEqualThenDefault(autoPartitioningSettings.partition_write_speed().up_utilization_percent(), 0 ,30)); + pqTabletConfigPartStrategy->SetScaleDownPartitionWriteSpeedThresholdPercent(IfEqualThenDefault(autoPartitioningSettings.partition_write_speed().down_utilization_percent(), 0, 90)); + pqTabletConfigPartStrategy->SetScaleThresholdSeconds(IfEqualThenDefault(autoPartitioningSettings.partition_write_speed().stabilization_window().seconds(), 0L, 300L)); + switch(autoPartitioningSettings.strategy()) { + case ::Ydb::PersQueue::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP: pqTabletConfigPartStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT); break; - case ::Ydb::PersQueue::V1::AutoscalingStrategy::AUTOSCALING_STRATEGY_SCALE_UP_AND_DOWN: + case ::Ydb::PersQueue::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN: pqTabletConfigPartStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE); break; default: @@ -1097,17 +1097,17 @@ namespace NKikimr::NGRpcProxy::V1 { minParts = std::max(1, settings.min_active_partitions()); if (AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge() && request.has_partitioning_settings()) { auto pqTabletConfigPartStrategy = pqTabletConfig->MutablePartitionStrategy(); - auto autoscaleSettings = settings.autoscaling_settings(); + auto autoscaleSettings = settings.auto_partitioning_settings(); pqTabletConfigPartStrategy->SetMinPartitionCount(minParts); pqTabletConfigPartStrategy->SetMaxPartitionCount(IfEqualThenDefault(settings.max_active_partitions(),0L,minParts)); - pqTabletConfigPartStrategy->SetScaleUpPartitionWriteSpeedThresholdPercent(IfEqualThenDefault(autoscaleSettings.partition_write_speed().scale_up_threshold_percent(), 0, 90)); - pqTabletConfigPartStrategy->SetScaleDownPartitionWriteSpeedThresholdPercent(IfEqualThenDefault(autoscaleSettings.partition_write_speed().scale_down_threshold_percent(), 0, 30)); - pqTabletConfigPartStrategy->SetScaleThresholdSeconds(IfEqualThenDefault(autoscaleSettings.partition_write_speed().threshold_time().seconds(), 0L, 300L)); + pqTabletConfigPartStrategy->SetScaleUpPartitionWriteSpeedThresholdPercent(IfEqualThenDefault(autoscaleSettings.partition_write_speed().up_utilization_percent(), 0, 90)); + pqTabletConfigPartStrategy->SetScaleDownPartitionWriteSpeedThresholdPercent(IfEqualThenDefault(autoscaleSettings.partition_write_speed().down_utilization_percent(), 0, 30)); + pqTabletConfigPartStrategy->SetScaleThresholdSeconds(IfEqualThenDefault(autoscaleSettings.partition_write_speed().stabilization_window().seconds(), 0L, 300L)); switch(autoscaleSettings.strategy()) { - case ::Ydb::Topic::AutoscalingStrategy::AUTOSCALING_STRATEGY_SCALE_UP: + case ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP: pqTabletConfigPartStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT); break; - case ::Ydb::Topic::AutoscalingStrategy::AUTOSCALING_STRATEGY_SCALE_UP_AND_DOWN: + case ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN: pqTabletConfigPartStrategy->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE); break; default: @@ -1270,24 +1270,24 @@ namespace NKikimr::NGRpcProxy::V1 { if (settings.has_set_max_active_partitions()) { pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions()); } - if (settings.has_alter_autoscaling_settings()) { - if (settings.alter_autoscaling_settings().has_set_partition_write_speed()) { - if (settings.alter_autoscaling_settings().set_partition_write_speed().has_set_scale_up_threshold_percent()) { - pqTabletConfig->MutablePartitionStrategy()->SetScaleUpPartitionWriteSpeedThresholdPercent(settings.alter_autoscaling_settings().set_partition_write_speed().set_scale_up_threshold_percent()); + if (settings.has_alter_auto_partitioning_settings()) { + if (settings.alter_auto_partitioning_settings().has_set_partition_write_speed()) { + if (settings.alter_auto_partitioning_settings().set_partition_write_speed().has_set_scale_up_threshold_percent()) { + pqTabletConfig->MutablePartitionStrategy()->SetScaleUpPartitionWriteSpeedThresholdPercent(settings.alter_auto_partitioning_settings().set_partition_write_speed().set_up_utilization_percent()); } - if (settings.alter_autoscaling_settings().set_partition_write_speed().has_set_scale_down_threshold_percent()) { - pqTabletConfig->MutablePartitionStrategy()->SetScaleDownPartitionWriteSpeedThresholdPercent(settings.alter_autoscaling_settings().set_partition_write_speed().set_scale_down_threshold_percent()); + if (settings.alter_auto_partitioning_settings().set_partition_write_speed().has_set_scale_down_threshold_percent()) { + pqTabletConfig->MutablePartitionStrategy()->SetScaleDownPartitionWriteSpeedThresholdPercent(settings.alter_auto_partitioning_settings().set_partition_write_speed().set_down_utilization_percent()); } - if (settings.alter_autoscaling_settings().set_partition_write_speed().has_set_threshold_time()) { - pqTabletConfig->MutablePartitionStrategy()->SetScaleThresholdSeconds(settings.alter_autoscaling_settings().set_partition_write_speed().set_threshold_time().seconds()); + if (settings.alter_auto_partitioning_settings().set_partition_write_speed().has_set_stabilization_window()) { + pqTabletConfig->MutablePartitionStrategy()->SetScaleThresholdSeconds(settings.alter_auto_partitioning_settings().set_partition_write_speed().set_stabilization_window().seconds()); } } - if (settings.alter_autoscaling_settings().has_set_strategy()) { - switch(settings.alter_autoscaling_settings().set_strategy()) { - case ::Ydb::Topic::AutoscalingStrategy::AUTOSCALING_STRATEGY_SCALE_UP: + if (settings.alter_auto_partitioning_settings().has_set_strategy()) { + switch(settings.alter_auto_partitioning_settings().set_strategy()) { + case ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP: pqTabletConfig->MutablePartitionStrategy()->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT); break; - case ::Ydb::Topic::AutoscalingStrategy::AUTOSCALING_STRATEGY_SCALE_UP_AND_DOWN: + case ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN: pqTabletConfig->MutablePartitionStrategy()->SetPartitionStrategyType(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE); break; default: diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index 5da8f49f37e4..167a499eb6c1 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -758,7 +758,7 @@ void TReadSessionActor::Handle(typename TEvReadInit::TPtr& if (init.reader_name()) { PeerName = init.reader_name(); } - AutoscalingSupport = init.autoscaling_support(); + AutoscalingSupport = init.auto_partitioning_support(); } if (MaxTimeLagMs < 0) { diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 7274ff904a0a..3161acfa8917 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1081,18 +1081,18 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv Result.mutable_partitioning_settings()->set_max_active_partitions(config.GetPartitionStrategy().GetMaxPartitionCount()); switch(config.GetPartitionStrategy().GetPartitionStrategyType()) { case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT: - Result.mutable_partitioning_settings()->mutable_autoscaling_settings()->set_strategy(Ydb::Topic::AutoscalingStrategy::AUTOSCALING_STRATEGY_SCALE_UP); + Result.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP); break; case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE: - Result.mutable_partitioning_settings()->mutable_autoscaling_settings()->set_strategy(Ydb::Topic::AutoscalingStrategy::AUTOSCALING_STRATEGY_SCALE_UP_AND_DOWN); + Result.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN); break; default: - Result.mutable_partitioning_settings()->mutable_autoscaling_settings()->set_strategy(Ydb::Topic::AutoscalingStrategy::AUTOSCALING_STRATEGY_DISABLED); + Result.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->set_strategy(Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED); break; } - Result.mutable_partitioning_settings()->mutable_autoscaling_settings()->mutable_partition_write_speed()->mutable_threshold_time()->set_seconds(config.GetPartitionStrategy().GetScaleThresholdSeconds()); - Result.mutable_partitioning_settings()->mutable_autoscaling_settings()->mutable_partition_write_speed()->set_scale_down_threshold_percent(config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()); - Result.mutable_partitioning_settings()->mutable_autoscaling_settings()->mutable_partition_write_speed()->set_scale_up_threshold_percent(config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()); + Result.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds(config.GetPartitionStrategy().GetScaleThresholdSeconds()); + Result.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()); + Result.mutable_partitioning_settings()->mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()); if (!config.GetRequireAuthWrite()) { (*Result.mutable_attributes())["_allow_unauthenticated_write"] = "true"; From 7dfdc7d62951631e4bf35865d593252d0205a666 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Tue, 25 Jun 2024 07:04:48 +0000 Subject: [PATCH 02/18] Fix --- ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp | 4 ++-- ydb/core/persqueue/ut/common/autoscaling_ut_common.h | 2 +- .../cpp/client/ydb_topic/impl/read_session_impl.ipp | 2 +- .../sdk/cpp/client/ydb_topic/include/control_plane.h | 10 +++++----- .../sdk/cpp/client/ydb_topic/include/read_session.h | 4 ++-- .../persqueue_v1/actors/read_session_actor.cpp | 8 ++++---- ydb/services/persqueue_v1/actors/read_session_actor.h | 2 +- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp index 9ac35c3cc6e9..4f01439c8601 100644 --- a/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp @@ -132,7 +132,7 @@ std::shared_ptr CreateWriteSession(TTopicClient& cl } -TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount, bool autoCommit, std::set partitions, bool autoscalingSupport) { +TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount, bool autoCommit, std::set partitions, bool autoPartitioningSupport) { Impl = std::make_shared(name, autoCommit); Impl->Acquire(); @@ -140,7 +140,7 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si auto readSettings = TReadSessionSettings() .ConsumerName(TEST_CONSUMER) .AppendTopics(TEST_TOPIC) - .AutoscalingSupport(autoscalingSupport); + .AutoPartitioningSupport(autoPartitioningSupport); for (auto partitionId : partitions) { readSettings.Topics_[0].AppendPartitionIds(partitionId); } diff --git a/ydb/core/persqueue/ut/common/autoscaling_ut_common.h b/ydb/core/persqueue/ut/common/autoscaling_ut_common.h index 33705c8fb39a..25e5a248ed1e 100644 --- a/ydb/core/persqueue/ut/common/autoscaling_ut_common.h +++ b/ydb/core/persqueue/ut/common/autoscaling_ut_common.h @@ -44,7 +44,7 @@ struct TTestReadSession { static constexpr size_t SemCount = 1; - TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount = Max(), bool autoCommit = true, std::set partitions = {}, bool autoscalingSupport = true); + TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount = Max(), bool autoCommit = true, std::set partitions = {}, bool AutoPartitioningSupport = true); void WaitAllMessages(); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.ipp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.ipp index cd7529dcf9f0..3e1a0ca44e1a 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.ipp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.ipp @@ -485,7 +485,7 @@ inline void TSingleClusterReadSessionImpl::InitImpl(TDeferredActions { FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30)); - //! AutoscalingSupport. - FLUENT_SETTING_DEFAULT(bool, AutoscalingSupport, false); + //! AutoPartitioningSupport. + FLUENT_SETTING_DEFAULT(bool, AutoPartitioningSupport, false); //! Log. FLUENT_SETTING_OPTIONAL(TLog, Log); diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index 167a499eb6c1..d40a4f1324d7 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -59,7 +59,7 @@ TReadSessionActor::TReadSessionActor( , ReadsInfly(0) , TopicsHandler(topicsHandler) , DirectRead(false) - , AutoscalingSupport(false) + , AutoPartitioningSupport(false) { Y_ASSERT(Request); } @@ -758,7 +758,7 @@ void TReadSessionActor::Handle(typename TEvReadInit::TPtr& if (init.reader_name()) { PeerName = init.reader_name(); } - AutoscalingSupport = init.auto_partitioning_support(); + AutoPartitioningSupport = init.auto_partitioning_support(); } if (MaxTimeLagMs < 0) { @@ -2327,10 +2327,10 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadingFinis } auto& topic = it->second; - NTabletPipe::SendData(ctx, topic.PipeClient, new TEvPersQueue::TEvReadingPartitionFinishedRequest(ClientId, msg->PartitionId, AutoscalingSupport, msg->FirstMessage)); + NTabletPipe::SendData(ctx, topic.PipeClient, new TEvPersQueue::TEvReadingPartitionFinishedRequest(ClientId, msg->PartitionId, AutoPartitioningSupport, msg->FirstMessage)); if constexpr (!UseMigrationProtocol) { - if (AutoscalingSupport) { + if (AutoPartitioningSupport) { TPartitionActorInfo* partitionInfo = nullptr; for (auto& [_, p] : Partitions) { if (p.Partition.Partition == msg->PartitionId) { diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h index e7999870aa11..76e195c11eeb 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.h +++ b/ydb/services/persqueue_v1/actors/read_session_actor.h @@ -451,7 +451,7 @@ class TReadSessionActor NPersQueue::TTopicsToConverter TopicsList; bool DirectRead; - bool AutoscalingSupport; + bool AutoPartitioningSupport; }; } From cec584a9626633e32265d9adc4e123fbe09db31a Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Tue, 25 Jun 2024 07:05:27 +0000 Subject: [PATCH 03/18] fix --- ydb/services/lib/actors/pq_schema_actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 7a84aba01169..a1aeb0b3f869 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -1272,10 +1272,10 @@ namespace NKikimr::NGRpcProxy::V1 { } if (settings.has_alter_auto_partitioning_settings()) { if (settings.alter_auto_partitioning_settings().has_set_partition_write_speed()) { - if (settings.alter_auto_partitioning_settings().set_partition_write_speed().has_set_scale_up_threshold_percent()) { + if (settings.alter_auto_partitioning_settings().set_partition_write_speed().has_set_up_utilization_percent()) { pqTabletConfig->MutablePartitionStrategy()->SetScaleUpPartitionWriteSpeedThresholdPercent(settings.alter_auto_partitioning_settings().set_partition_write_speed().set_up_utilization_percent()); } - if (settings.alter_auto_partitioning_settings().set_partition_write_speed().has_set_scale_down_threshold_percent()) { + if (settings.alter_auto_partitioning_settings().set_partition_write_speed().has_set_down_utilization_percent()) { pqTabletConfig->MutablePartitionStrategy()->SetScaleDownPartitionWriteSpeedThresholdPercent(settings.alter_auto_partitioning_settings().set_partition_write_speed().set_down_utilization_percent()); } if (settings.alter_auto_partitioning_settings().set_partition_write_speed().has_set_stabilization_window()) { From 26e155cf6a84e5ff5f28d0caf2d6c013870dfdb1 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Tue, 25 Jun 2024 10:19:15 +0000 Subject: [PATCH 04/18] fix --- ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index 7df06f1870db..4c287e2a7255 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -715,9 +715,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { TAlterTopicSettings alterSettings; alterSettings .BeginAlterPartitioningSettings() - .BeginAlterAutoscalingSettings() + .BeginAlterAutoPartitioningSettings() .Strategy(EAutoPartitioningStrategy::Disabled) - .EndAlterAutoscalingSettings() + .EndAlterAutoPartitioningSettings() .EndAlterTopicPartitioningSettings(); auto f = client.AlterTopic(topicName, alterSettings); f.Wait(); @@ -731,9 +731,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { alterSettings .BeginAlterPartitioningSettings() .MaxActivePartitions(0) - .BeginAlterAutoscalingSettings() + .BeginAlterAutoPartitioningSettings() .Strategy(EAutoPartitioningStrategy::Disabled) - .EndAlterAutoscalingSettings() + .EndAlterAutoPartitioningSettings() .EndAlterTopicPartitioningSettings(); auto f = client.AlterTopic(topicName, alterSettings); f.Wait(); From 1e87819da20afb162cf6655a4d73e0c5870da2ce Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Tue, 25 Jun 2024 13:10:16 +0000 Subject: [PATCH 05/18] fix ut --- ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index 4c287e2a7255..80de3bd5b9c5 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -687,8 +687,8 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), alterMinParts); UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), alterMaxParts); UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(), alterStrategy); - UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetUpUtilizationPercent(), alterScaleDownPercent); - UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetDownUtilizationPercent(), alterScaleUpPercent); + UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetUpUtilizationPercent(), alterScaleUpPercent); + UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetDownUtilizationPercent(), alterScaleDownPercent); UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStabilizationWindow().Seconds(), alterThreshold); } From 0f702bf9d12e09de5038fbbd3aaf4dde57951754 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Tue, 25 Jun 2024 20:48:24 +0000 Subject: [PATCH 06/18] fix --- ydb/core/persqueue/ut/common/autoscaling_ut_common.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/persqueue/ut/common/autoscaling_ut_common.h b/ydb/core/persqueue/ut/common/autoscaling_ut_common.h index 25e5a248ed1e..44e35246bbb2 100644 --- a/ydb/core/persqueue/ut/common/autoscaling_ut_common.h +++ b/ydb/core/persqueue/ut/common/autoscaling_ut_common.h @@ -44,7 +44,7 @@ struct TTestReadSession { static constexpr size_t SemCount = 1; - TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount = Max(), bool autoCommit = true, std::set partitions = {}, bool AutoPartitioningSupport = true); + TTestReadSession(const TString& name, TTopicClient& client, size_t expectedMessagesCount = Max(), bool autoCommit = true, std::set partitions = {}, bool autoPartitioningSupport = true); void WaitAllMessages(); From 170b6c5f81c24d707d739e1287e25f7fb3168086 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 26 Jun 2024 05:36:36 +0000 Subject: [PATCH 07/18] fix --- ydb/public/api/protos/ydb_persqueue_v1.proto | 4 ++-- ydb/public/api/protos/ydb_topic.proto | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/public/api/protos/ydb_persqueue_v1.proto b/ydb/public/api/protos/ydb_persqueue_v1.proto index 026adefa6c1a..bbdcf6ae6bf6 100644 --- a/ydb/public/api/protos/ydb_persqueue_v1.proto +++ b/ydb/public/api/protos/ydb_persqueue_v1.proto @@ -1204,10 +1204,10 @@ message AutoPartitioningSettings { message AutoPartitioningWriteSpeedStrategy { //Partition will be auto partitioning up (divided into 2 partitions) - //after write speed to the partition exceeds up_utilization_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds + //after write speed to the partition exceeds up_utilization_percent (in percentage of maximum write speed to the partition) for the period of time stabilization_window //Partition will become a candidate to the auto partitioning down - //after write speed doesn’t reach down_utilization_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds + //after write speed doesn’t reach down_utilization_percent (in percentage of maximum write speed to the partition) for the period of time stabilization_window //This candidate partition will be autoscaled down when other neighbour partition will become a candidate to the auto partitioning down and not earlier than a retention period. // Zero value means default - 300. diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index fc1d2a2dba5c..ccccbd59d811 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -869,10 +869,10 @@ message AutoPartitioningSettings { message AutoPartitioningWriteSpeedStrategy { //Partition will be auto partitioned up (divided into 2 partitions) - //after write speed to the partition exceeds up_utilization_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds + //after write speed to the partition exceeds up_utilization_percent (in percentage of maximum write speed to the partition) for the period of time stabilization_window //Partition will become a candidate to the auto partitioned down - //after write speed doesn’t reach down_utilization_percent (in percentage of maximum write speed to the partition) for the period of time threshold_time_seconds + //after write speed doesn’t reach down_utilization_percent (in percentage of maximum write speed to the partition) for the period of time stabilization_window //This candidate partition will be auto partitioned down when other neighbour partition will become a candidate to the auto partitioning down and not earlier than a retention period. // Zero value means default - 300. From 454ef94f67c4bb092f22088973de6aed43126129 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 26 Jun 2024 06:42:07 +0000 Subject: [PATCH 08/18] Add auto partitioning to CLI --- .../ydb_cli/commands/ydb_service_topic.cpp | 147 +++++++++++++++++- .../lib/ydb_cli/commands/ydb_service_topic.h | 31 +++- 2 files changed, 166 insertions(+), 12 deletions(-) diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index 8885b35483df..69c751a6bb41 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -38,6 +38,18 @@ namespace NYdb::NConsoleClient { std::pair(NTopic::EMeteringMode::RequestUnits, "Read/write operations valued in request units, storage usage on hourly basis."), }; + THashMap AutoPartitioningStrategies = { + std::pair("disabled", NTopic::EAutoPartitioningStrategy::Disabled), + std::pair("up", NTopic::EAutoPartitioningStrategy::ScaleUp), + std::pair("up-and-down", NTopic::EAutoPartitioningStrategy::ScaleUpAndDown), + }; + + THashMap AutoscaleStrategiesDescriptions = { + std::pair(NTopic::EAutoPartitioningStrategy::Disabled, "Automatic scaling of the number of partitions is disabled"), + std::pair(NTopic::EAutoPartitioningStrategy::ScaleUp, "The number of partitions can increase under high load, but cannot decrease"), + std::pair(NTopic::EAutoPartitioningStrategy::ScaleUpAndDown, "The number of partitions can increase under high load and decrease under low load"), + }; + THashMap TopicMetadataFieldsDescriptions = { {ETopicMetadataField::Body, "Message data"}, {ETopicMetadataField::WriteTime, "Message write time, a UNIX timestamp the message was written to server."}, @@ -172,6 +184,81 @@ namespace { return MeteringMode_; } + void TCommandWithAutoPartitioning::AddAutoPartitioning(TClientCommand::TConfig& config, bool isAlter) { + TStringStream description; + description << "A strategy to automatically change the number of partitions depending on the load. Available strategies: "; + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + for (const auto& strategy: AutoPartitioningStrategies) { + auto findResult = AutoscaleStrategiesDescriptions.find(strategy.second); + Y_ABORT_UNLESS(findResult != AutoscaleStrategiesDescriptions.end(), + "Couldn't find description for %s autoscale strategy", (TStringBuilder() << strategy.second).c_str()); + description << "\n " << colors.BoldColor() << strategy.first << colors.OldColor() + << "\n " << findResult->second; + } + + if (isAlter) { + config.Opts->AddLongOption("auto-partitioning-strategy", description.Str()) + .Optional() + .StoreResult(&AutoPartitioningStrategy_); + config.Opts->AddLongOption("auto-partitioning-stabilization-window-seconds", "Duration in seconds of high or low load before automatically scale the number of partitions") + .Optional() + .StoreResult(&ScaleThresholdTime_); + config.Opts->AddLongOption("auto-partitioning-up-utilization-percent", "The load percentage at which the number of partitions will increase") + .Optional() + .StoreResult(&ScaleUpThresholdPercent_); + config.Opts->AddLongOption("auto-partitioning-down-utilization-percent", "The load percentage at which the number of partitions will decrease") + .Optional() + .StoreResult(&ScaleDownThresholdPercent_); + } else { + config.Opts->AddLongOption("auto-partitioning-strategy", description.Str()) + .Optional() + .DefaultValue("disabled") + .StoreResult(&AutoPartitioningStrategy_); + config.Opts->AddLongOption("auto-partitioning-stabilization-window-seconds", "Duration in seconds of high or low load before automatically scale the number of partitions") + .Optional() + .DefaultValue(300) + .StoreResult(&ScaleThresholdTime_); + config.Opts->AddLongOption("auto-partitioning-up-utilization-percent", "The load percentage at which the number of partitions will increase") + .Optional() + .DefaultValue(90) + .StoreResult(&ScaleUpThresholdPercent_); + config.Opts->AddLongOption("auto-partitioning-down-utilization-percent", "The load percentage at which the number of partitions will decrease") + .Optional() + .DefaultValue(30) + .StoreResult(&ScaleDownThresholdPercent_); + } + } + + void TCommandWithAutoPartitioning::ParseAutoPartitioningStrategy() { + if (AutoPartitioningStrategyStr_.empty()) { + return; + } + + TString toLowerStrategy = to_lower(AutoPartitioningStrategyStr_); + auto strategyIt = AutoPartitioningStrategies.find(toLowerStrategy); + if (strategyIt.IsEnd()) { + throw TMisuseException() << "Auto partitioning strategy " << AutoPartitioningStrategyStr_ << " is not available for this command"; + } else { + AutoPartitioningStrategy_ = strategyIt->second; + } + } + + TMaybe TCommandWithAutoPartitioning::GetAutoPartitioningStrategy() const { + return AutoPartitioningStrategy_; + } + + TMaybe TCommandWithAutoPartitioning::GetAutoPartitioningStabilizationWindowSeconds() const { + return ScaleThresholdTime_; + } + + TMaybe TCommandWithAutoPartitioning::GetAutoPartitioningUpUtilizationPercent() const { + return ScaleUpThresholdPercent_; + } + + TMaybe TCommandWithAutoPartitioning::GetAutoPartitioninDownUtilizationPercent() const { + return ScaleDownThresholdPercent_; + } + TCommandTopic::TCommandTopic() : TClientCommandTree("topic", {}, "TopicService operations") { AddCommand(std::make_unique()); @@ -188,9 +275,10 @@ namespace { void TCommandTopicCreate::Config(TConfig& config) { TYdbCommand::Config(config); - config.Opts->AddLongOption("partitions-count", "Total partitions count for topic") - .DefaultValue(1) - .StoreResult(&PartitionsCount_); + config.Opts->AddLongOption("partitions-count", "Initial and minimum number of partitions for topic") + .Optional() + .StoreResult(&MinActivePartitions_) + .DefaultValue(1); config.Opts->AddLongOption("retention-period-hours", "Duration in hours for which data in topic is stored") .DefaultValue(24) .Optional() @@ -207,6 +295,12 @@ namespace { SetFreeArgTitle(0, "", "Topic path"); AddAllowedCodecs(config, AllowedCodecs); AddAllowedMeteringModes(config); + + config.Opts->AddLongOption("auto-partitioning-max-partitions-count", "Maximum number of partitions for topic") + .Optional() + .StoreResult(&MaxActivePartitions_) + .DefaultValue(1); + AddAutoPartitioning(config, false); } void TCommandTopicCreate::Parse(TConfig& config) { @@ -214,6 +308,7 @@ namespace { ParseTopicName(config, 0); ParseCodecs(); ParseMeteringMode(); + ParseAutoPartitioningStrategy(); } int TCommandTopicCreate::Run(TConfig& config) { @@ -221,7 +316,14 @@ namespace { NYdb::NTopic::TTopicClient topicClient(driver); auto settings = NYdb::NTopic::TCreateTopicSettings(); - settings.PartitioningSettings(PartitionsCount_, PartitionsCount_); + + auto autoscaleSettings = NTopic::TAutoPartitioningSettings( + GetAutoPartitioningStrategy() ? *GetAutoPartitioningStrategy() : NTopic::EAutoPartitioningStrategy::Disabled, + GetAutoPartitioningStabilizationWindowSeconds() ? TDuration::Seconds(*GetAutoPartitioningStabilizationWindowSeconds()) : TDuration::Seconds(0), + GetAutoPartitioningUpUtilizationPercent() ? *GetAutoPartitioningUpUtilizationPercent() : 0, + GetAutoPartitioninDownUtilizationPercent() ? *GetAutoPartitioninDownUtilizationPercent() : 0); + + settings.PartitioningSettings(MinActivePartitions_, MaxActivePartitions_, autoscaleSettings); settings.PartitionWriteBurstBytes(PartitionWriteSpeedKbps_ * 1_KB); settings.PartitionWriteSpeedBytesPerSecond(PartitionWriteSpeedKbps_ * 1_KB); @@ -249,8 +351,9 @@ namespace { void TCommandTopicAlter::Config(TConfig& config) { TYdbCommand::Config(config); - config.Opts->AddLongOption("partitions-count", "Total partitions count for topic") - .StoreResult(&PartitionsCount_); + config.Opts->AddLongOption("partitions-count", "Initial and minimum number of partitions for topic") + .Optional() + .StoreResult(&MinActivePartitions_); config.Opts->AddLongOption("retention-period-hours", "Duration for which data in topic is stored") .Optional() .StoreResult(&RetentionPeriodHours_); @@ -264,6 +367,11 @@ namespace { SetFreeArgTitle(0, "", "Topic path"); AddAllowedCodecs(config, AllowedCodecs); AddAllowedMeteringModes(config); + + config.Opts->AddLongOption("auto-partitioning-max-partitions-count", "Maximum number of partitions for topic") + .Optional() + .StoreResult(&MaxActivePartitions_); + AddAutoPartitioning(config, true); } void TCommandTopicAlter::Parse(TConfig& config) { @@ -276,9 +384,32 @@ namespace { NYdb::NTopic::TAlterTopicSettings TCommandTopicAlter::PrepareAlterSettings( NYdb::NTopic::TDescribeTopicResult& describeResult) { auto settings = NYdb::NTopic::TAlterTopicSettings(); + auto partitioningSettings = settings.BeginAlterPartitioningSettings(); + + if (MinActivePartitions_.Defined() && (*MinActivePartitions_ != describeResult.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions())) { + partitioningSettings.MinActivePartitions(*MinActivePartitions_); + } + + if (MaxActivePartitions_.Defined() && (*MaxActivePartitions_ != describeResult.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions())) { + partitioningSettings.MaxActivePartitions(*MaxActivePartitions_); + } + + auto autoPartitioningSettings = partitioningSettings.BeginAlterAutoPartitioningSettings(); + + if (GetAutoPartitioningStabilizationWindowSeconds().Defined() && *GetAutoPartitioningStabilizationWindowSeconds() != describeResult.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStabilizationWindow().Seconds()) { + autoPartitioningSettings.StabilizationWindow(TDuration::Seconds(*GetAutoPartitioningStabilizationWindowSeconds())); + } + + if (GetAutoPartitioningStrategy().Defined() && *GetAutoPartitioningStrategy() != describeResult.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy()) { + autoPartitioningSettings.Strategy(*GetAutoPartitioningStrategy()); + } + + if (GetAutoPartitioninDownUtilizationPercent().Defined() && *GetAutoPartitioninDownUtilizationPercent() != describeResult.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetDownUtilizationPercent()) { + autoPartitioningSettings.DownUtilizationPercent(*GetAutoPartitioninDownUtilizationPercent()); + } - if (PartitionsCount_.Defined() && (*PartitionsCount_ != describeResult.GetTopicDescription().GetTotalPartitionsCount())) { - settings.AlterPartitioningSettings(*PartitionsCount_, *PartitionsCount_); + if (GetAutoPartitioningUpUtilizationPercent().Defined() && *GetAutoPartitioningUpUtilizationPercent() != describeResult.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetUpUtilizationPercent()) { + autoPartitioningSettings.UpUtilizationPercent(*GetAutoPartitioningUpUtilizationPercent()); } auto codecs = GetCodecs(); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index 6e887a064a77..cebefe86f6af 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -37,12 +37,30 @@ namespace NYdb::NConsoleClient { NTopic::EMeteringMode MeteringMode_ = NTopic::EMeteringMode::Unspecified; }; + class TCommandWithAutoPartitioning { + protected: + void AddAutoPartitioning(TClientCommand::TConfig& config, bool withDefault); + void ParseAutoPartitioningStrategy(); + TMaybe GetAutoPartitioningStrategy() const; + TMaybe GetAutoPartitioningStabilizationWindowSeconds() const; + TMaybe GetAutoPartitioningUpUtilizationPercent() const; + TMaybe GetAutoPartitioninDownUtilizationPercent() const; + + private: + TMaybe ScaleThresholdTime_; + TMaybe ScaleUpThresholdPercent_; + TMaybe ScaleDownThresholdPercent_; + + TString AutoPartitioningStrategyStr_; + TMaybe AutoPartitioningStrategy_; + }; + class TCommandTopic: public TClientCommandTree { public: TCommandTopic(); }; - class TCommandTopicCreate: public TYdbCommand, public TCommandWithTopicName, public TCommandWithSupportedCodecs, public TCommandWithMeteringMode { + class TCommandTopicCreate: public TYdbCommand, public TCommandWithTopicName, public TCommandWithSupportedCodecs, public TCommandWithMeteringMode, public TCommandWithAutoPartitioning { public: TCommandTopicCreate(); void Config(TConfig& config) override; @@ -52,11 +70,13 @@ namespace NYdb::NConsoleClient { private: ui64 RetentionPeriodHours_; ui64 RetentionStorageMb_; - ui32 PartitionsCount_; + ui32 MinActivePartitions_; + ui32 MaxActivePartitions_; + ui32 PartitionWriteSpeedKbps_; }; - class TCommandTopicAlter: public TYdbCommand, public TCommandWithTopicName, public TCommandWithSupportedCodecs, public TCommandWithMeteringMode { + class TCommandTopicAlter: public TYdbCommand, public TCommandWithTopicName, public TCommandWithSupportedCodecs, public TCommandWithMeteringMode, public TCommandWithAutoPartitioning { public: TCommandTopicAlter(); void Config(TConfig& config) override; @@ -66,7 +86,10 @@ namespace NYdb::NConsoleClient { private: TMaybe RetentionPeriodHours_; TMaybe RetentionStorageMb_; - TMaybe PartitionsCount_; + TMaybe MinActivePartitions_; + TMaybe MaxActivePartitions_; + + TMaybe PartitionWriteSpeedKbps_; NYdb::NTopic::TAlterTopicSettings PrepareAlterSettings(NYdb::NTopic::TDescribeTopicResult& describeResult); From f5b63f0c66a7152176b6886827a0e9b6a23e94b4 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 26 Jun 2024 07:00:11 +0000 Subject: [PATCH 09/18] Auto partitioning to PQ_V1 sdk --- .../ydb_persqueue_public/impl/persqueue.cpp | 13 +++++++-- .../impl/persqueue_impl.h | 28 +++++++++++++++++++ .../include/control_plane.h | 27 ++++++++++++++++++ 3 files changed, 66 insertions(+), 2 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue.cpp index eaba2d40b1ff..1cac026d12ce 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue.cpp @@ -86,11 +86,20 @@ TDescribeTopicResult::TDescribeTopicResult(TStatus status, const Ydb::PersQueue: } TDescribeTopicResult::TTopicSettings::TTopicSettings(const Ydb::PersQueue::V1::TopicSettings& settings) { - - PartitionsCount_ = settings.partitions_count(); RetentionPeriod_ = TDuration::MilliSeconds(settings.retention_period_ms()); SupportedFormat_ = static_cast(settings.supported_format()); + if (settings.has_auto_partitioning_settings()) { + PartitionsCount_ = settings.auto_partitioning_settings().min_active_partitions(); + MaxPartitionsCount_ = settings.auto_partitioning_settings().max_active_partitions(); + StabilizationWindow_ = TDuration::Seconds(settings.auto_partitioning_settings().partition_write_speed().stabilization_window().seconds()); + UpUtilizationPercent_ = settings.auto_partitioning_settings().partition_write_speed().up_utilization_percent(); + DownUtilizationPercent_ = settings.auto_partitioning_settings().partition_write_speed().down_utilization_percent(); + AutoPartitioningStrategy_ = settings.auto_partitioning_settings().strategy(); + } else { + PartitionsCount_ = settings.partitions_count(); + } + for (const auto& codec : settings.supported_codecs()) { SupportedCodecs_.push_back(static_cast(codec)); } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue_impl.h index 38aa5afdee58..57b8fe0aac2d 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue_impl.h @@ -54,6 +54,34 @@ class TPersQueueClient::TImpl : public TClientImplCommonset_max_active_partitions(settings.PartitionsCount_); + autoscalingSettingsDefined = true; + } + if (settings.AutoPartitioningStrategy_.Defined()) { + props.mutable_auto_partitioning_settings()->set_strategy(*settings.AutoPartitioningStrategy_); + autoscalingSettingsDefined = true; + } + if (settings.DownUtilizationPercent_.Defined()) { + props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(*settings.DownUtilizationPercent_); + autoscalingSettingsDefined = true; + } + if (settings.UpUtilizationPercent_.Defined()) { + props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(*settings.UpUtilizationPercent_); + autoscalingSettingsDefined = true; + } + if (settings.StabilizationWindow_.Defined()) { + props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds((*settings.StabilizationWindow_).Seconds()); + autoscalingSettingsDefined = true; + } + if (!autoscalingSettingsDefined) { + props.set_partitions_count(settings.PartitionsCount_); + } else { + props.mutable_auto_partitioning_settings()->set_min_active_partitions(settings.PartitionsCount_); + } + + props.set_retention_period_ms(settings.RetentionPeriod_.MilliSeconds()); props.set_supported_format(static_cast(settings.SupportedFormat_)); for (const auto& codec : settings.SupportedCodecs_) { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/include/control_plane.h b/ydb/public/sdk/cpp/client/ydb_persqueue_public/include/control_plane.h index 9b746c93e496..a6a487e30af7 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/include/control_plane.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/include/control_plane.h @@ -118,6 +118,12 @@ struct TDescribeTopicResult : public TStatus { } GETTER(TMaybe, RemoteMirrorRule); + GETTER(TMaybe, MaxPartitionsCount); + GETTER(TMaybe, StabilizationWindow); + GETTER(TMaybe, UpUtilizationPercent); + GETTER(TMaybe, DownUtilizationPercent); + GETTER(TMaybe, AutoPartitioningStrategy); + #undef GETTER @@ -139,6 +145,12 @@ struct TDescribeTopicResult : public TStatus { TMaybe AbcId_; TMaybe AbcSlug_; TString FederationAccount_; + + TMaybe MaxPartitionsCount_; + TMaybe StabilizationWindow_; + TMaybe UpUtilizationPercent_; + TMaybe DownUtilizationPercent_; + TMaybe AutoPartitioningStrategy_; }; TDescribeTopicResult(TStatus status, const Ydb::PersQueue::V1::DescribeTopicResult& result); @@ -192,6 +204,7 @@ struct TReadRuleSettings { // Settings for topic. template struct TTopicSettings : public TOperationRequestSettings { + friend class TPersQueueClient; struct TRemoteMirrorRuleSettings { TRemoteMirrorRuleSettings() {} @@ -267,9 +280,23 @@ struct TTopicSettings : public TOperationRequestSettings { if (settings.RemoteMirrorRule()) { RemoteMirrorRule_ = TRemoteMirrorRuleSettings().SetSettings(settings.RemoteMirrorRule().GetRef()); } + + MaxPartitionsCount_ = settings.MaxPartitionsCount(); + StabilizationWindow_ = settings.StabilizationWindow(); + UpUtilizationPercent_ = settings.UpUtilizationPercent(); + DownUtilizationPercent_ = settings.DownUtilizationPercent(); + AutoPartitioningStrategy_ = settings.AutoPartitioningStrategy(); + return static_cast(*this); } + private: + TMaybe MaxPartitionsCount_; + TMaybe StabilizationWindow_; + TMaybe UpUtilizationPercent_; + TMaybe DownUtilizationPercent_; + TMaybe AutoPartitioningStrategy_; + }; From 8bf7482e51a7535cf3dc47a5c83a5e78bbd312ca Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 26 Jun 2024 07:07:54 +0000 Subject: [PATCH 10/18] Rename --- .../ydb_persqueue_public/impl/persqueue_impl.h | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue_impl.h index 57b8fe0aac2d..5786a7b52207 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_public/impl/persqueue_impl.h @@ -54,28 +54,29 @@ class TPersQueueClient::TImpl : public TClientImplCommonset_max_active_partitions(settings.PartitionsCount_); - autoscalingSettingsDefined = true; + autoPartitioningSettingsDefined = true; } if (settings.AutoPartitioningStrategy_.Defined()) { props.mutable_auto_partitioning_settings()->set_strategy(*settings.AutoPartitioningStrategy_); - autoscalingSettingsDefined = true; + autoPartitioningSettingsDefined = true; } if (settings.DownUtilizationPercent_.Defined()) { props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_down_utilization_percent(*settings.DownUtilizationPercent_); - autoscalingSettingsDefined = true; + autoPartitioningSettingsDefined = true; } if (settings.UpUtilizationPercent_.Defined()) { props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->set_up_utilization_percent(*settings.UpUtilizationPercent_); - autoscalingSettingsDefined = true; + autoPartitioningSettingsDefined = true; } if (settings.StabilizationWindow_.Defined()) { props.mutable_auto_partitioning_settings()->mutable_partition_write_speed()->mutable_stabilization_window()->set_seconds((*settings.StabilizationWindow_).Seconds()); - autoscalingSettingsDefined = true; + autoPartitioningSettingsDefined = true; } - if (!autoscalingSettingsDefined) { + + if (!autoPartitioningSettingsDefined) { props.set_partitions_count(settings.PartitionsCount_); } else { props.mutable_auto_partitioning_settings()->set_min_active_partitions(settings.PartitionsCount_); From 848297a00c220308e0975aa956e2db32e1cea193 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Thu, 27 Jun 2024 07:07:35 +0000 Subject: [PATCH 11/18] fix --- ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index 69c751a6bb41..c9caba3825ba 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -199,7 +199,7 @@ namespace { if (isAlter) { config.Opts->AddLongOption("auto-partitioning-strategy", description.Str()) .Optional() - .StoreResult(&AutoPartitioningStrategy_); + .StoreResult(&AutoPartitioningStrategyStr_); config.Opts->AddLongOption("auto-partitioning-stabilization-window-seconds", "Duration in seconds of high or low load before automatically scale the number of partitions") .Optional() .StoreResult(&ScaleThresholdTime_); @@ -213,7 +213,7 @@ namespace { config.Opts->AddLongOption("auto-partitioning-strategy", description.Str()) .Optional() .DefaultValue("disabled") - .StoreResult(&AutoPartitioningStrategy_); + .StoreResult(&AutoPartitioningStrategyStr_); config.Opts->AddLongOption("auto-partitioning-stabilization-window-seconds", "Duration in seconds of high or low load before automatically scale the number of partitions") .Optional() .DefaultValue(300) @@ -379,6 +379,7 @@ namespace { ParseTopicName(config, 0); ParseCodecs(); ParseMeteringMode(); + ParseAutoPartitioningStrategy(); } NYdb::NTopic::TAlterTopicSettings TCommandTopicAlter::PrepareAlterSettings( From fef7d6f74e96594b1f2bfba49df37c5b450c5b84 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Fri, 28 Jun 2024 07:57:41 +0000 Subject: [PATCH 12/18] rename --- ydb/services/lib/actors/pq_schema_actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index a1aeb0b3f869..af52ed00363a 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -718,7 +718,7 @@ namespace NKikimr::NGRpcProxy::V1 { return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } if (strategy.GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED && config.GetPartitionConfig().HasStorageLimitBytes()) { - error = TStringBuilder() << "Partitions autoscaling is incompatible with retention storage bytes option"; + error = TStringBuilder() << "Auto partitioning is incompatible with retention storage bytes option"; return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR); } From 6a91dc5a5fc9e9734d816367144916b64bc177ac Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Fri, 28 Jun 2024 09:14:03 +0000 Subject: [PATCH 13/18] rename --- ydb/public/api/protos/ydb_persqueue_v1.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/public/api/protos/ydb_persqueue_v1.proto b/ydb/public/api/protos/ydb_persqueue_v1.proto index bbdcf6ae6bf6..eed53b258af9 100644 --- a/ydb/public/api/protos/ydb_persqueue_v1.proto +++ b/ydb/public/api/protos/ydb_persqueue_v1.proto @@ -1208,7 +1208,7 @@ message AutoPartitioningWriteSpeedStrategy { //Partition will become a candidate to the auto partitioning down //after write speed doesn’t reach down_utilization_percent (in percentage of maximum write speed to the partition) for the period of time stabilization_window - //This candidate partition will be autoscaled down when other neighbour partition will become a candidate to the auto partitioning down and not earlier than a retention period. + //This candidate partition will be auto partitioned down when other neighbour partition will become a candidate to the auto partitioning down and not earlier than a retention period. // Zero value means default - 300. google.protobuf.Duration stabilization_window = 1; From 8b8e1272a9aabd4d6987f953e75b12d2e8167dd2 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Fri, 28 Jun 2024 13:54:03 +0000 Subject: [PATCH 14/18] fix --- ydb/services/datastreams/datastreams_ut.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index babf2cbf7a4a..dbcfb4b0f492 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -2689,9 +2689,9 @@ Y_UNIT_TEST_SUITE(DataStreams) { .BeginConfigurePartitioningSettings() .MinActivePartitions(3) .MaxActivePartitions(7) - .BeginConfigureAutoscalingSettings() - .Strategy(NYdb::NTopic::EAutoscalingStrategy::ScaleUpAndDown) - .EndConfigureAutoscalingSettings() + .BeginConfigureAutoPartitioningSettings() + .Strategy(NYdb::NTopic::EAutoPartitioningStrategy::ScaleUpAndDown) + .EndConfigureAutoPartitioningSettings() .EndConfigurePartitioningSettings() ; auto result = pqClient.CreateTopic(streamName, settings).ExtractValueSync(); From e31694e7f194f468e256bbfee04da9688dcdf372 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Sun, 30 Jun 2024 14:18:19 +0000 Subject: [PATCH 15/18] fix --- ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index a5df3c29e2e2..dac4b58742ce 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -178,7 +178,7 @@ class TAlterPQ: public TSubOperation { if (alterConfig.HasPartitionStrategy() && !NPQ::SplitMergeEnabled(alterConfig) && tabletConfig->HasPartitionStrategy() && NPQ::SplitMergeEnabled(*tabletConfig)) { if (!alterConfig.GetPartitionStrategy().HasMaxPartitionCount() || 0 != alterConfig.GetPartitionStrategy().GetMaxPartitionCount()) { - errStr = TStringBuilder() << "Can`t disable autoscaling. Disabling autoscaling is a destructive operation, " + errStr = TStringBuilder() << "Can`t disable auto partitioning. Disabling auto partitioning is a destructive operation, " << "after which all partitions will become active and the message order guarantee will be violated. " << "If you are sure of this, then set max_active_partitions to 0."; return nullptr; From 668264860f8c2156091c439241bf1b87972663a1 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Sun, 30 Jun 2024 15:21:27 +0000 Subject: [PATCH 16/18] Fix compatibility --- ydb/services/persqueue_v1/actors/schema_actors.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 3161acfa8917..d0217b17e675 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1077,7 +1077,13 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv } const auto &config = pqDescr.GetPQTabletConfig(); - Result.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount()); + + if (AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge()) { + Result.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount()); + } else { + Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount()); + } + Result.mutable_partitioning_settings()->set_max_active_partitions(config.GetPartitionStrategy().GetMaxPartitionCount()); switch(config.GetPartitionStrategy().GetPartitionStrategyType()) { case ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT: From 1a675cd455b6fa48f3e8cf52f4246ed63eb1e047 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Sun, 30 Jun 2024 18:19:06 +0000 Subject: [PATCH 17/18] fix --- ydb/services/persqueue_v1/actors/schema_actors.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index d0217b17e675..a9ed9a83ea66 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1077,8 +1077,8 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv } const auto &config = pqDescr.GetPQTabletConfig(); - - if (AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge()) { + GetContex + if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge()) { Result.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount()); } else { Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount()); From 7b305655cd4115c2c67e5f6c92c45c897a574e5f Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Sun, 30 Jun 2024 20:29:07 +0000 Subject: [PATCH 18/18] fix --- ydb/services/persqueue_v1/actors/schema_actors.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index a9ed9a83ea66..a0e4d3e1bad8 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1077,7 +1077,6 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv } const auto &config = pqDescr.GetPQTabletConfig(); - GetContex if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge()) { Result.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount()); } else {